我在服务器上安装了kafka,并想学习如何使用它,找到了scala编写的示例代码,下面是其中的一部分,
def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = { val props = new Properties() props.put("zookeeper.connect", zookeeper) props.put("group.id", groupId) props.put("auto.offset.reset", "largest") props.put("zookeeper.session.timeout.ms", "400") props.put("zookeeper.sync.time.ms", "200") props.put("auto.commit.interval.ms", "1000") val config = new ConsumerConfig(props) config }
但我不知道如何在服务器上找到组ID。
该group id
是你自己定义为你的消费者通过东西为它提供一个字符串ID。以相同ID开头的所有使用者将以协作的方式“协作”并阅读主题,其中每个使用者实例将处理主题中消息的子集。提供不存在的组ID将被视为新使用者,并在Zookeeper中创建新条目,其中将存储已提交的偏移量。