当前位置:  开发笔记 > 编程语言 > 正文

Kafka python使用者启动时阅读所有消息

如何解决《Kafkapython使用者启动时阅读所有消息》经验,为你挑选了1个好方法。

我正在使用下面的代码来阅读主题的消息。我面临两个问题。每当我启动消费者时,它正在读取队列中的所有消息?如何只阅读未读邮件?

from kafka import KafkaConsumer


consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit() 
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

se7entyse7en.. 6

正如@Kenji所说,您必须使用来提交偏移量consumer.commit()。如果你不想手动提交你可以通过启用自动提交enable_auto_commit=True给你的KafkaConsumer。您可能还需要调整auto_commit_interval_ms每次自动提交之间的间隔(以毫秒为单位)。参见此处:http : //kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html。



1> se7entyse7en..:

正如@Kenji所说,您必须使用来提交偏移量consumer.commit()。如果你不想手动提交你可以通过启用自动提交enable_auto_commit=True给你的KafkaConsumer。您可能还需要调整auto_commit_interval_ms每次自动提交之间的间隔(以毫秒为单位)。参见此处:http : //kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html。

推荐阅读
虎仔球妈_459
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有