From 8822444bb9470de22f1f70a9d130a4171763bbb0 Mon Sep 17 00:00:00 2001 From: old-tom <892955278@qq.com> Date: Sat, 22 Apr 2023 23:15:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E8=BF=87=E7=A8=8B=E5=88=86=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/log_conf.py | 5 +- datahub/local_db_conf.py | 2 +- datahub/relation/relation_analyze.py | 88 +++++++++++++++++++++++++--- 3 files changed, 86 insertions(+), 9 deletions(-) diff --git a/common/log_conf.py b/common/log_conf.py index d82d48a..34c0fa7 100644 --- a/common/log_conf.py +++ b/common/log_conf.py @@ -7,10 +7,13 @@ # @Desc : 日志配置 import sys +import os from loguru import logger +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + # 日志输出路径 -LOG_PATH = '../logout/info_log.log' +LOG_PATH = os.path.join(BASE_DIR, r'logout\info_log.log') class Logger(object): diff --git a/datahub/local_db_conf.py b/datahub/local_db_conf.py index 16e513b..d2fcae5 100644 --- a/datahub/local_db_conf.py +++ b/datahub/local_db_conf.py @@ -14,4 +14,4 @@ local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432 local_conn = ConnFactory(local_db) # neo4j配置 -graph = Graph("bolt://localhost:7687", auth=("neo4j", "root@12345")) +graph = Graph("bolt://localhost:7687", auth=("neo4j", "clone-formula-shelf-hair-neptune-3446")) diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index d23aec7..1ecc7af 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -6,8 +6,8 @@ # @Project : futool-tiny-datahub # @Desc : 血缘关系分析 -import sys import time +import sqlparse from sqllineage.runner import LineageRunner from datahub.graph.graph_helper import Neo4jHelper from datahub.local_db_conf import graph @@ -17,14 +17,21 @@ from datahub.metadata.constant.metadata_constant import MetaDataObjType from common.futool.fu_collection import is_not_empty logger = Logger().get_logger() -# 修改最大递归深度 -sys.setrecursionlimit(500) + + +def _format_table_name(unformat: str): + """ + 格式化sqllineage输出表名 + :param unformat: + :return: + """ + return str(unformat).split(sep='.')[1].upper() def analyze_view_sql(sql: str): """ - sql 分析 - :param sql: + 视图sql分析 + :param sql: 视图创建语句 :return: """ try: @@ -36,7 +43,7 @@ def analyze_view_sql(sql: str): # 获取源表 source_table = analyze_result.source_tables target_table = analyze_result.target_tables - if len(source_table) == len(target_table) and source_table[0] == target_table[0]: + if source_table[0] == target_table[0]: # 防止无限递归 logger.warning(f'源表与目标表相同') return None @@ -47,6 +54,38 @@ def analyze_view_sql(sql: str): return [str(s).split(sep='.')[1].upper() for s in source_table] +def analyze_procedure_sql(sql: str): + """ + 解析存储过程 + 存储过程SQL相比视图存在一定特殊性,需要先进行SQL拆分 + :param sql: + :return: + """ + result = [] + split_result = sqlparse.split(sql, encoding='utf-8') + if is_not_empty(split_result): + for sql in split_result: + try: + analyze_result = LineageRunner(sql) + except Exception as e: + logger.error(f'sql解析异常,e={e}') + return None + try: + # 获取源表 + source_table = analyze_result.source_tables + target_table = analyze_result.target_tables + if len(target_table) == 1 and len(source_table) > 0: + # 目标表唯一且源表不为空 + result.append({ + 'target': _format_table_name(target_table[0]), + 'source': [_format_table_name(s) for s in source_table] + }) + except Exception as e: + logger.error(f'获取源表异常,e={e}') + return None + return result + + class MetadataRelationAnalyzer(object): """ 元数据关系分析 @@ -131,10 +170,45 @@ class MetadataRelationAnalyzer(object): 分析存储过程 :return: """ - pass + procedures = self.warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value) + procedures_count = len(procedures) + logger.info(f'开始分析存储过程语句,共{procedures_count}条') + if is_not_empty(procedures): + for i, procedure in enumerate(procedures): + try: + logger.info(f'开始分析存储过程{procedure[1]}') + start_time = time.time() + create_sql = self.warehouse.query_view_create_by_name(procedure[1]) + analyze_result = analyze_procedure_sql(create_sql) + if is_not_empty(analyze_result): + # 构建节点与关系 + for single_result in analyze_result: + # target 作为目标节点,source 作为源节点 source-->target + target = single_result['target'] + source = single_result['source'] + # 构建节点 + target_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.Table.value, + self.source_id, + name=target) + # 构建多对1关系 + for s in source: + # TODO 还需要判断类型 + s_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.Table.value, + self.source_id, + name=s) + self.neo4j_helper.create_relationship_with_merge(s_node, 'from', target_node, + MetaDataObjType.Table.value, + 'name') + except Exception as e: + logger.error(f'存储过程{procedure[1]}分析异常,e={e}') + finally: + stop_time = time.time() + logger.info( + f'存储过程{procedure[1]}分析结束,耗时{round(stop_time - start_time, 2)}秒,剩余{procedures_count - (i + 1)}个') if __name__ == '__main__': graph.delete_all() mta = MetadataRelationAnalyzer('834164a2d62de959c0261e6239dd1e55') mta.analyze() + 'MATCH (c:table{name:"DW_LAND_PROBLEM"})<-[r*0..]-(result) return result'