我正在尝试对我的卡夫卡消费者进行单元测试.我正在尝试使用java api MockConsumer
附带的类kafka-client
.以下是我的配置代码
@Bean public MockConsumer consumer(){ MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST); consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0))); HashMapbeginningOffsets = new HashMap<>(); beginningOffsets.put(new TopicPartition("test-topic", 0), 0L); consumer.updateBeginningOffsets(beginningOffsets); consumer.addRecord(new ConsumerRecord ("test-topic",0, 0L, "mykey", "myvalue0")); consumer.addRecord(new ConsumerRecord ("test-topic", 0, 1L, "mykey", "myvalue1")); consumer.addRecord(new ConsumerRecord ("test-topic", 0, 2L, "mykey", "myvalue2")); consumer.addRecord(new ConsumerRecord ("test-topic", 0, 3L, "mykey", "myvalue3")); consumer.addRecord(new ConsumerRecord ("test-topic", 0, 4L, "mykey", "myvalue4")); HashMap endOffsets = new HashMap<>(); endOffsets.put(new TopicPartition("test-topic", 0), 4L); consumer.updateEndOffsets(endOffsets); return consumer; }
现在当我在我的测试用例中使用这个MockConsumer Bean时,如下所示
@Autowired MockConsumer kafkaConsumer; @Autowired @InjectMocks MyConsumer myConsumer; //this is the class having consumer code. This //is the class under test @Test public void testConsumeWithAutoAssignment() throws Exception { myConsumer.consumeTopic("test-topic"); }
我得到了例外
kafkaConsumer.subscribe(topic)
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
如果有人发现了问题或解决了这个问题,请告诉我.
这是因为在您使用的bean中,consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));
这意味着消费者想要从"test-topic"中使用特定分区(0).然后在某个地方,但我没有看到你提供的代码在哪里有一个电话subscribe(topic)
.使用subscribe,消费者成为消费者组的一部分,Kafka代理自动分配分区(用于重新平衡).您不能同时使用两者:分配特定分区(USER DEFINED)和订阅自动分配.