Как отправлять сообщения из потока данных Google (Apache Beam) на бегуне Flink в Kafka

Я пытаюсь написать доказательство концепции, которое берет сообщения от Kafka, преобразует их с помощью Beam на Flink, а затем отправляет результаты в другую тему Kafka.

Я использовал KafkaWindowedWordCountExample в качестве отправной точки, и он делает первую часть того, что я хочу сделать, но он выводит в текстовые файлы, а не в Kafka. FlinkKafkaProducer08 выглядит многообещающе, но я не могу понять, как подключить его к конвейеру. Я думал, что он будет обернут UnboundedFlinkSink или чем-то подобным, но, похоже, этого не существует.

Любые советы или мысли о том, что я пытаюсь сделать?

Я запускаю последний луч-инкубатор (по состоянию на прошлый вечер с Github), Flink 1.0.0 в кластерном режиме и Kafka 0.9.0.1, все на Google Compute Engine (Debian Jessie).


person Bill McCarthy    schedule 18.03.2016    source источник


Ответы (2)


Преобразование приемника для записи в Kafka было добавлено в Apache Beam / Dataflow в 2016 году. Пример использования см. в JavaDoc для KafkaIO в Apache Beam.

person Raghu Angadi    schedule 30.08.2017

В настоящее время в Beam нет класса UnboundedSink. Большинство неограниченных приемников реализованы с использованием ParDo.

Вы можете проверить коннектор KafkaIO. Это считыватель Kafka, который работает во всех бегунах Beam и реализует параллельное чтение, создание контрольных точек и другие UnboundedSource API. Этот запрос на вытягивание также включает грубый приемник в конвейере примера TopHashtags, написав Kafka в ParDo:

class KafkaWriter extends DoFn<String, Void> {

  private final String topic;
  private final Map<String, Object> config;
  private transient KafkaProducer<String, String> producer = null;

  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer",    StringSerializer.class.getName(),
        "value.serializer",  StringSerializer.class.getName());
  }

  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close();
  }

  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}

Конечно, мы хотели бы добавить поддержку стоков и в KafkaIO. По сути, это будет то же самое, что и KafkaWriter выше, но гораздо проще в использовании.

person Raghu Angadi    schedule 23.03.2016