feat:线程池实现元数据查询

master
old-tom 2 years ago
parent a001e27f01
commit f73d5e82ab

@ -8,7 +8,8 @@
from collections import namedtuple
from common.fudb.connectors.connector_factory import ConnFactory
from common.fudb.dbapis.fu_db_api import select_all, select_one, batch_insert, count, execute_update
from common.fudb.dbapis.fu_db_api import select_all, select_one, batch_insert, count, execute_update, \
execute_many_sql_with_tx
class BaseDao(object):
@ -30,6 +31,9 @@ class BaseDao(object):
def batch_insert(self, sql_tpl, data, batch_size, db_type='postgresql'):
    """
    Delegate a batched insert to the module-level ``batch_insert`` helper.

    A connection is obtained from this DAO's connector and passed to the
    helper together with the remaining arguments.

    :param sql_tpl: insert statement template forwarded to the helper
    :param data: data forwarded to the helper
    :param batch_size: batch size forwarded to the helper
    :param db_type: database type name, 'postgresql' by default
    :return: whatever the helper returns
    """
    conn = self.connector.get_conn()
    return batch_insert(conn, db_type, sql_tpl, data, batch_size)
def execute_update_tx(self, sql_list):
    """
    Run the given SQL statements through ``execute_many_sql_with_tx``.

    The connection comes from this DAO's connector.

    :param sql_list: SQL statements to execute
    :return: None
    """
    conn = self.connector.get_conn()
    execute_many_sql_with_tx(conn, sql_list)
def dynamic_update_by_param(self, table_name, condition, param: dict):
"""
动态更新语句

@ -44,6 +44,19 @@ def _execute_with_tx(conn: Connection, sql):
conn.close()
def execute_many_sql_with_tx(conn: Connection, sql_list):
    """
    Execute several SQL statements inside one transaction.

    Each statement is wrapped in ``text`` and executed in order; the commit
    happens only after every statement has run. If anything raises, the
    transaction is rolled back and ``SqlExecuteError`` is raised with the
    original exception attached as its cause. The connection is closed in
    every case.

    :param conn: connection the statements run on; closed before returning
    :param sql_list: iterable of SQL strings
    :raises SqlExecuteError: when beginning, executing or committing fails
    """
    try:
        conn.begin()
        for sql in sql_list:
            conn.execute(text(sql))
        conn.commit()
    except Exception as e:
        conn.rollback()
        # Chain explicitly so the underlying error stays reachable as __cause__.
        raise SqlExecuteError(msg=f'sql [{sql_list}] 执行失败,开始回滚,e={e}') from e
    finally:
        conn.close()
def select_one(conn: Connection, sql):
"""
查询一个

@ -5,6 +5,7 @@
# @File : metadat_scan.py
# @Project : futool-tiny-datahub
# @Desc :
from multiprocessing.dummy import Pool
from datahub.datasource.datasource_manage import DataSourceManage
from datahub.metadata.metadata_reader import MetadataReader
from datahub.metadata.metadata_warehouse import MetadataWareHouse
@ -19,24 +20,30 @@ class MetadataScanner(object):
def __init__(self):
    """Create the scanner and give it its own ``DataSourceManage`` instance."""
    self.datasource_manage = DataSourceManage()
