我也是flink和流媒体的新手.我想为每个分区应用一个特定的功能到流的每个窗口(使用事件时间).到目前为止我所做的是:
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val inputStream = env.readTextFile("dataset.txt") .map(transformStream(_)) .assignAscendingTimestamps(_.eventTime) .keyBy(_.id) .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep)) def transformStream(input: String): EventStream = {...} case class EventStream(val eventTime: Long, val id: String, actualEvent: String)
我想要做的是对每个窗口批处理的每个分区应用一般函数,可能应用复杂的处理算法或类似的东西.我已经看到该方法适用于DataStream API,但我不明白它是如何工作的.在Flink API中,它表示它在Scala中的使用方式如下:
inputStream.apply { WindowFunction }
有人可以解释一下apply方法的用途或使用方法吗?Scala中的一个例子是可取的.apply方法是否符合我的要求?
因此,根据您想要进行的计算类型,基本上有两个可能的方向.无论是使用:fold
/ reduce
/ aggregate
或更通用的一个,你已经提到的- apply
.所有这些都适用于Windows的钥匙.
至于apply
它是一种非常通用的计算方法.最基本的版本(在Scala中)将是:
def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R]
其中function有4个参数:
窗口的键(记住你正在使用keyedStream)
窗口(您可以从中提取窗口的开始或结束)
分配给此特定窗口和键的元素
您应该向其发出处理结果的收集器
必须记住,这个版本必须保持每个元素处于状态,直到窗口被发出.更好的内存性能解决方案是使用带有preAgreggator的版本,该版本在触发上述功能之前执行一些计算.
在这里你可以看到一个预先聚合的简短片段:
val stream: DataStream[(String,Int)] = ... stream.keyBy(_._1) .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap()))) .apply((e1, e2) => (e1._1, e1._2 + e2._2), (key, window, in, out: Collector[(String, Long, Long, Int)]) => { out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum)) })
它会计算会话窗口中密钥的出现次数.
所以基本上如果你不需要窗口的元信息,我会坚持fold
\ reduce
\ aggregate
如果它们足够了.考虑应用某种预先聚合,如果这还不够,请看一下最通用的apply
.
有关更完整的示例,请查看此处.