我正在一个4小时的窗口上进行跳跃窗口聚合,每5分钟推进一次.由于跳跃窗口重叠,我得到具有不同聚合值的重复键.
TimeWindows.of(240 * 60 * 1000L).advanceBy(5 * 60* 1000L)
如何使用重复数据消除重复键或仅选择包含最新值的键.
如果我理解正确,那么这是预期的行为.您没有看到"重复"键,但您看到相同键的连续更新.
认为:
# Extreme case: record caches disabled (size set to 0) alice->1, alice->2, alice->3, alice->4, ..., alice->100, ... # With record cache enabled, you would see sth like this. alice->23, alice->59, alice->100, ...
请查看http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management上的说明,该说明更详细地描述了这一点.如果您希望每个记录键看到更少的"重复",您可以在应用程序的配置中通过cache.max.bytes.buffering
aka 增加记录缓存(使用DSL时)的大小StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
.还有一个相互作用commit.interval.ms
.
如果您想知道"为什么Kafka Streams API首先以这种方式运行",我建议您发布博客文章https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-本周早些时候发布的模型.
除了Michael所写的内容,在跳跃窗口中还有另一层"重复".由于窗口重叠,后续窗口发出的值可能相同.例如,假设你有一个五分钟的窗口和一分钟的跳跃:{0..5},{1..6},{2..7}
依此类推.来自输入主题的给定记录可以属于不同的时间窗口.
这与翻转窗口相反,窗口不重叠,因此每个记录都是单个窗口的一部分.不幸的是,翻滚的窗户并不适合所有用例; 示例可以是聚合,其中具有相同键的两个记录落在两个后续窗口的边缘中.
使用跳跃窗口时,有几种方法可以"重复删除".一种方法是在下游'重复删除'.另一种方法是在Kafka Streams中执行此操作,但这仅适用于特定拓扑.如上所述,这些结果不是真正的重复,而是连续窗口的结果.如果您只想要某个键的最后一个窗口的结果,您可以编写如下内容:
windowedKtable .toStream((windowedKey, value) -> windowedKey.key()) .groupByKey() .reduce((value1, value2) -> value1.lastestActivity() > value2.lastestActivity() ? value1 : value2)
我不会说这是一种最佳做法,只是在特定情况下克服问题的一种方法.
有关Kafka Streams中窗口的更多信息:https: //docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing