Получение java.lang.RuntimeException: класс драйвера не найден при использовании jdbc_hook более одного раза в операторе воздушного потока

Вариант использования — запустить список sql в улье и обновить метаданные impala. Как показано ниже, два метода для hive и impala используют jdbc_hook. В каком бы порядке я ни вызывал эти методы, только первый запускается, а второй выдает ОШИБКУ — java.lang.RuntimeException: класс «имя драйвера куста/импалы» не найден. Каждый метод работает нормально, если используется отдельно. Пожалуйста, найдите метод выполнения пользовательского оператора воздушного потока ::: Примечание :: Я не могу использовать hive_operator для запуска операторов куста. И я не вижу никаких методов в HiveServer2_Hook. Я новичок в воздушном потоке, любая помощь очень ценится

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.jdbc_hook import JdbcHook
import sqlparse

class CustomHiveOperator(BaseOperator):
    """
    Executes hql code and invalidates,compute stats impala for that table.

    Requires JdbcHook,sqlparse.

    :param hive_jdbc_conn: reference to a predefined hive database
    :type hive_jdbc_conn: str
    :param impala_jdbc_conn: reference to a predefined impala database
    :type impala_jdbc_conn: str
    :param table_name: hive table name, used for post process in impala
    :type table_name: str
    :param script_path: hql scirpt path to run in hive
    :type script_path: str
    :param autocommit: if True, each command is automatically committed.
        (default value: False)
    :type autocommit: bool
    :param parameters: (optional) the parameters to render the SQL query with.
    :type parameters: mapping or iterable
    """
    
    
    @apply_defaults
    def __init__(
            self,
            hive_jdbc_conn: str,
            impala_jdbc_conn:str,
            table_name:str,
            script_path:str,
            autocommit=False,
            parameters=None,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.hive_jdbc_conn= hive_jdbc_conn
        self.impala_jdbc_conn= impala_jdbc_conn
        self.table_name=table_name
        self.script_path=script_path
        self.autocommit=autocommit
        self.parameters=parameters

    def execute(self,context):
        self.hive_run()
        self.impala_post_process()
        
    
    def format_string(self,x):
        return x.replace(";","")    
    
    
    def hive_run(self):
        with open(self.script_path) as f:
            data = f.read()
        hql_temp = sqlparse.split((data))
        hql = [self.format_string(x) for x in hql_temp]
        self.log.info('Executing: %s', hql)
        self.hive_hook = JdbcHook(jdbc_conn_id=self.hive_jdbc_conn)
        self.hive_hook.run(hql, self.autocommit, parameters=self.parameters)            
        
    
    
    def impala_post_process(self):
        invalidate = 'INVALIDATE METADATA '+self.table_name
        compute_stats = 'COMPUTE STATS '+self.table_name
        hql = [invalidate,compute_stats]
        self.log.info('Executing: %s', hql)     
        self.impala_hook = JdbcHook(jdbc_conn_id=self.impala_jdbc_conn)
        self.impala_hook.run(hql, self.autocommit, parameters=self.parameters)

person arya.s    schedule 30.07.2020    source источник


Ответы (1)


На самом деле это проблема с тем, как Airflow использует jaydebeapi и базовые модули JPype для облегчения соединения JDBC.

Виртуальная машина Java запускается при первом использовании JPype (первый вызов JdbcHook.get_conn), и единственные библиотеки, о которых сообщает виртуальная машина, — это та, которую вы используете для любого соединения JDBC. сделанный. Когда вы создаете другое подключение, виртуальная машина уже запущена и не знает о библиотеках, необходимых для другого типа подключения.

Единственный способ, который я нашел, — это использовать расширение JdbcHook, которое переопределяет метод get_conn для сбора путей всех драйверов JDBC, определенных как объект Connection в Airflow. см. здесь реализацию Airflow.

person joebeeson    schedule 31.07.2020