Вариант использования — запустить список sql в улье и обновить метаданные impala. Как показано ниже, два метода для hive и impala используют jdbc_hook. В каком бы порядке я ни вызывал эти методы, только первый запускается, а второй выдает ОШИБКУ — java.lang.RuntimeException: класс «имя драйвера куста/импалы» не найден. Каждый метод работает нормально, если используется отдельно. Пожалуйста, найдите метод выполнения пользовательского оператора воздушного потока ::: Примечание :: Я не могу использовать hive_operator для запуска операторов куста. И я не вижу никаких методов в HiveServer2_Hook. Я новичок в воздушном потоке, любая помощь очень ценится
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.jdbc_hook import JdbcHook
import sqlparse
class CustomHiveOperator(BaseOperator):
"""
Executes hql code and invalidates,compute stats impala for that table.
Requires JdbcHook,sqlparse.
:param hive_jdbc_conn: reference to a predefined hive database
:type hive_jdbc_conn: str
:param impala_jdbc_conn: reference to a predefined impala database
:type impala_jdbc_conn: str
:param table_name: hive table name, used for post process in impala
:type table_name: str
:param script_path: hql scirpt path to run in hive
:type script_path: str
:param autocommit: if True, each command is automatically committed.
(default value: False)
:type autocommit: bool
:param parameters: (optional) the parameters to render the SQL query with.
:type parameters: mapping or iterable
"""
@apply_defaults
def __init__(
self,
hive_jdbc_conn: str,
impala_jdbc_conn:str,
table_name:str,
script_path:str,
autocommit=False,
parameters=None,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.hive_jdbc_conn= hive_jdbc_conn
self.impala_jdbc_conn= impala_jdbc_conn
self.table_name=table_name
self.script_path=script_path
self.autocommit=autocommit
self.parameters=parameters
def execute(self,context):
self.hive_run()
self.impala_post_process()
def format_string(self,x):
return x.replace(";","")
def hive_run(self):
with open(self.script_path) as f:
data = f.read()
hql_temp = sqlparse.split((data))
hql = [self.format_string(x) for x in hql_temp]
self.log.info('Executing: %s', hql)
self.hive_hook = JdbcHook(jdbc_conn_id=self.hive_jdbc_conn)
self.hive_hook.run(hql, self.autocommit, parameters=self.parameters)
def impala_post_process(self):
invalidate = 'INVALIDATE METADATA '+self.table_name
compute_stats = 'COMPUTE STATS '+self.table_name
hql = [invalidate,compute_stats]
self.log.info('Executing: %s', hql)
self.impala_hook = JdbcHook(jdbc_conn_id=self.impala_jdbc_conn)
self.impala_hook.run(hql, self.autocommit, parameters=self.parameters)