#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/5/9 16:06
# @Author : old tom
# @File : metadat_scan.py
# @Project : futool-tiny-datahub
# @Desc :

from multiprocessing.dummy import Pool
from datahub.datasource.datasource_manage import DataSourceManage
from datahub.metadata.metadata_reader import MetadataReader
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from datahub.log_conf import log
from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper
from common.futool.core.fu_date import StopWatch


class MetadataScanner(object):
    """Scan a data source's tables, views and procedures into the warehouse."""

    # Number of worker threads used when fetching object details.
    # Keep this less than or equal to the number of database connections.
    DETAIL_POOL_SIZE = 15

    def __init__(self):
        self.datasource_manage = DataSourceManage()

    def _join_meta_detail(self, meta_datas, source_id, obj_type):
        """
        Fetch the detail of every metadata object concurrently and build
        the rows to insert.

        :param meta_datas: iterable of records whose first two elements are
            the metadata id and the object name
        :param source_id: data source id used to obtain the data source
        :param obj_type: object type passed through to the metadata reader
        :return: list of rows; when the reader returns a list, one row
            ``[meta_id, *detail]`` per detail entry, otherwise a single row
            ``[meta_id, detail]``. Rows follow the order of ``meta_datas``.
        """

        def detail_handle(meta_data):
            # A reader is built per task, exactly as before, so worker
            # threads never share a reader instance.
            metadata_reader = MetadataReader(self.datasource_manage.get(source_id))
            meta_id, obj_name = meta_data[0], meta_data[1]
            meta_result = metadata_reader.query_metadata_detail(obj_name, obj_type)
            # Assemble the rows to insert.
            if isinstance(meta_result, list):
                return [[meta_id] + list(x) for x in meta_result]
            # Views and stored procedures.
            return [[meta_id, meta_result]]

        pool = Pool(self.DETAIL_POOL_SIZE)
        try:
            # Workers return their rows instead of appending to a shared
            # list; map() yields results in input order, so the output is
            # deterministic.
            rows_per_object = pool.map(detail_handle, meta_datas)
        finally:
            # Always release the worker threads, even when a task raised.
            pool.close()
            pool.join()
        return [row for rows in rows_per_object for row in rows]

    def _scan_detail(self, warehouse: MetadataWareHouse):
        """
        Fetch detail data (table fields, view and procedure statements) for
        the objects already stored in the warehouse and save it.

        :param warehouse: metadata warehouse
        :return:
        """
        source_id = warehouse.source_id
        watch = StopWatch()

        # Read table fields.
        watch.start(f"{source_id}_table")
        meta_tables = warehouse.query_metadata_id_name(MetaDataObjType.Table.value)
        log.info(f"开始查询[{warehouse.source_id}] 表字段数据,共{len(meta_tables)}张表")
        table_field_datas = self._join_meta_detail(meta_tables, source_id, MetaDataObjType.Table.value)
        warehouse.save_metadata_obj_field(table_field_datas)
        table_time = watch.stop(f"{source_id}_table")
        log.info(f"[{warehouse.source_id}] 表字段数据入库结束,耗时{table_time}秒")

        # Read view creation statements.
        watch.start(f"{source_id}_view")
        meta_views = warehouse.query_metadata_id_name(MetaDataObjType.View.value)
        log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图")
        view_datas = self._join_meta_detail(meta_views, source_id, MetaDataObjType.View.value)
        warehouse.save_metadata_obj_detail(view_datas)
        view_time = watch.stop(f"{source_id}_view")
        log.info(f"[{warehouse.source_id}] 视图语句入库结束,耗时{view_time}秒")

        # Read stored procedure creation statements.
        watch.start(f"{source_id}_procedure")
        meta_procedure = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value)
        log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_procedure)}张过程")
        procedure_datas = self._join_meta_detail(meta_procedure, source_id, MetaDataObjType.Procedure.value)
        warehouse.save_metadata_obj_detail(procedure_datas)
        procedure_time = watch.stop(f"{source_id}_procedure")
        log.info(f"[{warehouse.source_id}] 存储过程语句入库结束,耗时{procedure_time}秒")
        watch.clear()

    def scan_metadata(self, source_id):
        """
        Scan metadata.

        TODO: Python cannot do annotation-driven transactions for now;
        implement later with a dynamic proxy or a decorator.

        On any exception, everything saved under the new version code is
        deleted and the error is logged; the exception is not re-raised.

        :param source_id: id of the data source to scan
        :return:
        """
        # Metadata warehouse API.
        warehouse = MetadataWareHouse(source_id)
        # Initialise the metadata reader.
        metadata_reader = MetadataReader(self.datasource_manage.get(source_id))
        # Obtain the version keeper.
        version_keeper = MetadataVersionKeeper(source_id)
        # Increment the version.
        version_code = version_keeper.increase_version()
        # Read tables / views / stored procedures in turn and store them.
        log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]')
        try:
            tables = metadata_reader.query_tables()
            warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code)
            log.info(f'[{source_id}]读取表完毕,共{len(tables)}张')
            views = metadata_reader.query_views()
            warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code)
            log.info(f'[{source_id}]读取视图完毕,共{len(views)}张')
            procedures = metadata_reader.query_procedure()
            warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code)
            log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个')
            # Query the detail data.
            self._scan_detail(warehouse)
        except Exception as e:
            # Roll back: delete everything stored under this version code.
            warehouse.delete_by_version(version_code)
            log.error(f'[{source_id}]元数据扫描异常,进行回滚,删除版本号为[{version_code}]全部数据,e={e}')


if __name__ == '__main__':
    scanner = MetadataScanner()
    scanner.scan_metadata('f98fede74826c709329a65d63db167df')