Использование продолжений scala со слушателями netty / NIO

Я использую библиотеку Netty (версия 4 от GitHub). Он отлично работает в Scala, но я надеюсь, что моя библиотека сможет использовать стиль передачи продолжения для асинхронного ожидания.

Традиционно с Netty вы должны сделать что-то вроде этого (пример операции асинхронного подключения):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

Если вы реализуете библиотеку (каковой я являюсь), то у вас в основном есть три простых варианта, позволяющих пользователю библиотеки выполнять какие-либо действия после установления соединения:

  1. Просто верните ChannelFuture из вашего метода подключения и позвольте пользователю разобраться с ним - это не обеспечивает большой абстракции от netty.
  2. Возьмите ChannelFutureListener в качестве параметра вашего метода подключения и добавьте его в качестве слушателя в ChannelFuture.
  3. Возьмите объект функции обратного вызова в качестве параметра вашего метода подключения и вызовите его из созданного вами ChannelFutureListener (это сделало бы стиль, управляемый обратным вызовом, чем-то вроде node.js).

Я пытаюсь сделать четвертый вариант; Я не включил его в счет выше, потому что это непросто.

Я хочу использовать продолжения с разделителями scala, чтобы использование библиотеки было чем-то вроде библиотеки блокировки, но за кулисами она не будет блокироваться:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

Представьте, что другие операции чтения / записи реализуются таким же образом. Цель этого состоит в том, чтобы код пользователя мог выглядеть примерно так:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

Другими словами, программа будет выглядеть как простая программа в стиле блокировки, но за кулисами не будет никакой блокировки или потоковой передачи.

Проблема, с которой я сталкиваюсь, заключается в том, что я не совсем понимаю, как работает типизация разделенных продолжений. Когда я пытаюсь реализовать это описанным выше способом, компилятор жалуется, что моя реализация operationComplete фактически возвращает Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] вместо Unit. Я понимаю, что в scala CPS есть своего рода "ошибка" в том, что вы должны аннотировать возвращаемый тип метода shift с помощью @suspendable, который передается вверх по стеку вызовов до reset, но, похоже, нет никакого способа согласовать это с уже существующей библиотекой Java, которая не имеет концепции разграниченных продолжений.

Я чувствую, что действительно должен быть способ обойти это - если Swarm может сериализовать продолжения и заглушить их по сети для вычисления в другом месте, тогда должна быть возможность просто вызвать продолжение из уже существующего класса Java. Но я не могу понять, как это можно сделать. Придется ли мне полностью переписать netty на Scala, чтобы это произошло?


person Jeremy    schedule 07.02.2012    source источник
comment
Я не знаю, как исправить материал Scala, но я против вашей идеи. Позвольте мне рассказать вам, почему. Но, чтобы пользователь не знал об асинхронной природе вашей библиотеки, вы скажете ему, что это нормально, блокирующие вызовы в коде слушателя. Фактически он не знал бы, что он даже написал свой код в слушателе. Выполнение блокирующего вызова в слушателе может привести ко всевозможным проблемам. Проблема, с которой вы столкнетесь в большинстве случаев, заключается в том, что он замедляет другие io-задачи и, таким образом, ограничивает пропускную способность.   -  person Norman Maurer    schedule 08.02.2012
comment
Вы правы, но я не согласен. Я думаю, что пользователь моей библиотеки, если есть хоть какие-то, кроме меня, вероятно, должен будет сначала понять, что делает reset, и, таким образом, поймет, что вызовы не блокируются. На самом деле это всего лишь способ: A) получить более глубокое понимание продолжений с разделителями и B) поэкспериментировать с написанием кода, основанного на обратном вызове, более чистым способом.   -  person Jeremy    schedule 08.02.2012


Ответы (1)


Я нашел это объяснение продолжения Scala чрезвычайно полезным, когда я только начинал . В частности, обратите внимание на части, где он объясняет shift[A, B, C] и reset[B, C]. Добавление фиктивного null в качестве последнего оператора operationComplete должно помочь.

Кстати, вам нужно вызвать retrn() внутри другого reset, если внутри него может быть вложен shift.

Изменить: вот рабочий пример

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}

с возможным выходом:

Outside reset
operationComplete starts
This will happen after the connection is finished
person shams    schedule 08.02.2012
comment
Фактически, это действительно радует компилятор и даже кажется, что он работает правильно. Ключ, я полагаю, в том, что вы переместили shift за пределы анонимного ChannelFutureListener и использовали закрытие для вызова продолжения изнутри operationComplete. Я не уверен, что понимаю, почему это работает, а другой - нет, но я возьму его. Спасибо! - person Jeremy; 08.02.2012
comment
И это очень хорошее чтение о продолжениях scala. Им следует удалить бесполезные примеры продолжения со страницы scala-lang.org и заменить их статьей, на которую вы указали ссылку. - person Jeremy; 08.02.2012
comment
@Jeremy, кстати, разница между вашим кодом и моим в том, что я явно аннотировал возвращаемые типы некоторых методов. - person shams; 09.02.2012
comment
Я знаю, что мой код выглядит почти так же, как ваш в том, что я опубликовал. Но по какой-то причине это не то, что у меня было в моем реальном коде. Я потерял блок shift внутри реализации operationComplete, что вызывало ошибку компилятора. Я не понимал, что могу поставить shift снаружи и использовать закрытие, чтобы вызвать это. Хотя я ошибочно написал в своем вопросе :) - person Jeremy; 09.02.2012