Flink: EventTime and Window
Table of Contents
- 1 Introducing EventTime
- 2 Watermark
- 2.1 Basic Concepts
- 2.2 Introducing Watermark
- 3 EventTimeWindow API
- 3.1 Tumbling Windows (TumblingEventTimeWindows)
- 3.2 Sliding Windows (SlidingEventTimeWindows)
- 3.3 Session Windows (EventTimeSessionWindows)
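1 Introducing EventTime
In Flink stream processing, the vast majority of jobs use eventTime. ProcessingTime or IngestionTime is generally a fallback, used only when eventTime is unavailable. To use EventTime, set the EventTime time characteristic on the execution environment, as shown below: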
val env = StreamExecutionEnvironment.getExecutionEnvironment
// From this call onward, every stream created by env carries the EventTime time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2 Watermark
2.1 Basic Concepts
2.2 Introducing Watermark
val env = StreamExecutionEnvironment.getExecutionEnvironment
// From this call onward, every stream created by env carries the EventTime time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream = env.readTextFile("eventTest.txt").assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {
    override def extractTimestamp(t: String): Long = {
      // EventTime is the time the log line was generated; parse it from the first field of the line
      t.split(" ")(0).toLong
    }
  })
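To make the 200 ms bound above concrete, here is a minimal plain-Scala sketch (no Flink dependency) of how a bounded-out-of-orderness watermark trails the largest timestamp seen so far. The object name `WatermarkSketch`, the function `watermarks`, and the sample lines are hypothetical and exist only for illustration. Flink emits watermarks periodically rather than after every record, so the values below are what the watermark would be if it were read right after each event.

```scala
object WatermarkSketch {
  /** Watermark value after each event: running maximum timestamp minus the allowed out-of-orderness. */
  def watermarks(eventTimes: Seq[Long], maxOutOfOrdernessMs: Long): Seq[Long] =
    eventTimes
      .scanLeft(Long.MinValue)((maxSeen, ts) => math.max(maxSeen, ts)) // running maximum
      .tail                                                             // drop the initial seed
      .map(_ - maxOutOfOrdernessMs)

  def main(args: Array[String]): Unit = {
    // Same "timestamp word" line format as the example above (sample data is made up)
    val lines = Seq("1000 hello", "1500 world", "1200 hello", "2000 flink")
    val eventTimes = lines.map(_.split(" ")(0).toLong)
    watermarks(eventTimes, 200L).zip(lines).foreach { case (wm, line) =>
      println(s"after '$line' the watermark is $wm")
    }
  }
}
```

Note that the late record with timestamp 1200 does not move the watermark backwards: the watermark only follows the maximum timestamp seen so far.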
3 EventTimeWindow API
3.1 Tumbling Windows (TumblingEventTimeWindows)
Reference code
package com.czxy.flink.stream.waterwindow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._

object TumblingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1. Create the stream execution environment
     * 2. Set EventTime
     * 3. Build the data source
     * 4. Assign watermarks
     * 5. Apply the processing logic
     * 6. Apply the tumbling window TumblingEventTimeWindows
     * 7. Aggregate
     * 8. Print the output
     * 9. Execute the program
     */
    // 1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Set EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3. Build the data source
    // Data format: 1000 hello
    val socketSource = env.socketTextStream("node01", 9999)
    // 4. Assign watermarks (zero allowed out-of-orderness)
    val waterMark: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })
    // 5. Apply the processing logic: take the word, pair it with 1, key by the word
    val groupStream: KeyedStream[(String, Int), String] =
      waterMark.map(x => x.split(" ")(1)).map((_, 1)).keyBy(_._1)
    // 6. Apply the tumbling window TumblingEventTimeWindows
    val windowStream: WindowedStream[(String, Int), String, TimeWindow] =
      groupStream.window(TumblingEventTimeWindows.of(Time.seconds(3)))
    // 7. Aggregate
    val result: DataStream[(String, Int)] = windowStream.sum(1)
    // val resultDataStream: DataStream[(String, Int)] = windowStream.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    // 8. Print the output
    result.print()
    // 9. Execute the program
    env.execute(this.getClass.getSimpleName)
  }
}
The results are computed per Event Time window and do not depend on system time (including how quickly or slowly the input arrives).
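The following plain-Scala sketch (no Flink dependency) illustrates why the result depends only on the timestamps carried by the records: each record is placed in a window derived from its own event time, and a window can fire once the watermark reaches the window's last millisecond. The object `TumblingSketch`, its function names, and the sample lines are hypothetical. The sketch assumes a zero window offset and non-negative timestamps.

```scala
object TumblingSketch {
  /** Start of the tumbling window [start, start + sizeMs) that contains ts (zero offset, ts >= 0). */
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  /** An event-time window can fire once the watermark has reached its last millisecond (end - 1). */
  def canFire(start: Long, sizeMs: Long, watermark: Long): Boolean =
    watermark >= start + sizeMs - 1

  /** Word counts keyed by (word, window start), computed purely from the event timestamps. */
  def countsPerWindow(lines: Seq[String], sizeMs: Long): Map[(String, Long), Int] =
    lines
      .map(_.split(" "))
      .map(parts => (parts(1), windowStart(parts(0).toLong, sizeMs)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }

  def main(args: Array[String]): Unit = {
    // Same "timestamp word" format as the demo; 3-second windows as in Time.seconds(3)
    val lines = Seq("1000 hello", "2000 hello", "3500 hello", "4000 world")
    countsPerWindow(lines, 3000L).toSeq.sortBy(_._1._2).foreach {
      case ((word, start), count) => println(s"[$start, ${start + 3000L}) $word -> $count")
    }
  }
}
```

In this sample, the two `hello` records at 1000 and 2000 fall into the window [0, 3000), while the records at 3500 and 4000 fall into [3000, 6000), regardless of when they are typed into the socket.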
3.2 Sliding Windows (SlidingEventTimeWindows)
Reference code
package com.czxy.flink.stream.waterwindow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._

object SlidingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1. Create the stream execution environment
     * 2. Set EventTime
     * 3. Build the data source
     * 4. Assign watermarks
     * 5. Apply the processing logic
     * 6. Apply the sliding window SlidingEventTimeWindows
     * 7. Aggregate
     * 8. Print the output
     * 9. Execute the program
     */
    // 1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Set EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3. Build the data source
    // Data format: 1000 hello
    val socketSource: DataStream[String] = env.socketTextStream("node01", 9999)
    // 4. Assign watermarks (zero allowed out-of-orderness)
    val waterMark: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })
    // 5. Apply the processing logic: take the word, pair it with 1, key by the word
    val groupStream: KeyedStream[(String, Int), String] =
      waterMark.map(x => x.split(" ")(1)).map((_, 1)).keyBy(_._1)
    // 6. Apply the sliding window SlidingEventTimeWindows (size 5 s, slide 2 s)
    val windowStream: WindowedStream[(String, Int), String, TimeWindow] =
      groupStream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
    // 7. Aggregate
    val result: DataStream[(String, Int)] = windowStream.sum(1)
    // 8. Print the output
    result.print()
    // 9. Execute the program
    env.execute(this.getClass.getSimpleName)
  }
}
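With a 5-second size and a 2-second slide, windows overlap, so one record is counted in several windows. The plain-Scala sketch below (no Flink dependency) enumerates the windows that contain a given timestamp. The object `SlidingSketch` and the function `windowsFor` are hypothetical names, and the sketch assumes a zero offset and non-negative timestamps.

```scala
object SlidingSketch {
  /** All windows [start, start + sizeMs), started every slideMs, that contain ts (zero offset, ts >= 0). */
  def windowsFor(ts: Long, sizeMs: Long, slideMs: Long): List[(Long, Long)] = {
    val lastStart = ts - (ts % slideMs) // latest window start at or before ts
    Iterator
      .iterate(lastStart)(_ - slideMs)   // walk back one slide at a time
      .takeWhile(_ > ts - sizeMs)        // keep starts whose window still covers ts
      .map(start => (start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Size 5 s and slide 2 s, as in the demo above
    Seq(1000L, 4500L).foreach { ts =>
      println(s"$ts belongs to ${windowsFor(ts, 5000L, 2000L).mkString(", ")}")
    }
  }
}
```

Here a record at 4500 lands in three windows, [4000, 9000), [2000, 7000) and [0, 5000), so it contributes to three separate sums. A record at 1000 lands in [0, 5000) and [-2000, 3000); windows near the start of the timeline can have a negative start because window boundaries are aligned to the slide, not to the first record.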
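3.3 Session Windows (EventTimeSessionWindows)
A session window fires when the EventTime difference between two adjacent records exceeds the specified gap. If a Watermark is in use, then at the moment firing is triggered, every window that already satisfies the gap condition but has not yet fired is fired at the same time.
Reference code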
package com.czxy.flink.stream.waterwindow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._

object EventTimeSessionWindowsDemo {
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1. Create the stream execution environment
     * 2. Set EventTime
     * 3. Build the data source
     * 4. Assign watermarks
     * 5. Apply the processing logic
     * 6. Apply the session window EventTimeSessionWindows
     * 7. Aggregate
     * 8. Print the output
     * 9. Execute the program
     */
    // 1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Set EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3. Build the data source
    // Data format: 1000 hello
    val socketSource: DataStream[String] = env.socketTextStream("node01", 9999)
    // 4. Assign watermarks (zero allowed out-of-orderness)
    val waterMark: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })
    // 5. Apply the processing logic: take the word, pair it with 1, key by the word
    val groupStream: KeyedStream[(String, Int), String] =
      waterMark.map(x => x.split(" ")(1)).map((_, 1)).keyBy(_._1)
    // 6. Apply the session window EventTimeSessionWindows (3 s gap)
    val windowStream: WindowedStream[(String, Int), String, TimeWindow] =
      groupStream.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
    // 7. Aggregate
    val result = windowStream.sum(1)
    // 8. Print the output
    result.print()
    // 9. Execute the program
    env.execute(this.getClass.getSimpleName)
  }
}
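The gap rule described at the start of this section can be sketched in plain Scala (no Flink dependency). The sketch groups the event times of a single key into sessions: a record extends the open session unless its timestamp is more than the gap after the previous record, in which case a new session starts. The object `SessionSketch`, the function `sessions`, and the sample timestamps are hypothetical, and the code only illustrates the grouping rule, not Flink's internal window merging.

```scala
object SessionSketch {
  /** Sessions as (firstTimestamp, sessionEnd, recordCount), where sessionEnd = last timestamp + gapMs. */
  def sessions(eventTimes: Seq[Long], gapMs: Long): List[(Long, Long, Int)] =
    eventTimes.sorted.foldLeft(List.empty[(Long, Long, Int)]) {
      case ((start, sessionEnd, count) :: closed, ts) if ts <= sessionEnd =>
        // Difference to the previous record does not exceed the gap: extend the open session
        (start, math.max(sessionEnd, ts + gapMs), count + 1) :: closed
      case (closed, ts) =>
        // Difference exceeds the gap (or this is the first record): start a new session
        (ts, ts + gapMs, 1) :: closed
    }.reverse

  def main(args: Array[String]): Unit = {
    // Event times of one key, with a 3-second gap as in withGap(Time.seconds(3))
    sessions(Seq(1000L, 2000L, 3000L, 7000L, 8000L), 3000L).foreach {
      case (start, sessionEnd, count) => println(s"session [$start, $sessionEnd) holds $count records")
    }
  }
}
```

In this sample, the records at 1000, 2000 and 3000 form one session ending at 6000, and the record at 7000 opens a second one. In an event-time job the first session's result only appears once a later record (here the one at 7000) pushes the watermark past the session end, which is why a session seems to be triggered by the record that follows it.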