fix: 元数据查询增加版本号控制

master
old-tom 2 years ago
parent c2a89c247f
commit a001e27f01

@ -9,8 +9,6 @@ import time
import abc
from datahub.log_conf import log
logger = Logger().get_logger()
class IdGenerator(object):
"""
@ -95,7 +93,7 @@ class SnowFlakeId(AbsIdGenerator):
timestamp = self._gen_timestamp()
# 时钟回拨
if timestamp < self.last_timestamp:
logger.error('clock is moving backwards. Rejecting requests until{}'.format(self.last_timestamp))
log.error('clock is moving backwards. Rejecting requests until{}'.format(self.last_timestamp))
raise Exception
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.SEQUENCE_MASK

@ -12,9 +12,6 @@ from sqlalchemy import text
from datahub.log_conf import log
from datahub.datasource.constant import ds_conf_param
# 日志
logger = Logger().get_logger()
class DataSourceDao(BaseDao):
def __init__(self, connector: ConnFactory):
@ -36,7 +33,7 @@ class DataSourceDao(BaseDao):
return True
except Exception as e:
conn.rollback()
logger.error(f'添加数据源[{conf}]失败,e={e}')
log.error(f'添加数据源[{conf}]失败,e={e}')
return False
finally:
conn.close()

@ -14,4 +14,4 @@ local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432
local_conn = ConnFactory(local_db)
# neo4j配置
graph = Graph("bolt://localhost:7687", auth=("neo4j", "root@12345"))
graph = Graph("bolt://localhost:7687", auth=("neo4j", "havana-academy-emerald-herman-scale-5422"))

@ -0,0 +1,107 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/5/9 16:06
# @Author : old tom
# @File : metadat_scan.py
# @Project : futool-tiny-datahub
# @Desc :
from datahub.datasource.datasource_manage import DataSourceManage
from datahub.metadata.metadata_reader import MetadataReader
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.constant.metadata_constant import MetaDataObjType
from datahub.log_conf import log
from datahub.metadata.metaversion.metadata_version import MetadataVersionKeeper
from common.futool.core.fu_date import StopWatch
class MetadataScanner(object):
def __init__(self):
self.datasource_manage = DataSourceManage()
@staticmethod
def _join_meta_detail(meta_datas, metadata_reader: MetadataReader, obj_type):
"""
组装明细结果
:return:
"""
save_datas = []
for i, t in enumerate(meta_datas):
meta_id, obj_name = t[0], t[1]
meta_result = metadata_reader.query_metadata_detail(obj_name, obj_type)
# 组装插入数据
obj_data = ([meta_id] + list(x) for x in meta_result)
for d in obj_data:
save_datas.append(d)
log.info(f"剩余{len(meta_datas) - (i + 1)}")
return save_datas
def _scan_detail(self, warehouse: MetadataWareHouse, metadata_reader: MetadataReader):
"""
获取明细数据
:param warehouse: 元数据仓库
:param metadata_reader: 元数据读取器
:return:
"""
source_id = warehouse.source_id
watch = StopWatch()
# 读取表字段
watch.start(f"{source_id}_table")
meta_tables = warehouse.query_metadata_id_name(MetaDataObjType.Table.value)
log.info(f"开始查询[{warehouse.source_id}] 表字段数据,共{len(meta_tables)}张表")
table_field_datas = self._join_meta_detail(meta_tables, metadata_reader, MetaDataObjType.Table.value)
warehouse.save_metadata_obj_field(table_field_datas)
table_time = watch.stop(f"{source_id}_table")
log.info(f"[{warehouse.source_id}] 表字段数据查询结束,耗时{table_time}")
# 读取视图创建语句
watch.start(f"{source_id}_view")
meta_views = warehouse.query_metadata_id_name(MetaDataObjType.View.value)
log.info(f"开始查询[{warehouse.source_id}] 视图创建语句,共{len(meta_views)}张视图")
view_datas = self._join_meta_detail(meta_views, metadata_reader, MetaDataObjType.View.value)
warehouse.save_metadata_obj_detail(view_datas)
view_time = watch.stop(f"{source_id}_view")
log.info(f"[{warehouse.source_id}] 视图语句查询结束,耗时{view_time}")
# 读取存储过程创建语句
watch.start(f"{source_id}_procedure")
meta_procedure = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value)
log.info(f"开始查询[{warehouse.source_id}] 存储过程创建语句,共{len(meta_views)}张过程")
procedure_datas = self._join_meta_detail(meta_procedure, metadata_reader, MetaDataObjType.Procedure.value)
warehouse.save_metadata_obj_detail(procedure_datas)
procedure_time = watch.stop(f"{source_id}_procedure")
log.info(f"[{warehouse.source_id}] 存储过程语句查询结束,耗时{procedure_time}")
def scan_metadata(self, source_id):
"""
TODO python暂时无法实现注解事务,后续使用动态代理或装饰器实现
扫描元数据
:param source_id:
:return:
"""
# 元数据仓库API
warehouse = MetadataWareHouse(source_id)
# 获取待扫描数据源
datasource = self.datasource_manage.get(source_id)
# 初始化元数据读取器
metadata_reader = MetadataReader(datasource)
# 获取版本号
# version_keeper = MetadataVersionKeeper(source_id)
# version_code = version_keeper.increase_version()
# # 分别读取表\视图\存储过程并入库
# log.info(f'开始扫描[{source_id}]元数据,版本号为[{version_code}]')
# tables = metadata_reader.query_tables()
# warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value, version_code)
# log.info(f'[{source_id}]读取表完毕,共{len(tables)}张')
# views = metadata_reader.query_views()
# warehouse.save_metadata_obj(views, MetaDataObjType.View.value, version_code)
# log.info(f'[{source_id}]读取视图完毕,共{len(views)}张')
# procedures = metadata_reader.query_procedure()
# warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value, version_code)
# log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}个')
# 查询明细数据
self._scan_detail(warehouse, metadata_reader)
if __name__ == '__main__':
scanner = MetadataScanner()
scanner.scan_metadata('834164a2d62de959c0261e6239dd1e55')

@ -21,14 +21,15 @@ class MetadataWareHouse(object):
self.source_id = source_id
self.dao = MetadataDao(local_conn)
def save_metadata_obj(self, objs, obj_type):
def save_metadata_obj(self, objs, obj_type, version_code):
"""
保存元数据对象
:param version_code: 版本号
:param obj_type: 元数据类型
:param objs:
:return:
"""
data = [(id_gen.get_id(), self.source_id, obj_type, x[0], x[1]) for x in objs]
data = [(id_gen.get_id(), self.source_id, obj_type, x[0], x[1], version_code) for x in objs]
return self.dao.save_metadata_obj(data)
def save_metadata_obj_detail(self, details):
@ -55,21 +56,23 @@ class MetadataWareHouse(object):
"""
return self.dao.query_metadata_by_type(obj_type)
def query_metadata_name(self, obj_type=None):
def query_metadata_name(self, obj_type=None, version_code=None):
"""
查询元数据名称
:param version_code: 版本号
:param obj_type:
:return:
"""
return self.dao.query_metadata_name_by_type(obj_type)
return self.dao.query_metadata_name_by_type(obj_type, self.source_id, version_code)
def query_metadata_id_name(self, obj_type=None):
def query_metadata_id_name(self, obj_type=None, version_code=None):
"""
查询元数据ID及名称
:param version_code: 版本号
:param obj_type:
:return:
"""
return self.dao.query_metadata_id(obj_type)
return self.dao.query_metadata_id_name(obj_type, self.source_id, version_code)
def query_view_create(self, meta_id):
"""
@ -79,13 +82,14 @@ class MetadataWareHouse(object):
"""
return self.dao.query_metadata_create(MetaDataObjType.View.value, meta_id, self.source_id)
def query_view_create_by_name(self, view_name):
def query_view_create_by_name(self, view_name, version_code=None):
"""
视图查询创建语句
:param version_code: 版本号
:param view_name:
:return:
"""
return self.dao.query_metadata_create_by_name(view_name, self.source_id)
return self.dao.query_metadata_create_by_name(view_name, self.source_id, version_code)
def query_procedure_create(self, meta_id):
"""
@ -95,24 +99,26 @@ class MetadataWareHouse(object):
"""
return self.dao.query_metadata_create(MetaDataObjType.Procedure.value, meta_id, self.source_id)
def query_metadata_type(self, meta_id):
def query_metadata_type(self, meta_id, version_code=None):
"""
查询元数据类型
:param version_code: 版本号
:param meta_id:
:return:
"""
return self.dao.query_metadata_type(meta_id, self.source_id)
return self.dao.query_metadata_type(meta_id, self.source_id, version_code)
def query_metadata_type_by_name(self, meta_name):
def query_metadata_type_by_name(self, meta_name, version_code=None):
"""
根据名称查询元数据类型
:param version_code: 版本号
:param meta_name: 表名视图名存储过程名
:return:
"""
return self.dao.query_metadata_type_by_name(meta_name, self.source_id)
return self.dao.query_metadata_type_by_name(meta_name, self.source_id, version_code)
def is_table(self, table_name):
return self.query_metadata_type_by_name(table_name) == MetaDataObjType.Table.value
def is_table(self, table_name, version_code=None):
return self.query_metadata_type_by_name(table_name, version_code) == MetaDataObjType.Table.value
def is_view(self, view_name):
return self.query_metadata_type_by_name(view_name) == MetaDataObjType.View.value
def is_view(self, view_name, version_code=None):
return self.query_metadata_type_by_name(view_name, version_code) == MetaDataObjType.View.value

