Как реализовать собственный почтовый ящик актера с помощью разрушителя?

У меня есть некоторый опыт работы с LMAX Disruptor, и мне бы очень хотелось реализовать собственный почтовый ящик актера, используя разрушитель.

Есть ли какие-либо рекомендации? Это вообще возможно? Каковы ограничения почтовых ящиков актеров Akka?


person herburos    schedule 06.01.2015    source источник


Ответы (1)


Как сказано здесь, вам просто нужно реализовать несколько методов - конечно, вы должны писать/читать сообщения напрямую, используя ваш указатель на кольцевой буфер. Вы также должны иметь в виду:

  • Disruptor обычно предварительно выделяет большой объем памяти, поэтому использование одного Disruptor на каждого актора — плохая идея, вы можете использовать один роутер Router (с Disruptor внутри) в сочетании с BalancingPool.

  • если вы хотите использовать разные типы сообщений, отдельные потребители для ведения журнала, восстановления и т. д. - вы должны передать другой экземпляр RingBufferPointer (подобный smthng) в качестве параметра в свой почтовый ящик (с одинаковым начальным значением для ведения журнала, другим начальным значением для разных типы сообщений), но по-прежнему использовать один Disruptor. Таким образом, разные почтовые ящики будут относиться к одному разрушителю.

  • вы потеряете низкоуровневый контроль над созданием, извлечением сообщений и т. д., поэтому по умолчанию не будет пакетного распределения.

  • вы также можете использовать историю из кольца, чтобы восстановить неисправное состояние актера (в preRestart или в супервизоре).

Что говорит LMAX:

Он работает иначе, чем более традиционные подходы, поэтому вы используете его немного иначе, чем вы привыкли. Например, применить шаблон к вашей системе не так просто, как заменить все ваши очереди волшебным кольцевым буфером. У нас есть примеры кода, которые помогут вам, растущее число блогов и статей, дающих обзор того, как это работает, технический документ, как и следовало ожидать, содержит некоторые подробности, а тесты производительности дают примеры того, как использовать Disruptor. http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html

И здесь краткое сравнение Queues/Disruptors/Actors

В псевдоскала-коде это будет примерно так:

object MyUnboundedMailbox {
  val buffer = new RingBuffer()

  class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
      writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then  

    }
    def dequeue(): Envelope = readerPointer.poll()
    def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized
    def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { }
  }

  trait MyUnboundedMessageQueueSemantics 

}

class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
  with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._
  final override def create(owner: Option[ActorRef],
                            system: Option[ActorSystem]): MessageQueue = {

    val pointer = ring.newPointer
    val read = pointer.copy
    val write = pointer.copy
    new MyMessageQueue(pointer, read, write) 
  }
    // you may use another strategy here based on owner (you can access name and path here), 
    // so for example may allocate same pointers for same prefixes in the name or path 
}

Вы можете использовать неизмененный MyMessageQueue.startPointer для доступа к журналу сообщений во время восстановления после сбоя (вы также можете посмотреть Источник событий< akka /a> по аналогии).

Использование подхода UnboundedQueue не гарантирует доставку сообщения здесь, так как очень старое недоставленное сообщение может быть перезаписано новой версией, если кольцо «заканчивается», поэтому вам может понадобиться BoundedQueue, например здесь.

person dk14    schedule 06.01.2015