我有2个由3个ZK节点支持的Kafkas。我想通过在每个节点上本地运行kafka-console-producer和-consumer来测试Kafka节点。
因此,我使用2个不同的终端通过SSH进入我的Kafka经纪人之一。在1号航站楼中,我按如下方式运行使用者:
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper a.b.c.d:2181 --topic test1
其中abcd是我的3个ZK节点之一的专用IP。
然后在2号航站楼中,我像这样运行生产者:
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
我可以毫无问题地启动消费者和生产者。
但是,在生产者终端中,如果我通过输入一些文本(例如“ hello”)并按Enter键来在test1主题上“触发”消息,我将立即开始看到以下内容:
[2017-01-17 19:45:57,353] WARN Error while fetching metadata with correlation id 0 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2017-01-17 19:45:57,372] WARN Error while fetching metadata with correlation id 1 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2017-01-17 19:45:57,477] WARN Error while fetching metadata with correlation id 2 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2017-01-17 19:45:57,582] WARN Error while fetching metadata with correlation id 3 : {test1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) ...and it keeps going!
而且,在消费者终端中,即使启动消费者时没有出现任何错误,大约30秒后,我也会收到以下警告消息:
[2017-01-17 19:46:07,292] WARN Fetching topic metadata with correlation id 1 for topics [Set(test1)] from broker [BrokerEndPoint(1,ip-x-y-z-w.ec2.internal,9092)] failed (kafka.client.ClientUtils$) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) at kafka.producer.SyncProducer.send(SyncProducer.scala:124) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
有趣的是,ip-xyzw.ec2.internal是其他Kafka经纪人的专用DNS,所以也许这是经纪人之间的通信失败?
关于这里发生的事情以及如何解决问题的任何想法?
这是server.properties
两个Kafkas节点的完整文件:
listeners=PLAINTEXT://0.0.0.0:9092 advertised.host.name=.ec2.internal advertised.listeners=PLAINTEXT://0.0.0.0:9092 broker.id=1 port=9092 num.partitions=4 zookeeper.connect=zkA:2181,zkB:2181,zkC:2181 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 log.dirs=/tmp/kafka-logs num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connection.timeout.ms=6000 offset.metadata.max.bytes=4096
请让我知道是否有任何类似配置的气味。