Попытка превратить большой двоичный объект в несколько столбцов в Spark

У меня есть сериализованный блоб и функция, которая преобразует его в java-карту. Я зарегистрировал функцию как UDF и попытался использовать ее в Spark SQL следующим образом:

sqlCtx.udf.register("blobToMap", Utils.blobToMap)
val df = sqlCtx.sql(""" SELECT mp['c1'] as c1, mp['c2'] as c2 FROM
                        (SELECT *, blobToMap(payload) AS mp FROM t1) a """)

Мне это удается, но по какой-то причине очень тяжелая функция blobToMap запускается дважды для каждой строки, а на самом деле я извлекаю 20 полей, и она запускается 20 раз для каждой строки. Я видел предложения в Получить несколько столбцов из один столбец в Spark DataFrame, но они действительно не масштабируются — я не хочу создавать класс каждый раз, когда мне нужно извлечь данные.

Как я могу заставить Spark делать то, что разумно? Я попытался разделить на два этапа. Единственное, что сработало, - это кэшировать внутренний выбор, но это тоже невозможно, потому что это действительно большой двоичный объект, и мне нужно от него всего несколько десятков полей.


person uzadude    schedule 04.01.2016    source источник


Ответы (1)


Я отвечу сам себе, надеясь, что это поможет кому-нибудь ... поэтому после десятков экспериментов я смог заставить искру оценить udf и превратить его в карту один раз, вместо того, чтобы пересчитывать его снова и снова для каждого ключевого запроса, разделяя запрос и делает злой уродливый трюк - превращая его в RDD и обратно в DataFrame:

val df1 = sqlCtx.sql("SELECT *, blobToMap(payload) AS mp FROM t1")
sqlCtx.createDataFrame(df.rdd, df.schema).registerTempTable("t1_with_mp")
val final_df = sqlCtx.sql("SELECT mp['c1'] as c1, mp['c2'] as c2 FROM t1_with_mp")
person uzadude    schedule 06.01.2016