我试图用kafka读取火花流时遇到一些问题.
我的代码是:
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor") val ssc = new StreamingContext(sparkConf, Seconds(2)) val kafkaParams = Map[String, String]( "zookeeper.connect" -> "localhost:2181", "group.id" -> "consumergroup", "metadata.broker.list" -> "localhost:9092", "zookeeper.connection.timeout.ms" -> "10000" //"kafka.auto.offset.reset" -> "smallest" ) val topics = Set("test") val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
我之前在端口2181启动了zookeeper,在端口9092启动了Kafka服务器0.9.0.0.但是我在Spark驱动程序中遇到以下错误:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90) at scala.Option.map(Option.scala:145) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90) at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
Zookeeper日志:
[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor)
任何提示?
非常感谢你
问题与错误的spark-streaming-kafka版本有关.
如文档中所述
Kafka:Spark Streaming 1.5.2与Kafka 0.8.2.1兼容
所以,包括
org.apache.kafka kafka_2.10 0.8.2.2
在我的pom.xml(而不是版本0.9.0.0)中解决了这个问题.
希望这可以帮助