Запись и чтение массивов необработанных байтов в Spark — с помощью Sequence File SequenceFile

Как записать RDD[Array[Byte]] в файл с помощью Apache Spark и прочитать его снова?


person samthebest    schedule 06.06.2014    source источник


Ответы (2)


Общие проблемы, кажется, получают странное исключение, которое не может привести от BytesWritable к NullWritable. Другая распространенная проблема заключается в том, что BytesWritable getBytes представляет собой совершенно бессмысленную кучу ерунды, которая вообще не получает байтов. Что делает getBytes, так это получает ваши байты, а не добавляет массу нулей в конце! Вы должны использовать copyBytes

val rdd: RDD[Array[Byte]] = ???

// To write
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
  .saveAsSequenceFile("/output/path", codecOpt)

// To read
val rdd: RDD[Array[Byte]] = sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
  .map(_._2.copyBytes())
person samthebest    schedule 06.06.2014
comment
Этот пост относительно старый, поэтому просто хотел узнать, актуален ли ответ? Нужно ли использовать copyBytes перед чтением? - person Sam Stoelinga; 17.01.2016
comment
@SamStoelinga Да, я так думаю, это Hadoop API вряд ли изменится. - person samthebest; 18.01.2016
comment
Более эффективная альтернатива — использовать <BytesWritableInstance>.getBytes() и обрабатывать только до <BytesWritableInstance>.getLength() байт. Конечно, если вам строго нужен RDD[Array[Byte]], этот подход не сработает, но вы можете подумать о RDD[(Array[Byte], Int)]. - person user1609012; 09.06.2016
comment
Может ли кто-нибудь опубликовать весь фрагмент рабочего кода, включая пакеты, которые нужно импортировать? Спасибо. - person Choix; 25.03.2018
comment
@Choix - у меня была такая же проблема. Фрагмент публикации, который решил мою проблему, как отдельный ответ. - person Chris Bedford; 28.07.2019

Вот фрагмент со всеми необходимыми импортами, которые вы можете запустить из spark-shell по запросу @Choix.

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable

val path = "/tmp/path"

val rdd = sc.parallelize(List("foo"))
val bytesRdd = rdd.map{str  =>  (NullWritable.get, new BytesWritable(str.getBytes) )  }
bytesRdd.saveAsSequenceFile(path)

val recovered = sc.sequenceFile[NullWritable, BytesWritable]("/tmp/path").map(_._2.copyBytes())
val recoveredAsString = recovered.map( new String(_) )
recoveredAsString.collect()
// result is:  Array[String] = Array(foo)
person Chris Bedford    schedule 27.07.2019