|
|
#!/usr/bin/env python
|
|
|
# -*- coding: utf-8 -*-
|
|
|
# @Time : 2023/4/9 9:18
|
|
|
# @Author : old tom
|
|
|
# @File : relation_analyze.py
|
|
|
# @Project : futool-tiny-datahub
|
|
|
# @Desc : 血缘关系分析
|
|
|
|
|
|
import time
|
|
|
import sqlparse
|
|
|
from sqllineage.runner import LineageRunner
|
|
|
from datahub.graph.graph_helper import Neo4jHelper
|
|
|
from datahub.local_db_conf import graph
|
|
|
from common.log_conf import Logger
|
|
|
from datahub.metadata.metadata_warehouse import MetadataWareHouse
|
|
|
from datahub.metadata.constant.metadata_constant import MetaDataObjType
|
|
|
from common.futool.fu_collection import is_not_empty
|
|
|
|
|
|
logger = Logger().get_logger()
|
|
|
|
|
|
|
|
|
def _format_table_name(unformat: str):
|
|
|
"""
|
|
|
格式化sqllineage输出表名
|
|
|
:param unformat:
|
|
|
:return:
|
|
|
"""
|
|
|
return str(unformat).split(sep='.')[1].upper()
|
|
|
|
|
|
|
|
|
def analyze_view_sql(sql: str):
    """
    Extract the source tables/views a view definition reads from.

    :param sql: the view's creation statement
    :return: list of upper-cased source names, or None when the SQL cannot be
             analyzed, yields no source tables, or one of its target tables also
             appears among its sources (guard against infinite recursion in the
             recursive view walk)
    """
    try:
        analyze_result = LineageRunner(sql)
    except Exception as e:
        logger.error(f'sql解析异常,e={e}')
        return None
    try:
        # sqllineage evaluates lazily on first property access, so parse errors
        # typically surface here rather than in the constructor.
        source_tables = analyze_result.source_tables
        target_tables = analyze_result.target_tables
    except Exception as e:
        logger.error(f'获取源表异常,e={e}')
        return None

    # Nothing to link; previously this path raised IndexError and was logged as an error.
    if not source_tables:
        return None

    # Prevent infinite recursion: check every target against every source.
    # The old check compared only the first element of each (sorted) list.
    if any(target in source_tables for target in target_tables):
        logger.warning('源表与目标表相同')
        return None

    return [_format_table_name(s) for s in source_tables]
|
|
|
|
|
|
|
|
|
def analyze_procedure_sql(sql: str):
    """
    Analyze the lineage of a stored procedure.

    Unlike a view, a procedure body contains many statements, so the SQL is first
    split with sqlparse and each statement is analyzed on its own.

    :param sql: the procedure's source text
    :return: list of ``{'target': str, 'source': [str, ...]}`` dicts, one per
             statement that has exactly one target table and at least one source
             table. Statements that fail to analyze are logged and skipped, so
             one unparsable statement no longer discards the whole procedure.
    """
    result = []
    statements = sqlparse.split(sql, encoding='utf-8')
    if not is_not_empty(statements):
        return result

    for statement in statements:
        try:
            runner = LineageRunner(statement)
        except Exception as e:
            logger.error(f'sql解析异常,e={e}')
            continue
        try:
            source_tables = runner.source_tables
            target_tables = runner.target_tables
            # Only statements with a single target and at least one source
            # describe an unambiguous source -> target lineage.
            if len(target_tables) == 1 and len(source_tables) > 0:
                result.append({
                    'target': _format_table_name(target_tables[0]),
                    'source': [_format_table_name(s) for s in source_tables]
                })
        except Exception as e:
            logger.error(f'获取源表异常,e={e}')
            continue
    return result
