Пользовательский Java-плагин Akka Persistence

В настоящее время я пишу свой собственный плагин для Akka SyncWriteJournal API для реализации соединения с HSQLDB.

Проблема в том, что я не понимаю требований к методу doAsyncReplayMessages. В нем говорится, что ему нужно вернуть будущее и что все сообщения должны вызываться replayCallback.

Допустим, у меня есть запрос, который возвращает список сообщений: List<Message> messages. Может ли кто-нибудь предоставить минимальный пример (с объяснением) того, как использовать replayCallback и Future для правильной реализации метода с использованием этого списка? Как replayCallback и Future будут работать вместе и что должен возвращать метод doAsyncReplayMessages?

Спасибо!

-Редактировать-

С помощью некоторых комментариев я набросал реализацию, которая не является полной, но включает в себя предложенную идею:

public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
        final Procedure<PersistentRepr> replayCallback) {
    final ExecutionContext ec = context().system().dispatcher();

    final Future<Void> future = Futures.future(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            final List<Message> messages = getMessages();
            for (int i = 0; i < feedbackList.size(); i++) {
                replayCallback.apply(
                        new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
            }
            return null;
        }
    }, ec);

    return future;
}

Как вы могли видеть, в нем пропущено несколько ключевых понятий, которых я до сих пор не вижу. PersistentImpl требуется один аргумент Seq<String> confirm, который все еще null. И, возможно, что более важно, я возвращаю null, так как будущее ожидает Void в качестве возвращаемого типа, и я не уверен, как это реализовать. В настоящее время он выдает NPE:

[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
    at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
    at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

person user3354890    schedule 26.08.2014    source источник
comment
эй удалось? я хотел бы найти собственный пример плагина сохраняемости java akka   -  person bodrin    schedule 23.02.2017


Ответы (1)


Вы можете просто обернуть операцию блокировки в Future, например: Future { fetchStuff() }.

Вы можете обратиться к dnvriend/akka-persistence-jdbc: JdbcSyncWriteJournal для полноценной реализации синхронного журнала.

person Konrad 'ktoso' Malawski    schedule 26.08.2014
comment
Извините, я не умею читать scala (моя реализация на Java). Можете ли вы уточнить немного больше? - person user3354890; 26.08.2014
comment
Вот документы по использованию фьючерсов из Java: doc. akka.io/docs/akka/2.3.5/java/futures.html#Use_Directly Вы можете вызвать Futures.future(callable, executeContext), где вызываемый объект будет извлекать List‹Messages›, вызывать replyCallback для каждого из них, а затем закончить. Этот вызываемый объект должен иметь возвращаемый тип Void, чтобы соответствовать ожидаемому возвращаемому типу Future‹Void›. - person Endre Varga; 27.08.2014