В настоящее время я пишу свой собственный плагин для Akka
SyncWriteJournal
API для реализации соединения с HSQLDB
.
Проблема в том, что я не понимаю требований к методу doAsyncReplayMessages
. В нем говорится, что ему нужно вернуть будущее и что все сообщения должны вызываться replayCallback
.
Допустим, у меня есть запрос, который возвращает список сообщений: List<Message> messages
. Может ли кто-нибудь предоставить минимальный пример (с объяснением) того, как использовать replayCallback
и Future
для правильной реализации метода с использованием этого списка? Как replayCallback
и Future
будут работать вместе и что должен возвращать метод doAsyncReplayMessages
?
Спасибо!
-Редактировать-
С помощью некоторых комментариев я набросал реализацию, которая не является полной, но включает в себя предложенную идею:
public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
final Procedure<PersistentRepr> replayCallback) {
final ExecutionContext ec = context().system().dispatcher();
final Future<Void> future = Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
final List<Message> messages = getMessages();
for (int i = 0; i < feedbackList.size(); i++) {
replayCallback.apply(
new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
}
return null;
}
}, ec);
return future;
}
Как вы могли видеть, в нем пропущено несколько ключевых понятий, которых я до сих пор не вижу. PersistentImpl требуется один аргумент Seq<String> confirm
, который все еще null
. И, возможно, что более важно, я возвращаю null
, так как будущее ожидает Void
в качестве возвращаемого типа, и я не уверен, как это реализовать. В настоящее время он выдает NPE:
[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)