You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

215 lines
8.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/9 9:18
# @Author : old tom
# @File : relation_analyze.py
# @Project : futool-tiny-datahub
# @Desc : 血缘关系分析
import time
import sqlparse
from sqllineage.runner import LineageRunner
from datahub.graph.graph_helper import Neo4jHelper
from datahub.local_db_conf import graph
from common.log_conf import Logger
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from common.futool.fu_collection import is_not_empty
# Module-level logger used by every function and class in this file.
logger = Logger().get_logger()
def _format_table_name(unformat: str):
"""
格式化sqllineage输出表名
:param unformat:
:return:
"""
return str(unformat).split(sep='.')[1].upper()
def analyze_view_sql(sql: str):
    """
    Analyse a view's SQL and return the names of the objects it reads from.

    :param sql: the view creation statement
    :return: list of upper-case source names with the schema prefix removed,
             or ``None`` when the SQL cannot be analysed, when no source is
             found, or when the first source equals the first target
    """
    try:
        analyze_result = LineageRunner(sql)
    except Exception as e:
        logger.error(f'sql解析异常,e={e}')
        return None
    try:
        # Resolve source and target tables of the statement.
        source_table = analyze_result.source_tables
        target_table = analyze_result.target_tables
    except Exception as e:
        logger.error(f'获取源表异常,e={e}')
        return None
    if not source_table:
        # Nothing to report; checked explicitly so that indexing below cannot
        # raise IndexError and be misreported as a lineage failure.
        return None
    if target_table and source_table[0] == target_table[0]:
        # A statement that reads from its own target would make the caller
        # recurse forever.
        logger.warning('源表与目标表相同')
        return None
    # A statement without any target (e.g. a bare SELECT) still has valid
    # sources, so it falls through to here instead of being discarded.
    return [_format_table_name(s) for s in source_table]
def analyze_procedure_sql(sql: str):
    """
    Analyse the SQL of a stored procedure.

    Unlike a view, a procedure body holds several statements, so the text is
    first split into individual statements with ``sqlparse.split`` and each
    statement is analysed on its own. A statement contributes a result only
    when it has exactly one target table and at least one source table.

    A statement that cannot be analysed is logged and skipped; it no longer
    discards the lineage already collected from the other statements.

    :param sql: full source text of the stored procedure
    :return: list of ``{'target': str, 'source': [str, ...]}`` dicts, one per
             qualifying statement (empty when nothing qualifies)
    """
    result = []
    split_result = sqlparse.split(sql, encoding='utf-8')
    if not is_not_empty(split_result):
        return result
    # A dedicated loop name keeps the ``sql`` parameter from being shadowed.
    for statement in split_result:
        try:
            analyze_result = LineageRunner(statement)
        except Exception as e:
            logger.error(f'sql解析异常,e={e}')
            continue
        try:
            # Resolve source and target tables of this statement.
            source_table = analyze_result.source_tables
            target_table = analyze_result.target_tables
            if len(target_table) == 1 and len(source_table) > 0:
                # Exactly one target and a non-empty source list.
                result.append({
                    'target': _format_table_name(target_table[0]),
                    'source': [_format_table_name(s) for s in source_table]
                })
        except Exception as e:
            logger.error(f'获取源表异常,e={e}')
            continue
    return result
