feat: add metadata management and scan task modules

master
old-tom 2 years ago
parent 5bb28d2ebb
commit a2de66574f

@@ -28,7 +28,7 @@ class ConnFactory(object):
self.connector_id = self._gen_connector_id(conf.db_type, conf.user, conf.password, conf.host, conf.port,
conf.database)
# Try to fetch the connector from the cache
if connector_cache.exist(self.connector_id):
if connector_cache.exist(self.connector_id) and connector_cache.get(self.connector_id):
self.connector = connector_cache.get(self.connector_id)
else:
# urlquote handles special characters in the password, e.g. '@'
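For reference, a minimal sketch (not part of the commit) of how a password containing '@' could be escaped with urllib.parse before being embedded in a connection URL; the URL below is only an example:

from urllib.parse import quote_plus

password = 'root@123'
# '@' would otherwise be read as the host separator in the URL
url = f"postgresql://postgres:{quote_plus(password)}@localhost:5432/postgres"
print(url)  # postgresql://postgres:root%40123@localhost:5432/postgres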

@@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/9 14:53
# @Author : old tom
# @File : __init__.py
# @Project : futool-tiny-datahub
# @Desc :

@@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/12 15:09
# @Author : old tom
# @File : fu_function.py
# @Project : futool-tiny-datahub
# @Desc :
def singleton(cls):
"""
Singleton decorator: caches one instance per decorated class.
:param cls:
:return:
"""
_instance = {}
def inner(*args, **kwargs):
# pass constructor arguments through so classes whose __init__ takes parameters can also be decorated
if cls not in _instance:
_instance[cls] = cls(*args, **kwargs)
return _instance[cls]
return inner
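A short usage sketch (the Config class below is hypothetical, not part of the commit): every call returns the same cached instance, and with the *args/**kwargs signature the decorator also works for classes whose __init__ takes parameters.

@singleton
class Config:
    def __init__(self, path='conf.ini'):
        self.path = path

a = Config('a.ini')
b = Config('b.ini')  # arguments after the first construction are ignored
print(a is b)  # True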

@@ -0,0 +1,134 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/9 14:54
# @Author : old tom
# @File : fu_id.py
# @Project : futool-tiny-datahub
# @Desc : ID generator
import time
import abc
from common.log_conf import Logger
logger = Logger().get_logger()
class IdGenerator(object):
"""
ID generator
"""
def __init__(self, id_type='snowflake'):
"""
:param id_type: ID type, defaults to the snowflake algorithm
"""
self.id_type = id_type
self.snowflake = SnowFlakeId(datacenter_id=1, worker_id=1)
self.uuid = UUID()
def get_id(self):
return {
'snowflake': self.snowflake,
'uuid': self.uuid
}[self.id_type].get_id()
class AbsIdGenerator(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_id(self):
pass
class SnowFlakeId(AbsIdGenerator):
"""
Snowflake ID generation.
Produces a 64-bit integer, which takes only 8 bytes when stored in the database:
1 bit: sign bit, left unused.
41 bits: millisecond timestamp. Counted from 1970-01-01 00:00:00 this lasts until about 2039, and a custom epoch can be configured to extend that; this part is what keeps the generated IDs roughly monotonically increasing.
10 bits: machine ID, split by default into 5 bits for the datacenter and 5 bits for the worker within it, supporting roughly 32*32 - 1 = 1023 machines.
12 bits: rolling sequence for IDs generated within the same millisecond, allowing about 2^12 - 1 = 4095 IDs per millisecond.
"""
# Bit layout of the 64-bit ID
WORKER_ID_BITS = 5
DATACENTER_ID_BITS = 5
SEQUENCE_BITS = 12
# Maximum values
MAX_WORKER_ID = -1 ^ (-1 << WORKER_ID_BITS)  # 2**5-1 = 0b11111
MAX_DATACENTER_ID = -1 ^ (-1 << DATACENTER_ID_BITS)
# Shift offsets
WORKER_ID_SHIFT = SEQUENCE_BITS
DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS
# Sequence mask (sequence rolls over within one millisecond)
SEQUENCE_MASK = -1 ^ (-1 << SEQUENCE_BITS)
# Twitter epoch timestamp (2010-11-04)
TWEPOCH = 1288834974657
def __init__(self, datacenter_id, worker_id, start_seq=0):
"""
:param datacenter_id: datacenter ID
:param worker_id: worker (machine) ID
:param start_seq: starting sequence number
"""
# sanity check
if worker_id > self.MAX_WORKER_ID or worker_id < 0:
raise ValueError('worker_id out of range')
if datacenter_id > self.MAX_DATACENTER_ID or datacenter_id < 0:
raise ValueError('datacenter_id out of range')
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.sequence = start_seq
self.last_timestamp = -1  # timestamp of the previously generated ID
@staticmethod
def _gen_timestamp():
"""
Generate an integer timestamp in milliseconds
:return:int timestamp
"""
return int(time.time() * 1000)
def get_id(self):
"""
Get a new ID
:return:
"""
timestamp = self._gen_timestamp()
# Clock moved backwards
if timestamp < self.last_timestamp:
logger.error('clock is moving backwards. Rejecting requests until {}'.format(self.last_timestamp))
raise Exception('clock moved backwards, refusing to generate ID')
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & self.SEQUENCE_MASK
if self.sequence == 0:
timestamp = self._til_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
new_id = ((timestamp - self.TWEPOCH) << self.TIMESTAMP_LEFT_SHIFT) | (
self.datacenter_id << self.DATACENTER_ID_SHIFT) | \
(self.worker_id << self.WORKER_ID_SHIFT) | self.sequence
return new_id
def _til_next_millis(self, last_timestamp):
"""
Busy-wait until the next millisecond
"""
timestamp = self._gen_timestamp()
while timestamp <= last_timestamp:
timestamp = self._gen_timestamp()
return timestamp
class UUID(AbsIdGenerator):
"""
UUID generation
"""
def __init__(self):
pass
def get_id(self):
# minimal completion of the stub: return a hex uuid4 string
import uuid
return uuid.uuid4().hex
id_gen = IdGenerator()
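A short usage sketch (not part of the commit) of the module-level id_gen, unpacking the fields again with the same shifts the generator used, to illustrate the bit layout described in the SnowFlakeId docstring:

new_id = id_gen.get_id()
sequence = new_id & SnowFlakeId.SEQUENCE_MASK
worker = (new_id >> SnowFlakeId.WORKER_ID_SHIFT) & SnowFlakeId.MAX_WORKER_ID
datacenter = (new_id >> SnowFlakeId.DATACENTER_ID_SHIFT) & SnowFlakeId.MAX_DATACENTER_ID
millis = (new_id >> SnowFlakeId.TIMESTAMP_LEFT_SHIFT) + SnowFlakeId.TWEPOCH
print(new_id, datacenter, worker, sequence, millis)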

@@ -10,7 +10,7 @@ import sys
from loguru import logger
# Log output path
LOG_PATH = '../logout/info_log.txt'
LOG_PATH = '../logout/info_log.log'
class Logger(object):

@@ -11,6 +11,7 @@ from common.fudb.dbapis.fu_db_api import select_one
from datahub.datasource.dsdao.ds_dao import DataSourceDao
from common.log_conf import Logger
from datahub.datasource.constant import ds_conf_param, DUAL_DB_TYPE
from datahub.local_db_conf import local_conn
# Logger
logger = Logger().get_logger()
@@ -31,10 +32,10 @@ class DataSourceManage(object):
Data source management
"""
def __init__(self, local_db):
self.dao = DataSourceDao(local_db)
def __init__(self):
self.dao = DataSourceDao(local_conn)
def add(self, conf: ds_conf_param):
def add(self, conf: ds_conf_param, cron='0 0 0 1/1 * ?'):
# Initialize the connector
connector = ConnFactory(conf)
if self.dao.exist_by_source(connector.connector_id):
@@ -44,8 +45,8 @@ class DataSourceManage(object):
checker = DataSourceChecker(connector)
result, msg = checker.check()
if result:
# Persist the data source
return self.dao.add_datasource(connector.connector_id, conf), ''
# Persist the data source and register a scan task
return self.dao.add_datasource(connector.connector_id, conf, cron), ''
else:
# Return the error message
return result, f'check failed,{msg}'
@@ -103,16 +104,8 @@ class DataSourceChecker(object):
return False, f'cannot get connection,e={e}'
try:
# Probe the connection with "select 1"
return int(select_one(conn, 'select 1 ' + (
'from dual' if self.connector.db_type in DUAL_DB_TYPE else ''))) > 0, 'success'
sql = 'select 1 ' + (
'from dual' if self.connector.db_type in DUAL_DB_TYPE else '')
return int(select_one(conn, sql)[0]) > 0, 'success'
except Exception as e:
return False, f'cannot execute "select 1",e={e}'
if __name__ == '__main__':
local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
factory_1 = ConnFactory(local_ds)
factory_2 = ConnFactory(local_ds)
print(factory_1.connector_id, factory_2.connector_id)
print(factory_1 is factory_2)
print(factory_2.connector is factory_1.connector)

@@ -8,17 +8,38 @@
from common.fudb.connectors.connector_factory import ConnFactory
from common.fudb.dbapis.fu_dao import BaseDao
from sqlalchemy import text
from common.log_conf import Logger
from datahub.datasource.constant import ds_conf_param
# Logger
logger = Logger().get_logger()
class DataSourceDao(BaseDao):
def __init__(self, connector: ConnFactory):
super().__init__(connector)
def add_datasource(self, source_id, conf: ds_conf_param):
sql = f"insert into datasource_main (source_id,source_type,host,port,username,password,database_name) values ('{source_id}'" \
f",'{conf.db_type}','{conf.host}',{conf.port},'{conf.user}','{conf.password}','{conf.database}')"
return self.execute_update(sql) > 0
def add_datasource(self, source_id, conf: ds_conf_param, cron):
conn = self.connector.get_conn()
try:
# Begin a transaction
conn.begin()
# Insert the data source
sql = f"insert into datasource_main (source_id,source_type,host,port,username,password,database_name) values ('{source_id}'" \
f",'{conf.db_type}','{conf.host}',{conf.port},'{conf.user}','{conf.password}','{conf.database}')"
conn.execute(text(sql))
# Create the scan task
scan_sql = f"insert into scan_task_conf (source_id,cron_expression) values ('{source_id}','{cron}')"
conn.execute(text(scan_sql))
conn.commit()
return True
except Exception as e:
conn.rollback()
logger.error(f'failed to add datasource [{conf}], e={e}')
return False
finally:
conn.close()
def remove_datasource(self, source_id):
return self.execute_update(f"delete from datasource_main where source_id='{source_id}'")
@@ -33,7 +54,8 @@ class DataSourceDao(BaseDao):
:param source_id:
:return:
"""
return self.query_one(f"select 1 from datasource_main where source_id='{source_id}'") > 0
sql = f"select 1 from datasource_main where source_id='{source_id}'"
return self.query_one(sql) is not None
def edit_datasource_conf(self, source_id, param_dict):
"""
@@ -49,10 +71,3 @@ class DataSourceDao(BaseDao):
conf_field = ['source_type', 'username', 'password', 'host', 'port', 'database_name']
return self.query_one(
f"select {','.join(conf_field)} from datasource_main where source_id='{source_id}'")
if __name__ == '__main__':
local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
dao = DataSourceDao(ConnFactory(local_ds))
rt = dao.query_datasource_conf('db143d11741a9575fdea92ed2b39dc53')
print(ds_conf_param._make(rt))

@@ -0,0 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/9 15:37
# @Author : old tom
# @File : local_db_conf.py
# @Project : futool-tiny-datahub
# @Desc : local database connection configuration
from datahub.datasource.constant import ds_conf_param
from common.fudb.connectors.connector_factory import ConnFactory
# Configuration of the database the system itself uses: db type, username, password, host, port, default database
local_db = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
local_conn = ConnFactory(local_db)

@@ -13,6 +13,7 @@ from datahub.datasource.constant import ds_conf_param
from datahub.datasource.datasource_manage import DataSource, DataSourceManage
from datahub.metadata.metadatadao.metadata_dao import MetadataDao
from common.fudb.connectors.connector_factory import ConnFactory
from sqllineage.runner import LineageRunner
reader_conf = ConfigParser()
reader_conf.read('./reader_conf.ini')
@@ -73,16 +74,42 @@ class MetadataReader(AbsMetadataReader):
return self.dao.query_all_views(reader_conf[self.db_type]['views'])
def query_procedure(self):
pass
return self.dao.query_all_procedure(reader_conf[self.db_type]['procedure'])
def query_table_fields(self, table):
pass
return self.dao.query_table_field(reader_conf[self.db_type]['table_field'].replace('#$#', table))
def query_metadata_detail(self, obj_name, obj_type):
"""
Query metadata detail.
If obj_type is 'view' or 'procedure', return the corresponding CREATE statement;
if it is 'table', return the table comments.
:param obj_name: object name
:param obj_type: object type
:return:
"""
return {
'table': '',
'view': self.dao.query_view_detail(obj_name,
reader_conf[self.db_type][obj_type + '_detail'].replace('#$#',
obj_name)),
'procedure': self.dao.query_procedure_detail(
reader_conf[self.db_type][obj_type + '_detail'].replace('#$#', obj_name))
}[obj_type]
if __name__ == '__main__':
local_ds = ds_conf_param('postgresql', 'postgres', 'root@123', 'localhost', 5432, 'postgres')
dsm = DataSourceManage(ConnFactory(local_ds))
ds = dsm.get('db143d11741a9575fdea92ed2b39dc53')
ds = dsm.get('834164a2d62de959c0261e6239dd1e55')
mtr = MetadataReader(ds)
print(mtr.query_tables())
print(mtr.query_views())
# print(mtr.query_tables())
# print(mtr.query_views())
# print(mtr.query_procedure())
# sql = mtr.query_metadata_detail('DW__FINANCE_SURVEY_MONTH_TJ_VW', 'view')
# runner = LineageRunner(sql, dialect='oracle')
# Draw the lineage graph
# runner.draw(dialect='oracle')
# print(runner)
# runner.draw(dialect='oracle')
print(mtr.query_table_fields('ODS_CONTRACT_PAY_INFO'))

@@ -0,0 +1,27 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/9 14:41
# @Author : old tom
# @File : metadata_warehouse.py
# @Project : futool-tiny-datahub
# @Desc : metadata storage
from common.futool.fu_id import id_gen
class MetadataWareHouse(object):
"""
Metadata warehouse
"""
def __init__(self, source_id):
self.source_id = source_id
def save_metadata_obj(self):
pass
def save_metadata_obj_detail(self):
pass
def save_metadata_obj_field(self):
pass

@@ -29,3 +29,38 @@ class MetadataDao(BaseDao):
:return:
"""
return [v[0] for v in self.query_all(sql)]
def query_all_procedure(self, sql):
"""
Query all stored procedures
:param sql:
:return:
"""
return [p[0] for p in self.query_all(sql)]
def query_procedure_detail(self, sql):
"""
Query stored procedure detail (source lines)
:param sql:
:return:
"""
rt = self.query_all(sql)
# Merge the source lines into a complete SQL statement
return 'CREATE OR REPLACE ' + '\r'.join([x[0] for x in rt])
def query_view_detail(self, view_name, sql):
"""
Query the view's CREATE statement
:param view_name:
:param sql:
:return:
"""
return f'CREATE OR REPLACE VIEW {view_name} AS ' + '\r\n' + self.query_one(sql)[0]
def query_table_field(self, sql):
"""
Query table columns
:param sql:
:return:
"""
return self.query_all(sql)

@@ -1,7 +1,14 @@
[oracle]
tables = select distinct table_name from user_tab_comments where table_type='TABLE'
views = select distinct table_name from user_tab_comments where table_type='VIEW'
procedure = select distinct name From user_source where type = 'PROCEDURE'
view_detail = select text from all_views where view_name='#$#'
procedure_detail = SELECT text FROM user_source WHERE NAME = '#$#' ORDER BY line
table_field = select b.COLUMN_NAME,a.COMMENTS, b.COLUMN_ID, b.DATA_TYPE, b.DATA_LENGTH, b.NULLABLE, b.DEFAULT_LENGTH, b.DATA_DEFAULT from user_col_comments a left join user_tab_columns b on a.table_name=b.table_name
and a.COLUMN_NAME = b.COLUMN_NAME where a.table_name = '#$#' ORDER BY b.COLUMN_ID asc
[postgresql]
tables = SELECT distinct table_name FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name
views = select distinct table_name from information_schema.views WHERE table_schema = 'public' ORDER BY table_name
view_detail =
procedure_detail =
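For reference, MetadataReader fills the #$# placeholder in these templates before executing them; a minimal sketch of that substitution, mirroring the calls above (not part of the commit):

from configparser import ConfigParser

reader_conf = ConfigParser()
reader_conf.read('./reader_conf.ini')
# build the column query for one oracle table
sql = reader_conf['oracle']['table_field'].replace('#$#', 'ODS_CONTRACT_PAY_INFO')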

@@ -6,10 +6,24 @@
# @Project : futool-tiny-datahub
# @Desc :
from sqllineage.runner import LineageRunner
class MetadataRelationAnalyzer(object):
"""
Metadata relation analysis
Flow: read views from the metadata store --> analyze each view (recursively) --> derive view-to-table relations --> store them in Neo4j
"""
def __init__(self, source_id):
pass
self.source_id = source_id
if __name__ == '__main__':
with open(r'D:\文档\工作\ATD\2023上半年\项目\事权\DW_FINANCE_PLAN_DETAIL_BASE_VW.sql', 'r', encoding='utf-8') as f:
sql = f.read()
runner = LineageRunner(sql, dialect='oracle')
# Draw the lineage graph
# runner.draw(dialect='oracle')
print(runner)
print(runner.source_tables())
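A sketch of the flow described in the class docstring, assuming the official neo4j Python driver is used for storage (the driver, connection details and relationship name below are illustrative, not part of the commit):

from neo4j import GraphDatabase
from sqllineage.runner import LineageRunner

driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))

def store_view_lineage(view_name, view_sql):
    # tables the view reads from, according to sqllineage
    sources = LineageRunner(view_sql, dialect='oracle').source_tables()
    with driver.session() as session:
        for table in sources:
            session.run(
                'MERGE (v:View {name: $view}) '
                'MERGE (t:Table {name: $table}) '
                'MERGE (v)-[:READS_FROM]->(t)',
                view=view_name, table=str(table))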

@@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/11 18:52
# @Author : old tom
# @File : __init__.py
# @Project : futool-tiny-datahub
# @Desc : scheduled task module

@@ -0,0 +1,61 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/11 19:08
# @Author : old tom
# @File : scan_task.py
# @Project : futool-tiny-datahub
# @Desc : scan task: on schedule, scan the data source --> fetch the data source --> read its metadata and sync it
from datahub.local_db_conf import local_conn
from datahub.scheduletask.scandao.scan_task_dao import ScanTaskDao
class ScanTaskManage(object):
def __init__(self):
self.dao = ScanTaskDao(local_conn)
def add_task(self, source_id, cron='0 0 0 1/1 * ?'):
"""
Add a task
:param source_id: data source ID
:param cron: cron expression
:return:
"""
if self.dao.exist_by_id(source_id):
return False, f'[{source_id}] already exists'
return self.dao.add_task(source_id, cron), 'add success'
def switch_task(self, source_id, enable='N'):
"""
Enable or disable a task
:param source_id: source ID
:param enable: 'Y' to enable, 'N' to disable
:return:
"""
return self.dao.switch_task(source_id, enable)
def edit_cron(self, source_id, cron):
"""
Edit the task's cron expression
:param source_id: source ID
:param cron: cron expression
:return:
"""
return self.dao.edit_task_cron(source_id, cron)
def query_task(self, enable=None):
"""
Query tasks
:param enable:
:return:
"""
return self.dao.query_all_task(enable)
def query_task_by_id(self, source_id):
"""
Query a task by data source ID
:return:
"""
return self.dao.query_task_by_id(source_id)
scan_task_manage = ScanTaskManage()
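A short usage sketch of the module-level instance (the source_id is one that appears elsewhere in this commit; not part of the commit itself):

ok, msg = scan_task_manage.add_task('834164a2d62de959c0261e6239dd1e55', cron='0 0 2 * * ?')
print(ok, msg)
scan_task_manage.switch_task('834164a2d62de959c0261e6239dd1e55', enable='Y')
print(scan_task_manage.query_task(enable='Y'))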

@@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/12 15:24
# @Author : old tom
# @File : __init__.py
# @Project : futool-tiny-datahub
# @Desc :

@@ -0,0 +1,67 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/12 15:24
# @Author : old tom
# @File : scan_task_dao.py
# @Project : futool-tiny-datahub
# @Desc :
from common.fudb.connectors.connector_factory import ConnFactory
from common.fudb.dbapis.fu_dao import BaseDao
class ScanTaskDao(BaseDao):
def __init__(self, connector: ConnFactory):
super().__init__(connector)
def add_task(self, source_id, cron):
return self.execute_update(
f"insert into scan_task_conf (source_id,cron_expression) values ('{source_id}','{cron}')") > 0
def exist_by_id(self, source_id):
"""
Check whether a task exists
:param source_id:
:return:
"""
return self.query_one(f"select count(1) from scan_task_conf where source_id='{source_id}'")[0] > 0
def remove_task(self, source_id):
return self.execute_update(f"delete from scan_task_conf where source_id='{source_id}'") > 0
def switch_task(self, source_id, enable):
"""
Enable or disable a task
:param source_id: data source ID
:param enable: 'Y' to enable, 'N' to disable
:return:
"""
return self.execute_update(f"update scan_task_conf set enable='{enable}' where source_id='{source_id}'") > 0
def edit_task_cron(self, source_id, cron):
"""
Edit the task's cron expression
:param source_id: data source ID
:param cron: cron expression
:return:
"""
return self.execute_update(
f"update scan_task_conf set cron_expression='{cron}' where source_id='{source_id}'") > 0
def query_all_task(self, enable):
"""
Query all tasks, optionally filtered by the enable flag
:return:
"""
if not enable:
return self.query_all("select source_id,cron_expression from scan_task_conf")
return self.query_all(f"select source_id,cron_expression from scan_task_conf where enable='{enable}'")
def query_task_by_id(self, source_id):
"""
Query a task by source ID
:param source_id:
:return:
"""
return self.query_one(
f"select source_id,cron_expression,enable from scan_task_conf where source_id='{source_id}'")

@@ -0,0 +1,38 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/12 15:15
# @Author : old tom
# @File : schedule.py
# @Project : futool-tiny-datahub
# @Desc : scheduler configuration
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.background import BackgroundScheduler
# Background scheduler, intended to be used together with FastAPI.
# BlockingScheduler: start() blocks the current thread; use it when the scheduler is the only thing the application runs.
# BackgroundScheduler: start() does not block the main thread; use it when the scheduler should run in the background of an existing application.
# Run the job method once every minute:
# scheduler.add_job(tick, trigger=CronExpTrigger.parse_crontab('0/1 * * * * * *'), kwargs={
#     "name": "bob"
# })
scheduler = BackgroundScheduler()
class CronExpTrigger(CronTrigger):
"""
Customized cron trigger that accepts both 6-field and 7-field cron expressions.
7-field format: * * * * * * * (second minute hour day month day_of_week year)
"""
@classmethod
def parse_crontab(cls, expr, timezone=None):
values = expr.split()
if len(values) == 6:
# Convert 6 fields to 7: force the day-of-week field to '*' (Quartz uses '?') and append a '*' year field
values[5] = '*'
values = values + ['*']
if len(values) != 7:
raise ValueError('Wrong number of fields; got {}, expected 7'.format(len(values)))
return cls(second=values[0], minute=values[1], hour=values[2], day=values[3], month=values[4],
day_of_week=values[5], year=values[6], timezone=timezone)
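A minimal usage sketch (not part of the commit) wiring the trigger into the background scheduler; scan_job is a hypothetical job function:

def scan_job(source_id):
    print('scanning', source_id)

scheduler.add_job(scan_job, trigger=CronExpTrigger.parse_crontab('0 0 2 * * ?'),
                  kwargs={'source_id': '834164a2d62de959c0261e6239dd1e55'})
scheduler.start()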

@@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/11 19:10
# @Author : old tom
# @File : task_executor.py
# @Project : futool-tiny-datahub
# @Desc : task executor, responsible for adding tasks to the scheduler queue
from common.futool.fu_function import singleton
@singleton
class CommonTaskExecutor(object):
"""
Generic task executor
"""
def __init__(self, scheduler, cron_trigger):
"""
:param scheduler: scheduler instance
:param cron_trigger: cron trigger class
"""
self.cron_trigger = cron_trigger
self.scheduler = scheduler
def submit(self, source_id, cron):
"""
Submit a task
:param source_id: data source ID
:param cron: cron expression
:return:
"""
pass
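One possible sketch of submit(), assuming cron_trigger is the CronExpTrigger class from schedule.py and that some scan callback exists (the _run_scan name is hypothetical; none of this is part of the commit):

def submit(self, source_id, cron):
    self.scheduler.add_job(self._run_scan, trigger=self.cron_trigger.parse_crontab(cron),
                           id=source_id, kwargs={'source_id': source_id})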

@@ -1,3 +0,0 @@
20230408 11:11:37 - MainProcess | MainThread | test_log.test:16 - INFO -fefefe
20230408 11:11:37 - MainProcess | MainThread | test_log.test:17 - ERROR -223232
20230408 11:11:37 - MainProcess | MainThread | test_log.test:18 - WARNING -999