当前位置:  开发笔记 > 大数据 > 正文

KafkaStreams - InconsistentGroupProtocolException

如何解决《KafkaStreams-InconsistentGroupProtocolException》经验,为你挑选了1个好方法。

我有一个Kafka Streams应用程序,它使用Kafka Streams DSL连接到我们的Kafka集群,如下所示:

KStreamBuilder builder = new KStreamBuilder();
KStream stream = builder.stream(myTopic);

// do work

kStreams = new KafkaStreams(builder, config);
kStreams.start();

我的代码库的另一部分是直接使用客户端客户端与我们的集群建立连接.

KafkaConsumer consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();

我这样做的原因是在有条件地启动应用程序的其他部分(包括Kafka Streams拓扑)之前收集有关使用者组的元数据.可能还有其他方法可以做到这一点(例如通过各种钩子或不是什么),但我更好奇为什么这些方法的混合有时会(间歇地)导致InconsistentGroupProtocolException被抛出.

请问有人可以解释为什么会被扔掉?我很难确定源代码本身到底发生了什么,但我想Kafka Streams构建的底层消费者正在指定与KafkaConsumer客户端不同的分区协议.无论如何,我们将非常感谢您对理解此异常的任何帮助



1> Matthias J. ..:

你自己回答了问题.Kafka Streams使用自定义分区分配器,而Kafka Streams客户端仅适用于其他Kafka Streams客户端.如果您使用KafkaConsumer与Kafka Streams应用程序相同的组ID,则无法阻止KafkaConsumer其加入Kafka Streams使用者组.显然,KafkaConsumer不能与Kafka Streams"玩".

推荐阅读
跟我搞对象吧
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有