当我发出消息时,我想从经纪人那里得到一些回应.我已经尝试过使用的CallBack机制(通过实现CallBack),KafkaProducer.send
但它没有工作,也没有调用onCompletion
方法.
当我关闭Kafka服务器并尝试生成消息时,它会调用回调方法.
有没有其他方式得到确认?
@Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; System.out.println("Called Callback method"); if (metadata != null) { System.out.println("message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } props.put("bootstrap.servers", "localhost:9092"); props.put("client.id", "mytopic"); props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class); props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); KafkaProducerproducer = new KafkaProducer (props); long runtime = new Date().getTime(); String ip = "192.168.2."+ rnd.nextInt(255); String msg = runtime + ".www.ppop.com," + ip; producer.send(new ProducerRecord ("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));`
我使用kafka-client api 0.9.1与代理版本0.8.2.