当前位置:  开发笔记 > 前端 > 正文

logstash 5.0.1:设置elasticsearch多个索引输出多个kafka输入主题

如何解决《logstash5.0.1:设置elasticsearch多个索引输出多个kafka输入主题》经验,为你挑选了1个好方法。

我有一个logstash输入设置为

input {
  kafka {
  bootstrap_servers => "zookeper_address"
  topics => ["topic1","topic2"]
  }
}

我需要在elasticsearch中将主题提供给两个不同的索引.任何人都可以帮我解决如何为这样的任务设置输出.这时我只能设置

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
  }
}

我需要在同一elasticsearch例如两个指标说index1index2,这将通过对未来的信息供给topic1topic2



1> Val..:

首先,您需要添加decorate_events到您的kafka输入,以便知道消息来自哪个主题

input {
  kafka {
    bootstrap_servers => "zookeper_address"
    topics => ["topic1","topic2"]
    decorate_events => true
  }
}

然后,您有两个选项,都涉及条件逻辑.第一种方法是引入一个过滤器,用于根据主题名称添加正确的索引名称.为此你需要添加

filter {
   if [kafka][topic] == "topic1" {
      mutate {
         add_field => {"[@metadata][index]" => "index1"}
      }
   } else {
      mutate {
         add_field => {"[@metadata][index]" => "index2"}
      }
   }
   # remove the field containing the decorations, unless you want them to land into ES
   mutate {
      remove_field => ["kafka"]
   }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}"
    codec => "json"
    document_id => "%{id}"
  }
}

然后第二个选项是直接在输出部分中执行if/else,就像这样(但附加kafka字段将落入ES):

output {
   if [@metadata][kafka][topic] == "topic1" {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index1"
       codec => "json"
       document_id => "%{id}"
     }
   } else {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index2"
       codec => "json"
       document_id => "%{id}"
     }
   }
}

推荐阅读
落单鸟人
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有