Я делаю некоторые POC с потоковой передачей Spark и Spark для своего проекта. Итак, все, что я делаю, это читаю имя файла из Topic. Скачивание файла из src/main/sresource и запуск обычного частотного приложения WordCount.
@KafkaListener(topics = Constants.ABCWordTopic, groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID)
public void processTask(@Payload String fileResourcePath) {
log.info("ABC Receiving task from WordProducer filepath {} at time {}", fileResourcePath,
LocalDateTime.now());
// Spark job
/*
* JavaRDD wordRDD =
* sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
* log.info("ABC Map Contents : {}", wordRDD.countByValue().toString());
* wordRDD.coalesce(1,
* true).saveAsTextFile("ResultSparklog_"+ System.currentTimeMillis());
*/
// Spark Streaming job
JavaPairDStream wordPairStream = streamingContext
.textFileStream(extractFile(fileResourcePath))
.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
.mapToPair(s -> new Tuple2(s, 1)).reduceByKey((i1, i2) -> i1 + i2);
wordPairStream.foreachRDD(wordRDD -> {
// javaFunctions(wordTempRDD).writerBuilder("vocabulary", "words", mapToRow(String.class))
// .saveToCassandra();
log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
wordRDD.coalesce(1, true)
.saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
});
streamingContext.start();
try {
streamingContext.awaitTerminationOrTimeout(-1);
} catch (InterruptedException e) {
log.error("Terminated streaming context {}", e);
}
}
- В приведенном выше коде я слушаю тему Kafka (ABCtopic) и обрабатываю ее. Код с комментариями задания Spark работает отлично. Он подсчитывает слово и выдает ожидаемые результаты, однако код задания потоковой передачи искры ведет себя не так, как ожидалось, и выводит значение null.
- Строка
log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
дает '{}' в качестве вывода. Запись в файл пуста. Будучи новичком в потоковой передаче Spark из того, что мало известно, потоковая передача Spark — это дополнительная библиотека для непрерывной обработки данных в режиме реального времени из любого источника, такого как файл, тема и т. д. - Чего не хватает в приведенном выше коде для потоковой передачи искры для вывода «null» в выделенной строке журнала, а также в выходном файле данных, который записывается на диск, тогда как задание Spark отлично выполняет ту же работу.