У меня есть некоторый опыт работы с LMAX Disruptor, и мне бы очень хотелось реализовать собственный почтовый ящик актера, используя разрушитель.
Есть ли какие-либо рекомендации? Это вообще возможно? Каковы ограничения почтовых ящиков актеров Akka?
У меня есть некоторый опыт работы с LMAX Disruptor, и мне бы очень хотелось реализовать собственный почтовый ящик актера, используя разрушитель.
Есть ли какие-либо рекомендации? Это вообще возможно? Каковы ограничения почтовых ящиков актеров Akka?
Как сказано здесь, вам просто нужно реализовать несколько методов - конечно, вы должны писать/читать сообщения напрямую, используя ваш указатель на кольцевой буфер. Вы также должны иметь в виду:
Disruptor обычно предварительно выделяет большой объем памяти, поэтому использование одного Disruptor на каждого актора — плохая идея, вы можете использовать один роутер Router (с Disruptor внутри) в сочетании с BalancingPool
.
если вы хотите использовать разные типы сообщений, отдельные потребители для ведения журнала, восстановления и т. д. - вы должны передать другой экземпляр RingBufferPointer (подобный smthng) в качестве параметра в свой почтовый ящик (с одинаковым начальным значением для ведения журнала, другим начальным значением для разных типы сообщений), но по-прежнему использовать один Disruptor. Таким образом, разные почтовые ящики будут относиться к одному разрушителю.
вы потеряете низкоуровневый контроль над созданием, извлечением сообщений и т. д., поэтому по умолчанию не будет пакетного распределения.
вы также можете использовать историю из кольца, чтобы восстановить неисправное состояние актера (в preRestart
или в супервизоре).
Что говорит LMAX:
Он работает иначе, чем более традиционные подходы, поэтому вы используете его немного иначе, чем вы привыкли. Например, применить шаблон к вашей системе не так просто, как заменить все ваши очереди волшебным кольцевым буфером. У нас есть примеры кода, которые помогут вам, растущее число блогов и статей, дающих обзор того, как это работает, технический документ, как и следовало ожидать, содержит некоторые подробности, а тесты производительности дают примеры того, как использовать Disruptor. http://mechanitis.blogspot.com/2011/06/dissecting-disruptor-whats-so-special.html
И здесь краткое сравнение Queues/Disruptors/Actors
В псевдоскала-коде это будет примерно так:
object MyUnboundedMailbox {
val buffer = new RingBuffer()
class MyMessageQueue(val startPointer: Pointer, readerPointer: Pointer, writerPointer: Pointer) extends MessageQueue {
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
writerPointer.allocate(() => handle) //allocate one element and set, if you want different message types - you should allocate big amount of data before and block when it ends (to not interfere with another messages), so it has to be bounded queue then
}
def dequeue(): Envelope = readerPointer.poll()
def numberOfMessages: Int = writerPointer - readerPointer //should be synchronized
def hasMessages: Boolean = readerPointer == writerPointer //should be synchronized
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) { }
}
trait MyUnboundedMessageQueueSemantics
}
class MyUnboundedMailbox(settings: ActorSystem.Settings, config: Config) extends MailboxType
with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
import MyUnboundedMailbox._
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue = {
val pointer = ring.newPointer
val read = pointer.copy
val write = pointer.copy
new MyMessageQueue(pointer, read, write)
}
// you may use another strategy here based on owner (you can access name and path here),
// so for example may allocate same pointers for same prefixes in the name or path
}
Вы можете использовать неизмененный MyMessageQueue.startPointer для доступа к журналу сообщений во время восстановления после сбоя (вы также можете посмотреть Источник событий< akka /a> по аналогии).
Использование подхода UnboundedQueue не гарантирует доставку сообщения здесь, так как очень старое недоставленное сообщение может быть перезаписано новой версией, если кольцо «заканчивается», поэтому вам может понадобиться BoundedQueue, например здесь.