I am trying to enrich the data in a topic, so I read from Kafka and write back to Kafka using Spark Structured Streaming.
val ds = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("group.id", groupId)
  .option("subscribe", "topicname")
  .load()
val enriched = ds.select("key", "value", "topic").as[(String, String, String)]
  .map(record => enrich(record._1, record._2, record._3))
val query = enriched.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("group.id", groupId)
  .option("topic", "desttopic")
  .start()
But I get an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Is there any workaround?