feat: 新增视图递归分析并对接Neo4j

master
old-tom 2 years ago
parent 36b7eb1001
commit c89362d568

@ -7,7 +7,7 @@
# @Desc :
from sqlalchemy import Connection, text, CursorResult
from common.fudb.dbapis.fu_collection import split_coll
from common.futool.fu_collection import split_coll
class SqlExecuteError(Exception):

@ -22,3 +22,12 @@ def split_coll(data: [], part_size=5):
if j > 0 and j % part_size == 0:
rt.append(data[j:j + part_size])
return rt
def is_not_empty(coll: list) -> bool:
"""
集合不为空
:param coll:
:return:
"""
return coll is not None and len(coll) > 0

@ -6,56 +6,127 @@
# @Project : futool-tiny-datahub
# @Desc : 血缘关系上图,关系查询
from py2neo import Node, Relationship, Graph, Subgraph
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from py2neo.matching import NodeMatcher
class MetaDataGraphBuilder(object):
class Neo4jHelper(object):
def __init__(self, graph: Graph):
self.graph = graph
self.node_matcher = NodeMatcher(graph)
def add_view_relation(self, tables: list, view_name: str):
def find_node(self, *labels, **properties):
"""
添加视图关系
:param tables: 创建视图的数据名称及类型[(名称大写,类型)]
:param view_name: 视图名
查找节点
:param labels:
:param properties:
:return: 可能有多个
"""
rt = self.node_matcher.match(*labels, **properties)
return rt
def find_node_by_id(self, node_id):
"""
节点ID 查找节点
:param node_id:
:return:
"""
source_nodes = []
for t in tables:
# 只添加表名和属性
source_nodes.append(Node(t[1], name=t[0]))
source_nodes.append(Node(MetaDataObjType.View.value, name=view_name))
self._add_one_to_many_relation(source_nodes, 'from')
return self.node_matcher.get(node_id)
def add_table_relation(self):
pass
def check_node_exists(self, *labels, **properties):
"""
判断节点是否存在
:param labels:
:param properties:
:return:
"""
return self.find_node(*labels, **properties).exists()
def same_node_count(self, *labels, **properties):
"""
相同节点数
:param labels:
:param properties:
:return:
"""
return self.find_node(labels, properties).count()
def add_procedure_relation(self, tables: list, procedure_name: str):
source_nodes = []
for t in tables:
# 只添加表名和属性
source_nodes.append(Node(t[1], name=t[0]))
source_nodes.append(Node(MetaDataObjType.View.value, name=procedure_name))
self._add_one_to_many_relation(source_nodes, 'into')
def find_same_node(self, *labels, **properties):
"""
查找相同节点
:param labels:
:param properties:
:return:
"""
return self.find_node(*labels, **properties).all()
def _add_one_to_many_relation(self, nodes: [], relation):
def merge_subgraph(self, relationship, label, *property_keys):
"""
添加一对多关系
:param one_node: 单节点
:param other_nodes: 多节点
:param relation: 关系描述
合并子图
:param property_keys: 判断节点重复属性
:param label: 节点标签
:param relationship:
:return:
"""
self.graph.merge(relationship, label, *property_keys)
def find_subgraph(self):
pass
def create_subgraph(self, nodes: list, edges: list):
"""
创建子图,相同节点不会自动去重
:param nodes: 节点列表
:param edges: 关系列表
:return:
"""
# 批量创建relation
relation_ls = []
for other in nodes[0:-1]:
relation_ls.append(Relationship(other, relation, nodes[-1]))
subgraph = Subgraph(nodes=nodes, relationships=relation_ls)
# 开启事务
tx = self.graph.begin()
try:
tx.create(subgraph)
tx.create(Subgraph(nodes=nodes, relationships=edges))
self.graph.commit(tx)
except Exception as e:
self.graph.rollback(tx)
print('图创建失败', e)
def create_node(self, *labels, **properties):
"""
创建节点
:param node:
:return:
"""
node = Node(*labels, **properties)
self.graph.create(node)
return node
def create_node_with_check(self, *labels, **properties):
"""
创建节点前判断是否存在
:param labels:
:param properties:
:return: 创建的节点或已经存在的节点
"""
if not self.check_node_exists(*labels, **properties):
return self.create_node(*labels, **properties)
return self.find_node(*labels, **properties).first()
def create_relationship(self, from_node, relation_desc, to_node):
"""
创建关系
:param to_node:
:param relation_desc: 关系描述
:param from_node:
:return:
"""
self.graph.create(Relationship(from_node, relation_desc, to_node))
def create_relationship_with_merge(self, from_node, relation_desc, to_node, label, *property_keys):
"""
创建关系并合并相同节点
:param label: 标签
:param from_node:
:param relation_desc: 关系描述
:param to_node:
:return:
"""
relation = Relationship(from_node, relation_desc, to_node)
self.graph.create(relation)
self.merge_subgraph(relation, label, *property_keys)

@ -79,6 +79,14 @@ class MetadataWareHouse(object):
"""
return self.dao.query_metadata_create(MetaDataObjType.View.value, meta_id, self.source_id)
def query_view_create_by_name(self, view_name):
"""
视图查询创建语句
:param view_name:
:return:
"""
return self.dao.query_metadata_create_by_name(view_name, self.source_id)
def query_procedure_create(self, meta_id):
"""
查询存储过程创建语句