class MetadataRelationAnalyzer(object):
    """
    Metadata relationship (lineage) analysis.

    Flow: read views from the metadata store --> analyse each view
    (recursively) --> derive view/table relationships --> store them in Neo4j.
    """

    def __init__(self, source_id):
        """
        :param source_id: identifier of the data source to analyse; passed to
                          the metadata warehouse and attached to created nodes
        """
        self.source_id = source_id
        self.warehouse = MetadataWareHouse(source_id)
        self.neo4j_helper = Neo4jHelper(graph)

    def analyze(self):
        """
        Analyse views first, then stored procedures.

        Taking a view as an example: its data comes from several tables, i.e.
        source: [t1, t2, t3], dest: the view. Nodes and one-directional
        relationships are created from that result.

        :return: None
        """
        self.analyze_views()
        self.analyze_procedure()

    def analyze_views(self):
        """
        Analyse every view known to the metadata warehouse.

        A failure on one view is logged and does not stop the remaining views.

        :return: None
        """
        views = self.warehouse.query_metadata_id_name(MetaDataObjType.View.value)
        if not views:
            return
        for view in views:
            # Index 1 of each row is used as the view name.
            view_name = view[1]
            logger.info(f'开始分析视图{view_name}')
            start_time = time.time()
            try:
                self._recurrence_view(view_name)
            except Exception as e:
                logger.error(f'视图{view_name}分析异常,e={e}')
            finally:
                stop_time = time.time()
                logger.info(f'视图{view_name}分析结束,耗时{round(stop_time - start_time, 2)}')

    def _recurrence_view(self, view_name, visited=None):
        """
        Recursively analyse a view and the views it depends on.

        :param view_name: name of the view to analyse
        :param visited: names already expanded during the current traversal;
                        callers normally omit it. It stops the recursion when
                        a name is reached again, which can happen because
                        schema prefixes are stripped from source names.
        :return: None
        """
        if visited is None:
            visited = set()
        if view_name in visited:
            return
        visited.add(view_name)

        create_sql = self.warehouse.query_view_create_by_name(view_name)
        # The sources may be tables or other views.
        source_tables = analyze_view_sql(create_sql)
        if not is_not_empty(source_tables):
            return

        tables = []
        views = []
        for source_name in source_tables:
            source_type = self.warehouse.query_metadata_type_by_name(source_name)
            if source_type == MetaDataObjType.View.value:
                views.append(source_name)
            elif source_type == MetaDataObjType.Table.value:
                tables.append(source_name)
            # Sources of any other type are ignored.

        # Root node: the view itself. The relationship is one view to many
        # sources, so the passed-in view_name is the root.
        root_node = self.neo4j_helper.create_node_with_check(
            MetaDataObjType.View.value, self.source_id, name=view_name)
        for t in tables:
            t_node = self.neo4j_helper.create_node_with_check(
                MetaDataObjType.Table.value, self.source_id, name=t)
            self.neo4j_helper.create_relationship_with_merge(
                t_node, 'from', root_node, MetaDataObjType.Table.value, 'name')
        for v in views:
            v_node = self.neo4j_helper.create_node_with_check(
                MetaDataObjType.View.value, self.source_id, name=v)
            self.neo4j_helper.create_relationship_with_merge(
                v_node, 'from', root_node, MetaDataObjType.View.value, 'name')
        # Descend only after all relationships of this level are written.
        for v in views:
            self._recurrence_view(v, visited)

    def analyze_procedure(self):
        """
        Analyse every stored procedure known to the metadata warehouse.

        For each qualifying statement a target node is created and every
        source is linked to it (many sources to one target). A failure on one
        procedure is logged and does not stop the remaining procedures.

        :return: None
        """
        procedures = self.warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value)
        # Guard the count so an empty/None query result cannot raise here.
        procedures_count = len(procedures) if procedures else 0
        logger.info(f'开始分析存储过程语句,共{procedures_count}')
        if not is_not_empty(procedures):
            return
        for i, procedure in enumerate(procedures):
            # Index 1 of each row is used as the procedure name.
            procedure_name = procedure[1]
            # Bound before the try block so the finally clause can always read it.
            start_time = time.time()
            try:
                logger.info(f'开始分析存储过程{procedure_name}')
                # NOTE(review): the view-named query is also used to fetch the
                # procedure source - confirm it returns procedure definitions.
                create_sql = self.warehouse.query_view_create_by_name(procedure_name)
                analyze_result = analyze_procedure_sql(create_sql)
                if is_not_empty(analyze_result):
                    for single_result in analyze_result:
                        # Direction of the relationship: source --> target.
                        target = single_result['target']
                        source = single_result['source']
                        target_node = self.neo4j_helper.create_node_with_check(
                            MetaDataObjType.Table.value, self.source_id, name=target)
                        for s in source:
                            # TODO: the object type of each source still needs
                            # to be determined; every source is stored as a table.
                            s_node = self.neo4j_helper.create_node_with_check(
                                MetaDataObjType.Table.value, self.source_id, name=s)
                            self.neo4j_helper.create_relationship_with_merge(
                                s_node, 'from', target_node, MetaDataObjType.Table.value, 'name')
            except Exception as e:
                logger.error(f'存储过程{procedure_name}分析异常,e={e}')
            finally:
                stop_time = time.time()
                logger.info(
                    f'存储过程{procedure_name}分析结束,耗时{round(stop_time - start_time, 2)}秒,剩余{procedures_count - (i + 1)}')
if __name__ == '__main__':
    # Script entry point: clear the graph, then rebuild the lineage for one
    # hard-coded data source.
    graph.delete_all()
    MetadataRelationAnalyzer('834164a2d62de959c0261e6239dd1e55').analyze()
    # The bare string below is a no-op expression kept as a sample Cypher query.
    'MATCH (c:table{name:"DW_LAND_PROBLEM"})<-[r*0..]-(result) return result'