当前位置: 移动技术网 > IT编程>脚本编程>Python > pythonETL工具的使用

pythonETL工具的使用

2018年04月11日  | 移动技术网IT编程  | 我要评论

空调加氟收费标准,修多罗千手丸,有声小说藏地密码

应对小规模业务场景使用专业的etl会有点费劲,还得增加运维监控成本,想想直接写个框架,用代码直接做etl数据接入

安装

pip install pyetl

使用

首先看下最简单的demo

from pyetl import Etl
class TestConfig:
    # 这里直接是把源库和目的库配置成了一个,只是为了演示使用
    DST_URI = SRC_URI = {'uri': 'mysql+pymysql://root:hadoop@localhost:3306/test'}
# 'test_src'和'test_dst'分别是源表名称和目的表名称
app = Etl('test_src', 'test_dst')
app.config(TestConfig)
app.run()

我们正常的业务需求肯定比这个复杂的多,这就涉及转换操作,这里的处理方式是添加udf函数

最简单的数据转换,比如某个字段全部大写处理(下面以id字段为例,一般不可能有这种简单的转换,这里只做演示)

app = Etl('src_table', 'dst_table', unique='id') # unique这个参数作用是确定唯一键,数据插入时会做merge操作
app.config(TestConfig)
@app.add('id')(lambda x:x.upper()) # add(参数可以是str或list) 可以向任务注册某个或某几个字段的转换函数,可以是一个字段分解为多个字段,也可以是多个字段合并为一个字段
@app.befor(lambda app:app.dst.empty('dst_table')) # befor 函数是在开始一次etl前的预处理操作
@app.after(lambda app:app.dst.empty('src_table')) # after 函数是在一次etl完成后的扫尾操作
app.run(where='limit 10') # 这里where可以对数据源做数据筛选过滤过滤

下面是增量更新场景

mapping = {'id': 'id_code'} 
# mapping是当源库和目的库之间表字段不一致时配置的映射关系(key是目的库字段名,value是源库字段名,value可以是list表示两个字段合并到目的字段,怎么合并需要用add注册具体的处理函数)
# src_update为更新标志字段(是源库的字段名),每次拉取数据都会记录时间点
app = Etl('src_table', 'dst_table', mapping=mapping, unique='id', src_update='update_time')
app.config(TestConfig)
app.run(days=1) # 这里的days作用是在原来更新时间点的基础上向前推days天,days为0时忽虐src_update字段记录的时间直接全表接入

创建项目

这里提供了一个简单的项目结构做参考,直接使用pyetl命令行生成一个简单的项目结构,其中的job.py就是我们的etl任务,数据库配置都在config.py里,根据实际环境添加

pyetl -b [project name]

项目结构

以上命令创建好一个工程文件,需要根据个人环境进行配置,下面是一些参数说明
app/config.py文件(主要是SRC_URI和DST_URI参数)
    SRC_URI(源库配置)
    DST_URI(目的库配置)
    SRC_PLACEHOLDER(数据源驱动的占位符形式, 以sqlalchemy连接形式配置时不需要关注)
    QUERY_SIZE(单次获取数据量,根据实际环境机器性能,可以不关注)
    INSERT_SIZE(单次插入数据量,根据实际环境机器性能,可以不关注)

app/etl/job.py文件,单个etl任务示例(主要是src_tab和dst_tab参数)
    src_tab(源表名称)
    dst_tab(目的表名称)
    mapping(可选关键字参数,目的表到源表的字段映射,当源表和目的表字段名称不一致是使用)
    src_update(可选关键字参数,源表的数据更新标志,只要是增长类型的都可以,比如数据变更时间字段,非空以增量形式插入)
    dst_unique(可选关键字参数,目的表的唯一键,非空以merge形式插入)

由于不同数据库驱动配置的形式都会不一样,因此这里统一的都采用字典的形式配置的,下面以连接impala为例:

DST_URI = {

‘host’: ‘192.168.1.1’,

‘port’: 21050,

‘use_kerberos’: True,

‘kerberos_service_name’: ‘impala’,

‘timeout’: 3600,

‘driver’: ‘impala.dbapi’,

}

这里要在原有impala驱动参数配置的基础上增加driver参数,来让代码识别对应的数据库驱动程序,没办法python没有像java一样统一的jdbc。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网