How to store data in HDFS with Spark Streaming

I want to store streaming data in HDFS. This is Spark Streaming code that collects data from a Kafka topic. I tried lines.saveAsHadoopFiles("hdfs://192.168.10.31:9000/user/spark/mystream/", "abc")

Below is my code; please tell me how and where to add the code that saves the data to HDFS. In the console I already get the output that needs to be saved to HDFS.
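
Roughly, this is the kind of thing I am after (only a sketch of how I understand the DStream API; the prefix "data" and the suffix "txt" are made-up names, and I have not verified this on my cluster):

// lines is the (key, value) DStream created with KafkaUtils.createDirectStream in the code below.
// saveAsTextFiles writes every micro-batch under the given HDFS prefix; Spark inserts the batch
// timestamp between the prefix and the suffix, so each batch ends up in its own directory.
val values = lines.map(_._2)
values.saveAsTextFiles("hdfs://192.168.10.31:9000/user/spark/mystream/data", "txt")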

Thanks in advance

package com.spark.cons.conskafka

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.kafka._

import _root_.kafka.serializer.StringDecoder


object Consume {

  def createContext(brokers: String, topics: String, checkpointDirectory: String): StreamingContext = {

    println("Creating new context")

    val conf = new SparkConf().setMaster("local[*]").setAppName("Spark Streaming - Kafka DirectReceiver - PopularHashTags").set("spark.executor.memory", "1g")

    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")

    // Create a StreamingContext with a 2-second batch interval
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

    // Define the Kafka parameters; the broker list must be specified
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      // start from the largest available offset, i.e. only read messages that arrive after the stream starts
      "auto.offset.reset" -> "largest")

    // Define which topics to read from
    val topicsSet = topics.split(",").toSet

    // Create a direct stream from Kafka; each record is a (key, value) pair
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)


    // Filter hashtags
    val hashTags = lines.map(_._2).flatMap(_.split(" ")).filter(_.startsWith("#"))

    // Get the top hashtags over sliding windows of the previous 60 and 10 seconds
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    lines.print()

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    lines.count().map(cnt => "Received " + cnt + " kafka messages.").print()

    ssc

  }

  def main(args: Array[String]) {

    if (args.length < 3) {
      System.err.println(s"""
        |Usage: KafkaDirectPopularHashTags <brokers> <topics> <checkpointDirectory>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |  <checkpointDirectory> the directory where the metadata is stored
        |
        """.stripMargin)
      System.exit(1)
    }

    // Extract the arguments: broker list, topic names, checkpoint directory
    val Array(brokers, topics, checkpointDirectory) = args

    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(brokers, topics, checkpointDirectory))

    ssc.start()
    ssc.awaitTermination()
  }

}
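
Concretely, inside createContext, would something along these lines be the right way to do it? This is only a sketch based on foreachRDD / saveAsTextFile (the output directory is the one from my cluster above, the per-batch directory naming is just an idea to keep batches separate, and the isEmpty check is there only to avoid writing empty files):

    // Sketch: persist the raw message values of every micro-batch to HDFS.
    val outputDir = "hdfs://192.168.10.31:9000/user/spark/mystream"
    lines.map(_._2).foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        // One directory per batch, named after the batch timestamp, so earlier output is never overwritten.
        rdd.saveAsTextFile(s"$outputDir/batch-${time.milliseconds}")
      }
    }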

person Arun    schedule 03.09.2019    source
comment
see this stackoverflow.com/questions/44492558/   -  person AbhishekN    schedule 11.09.2019