@ -82,7 +82,7 @@ class MetadataDao(BaseDao):
:return:
"""
return self.batch_insert(
sql_tpl='insert into metadata_object(meta_id,source_id,meta_type,meta_name,meta_ch_name) values %s',
sql_tpl='insert into metadata_object(meta_id,source_id,meta_type,meta_name,meta_ch_name,version_code) values %s',
data=objs, batch_size=1500)
def save_metadata_create(self, details):
@ -90,24 +90,39 @@ class MetadataDao(BaseDao):
sql_tpl='insert into metadata_object_create(meta_id,create_sql) values %s',
data=details, batch_size=1500)
@staticmethod
def _get_version_sql(source_id, version_code=None):
return ' and ' + (f'version_code={version_code}' if version_code \
else f"version_code=(select max(version_code) from metadata_object_version_record where source_id='{source_id}')")
def query_metadata_by_type(self, obj_type):
return self.query_all(
f"select * from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all(
'select * from metadata_object')
def query_metadata_name_by_type(self, obj_type):
def query_metadata_name_by_type(self, obj_type, source_id, version_code=None):
"""
查询元数据名称
:param obj_type:
:param version_code: 版本号,默认查最新
:param source_id: 源ID
:param obj_type: 类型
:return:
"""
sql = f"select meta_name from metadata_object where meta_type='{obj_type}'" if obj_type else 'select meta_name from metadata_object'
sql = f"select meta_name from metadata_object where source_id='{source_id}' and meta_type='{obj_type}'" + self._get_version_sql(
source_id, version_code)
return [x[0] for x in self.query_all(sql)]
def query_metadata_id(self, obj_type):
return self.query_all(
f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all(
'select meta_id,meta_name from metadata_object')
def query_metadata_id_name(self, obj_type, source_id, version_code=None):
"""
查询元数据meta_id及meta_name
:param obj_type:
:param source_id:
:param version_code: 版本号
:return:
"""
sql = f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'" \
+ self._get_version_sql(source_id, version_code)+" limit 10"
return self.query_all(sql)
def query_metadata_create(self, obj_type, meta_id, source_id):
"""
@ -118,39 +133,45 @@ class MetadataDao(BaseDao):
:return:
"""
# metadata_object_create与metadata_object非强外键关系所以加入source_id保证查询结果正确
create_sql = self.query_one(
sql=f"select create_sql from metadata_object_create where meta_id in (select meta_id from metadata_object where meta_id={meta_id} and source_id='{source_id}' and meta_type='{obj_type}')")
sql = f"select create_sql from metadata_object_create where meta_id in (select meta_id from metadata_object where meta_id={meta_id} and source_id='{source_id}' and meta_type='{obj_type}')"
create_sql = self.query_one(sql)
return create_sql[0] if create_sql and len(create_sql) > 0 else ''
def query_metadata_create_by_name(self, meta_name, source_id):
def query_metadata_create_by_name(self, meta_name, source_id, version_code=None):
"""
名称查询创建语句
:param version_code: 版本号
:param meta_name:
:param source_id:
:return:
"""
create_sql = self.query_one(
sql=f"select create_sql from metadata_object_create where meta_id=(select meta_id from metadata_object where meta_name='{meta_name}' and source_id='{source_id}')")
sql = f"select create_sql from metadata_object_create where meta_id=(select meta_id from metadata_object where meta_name='{meta_name}' and source_id='{source_id}'" + self._get_version_sql(
source_id, version_code) + ")"
create_sql = self.query_one(sql)
return create_sql[0] if create_sql and len(create_sql) > 0 else ''
def query_metadata_type(self, meta_id, source_id):
def query_metadata_type(self, meta_id, source_id, version_code=None):
"""
查询元数据类型
:param version_code: 版本号
:param meta_id:
:param source_id:
:return:
"""
rt = self.query_one(
f"select meta_type from metadata_object where meta_id={meta_id} and source_id='{source_id}'")
sql = f"select meta_type from metadata_object where meta_id={meta_id} and source_id='{source_id}'" \
+ self._get_version_sql(source_id, version_code)
rt = self.query_one(sql)
return rt[0] if rt and len(rt) > 0 else None
def query_metadata_type_by_name(self, meta_name, source_id):
def query_metadata_type_by_name(self, meta_name, source_id, version_code=None):
"""
根据元数据名称查询类型
:param version_code: 版本号
:param meta_name: 元数据名
:param source_id: 源ID
:return:
"""
rt = self.query_one(
f"select meta_type from metadata_object where meta_name='{meta_name}' and source_id='{source_id}' limit 1")
sql = f"select meta_type from metadata_object where meta_name='{meta_name}' and source_id='{source_id}'" \
+ self._get_version_sql(source_id, version_code)
rt = self.query_one(sql)
return rt[0] if rt and len(rt) > 0 else None

@ -35,7 +35,7 @@ def analyze_view_sql(sql: str):
try:
analyze_result = LineageRunner(sql)
except Exception as e:
logger.error(f'sql解析异常,e={e}')
log.error(f'sql解析异常,e={e}')
return None
try:
# 获取源表
@ -43,10 +43,10 @@ def analyze_view_sql(sql: str):
target_table = analyze_result.target_tables
if source_table[0] == target_table[0]:
# 防止无限递归
logger.warning(f'源表与目标表相同')
log.warning(f'源表与目标表相同')
return None
except Exception as e:
logger.error(f'获取源表异常,e={e}')
log.error(f'获取源表异常,e={e}')
return None
else:
return [str(s).split(sep='.')[1].upper() for s in source_table]
@ -66,7 +66,7 @@ def analyze_procedure_sql(sql: str):
try:
analyze_result = LineageRunner(sql)
except Exception as e:
logger.error(f'sql解析异常,e={e}')
log.error(f'sql解析异常,e={e}')
return None
try:
# 获取源表
@ -79,7 +79,7 @@ def analyze_procedure_sql(sql: str):
'source': [_format_table_name(s) for s in source_table]
})
except Exception as e:
logger.error(f'获取源表异常,e={e}')
log.error(f'获取源表异常,e={e}')
return None
return result
@ -121,7 +121,7 @@ class MetadataRelationAnalyzer(object):
try:
self._recurrence_view(view[1])
except Exception as e:
logger.error(f'视图{view[1]}分析异常,e={e}')
log.error(f'视图{view[1]}分析异常,e={e}')
finally:
stop_time = time.time()
log.info(f'视图{view[1]}分析结束,耗时{round(stop_time - start_time, 2)}')
@ -197,7 +197,7 @@ class MetadataRelationAnalyzer(object):
source_type,
'name')
except Exception as e:
logger.error(f'存储过程{procedure[1]}分析异常,e={e}')
log.error(f'存储过程{procedure[1]}分析异常,e={e}')
finally:
stop_time = time.time()
log.info(

@ -5,16 +5,12 @@
# @File : scan_task.py
# @Project : futool-tiny-datahub
# @Desc : 扫描任务,通过调度扫描数据源-->获取数据源-->读取元数据并同步
from datahub.datasource.datasource_manage import DataSourceManage
from datahub.local_db_conf import local_conn
from datahub.metadata.metadata_reader import MetadataReader
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.metadata.metadat_scan import MetadataScanner
from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao
from datahub.scheduletask.task_executor import ScheduleExecutor
from datahub.scheduletask.schedule import CronExpTrigger
from datahub.log_conf import log
from datahub.metadata.constant.metadata_constant import MetaDataObjType
class ScanTaskManage(object):
@ -74,7 +70,7 @@ class ScanTaskRunner(object):
def __init__(self):
self.dao = ScanTaskDao(local_conn)
self.executor = ScheduleExecutor()
self.scanner = ScanTaskExecutor()
self.scanner = MetadataScanner()
def run(self):
"""
@ -84,48 +80,5 @@ class ScanTaskRunner(object):
enable_task = self.dao.query_all_task('Y')
if enable_task:
for task in enable_task:
self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_print)
self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_metadata)
log.info(f'datasource scan task [{task[0]}] submit success')
class ScanTaskExecutor(object):
def __init__(self):
self.datasource_manage = DataSourceManage()
def scan_print(self, source_id):
# 元数据仓库API
warehouse = MetadataWareHouse(source_id)
# 获取待扫描数据源
datasource = self.datasource_manage.get(source_id)
# 初始化元数据读取器
metadata_reader = MetadataReader(datasource)
# 分别读取表\视图\存储过程并入库
log.info(f'开始扫描[{source_id}]元数据')
tables = metadata_reader.query_tables()
log.info(f'[{source_id}]读取表完毕,共{len(tables)}')
def scan_metadata(self, source_id):
"""
TODO python暂时无法实现注解事务,后续使用动态代理处理
扫描元数据
:param source_id:
:return:
"""
# 元数据仓库API
warehouse = MetadataWareHouse(source_id)
# 获取待扫描数据源
datasource = self.datasource_manage.get(source_id)
# 初始化元数据读取器
metadata_reader = MetadataReader(datasource)
# 分别读取表\视图\存储过程并入库
log.info(f'开始扫描[{source_id}]元数据')
tables = metadata_reader.query_tables()
warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value)
log.info(f'[{source_id}]读取表完毕,共{len(tables)}')
views = metadata_reader.query_views()
warehouse.save_metadata_obj(views, MetaDataObjType.View.value)
log.info(f'[{source_id}]读取视图完毕,共{len(views)}')
procedures = metadata_reader.query_procedure()
warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value)
log.info(f'[{source_id}]读取存储过程完毕,共{len(procedures)}')

Loading…
Cancel
Save