feat: 新增元数据数仓模块

master
old-tom 2 years ago
parent a2de66574f
commit 048848ad4f

@ -27,7 +27,7 @@ class BaseDao(object):
def execute_update(self, sql):
return execute_update(self.connector.get_conn(), sql)
def batch_insert(self, db_type, sql_tpl, data, batch_size):
def batch_insert(self, sql_tpl, data, batch_size, db_type='postgresql'):
return batch_insert(self.connector.get_conn(), db_type, sql_tpl, data, batch_size)
def dynamic_update_by_param(self, table_name, condition, param: dict):

@ -88,6 +88,7 @@ def execute_update(conn: Connection, sql):
def batch_insert(conn: Connection, db_type, sql_tpl, data, batch_size=1500):
"""
批量插入
将sql转为into oracle_table ( id, code ) values( 1 , '1' ),( 2 , '2' ),( 3 , '3' )
:param conn: 数据库连接
:param batch_size: 每次插入量
:param db_type: 数据库类型
@ -168,7 +169,7 @@ class BatchInsertHandler(object):
for ds in data_set:
val = '('
for ele in ds:
val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ','
val += self._field_value_convert(ele)
val = val[0:-1] + ')'
begin += (self.sql_tpl.replace('%s', val) + ' \r ')
end = 'select 1 from dual'
@ -179,10 +180,28 @@ class BatchInsertHandler(object):
for ds in data_set:
val = '('
for ele in ds:
val += "'" + ele + "'," if isinstance(ele, str) else str(ele) + ','
val += self._field_value_convert(ele)
val = val[0:-1] + ')'
vals += val + ','
return self.sql_tpl.replace('%s', vals[0:-1])
def build_mysql_insert(self, data_set):
return self.build_pg_insert(data_set)
@staticmethod
def _field_value_convert(field_val):
"""
字段类型转换
:param field_val: 字段值
:return:
"""
# None处理
if field_val is None:
field_val = ''
# 特殊字符处理
if isinstance(field_val, str) and "'" in field_val:
field_val = field_val.replace("'", '')
# TODO 不可见字符处理,查询创建SQL时会删除\r,导致SQL格式混乱
# if isinstance(field_val, str):
# field_val = ''.join(x for x in field_val if x.isprintable())
return "'" + field_val + "'," if isinstance(field_val, str) else str(field_val) + ','

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/14 14:09
# @Author : old tom
# @File : __init__.py.py
# @Project : futool-tiny-datahub
# @Desc :

@ -0,0 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/14 14:09
# @Author : old tom
# @File : metadata_constant.py
# @Project : futool-tiny-datahub
# @Desc :
from enum import Enum
class MetaDataObjType(Enum):
Table = 'table'
View = 'view'
Procedure = 'procedure'

@ -7,16 +7,16 @@
# @Desc : 元数据读取
import abc
import os
from configparser import ConfigParser
from datahub.datasource.constant import ds_conf_param
from datahub.datasource.datasource_manage import DataSource, DataSourceManage
from datahub.datasource.datasource_manage import DataSource
from datahub.metadata.metadatadao.metadata_dao import MetadataDao
from common.fudb.connectors.connector_factory import ConnFactory
from sqllineage.runner import LineageRunner
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
reader_conf = ConfigParser()
reader_conf.read('./reader_conf.ini')
reader_conf.read(os.path.join(BASE_DIR, 'reader_conf.ini'))
class AbsMetadataReader(metaclass=abc.ABCMeta):
@ -89,27 +89,11 @@ class MetadataReader(AbsMetadataReader):
:return:
"""
return {
'table': '',
'table': self.dao.query_table_field(
reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', obj_name)),
'view': self.dao.query_view_detail(obj_name,
reader_conf[self.db_type][obj_type + '_detail'].replace('#$#',
obj_name)),
'procedure': self.dao.query_procedure_detail(
reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', obj_name))
}[obj_type]
if __name__ == '__main__':
local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
dsm = DataSourceManage(ConnFactory(local_ds))
ds = dsm.get('834164a2d62de959c0261e6239dd1e55')
mtr = MetadataReader(ds)
# print(mtr.query_tables())
# print(mtr.query_views())
# print(mtr.query_procedure())
# sql = mtr.query_metadata_detail('DW__FINANCE_SURVEY_MONTH_TJ_VW', 'view')
# runner = LineageRunner(sql, dialect='oracle')
# 画图
# runner.draw(dialect='oracle')
# print(runner)
# runner.draw(dialect='oracle')
print(mtr.query_table_fields('ODS_CONTRACT_PAY_INFO'))

@ -7,6 +7,8 @@
# @Desc : 元数据存储
from common.futool.fu_id import id_gen
from datahub.metadata.metadatadao.metadata_dao import MetadataDao
from datahub.local_db_conf import local_conn
class MetadataWareHouse(object):
@ -16,12 +18,54 @@ class MetadataWareHouse(object):
def __init__(self, source_id):
self.source_id = source_id
self.dao = MetadataDao(local_conn)
def save_metadata_obj(self):
pass
def save_metadata_obj(self, objs, obj_type):
"""
保存元数据对象
:param obj_type: 元数据类型
:param objs:
:return:
"""
data = [(id_gen.get_id(), self.source_id, obj_type, x[0], x[1]) for x in objs]
return self.dao.save_metadata_obj(data)
def save_metadata_obj_detail(self, details):
"""
保存元数据对象明细
:param details:
:return:
"""
return self.dao.save_metadata_create(details)
def save_metadata_obj_detail(self):
pass
def save_metadata_obj_field(self, fields):
"""
保存字段
:param fields:
:return:
"""
return self.dao.save_table_fields(fields)
def save_metadata_obj_field(self):
pass
def query_metadata(self, obj_type=None):
"""
查询元数据
:param obj_type:
:return:
"""
return self.dao.query_metadata_by_type(obj_type)
def query_metadata_name(self, obj_type=None):
"""
查询元数据名称
:param obj_type:
:return:
"""
return self.dao.query_metadata_name_by_type(obj_type)
def query_metadata_id_name(self, obj_type=None):
"""
查询元数据ID及名称
:param obj_type:
:return:
"""
return self.dao.query_metadata_id(obj_type)

@ -20,7 +20,7 @@ class MetadataDao(BaseDao):
:param sql:
:return:
"""
return [t[0] for t in self.query_all(sql)]
return self.query_all(sql)
def query_all_views(self, sql):
"""
@ -28,7 +28,7 @@ class MetadataDao(BaseDao):
:param sql:
:return:
"""
return [v[0] for v in self.query_all(sql)]
return self.query_all(sql)
def query_all_procedure(self, sql):
"""
@ -36,7 +36,7 @@ class MetadataDao(BaseDao):
:param sql:
:return:
"""
return [p[0] for p in self.query_all(sql)]
return self.query_all(sql)
def query_procedure_detail(self, sql):
"""
@ -46,7 +46,7 @@ class MetadataDao(BaseDao):
"""
rt = self.query_all(sql)
# 合并为完整SQL
return 'CREATE OR REPLACE ' + '\r'.join([x[0] for x in rt])
return 'create or replace ' + (''.join([str(x[0]).replace('\n', '\r') for x in rt]))
def query_view_detail(self, view_name, sql):
"""
@ -64,3 +64,47 @@ class MetadataDao(BaseDao):
:return:
"""
return self.query_all(sql)
def save_table_fields(self, data):
"""
保存表字段
:param data:
:return:
"""
return self.batch_insert(
sql_tpl='insert into metadata_object_field(meta_id,field_name,field_ch_name,order_num,field_type,nullable,default_value,field_length) values %s',
data=data, batch_size=1500)
def save_metadata_obj(self, objs):
"""
保存元数据
:param objs:
:return:
"""
return self.batch_insert(
sql_tpl='insert into metadata_object(meta_id,source_id,meta_type,meta_name,meta_ch_name) values %s',
data=objs, batch_size=1500)
def save_metadata_create(self, details):
return self.batch_insert(
sql_tpl='insert into metadata_object_create(meta_id,create_sql) values %s',
data=details, batch_size=1500)
def query_metadata_by_type(self, obj_type):
return self.query_all(
f"select * from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all(
'select * from metadata_object')
def query_metadata_name_by_type(self, obj_type):
"""
查询元数据名称
:param obj_type:
:return:
"""
sql = f"select meta_name from metadata_object where meta_type='{obj_type}'" if obj_type else 'select meta_name from metadata_object'
return [x[0] for x in self.query_all(sql)]
def query_metadata_id(self, obj_type):
return self.query_all(
f"select meta_id,meta_name from metadata_object where meta_type='{obj_type}'") if obj_type else self.query_all(
'select meta_id,meta_name from metadata_object')

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/14 14:04
# @Author : old tom
# @File : __init__.py.py
# @Project : futool-tiny-datahub
# @Desc : 元数据版本

@ -1,10 +1,10 @@
[oracle]
tables = select distinct table_name from user_tab_comments where table_type='TABLE'
views = select distinct table_name from user_tab_comments where table_type='VIEW'
procedure = select distinct name From user_source where type = 'PROCEDURE'
tables = select distinct table_name,comments from user_tab_comments where table_type='TABLE' and table_name not like '%BIN$%'
views = select distinct table_name,comments from user_tab_comments where table_type='VIEW'
procedure = select distinct name,'' as comments From user_source where type = 'PROCEDURE'
view_detail = select text from all_views where view_name='#$#'
procedure_detail = SELECT text FROM user_source WHERE NAME = '#$#' ORDER BY line
table_field = select b.COLUMN_NAME,a.COMMENTS, b.COLUMN_ID, b.DATA_TYPE, b.DATA_LENGTH, b.NULLABLE, b.DEFAULT_LENGTH, b.DATA_DEFAULT from user_col_comments a left join user_tab_columns b on a.table_name=b.table_name
table_detail = select b.COLUMN_NAME,a.COMMENTS, b.COLUMN_ID, b.DATA_TYPE, b.NULLABLE,b.DATA_DEFAULT,b.DATA_LENGTH from user_col_comments a left join user_tab_columns b on a.table_name=b.table_name
and a.COLUMN_NAME = b.COLUMN_NAME where a.table_name = '#$#' ORDER BY b.COLUMN_ID asc
[postgresql]

@ -5,10 +5,22 @@
# @File : scan_task.py
# @Project : futool-tiny-datahub
# @Desc : 扫描任务,通过调度扫描数据源-->获取数据源-->读取元数据并同步
from datahub.datasource.datasource_manage import DataSourceManage
from datahub.local_db_conf import local_conn
from datahub.metadata.metadata_reader import MetadataReader
from datahub.metadata.metadata_warehouse import MetadataWareHouse
from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao
from common.futool.fu_function import singleton
from datahub.scheduletask.task_executor import ScheduleExecutor
from datahub.scheduletask.schedule import CronExpTrigger
from common.log_conf import Logger
from datahub.metadata.constant.metadata_constant import MetaDataObjType
logger = Logger().get_logger()
@singleton
class ScanTaskManage(object):
def __init__(self):
self.dao = ScanTaskDao(local_conn)
@ -58,4 +70,71 @@ class ScanTaskManage(object):
return self.dao.query_task_by_id(source_id)
scan_task_manage = ScanTaskManage()
class ScanTaskRunner(object):
"""
扫描任务执行器
"""
def __init__(self):
self.dao = ScanTaskDao(local_conn)
self.executor = ScheduleExecutor()
self.scanner = ScanTaskExecutor()
def run(self):
"""
获取扫描配置提交到执行器
:return:
"""
enable_task = self.dao.query_all_task('Y')
if enable_task:
for task in enable_task:
self.executor.submit(task[0], CronExpTrigger.parse_crontab(task[1]), self.scanner.scan_metadata)
logger.info(f'task [{task[0]}] submit success')
class ScanTaskExecutor(object):
def __init__(self):
self.datasource_manage = DataSourceManage()
def scan_metadata(self, source_id):
"""
扫描元数据
:param source_id:
:return:
"""
# 元数据仓库API
warehouse = MetadataWareHouse(source_id)
# 获取待扫描数据源
datasource = self.datasource_manage.get(source_id)
# 初始化元数据读取器
metadata_reader = MetadataReader(datasource)
# 分别读取表\视图\存储过程并入库
tables = metadata_reader.query_tables()
warehouse.save_metadata_obj(tables, MetaDataObjType.Table.value)
views = metadata_reader.query_views()
warehouse.save_metadata_obj(views, MetaDataObjType.View.value)
procedures = metadata_reader.query_procedure()
warehouse.save_metadata_obj(procedures, MetaDataObjType.Procedure.value)
# TODO 临时调用
# w_procedures = warehouse.query_metadata_id_name(MetaDataObjType.Procedure.value)
# w_procedures = warehouse.query_metadata_id_name(MetaDataObjType.View.value)
# 查询视图语句
# for v in w_procedures:
# sql = metadata_reader.query_metadata_detail(v[1], MetaDataObjType.View.value)
# # warehouse.save_metadata_obj_detail([list(v)[0]] + [sql])
# warehouse.save_metadata_obj_detail([(v[0], sql)])
# w_tables = warehouse.query_metadata_id_name(MetaDataObjType.Table.value)
# for t in w_tables:
# fields = metadata_reader.query_metadata_detail(t[1], MetaDataObjType.Table.value)
# fields_data = []
# for f in fields:
# fields_data.append(tuple([list(t)[0]] + list(f)))
# warehouse.save_metadata_obj_field(fields_data)
if __name__ == '__main__':
ste = ScanTaskExecutor()
ste.scan_metadata('834164a2d62de959c0261e6239dd1e55')

@ -6,7 +6,8 @@
# @Project : futool-tiny-datahub
# @Desc : 定时任务配置
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.background import BackgroundScheduler
# from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
# 后台指定调度配合fastapi使用
# BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时如上例使用。
@ -15,7 +16,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
# scheduler.add_job(tick, trigger=CronExpTrigger.parse_crontab('0/1 * * * * * *'), kwargs={
# "name": "bob"
# }
scheduler = BackgroundScheduler()
sch = BlockingScheduler()
class CronExpTrigger(CronTrigger):

@ -5,28 +5,36 @@
# @File : task_executor.py
# @Project : futool-tiny-datahub
# @Desc : 任务执行器,负责将任务添加到scheduler队列中
from common.futool.fu_function import singleton
from datahub.scheduletask.schedule import sch
@singleton
class CommonTaskExecutor(object):
class ScheduleExecutor(object):
"""
通用任务执行器
定时任务任务执行器
"""
def __init__(self, scheduler, cron_trigger):
def __init__(self, scheduler=sch):
"""
:param scheduler: 调度器
:param cron_trigger: cron触发器
:param scheduler: 调度器默认使用BackgroundScheduler
"""
self.cron_trigger = cron_trigger
self.scheduler = scheduler
def submit(self, source_id, cron):
def submit(self, source_id, cron_trigger, execute_fun):
"""
提交任务
:param execute_fun: 定时任务执行函数
:param cron_trigger: cron触发器
:param source_id: 数据源ID
:param cron: cron表达式
:return:
"""
pass
self.scheduler.add_job(execute_fun, trigger=cron_trigger, id=source_id, kwargs={
'source_id': source_id
})
def remove_job(self, source_id):
"""
移除任务
:param source_id:
:return:
"""
self.scheduler.remove_job(job_id=source_id)

Loading…
Cancel
Save