我正在使用下面的代码来阅读主题的消息。我面临两个问题。每当我启动消费者时,它正在读取队列中的所有消息?如何只阅读未读邮件?
from kafka import KafkaConsumer consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) for message in consumer: consumer.commit() # message value and key are raw bytes -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
se7entyse7en.. 6
正如@Kenji所说,您必须使用来提交偏移量consumer.commit()
。如果你不想手动提交你可以通过启用自动提交enable_auto_commit=True
给你的KafkaConsumer
。您可能还需要调整auto_commit_interval_ms
每次自动提交之间的间隔(以毫秒为单位)。参见此处:http : //kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html。
正如@Kenji所说,您必须使用来提交偏移量consumer.commit()
。如果你不想手动提交你可以通过启用自动提交enable_auto_commit=True
给你的KafkaConsumer
。您可能还需要调整auto_commit_interval_ms
每次自动提交之间的间隔(以毫秒为单位)。参见此处:http : //kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html。