博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用blink CEP实现流计算中的超时统计问题
阅读量:6991 次
发布时间:2019-06-27

本文共 6801 字,大约阅读时间需要 22 分钟。

hot3.png

案例与解决方案汇总页:

一. 背景介绍

如<利用blink+MQ实现流计算中的延时统计问题>一文中所描述的场景,我们将其简化为以下案例:

实时流的数据源结构如下:

物流订单号 支付时间 仓接单时间 仓出库时间
LP1 2018-08-01 08:00    
LP1 2018-08-01 08:00 2018-08-01 09:00  
LP2 2018-08-01 09:10  
LP2 2018-08-01 09:10 2018-08-01 09:50  
LP2 2018-08-01 09:10 2018-08-01 09:50 ​2018-08-01 12:00

我们期望通过以上数据源,按照支付日期统计,每个仓库的仓接单量、仓出库量、仓接单超2H未出库单量、仓接单超6H未出库单量。可以看出,其中LP1仓接单时间是2018-08-01 09:00,但一直到2018-08-01 12:00点之前,一直都没有出库,LP1满足仓接单超2H未出库的行为。

该场景的难点就在于:订单未出库。而对于TT中的源消息流,订单未出库,TT就不会下发新的消息,不下发新的消息,blink就无法被触发计算。而针对上述的场景,对于LP1,我们需要在仓接单时间是2018-08-01 09:00+2H,也就是2018-08-01 11:00的之后,就要知道LP1已经仓接单但超2H未出库了。

二. 解决方案

本文主要是利用blink CEP来实现上述场景,具体实现步骤如下所述。

第一步:在source DDL中定义event_timestamp,并定义sink,如下:

----定义sourcecreate table sourcett_dwd_ri(     lg_order_code                  varchar comment '物流订单号'    ,ded_pay_time                   varchar comment '支付时间'    ,store_code                     varchar comment '仓库编码'    ,store_name                     varchar comment '仓库名称'    ,wms_create_time                varchar comment '仓接单时间'    ,wms_consign_create_time        varchar comment '仓出库时间'    ,evtstamp as case when coalesce(wms_create_time, '') <> ''                      then to_timestamp(wms_create_time, 'yyyy-MM-dd HH:mm:ss')                      else to_timestamp('1970-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss')                 end   --构造event_timestamp,如果源表本身带有消息的occur_time,可直接选择occur_time作为event_timestamp    ,WATERMARK FOR evtstamp AS withOffset(evtstamp, 10000)  --设置延迟10秒处理)with(     type='tt'    ,topic='dwd_ri'    ,accessKey='xxxxxx'    ,accessId='xxxxxx'    ,lengthCheck='PAD'    ,nullValues='\\N|');----定义sinkcreate table sink_hybrid_blink_cep(     ded_pay_date                   varchar comment '支付日期'    ,store_code                     varchar comment '仓库编码'    ,store_name                     varchar comment '仓库名称'    ,wms_create_ord_cnt             bigint  comment '仓接单量'    ,wms_confirm_ord_cnt            bigint  comment '仓出库量'    ,wmsin_nowmsout_2h_ord_cnt      bigint  comment '仓接单超2小时未出库单量'    ,wmsin_nowmsout_6h_ord_cnt      bigint  comment '仓接单超6小时未出库单量'        ,sub_partition                  bigint  comment '二级分区(支付日期)'    ,PRIMARY KEY (ded_pay_date, store_code, sub_partition))with(     type='PetaData'    ,url = 'xxxxxx'    ,tableName='blink_cep'    ,userName='xxxxxx'    ,password='xxxxxx'    ,bufferSize='30000'    ,batchSize='3000'    ,batchWriteTimeoutMs='15000');

第二步:根据blink CEP的标准语义进行改写,如下:

create view blink_cep_v1asselect   '仓接单-仓出库超时' as timeout_type        ,lg_order_code        ,wms_create_time as start_time        ,wms_consign_create_time as end_timefrom     source_dwd_csn_whc_lgt_fl_ord_riMATCH_RECOGNIZE(         PARTITION BY lg_order_code         ORDER BY     evtstamp         MEASURES                      e1.wms_create_time         as wms_create_time                     ,e2.wms_consign_create_time as wms_consign_create_time         ONE ROW PER MATCH WITH TIMEOUT ROWS  --重要,必须设置延迟也下发         AFTER MATCH SKIP TO NEXT ROW         PATTERN (e1 -> e2) WITHIN INTERVAL '6' HOUR         EMIT TIMEOUT (INTERVAL '2' HOUR, INTERVAL '6' HOUR)         DEFINE             e1 as e1.wms_create_time is not null and e1.wms_consign_create_time is null            ,e2 as e2.wms_create_time is not null and e2.wms_consign_create_time is not null)where    wms_create_time is not null      --重要,可以大大减少进入CEP的消息量and      wms_consign_create_time is null  --重要,可以大大减少进入CEP的消息量;

第三步:根据blink的执行机制,我们通过源实时流sourcett_dwd_ri与超时消息流blink_cep_v1关联,来触发blink对超时消息进行聚合操作,如下:

create view blink_cep_v2asselect   a.lg_order_code                       as lg_order_code        ,last_value(a.store_code             ) as store_code        ,last_value(a.store_name             ) as store_name        ,last_value(a.ded_pay_time           ) as ded_pay_time        ,last_value(a.wms_create_time        ) as wms_create_time        ,last_value(a.real_wms_confirm_time  ) as real_wms_confirm_time        ,last_value(case when coalesce(a.wms_create_time, '') <> ''                         and  coalesce(a.real_wms_confirm_time, '') = ''                          and  now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 7200                         then 'Y' else 'N' end) as flag_01        ,last_value(case when coalesce(a.wms_create_time, '') <> ''                         and  coalesce(a.real_wms_confirm_time, '') = ''                          and  now() - unix_timestamp(a.wms_create_time,'yyyy-MM-dd HH:mm:ss') >= 21600                         then 'Y' else 'N' end) as flag_02from        (select   lg_order_code                       as lg_order_code                 ,last_value(store_code             ) as store_code                 ,last_value(store_name             ) as store_name                 ,last_value(ded_pay_time           ) as ded_pay_time                 ,last_value(wms_create_time        ) as wms_create_time                 ,last_value(wms_consign_create_time) as real_wms_confirm_time         from     sourcett_dwd_ri         group by lg_order_code         ) aleft outer join        (select   lg_order_code                 ,count(*) as cnt         from     blink_cep_v1         group by lg_order_code         ) bon       a.lg_order_code = b.lg_order_codegroup by a.lg_order_code;insert into sink_hybrid_blink_cepselect   regexp_replace(substring(a.ded_pay_time, 1, 10), '-', '') as ded_pay_date        ,a.store_code        ,max(a.store_name)        as store_name        ,count(case when coalesce(a.wms_create_time, '') <> '' then a.lg_order_code end) as wmsin_ord_cnt        ,count(case when coalesce(a.real_wms_confirm_time, '') <> '' then a.lg_order_code end) as wmsout_ord_cnt        ,count(case when a.flag_01 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_2h_ord_cnt        ,count(case when a.flag_02 = 'Y' then a.lg_order_code end) as wmsin_nowmsout_6h_ord_cnt        ,cast(regexp_replace(SUBSTRING(ded_pay_time, 1, 10), '-', '') as bigint) as sub_partitionfrom     blink_cep_v2 as t1where    coalesce(lg_cancel_time, '') = ''and      coalesce(ded_pay_time, '') <> ''group by regexp_replace(substring(ded_pay_time, 1, 10), '-', '')        ,a.store_code;

三. 问题拓展

  1. blink CEP的参数比较多,要完全看懂,着实需要一些时间,但CEP的强大是毋庸置疑的。CEP不仅可以解决物流场景中的超时统计问题,风控中的很多场景也是信手拈来。这里有一个风控中的场景,通过上述物流案例的用法,我们是否能推敲出这个场景的用法呢?
    风控案例测试数据如下:
刷卡时间 银行卡ID 刷卡地点
2018-04-13 12:00:00 1 WW
2018-04-13 12:05:00 1 WW1
2018-04-13 12:10:00 1 WW2
2018-04-13 12:20:00 1 WW

我们认为,当一张银行卡在10min之内,在不同的地点被刷卡大于等于两次,我们就期望对消费者出发预警机制。

  1. blink CEP是万能的么?答案是否定的,当消息乱序程度比较高的时候,实时性和准确性就成了一对矛盾的存在。要想实时性比较高,必然要求设置的offset越小越好,但offset设置比较小,就直接可能导致很多eventtime<watermark-offset的消息,直接被丢弃,准确性很难保证。比如,在CP回传物流详情的时候,经常回传的时间跟实操的时间差异很大(实操时间是10点,但回传时间是15点),如果以实操时间作为eventtime,可能就会导致这种差异很大的消息被直接丢掉,无法进入CEP,进而无法触发CEP后续的计算,在使用CEP的过程中,应该注意这一点。

四. 作者简介

花名:缘桥,来自菜鸟-CTO-数据部-仓配数据研发,主要负责菜鸟仓配业务的离线和实时数据仓库建设以及创新数据技术和工具的探索和应用。

转载于:https://my.oschina.net/u/1464083/blog/3018748

你可能感兴趣的文章
基于 HTML5 的 WebGL 3D 档案馆可视化管理系统
查看>>
Spring webflux 函数式编程web框架
查看>>
[译] 使用 React 和 ImmutableJS 构建一个拖放布局构建器
查看>>
AFNetworking源码阅读1
查看>>
ENFI下载器地址——百度网盘不限速下载工具
查看>>
栈的应用---后缀表达式
查看>>
吴恩达机器学习系列7:逻辑回归
查看>>
使用Charles搭建反向代理
查看>>
程序员笔记| 详解Eureka 缓存机制
查看>>
在Mac 系统下进行文件的显示和隐藏
查看>>
聊聊hazelcast的PhiAccrualFailureDetector
查看>>
Item 16 Favor composition over inheritance
查看>>
破解bmob云模糊查询收费 微信小程序端
查看>>
mysql索引使用经验总结
查看>>
【浅度渣文】Jackson之jackson-core
查看>>
吴恩达机器学习系列15:学习曲线
查看>>
记录 iView 的表单验证
查看>>
你可能并没有真正理解for-in
查看>>
block初窥
查看>>
扁平化图标的终极设计指南
查看>>