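Can anyone suggest which properties we need to set on the Java KafkaProducer to send messages to an SSL-protected Kafka topic? I am new to Kafka and have not been able to send a single message to a Kafka cluster secured with SSL.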
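I assume you already know how to configure Kafka itself for SSL. On the client side you need to add the configuration settings for SSL encryption and SSL authentication. Here is a basic producer structure: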
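    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.config.SslConfigs;

    Properties props = new Properties();
    // Must point at a broker listener that speaks SSL
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // Configure the following three settings for SSL encryption
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");

    // Configure the following three settings for SSL authentication
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/var/private/ssl/kafka.client.keystore.jks");
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");

    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    // Invoked once per record, after the send is acknowledged or has failed
    Callback callback = (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    };

    for (long i = 0; i < 100; i++) {
        ProducerRecord<String, String> data =
                new ProducerRecord<>("test-topic", "key-" + i, "message-" + i);
        producer.send(data, callback);
    }

    // close() blocks until previously sent records have completed
    producer.close();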
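The constants used in the producer code resolve to plain string keys, so the same settings can also be kept in a properties file or built without the Kafka constant classes. Below is a minimal sketch of that idea, assuming a hypothetical helper class `SslProducerProps` (my own name, not part of Kafka) and placeholder paths and passwords. As far as I know, the three keystore entries only matter when the broker asks for client authentication (`ssl.client.auth` set to `required` or `requested`); for encryption alone the truststore settings should be enough.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper for illustration only; it is not part of the Kafka client API.
final class SslProducerProps {

    static Properties build(String bootstrapServers,
                            String truststorePath, String truststorePassword,
                            String keystorePath, String keystorePassword,
                            String keyPassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);

        // SSL encryption: trust the certificate presented by the broker
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);

        // SSL authentication: present the client's own certificate to the broker
        props.setProperty("ssl.keystore.location", keystorePath);
        props.setProperty("ssl.keystore.password", keystorePassword);
        props.setProperty("ssl.key.password", keyPassword);

        props.setProperty("acks", "all");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own listener address, stores and passwords.
        Properties props = build("localhost:9092",
                "/var/private/ssl/kafka.client.truststore.jks", "test1234",
                "/var/private/ssl/kafka.client.keystore.jks", "test1234",
                "test1234");
        // Print only the keys (sorted) so that passwords are not echoed.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key);
        }
    }
}
```

The returned `Properties` object can be passed to `new KafkaProducer<>(props)` exactly like the one built with `ProducerConfig` and `SslConfigs` constants.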