@staticmethod
def _join_meta_detail(meta_datas, metadata_reader: MetadataReader, obj_type):
"""
组装明细结果
:return:
"""
def _join_meta_detail(self, meta_datas, source_id, obj_type):
save_datas = []
for i, t in enumerate(meta_datas):
meta_id, obj_name = t[0], t[1]
def detail_handle(meta_data):
metadata_reader = MetadataReader(self.datasource_manage.get(source_id))
meta_id, obj_name = meta_data[0], meta_data[1]
meta_result = metadata_reader.query_metadata_detail(obj_name, obj_type)
# 组装插入数据
obj_data = ([meta_id] + list(x) for x in meta_result)
for d in obj_data:
save_datas.append(d)
log.info(f"剩余{len(meta_datas) - (i + 1)}")
if isinstance(meta_result, list):
obj_data = ([meta_id] + list(x) for x in meta_result)
for d in obj_data:
save_datas.append(d)
else:
# 视图及存储过程
save_datas.append([meta_id, meta_result])
# 小于等于数据库连接数
pool = Pool(15)
pool.map(detail_handle, meta_datas)
pool.close()
pool.join()
return save_datas
def _scan_detail(self, warehouse: MetadataWareHouse, metadata_reader: MetadataReader):
def _scan_detail(self, warehouse: MetadataWareHouse):
"""
获取明细数据
:param warehouse: 元数据仓库
@ -49,26 +56,27 @@ class MetadataScanner(object):
watch.start(f"{source_id}_table")
meta_tables = warehouse.query_metadata_id_name(MetaDataObjType.Table.value)
log.info(f"开始查询[{warehouse.source_id}] 表字段数据,共{len(meta_tables)}张表")
table_field_datas = self._join_meta_detail(meta_tables, metadata_reader, MetaDataObjType.Table.value)
table_field_datas = self._join_meta_detail(meta_tables, source_id, MetaDataObjType.Table.value)
warehouse.save_metadata_obj_field(table_field_datas)
table_time = watch.stop(f"{source_id}_table")
log.info(f"[{warehouse.source_id}] 表字段数据查询结束,耗时{table_time}")
log.info(f"[{warehouse.source_id}] 表字段数据入库结束,耗时{table_time}")
# 读取视图创建语句
watch.start(f"{source_id}_view")
meta_views = warehouse.query_metadata_id_name(MetaDataObjType.View.value)
log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图")
view_datas = self._join_meta_detail(meta_views, metadata_reader, MetaDataObjType.View.value)
view_datas = self._join_meta_detail(meta_views, source_id, MetaDataObjType.View.value)
warehouse.save_metadata_obj_detail(view_datas)
view_time = watch.stop(f"{source_id}_view")
log.info(f"[{warehouse.source_id}] 视图语句查询结束,耗时{view_time}")
log.info(f"[{warehouse.source_id}] 视图语句入库结束,耗时{view_time}")
# 读取存储过程创建语句
watch.start(f"{source_id}_procedure")
meta_procedure = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value)
log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_views)}张过程")
procedure_datas = self._join_meta_detail(meta_procedure, metadata_reader, MetaDataObjType.Procedure.value)
log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_procedure)}张过程")
procedure_datas = self._join_meta_detail(meta_procedure, source_id, MetaDataObjType.Procedure.value)
warehouse.save_metadata_obj_detail(procedure_datas)
procedure_time = watch.stop(f"{source_id}_procedure")
log.info(f"[{warehouse.source_id}] 存储过程语句查询结束,耗时{procedure_time}")
log.info(f"[{warehouse.source_id}] 存储过程语句入库结束,耗时{procedure_time}")
watch.clear()
def scan_metadata(self, source_id):
"""
@ -79,27 +87,30 @@ class MetadataScanner(object):
"""
# 元数据仓库API
warehouse = MetadataWareHouse(source_id)
# 获取待扫描数据源
datasource = self.datasource_manage.get(source_id)
# 初始化元数据读取器
metadata_reader = MetadataReader(datasource)
metadata_reader = MetadataReader(self.datasource_manage.get(source_id))
# 获取版本号
# version_keeper = MetadataVersionKeeper(source_id)
# version_code = version_keeper.increase_version()
# # 分别读取表\视图\存储过程并入库
# log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]')
# tables = metadata_reader.query_tables()
# warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code)
# log.info(f'[{source_id}]读取表完毕,共{len(tables)}张')
# views = metadata_reader.query_views()
# warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code)
# log.info(f'[{source_id}]读取视图完毕,共{len(views)}张')
# procedures = metadata_reader.query_procedure()
# warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code)
# log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个')
# 查询明细数据
self._scan_detail(warehouse, metadata_reader)
version_keeper = MetadataVersionKeeper(source_id)
# 版本自增
version_code = version_keeper.increase_version()
# 分别读取表\视图\存储过程并入库
log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]')
try:
tables = metadata_reader.query_tables()
warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code)
log.info(f'[{source_id}]读取表完毕,共{len(tables)}')
views = metadata_reader.query_views()
warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code)
log.info(f'[{source_id}]读取视图完毕,共{len(views)}')
procedures = metadata_reader.query_procedure()
warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code)
log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}')
# 查询明细数据
self._scan_detail(warehouse)
except Exception as e:
# 按照版本号全量删除
warehouse.delete_by_version(version_code)
log.error(f'[{source_id}]元数据扫描异常,进行回滚,删除版本号为[{version_code}]全部数据,e={e}')
if __name__ == '__main__':

@ -64,7 +64,7 @@ class MetadataReader(AbsMetadataReader):
def __init__(self, datasource: DataSource):
super().__init__(datasource)
self.db_type = datasource.connector.db_type
self.dao = MetadataDao(datasource.connector)
self.dao = MetadataDao(datasource.get_connector())
def query_tables(self):
    """
    Query all tables of the data source.

    Looks up the 'tables' entry for this reader's ``db_type`` in
    ``reader_conf`` and passes it to the DAO's ``query_all_tables``.

    :return: whatever ``query_all_tables`` returns
    """
    tables_conf = reader_conf[self.db_type]['tables']
    return self.dao.query_all_tables(tables_conf)

@ -122,3 +122,6 @@ class MetadataWareHouse(object):
def is_view(self, view_name, version_code=None):
    """
    Tell whether the named object is recorded with the view type.

    :param view_name: object name passed to ``query_metadata_type_by_name``
    :param version_code: optional version passed along to the same lookup
    :return: True when the looked-up type equals ``MetaDataObjType.View.value``
    """
    meta_type = self.query_metadata_type_by_name(view_name, version_code)
    return meta_type == MetaDataObjType.View.value
def delete_by_version(self, version_code):
    """
    Remove this warehouse's metadata for one version.

    Forwards ``self.source_id`` and the version to the DAO's
    ``remove_metadata_by_version``.

    :param version_code: version whose data is removed
    :return: None
    """
    dao = self.dao
    dao.remove_metadata_by_version(self.source_id, version_code)

@ -121,7 +121,7 @@ class MetadataDao(BaseDao):
:return:
"""
sql = f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'" \
+ self._get_version_sql(source_id, version_code)+" limit 10"
+ self._get_version_sql(source_id, version_code)
return self.query_all(sql)
def query_metadata_create(self, obj_type, meta_id, source_id):
@ -175,3 +175,17 @@ class MetadataDao(BaseDao):
+ self._get_version_sql(source_id, version_code)
rt = self.query_one(sql)
return rt[0] if rt and len(rt) > 0 else None
def remove_metadata_by_version(self, source_id, version_code):
    """
    Delete the metadata stored for one version of a data source.

    Four delete statements are built and handed to ``execute_update_tx`` as
    one list, in this order: the version record, the field rows, the create
    rows, and finally the metadata objects. The objects go last because the
    two detail deletes select their meta_id values from ``metadata_object``.

    :param source_id: data source identifier, embedded as a quoted literal
    :param version_code: version number, embedded unquoted
    :return: None
    """
    # NOTE(review): both values are interpolated straight into the SQL text;
    # presumably callers only pass trusted values - confirm.
    version_filter = f"source_id = '{source_id}' AND version_code = {version_code}"
    owned_meta_ids = f"(SELECT meta_id FROM metadata_object WHERE {version_filter})"
    statements = [
        f"delete from metadata_object_version_record where {version_filter}",
        f"delete from metadata_object_field where meta_id in {owned_meta_ids}",
        f"delete from metadata_object_create where meta_id in {owned_meta_ids}",
        f"delete from metadata_object where {version_filter}",
    ]
    self.execute_update_tx(statements)

Loading…
Cancel
Save