diff --git a/common/futool/core/fu_id.py b/common/futool/core/fu_id.py index 2cb3973..b1fda37 100644 --- a/common/futool/core/fu_id.py +++ b/common/futool/core/fu_id.py @@ -7,7 +7,7 @@ # @Desc : ID 生成器 import time import abc -from datahub.log_conf import Logger +from datahub.log_conf import log logger = Logger().get_logger() diff --git a/datahub/datasource/datasource_manage.py b/datahub/datasource/datasource_manage.py index 1320d9b..06e2918 100644 --- a/datahub/datasource/datasource_manage.py +++ b/datahub/datasource/datasource_manage.py @@ -9,14 +9,10 @@ from common.fudb.connectors.connector_factory import ConnFactory from common.fudb.dbapis.fu_db_api import select_one from datahub.datasource.dsdao.ds_dao import DataSourceDao -from datahub.log_conf import Logger from datahub.datasource.constant import ds_conf_param, DUAL_DB_TYPE from datahub.local_db_conf import local_conn from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper -# 日志 -logger = Logger().get_logger() - class DataSource(object): def __init__(self, source_id, conf: ds_conf_param): diff --git a/datahub/datasource/dsdao/ds_dao.py b/datahub/datasource/dsdao/ds_dao.py index a4f79f8..711d90f 100644 --- a/datahub/datasource/dsdao/ds_dao.py +++ b/datahub/datasource/dsdao/ds_dao.py @@ -9,7 +9,7 @@ from common.fudb.connectors.connector_factory import ConnFactory from common.fudb.dbapis.fu_dao import BaseDao from sqlalchemy import text -from datahub.log_conf import Logger +from datahub.log_conf import log from datahub.datasource.constant import ds_conf_param # 日志 diff --git a/datahub/log_conf.py b/datahub/log_conf.py index 0a89ffc..c7d61d1 100644 --- a/datahub/log_conf.py +++ b/datahub/log_conf.py @@ -39,3 +39,6 @@ class Logger(object): def get_logger(self): return self.logger + + +log = Logger().get_logger() diff --git a/datahub/metadata/metadata_reader.py b/datahub/metadata/metadata_reader.py index 6fc080b..bf262e2 100644 --- a/datahub/metadata/metadata_reader.py +++ b/datahub/metadata/metadata_reader.py @@ -9,7 +9,6 @@ import abc import os from configparser import ConfigParser -from datahub.log_conf import Logger from datahub.datasource.datasource_manage import DataSource from datahub.metadata.metadatadao.metadata_dao import MetadataDao @@ -18,8 +17,6 @@ BASE_DIR = os.path.dirname(os.path.abspath(__file__)) reader_conf = ConfigParser() reader_conf.read(os.path.join(BASE_DIR, 'reader_conf.ini')) -logger = Logger().get_logger() - class AbsMetadataReader(metaclass=abc.ABCMeta): """ diff --git a/datahub/metadata/metadatadao/metadata_version_dao.py b/datahub/metadata/metadatadao/metadata_version_dao.py index 6b286e0..450feda 100644 --- a/datahub/metadata/metadatadao/metadata_version_dao.py +++ b/datahub/metadata/metadatadao/metadata_version_dao.py @@ -38,8 +38,7 @@ class MetadataVersionDao(BaseDao): :return: """ result = self.query_one( - f"select version_code from metadata_object_version_record where source_id='{source_id}' " - f"order by version_code desc limit 1") + f"select max(version_code) from metadata_object_version_record where source_id='{source_id}'") return result[0] if result else None def add_version(self, source_id): diff --git a/datahub/metadata/metaversion/metadata_version.py b/datahub/metadata/metaversion/metadata_version.py index 6a48469..4b31804 100644 --- a/datahub/metadata/metaversion/metadata_version.py +++ b/datahub/metadata/metaversion/metadata_version.py @@ -7,11 +7,10 @@ # @Desc : 元数据版本管理 from datahub.metadata.metadatadao.metadata_version_dao import MetadataVersionDao from datahub.local_db_conf import local_conn -from datahub.log_conf import Logger +from datahub.log_conf import log import threading lock = threading.RLock() -logger = Logger().get_logger() class MetadataVersionKeeper(object): @@ -30,9 +29,9 @@ class MetadataVersionKeeper(object): """ flag = self.dao.init_version(self.source_id) == 1 if flag: - logger.info(f"[{self.source_id}] 数据源初始化版本成功") + log.info(f"[{self.source_id}] 数据源初始化版本成功") else: - logger.error(f"[{self.source_id}] 数据源初始化版本失败") + log.error(f"[{self.source_id}] 数据源初始化版本失败") def get_latest_version(self): """ @@ -44,7 +43,7 @@ class MetadataVersionKeeper(object): def increase_version(self): """ 递增并返回最新版本 - 加锁操作 + 读改写加锁,多进程下锁无效 :return: """ try: @@ -52,11 +51,3 @@ class MetadataVersionKeeper(object): return self.dao.add_version(self.source_id) finally: lock.release() - - -if __name__ == '__main__': - version_keeper = MetadataVersionKeeper('834164a2d62de959c0261e6239dd1e55') - # 多线程测试 - - # for i in range(100): - # print(version_keeper.increase_version()) diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index d9e7a65..ef35b42 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -11,12 +11,10 @@ import sqlparse from sqllineage.runner import LineageRunner from datahub.graph.graph_helper import Neo4jHelper from datahub.local_db_conf import graph -from datahub.log_conf import Logger +from datahub.log_conf import log from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.metadata.constant.metadata_constant import MetaDataObjType -from common.futool.fu_collection import is_not_empty - -logger = Logger().get_logger() +from common.futool.core.fu_collection import is_not_empty def _format_table_name(unformat: str): @@ -118,7 +116,7 @@ class MetadataRelationAnalyzer(object): if views and len(views) > 0: # 查询创建语句 for view in views: - logger.info(f'开始分析视图{view[1]}') + log.info(f'开始分析视图{view[1]}') start_time = time.time() try: self._recurrence_view(view[1]) @@ -126,7 +124,7 @@ class MetadataRelationAnalyzer(object): logger.error(f'视图{view[1]}分析异常,e={e}') finally: stop_time = time.time() - logger.info(f'视图{view[1]}分析结束,耗时{round(stop_time - start_time, 2)}秒') + log.info(f'视图{view[1]}分析结束,耗时{round(stop_time - start_time, 2)}秒') def _recurrence_view(self, view_name): """ @@ -173,11 +171,11 @@ class MetadataRelationAnalyzer(object): """ procedures = self.warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value) procedures_count = len(procedures) - logger.info(f'开始分析存储过程语句,共{procedures_count}条') + log.info(f'开始分析存储过程语句,共{procedures_count}条') if is_not_empty(procedures): for i, procedure in enumerate(procedures): try: - logger.info(f'开始分析存储过程{procedure[1]}') + log.info(f'开始分析存储过程{procedure[1]}') start_time = time.time() create_sql = self.warehouse.query_view_create_by_name(procedure[1]) analyze_result = analyze_procedure_sql(create_sql) @@ -202,7 +200,7 @@ class MetadataRelationAnalyzer(object): logger.error(f'存储过程{procedure[1]}分析异常,e={e}') finally: stop_time = time.time() - logger.info( + log.info( f'存储过程{procedure[1]}分析结束,耗时{round(stop_time - start_time, 2)}秒,剩余{procedures_count - (i + 1)}个') diff --git a/datahub/scheduletask/scan_task.py b/datahub/scheduletask/scan_task.py index 5beda88..9613a95 100644 --- a/datahub/scheduletask/scan_task.py +++ b/datahub/scheduletask/scan_task.py @@ -13,11 +13,9 @@ from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao from datahub.scheduletask.task_executor import ScheduleExecutor from datahub.scheduletask.schedule import CronExpTrigger -from datahub.log_conf import Logger +from datahub.log_conf import log from datahub.metadata.constant.metadata_constant import MetaDataObjType -logger = Logger().get_logger() - class ScanTaskManage(object): def __init__(self): @@ -87,7 +85,7 @@ class ScanTaskRunner(object): if enable_task: for task in enable_task: self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_print) - logger.info(f'datasource scan task [{task[0]}] submit success') + log.info(f'datasource scan task [{task[0]}] submit success') class ScanTaskExecutor(object): @@ -103,9 +101,9 @@ class ScanTaskExecutor(object): # 初始化元数据读取器 metadata_reader = MetadataReader(datasource) # 分别读取表\视图\存储过程并入库 - logger.info(f'开始扫描[{source_id}]元数据') + log.info(f'开始扫描[{source_id}]元数据') tables = metadata_reader.query_tables() - logger.info(f'[{source_id}]读取表完毕,共{len(tables)}张') + log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') def scan_metadata(self, source_id): """ @@ -121,13 +119,13 @@ class ScanTaskExecutor(object): # 初始化元数据读取器 metadata_reader = MetadataReader(datasource) # 分别读取表\视图\存储过程并入库 - logger.info(f'开始扫描[{source_id}]元数据') + log.info(f'开始扫描[{source_id}]元数据') tables = metadata_reader.query_tables() warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value) - logger.info(f'[{source_id}]读取表完毕,共{len(tables)}张') + log.info(f'[{source_id}]读取表完毕,共{len(tables)}张') views = metadata_reader.query_views() warehouse.save_metadata_obj(views, MetaDataObjType.View.value) - logger.info(f'[{source_id}]读取视图完毕,共{len(views)}张') + log.info(f'[{source_id}]读取视图完毕,共{len(views)}张') procedures = metadata_reader.query_procedure() warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value) - logger.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个') + log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个')