Как извлечь журналы клиента заданий Spark, отправленные с помощью метода POST пакетной обработки Apache Livy с помощью AirFlow

Я работаю над отправкой задания Spark с использованием метода POST пакетов Apache Livy.

Этот HTTP-запрос отправляется с помощью AirFlow. После отправки работы я отслеживаю статус, используя пакетный идентификатор.

Я хочу отображать журналы драйвера (клиентские журналы) в журналах Air Flow, чтобы не заходить в несколько мест AirFLow и Apache Livy / Resource Manager.

Возможно ли это сделать с помощью API REST Apache Livy?


person Ramdev Sharma    schedule 20.01.2019    source источник
comment
aws.amazon.com/blogs/big-data/ См. часть track_statement_progress   -  person ookboy24    schedule 20.01.2019
comment
Спасибо. Я был полезен, когда начинал. Я получил более подробную информацию, которую искал, из ответа @kaxil. Спасибо.   -  person Ramdev Sharma    schedule 21.01.2019
comment
@RamdevSharma - конечная точка живого журнала, похоже, является журналом для Ливи, отправляющего пакет, а не самим журналом драйвера искры. Я что-то упускаю? Я пытаюсь решить ту же проблему, выставить журналы искр в Airflow, чтобы мне не приходилось переходить к EMR для отладки. Спасибо   -  person alexP_Keaton    schedule 17.07.2019
comment
@alexP_Keaton Удалось ли вам решить эту проблему? Конечная точка журнала на Livy - это не журналы драйвера, и я не уверен, есть ли способ (по крайней мере, прямо сейчас) опросить журналы драйверов Spark через Livy.   -  person Scrotch    schedule 09.05.2020


Ответы (2)


У Ливи есть конечная точка для получения журналов /sessions/{sessionId}/log & /batches/{batchId}/log.

Документация:

Вы можете создавать функции Python, подобные показанной ниже, для получения журналов:

http = HttpHook("GET", http_conn_id=http_conn_id)

def _http_rest_call(self, method, endpoint, data=None, headers=None, extra_options=None):
    if not extra_options:
        extra_options = {}

    self.http.method = method
    response = http.run(endpoint, json.dumps(data), headers, extra_options=extra_options)

    return response


def _get_batch_session_logs(self, batch_id):
    method = "GET"
    endpoint = "batches/" + str(batch_id) + "/log"
    response = self._http_rest_call(method=method, endpoint=endpoint)
    # return response.json()
    return response
person kaxil    schedule 21.01.2019
comment
Спасибо @kaxil, это то, что я искал и как-то пропустил. На основе входных данных конечной точки и размера я могу показывать журналы при каждой проверке состояния. - person Ramdev Sharma; 21.01.2019
comment
Кажется, livy хранит журналы пакетов в jvm, не сохраняя их на диске ... Я не могу найти файлы журналов. Если livy сохраняет журналы, где это местоположение? - person Archon; 21.10.2019
comment
@RamdevSharma Не могли бы вы поделиться своим кодом? Я застрял на http_conn_id = http_conn_id и не знаю, откуда это. Спасибо! - person user1983682; 14.01.2020
comment
@ user1983682, извините, но у меня нет исходного кода, так как я вышел из проекта. Вы можете найти Connection is Air-flow UI для создания для http. - person Ramdev Sharma; 14.01.2020
comment
Я понимаю. @kaxil, знаете ли вы, почему я получаю сообщение об ошибке ERROR - _get_batch_session_logs () получил неожиданный аргумент ключевого слова 'next_execution_date' с этим решением? - person user1983682; 15.01.2020
comment
@ user1983682 Рад помочь, не могли бы вы поделиться своим кодом? - person kaxil; 15.01.2020
comment
Кто-нибудь знает, где livy хранит детали партии, потому что, если вы запрашиваете детали партии после завершения партии, вы не получаете статус - person dileepVikram; 08.06.2020

Ливи предоставляет REST API двумя способами: сессионным и пакетным. В вашем случае, поскольку мы предполагаем, что вы не используете сеанс, вы отправляете с использованием пакетов. Вы можете опубликовать свою партию, используя команду curl:

curl http://livy-server-IP:8998/batches

После того, как вы отправите задание, вы получите взамен идентификатор партии. Затем вы можете завить с помощью команды:

curl http://livy-server-IP:8998/batches/ {batchId} / бревно

Вы можете найти документацию по адресу: https://livy.incubator.apache.org/docs/latest/rest-api.html

Если вы хотите избежать описанных выше шагов, вы можете использовать готовый AMI (а именно, LightningFLow) из AWS Marketplace, который предоставляет Airflow настраиваемый оператор Livy. Оператор Livy отправляет и отслеживает статус задания каждые 30 секунд (настраивается), а также предоставляет журналы искр в конце задания искры в журналах пользовательского интерфейса Airflow.

Примечание. LightningFlow предварительно интегрирован со всеми необходимыми библиотеками, Livy, пользовательскими операторами и локальным кластером Spark.

Ссылка для AWS Marketplace: https://aws.amazon.com/marketplace/pp/Lightning-Analytics-Inc-LightningFlow-Integrated-o/B084BSD66V

Это позволит вам просматривать консолидированные журналы в одном месте, вместо того, чтобы переключаться между журналами Airflow и EMR / Spark (Ambari / Resource Manager).

person Lightning-Analytics    schedule 14.02.2020