Как записать RDD[Array[Byte]]
в файл с помощью Apache Spark и прочитать его снова?
Запись и чтение массивов необработанных байтов в Spark — с помощью Sequence File SequenceFile
Ответы (2)
Общие проблемы, кажется, получают странное исключение, которое не может привести от BytesWritable к NullWritable. Другая распространенная проблема заключается в том, что BytesWritable getBytes
представляет собой совершенно бессмысленную кучу ерунды, которая вообще не получает байтов. Что делает getBytes
, так это получает ваши байты, а не добавляет массу нулей в конце! Вы должны использовать copyBytes
val rdd: RDD[Array[Byte]] = ???
// To write
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
.saveAsSequenceFile("/output/path", codecOpt)
// To read
val rdd: RDD[Array[Byte]] = sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
.map(_._2.copyBytes())
person
samthebest
schedule
06.06.2014
Этот пост относительно старый, поэтому просто хотел узнать, актуален ли ответ? Нужно ли использовать copyBytes перед чтением?
- person Sam Stoelinga; 17.01.2016
@SamStoelinga Да, я так думаю, это Hadoop API вряд ли изменится.
- person samthebest; 18.01.2016
Более эффективная альтернатива — использовать
<BytesWritableInstance>.getBytes()
и обрабатывать только до <BytesWritableInstance>.getLength()
байт. Конечно, если вам строго нужен RDD[Array[Byte]]
, этот подход не сработает, но вы можете подумать о RDD[(Array[Byte], Int)]
.
- person user1609012; 09.06.2016
Может ли кто-нибудь опубликовать весь фрагмент рабочего кода, включая пакеты, которые нужно импортировать? Спасибо.
- person Choix; 25.03.2018
@Choix - у меня была такая же проблема. Фрагмент публикации, который решил мою проблему, как отдельный ответ.
- person Chris Bedford; 28.07.2019
Вот фрагмент со всеми необходимыми импортами, которые вы можете запустить из spark-shell по запросу @Choix.
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
val path = "/tmp/path"
val rdd = sc.parallelize(List("foo"))
val bytesRdd = rdd.map{str => (NullWritable.get, new BytesWritable(str.getBytes) ) }
bytesRdd.saveAsSequenceFile(path)
val recovered = sc.sequenceFile[NullWritable, BytesWritable]("/tmp/path").map(_._2.copyBytes())
val recoveredAsString = recovered.map( new String(_) )
recoveredAsString.collect()
// result is: Array[String] = Array(foo)
person
Chris Bedford
schedule
27.07.2019