Flink窗口的应用

本文整理自:Apache Flink Contributor、OPPO大数据平台研发负责人张俊老师在Flink实时数仓篇的分享

Window 应用场景

image-20230610204417554

  • 聚合统计:比如从Kafka中读取数据,根据不同的维度做一分钟或几分钟的聚合计算,然后写入到外部存储。
  • 记录合并:对可合并的原始数据进行合并。例如:用户的行为数据,对单个用户来说,可以考虑在一定的窗口范围内进行合并,再写入到下游系统,比如ES,这样也可以降低下游的写入压力。
  • 双流join:针对join场景,如果进行全量join,Flink会把数据记录在状态里,并且不会清除,导致成本开销非常大。所以,通常这种情况下,需要用窗口join。

Window 抽象概念

image-20230610205648588

  • TimestampAssigner:如果时间语义是Event-Time,需要通过TimestampAssigner来告诉Flink,数据的哪一部分来当作Event-Time。
  • KeySelector:Key选择器,用来告诉Flink做聚合统计的维度。
  • WindowAssigner:窗口分配器,用来确定到来的数据被划分到哪个窗口。
  • State:存储窗口内的元素,如果有AggregateFunction,那么存储的就是增量聚合的中间状态。
  • AggregateFunction(可选):增量聚合函数,适用于可以增量聚合的逻辑,减轻State的压力。
  • Trigger:窗口触发器,用来确定何时触发窗口的计算。
  • Evictor(可选):驱逐器,用于在窗口函数计算之前(后)对满足其驱逐条件的数据进行过滤。
  • WindowFunction:窗口函数,对窗口的数据进行计算。
  • Collector:将窗口的计算结果发送到下游。

上图中,红色的部分都是可以自定义的模块,通过自定义这些模块的组合,可以实现更高级的窗口应用。同时Flink也提供了一些内置的实现。

Window 编程接口

keyed Windows

1
2
3
4
5
6
7
8
9
stream
.keyBy(...) <- 仅 keyed 窗口需要
.window(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (省略则使用默认 trigger)
[.evictor(...)] <- 可选项:"evictor" (省略则不使用 evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (省略则为 0)
[.sideOutputLateData(...)] <- 可选项:"output tag" (省略则不对迟到数据使用 side output)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"

Non-keyed Windows

1
2
3
4
5
6
7
8
stream
.windowAll(...) <- 必填项:"assigner"
[.trigger(...)] <- 可选项:"trigger" (else default trigger)
[.evictor(...)] <- 可选项:"evictor" (else no evictor)
[.allowedLateness(...)] <- 可选项:"lateness" (else zero)
[.sideOutputLateData(...)] <- 可选项:"output tag" (else no side output for late data)
.reduce/aggregate/apply() <- 必填项:"function"
[.getSideOutput(...)] <- 可选项:"output tag"

Window Assigner

image-20230610211444506

总结一下主要有三类窗口:

  • Time Window
  • Count Window
  • Custom Window

Window Trigger

Trigger用来确定窗口什么时候触发计算。

Flink内置了一些Trigger:

内置Trigger 说明
ProcessingTrigger 一次触发,machine time大于窗口结束时间触发
EventTimeTrigger 一次触发,watermark大于窗口结束时间触发
ContinuousProcessingTimeTrigger 多次触发,基于processing time的固定时间间隔
ContinuourEventTimeTrigger 多次触发,基于event time的固定时间间隔
CountTrigger 多次触发,基于element的固定条数
DeltaTrigger 多次触发,当前element与上一次触发trigger的element做delta计算,超过threshold时触发
PurgingTrigger trigger wrapper,当nested trigger触发时,额外清理从窗口当前的中间状态

Trigger示例

image-20230610213254767

如上图,定义一个五分钟基于Event-Time的Window,定义一个两分钟触发一次的Trigger,有四条数据事件事件分别是20:01,20:02,20:03,20:04,对应的值分别是1,2,3,4。window的计算逻辑是对值进行sum计算。初始状态,State和Result中的值都是0。

image-20230610213737445

当第一条数据在20:01时进入窗口,State的值变为1,此时还没到达Trigger的触发时机。

image-20230610213844038

第二条数据进入window,State中的值变为1+2=3,此时满足Trigger的触发机制,所以Result输出结果为3。

image-20230610214055540

第三条数据进入window,State值变为6,此时不能触发Trigger,没有结果输出。

image-20230610214132542

第四条数据进入window,State值变为8,此时Trigger又可以触发窗口计算,输出结果为8。如果把结果输出到支持update的存储,比如MySQL,那么结果就从3更新成8。

问题:如果外部存储只支持append呢?

image-20230610214353949

如果Result不支持update操作,只能进行append,则会输出两条记录,在此基础上再做计算就会引起错误。

使用PurgingTrigger解决上述问题。

PurgingTrigger的应用

image-20230610214534278

还是和前面案例类似,不同的是,此时将触发器包装了一个PurgingTrigger。PurigingTrigger的作用是再Trigger触发窗口计算逻辑后,将窗口内的State清除。

流程如下:

image-20230610214800074

前两条数据到达窗口,state变为3,同时触发窗口计算,并输出结果。

image-20230610214846942

由于包装了PurgingTrigger,窗口中的State会被清理掉。

这样针对于下游只能Append的系统,使用PurgingTrigger,外部存储只需要再进行sum计算,就能得到最终的正确结果。

Delta Trigger

image-20230610215125246

首先要考虑的时如何来划分窗口,它既不是一个时间的窗口,也不是一个基于数量的窗口。用创痛的窗口实现比较困难,这种情况下可以考虑使用DeltaTrigger来实现。

案例代码链接:DeltaTrigger

  • 思考点

上述案例我们通过GlobalWindow和DeltaTrigger来实现自定义的Window Assigner的功能。对于一些复杂的窗口,我们也可以定义WindowAssigner,但是实现起来不一定简单,倒不如使用GlobalWindow+Trigger来达到同样的效果。

Flink内置的CountWindow的实现,也是基于GlobalWindow + Trigger来实现的。

image-20230610215841460

Window Evictor

Flink内置了一些Evictor

内置Evictor 说明
CountEvictor 窗口计算时,只保留最近N条element
TimeEvictor 窗口计算时,只保留最近N段时间范围内的element
DeltaEvictor 窗口计算时,最新的一条element与其他element做delta计算,只保留在delta在threshold内的element

TImeEvicot的应用

image-20230610220226175

实现上只需要在前面的基础上增加Evictor,过滤掉最后15分钟前的数据。

image-20230610220329978

Window Function

Flink内置的WindowFunction有两类,一种是AggregateFunction,适用于增量计算的场景,每到来一条数据就做一次聚合,状态中存储的是最新的计算逻辑值。

  • 优点:增量聚合,状态小。
  • 缺点:输出只有一个聚合值,场景有限。

另外一种是ProcessingWindowFunction,做全量计算,数据全部都存在State中,当窗口触发时才会调用计算逻辑。

优点:可以获取到窗口内的所有数据,实现逻辑灵活,可以获取聚合的key和窗口的相关信息。

缺点:State压力大。

增量计算和全量计算可以一起使用,通过AggregateFunction做增量聚合,减少中间状态的压力。通过ProcessWindowFunction来输出我们想要的信息,比如Key和窗口等信息。