Соедините два обычных RDD с/без Spark SQL

Мне нужно объединить два обычных RDDs в один или несколько столбцов. Логически эта операция эквивалентна операции соединения двух таблиц с базой данных. Интересно, это возможно только через Spark SQL или есть другие способы сделать это.

В качестве конкретного примера рассмотрим RDD r1 с первичным ключом ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID)

и СДР r2 с первичным ключом COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY)

Я хочу присоединиться к r1 и r2.

Как это может быть сделано?


person learning_spark    schedule 12.12.2014    source источник


Ответы (4)


Сумья Симанта дала хороший ответ. Однако значения в объединенном RDD равны Iterable, поэтому результаты могут быть не очень похожи на обычное соединение таблиц.

Кроме того, вы можете:

val mappedItems = items.map(item => (item.companyId, item))
val mappedComp = companies.map(comp => (comp.companyId, comp))
mappedItems.join(mappedComp).take(10).foreach(println)

Результат будет:

(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
person viirya    schedule 12.12.2014
comment
Ваши карты такие же, как items.keyBy{_.companyId}, companies.keyBy{_.companyId}. Поскольку они являются частью Spark, есть шанс, что это будет более эффективно? - person The Archetypal Paul; 12.12.2014
comment
@Paul Это исходный код искры для keyBy: def keyBy[K](f: T => K): RDD[(K, T)] = { map(x => (f(x), x)) }, так что ваше решение и решение @virya совершенно одинаковы. - person jlopezmat; 12.12.2014
comment
ОК :) Тем не менее, возможно, намерение немного яснее с keyBy. Хотя не важный момент - person The Archetypal Paul; 12.12.2014

(Используя Scala) Допустим, у вас есть два RDD:

  • emp: (empid, ename, dept)

  • отдел: (имя, отдел)

Ниже приведен еще один способ:

//val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30)))

val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

//val shifted_fields_emp = emp.map(t => (t._3, t._1, t._2))
val shifted_fields_emp = emp.map(t => (t._2, t._1))

val shifted_fields_dept = dept.map(t => (t._2,t._1))

shifted_fields_emp.join(shifted_fields_dept)
// Create emp RDD
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))

// Create dept RDD
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

// Establishing that the third field is to be considered as the Key for the emp RDD
val manipulated_emp = emp.keyBy(t => t._3)

// Establishing that the second field need to be considered as the Key for dept RDD
val manipulated_dept = dept.keyBy(t => t._2)

// Inner Join
val join_data = manipulated_emp.join(manipulated_dept)
// Left Outer Join
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)
// Right Outer Join
val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept)
// Full Outer Join
val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept)

// Formatting the Joined Data for better understandable (using map)
val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1))

Это даст результат как:

// Печатаем выходные данные clean_joined_data на консоли

scala> cleaned_joined_data.collect()
res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))
person New Coder    schedule 14.08.2015

Что-то вроде этого должно работать.

scala> case class Item(id:String, name:String, unit:Int, companyId:String)

scala> case class Company(companyId:String, name:String, city:String)

scala> val i1 = Item("1", "first", 2, "c1")

scala> val i2 = i1.copy(id="2", name="second")

scala> val i3 = i1.copy(id="3", name="third", companyId="c2")

scala> val items = sc.parallelize(List(i1,i2,i3))
items: org.apache.spark.rdd.RDD[Item] = ParallelCollectionRDD[14] at parallelize at <console>:20

scala> val c1 = Company("c1", "company-1", "city-1")

scala> val c2 = Company("c2", "company-2", "city-2")

scala> val companies = sc.parallelize(List(c1,c2))

scala> val groupedItems = items.groupBy( x => x.companyId) 
groupedItems: org.apache.spark.rdd.RDD[(String, Iterable[Item])] = ShuffledRDD[16] at groupBy at <console>:22

scala> val groupedComp = companies.groupBy(x => x.companyId)
groupedComp: org.apache.spark.rdd.RDD[(String, Iterable[Company])] = ShuffledRDD[18] at groupBy at <console>:20

scala> groupedItems.join(groupedComp).take(10).foreach(println)

14/12/12 00:52:32 INFO DAGScheduler: Job 5 finished: take at <console>:35, took 0.021870 s
(c1,(CompactBuffer(Item(1,first,2,c1), Item(2,second,2,c1)),CompactBuffer(Company(c1,company-1,city-1))))
(c2,(CompactBuffer(Item(3,third,2,c2)),CompactBuffer(Company(c2,company-2,city-2))))
person Soumya Simanta    schedule 12.12.2014

Spark SQL может выполнять присоединение к RDD SPARK.

Нижеприведенный код выполняет соединение SQL для RDD Company и Items.

object SparkSQLJoin {

case class Item(id:String, name:String, unit:Int, companyId:String)
case class Company(companyId:String, name:String, city:String)

def main(args: Array[String]) {

    val sparkConf = new SparkConf()
    val sc= new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.createSchemaRDD

    val i1 = Item("1", "first", 1, "c1")
    val i2 = Item("2", "second", 2, "c2")
    val i3 = Item("3", "third", 3, "c3")
    val c1 = Company("c1", "company-1", "city-1")
    val c2 = Company("c2", "company-2", "city-2")

    val companies = sc.parallelize(List(c1,c2))
    companies.registerAsTable("companies")

    val items = sc.parallelize(List(i1,i2,i3))
    items.registerAsTable("items")

    val result = sqlContext.sql("SELECT * FROM companies C JOIN items I ON C.companyId= I.companyId").collect

    result.foreach(println)

    }
}

Вывод отображается как

     [c1,company-1,city-1,1,first,1,c1]
     [c2,company-2,city-2,2,second,2,c2]
person Vijay Innamuri    schedule 12.12.2014
comment
У меня есть несколько столбцов, поэтому мне нужно указать схему программно. Также RDD создаются из больших текстовых файлов в HDFS. Я считаю, что описанный выше подход все еще работает, верно? Пожалуйста, дайте мне знать, если необходимы какие-либо изменения. - person learning_spark; 15.12.2014
comment
Да, этот подход отлично работает и для больших данных. Для определения схемы программным путем проверьте spark .apache.org/docs/latest/ - person Vijay Innamuri; 15.12.2014