当前位置:  开发笔记 > 编程语言 > 正文

为什么我看不到Kafka Streams reduce方法的任何输出?

如何解决《为什么我看不到KafkaStreamsreduce方法的任何输出?》经验,为你挑选了1个好方法。

给出以下代码:

KStream stream =  
    builder.stream(Serdes.String(), customSerde, "test_in");

stream
    .groupByKey(Serdes.String(), customSerde)
    .reduce(new CustomReducer(), "reduction_state")
    .print(Serdes.String(), customSerde);

println在Reducer的apply方法中有一个声明,当我希望减少时,它会成功打印出来.但是,上面显示的最终打印语句不显示任何内容.同样,如果我使用to方法而不是print,我在目标主题中看不到任何消息.

在reduce语句之后我需要什么才能看到减少的结果?如果一个值被推送到输入,我不希望看到任何东西.如果按下具有相同键的第二个值,我希望减速器应用(它确实如此),并且我还期望减少的结果继续到处理管道中的下一步.如上所述,我在管道的后续步骤中没有看到任何内容,我不明白为什么.



1> Matthias J. ..:

从Kafka开始,0.10.1.0所有聚合运算符都使用内部重复数据删除缓存来减少结果KTable更改日志流的负载.例如,如果您使用相同的密钥直接计算和处理两个记录,则完整的更改日志流将是, .

使用新的缓存功能,缓存将接收并存储它,但不会立即将其发送到下游.当计算,它替换高速缓存中的第一项.根据缓存大小,不同密钥数,吞吐量和提交间隔,缓存会向下游发送条目.这发生在单个密钥条目的缓存逐出或缓存的完全刷新(向下游发送所有条目).因此,KTable更改日志可能只显示(因为重复数据删除).

您可以通过Streams配置参数控制缓存的大小StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.如果将值设置为零,则完全禁用缓存,KTable更改日志将包含所有更新(有效地提供预先0.10.1.0行为).

汇编文档包含更详细地解释缓存的部分:

http://docs.confluent.io/current/streams/architecture.html#record-caches

http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-memory-management

推荐阅读
携手相约幸福
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有