当前位置:  开发笔记 > 编程语言 > 正文

Flink Streaming - 在Windows中应用功能

如何解决《FlinkStreaming-在Windows中应用功能》经验,为你挑选了1个好方法。

我也是flink和流媒体的新手.我想为每个分区应用一个特定的功能到流的每个窗口(使用事件时间).到目前为止我所做的是:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputStream = env.readTextFile("dataset.txt")
      .map(transformStream(_))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.id)
      .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))

def transformStream(input: String): EventStream = {...}

case class EventStream(val eventTime: Long, val id: String, actualEvent: String)

我想要做的是对每个窗口批处理的每个分区应用一般函数,可能应用复杂的处理算法或类似的东西.我已经看到该方法适用于DataStream API,但我不明白它是如何工作的.在Flink API中,它表示它在Scala中的使用方式如下:

inputStream.apply { WindowFunction }

有人可以解释一下apply方法的用途或使用方法吗?Scala中的一个例子是可取的.apply方法是否符合我的要求?



1> Dawid Wysako..:

因此,根据您想要进行的计算类型,基本上有两个可能的方向.无论是使用:fold/ reduce/ aggregate或更通用的一个,你已经提到的- apply.所有这些都适用于Windows的钥匙.

至于apply它是一种非常通用的计算方法.最基本的版本(在Scala中)将是:

def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R] 

其中function有4个参数:

窗口的键(记住你正在使用keyedStream)

窗口(您可以从中提取窗口的开始或结束)

分配给此特定窗口和键的元素

您应该向其发出处理结果的收集器

必须记住,这个版本必须保持每个元素处于状态,直到窗口被发出.更好的内存性能解决方案是使用带有preAgreggator的版本,该版本在触发上述功能之前执行一些计算.

在这里你可以看到一个预先聚合的简短片段:

val stream: DataStream[(String,Int)] =   ...

stream.keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
      .apply((e1, e2) => (e1._1, e1._2 + e2._2),
             (key, window, in, out: Collector[(String, Long, Long, Int)]) => {
                out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
      })

它会计算会话窗口中密钥的出现次数.

所以基本上如果你不需要窗口的元信息,我会坚持fold\ reduce\ aggregate如果它们足够了.考虑应用某种预先聚合,如果这还不够,请看一下最通用的apply.

有关更完整的示例,请查看此处.

推荐阅读
云聪京初瑞子_617
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有