Разница в выводе потоковой передачи Spark и Spark для одного и того же задания

Я делаю некоторые POC с потоковой передачей Spark и Spark для своего проекта. Итак, все, что я делаю, это читаю имя файла из Topic. Скачивание файла из src/main/sresource и запуск обычного частотного приложения WordCount.

 
@KafkaListener(topics = Constants.ABCWordTopic, groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID) 
public void processTask(@Payload String fileResourcePath) {
        log.info("ABC Receiving task from WordProducer filepath {} at time {}", fileResourcePath,
                LocalDateTime.now());
        // Spark job
        /*
         * JavaRDD wordRDD =
         * sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
         * log.info("ABC Map Contents : {}", wordRDD.countByValue().toString());
         * wordRDD.coalesce(1,
         * true).saveAsTextFile("ResultSparklog_"+ System.currentTimeMillis());
         */
        // Spark Streaming job
        JavaPairDStream wordPairStream = streamingContext
                .textFileStream(extractFile(fileResourcePath))
                .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                .mapToPair(s -> new Tuple2(s, 1)).reduceByKey((i1, i2) -> i1 + i2);
        wordPairStream.foreachRDD(wordRDD -> {
        //  javaFunctions(wordTempRDD).writerBuilder("vocabulary", "words", mapToRow(String.class))
        //                  .saveToCassandra();
            log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
            wordRDD.coalesce(1, true)
                    .saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
        });
        streamingContext.start();
        try {
            streamingContext.awaitTerminationOrTimeout(-1);
        } catch (InterruptedException e) {
            log.error("Terminated streaming context {}", e);
        }
    }
  • В приведенном выше коде я слушаю тему Kafka (ABCtopic) и обрабатываю ее. Код с комментариями задания Spark работает отлично. Он подсчитывает слово и выдает ожидаемые результаты, однако код задания потоковой передачи искры ведет себя не так, как ожидалось, и выводит значение null.
  • Строка log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString()); дает '{}' в качестве вывода. Запись в файл пуста. Будучи новичком в потоковой передаче Spark из того, что мало известно, потоковая передача Spark — это дополнительная библиотека для непрерывной обработки данных в режиме реального времени из любого источника, такого как файл, тема и т. д.
  • Чего не хватает в приведенном выше коде для потоковой передачи искры для вывода «null» в выделенной строке журнала, а также в выходном файле данных, который записывается на диск, тогда как задание Spark отлично выполняет ту же работу.

person Guru    schedule 18.09.2020    source источник


Ответы (1)


Добавление его ответа для других людей, которые могут застрять в этом вопросе. На первый взгляд кажется, что это должно работать, однако, прочитав документацию по Spark, можно сделать вывод.
API streamingContext.textFileStream(..) не считывает статическое содержимое ни из одного каталога. Поэтому он не может прочитать файлы из каталога или, скорее, обработать его. Он предназначен для чтения потоковых данных, поэтому данные необходимо добавлять или обновлять в каталоге мониторинга. Поэтому быстрый взлом из того, что я прочитал в Интернете, заключается в перемещении файлов или обновлении файлов в каталог Windows (я использую Windows 10) после начала выполнения программы (т.е. StreamingContext.start выполнен).
Обратите внимание, что мне НЕ удалось заставить его выполняться даже после того, как я испробовал все эти хаки, но учитывая, что это предположительно не правильный вариант использования для потоковой передачи (чтение из папки и обработка могут быть легко достигнуты с помощью задания Spark). что и продемонстрировал мой код) на этом я должен остановиться.

person Guru    schedule 21.09.2020