
Kafka Streams join on a specific key as input


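I have 3 different topics and 3 Avro files in Schema Registry. I want to stream these topics, join them together, and write the result to one topic. The problem is that the key I want to join on is different from the key I use when writing data to each topic.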

Suppose we have the following 3 Avro files:
Alarm:

{
  "type" : "record",
  "name" : "Alarm",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "alarm_id",
    "type" : "string",
    "doc" : "Unique identifier of the alarm."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "Unique identifier of the network element that produces the alarm."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the alarm was generated."
  }, {
    "name" : "severity",
    "type" : [ "null", "string" ],
    "doc" : "The default severity associated with the alarm.",
    "default" : null
  }]
}

Incident:

{
  "type" : "record",
  "name" : "Incident",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "incident_id",
    "type" : "string",
    "doc" : "Unique identifier of the incident."
  }, {
    "name" : "incident_type",
    "type" : [ "null", "string" ],
    "doc" : "Categorization of the incident, e.g. network fault, network at risk, customer impact, etc.",
    "default" : null
  }, {
    "name" : "alarm_source_id",
    "type" : "string",
    "doc" : "ID of the respective alarm (corresponds to alarm_id in Alarm)."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the incident was generated on the node."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "ID of specific network element."
  }]
}

Maintenance:

{
  "type" : "record",
  "name" : "Maintenance",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "maintenance_id",
    "type" : "string",
    "doc" : "The message number is the unique ID for every maintenance"
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "The NE ID is the network element ID on which the maintenance is done."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance starts."
  }, {
    "name" : "end_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance ends."
  }]
}

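I have 3 topics in Kafka, one per Avro schema (e.g., alarm_raw, incident_raw, maintenance_raw), and whenever I write to these topics I use ne_id as the key (so the topics are partitioned by ne_id). Now I want to join these 3 topics, get new records, and write them to a new topic. The problem is that I want to join Alarm and Incident based on alarm_id and alarm_source_id, and join Alarm and Maintenance based on ne_id. I want to avoid creating a new topic and re-assigning a new key. Is there any way to specify the key while joining?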
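The core of the problem is that Kafka places each record by its message key, not by the field you later want to join on. The stdlib-only Java sketch below models that under stated assumptions: the class and method names are hypothetical, `String.hashCode()` stands in for the murmur2 hash that Kafka's default partitioner actually applies to the serialized key, and the ids and the partition count of 3 are made up. An alarm keyed by ne_id "ne-1" and an incident keyed by ne_id "ne-2" that references that alarm via alarm_source_id can land in different partitions, so a partition-local join on alarm_id would never see both records.

```java
// Hypothetical stand-in for key-based partitioning. Kafka's default partitioner
// hashes the serialized key with murmur2; String.hashCode() is used here only to
// mimic the property that matters: the same key always maps to the same partition.
final class CoPartitioningSketch {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Records are routed by their message key. Two records can only be processed
    // together by a partition-local join if their keys map to the same partition
    // number (assuming co-partitioned topics with equal partition counts).
    static boolean samePartition(String leftKey, String rightKey, int numPartitions) {
        return partitionFor(leftKey, numPartitions) == partitionFor(rightKey, numPartitions);
    }
}
```

With 3 partitions and this stand-in hash, `samePartition("ne-1", "ne-2", 3)` is false, while two records both re-keyed to the alarm id "a-42" always share a partition. That is why re-keying by the join field is normally required.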



Answer 1 (Matthias J. ..):

It depends on which kind of join you want to use (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics).

For a KStream-KStream join, there is currently (as of v0.10.2 and earlier) no way other than setting a new key (e.g., using selectKey()) and repartitioning.
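The selectKey()-then-repartition approach can be illustrated without a broker. The stdlib-only Java sketch below is a hypothetical model, not Kafka Streams code: `selectKey` mirrors what `KStream#selectKey` does to each record (new key, same value), `indexByKey` is a simplified stand-in for repartitioning (it indexes records directly by key instead of hashing keys to partitions), and `windowedJoin` mimics the time-windowed inner join that a KStream-KStream join performs, matching records with equal keys whose timestamps are at most the window size apart. The record types, ids, and timestamps are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-ins for the Avro records (only the fields used here).
record Alarm(String alarmId, String neId, long startTime) {}
record Incident(String incidentId, String alarmSourceId, String neId, long startTime) {}

// A key/value pair plus a record timestamp, like a Kafka record.
record KeyedRecord<V>(String key, V value, long timestamp) {}

record AlarmIncident(Alarm alarm, Incident incident) {}

final class RekeyJoinSketch {

    // Mirrors KStream#selectKey: value and timestamp unchanged, key replaced.
    static <V> List<KeyedRecord<V>> selectKey(List<KeyedRecord<V>> in, Function<V, String> newKey) {
        List<KeyedRecord<V>> out = new ArrayList<>();
        for (KeyedRecord<V> r : in) {
            out.add(new KeyedRecord<>(newKey.apply(r.value()), r.value(), r.timestamp()));
        }
        return out;
    }

    // Simplified stand-in for repartitioning: records with equal keys end up together.
    static <V> Map<String, List<KeyedRecord<V>>> indexByKey(List<KeyedRecord<V>> in) {
        Map<String, List<KeyedRecord<V>>> byKey = new LinkedHashMap<>();
        for (KeyedRecord<V> r : in) {
            byKey.computeIfAbsent(r.key(), k -> new ArrayList<>()).add(r);
        }
        return byKey;
    }

    // Windowed inner join: equal keys and timestamps at most windowMs apart.
    static List<AlarmIncident> windowedJoin(List<KeyedRecord<Alarm>> alarms,
                                            List<KeyedRecord<Incident>> incidents,
                                            long windowMs) {
        Map<String, List<KeyedRecord<Incident>>> incidentsByKey = indexByKey(incidents);
        List<AlarmIncident> joined = new ArrayList<>();
        for (KeyedRecord<Alarm> a : alarms) {
            for (KeyedRecord<Incident> i : incidentsByKey.getOrDefault(a.key(), List.of())) {
                if (Math.abs(a.timestamp() - i.timestamp()) <= windowMs) {
                    joined.add(new AlarmIncident(a.value(), i.value()));
                }
            }
        }
        return joined;
    }
}
```

Joined on their original ne_id keys (say "ne-1" for the alarm and "ne-2" for the incident), the two records never meet; after `selectKey(alarms, Alarm::alarmId)` and `selectKey(incidents, Incident::alarmSourceId)` they join, provided their timestamps fall within the window. In an actual topology, the re-keyed streams are what would be written to and read back from a repartition topic.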

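For KStream-KTable joins, Kafka 0.10.2 (to be released next week) includes a new feature called GlobalKTables (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams). It allows you to do non-key joins on the KTable (i.e., a KStream-GlobalKTable join), so you don't need to repartition the data in your GlobalKTable.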
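With a GlobalKTable, every application instance holds a full copy of the table, and the DSL join `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)` uses the `KeyValueMapper` to compute, from each stream record, the table key to look up. The stdlib-only Java sketch below models only that lookup, with `java.util.function.BiFunction` in place of the Kafka interfaces; the class, record types, and sample values are hypothetical. The mapper must produce the table's own key, so the sketch assumes a table keyed by alarm_id; a table built directly from alarm_raw as described in the question would be keyed by ne_id instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-ins for the Avro records (only the fields used here).
record Alarm(String alarmId, String neId, String severity) {}
record Incident(String incidentId, String alarmSourceId, String neId) {}
record Enriched(String incidentId, String alarmId, String severity) {}

final class GlobalTableJoinSketch {

    // Models a stream-to-global-table inner join: the stream record's own key is
    // irrelevant; keySelector derives the table key from the record instead.
    static <K, V, GK, GV, R> List<R> join(List<Map.Entry<K, V>> stream,
                                          Map<GK, GV> globalTable,
                                          BiFunction<K, V, GK> keySelector,
                                          BiFunction<V, GV, R> joiner) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            GK lookupKey = keySelector.apply(rec.getKey(), rec.getValue());
            GV match = globalTable.get(lookupKey);
            if (match != null) { // inner join: no table entry, no output
                out.add(joiner.apply(rec.getValue(), match));
            }
        }
        return out;
    }
}
```

Here the incident stream keeps ne_id as its key and the mapper `(neId, incident) -> incident.alarmSourceId()` supplies the lookup key, so the stream side is never re-keyed. An incident whose alarm_source_id has no entry in the table simply produces no output, as in an inner join.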

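Note: a KStream-GlobalKTable join has different semantics than a KStream-KTable join. In contrast to the latter, it is not time-synchronized, and thus the join is non-deterministic by design with regard to GlobalKTable updates; i.e., there is no guarantee which KStream record will be the first to "see" a GlobalKTable update and thus be joined with the updated GlobalKTable record.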
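The missing time synchronization can be made concrete with a tiny model. In the hypothetical Java class below (not the Kafka Streams implementation), table updates are applied whenever they arrive, independently of stream processing, and each stream record is joined against whatever the local table copy holds at that moment; the record's own timestamp plays no role. The alarm id, incident id, and severity values are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one instance's local copy of a global table
// (alarm_id -> severity), used only to show order-dependent join results.
final class UnsyncedLookupSketch {
    private final Map<String, String> severityByAlarmId = new HashMap<>();

    // Table updates are applied as soon as they arrive.
    void applyTableUpdate(String alarmId, String severity) {
        severityByAlarmId.put(alarmId, severity);
    }

    // Joins against the table's current content; there is no attempt to pick the
    // table version that matches the stream record's timestamp.
    String joinIncident(String incidentId, String alarmSourceId) {
        String severity = severityByAlarmId.get(alarmSourceId);
        return severity == null ? null : incidentId + ":" + severity;
    }
}
```

Feeding the same inputs in two interleavings shows the effect: if incident "i-7" is processed after alarm "a-42" is set to MAJOR but before it is updated to CRITICAL, the result is "i-7:MAJOR"; if both table updates happen to be applied first, the same incident yields "i-7:CRITICAL".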

There are also plans to add a KTable-GlobalKTable join, which might become available in 0.10.3. There are no plans to add "global" KStream-KStream joins, though.