|
|
|
|
|
|
|
|
|
class MetadataRelationAnalyzer(object):
    """
    Metadata lineage analyzer.

    Flow: read views (and stored procedures) from the metadata warehouse -->
    analyze their SQL (views recursively) --> derive source/target relationships
    --> store them as nodes and relationships in Neo4j.
    """

    def __init__(self, source_id):
        """
        :param source_id: id of the data source whose metadata is analyzed
        """
        self.source_id = source_id
        self.warehouse = MetadataWareHouse(source_id)
        self.neo4j_helper = Neo4jHelper(graph)

    def analyze(self):
        """
        Analyze views, then stored procedures.

        Taking a view as an example: its data comes from several objects, i.e.
        source: [t1, t2, t3], dest: the view. Nodes and relationships are created
        from the result; relationships are one-directional (source -> dest).
        """
        self.analyze_views()
        self.analyze_procedure()

    def analyze_views(self):
        """
        Analyze every view of this data source.

        Failures are logged per view, so one bad view does not stop the others.
        """
        views = self.warehouse.query_metadata_id_name(MetaDataObjType.View.value)
        if not views:
            return
        for view in views:
            # rows are indexed as (id, name) -- assumed from the query method's name
            view_name = view[1]
            logger.info(f'开始分析视图{view_name}')
            start_time = time.time()
            try:
                self._recurrence_view(view_name)
            except Exception as e:
                logger.error(f'视图{view_name}分析异常,e={e}')
            finally:
                stop_time = time.time()
                logger.info(f'视图{view_name}分析结束,耗时{round(stop_time - start_time, 2)}秒')

    def _recurrence_view(self, view_name, _visited=None):
        """
        Recursively analyze a view and every view it reads from.

        :param view_name: view name
        :param _visited: internal; view names already analyzed in this recursion.
                         Prevents re-walking shared dependencies and stops on
                         cyclic references instead of hitting RecursionError.
        """
        if _visited is None:
            _visited = set()
        if view_name in _visited:
            return
        _visited.add(view_name)

        create_sql = self.warehouse.query_view_create_by_name(view_name)
        # Sources may be tables or views.
        source_tables = analyze_view_sql(create_sql)
        if not is_not_empty(source_tables):
            return

        # Separate views from tables; sources of any other/unknown type are ignored.
        tables = []
        views = []
        for source_name in source_tables:
            source_type = self.warehouse.query_metadata_type_by_name(source_name)
            if source_type == MetaDataObjType.View.value:
                views.append(source_name)
            elif source_type == MetaDataObjType.Table.value:
                tables.append(source_name)

        # Root node is the view being analyzed (one view <- many sources).
        root_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.View.value, self.source_id,
                                                             name=view_name)
        for t in tables:
            t_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.Table.value, self.source_id, name=t)
            self.neo4j_helper.create_relationship_with_merge(t_node, 'from', root_node, MetaDataObjType.Table.value,
                                                             'name')
        for v in views:
            v_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.View.value, self.source_id, name=v)
            self.neo4j_helper.create_relationship_with_merge(v_node, 'from', root_node, MetaDataObjType.View.value,
                                                             'name')

        # Descend into the source views.
        for v in views:
            self._recurrence_view(v, _visited)

    def analyze_procedure(self):
        """
        Analyze every stored procedure of this data source.

        Failures are logged per procedure, so one bad procedure does not stop the others.
        """
        # Treat a None result as "no procedures" instead of crashing on len(None).
        procedures = self.warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value) or []
        procedures_count = len(procedures)
        logger.info(f'开始分析存储过程语句,共{procedures_count}条')
        for i, procedure in enumerate(procedures):
            # Set before `try` so the `finally` clause can always read it.
            start_time = time.time()
            try:
                logger.info(f'开始分析存储过程{procedure[1]}')
                self._analyze_single_procedure(procedure[1])
            except Exception as e:
                logger.error(f'存储过程{procedure[1]}分析异常,e={e}')
            finally:
                stop_time = time.time()
                logger.info(
                    f'存储过程{procedure[1]}分析结束,耗时{round(stop_time - start_time, 2)}秒,剩余{procedures_count - (i + 1)}个')

    def _analyze_single_procedure(self, procedure_name):
        """Parse one stored procedure and store its source -> target relationships in Neo4j."""
        # NOTE(review): the procedure text is fetched via query_view_create_by_name,
        # as in the original code; confirm the warehouse serves procedure DDL through it.
        create_sql = self.warehouse.query_view_create_by_name(procedure_name)
        analyze_result = analyze_procedure_sql(create_sql)
        if not is_not_empty(analyze_result):
            return
        for single_result in analyze_result:
            # target is the destination node, each source a source node: source --> target
            target = single_result['target']
            source = single_result['source']
            # Targets are always created with the Table label.
            target_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.Table.value,
                                                                   self.source_id,
                                                                   name=target)
            # Build the many-to-one relationships.
            for s in source:
                source_type = self.warehouse.query_metadata_type_by_name(s)
                if not source_type:
                    # Unknown to the metadata store (e.g. temp tables); a None label would
                    # fail in Neo4j and abort the remaining statements of this procedure.
                    logger.warning(f'存储过程{procedure_name}的源对象{s}未在元数据中找到,已跳过')
                    continue
                s_node = self.neo4j_helper.create_node_with_check(source_type, self.source_id, name=s)
                self.neo4j_helper.create_relationship_with_merge(s_node, 'from', target_node,
                                                                 source_type,
                                                                 'name')
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Rebuild lineage from scratch. WARNING: graph.delete_all() presumably clears the
    # entire configured graph (py2neo semantics), not only this data source's nodes.
    graph.delete_all()
    analyzer = MetadataRelationAnalyzer('834164a2d62de959c0261e6239dd1e55')
    analyzer.analyze()
|