当前位置:  开发笔记 > 前端 > 正文

KafkaException:无法实例化类JsonDeserializer

如何解决《KafkaException:无法实例化类JsonDeserializer》经验,为你挑选了1个好方法。

我正在尝试使用Kafka客户端按照此参考指南启动Spring Boot应用程序,我收到以下错误.
你能告诉我如何修理吗?

@Bean
public Map consumerConfig() {
    final HashMap result = new HashMap<>();
    result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap.servers"));
    result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    return result;
}

@Bean
public ConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig());
}

@Bean
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setConcurrency(3);
    containerFactory.getContainerProperties().setPollTimeout(3000);
    return containerFactory;
}

-

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:545) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
    at com.ubs.wma.bmss.BmssConsumerApp.main(BmssConsumerApp.java:12) [classes/:na]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:703) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:553) ~[kafka-clients-0.10.1.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:305) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:230) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:124) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236) ~[spring-kafka-1.1.2.RELEASE.jar:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    ... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.springframework.kafka.support.serializer.JsonDeserializer
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:316) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:203) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:632) ~[kafka-clients-0.10.1.1.jar:na]
    ... 24 common frames omitted
Caused by: java.lang.IllegalAccessException: Class org.apache.kafka.common.utils.Utils can not access a member of class org.springframework.kafka.support.serializer.JsonDeserializer with modifiers "protected"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) ~[na:1.8.0_121]
    at java.lang.Class.newInstance(Class.java:436) ~[na:1.8.0_121]
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:314) ~[kafka-clients-0.10.1.1.jar:na]
    ... 26 common frames omitted

Artem Bilan.. 8

根据该文件,我们有:

对于更复杂或特定的情况,KafkaConsumer因此KafkaProducer,提供重载的构造函数以分别接受(De)Serializer键和/或值的实例.

为了达到这个API中,DefaultKafkaProducerFactory并且DefaultKafkaConsumerFactory还提供了性能,使注入的自定义(De)Serializer目标Producer/Consumer.

进一步的Apache Kafka JavaDocs:

/**
 * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
 * Valid configuration strings are documented here.
 * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
 * either the string "42" or the integer 42).
 * @param configs   The producer configs
 * @param keySerializer  The serializer for key that implements {@link Serializer}. The configure() method won't be
 *                       called in the producer when the serializer is passed in directly.
 * @param valueSerializer  The serializer for value that implements {@link Serializer}. The configure() method won't
 *                         be called in the producer when the serializer is passed in directly.
 */
public KafkaProducer(Map configs, Serializer keySerializer, Serializer valueSerializer) {

所以,你需要的是这样的:

@Bean
public ConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig(), null, new JsonDeserializer(Foo.class));
}

JsonDeserializer无法通过反射实例化的问题,因为它需要特定类型来反序列化.



1> Artem Bilan..:

根据该文件,我们有:

对于更复杂或特定的情况,KafkaConsumer因此KafkaProducer,提供重载的构造函数以分别接受(De)Serializer键和/或值的实例.

为了达到这个API中,DefaultKafkaProducerFactory并且DefaultKafkaConsumerFactory还提供了性能,使注入的自定义(De)Serializer目标Producer/Consumer.

进一步的Apache Kafka JavaDocs:

/**
 * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
 * Valid configuration strings are documented here.
 * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
 * either the string "42" or the integer 42).
 * @param configs   The producer configs
 * @param keySerializer  The serializer for key that implements {@link Serializer}. The configure() method won't be
 *                       called in the producer when the serializer is passed in directly.
 * @param valueSerializer  The serializer for value that implements {@link Serializer}. The configure() method won't
 *                         be called in the producer when the serializer is passed in directly.
 */
public KafkaProducer(Map configs, Serializer keySerializer, Serializer valueSerializer) {

所以,你需要的是这样的:

@Bean
public ConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfig(), null, new JsonDeserializer(Foo.class));
}

JsonDeserializer无法通过反射实例化的问题,因为它需要特定类型来反序列化.

推荐阅读
携手相约幸福
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有