Проектирование иерархии супервизора Akka

Обратите внимание: я разработчик Java и практически не знаю Scala (к сожалению). Я бы попросил, чтобы любые примеры кода, представленные в ответе, использовали Java API Akka.

Я новичок в Akka и актерах и пытаюсь настроить довольно простую систему актеров:

введите здесь описание изображения

Итак, актор DataSplitter запускается и разбивает довольно большой фрагмент двоичных данных, скажем, 20 ГБ, на фрагменты по 100 КБ. Для каждого фрагмента данные хранятся в DataCache через DataCacher. В фоновом режиме DataCacheCleaner копается в кеше и находит фрагменты данных, которые можно безопасно удалить. Таким образом мы предотвращаем увеличение размера кеша до 20 ГБ.

После отправки фрагмента в DataCacher для кэширования, DataSplitter затем уведомляет ProcessorPool о фрагменте, который теперь необходимо обработать. ProcessorPool – это маршрутизатор/пул, состоящий из десятков тысяч различных ProcessorActors. Когда каждый ProcessActor получает уведомление о «обработке» фрагмента данных размером 100 КБ, он затем извлекает данные из DataCacher и выполняет некоторую обработку.

Если вам интересно, почему я вообще здесь что-то кэширую (отсюда и DataCacher, DataCache и DataCacheCleaner), я думаю, что 100 КБ — это все еще довольно большое сообщение, которое можно передать десяткам тысяч актеров. экземпляров (100 КБ * 1000 = 100 МБ), поэтому я пытаюсь просто сохранить фрагмент размером 100 КБ один раз (в кеше), а затем предоставить каждому действующему лицу доступ к нему по ссылке через API кеша.

Также есть актор Mailman, который подписывается на шину событий и перехватывает все DeadLetters.

Итак, всего 6 актеров:

  • DataSplitter
  • DataCacher
  • DataCacheCleaner
  • ProcessorPool
  • ProcessorActor
  • Mailman

Документы Akka проповедуют, что вы должны разлагать свою систему акторов на основе разделения подзадач, а не только по функциям, но я не совсем понимаю, как это применимо здесь. Проблема в том, что я пытаюсь организовать иерархию супервайзеров между этими актерами, и я не уверен, каков наилучший/правильный подход. Очевидно, что ProcessorPool — это маршрутизатор, который должен быть родительским/супервизором для ProcessorActors, поэтому у нас есть известная иерархия:

/user/processorPool/
    processorActors

Но кроме этой известной/очевидной связи, я не уверен, как организовать остальных моих актеров. Я мог бы сделать их всех «одноранговыми» под одним общим/главным актером:

/user/master/
    dataSplitter/
    dataCacher/
    dataCacheCleaner/
    processorPool/
        processorActors/
    mailman/

Или я мог бы опустить master (корневой) актор и попытаться сделать вещи более вертикальными вокруг кеша:

/user/
    dataSplitter/
    cacheSupervisor/
        dataCacher/
        dataCacheCleaner/
    processorPool/
        processorActors/
    mailman/

Будучи таким новичком в Akka, я просто не уверен, как лучше поступить, и если кто-то может помочь с некоторой первоначальной поддержкой, я уверен, что все лампочки включатся. И, как бы ни была важна организация этой иерархии, я даже не уверен, какие конструкции API я могу использовать для фактического создания иерархии в коде.


person smeeb    schedule 16.06.2015    source источник
comment
Как вы спроектировали это, наконец?   -  person biniam    schedule 01.12.2015


Ответы (1)


Объединение их в один master упрощает управление, поскольку вы можете получить доступ ко всем участникам watched через супервайзера (в данном случае master).

Одна иерархическая реализация может быть:

Мастер-супервайзер Актер

class MasterSupervisor extends UntypedActor {

private static SupervisorStrategy strategy = new AllForOneStrategy(2,
        Duration.create(5, TimeUnit.MINUTES),

        new Function<Throwable, Directive>() {
            @Override
            public Directive apply(Throwable t) {

                if (t instanceof SQLException) {
                    log.error("Error: SQLException")
                    return restart()
                } else if (t instanceof IllegalArgumentException) {
                    log.error("Error: IllegalArgumentException")
                    return stop()
                } else {
                    log.error("Error: GeneralException")
                    return stop()
                }
            }
        });

@Override
public SupervisorStrategy supervisorStrategy() { return strategy }

@Override
void onReceive(Object message) throws Exception {
     if (message.equals("SPLIT")) {
          // CREATE A CHILD OF MyOtherSupervisor
          if (!dataSplitter) {
              dataSplitter = context().actorOf(FromConfig.getInstance().props(Props.create(DataSplitter.class)), "DataSplitter")

              // WATCH THE CHILD
              context().watch(dataSplitter)

              log.info("${self().path()} has created, watching and sent JobId = ${message} message to DataSplitter")
          }

          // do something with message such as Forward
          dataSplitter.forward(message, context())
      }
}

Актер DataSplitter

class DataSplitter extends UntypedActor {

    // Inject a Service to do the main operation
    DataSplitterService dataSplitterService

    @Override
    void onReceive(Object message) throws Exception {
        if (message.equals("SPLIT")) {
            log.info("${self().path()} recieved message: ${message} from ${sender()}")
            // do something with message such as Forward
            dataSplitterService.splitData()
        }
    }
}
person biniam    schedule 02.12.2015