我使用的是Kafka主题中的KafkaMessageListenerContainer,我有一个应用程序逻辑来处理每个记录,该记录也依赖于其他微服务。我现在在处理每条记录后手动提交偏移量。
但是,如果我的应用程序逻辑失败,则需要寻找失败的偏移量并继续对其进行处理,直到成功为止。为此,我需要对最后一个偏移量进行运行时手动搜索。
KafkaMessageListenerContainer是否可能呢?
请参阅寻求特定的偏移量。
为了进行搜索,您的侦听器必须实现
ConsumerSeekAware
具有以下方法的方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map
assignments, ConsumerSeekCallback callback);
void onIdleContainer(Map
assignments, ConsumerSeekCallback callback); 第一个在容器启动时被调用;在初始化后的任意时间搜索时,应使用此回调。您应该保存对回调的引用;如果您在多个容器(或中
ConcurrentMessageListenerContainer
)使用同一个侦听器,则应将回调存储在ThreadLocal
或其他由侦听器线程键控的结构中。