当前位置:  开发笔记 > 前端 > 正文

测试Kafka Streams拓扑

如何解决《测试KafkaStreams拓扑》经验,为你挑选了1个好方法。

我正在寻找一种测试Kafka Streams应用程序的方法.这样我就可以定义输入事件,测试套件会显示输出.

没有真正的Kafka设置,这可能吗?



1> Dmitry Minko..:

更新 Kafka 1.1.0(2018年3月23日发布):

KIP-247增加了官方测试工具.根据升级指南:

有一个新的神器kafka-streams-test-utils提供TopologyTestDriver,ConsumerRecordFactoryOutputVerifier类.您可以将新工件包含为单元测试的常规依赖项,并使用测试驱动程序测试Kafka Streams应用程序的业务逻辑.有关更多详细信息,请参阅KIP-247.

从文档:


    org.apache.kafka
    kafka-streams-test-utils
    1.1.0
    test

测试驱动程序模拟库运行时,该库运行时连续从输入主题中提取记录并通过遍历拓扑来处理它们.您可以使用测试驱动程序验证指定的处理器拓扑是否使用手动管道的数据记录计算正确的结果.测试驱动程序捕获结果记录并允许查询其嵌入的状态存储:

// Create your topology
Topology topology = new Topology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

// Feed input data
ConsumerRecordFactory factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));

// Verify output
ProducerRecord outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

有关详细信息,请参阅文档


ProcessorTopologyTestDriver从0.11.0.0开始提供.它在kafka-streams测试工件中可用(test在Maven中指定):


    org.apache.kafka
    kafka-streams
    0.11.0.0
    test
    test

您还需要添加kafka-clients测试工件:


    org.apache.kafka
    kafka-clients
    0.11.0.0
    test
    test

然后你可以使用测试驱动程序.根据Javadoc,首先创建一个ProcessorTopologyTestDriver:

StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ...
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

您可以将输入提供给拓扑,就像您实际写入其中一个输入主题一样:

driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

并阅读输出主题:

ProducerRecord record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

然后你可以断言这些结果.

推荐阅读
Chloemw
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有