Потоковая передача Kafka Spark: невозможно читать сообщения

Я интегрирую Kafka и Spark, используя искру. Как продюсер кафки создал тему:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

Я публикую сообщения в kafka и пытаюсь прочитать их с помощью Java-кода с потоковой передачей искр и отобразить их на экране.
Все демоны включены: Spark-master, worker; работник зоопарка; kafka.
Я пишу java-код для этого, используя KafkaUtils.createStream.
код ниже:

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }


        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() 
                                                {
                                                    public String call(Tuple2<String, String> message)
                                                    {
                                                        System.out.println("NewMessage: "+message._2()+"++++++++++++++++++");
                                                        return message._2();
                                                    }
                                                }
                                                );
        data.print();

        jssc.start();
        jssc.awaitTermination();

    }
}

Я выполняю задание, а на другом терминале я запускаю kafka-продюсер для публикации сообщений:

Hi kafka
second message
another message

Но в журналах вывода на консоли потоковой передачи искры сообщения не отображаются, а отображается ноль полученных блоков:

-------------------------------------------
Time: 1417438988000 ms
-------------------------------------------

2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,008 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0 from job set of time 1417438988000 ms
2014-12-01 08:03:08,009 INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000 ms (execution: 0.000 s)
2014-12-01 08:03:08,010 INFO  [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
2014-12-01 08:03:08,015 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
2014-12-01 08:03:08,024 INFO  [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 39
2014-12-01 08:03:08,027 INFO  [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
2014-12-01 08:03:08,031 INFO  [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 38
2014-12-01 08:03:08,033 INFO  [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
2014-12-01 08:03:09,002 INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

Почему не приходит блок данных? я пробовал использовать kafka продюсер-потребитель на консоли bin/kafka-console-producer.... и bin/kafka-console-consumer... он работает идеально, но почему не мой код ... любая идея?


person aiman    schedule 28.11.2014    source источник


Ответы (2)


Проблема решена.

приведенный выше код правильный. Мы просто добавим еще две строки, чтобы подавить сгенерированные [INFO] и [WARN]. Итак, окончательный код:

package com.spark;

import scala.Tuple2;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(1000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() 
                                                {
                                                    public String call(Tuple2<String, String> message)
                                                    {
                                                        return message._2();
                                                    }
                                                }
                                                );
        data.print();

        jssc.start();
        jssc.awaitTermination();

    }
}

Также нам нужно добавить зависимость в POM.xml:

<dependency>
<groupId>com.msiops.footing</groupId>
<artifactId>footing-tuple</artifactId>
<version>0.2</version>
</dependency>  

Эта зависимость используется для использования scala.Tuple2
Ошибка Stream 0 received 0 block возникла из-за того, что искровый рабочий недоступен, а для искрового рабочего-сердечника было установлено значение 1. Для потоковой передачи искры нам нужно, чтобы ядро ​​было> = 2. Итак, нам нужно внести изменения в файл spark-config. Обратитесь к руководству по установке. чтобы добавить строку export SPARK_WORKER_CORE=5 Также измените SPARK_MASTER='hostname' на SPARK_MASTER=<your local IP>. Этот локальный IP-адрес выделен жирным шрифтом при переходе в веб-консоль Spark UI ... что-то вроде: spark://192.168..:<port>. Здесь порт нам не нужен. требуется только IP.
Теперь перезапустите Spark-Master и Spark-Worker и начните потоковую передачу :)

выход:

-------------------------------------------
Time: 1417443060000 ms
-------------------------------------------
message 1

-------------------------------------------
Time: 1417443061000 ms
-------------------------------------------
message 2

-------------------------------------------
Time: 1417443063000 ms
-------------------------------------------
message 3
message 4

-------------------------------------------
Time: 1417443064000 ms
-------------------------------------------
message 5
message 6
messag 7

-------------------------------------------
Time: 1417443065000 ms
-------------------------------------------
message 8
person aiman    schedule 02.12.2014
comment
@ aiman- Не могли бы вы уточнить свой ответ. Какой файл конфигурации свечей. Где я могу найти этот файл? - person ketan; 07.09.2017
comment
измените количество ядер (сценарий conf / spark-env.sh) на каждом узле или измените его глобально в conf / spark-defaults.conf - person aiman; 13.09.2017

Да, вам нужно получить доступ к контенту из DStream.

messages.foreachRDD(<<processing for the input received in the interval>>);
person Vijay Innamuri    schedule 28.11.2014
comment
Привет Виджай. Я уже добавил строку messages.print (). это не сработает? как использовать логику печати в foreachRDD ()? - person aiman; 29.11.2014
comment
Привет, Виджей, согласно вашему предложению, я отредактировал приведенный выше код, то есть добавил эти строки: - person aiman; 01.12.2014
comment
JavaDStream ‹String› data = messages.map (новая функция ‹Tuple2‹ String, String ›, String› () {вызов public String (Tuple2 ‹String, String› message) {System.out.println (NewMessage: + message._2 ()); вернуть сообщение._2 ();}}); data.print (); - person aiman; 01.12.2014
comment
Но результат все тот же: поток 0 получил 0 блоков. - person aiman; 01.12.2014