大数据 ETL 流程生成器 - 根据源表 DDL 生成标准化 ETL 加工 SQL(HiveSQL/MySQL/ODPS)
根据源表 DDL 自动生成标准化的 ETL 加工 SQL,支持 HiveSQL、MySQL、ODPS。
大数据专家(20 年经验)
- 表命名:源表 ods_[表名]_di → 目标表 dwd_[表名]_di
- 以 _at 或 _time 结尾的 TIMESTAMP 字段 → STRING(做时区转换)
- 以 _date 结尾的字段 → STRING(不做转换)
DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS created_at
-- updated_at: converted from UTC to the timezone parameter, rendered with millisecond precision
DATE_FORMAT(FROM_UTC_TIMESTAMP(updated_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS updated_at
-- ds: date part of the converted created_at, used as the (dynamic) partition value
DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd") AS ds
- 增量变更数据来自 ods_data_base_di 表,通过 _operation_ 和 _after_image_ 字段识别变更类型
- 按 id 去重,只保留 updated_at 最新的一条:
ROW_NUMBER() OVER(PARTITION BY id ORDER BY updated_at DESC) as rn
-- keep only the first row per id (rn = 1, i.e. the greatest updated_at)
WHERE rn = 1
-- regex column selection: every column except the helper column rn
SELECT `(rn)?+.+` FROM (...)
# Generate ETL SQL from a DDL file passed as a command-line argument
python3 skills/etl-generator/etl_generator.py source_table.ddl > etl_sql.sql
# Generate ETL SQL from a DDL fed on standard input (redirect instead of piping through cat)
python3 skills/etl-generator/etl_generator.py < source_table.ddl > etl_sql.sql
# Programmatic use: parse a source-table DDL, then derive the target-table DDL
# and the ETL SQL from the parsed pieces.
from etl_generator import (
    generate_etl_sql,
    generate_target_table_ddl,
    parse_table_ddl,
)

ddl = """
CREATE TABLE IF NOT EXISTS ods_delivery_attempt_di(
id STRING COMMENT '主键',
pno STRING COMMENT '运单号',
client_id STRING COMMENT '客户 ID',
returned BIGINT COMMENT '是否退货件',
delivery_date STRING COMMENT '派送日期',
marker_id BIGINT COMMENT '标记原因',
store_id STRING COMMENT '网点 ID',
created_at TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP COMMENT '更新时间'
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES ("columnar.nested.type"="true", "comment"="有效尝试派送详情")
LIFECYCLE 36500;
"""

# parse_table_ddl yields three values (table name, field list, table comment);
# both generators take exactly those three values in the same order.
parsed = tuple(parse_table_ddl(ddl))
table_name, fields, table_comment = parsed
target_ddl = generate_target_table_ddl(*parsed)
etl_sql = generate_etl_sql(*parsed)
-- Example source table (ods layer) fed to the generator, partitioned by ds.
-- Column comments use single quotes, like the target DDL and the Python example.
CREATE TABLE IF NOT EXISTS ods_sap_store_cash_pay_info_di(
    id            STRING    COMMENT '主键',
    store_id      STRING    COMMENT '网点编号',
    business_date STRING    COMMENT '业务日期',
    sap_state     BIGINT    COMMENT '0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常',
    created_at    TIMESTAMP COMMENT '创建时间',
    updated_at    TIMESTAMP COMMENT '更新时间'
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES ("columnar.nested.type"="true", "comment"="SAP 门店现金支付信息")
LIFECYCLE 36500;
-- Target table DDL (dwd layer): same columns as the source table, except that the
-- TIMESTAMP columns created_at and updated_at are stored as STRING.
CREATE TABLE IF NOT EXISTS dwd_sap_store_cash_pay_info_di (
    id            STRING COMMENT '主键',
    store_id      STRING COMMENT '网点编号',
    business_date STRING COMMENT '业务日期',
    sap_state     BIGINT COMMENT '0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常',
    created_at    STRING COMMENT '创建时间',
    updated_at    STRING COMMENT '更新时间'
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES (
    "columnar.nested.type" = "true",
    "comment" = "SAP 门店现金支付信息"
)
LIFECYCLE 36500;
-- ETL SQL: merge changed rows into dwd_sap_store_cash_pay_info_di.
-- ods_data unions two inputs for the example table:
--   1. ods_sap_store_cash_pay_info_di, whose columns are already typed
--   2. rows for source_db.sap_store_cash_pay_info in ods_data_base_di, whose
--      column values are read out of the JSON column named values
-- created_at and updated_at are converted from UTC to the timezone parameter and
-- rendered as strings. ds is the date part of the converted created_at and is
-- written as a dynamic partition.
-- NOTE(review): on Hive, the regex column selection below needs
-- hive.support.quoted.identifiers=none, and the dynamic partition insert needs
-- hive.exec.dynamic.partition.mode=nonstrict. Confirm for the target engine.
WITH ods_data AS (
    SELECT
        id,
        store_id,
        business_date,
        sap_state,
        DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS created_at,
        DATE_FORMAT(FROM_UTC_TIMESTAMP(updated_at, "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS updated_at,
        DATE_FORMAT(FROM_UTC_TIMESTAMP(created_at, "${timezone}"), "yyyy-MM-dd") AS ds
    FROM ods_sap_store_cash_pay_info_di
    WHERE ds >= "${y-m-d}"

    UNION ALL

    -- get_json_object always returns STRING, so sap_state (BIGINT in both the ods
    -- and dwd tables) is cast explicitly. This keeps the two UNION ALL branches and
    -- the insert target type-compatible.
    SELECT
        get_json_object(values, "$.id") AS id,
        get_json_object(values, "$.store_id") AS store_id,
        get_json_object(values, "$.business_date") AS business_date,
        CAST(get_json_object(values, "$.sap_state") AS BIGINT) AS sap_state,
        DATE_FORMAT(FROM_UTC_TIMESTAMP(get_json_object(values, "$.created_at"), "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS created_at,
        DATE_FORMAT(FROM_UTC_TIMESTAMP(get_json_object(values, "$.updated_at"), "${timezone}"), "yyyy-MM-dd HH:mm:ss.SSS") AS updated_at,
        DATE_FORMAT(FROM_UTC_TIMESTAMP(get_json_object(values, "$.created_at"), "${timezone}"), "yyyy-MM-dd") AS ds
    FROM ods_data_base_di
    WHERE (
            (_after_image_ = "Y" AND _operation_ IN ("INSERT", "UPDATE"))
            -- NOTE(review): DELETE rows are merged like any other row and nothing
            -- below filters them out, so a deleted id keeps a row in dwd. Confirm intended.
            OR (_operation_ = "DELETE" AND _before_image_ = "Y")
            -- NOTE(review): the meaning of rows with a NULL _id_ is not visible here. Confirm.
            OR _id_ IS NULL
        )
        AND ds >= "${y-m-d}"
        AND table_name = "sap_store_cash_pay_info"
        AND db_name = "source_db"
)
INSERT OVERWRITE TABLE dwd_sap_store_cash_pay_info_di PARTITION(ds)
SELECT `(rn)?+.+` FROM (
    -- Rank every version of an id, newest updated_at first.
    -- NOTE(review): rows of one id with equal updated_at are ranked arbitrarily,
    -- so which of them survives is not deterministic.
    SELECT
        *,
        ROW_NUMBER() OVER(PARTITION BY id ORDER BY updated_at DESC) AS rn
    FROM (
        -- existing dwd rows in every partition touched by the new data
        SELECT * FROM dwd_sap_store_cash_pay_info_di WHERE ds IN (
            SELECT DISTINCT ds FROM ods_data
        )
        UNION ALL
        -- plus the new rows
        SELECT * FROM ods_data
    ) a
) t1
WHERE rn = 1;
自动生成以下检查 SQL:
自动生成字段映射文档:
-- ============================================
-- 字段映射说明
-- ============================================
-- 源表字段 (6 个): id, store_id, business_date, sap_state, created_at, updated_at
-- 目标表字段 (6 个): id, store_id, business_date, sap_state, created_at, updated_at
-- 分区字段:ds
--
-- 字段转换规则:
-- created_at: TIMESTAMP → STRING, 时区转换
-- updated_at: TIMESTAMP → STRING, 时区转换
-- business_date: 直接映射
-- ============================================
| 参数 | 说明 | 默认值 |
|---|---|---|
| ${timezone} | 时区 | UTC |
| ${y-m-d} | 业务日期 | ${yyyymmdd-1} |
| ${bizdate} | 业务日期(质量检查) | ${yyyymmdd-1} |
skills/etl-generator/
├── SKILL.md # 技能说明
├── etl_generator.py # 核心脚本
├── README.md # 使用文档
└── examples/ # 示例 DDL
└── delivery_attempt.ddl
# Batch-generate ETL SQL for every DDL file under ddl/, writing one file per table into etl/
mkdir -p etl
for ddl in ddl/*.ddl; do
    # Without nullglob an unmatched pattern is passed through literally, so skip it
    [ -e "$ddl" ] || continue
    # Quote paths so file names containing spaces are passed as a single argument
    python3 skills/etl-generator/etl_generator.py "$ddl" > "etl/$(basename "$ddl" .ddl)_etl.sql"
done
修改 etl_generator.py 中的模板函数,适配特定业务场景。
# Produce a DataWorks node configuration by piping the generated SQL into the adapter script.
# NOTE(review): dataworks_adapter.py is not in the directory listing above; confirm it exists.
python3 skills/etl-generator/etl_generator.py source.ddl |
    python3 skills/etl-generator/dataworks_adapter.py > node_config.yaml
- `(rn)?+.+`:正则列选择,用于排除 rn 字段
- 以 _date 结尾的字段不做时区转换
- 表命名:ods_[表名]_di → dwd_[表名]_di
- 增量数据来自 ods_data_base_di 表,按 table_name 和 db_name 过滤

维护者: 汉克 (Hank)
更新时间: 2026-03-06