From f73d5e82ab6eb69a258ad8f4d915638359d06e7b Mon Sep 17 00:00:00 2001 From: old-tom <892955278@qq.com> Date: Sun, 14 May 2023 21:06:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E7=BA=BF=E7=A8=8B=E6=B1=A0=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E5=85=83=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/fudb/dbapis/fu_dao.py | 6 +- common/fudb/dbapis/fu_db_api.py | 13 +++ datahub/metadata/metadat_scan.py | 89 +++++++++++--------- datahub/metadata/metadata_reader.py | 2 +- datahub/metadata/metadata_warehouse.py | 3 + datahub/metadata/metadatadao/metadata_dao.py | 16 +++- 6 files changed, 87 insertions(+), 42 deletions(-) diff --git a/common/fudb/dbapis/fu_dao.py b/common/fudb/dbapis/fu_dao.py index c962389..a3b4208 100644 --- a/common/fudb/dbapis/fu_dao.py +++ b/common/fudb/dbapis/fu_dao.py @@ -8,7 +8,8 @@ from collections import namedtuple from common.fudb.connectors.connector_factory import ConnFactory -from common.fudb.dbapis.fu_db_api import select_all, select_one, batch_insert, count, execute_update +from common.fudb.dbapis.fu_db_api import select_all, select_one, batch_insert, count, execute_update, \ + execute_many_sql_with_tx class BaseDao(object): @@ -30,6 +31,9 @@ class BaseDao(object): def batch_insert(self, sql_tpl, data, batch_size, db_type='postgresql'): return batch_insert(self.connector.get_conn(), db_type, sql_tpl, data, batch_size) + def execute_update_tx(self, sql_list): + execute_many_sql_with_tx(self.connector.get_conn(), sql_list) + def dynamic_update_by_param(self, table_name, condition, param: dict): """ 动态更新语句 diff --git a/common/fudb/dbapis/fu_db_api.py b/common/fudb/dbapis/fu_db_api.py index 95f5345..e749b0c 100644 --- a/common/fudb/dbapis/fu_db_api.py +++ b/common/fudb/dbapis/fu_db_api.py @@ -44,6 +44,19 @@ def _execute_with_tx(conn: Connection, sql): conn.close() +def execute_many_sql_with_tx(conn: Connection, sql_list): + try: + conn.begin() + for sql in sql_list: + conn.execute(text(sql)) + conn.commit() + except Exception as e: + conn.rollback() + raise SqlExecuteError(msg=f'sql [{sql_list}] 执行失败,开始回滚,e={e}') + finally: + conn.close() + + def select_one(conn: Connection, sql): """ 查询一个 diff --git a/datahub/metadata/metadat_scan.py b/datahub/metadata/metadat_scan.py index 6f3070e..8e404f4 100644 --- a/datahub/metadata/metadat_scan.py +++ b/datahub/metadata/metadat_scan.py @@ -5,6 +5,7 @@ # @File : metadat_scan.py # @Project : futool-tiny-datahub # @Desc : +from multiprocessing.dummy import Pool from datahub.datasource.datasource_manage import DataSourceManage from datahub.metadata.metadata_reader import MetadataReader from datahub.metadata.metadata_warehouse import MetadataWareHouse @@ -19,24 +20,30 @@ class MetadataScanner(object): def __init__(self): self.datasource_manage = DataSourceManage() - @staticmethod - def _join_meta_detail(meta_datas, metadata_reader: MetadataReader, obj_type): - """ - 组装明细结果 - :return: - """ + def _join_meta_detail(self, meta_datas, source_id, obj_type): save_datas = [] - for i, t in enumerate(meta_datas): - meta_id, obj_name = t[0], t[1] + + def detail_handle(meta_data): + metadata_reader = MetadataReader(self.datasource_manage.get(source_id)) + meta_id, obj_name = meta_data[0], meta_data[1] meta_result = metadata_reader.query_metadata_detail(obj_name, obj_type) # 组装插入数据 - obj_data = ([meta_id] + list(x) for x in meta_result) - for d in obj_data: - save_datas.append(d) - log.info(f"剩余{len(meta_datas) - (i + 1)}") + if isinstance(meta_result, list): + obj_data = ([meta_id] + list(x) for x in meta_result) + for d in obj_data: + save_datas.append(d) + else: + # 视图及存储过程 + save_datas.append([meta_id, meta_result]) + + # 小于等于数据库连接数 + pool = Pool(15) + pool.map(detail_handle, meta_datas) + pool.close() + pool.join() return save_datas - def _scan_detail(self, warehouse: MetadataWareHouse, metadata_reader: MetadataReader): + def _scan_detail(self, warehouse: MetadataWareHouse): """ 获取明细数据 :param warehouse: 元数据仓库 @@ -49,26 +56,27 @@ class MetadataScanner(object): watch.start(f"{source_id}_table") meta_tables = warehouse.query_metadata_id_name(MetaDataObjType.Table.value) log.info(f"开始查询[{warehouse.source_id}] 表字段数据,共{len(meta_tables)}张表") - table_field_datas = self._join_meta_detail(meta_tables, metadata_reader, MetaDataObjType.Table.value) + table_field_datas = self._join_meta_detail(meta_tables, source_id, MetaDataObjType.Table.value) warehouse.save_metadata_obj_field(table_field_datas) table_time = watch.stop(f"{source_id}_table") - log.info(f"[{warehouse.source_id}] 表字段数据查询结束,耗时{table_time}秒") + log.info(f"[{warehouse.source_id}] 表字段数据入库结束,耗时{table_time}秒") # 读取视图创建语句 watch.start(f"{source_id}_view") meta_views = warehouse.query_metadata_id_name(MetaDataObjType.View.value) log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图") - view_datas = self._join_meta_detail(meta_views, metadata_reader, MetaDataObjType.View.value) + view_datas = self._join_meta_detail(meta_views, source_id, MetaDataObjType.View.value) warehouse.save_metadata_obj_detail(view_datas) view_time = watch.stop(f"{source_id}_view") - log.info(f"[{warehouse.source_id}] 视图语句查询结束,耗时{view_time}秒") + log.info(f"[{warehouse.source_id}] 视图语句入库结束,耗时{view_time}秒") # 读取存储过程创建语句 watch.start(f"{source_id}_procedure") meta_procedure = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value) - log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_views)}张过程") - procedure_datas = self._join_meta_detail(meta_procedure, metadata_reader, MetaDataObjType.Procedure.value) + log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_procedure)}张过程") + procedure_datas = self._join_meta_detail(meta_procedure, source_id, MetaDataObjType.Procedure.value) warehouse.save_metadata_obj_detail(procedure_datas) procedure_time = watch.stop(f"{source_id}_procedure") - log.info(f"[{warehouse.source_id}] 存储过程语句查询结束,耗时{procedure_time}秒") + log.info(f"[{warehouse.source_id}] 存储过程语句入库结束,耗时{procedure_time}秒") + watch.clear() def scan_metadata(self, source_id): """ @@ -79,27 +87,30 @@ class MetadataScanner(object): """ # 元数据仓库API warehouse = MetadataWareHouse(source_id) - # 获取待扫描数据源 - datasource = self.datasource_manage.get(source_id) # 初始化元数据读取器 - metadata_reader = MetadataReader(datasource) + metadata_reader = MetadataReader(self.datasource_manage.get(source_id)) # 获取版本号 - - # version_keeper = MetadataVersionKeeper(source_id) - # version_code = version_keeper.increase_version() - # # 分别读取表\视图\存储过程并入库 - # log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]') - # tables = metadata_reader.query_tables() - # warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code) - # log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') - # views = metadata_reader.query_views() - # warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code) - # log.info(f'[{source_id}]读取视图完毕,共{len(views)}张') - # procedures = metadata_reader.query_procedure() - # warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code) - # log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个') - # 查询明细数据 - self._scan_detail(warehouse, metadata_reader) + version_keeper = MetadataVersionKeeper(source_id) + # 版本自增 + version_code = version_keeper.increase_version() + # 分别读取表\视图\存储过程并入库 + log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]') + try: + tables = metadata_reader.query_tables() + warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code) + log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') + views = metadata_reader.query_views() + warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code) + log.info(f'[{source_id}]读取视图完毕,共{len(views)}张') + procedures = metadata_reader.query_procedure() + warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code) + log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个') + # 查询明细数据 + self._scan_detail(warehouse) + except Exception as e: + # 按照版本号全量删除 + warehouse.delete_by_version(version_code) + log.error(f'[{source_id}]元数据扫描异常,进行回滚,删除版本号为[{version_code}]全部数据,e={e}') if __name__ == '__main__': diff --git a/datahub/metadata/metadata_reader.py b/datahub/metadata/metadata_reader.py index bf262e2..aa8a556 100644 --- a/datahub/metadata/metadata_reader.py +++ b/datahub/metadata/metadata_reader.py @@ -64,7 +64,7 @@ class MetadataReader(AbsMetadataReader): def __init__(self, datasource: DataSource): super().__init__(datasource) self.db_type = datasource.connector.db_type - self.dao = MetadataDao(datasource.connector) + self.dao = MetadataDao(datasource.get_connector()) def query_tables(self): return self.dao.query_all_tables(reader_conf[self.db_type]['tables']) diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py index 7e75c4e..2e108b9 100644 --- a/datahub/metadata/metadata_warehouse.py +++ b/datahub/metadata/metadata_warehouse.py @@ -122,3 +122,6 @@ class MetadataWareHouse(object): def is_view(self, view_name, version_code=None): return self.query_metadata_type_by_name(view_name, version_code) == MetaDataObjType.View.value + + def delete_by_version(self, version_code): + self.dao.remove_metadata_by_version(self.source_id, version_code) diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index 305548c..b7e27e4 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -121,7 +121,7 @@ class MetadataDao(BaseDao): :return: """ sql = f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'" \ - + self._get_version_sql(source_id, version_code)+" limit 10" + + self._get_version_sql(source_id, version_code) return self.query_all(sql) def query_metadata_create(self, obj_type, meta_id, source_id): @@ -175,3 +175,17 @@ class MetadataDao(BaseDao): + self._get_version_sql(source_id, version_code) rt = self.query_one(sql) return rt[0] if rt and len(rt) > 0 else None + + def remove_metadata_by_version(self, source_id, version_code): + """ + 根据版本号删除元数据 + :param source_id: + :param version_code: + :return: + """ + del_version_sql = f"delete from metadata_object_version_record where source_id = '{source_id}' AND version_code = {version_code}" + del_metadata_obj_field_detail = f"delete from metadata_object_field where meta_id in (SELECT meta_id FROM metadata_object WHERE source_id = '{source_id}' AND version_code = {version_code})" + del_metadata_obj_create_detail = f"delete from metadata_object_create where meta_id in (SELECT meta_id FROM metadata_object WHERE source_id = '{source_id}' AND version_code = {version_code})" + del_metadata_obj = f"delete from metadata_object where source_id = '{source_id}' AND version_code = {version_code}" + self.execute_update_tx( + [del_version_sql, del_metadata_obj_field_detail, del_metadata_obj_create_detail, del_metadata_obj])