@ -122,6 +122,17 @@ class MetadataDao(BaseDao):
sql=f"select create_sql from metadata_object_create where meta_id in (select meta_id from metadata_object where meta_id={meta_id} and source_id='{source_id}' and meta_type='{obj_type}')")
return create_sql[0] if create_sql and len(create_sql) > 0 else ''
def query_metadata_create_by_name(self, meta_name, source_id):
"""
名称查询创建语句
:param meta_name:
:param source_id:
:return:
"""
create_sql = self.query_one(
sql=f"select create_sql from metadata_object_create where meta_id=(select meta_id from metadata_object where meta_name='{meta_name}' and source_id='{source_id}')")
return create_sql[0] if create_sql and len(create_sql) > 0 else ''
def query_metadata_type(self, meta_id, source_id):
"""
查询元数据类型

@ -6,13 +6,45 @@
# @Project : futool-tiny-datahub
# @Desc : 血缘关系分析
import sys
import time
from sqllineage.runner import LineageRunner
from datahub.graph.graph_helper import MetaDataGraphBuilder
from datahub.graph.graph_helper import Neo4jHelper
from datahub.local_db_conf import graph
from common.log_conf import Logger
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from common.futool.fu_collection import is_not_empty
logger = Logger().get_logger()
# 修改最大递归深度
sys.setrecursionlimit(500)
def analyze_view_sql(sql: str):
"""
sql 分析
:param sql:
:return:
"""
try:
analyze_result = LineageRunner(sql)
except Exception as e:
logger.error(f'sql解析异常,e={e}')
return None
try:
# 获取源表
source_table = analyze_result.source_tables
target_table = analyze_result.target_tables
if len(source_table) == len(target_table) and source_table[0] == target_table[0]:
# 防止无限递归
logger.warning(f'源表与目标表相同')
return None
except Exception as e:
logger.error(f'获取源表异常,e={e}')
return None
else:
return [str(s).split(sep='.')[1].upper() for s in source_table]
class MetadataRelationAnalyzer(object):
@ -24,7 +56,7 @@ class MetadataRelationAnalyzer(object):
def __init__(self, source_id):
self.source_id = source_id
self.warehouse = MetadataWareHouse(source_id)
self.graph_builder = MetaDataGraphBuilder(graph)
self.neo4j_helper = Neo4jHelper(graph)
def analyze(self):
"""
@ -33,42 +65,66 @@ class MetadataRelationAnalyzer(object):
视图数据来自多表即source:[t1,t2,t3] dest:视图根据结果创建出Node(节点)和RelationShip(关系),并且为单项
:return:
"""
# 分析视图
self.analyze_views()
# 分析存储过程
self.analyze_procedure()
def analyze_views(self):
"""
分析视图
:return:
"""
views = self.warehouse.query_metadata_id_name(MetaDataObjType.View.value)
if views and len(views) > 0:
# 查询创建语句
for v in views:
create_sql = self.warehouse.query_view_create(v[0])
# 分析SQL
try:
analyze_result = LineageRunner(create_sql)
except Exception as e:
print(f'视图{v[1]}分析SQL异常,e={e}')
continue
# 数据来源
for view in views:
logger.info(f'开始分析视图{view[1]}')
start_time = time.time()
try:
source_obj = analyze_result.source_tables
self._recurrence_view(view[1])
except Exception as e:
print(f'获取数据来源表异常,e={e}')
continue
else:
# 确认数据来源类型,来自表、视图
# 表名全大写
source = []
for s in source_obj:
source_name = str(s).split(sep='.')[1].upper()
source_type = self.warehouse.query_metadata_type_by_name(source_name)
# 名称 类型
source.append((source_name, source_type))
# 创建节点及关系
self.graph_builder.add_view_relation(source, v[1])
pass
logger.error(f'视图{view[1]}分析异常,e={e}')
finally:
stop_time = time.time()
logger.info(f'视图{view[1]}分析结束,耗时{round(stop_time - start_time, 2)}')
def analyze_view(self):
def _recurrence_view(self, view_name):
"""
分析视图
递归分析视图
:param view_name: 视图名称
:return:
"""
pass
create_sql = self.warehouse.query_view_create_by_name(view_name)
# source_table 可能包含表或视图
source_tables = analyze_view_sql(create_sql)
if is_not_empty(source_tables):
tables = []
views = []
for source_name in source_tables:
source_type = self.warehouse.query_metadata_type_by_name(source_name)
if source_type == MetaDataObjType.View.value:
views.append(source_name)
elif source_type == MetaDataObjType.Table.value:
tables.append(source_name)
# 构造节点与关系
# 根节点(视图),视图关系是1对多,使用传入的view_name作为根节点
root_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.View.value, self.source_id,
name=view_name)
for t in tables:
# 表节点
t_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.Table.value, self.source_id, name=t)
self.neo4j_helper.create_relationship_with_merge(t_node, 'from', root_node, MetaDataObjType.Table.value,
'name')
for v in views:
# 视图节点
v_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.View.value, self.source_id, name=v)
self.neo4j_helper.create_relationship_with_merge(v_node, 'from', root_node, MetaDataObjType.View.value,
'name')
if is_not_empty(views):
# 递归视图节点
for v in views:
self._recurrence_view(v)
def analyze_procedure(self):
"""
@ -79,6 +135,6 @@ class MetadataRelationAnalyzer(object):
if __name__ == '__main__':
# graph.delete_all()
graph.delete_all()
mta = MetadataRelationAnalyzer('834164a2d62de959c0261e6239dd1e55')
mta.analyze()

Loading…
Cancel
Save