diff --git a/datahub/graph/graph_helper.py b/datahub/graph/graph_helper.py new file mode 100644 index 0000000..137ca91 --- /dev/null +++ b/datahub/graph/graph_helper.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Time : 2023/4/15 11:05 +# @Author : old tom +# @File : graph_helper.py +# @Project : futool-tiny-datahub +# @Desc : 血缘关系上图,关系查询 +from py2neo import Node, Relationship, Graph, Subgraph +from datahub.metadata.constant.metadata_constant import MetaDataObjType + + +class MetaDataGraphBuilder(object): + def __init__(self, graph: Graph): + self.graph = graph + + def add_view_relation(self, tables: list, view_name: str): + """ + 添加视图关系 + :param tables: 创建视图的数据名称及类型[(名称大写,类型)] + :param view_name: 视图名 + :return: + """ + source_nodes = [] + for t in tables: + # 只添加表名和属性 + source_nodes.append(Node(t[1], name=t[0])) + source_nodes.append(Node(MetaDataObjType.View.value, name=view_name)) + self._add_one_to_many_relation(source_nodes, 'from') + + def add_table_relation(self): + pass + + def add_procedure_relation(self, tables: list, procedure_name: str): + source_nodes = [] + for t in tables: + # 只添加表名和属性 + source_nodes.append(Node(t[1], name=t[0])) + source_nodes.append(Node(MetaDataObjType.View.value, name=procedure_name)) + self._add_one_to_many_relation(source_nodes, 'into') + + def _add_one_to_many_relation(self, nodes: [], relation): + """ + 添加一对多关系 + :param one_node: 单节点 + :param other_nodes: 多节点 + :param relation: 关系描述 + :return: + """ + # 批量创建relation + relation_ls = [] + for other in nodes[0:-1]: + relation_ls.append(Relationship(other, relation, nodes[-1])) + subgraph = Subgraph(nodes=nodes, relationships=relation_ls) + # 开启事务 + tx = self.graph.begin() + try: + tx.create(subgraph) + self.graph.commit(tx) + except Exception as e: + self.graph.rollback(tx) + print('图创建失败', e) diff --git a/datahub/local_db_conf.py b/datahub/local_db_conf.py index 761fafa..16e513b 100644 --- a/datahub/local_db_conf.py +++ b/datahub/local_db_conf.py @@ -7,7 +7,11 @@ # @Desc : 数据库连接配置 from datahub.datasource.constant import ds_conf_param from common.fudb.connectors.connector_factory import ConnFactory +from py2neo import Graph # 系统使用数据库配置 数据库类型 用户名 密码 host 端口 默认数据库 local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres') local_conn = ConnFactory(local_db) + +# neo4j配置 +graph = Graph("bolt://localhost:7687", auth=("neo4j", "root@12345")) diff --git a/datahub/metadata/metadata_warehouse.py b/datahub/metadata/metadata_warehouse.py index ee00cf0..2e93ef4 100644 --- a/datahub/metadata/metadata_warehouse.py +++ b/datahub/metadata/metadata_warehouse.py @@ -9,6 +9,7 @@ from common.futool.fu_id import id_gen from datahub.metadata.metadatadao.metadata_dao import MetadataDao from datahub.local_db_conf import local_conn +from datahub.metadata.constant.metadata_constant import MetaDataObjType class MetadataWareHouse(object): @@ -69,3 +70,46 @@ class MetadataWareHouse(object): :return: """ return self.dao.query_metadata_id(obj_type) + + def query_view_create(self, meta_id): + """ + 查询视图创建语句 + :param meta_id: + :return: + """ + return self.dao.query_metadata_create(MetaDataObjType.View.value, meta_id, self.source_id) + + def query_procedure_create(self, meta_id): + """ + 查询存储过程创建语句 + :param meta_id: + :return: + """ + return self.dao.query_metadata_create(MetaDataObjType.Procedure.value, meta_id, self.source_id) + + def query_metadata_type(self, meta_id): + """ + 查询元数据类型 + :param meta_id: + :return: + """ + return self.dao.query_metadata_type(meta_id, self.source_id) + + def query_metadata_type_by_name(self, meta_name): + """ + 根据名称查询元数据类型 + :param meta_name: 表名、视图名、存储过程名 + :return: + """ + return self.dao.query_metadata_type_by_name(meta_name, self.source_id) + + def is_table(self, table_name): + return self.query_metadata_type_by_name(table_name) == MetaDataObjType.Table.value + + def is_view(self, view_name): + return self.query_metadata_type_by_name(view_name) == MetaDataObjType.View.value + + +if __name__ == '__main__': + mtw = MetadataWareHouse('834164a2d62de959c0261e6239dd1e55') + print(mtw.query_metadata_name('view')) diff --git a/datahub/metadata/metadatadao/metadata_dao.py b/datahub/metadata/metadatadao/metadata_dao.py index 5f0d4b3..cc8937d 100644 --- a/datahub/metadata/metadatadao/metadata_dao.py +++ b/datahub/metadata/metadatadao/metadata_dao.py @@ -108,3 +108,38 @@ class MetadataDao(BaseDao): return self.query_all( f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all( 'select meta_id,meta_name from metadata_object') + + def query_metadata_create(self, obj_type, meta_id, source_id): + """ + 查询创建语句 + :param obj_type: + :param meta_id: + :param source_id: + :return: + """ + # metadata_object_create与metadata_object非强外键关系,所以加入source_id保证查询结果正确 + create_sql = self.query_one( + sql=f"select create_sql from metadata_object_create where meta_id in (select meta_id from metadata_object where meta_id={meta_id} and source_id='{source_id}' and meta_type='{obj_type}')") + return create_sql[0] if create_sql and len(create_sql) > 0 else '' + + def query_metadata_type(self, meta_id, source_id): + """ + 查询元数据类型 + :param meta_id: + :param source_id: + :return: + """ + rt = self.query_one( + f"select meta_type from metadata_object where meta_id={meta_id} and source_id='{source_id}'") + return rt[0] if rt and len(rt) > 0 else None + + def query_metadata_type_by_name(self, meta_name, source_id): + """ + 根据元数据名称查询类型 + :param meta_name: 元数据名 + :param source_id: 源ID + :return: + """ + rt = self.query_one( + f"select meta_type from metadata_object where meta_name='{meta_name}' and source_id='{source_id}' limit 1") + return rt[0] if rt and len(rt) > 0 else None diff --git a/datahub/relation/relation_analyze.py b/datahub/relation/relation_analyze.py index e71ba8e..7346311 100644 --- a/datahub/relation/relation_analyze.py +++ b/datahub/relation/relation_analyze.py @@ -4,10 +4,16 @@ # @Author : old tom # @File : relation_analyze.py # @Project : futool-tiny-datahub -# @Desc : +# @Desc : 血缘关系分析 from sqllineage.runner import LineageRunner +from datahub.graph.graph_helper import MetaDataGraphBuilder +from datahub.local_db_conf import graph + +from datahub.metadata.metadata_warehouse import MetadataWareHouse +from datahub.metadata.constant.metadata_constant import MetaDataObjType + class MetadataRelationAnalyzer(object): """ @@ -17,13 +23,62 @@ class MetadataRelationAnalyzer(object): def __init__(self, source_id): self.source_id = source_id + self.warehouse = MetadataWareHouse(source_id) + self.graph_builder = MetaDataGraphBuilder(graph) + + def analyze(self): + """ + 读取视图、存储过程 + 以视图为例: + 视图数据来自多表,即source:[t1,t2,t3] dest:视图。根据结果创建出Node(节点)和RelationShip(关系),并且为单项 + :return: + """ + views = self.warehouse.query_metadata_id_name(MetaDataObjType.View.value) + if views and len(views) > 0: + # 查询创建语句 + for v in views: + create_sql = self.warehouse.query_view_create(v[0]) + # 分析SQL + try: + analyze_result = LineageRunner(create_sql) + except Exception as e: + print(f'视图{v[1]}分析SQL异常,e={e}') + continue + # 数据来源 + try: + source_obj = analyze_result.source_tables + except Exception as e: + print(f'获取数据来源表异常,e={e}') + continue + else: + # 确认数据来源类型,来自表、视图 + # 表名全大写 + source = [] + for s in source_obj: + source_name = str(s).split(sep='.')[1].upper() + source_type = self.warehouse.query_metadata_type_by_name(source_name) + # 名称 类型 + source.append((source_name, source_type)) + # 创建节点及关系 + self.graph_builder.add_view_relation(source, v[1]) + pass + + def analyze_view(self): + """ + 分析视图 + :return: + """ + pass + + def analyze_procedure(self): + """ + 分析存储过程 + :return: + """ + pass if __name__ == '__main__': - with open(r'D:\文档\工作\ATD\2023上半年\项目\事权\DW_FINANCE_PLAN_DETAIL_BASE_VW.sql', 'r', encoding='utf-8') as f: - sql = f.read() - runner = LineageRunner(sql, dialect='oracle') - # 画图 - # runner.draw(dialect='oracle') - print(runner) - print(runner.source_tables) + # graph.delete_all() + mta = MetadataRelationAnalyzer('834164a2d62de959c0261e6239dd1e55') + mta.analyze()