Clojure: реализация интерфейса Java с отслеживанием состояния

Kafka Streams имеет интерфейс, Processor, реализация которого зависит от состояния. Дан пример реализации в руководстве разработчика есть:

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Long> kvStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
      // keep the processor context locally because we need it in punctuate() and commit()
      this.context = context;

      // call this processor's punctuate() method every 1000 time units.
      this.context.schedule(1000);

      // retrieve the key-value store named "Counts"
      kvStore = (KeyValueStore) context.getStateStore("Counts");
  }

  @Override
  public void process(String dummy, String line) {
      String[] words = line.toLowerCase().split(" ");

      for (String word : words) {
          Long oldValue = kvStore.get(word);
          if (oldValue == null) {
              kvStore.put(word, 1L);
          } else {
              kvStore.put(word, oldValue + 1L);
          }
      }
  }

  @Override
  public void punctuate(long timestamp) {
      KeyValueIterator<String, Long> iter = this.kvStore.all();
      while (iter.hasNext()) {
          KeyValue<String, Long> entry = iter.next();
          context.forward(entry.key, entry.value.toString());
      }
      iter.close();
      // commit the current processing progress
      context.commit();
  }

  @Override
  public void close() {
      // close the key-value store
      kvStore.close();
  }

}

Метод init инициализирует внутреннее состояние WordCountProcessor, например получение хранилища ключей и значений. Другие методы, такие как process и close, используют это состояние.

Мне непонятно, как reify такой интерфейс в Clojure. Как бы мы передали состояние, полученное init, process, close и т. д.?

Использование замыкания?

У меня есть одна идея - использовать закрытие:

(let [ctx (atom nil)]
  (reify Processor
    (close [this]
      ;; Do something w/ ctx
      )
    (init [this context]
      (reset! ctx context))
    (process [this k v]
      ;; Do something w/ ctx
      )
    (punctuate [this timestamp]
      ;; Do something w/ ctx
      )))

Досадно, что нам придется каждый раз начинать с объекта ProcessorContext, поэтому код хранилища ключ-значение будет повторяться во всех методах, которым требуется хранилище ключ-значение.

Я не вижу (общего) способа обойти это, хотя в каждом конкретном случае мы можем заменить атом ctx более конкретным состоянием, которое необходимо методам.

Есть ли способ лучше?


person Tianxiang Xiong    schedule 17.12.2016    source источник


Ответы (1)


Замыкание атома было бы основным способом сделать это. Ваш исходный класс имеет два поля, поэтому вы можете закрыть два атома, чтобы получить тот же эффект.

(let [ctx (atom nil)
      kv-store (atom nil)]
  (reify Processor
    ,,,
    (init [this context]
      (reset! ctx context)
      (reset! kv-store (.getStateStore context "Counts")))
    ,,,))

Если это все еще слишком утомительно, вы можете добавить некоторые удобные функции, которые также закрывают атомы.

(let [ctx (atom nil)
      kv-store (atom nil)]

  (def kv-get [key]
    (.get @kv-store key))

  (def kv-all []
    (iterator-seq (.all @kv-store)))

  (def kv-put [key value]
    (.put @kv-store key value))

  (reify Processor
    ,,,
    (init [this context]
      (reset! ctx context)
      (reset! kv-store (.getStateStore context "Counts")))
    ,,,
  (punctuate [this timestamp]
    (do-seq [x (kv-all)]
      ,,,)
  )))

Альтернативой может быть использование gen-class, но вы с reify будет лучше.

person Arne Brasseur    schedule 17.12.2016