我有一个logstash输入设置为
input { kafka { bootstrap_servers => "zookeper_address" topics => ["topic1","topic2"] } }
我需要在elasticsearch中将主题提供给两个不同的索引.任何人都可以帮我解决如何为这样的任务设置输出.这时我只能设置
output { elasticsearch { hosts => ["localhost:9200"] index => "my_index" codec => "json" document_id => "%{id}" } }
我需要在同一elasticsearch例如两个指标说index1
和index2
,这将通过对未来的信息供给topic1
和topic2
首先,您需要添加decorate_events
到您的kafka
输入,以便知道消息来自哪个主题
input { kafka { bootstrap_servers => "zookeper_address" topics => ["topic1","topic2"] decorate_events => true } }
然后,您有两个选项,都涉及条件逻辑.第一种方法是引入一个过滤器,用于根据主题名称添加正确的索引名称.为此你需要添加
filter { if [kafka][topic] == "topic1" { mutate { add_field => {"[@metadata][index]" => "index1"} } } else { mutate { add_field => {"[@metadata][index]" => "index2"} } } # remove the field containing the decorations, unless you want them to land into ES mutate { remove_field => ["kafka"] } } output { elasticsearch { hosts => ["localhost:9200"] index => "%{[@metadata][index]}" codec => "json" document_id => "%{id}" } }
然后第二个选项是直接在输出部分中执行if/else,就像这样(但附加kafka
字段将落入ES):
output { if [@metadata][kafka][topic] == "topic1" { elasticsearch { hosts => ["localhost:9200"] index => "index1" codec => "json" document_id => "%{id}" } } else { elasticsearch { hosts => ["localhost:9200"] index => "index2" codec => "json" document_id => "%{id}" } } }