当前位置:  开发笔记 > 前端 > 正文

Kafka Streams API:KStream到KTable

如何解决《KafkaStreamsAPI:KStream到KTable》经验,为你挑选了1个好方法。

我有一个Kafka主题,我发送位置事件(key = user_id,value = user_location).我能够阅读并处理它KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });

这很有效,但我想拥有KTable每个用户的最后一个已知位置.我怎么能这样做?

我能够写一个中间主题并从中读取:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable table = builder.table("location_topic_aux", "store");

有没有一种简单的方法来获得KTable一个KStream?这是我第一个使用Kafka Streams的应用程序,所以我可能会遗漏一些明显的东西.



1> Matthias J. ..:

目前没有直接的方法来做到这一点.您的方法绝对有效,如汇总常见问题解答中所述:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an -aggregation步

这是关于代码的最简单方法.但是,它的缺点是:(a)您需要管理其他主题,并且(b)由于数据被写入Kafka并从Kafka重新读取,因此会导致额外的网络流量.

有一种替代方案,使用"虚拟减少":

KStreamBuilder builder = new KStreamBuilder();
KStream stream = ...; // some computation that creates the derived KStream

KTable table = stream.groupByKey().reduce(
    new Reducer() {
        @Override
        public Long apply(Long aggValue, Long newValue) {
            return newValue;
        }
    },
    "dummy-aggregation-store");

与选项1相比,这种方法在代码方面稍微复杂一些,但其优点是:(a)不需要手动主题管理;(b)不需要从Kafka重读数据.

总的来说,你需要自己决定,你更喜欢哪种方法:

在选项2中,Kafka Streams将创建一个内部更改日志主题,以备份KTable以实现容错.因此,这两种方法都需要在Kafka中增加一些存储空间并导致额外的网络流量.总的来说,这是选项2中稍微复杂的代码与选项1中的手动主题管理之间的权衡.


什么是你的Streams版本?对于旧版本,它应该是`stream.reduceByKey(...)`而不是`stream.groupByKey().reduce(...)`.请参阅http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation
推荐阅读
手机用户2402851335
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有