本文为您介绍如何将同步至MaxCompute的用户信息表ods_user_info_d及访问日志数据ods_raw_log_d,通过DataWorks的ODPS SQL节点加工得到目标用户画像数据,阅读本文后您可了解到如何通过DataWorks+MaxCompute产品组合来计算和分析已同步的数据,完成数仓简单数据加工场景。进入数据开发登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发。步骤一:新建MaxCompute表提前新建dwd_log_info_di、dws_user_info_all_di、ads_user_info_1d表,用于存放每层加工后的数据。以下仅快速创建相关表,更多MaxCompute表相关操作,请参见创建并使用MaxCompute表。进入新建表入口。在数据开发页面,打开数据同步阶段创建的业务流程WorkShop。右键单击MaxCompute,选择新建表。定义MaxCompute表结构。在新建表对话框中,输入表名,单击新建。此处需要创建三张表,表名分别为dwd_log_info_di、dws_user_info_all_di、ads_user_info_1d。选择DDL方式建表,三张表建表命令请参考下文。提交至引擎生效。表结构定义完成后,分别单击提交到开发环境和提交到生产环境,系统将根据您的配置在开发环境与生产环境对应计算引擎项目分别创建目标引擎物理表。提交表至DataWorks的开发环境,即在开发环境的MaxCompute引擎中创建当前表。提交表至DataWorks的生产环境,即在生产环境的MaxCompute引擎中创建当前表。1、新建dwd_log_info_di表双击dwd_log_info_di表,在右侧的编辑页面单击DDL,输入下述建表语句。CREATE TABLE IF NOT EXISTS dwd_log_info_di (
ip STRING COMMENT 'ip地址',
uid STRING COMMENT '用户ID',
time STRING COMMENT '时间yyyymmddhh:mi:ss',
status STRING COMMENT '服务器返回状态码',
bytes STRING COMMENT '返回给客户端的字节数',
region STRING COMMENT '地域,根据ip得到',
method STRING COMMENT 'http请求类型',
url STRING COMMENT 'url',
protocol STRING COMMENT 'http协议版本号',
referer STRING COMMENT '来源url',
device STRING COMMENT '终端类型 ',
identity STRING COMMENT '访问类型 crawler feed user unknown'
)
PARTITIONED BY (
dt STRING
)
LIFECYCLE 14;2、新建dws_user_info_all_di表双击dws_user_info_all_di表,在右侧的编辑页面单击DDL,输入下述建表语句。CREATE TABLE IF NOT EXISTS dws_user_info_all_di (
uid STRING COMMENT '用户ID',
gender STRING COMMENT '性别',
age_range STRING COMMENT '年龄段',
zodiac STRING COMMENT '星座',
region STRING COMMENT '地域,根据ip得到',
device STRING COMMENT '终端类型 ',
identity STRING COMMENT '访问类型 crawler feed user unknown',
method STRING COMMENT 'http请求类型',
url STRING COMMENT 'url',
referer STRING COMMENT '来源url',
time STRING COMMENT '时间yyyymmddhh:mi:ss'
)
PARTITIONED BY (
dt STRING
)
LIFECYCLE 14;3、新建ads_user_info_1d表双击ads_user_info_1d表,在右侧的编辑页面单击DDL,输入下述建表语句。CREATE TABLE IF NOT EXISTS ads_user_info_1d (
uid STRING COMMENT '用户ID',
region STRING COMMENT '地域,根据ip得到',
device STRING COMMENT '终端类型 ',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT '性别',
age_range STRING COMMENT '年龄段',
zodiac STRING COMMENT '星座'
)
PARTITIONED BY (
dt STRING
)
LIFECYCLE 14; 步骤二:创建函数(getregion)根据同步的原始日志数据格式,我们需要通过函数等方式将其拆解为目标格式。本案例已为您提供用于将IP解析为地域的函数所需资源,您仅需将其下载至本地,并在DataWorks注册函数前,将函数涉及的资源上传至DataWorks空间即可。1、上传资源(ip2region.jar)下载ip2region.jar。在数据开发页面打开WorkShop业务流程。右键单击MaxCompute,选择新建资源 > JAR。单击上传,选择已下载至本地的ip2region.jar,单击打开。单击工具栏按钮,将资源提交至开发环境对应的MaxCompute引擎项目。2、注册函数(getregion)进入函数注册页。在数据开发页面打开业务流程,右键单击MaxCompute,选择新建函数。填写函数名称。在新建函数对话框中,输入函数名称(getregion),单击新建。在注册函数对话框中,配置各项参数。参数描述函数类型选择函数类型。MaxCompute引擎实例默认不可以修改。函数名新建函数时输入的函数名称。责任人选择责任人。类名输入org.alidata.odps.udf.Ip2Region。资源列表输入ip2region.jar。描述输入IP地址转换地域。命令格式输入getregion('ip')。参数说明输入IP地址。提交函数。单击按钮,将函数提交至开发环境对应的引擎。步骤三:配置ODPS SQL节点本案例需要将每层加工逻辑通过ODPS SQL调度实现,由于各层节点间存在强血缘依赖,并且在数据同步阶段已将同步任务产出表手动添加为节点的输出,所以本案例数据加工类的任务依赖关系通过DataWorks自动解析机制根据血缘自动配置。打开业务流程。在数据开发页面,双击同步数据阶段创建的业务流程名,本案例业务流程名为WorkShop。新建节点。在该业务流程下,右键单击MaxCompute,选择新建节点 >ODPS SQL。本案例需要依次创建如下三个节点:dwd_log_info_di、dws_user_info_all_di、ads_user_info_1d,具体配置如下。1、配置dwd_log_info_di节点使用步骤二创建的getregion函数对ods_raw_log_d表中的IP信息进行解析,并使用正则等方式,拆解为可分析字段写入dwd_log_info_di表,dwd加工前后数据比对,可参见附录:加工示例。1. 编辑代码在业务流程面板中,双击打开dwd_log_info_di节点,并配置如下代码,DataWorks通过${变量名}格式定义代码变量。其中代码中的${bizdate}为代码变量,该变量将在后续步骤2中为其赋值。-- 场景:以下SQL使用函数getregion对原始日志数据中的ip进行解析,并通过正则等方式,将原始数据拆解为可分析字段写入并写入dwd_log_info_di表。
-- 本案例已为您准备好用于将IP解析为地域的函数getregion。
-- 补充:
-- 1. 在DataWorks节点中使用函数前,您需要先将注册函数所需资源上传至DataWorks,再通过可视化方式使用该资源注册函数,详见:https://www.alibabacloud.com/help/zh/dataworks/user-guide/create-and-use-maxcompute-resources
-- 本案例注册函数getregion所用的资源为ip2region.jar。
-- 2. DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
-- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT OVERWRITE TABLE dwd_log_info_di PARTITION (dt='${bizdate}')
SELECT ip
, uid
, time
, status
, bytes
, getregion(ip) AS region --使用自定义UDF通过IP得到地域。
, regexp_substr(request, '(^[^ ]+ )') AS method --通过正则把request差分为3个字段。
, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') AS url
, regexp_substr(request, '([^ ]+$)') AS protocol
, regexp_extract(referer, '^[^/]+://([^/]+){1}') AS referer --通过正则清晰refer,得到更精准的URL。
, CASE
WHEN TOLOWER(agent) RLIKE 'android' THEN 'android' --通过agent得到终端信息和访问形式。
WHEN TOLOWER(agent) RLIKE 'iphone' THEN 'iphone'
WHEN TOLOWER(agent) RLIKE 'ipad' THEN 'ipad'
WHEN TOLOWER(agent) RLIKE 'macintosh' THEN 'macintosh'
WHEN TOLOWER(agent) RLIKE 'windows phone' THEN 'windows_phone'
WHEN TOLOWER(agent) RLIKE 'windows' THEN 'windows_pc'
ELSE 'unknown'
END AS device
, CASE
WHEN TOLOWER(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler'
WHEN TOLOWER(agent) RLIKE 'feed'
OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') RLIKE 'feed' THEN 'feed'
WHEN TOLOWER(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)'
AND agent RLIKE '^[Mozilla|Opera]'
AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$') NOT RLIKE 'feed' THEN 'user'
ELSE 'unknown'
END AS identity
FROM (
SELECT SPLIT(col, '##&')[0] AS ip
, SPLIT(col, '##&')[1] AS uid
, SPLIT(col, '##&')[2] AS time
, SPLIT(col, '##&')[3] AS request
, SPLIT(col, '##&')[4] AS status
, SPLIT(col, '##&')[5] AS bytes
, SPLIT(col, '##&')[6] AS referer
, SPLIT(col, '##&')[7] AS agent
FROM ods_raw_log_d
WHERE dt ='${bizdate}'
) a;2. 配置调度属性通过以下配置实现调度场景下,每日00:15待上游ods_raw_log_d节点将存储于OSS的user_log.txt数据同步至MaxCompute的ods_raw_log_d表后,可触发当前dwd_log_info_di节点对ods_raw_log_d表数据进行加工,加工结果写入dwd_log_info_di表对应业务时间分区。配置调度参数:为代码中的变量bizdate赋值$[yyyymmdd-1],获取前一天的日期。配置定时调度时间:配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程根节点WorkShop的定时调度时间控制,即每日00:15后才会调度。配置依赖关系:通过代码自动解析自动将产出ods_raw_log_d表数据的ods_raw_log_d节点设置为当前节点dwd_log_info_di的上游依赖。将dwd_log_info_di表作为节点输出,方便下游查询该表数据时自动挂上当前节点依赖。3. 保存配置本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的按钮,保存当前配置。2、配置dws_user_info_all_di节点对同步到MaxCompute的用户基本信息数据ods_user_info_d和初步加工后的日志数据dwd_log_info_di进行汇总,产出用户访问信息汇总表dws_user_info_all_di。1. 编辑代码在业务流程面板中,双击打开dws_user_info_all_di节点,并配置如下代码,DataWorks通过${变量名}格式定义代码变量。其中代码中的${bizdate}为代码变量,该变量将在后续步骤2中为其赋值。-- 场景:将加工后的日志数据dwd_log_info_di与用户基本信息数据ods_user_info_d汇总写入dws_user_info_all_di表。
-- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
-- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT OVERWRITE TABLE dws_user_info_all_di PARTITION (dt='${bizdate}')
SELECT COALESCE(a.uid, b.uid) AS uid
, b.gender
, b.age_range
, b.zodiac
, a.region
, a.device
, a.identity
, a.method
, a.url
, a.referer
, a.time
FROM (
SELECT *
FROM dwd_log_info_di
WHERE dt = '${bizdate}'
) a
LEFT OUTER JOIN (
SELECT *
FROM ods_user_info_d
WHERE dt = '${bizdate}'
) b
ON a.uid = b.uid;2. 配置调度属性通过以下配置实现调度场景下,每日00:15待上游MySQL用户基本数据通过数据集成同步至MaxCompute的ods_user_info_d表,以及dwd_log_info_di节点对ods_raw_log_d表加工完成后,将其汇总写入dws_user_info_all_di表对应业务分区。配置调度参数:为代码中的变量bizdate赋值$[yyyymmdd-1],获取前一天的日期。配置定时调度时间:配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日起调时间由业务流程根节点WorkShop的定时调度时间控制,即每日00:15分后才会调度。配置依赖关系:通过代码自动解析自动将产出dwd_log_info_di和ods_user_info_d表数据的节点dwd_log_info_di、ods_user_info_d作为当前节点dws_user_info_all_di的上游依赖。将节点产出表dws_user_info_all_di作为节点输出,方便下游查询该表数据时自动挂上当前节点依赖。3. 保存配置本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的按钮,保存当前配置。3、配置ads_user_info_1d节点对用户访问信息汇总表dws_user_info_all_di进一步加工产出基本的用户画像数据ads_user_info_1d。1. 编辑代码在业务流程面板中,双击打开ads_user_info_1d节点,并配置如下代码,DataWorks通过${变量名}格式定义代码变量。其中代码中的${bizdate}为代码变量,该变量将在后续步骤2中为其赋值。-- 场景:以下SQL用于对用户访问信息宽表dws_user_info_all_di进一步加工产出基本的用户画像数据写入ads_user_info_1d表。
-- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
-- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT OVERWRITE TABLE ads_user_info_1d PARTITION (dt='${bizdate}')
SELECT uid
, MAX(region)
, MAX(device)
, COUNT(0) AS pv
, MAX(gender)
, MAX(age_range)
, MAX(zodiac)
FROM dws_user_info_all_di
WHERE dt = '${bizdate}'
GROUP BY uid; 2. 配置调度属性为实现周期调度,我们需要定义任务周期调度的相关属性。配置调度参数:为代码中的变量bizdate赋值$[yyyymmdd-1],获取前一天的日期。配置定时调度时间:无需单独配置当前节点定时调度时间,当前节点每日起调时间由业务流程根节点WorkShop的定时调度时间控制,即每日00:15分后才会调度。配置依赖关系:通过代码自动解析自动根据节点血缘关系配置节点上下游依赖关系,即将产出dws_user_info_all_1d表数据的dws_user_info_all_1d节点设置为当前节点ads_user_info_1d的上游。将节点产出表ads_user_info_1d作为节点输出,方便下游查询该表数据时自动挂上当前节点依赖。3. 保存配置本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的按钮,保存当前配置。步骤四:运行业务流程发布任务至生产环境前,您可以运行整个业务流程,对代码进行测试,确保其正确性。运行业务流程在业务流程(WorkShop)的编辑页面,您需要确认最终通过自动解析设置的依赖关系是否与下图一致。确认依赖关系无误后,单击工具栏的图标,运行整个任务。查看运行结果待所有任务处于状态后,查询最终加工的结果表。您可在数据开发页面的左侧导航栏,单击,进入临时查询面板。右键单击临时查询,选择新建节点 > ODPS SQL。在ODPS SQL节点中执行如下SQL语句,确认本案例最终的结果表。//您需要将分区过滤条件更新为您当前操作的实际业务日期。例如,任务运行的日期为20230222,则业务日期为20230221,即任务运行日期的前一天。
select count(*) from ads_user_info_1d where dt='业务日期';步骤五:提交并发布业务流程任务需要发布至生产环境后才可自动调度运行,请参见以下内容。提交至开发环境在业务流程面板工具栏中,单击按钮,提交整个业务流程中的任务,请按照图示配置,并单击确认。发布至生产环境提交业务流程后,表示任务已进入开发环境。由于开发环境的任务不会自动调度,您需要发布任务至生产环境。 在业务流程面板,单击工具栏中的图标,或单击数据开发页面任务发布按钮,进入创建发布包页面。批量发布目标任务,包括该业务流程涉及的资源、函数。步骤六:在生产环境执行任务在实际开发场景下,您可通过在生产环境执行补数据操作实现历史数据回刷,具体操作如下。进入运维中心。任务发布成功后,单击右上角的运维中心。您也可以进入业务流程的编辑页面,单击工具栏中的前往运维,进入运维中心页面。针对周期任务执行补数据操作。在左侧导航栏,单击周期任务运维 > 周期任务,进入周期任务页面,单击workshop业务流程的起始根节点workshop_start。右键单击workshop_start节点,选择补数据 > 当前节点及下游节点。选中workshop_start节点的所有下游节点,输入业务日期,单击确定,自动跳转至补数据实例页面。单击刷新,直至SQL任务全部运行成功即可。后续步骤任务周期性调度场景下,为保障任务产出的表数据符合预期,我们可以对任务产出的表数据进行数据质量监控,详情请参见配置数据质量监控。附录:加工示例加工前58.246.10.82##&2d24d94f14784##&2014-02-12 13:12:25##&GET /wp-content/themes/inove/img/feeds.gif HTTP/1.1##&200##&2572###&Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36加工后