我遇到了一些麻烦,理解事件时间窗口周围的语义.以下程序生成一些带有时间戳的元组,这些时间戳用作事件时间并执行简单的窗口聚合.我希望输出与输入的顺序相同,但输出的排序方式不同.为什么输出与事件时间无关?
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
object WindowExample extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.enableTimestamps()
env.setParallelism(1)
val start = 1449597577379L
val tuples = (1 to 10).map(t => (start + t * 1000, t))
env.fromCollection(tuples)
.assignAscendingTimestamps(_._1)
.timeWindowAll(Time.of(1, TimeUnit.SECONDS))
.sum(1)
.print()
env.execute()
}
输入:
(1449597578379,1) (1449597579379,2) (1449597580379,3) (1449597581379,4) (1449597582379,5) (1449597583379,6) (1449597584379,7) (1449597585379,8) (1449597586379,9) (1449597587379,10)
结果:
[info] (1449597579379,2) [info] (1449597581379,4) [info] (1449597583379,6) [info] (1449597585379,8) [info] (1449597587379,10) [info] (1449597578379,1) [info] (1449597580379,3) [info] (1449597582379,5) [info] (1449597584379,7) [info] (1449597586379,9)
aljoscha.. 11
这种行为的原因是在Flink中,不考虑元素的排序(相对于时间戳).只有水印的正确性及其与元素时间戳的关系对于考虑时间的操作很重要,因为水印通常会触发基于时间的操作中的计算.
在您的示例中,窗口运算符将源中的所有元素存储在内部窗口缓冲区中.然后,源会发出一个水印,表示将来不会出现时间戳较小的元素.反过来,这会告诉窗口操作员使用低于水印的结束时间戳处理所有窗口(对于所有窗口都是如此).因此,它发出所有窗口(具有任意顺序),然后它自己发出水印.下游的操作本身将接收元素,并且一旦接收到水印就可以进行处理.
默认情况下,从源发出水印的时间间隔为200 ms.由于源发出的元素数量很少,所以在发出第一个水印之前会发出所有这些元素.在实际使用情况中,水印发射间隔比窗口大小小得多,您将获得按时间戳顺序发出的窗口的预期行为.例如,如果每500毫秒有1小时的窗口和水印.
这种行为的原因是在Flink中,不考虑元素的排序(相对于时间戳).只有水印的正确性及其与元素时间戳的关系对于考虑时间的操作很重要,因为水印通常会触发基于时间的操作中的计算.
在您的示例中,窗口运算符将源中的所有元素存储在内部窗口缓冲区中.然后,源会发出一个水印,表示将来不会出现时间戳较小的元素.反过来,这会告诉窗口操作员使用低于水印的结束时间戳处理所有窗口(对于所有窗口都是如此).因此,它发出所有窗口(具有任意顺序),然后它自己发出水印.下游的操作本身将接收元素,并且一旦接收到水印就可以进行处理.
默认情况下,从源发出水印的时间间隔为200 ms.由于源发出的元素数量很少,所以在发出第一个水印之前会发出所有这些元素.在实际使用情况中,水印发射间隔比窗口大小小得多,您将获得按时间戳顺序发出的窗口的预期行为.例如,如果每500毫秒有1小时的窗口和水印.