当前位置:  开发笔记 > 前端 > 正文

如何管理Kafka KStream到Kstream窗口加入?

如何解决《如何管理KafkaKStream到Kstream窗口加入?》经验,为你挑选了2个好方法。

基于apache Kafka docs KStream-to-KStream Joins are always windowed joins,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者,例如,我们可以将数据保留1个月,但过去一周加入流?

是否有任何好的示例来显示窗口化的KStream-to-kStream窗口连接?

在我的情况,让我们说我有2 KStream,kstream1并且kstream2我希望能够加入十天kstream1至30天kstream2.



1> Matthias J. ..:

这绝对是可能的.定义Stream运算符时,可以显式指定连接窗口大小.

KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 JoinWindows.of(joinWindowSizeMs)
);

// or if you want to use retention time

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 (JoinWindows)JoinWindows.of(joinWindowSizeMs)
                                         .until(windowRetentionTimeMs)
);

有关详细信息,请参阅http://docs.confluent.io/current/streams/developer-guide.html#joining-streams.

滑动窗口基本上定义了一个额外的连接谓词.在类似SQL的语法中,这将是:

SELECT * FROM stream1, stream2
WHERE
   stream1.key = stream2.key
   AND
   stream1.ts - before <= stream2.ts
   AND
   stream2.ts <= stream1.ts + after

其中before == after == joinWindowSizeMs在本实施例中.如果您使用和显式设置这些值before,after也可以使用不同的值.JoinWindows#before()JoinWindows#after()

源主题的保留时间完全独立于windowRetentionTimeMs应用于Kafka Streams自身创建的更改日志主题的指定.窗口保留允许彼此连接无序记录,即迟到的记录(请记住,Kafka具有基于偏移的订购保证,但是关于时间戳,记录可能是乱序的) .



2> Michael G. N..:

除了Matthias J.Sax所说的之外,还有一个流对流(窗口式)连接示例,位于:https : //github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test /java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java

这适用于带有Apache Kafka 0.10.1的Confluent 3.1.x,即截至2017年1月的最新版本。有关master使用较新版本的代码示例,请参见上面的存储库中的分支。

这是上面代码示例的关键部分(同样,对于Kafka 0.10.1),略微适合您的问题。请注意,此示例恰好演示了OUTER JOIN。

long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);

final Serde stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");

KStream impressionsAndClicks = alerts.outerJoin(incidents,
    (impressionValue, clickValue) -> impressionValue + "/" + clickValue,
    // KStream-KStream joins are always windowed joins, hence we must provide a join window.
    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
    stringSerde, stringSerde, stringSerde);

// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");

推荐阅读
sx-March23
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有