1. Background
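The real-time source stream has the following structure:
Logistics order ID (lg_order_code) | Payment time (ded_pay_time) | Warehouse acceptance time (wms_create_time) | Warehouse outbound time (wms_consign_create_time) |
LP1 | 2018-08-01 08:00 | | |
LP1 | 2018-08-01 08:00 | 2018-08-01 09:00 | |
LP2 | 2018-08-01 09:10 | | |
LP2 | 2018-08-01 09:10 | 2018-08-01 09:50 | |
LP2 | 2018-08-01 09:10 | 2018-08-01 09:50 | 2018-08-01 12:00 |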
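From this source we want to compute, by payment date, four metrics per warehouse: the number of orders accepted by the warehouse, the number shipped out of the warehouse, the number accepted but not shipped for over 2 hours, and the number accepted but not shipped for over 6 hours. In the sample, LP1 was accepted by the warehouse at 2018-08-01 09:00 and had still not shipped as of 2018-08-01 12:00, so LP1 counts as "accepted but not shipped for over 2 hours".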
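To make the four metrics concrete, here is a minimal plain-Python sketch (not Blink code) that computes them from the sample rows as of a chosen evaluation instant `now`. The dictionary layout, the helper names `ts` and `metrics`, and grouping by payment date only (the sample has no warehouse column) are illustrative assumptions.

```python
from datetime import datetime, timedelta

def ts(s):
    """Parse a 'YYYY-MM-DD HH:MM' value from the sample table; empty means it has not happened."""
    return datetime.strptime(s, "%Y-%m-%d %H:%M") if s else None

# Latest state of each order in the sample table (the sample has no warehouse columns).
ORDERS = {
    "LP1": {"pay": ts("2018-08-01 08:00"), "created": ts("2018-08-01 09:00"), "shipped": ts("")},
    "LP2": {"pay": ts("2018-08-01 09:10"), "created": ts("2018-08-01 09:50"), "shipped": ts("2018-08-01 12:00")},
}

def metrics(orders, now):
    """Return {payment date: counts} as they should read at the instant `now`."""
    result = {}
    for o in orders.values():
        day = o["pay"].strftime("%Y%m%d")
        m = result.setdefault(day, {"accepted": 0, "shipped": 0, "over_2h": 0, "over_6h": 0})
        created, shipped = o["created"], o["shipped"]
        if created is None or created > now:
            continue  # not yet accepted by the warehouse at `now`
        m["accepted"] += 1
        if shipped is not None and shipped <= now:
            m["shipped"] += 1
            continue
        age = now - created  # accepted but still not shipped at `now`
        if age >= timedelta(hours=2):
            m["over_2h"] += 1
        if age >= timedelta(hours=6):
            m["over_6h"] += 1
    return result
```

Evaluated at 12:00, this reports 2 accepted, 1 shipped, 1 order over 2 hours (LP1) and none over 6 hours; LP1 moves into the 6-hour bucket at 15:00.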
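The difficulty in this scenario is precisely that the order has not shipped. In the TT (TimeTunnel) source stream, an order that has not shipped produces no new message, and without a new message Blink has nothing to trigger a computation. Yet for LP1 we need to know, as soon as 2018-08-01 11:00 (the 09:00 acceptance time plus 2 hours), that it has been accepted but not shipped for over 2 hours.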
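The following plain-Python simulation contrasts two ways of evaluating the 2-hour rule: re-evaluating an order only when one of its own messages arrives, versus registering a timer at acceptance + 2 hours. It is an illustration of the problem, not how Blink works internally; the arrival times (each message assumed to arrive at the latest timestamp it carries) and the helper names are assumptions.

```python
import heapq
from datetime import datetime, timedelta

TIMEOUT = timedelta(hours=2)

def t(hhmm):
    return datetime.strptime("2018-08-01 " + hhmm, "%Y-%m-%d %H:%M")

# (arrival time, order, wms_create_time, wms_consign_create_time) for the sample rows.
# Assumption: each message arrives at the latest timestamp it carries.
MESSAGES = [
    (t("08:00"), "LP1", None, None),
    (t("09:00"), "LP1", t("09:00"), None),
    (t("09:10"), "LP2", None, None),
    (t("09:50"), "LP2", t("09:50"), None),
    (t("12:00"), "LP2", t("09:50"), t("12:00")),
]

def event_driven(messages):
    """Evaluate an order only when a message for that order arrives."""
    flagged = set()
    for arrival, order, created, shipped in messages:
        if created is not None and shipped is None and arrival - created >= TIMEOUT:
            flagged.add(order)
    return flagged

def timer_driven(messages):
    """Register a timer at acceptance + TIMEOUT and fire it once the clock passes it."""
    timers = []          # min-heap of (deadline, order)
    shipped_orders = set()
    flagged = {}         # order -> deadline at which it became overdue
    for arrival, order, created, shipped in messages:
        # Any message advances the clock, including messages for other orders.
        while timers and timers[0][0] <= arrival:
            deadline, timed_order = heapq.heappop(timers)
            if timed_order not in shipped_orders:
                flagged[timed_order] = deadline
        if shipped is not None:
            shipped_orders.add(order)
            flagged.pop(order, None)  # a shipped order no longer counts as overdue
        elif created is not None:
            heapq.heappush(timers, (created + TIMEOUT, order))
    return flagged
```

The event-driven version never flags LP1, because its last message (09:00) is evaluated when the order is 0 minutes old. The timer version flags LP1 with deadline 11:00, but only once the 12:00 message for LP2 moves the clock; LP2 itself is flagged at 11:50 (09:50 + 2 h) and cleared when its outbound message arrives.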
2. Solution
This article implements the scenario above with Blink CEP, in the following steps.
Step 1: In the source DDL, define an event-time column (evtstamp) with a watermark, and define the sink:

```sql
---- Define the source
create table sourcett_dwd_ri(
   lg_order_code            varchar comment 'Logistics order ID'
  ,ded_pay_time             varchar comment 'Payment time'
  ,store_code               varchar comment 'Warehouse code'
  ,store_name               varchar comment 'Warehouse name'
  ,wms_create_time          varchar comment 'Warehouse acceptance time'
  ,wms_consign_create_time  varchar comment 'Warehouse outbound time'
  -- Build the event timestamp. If the source already carries the message's occur_time,
  -- occur_time can be used as the event timestamp directly.
  ,evtstamp as case when coalesce(wms_create_time, '') <> ''
                    then to_timestamp(wms_create_time, 'yyyy-MM-dd HH:mm:ss')
                    else to_timestamp('1970-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss')
               end
  ,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000)  -- delay processing by 10 seconds (10000 ms)
) with (
   type = 'tt'
  ,topic = 'dwd_ri'
  ,accessKey = 'xxxxxx'
  ,accessId = 'xxxxxx'
  ,lengthCheck = 'PAD'
  ,nullValues = '\\N|'
);

---- Define the sink
create table sink_hybrid_blink_cep(
   ded_pay_date               varchar comment 'Payment date'
  ,store_code                 varchar comment 'Warehouse code'
  ,store_name                 varchar comment 'Warehouse name'
  ,wms_create_ord_cnt         bigint  comment 'Orders accepted by the warehouse'
  ,wms_confirm_ord_cnt        bigint  comment 'Orders shipped out of the warehouse'
  ,wmsin_nowmsout_2h_ord_cnt  bigint  comment 'Orders accepted but not shipped for over 2 hours'
  ,wmsin_nowmsout_6h_ord_cnt  bigint  comment 'Orders accepted but not shipped for over 6 hours'
  ,sub_partition              bigint  comment 'Second-level partition (payment date)'
  ,PRIMARY KEY (ded_pay_date, store_code, sub_partition)
) with (
   type = 'PetaData'
  ,url = 'xxxxxx'
  ,tableName = 'blink_cep'
  ,userName = 'xxxxxx'
  ,password = 'xxxxxx'
  ,bufferSize = '30000'
  ,batchSize = '3000'
  ,batchWriteTimeoutMs = '15000'
);
```
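As a rough model of what this DDL sets up, the Python sketch below computes evtstamp the same way as the CASE expression and flags rows that arrive behind the watermark. It assumes that withOffset yields a watermark equal to the largest evtstamp seen so far minus 10000 ms, and simplifies watermark updates to once per row; `evtstamp` and `replay` are hypothetical helpers, not Blink functions.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
OFFSET = timedelta(milliseconds=10000)  # withOffset(evtstamp, 10000): 10 seconds

def evtstamp(wms_create_time):
    """Mirror the DDL's CASE: parse wms_create_time, else fall back to 1970-01-01 00:00:00."""
    if wms_create_time:  # coalesce(wms_create_time, '') <> ''
        return datetime.strptime(wms_create_time, "%Y-%m-%d %H:%M:%S")
    return EPOCH

def replay(create_times):
    """For each row in arrival order: (evtstamp, watermark after the row, was the row late?)."""
    max_ts = None
    out = []
    for s in create_times:
        e = evtstamp(s)
        # A row is late if its evtstamp is already behind the current watermark.
        late = max_ts is not None and e < max_ts - OFFSET
        max_ts = e if max_ts is None else max(max_ts, e)
        out.append((e, max_ts - OFFSET, late))
    return out
```

For the input `["2018-08-01 09:00:00", "", "2018-08-01 09:00:05", "2018-08-01 08:59:58", "2018-08-01 08:59:00"]`, the empty acceptance time maps to 1970-01-01 and is behind the watermark, the 08:59:58 row is out of order but within the 10-second offset, and the 08:59:00 row is late. The CEP pattern's e1 and e2 both require wms_create_time to be set anyway, so rows without it are not needed there.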
Step 2: Rewrite the logic following standard Blink CEP semantics:
```sql
create view blink_cep_v1 as
select 'warehouse accept -> outbound timeout' as timeout_type
  ,lg_order_code
  ,wms_create_time         as start_time
  ,wms_consign_create_time as end_time
from sourcett_dwd_ri
MATCH_RECOGNIZE(
  PARTITION BY lg_order_code
  ORDER BY evtstamp
  MEASURES
     e1.wms_create_time         as wms_create_time
    ,e2.wms_consign_create_time as wms_consign_create_time
  ONE ROW PER MATCH WITH TIMEOUT ROWS  -- important: timed-out partial matches must also be emitted
  AFTER MATCH SKIP TO NEXT ROW
  PATTERN (e1 -> e2) WITHIN INTERVAL '6' HOUR
  EMIT TIMEOUT (INTERVAL '2' HOUR, INTERVAL '6' HOUR)
  DEFINE
     e1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null
    ,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null
)
where wms_create_time is not null        -- important: greatly reduces the number of messages entering CEP
  and wms_consign_create_time is null    -- important: greatly reduces the number of messages entering CEP
;
```
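Below is a toy Python model of how we read these pattern clauses: per lg_order_code, an e1 row opens a partial match, an e2 row within 6 hours completes it, and otherwise a timeout row is emitted each time the watermark passes an EMIT TIMEOUT deadline. This is our interpretation for illustration, not Blink's implementation: it keeps at most one partial match per order, ignores AFTER MATCH SKIP TO NEXT ROW, assumes input already in evtstamp order, and uses evtstamp = wms_create_time as in the DDL. The third order, LP3, is a hypothetical row added only to advance the watermark.

```python
from datetime import datetime, timedelta

OFFSET = timedelta(seconds=10)                      # withOffset(evtstamp, 10000)
EMIT_AT = (timedelta(hours=2), timedelta(hours=6))  # EMIT TIMEOUT (INTERVAL '2' HOUR, INTERVAL '6' HOUR)
WITHIN = timedelta(hours=6)                         # WITHIN INTERVAL '6' HOUR

def t(hhmm):
    return datetime.strptime("2018-08-01 " + hhmm, "%Y-%m-%d %H:%M")

def run_cep(events):
    """events: (evtstamp, lg_order_code, wms_create_time, wms_consign_create_time) in evtstamp order.
    Returns ("match", order, wms_consign_create_time) and ("timeout", order, deadline) rows."""
    pending = {}   # order -> (evtstamp of e1, number of timeout rows already emitted)
    out = []
    max_ts = None
    for evt, order, created, consigned in events:
        max_ts = evt if max_ts is None else max(max_ts, evt)
        watermark = max_ts - OFFSET
        # Emit a timeout row for every deadline the watermark has passed.
        for o, (start, emitted) in list(pending.items()):
            while emitted < len(EMIT_AT) and start + EMIT_AT[emitted] <= watermark:
                out.append(("timeout", o, start + EMIT_AT[emitted]))
                emitted += 1
            if start + WITHIN <= watermark:
                del pending[o]                    # the partial match expires
            else:
                pending[o] = (start, emitted)
        is_e1 = created is not None and consigned is None
        is_e2 = created is not None and consigned is not None
        if is_e1 and order not in pending:
            pending[order] = (evt, 0)             # e1 opens a partial match
        elif is_e2 and order in pending:
            out.append(("match", order, consigned))  # e1 -> e2 completed within 6 hours
            del pending[order]
    return out

EVENTS = [
    (t("09:00"), "LP1", t("09:00"), None),        # LP1 accepted
    (t("09:50"), "LP2", t("09:50"), None),        # LP2 accepted
    (t("09:50"), "LP2", t("09:50"), t("10:30")),  # LP2 shipped at 10:30; evtstamp is still wms_create_time
    (t("15:05"), "LP3", t("15:05"), None),        # hypothetical order that advances the watermark
]
```

In this model LP2 produces a match row, while LP1's 2-hour timeout row (deadline 11:00) is emitted only when LP3's 15:05 row pushes the watermark past it, together with the 6-hour row. Timeout rows ride on the watermark, so they are only as timely as the newer data that keeps flowing.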
```sql
create view blink_cep_v2 as
select a.lg_order_code                 as lg_order_code
  ,last_value(a.store_code)            as store_code
  ,last_value(a.store_name)            as store_name
  ,last_value(a.ded_pay_time)          as ded_pay_time
  ,last_value(a.wms_create_time)       as wms_create_time
  ,last_value(a.real_wms_confirm_time) as real_wms_confirm_time
  -- flag_01: accepted, not yet shipped, and at least 7200 s (2 h) since acceptance
  ,last_value(case when coalesce(a.wms_create_time, '') <> ''
                    and coalesce(a.real_wms_confirm_time, '') = ''
                    and now() - unix_timestamp(a.wms_create_time, 'yyyy-MM-dd HH:mm:ss') >= 7200
                   then 'Y' else 'N' end) as flag_01
  -- flag_02: same condition with 21600 s (6 h)
  ,last_value(case when coalesce(a.wms_create_time, '') <> ''
                    and coalesce(a.real_wms_confirm_time, '') = ''
                    and now() - unix_timestamp(a.wms_create_time, 'yyyy-MM-dd HH:mm:ss') >= 21600
                   then 'Y' else 'N' end) as flag_02
from (
  -- latest state of each order
  select lg_order_code                   as lg_order_code
    ,last_value(store_code)              as store_code
    ,last_value(store_name)              as store_name
    ,last_value(ded_pay_time)            as ded_pay_time
    ,last_value(wms_create_time)         as wms_create_time
    ,last_value(wms_consign_create_time) as real_wms_confirm_time
  from sourcett_dwd_ri
  group by lg_order_code
) a
-- Each CEP timeout row changes b.cnt for its order, which re-triggers this aggregation
-- and re-evaluates now() at the 2 h / 6 h marks even though the source sent nothing new.
left outer join (
  select lg_order_code
    ,count(*) as cnt
  from blink_cep_v1
  group by lg_order_code
) b
on a.lg_order_code = b.lg_order_code
group by a.lg_order_code;

insert into sink_hybrid_blink_cep
select regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as ded_pay_date
  ,a.store_code
  ,max(a.store_name) as store_name
  ,count(case when coalesce(a.wms_create_time, '') <> '' then a.lg_order_code end)       as wms_create_ord_cnt
  ,count(case when coalesce(a.real_wms_confirm_time, '') <> '' then a.lg_order_code end) as wms_confirm_ord_cnt
  ,count(case when a.flag_01 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt
  ,count(case when a.flag_02 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt
  ,cast(regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as bigint) as sub_partition
from blink_cep_v2 as a
-- To exclude cancelled orders, add lg_cancel_time to the source DDL and to blink_cep_v2,
-- then add: and coalesce(a.lg_cancel_time, '') = ''
where coalesce(a.ded_pay_time, '') <> ''
group by regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '')
  ,a.store_code;
```
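Why the left outer join with blink_cep_v1 matters can be modelled in a few lines of Python. The sketch assumes that any new row on either side of the join re-runs the aggregation for that order, that now() equals the processing time of that row, and, for simplicity, that CEP timeout rows are processed exactly at their deadlines (in practice they are emitted only once the watermark passes the deadline). `flags` and `replay` are hypothetical helpers.

```python
from datetime import datetime

def t(hhmm):
    return datetime.strptime("2018-08-01 " + hhmm, "%Y-%m-%d %H:%M")

def flags(created, consigned, now):
    """Mirror flag_01/flag_02: accepted, not shipped, and now - created >= 7200 s / 21600 s."""
    if created is None or consigned is not None:
        return ("N", "N")
    age = (now - created).total_seconds()
    return ("Y" if age >= 7200 else "N", "Y" if age >= 21600 else "N")

def replay(updates):
    """updates: (processing time, order, side, wms_create_time, wms_consign_create_time).
    side is "source" (a sourcett_dwd_ri row) or "cep" (a blink_cep_v1 row).
    Either kind of row recomputes that order's blink_cep_v2 row with now() = processing time."""
    latest = {}   # order -> (created, consigned) from the source side (subquery a)
    v2 = {}       # order -> (flag_01, flag_02) after the most recent recomputation
    for now, order, side, created, consigned in updates:
        if side == "source":
            latest[order] = (created, consigned)
        # A "cep" row only bumps b.cnt; the source-side values stay the same.
        c, s = latest.get(order, (None, None))
        v2[order] = flags(c, s, now)
    return v2

SOURCE_ONLY = [(t("09:00"), "LP1", "source", t("09:00"), None)]
WITH_CEP = SOURCE_ONLY + [
    (t("11:00"), "LP1", "cep", None, None),  # 2 h timeout row from blink_cep_v1
    (t("15:00"), "LP1", "cep", None, None),  # 6 h timeout row
]
```

With only the source row, LP1's flags stay N because nothing recomputes them after 09:00. The two timeout rows re-run the CASE expressions at 11:00 and 15:00, flipping flag_01 and then flag_02 to Y; b.cnt itself is never read, it only serves as the trigger.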
3. Further Discussion
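- Blink CEP has many parameters, and fully understanding them takes some time, but its power is beyond doubt. Besides timeout statistics in logistics, CEP handles many risk-control scenarios with ease. Here is one from risk control: using the logistics example above, can you work out how to apply CEP to it? The test data for the risk-control case is: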
Swipe time | Bank card ID | Swipe location |
2018-04-13 12:00:00 | 1 | WW |
2018-04-13 12:05:00 | 1 | WW1 |
2018-04-13 12:10:00 | 1 | WW2 |
2018-04-13 12:20:00 | 1 | WW |
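- Is Blink CEP a silver bullet? No. When messages are heavily out of order, latency and accuracy pull against each other. Low latency calls for the smallest possible watermark offset, but a small offset means that many messages whose event time is already behind the watermark (the largest event time seen so far minus the offset) are treated as late and dropped, so accuracy is hard to guarantee. For example, when a CP reports logistics details back, the report time often differs greatly from the time the operation actually happened (the operation at 10:00, the report at 15:00). If the operation time is used as the event time, such messages may be dropped outright, never enter CEP, and therefore never trigger CEP's downstream computation. Keep this in mind when using CEP.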
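The trade-off in this bullet can be reproduced with a simplified watermark model: the largest event time seen so far minus the offset. That model is an assumption, not Blink's exact watermark emission, and `dropped` is a hypothetical helper listing the messages a given offset would discard.

```python
from datetime import datetime, timedelta

def t(hhmm):
    return datetime.strptime("2018-08-01 " + hhmm, "%Y-%m-%d %H:%M")

def dropped(arrivals, offset):
    """Event times (in arrival order) that are already behind the watermark when they arrive.
    Watermark model: largest event time seen so far minus `offset`."""
    max_ts = None
    late = []
    for evt in arrivals:
        if max_ts is not None and evt < max_ts - offset:
            late.append(evt)  # would be discarded instead of entering CEP
        max_ts = evt if max_ts is None else max(max_ts, evt)
    return late

# A CP reports an operation done at 10:00 only after 15:00 data is already flowing.
ARRIVALS = [t("14:00"), t("15:00"), t("10:00")]
```

With a 10-second offset the 10:00 message is dropped; with a 6-hour offset it survives (15:00 minus 6 h is 09:00). The price is latency: under this model a 2-hour timeout for an order accepted at 09:00 (deadline 11:00) can only fire after an event with time 17:00 or later has been seen, since 17:00 minus 6 h is 11:00.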
4. About the Author