我使用以下代码来读取主题中的消息.如何在阅读后删除邮件?
from kafka import KafkaConsumer consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) for message in consumer: # message value and key are raw bytes -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
小智.. 5
没有办法从kafka中删除特定的消息 - kafka根本就不是为了这样做而设计的.删除邮件的唯一方法是将log.retention.hours
kafka 设置config/server.properties
为您喜欢的值.默认值为168 - 表示168小时后不保留邮件.
如果您正在寻找一种方法来读取特定偏移量的消息 - 即每次都不从头开始读取,请查看http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html
commit()
- 提交读取kafka的偏移量
seek_to_end()
- 快进以消耗新到达的消息
seek()
- 移动到给定的偏移量(可能存储在除kafka之外的其他地方)
没有办法从kafka中删除特定的消息 - kafka根本就不是为了这样做而设计的.删除邮件的唯一方法是将log.retention.hours
kafka 设置config/server.properties
为您喜欢的值.默认值为168 - 表示168小时后不保留邮件.
如果您正在寻找一种方法来读取特定偏移量的消息 - 即每次都不从头开始读取,请查看http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html
commit()
- 提交读取kafka的偏移量
seek_to_end()
- 快进以消耗新到达的消息
seek()
- 移动到给定的偏移量(可能存储在除kafka之外的其他地方)