我正在寻找一种测试Kafka Streams应用程序的方法.这样我就可以定义输入事件,测试套件会显示输出.
没有真正的Kafka设置,这可能吗?
更新 Kafka 1.1.0(2018年3月23日发布):
KIP-247增加了官方测试工具.根据升级指南:
有一个新的神器
kafka-streams-test-utils
提供TopologyTestDriver
,ConsumerRecordFactory
和OutputVerifier
类.您可以将新工件包含为单元测试的常规依赖项,并使用测试驱动程序测试Kafka Streams应用程序的业务逻辑.有关更多详细信息,请参阅KIP-247.
从文档:
org.apache.kafka kafka-streams-test-utils 1.1.0 test
测试驱动程序模拟库运行时,该库运行时连续从输入主题中提取记录并通过遍历拓扑来处理它们.您可以使用测试驱动程序验证指定的处理器拓扑是否使用手动管道的数据记录计算正确的结果.测试驱动程序捕获结果记录并允许查询其嵌入的状态存储:
// Create your topology Topology topology = new Topology(); Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // Run it on the test driver TopologyTestDriver testDriver = new TopologyTestDriver(topology, config); // Feed input data ConsumerRecordFactoryfactory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer()); testDriver.pipe(factory.create("key", 42L)); // Verify output ProducerRecord outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());
有关详细信息,请参阅文档
ProcessorTopologyTestDriver
从0.11.0.0开始提供.它在kafka-streams
测试工件中可用(
在Maven中指定):
org.apache.kafka kafka-streams 0.11.0.0 test test
您还需要添加kafka-clients
测试工件:
org.apache.kafka kafka-clients 0.11.0.0 test test
然后你可以使用测试驱动程序.根据Javadoc,首先创建一个ProcessorTopologyTestDriver
:
StringSerializer strSerializer = new StringSerializer(); StringDeserializer strDeserializer = new StringDeserializer(); Properties props = new Properties(); props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); StreamsConfig config = new StreamsConfig(props); TopologyBuilder builder = ... ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
您可以将输入提供给拓扑,就像您实际写入其中一个输入主题一样:
driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
并阅读输出主题:
ProducerRecordrecord1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer); ProducerRecord record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer); ProducerRecord record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
然后你可以断言这些结果.