diff --git a/common/fudb/dbapis/fu_db_api.py b/common/fudb/dbapis/fu_db_api.py index 5c157ff..3d70a3d 100644 --- a/common/fudb/dbapis/fu_db_api.py +++ b/common/fudb/dbapis/fu_db_api.py @@ -7,7 +7,7 @@ # @Desc : from sqlalchemy import Connection, text, CursorResult -from common.fudb.dbapis.fu_collection import split_coll +from common.futool.fu_collection import split_coll class SqlExecuteError(Exception): diff --git a/common/fudb/dbapis/fu_collection.py b/common/futool/fu_collection.py similarity index 78% rename from common/fudb/dbapis/fu_collection.py rename to common/futool/fu_collection.py index af3ad4b..9c54f56 100644 --- a/common/fudb/dbapis/fu_collection.py +++ b/common/futool/fu_collection.py @@ -22,3 +22,12 @@ def split_coll(data: [], part_size=5): if j > 0 and j % part_size == 0: rt.append(data[j:j + part_size]) return rt + + +def is_not_empty(coll: list) -> bool: + """ + 集合不为空 + :param coll: + :return: + """ + return coll is not None and len(coll) > 0 diff --git a/datahub/graph/graph_helper.py b/datahub/graph/graph_helper.py index 137ca91..d382ae3 100644 --- a/datahub/graph/graph_helper.py +++ b/datahub/graph/graph_helper.py @@ -6,56 +6,127 @@ # @Project : futool-tiny-datahub # @Desc : 血缘关系上图,关系查询 from py2neo import Node, Relationship, Graph, Subgraph -from datahub.metadata.constant.metadata_constant import MetaDataObjType +from py2neo.matching import NodeMatcher -class MetaDataGraphBuilder(object): +class Neo4jHelper(object): def __init__(self, graph: Graph): self.graph = graph + self.node_matcher = NodeMatcher(graph) - def add_view_relation(self, tables: list, view_name: str): + def find_node(self, *labels, **properties): """ - 添加视图关系 - :param tables: 创建视图的数据名称及类型[(名称大写,类型)] - :param view_name: 视图名 + 查找节点 + :param labels: + :param properties: + :return: 可能有多个 + """ + rt = self.node_matcher.match(*labels, **properties) + return rt + + def find_node_by_id(self, node_id): + """ + 节点ID 查找节点 + :param node_id: :return: """ - source_nodes = [] - for t in tables: - # 只添加表名和属性 - source_nodes.append(Node(t[1], name=t[0])) - source_nodes.append(Node(MetaDataObjType.View.value, name=view_name)) - self._add_one_to_many_relation(source_nodes, 'from') + return self.node_matcher.get(node_id) - def add_table_relation(self): - pass + def check_node_exists(self, *labels, **properties): + """ + 判断节点是否存在 + :param labels: + :param properties: + :return: + """ + return self.find_node(*labels, **properties).exists() + + def same_node_count(self, *labels, **properties): + """ + 相同节点数 + :param labels: + :param properties: + :return: + """ + return self.find_node(labels, properties).count() - def add_procedure_relation(self, tables: list, procedure_name: str): - source_nodes = [] - for t in tables: - # 只添加表名和属性 - source_nodes.append(Node(t[1], name=t[0])) - source_nodes.append(Node(MetaDataObjType.View.value, name=procedure_name)) - self._add_one_to_many_relation(source_nodes, 'into') + def find_same_node(self, *labels, **properties): + """ + 查找相同节点 + :param labels: + :param properties: + :return: + """ + return self.find_node(*labels, **properties).all() - def _add_one_to_many_relation(self, nodes: [], relation): + def merge_subgraph(self, relationship, label, *property_keys): """ - 添加一对多关系 - :param one_node: 单节点 - :param other_nodes: 多节点 - :param relation: 关系描述 + 合并子图 + :param property_keys: 判断节点重复属性 + :param label: 节点标签 + :param relationship: + :return: + """ + self.graph.merge(relationship, label, *property_keys) + + def find_subgraph(self): + pass + + def create_subgraph(self, nodes: list, edges: list): + """ + 创建子图,注:相同节点不会自动去重 + :param nodes: 节点列表 + :param edges: 边(关系列表) :return: """ - # 批量创建relation - relation_ls = [] - for other in nodes[0:-1]: - relation_ls.append(Relationship(other, relation, nodes[-1])) - subgraph = Subgraph(nodes=nodes, relationships=relation_ls) # 开启事务 tx = self.graph.begin() try: - tx.create(subgraph) + tx.create(Subgraph(nodes=nodes, relationships=edges)) self.graph.commit(tx) except Exception as e: self.graph.rollback(tx) - print('图创建失败', e) + + def create_node(self, *labels, **properties): + """ + 创建节点 + :param node: + :return: + """ + node = Node(*labels, **properties) + self.graph.create(node) + return node + + def create_node_with_check(self, *labels, **properties): + """ + 创建节点前判断是否存在 + :param labels: + :param properties: + :return: 创建的节点或已经存在的节点 + """ + if not self.check_node_exists(*labels, **properties): + return self.create_node(*labels, **properties) + return self.find_node(*labels, **properties).first() + + def create_relationship(self, from_node, relation_desc, to_node): + """ + 创建关系 + :param to_node: + :param relation_desc: 关系描述 + :param from_node: + :return: + """ + self.graph.create(Relationship(from_node, relation_desc, to_node)) + + def create_relationship_with_merge(self, from_node, relation_desc, to_node, label, *property_keys): + """ + 创建关系并合并相同节点 + :param label: 标签 + :param from_node: + :param relation_desc: 关系描述 + :param to_node: + :return: + """ + relation = Relationship(from_node, relation_desc, to_node) + self.graph.create(relation) + self.merge_subgraph(relation, label, *property_keys) diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py index 2e93ef4..bccacab 100644 --- a/datahub/metadata/metadata_warehouse.py +++ b/datahub/metadata/metadata_warehouse.py @@ -79,6 +79,14 @@ class MetadataWareHouse(object): """ return self.dao.query_metadata_create(MetaDataObjType.View.value, meta_id, self.source_id) + def query_view_create_by_name(self, view_name): + """ + 视图查询创建语句 + :param view_name: + :return: + """ + return self.dao.query_metadata_create_by_name(view_name, self.source_id) + def query_procedure_create(self, meta_id): """ 查询存储过程创建语句 diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index cc8937d..cdb94e2 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -122,6 +122,17 @@ class MetadataDao(BaseDao): sql=f"select create_sql from metadata_object_create where meta_id in (select meta_id from metadata_object where meta_id={meta_id} and source_id='{source_id}' and meta_type='{obj_type}')") return create_sql[0] if create_sql and len(create_sql) > 0 else '' + def query_metadata_create_by_name(self, meta_name, source_id): + """ + 名称查询创建语句 + :param meta_name: + :param source_id: + :return: + """ + create_sql = self.query_one( + sql=f"select create_sql from metadata_object_create where meta_id=(select meta_id from metadata_object where meta_name='{meta_name}' and source_id='{source_id}')") + return create_sql[0] if create_sql and len(create_sql) > 0 else '' + def query_metadata_type(self, meta_id, source_id): """ 查询元数据类型 diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index 7346311..d23aec7 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -6,13 +6,45 @@ # @Project : futool-tiny-datahub # @Desc : 血缘关系分析 +import sys +import time from sqllineage.runner import LineageRunner - -from datahub.graph.graph_helper import MetaDataGraphBuilder +from datahub.graph.graph_helper import Neo4jHelper from datahub.local_db_conf import graph - +from common.log_conf import Logger from datahub.metadata.metadata_warehouse import MetadataWareHouse from datahub.metadata.constant.metadata_constant import MetaDataObjType +from common.futool.fu_collection import is_not_empty + +logger = Logger().get_logger() +# 修改最大递归深度 +sys.setrecursionlimit(500) + + +def analyze_view_sql(sql: str): + """ + sql 分析 + :param sql: + :return: + """ + try: + analyze_result = LineageRunner(sql) + except Exception as e: + logger.error(f'sql解析异常,e={e}') + return None + try: + # 获取源表 + source_table = analyze_result.source_tables + target_table = analyze_result.target_tables + if len(source_table) == len(target_table) and source_table[0] == target_table[0]: + # 防止无限递归 + logger.warning(f'源表与目标表相同') + return None + except Exception as e: + logger.error(f'获取源表异常,e={e}') + return None + else: + return [str(s).split(sep='.')[1].upper() for s in source_table] class MetadataRelationAnalyzer(object): @@ -24,7 +56,7 @@ class MetadataRelationAnalyzer(object): def __init__(self, source_id): self.source_id = source_id self.warehouse = MetadataWareHouse(source_id) - self.graph_builder = MetaDataGraphBuilder(graph) + self.neo4j_helper = Neo4jHelper(graph) def analyze(self): """ @@ -33,42 +65,66 @@ class MetadataRelationAnalyzer(object): 视图数据来自多表,即source:[t1,t2,t3] dest:视图。根据结果创建出Node(节点)和RelationShip(关系),并且为单项 :return: """ + # 分析视图 + self.analyze_views() + # 分析存储过程 + self.analyze_procedure() + + def analyze_views(self): + """ + 分析视图 + :return: + """ views = self.warehouse.query_metadata_id_name(MetaDataObjType.View.value) if views and len(views) > 0: # 查询创建语句 - for v in views: - create_sql = self.warehouse.query_view_create(v[0]) - # 分析SQL - try: - analyze_result = LineageRunner(create_sql) - except Exception as e: - print(f'视图{v[1]}分析SQL异常,e={e}') - continue - # 数据来源 + for view in views: + logger.info(f'开始分析视图{view[1]}') + start_time = time.time() try: - source_obj = analyze_result.source_tables + self._recurrence_view(view[1]) except Exception as e: - print(f'获取数据来源表异常,e={e}') - continue - else: - # 确认数据来源类型,来自表、视图 - # 表名全大写 - source = [] - for s in source_obj: - source_name = str(s).split(sep='.')[1].upper() - source_type = self.warehouse.query_metadata_type_by_name(source_name) - # 名称 类型 - source.append((source_name, source_type)) - # 创建节点及关系 - self.graph_builder.add_view_relation(source, v[1]) - pass + logger.error(f'视图{view[1]}分析异常,e={e}') + finally: + stop_time = time.time() + logger.info(f'视图{view[1]}分析结束,耗时{round(stop_time - start_time, 2)}秒') - def analyze_view(self): + def _recurrence_view(self, view_name): """ - 分析视图 + 递归分析视图 + :param view_name: 视图名称 :return: """ - pass + create_sql = self.warehouse.query_view_create_by_name(view_name) + # source_table 可能包含表或视图 + source_tables = analyze_view_sql(create_sql) + if is_not_empty(source_tables): + tables = [] + views = [] + for source_name in source_tables: + source_type = self.warehouse.query_metadata_type_by_name(source_name) + if source_type == MetaDataObjType.View.value: + views.append(source_name) + elif source_type == MetaDataObjType.Table.value: + tables.append(source_name) + # 构造节点与关系 + # 根节点(视图),视图关系是1对多,使用传入的view_name作为根节点 + root_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.View.value, self.source_id, + name=view_name) + for t in tables: + # 表节点 + t_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.Table.value, self.source_id, name=t) + self.neo4j_helper.create_relationship_with_merge(t_node, 'from', root_node, MetaDataObjType.Table.value, + 'name') + for v in views: + # 视图节点 + v_node = self.neo4j_helper.create_node_with_check(MetaDataObjType.View.value, self.source_id, name=v) + self.neo4j_helper.create_relationship_with_merge(v_node, 'from', root_node, MetaDataObjType.View.value, + 'name') + if is_not_empty(views): + # 递归视图节点 + for v in views: + self._recurrence_view(v) def analyze_procedure(self): """ @@ -79,6 +135,6 @@ class MetadataRelationAnalyzer(object): if __name__ == '__main__': - # graph.delete_all() + graph.delete_all() mta = MetadataRelationAnalyzer('834164a2d62de959c0261e6239dd1e55') mta.analyze()