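I'm trying to use the spark-cassandra-connector via spark-shell on Dataproc, but I am unable to connect to my cluster. There appears to be a version mismatch: the classpath includes a much older Guava version from somewhere else, even though I specify the proper version on startup. I suspect this is caused by all the Hadoop dependencies that are put on the classpath by default.
Is there any way to have spark-shell use only the proper version of Guava, without getting rid of all the Hadoop-related jars that Dataproc includes?
Relevant data: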
Starting spark-shell, showing that it resolves the proper version of Guava:
$ spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3
:: loading settings :: url = jar:file:/usr/lib/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.datastax.spark#spark-cassandra-connector_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 in central
    found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
    found com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 in central
    found io.netty#netty-handler;4.0.27.Final in central
    found io.netty#netty-buffer;4.0.27.Final in central
    found io.netty#netty-common;4.0.27.Final in central
    found io.netty#netty-transport;4.0.27.Final in central
    found io.netty#netty-codec;4.0.27.Final in central
    found com.codahale.metrics#metrics-core;3.0.2 in central
    found org.slf4j#slf4j-api;1.7.5 in central
    found org.apache.commons#commons-lang3;3.3.2 in central
    found com.google.guava#guava;16.0.1 in central
    found org.joda#joda-convert;1.2 in central
    found joda-time#joda-time;2.3 in central
    found com.twitter#jsr166e;1.1.0 in central
    found org.scala-lang#scala-reflect;2.10.5 in central
:: resolution report :: resolve 502ms :: artifacts dl 10ms
    :: modules in use:
    com.codahale.metrics#metrics-core;3.0.2 from central in [default]
    com.datastax.cassandra#cassandra-driver-core;3.0.0-alpha4 from central in [default]
    com.datastax.spark#spark-cassandra-connector_2.10;1.5.0-M3 from central in [default]
    com.google.guava#guava;16.0.1 from central in [default]
    com.twitter#jsr166e;1.1.0 from central in [default]
    io.netty#netty-buffer;4.0.27.Final from central in [default]
    io.netty#netty-codec;4.0.27.Final from central in [default]
    io.netty#netty-common;4.0.27.Final from central in [default]
    io.netty#netty-handler;4.0.27.Final from central in [default]
    io.netty#netty-transport;4.0.27.Final from central in [default]
    joda-time#joda-time;2.3 from central in [default]
    org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
    org.apache.commons#commons-lang3;3.3.2 from central in [default]
    org.joda#joda-convert;1.2 from central in [default]
    org.scala-lang#scala-reflect;2.10.5 from central in [default]
    org.slf4j#slf4j-api;1.7.5 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   16  |   0   |   0   |   0   ||   16  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 16 already retrieved (0kB/12ms)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.8.0_66-internal)
Type in expressions to have them evaluated.
Type :help for more information.
15/12/10 17:38:46 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc.
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/hive/conf.dist/ivysettings.xml will be used
15/12/10 17:38:54 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/12/10 17:38:54 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
SQL context available as sqlContext.
Stack trace when making the initial connection:
java.io.IOException: Failed to open native connection to Cassandra at {10.240.0.7}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:120)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:249)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:51)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:146)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1125)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC$$iwC.<init>(<console>:53)
    at $iwC$$iwC.<init>(<console>:55)
    at $iwC.<init>(<console>:57)
    at <init>(<console>:59)
    at .<init>(<console>:63)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$pasteCommand(SparkILoop.scala:825)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
    at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:345)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
    at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.withFallback(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/FutureFallback;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
    at com.datastax.driver.core.Connection.initAsync(Connection.java:178)
    at com.datastax.driver.core.Connection$Factory.open(Connection.java:742)
    at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:240)
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:187)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1393)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:402)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
    ... 70 more
Dennis Huo.. 7
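Unfortunately, Hadoop's dependency on Guava 11 (which doesn't have the Futures.withFallback method mentioned in the stack trace) is a longstanding issue, and indeed Hadoop 2.7.1 still depends on Guava 11.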
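To see which Guava actually wins, it can help to list the Guava jars on the classpath in order: with the default parent-first class loading, a class such as `com.google.common.util.concurrent.Futures` is loaded from the first entry that contains it. Below is a minimal POSIX shell sketch, assuming you already have the classpath as one colon-separated string (for example from `hadoop classpath --glob`, which expands the wildcard entries in Hadoop's classpath). The function name `list_guava_jars` is hypothetical.

```shell
# Print every Guava jar found on a colon-separated classpath, in classpath
# order. The first line printed is the copy a parent-first class loader
# would use for com.google.common.* classes.
list_guava_jars() {
  # Split the classpath on ':' and keep entries whose file name looks like a
  # Guava jar, e.g. guava-11.0.2.jar or com.google.guava_guava-16.0.1.jar.
  printf '%s\n' "$1" | tr ':' '\n' | grep -E 'guava-[0-9][^/]*\.jar$'
}
```

For example, `list_guava_jars "$(hadoop classpath --glob)" | head -n 1` would print the Guava jar that Hadoop puts first. As far as I know, jars fetched with `--packages` are added after this launch classpath, which would be consistent with Ivy resolving Guava 16.0.1 while the driver still loads Hadoop's Guava 11.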
Spark core uses Guava 14, but this is worked around by shading Guava inside the Spark assembly:
$ jar tf /usr/lib/spark/lib/spark-assembly.jar | grep concurrent.Futures
org/spark-project/guava/util/concurrent/Futures$1.class
org/spark-project/guava/util/concurrent/Futures$2.class
org/spark-project/guava/util/concurrent/Futures$3.class
org/spark-project/guava/util/concurrent/Futures$4.class
org/spark-project/guava/util/concurrent/Futures$5.class
org/spark-project/guava/util/concurrent/Futures$6.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture$1.class
org/spark-project/guava/util/concurrent/Futures$ChainingListenableFuture.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$1.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture$2.class
org/spark-project/guava/util/concurrent/Futures$CombinedFuture.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture$1.class
org/spark-project/guava/util/concurrent/Futures$FallbackFuture.class
org/spark-project/guava/util/concurrent/Futures$FutureCombiner.class
org/spark-project/guava/util/concurrent/Futures$ImmediateCancelledFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFailedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures$ImmediateSuccessfulFuture.class
org/spark-project/guava/util/concurrent/Futures$MappingCheckedFuture.class
org/spark-project/guava/util/concurrent/Futures.class
$ javap -cp /usr/lib/spark/lib/spark-assembly.jar org.spark-project.guava.util.concurrent.Futures
Compiled from "Futures.java"
public final class org.spark-project.guava.util.concurrent.Futures {
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> makeChecked(org.spark-project.guava.util.concurrent.ListenableFuture<V>, com.google.common.base.Function<java.lang.Exception, X>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFuture(V);
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateCheckedFuture(V);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateFailedFuture(java.lang.Throwable);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> immediateCancelledFuture();
  public static <V, X extends java.lang.Exception> org.spark-project.guava.util.concurrent.CheckedFuture<V, X> immediateFailedCheckedFuture(X);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> withFallback(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>, org.spark-project.guava.util.concurrent.FutureFallback<? extends V>, java.util.concurrent.Executor);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, org.spark-project.guava.util.concurrent.AsyncFunction<? super I, ? extends O>, java.util.concurrent.Executor);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>);
  public static <I, O> org.spark-project.guava.util.concurrent.ListenableFuture<O> transform(org.spark-project.guava.util.concurrent.ListenableFuture<I>, com.google.common.base.Function<? super I, ? extends O>, java.util.concurrent.Executor);
  public static <I, O> java.util.concurrent.Future<O> lazyTransform(java.util.concurrent.Future<I>, com.google.common.base.Function<? super I, ? extends O>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<V> dereference(org.spark-project.guava.util.concurrent.ListenableFuture<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> allAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>...);
  public static <V> org.spark-project.guava.util.concurrent.ListenableFuture<java.util.List<V>> successfulAsList(java.lang.Iterable<? extends org.spark-project.guava.util.concurrent.ListenableFuture<? extends V>>);
  public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>);
  public static <V> void addCallback(org.spark-project.guava.util.concurrent.ListenableFuture<V>, org.spark-project.guava.util.concurrent.FutureCallback<? super V>, java.util.concurrent.Executor);
  public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, java.lang.Class<X>) throws X;
  public static <V, X extends java.lang.Exception> V get(java.util.concurrent.Future<V>, long, java.util.concurrent.TimeUnit, java.lang.Class<X>) throws X;
  public static <V> V getUnchecked(java.util.concurrent.Future<V>);
  static {};
}
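The same check can be applied to any jar (the Spark assembly, your own assembly, or a jar on Hadoop's classpath) by looking for the unshaded and the Spark-shaded locations of `Futures.class` in its listing. Below is a small POSIX shell sketch; it reads a listing such as the output of `jar tf` on stdin, and the function name `classify_futures` is hypothetical.

```shell
# Report which copy of Guava's Futures class a jar listing contains:
# "unshaded" (com/google/common/...), "shaded" (Spark's org/spark-project
# relocation), "both", or "none". Reads the listing on stdin.
classify_futures() {
  awk '
    $0 == "com/google/common/util/concurrent/Futures.class"      { plain = 1 }
    $0 == "org/spark-project/guava/util/concurrent/Futures.class" { shaded = 1 }
    END {
      if (plain && shaded) print "both"
      else if (plain)      print "unshaded"
      else if (shaded)     print "shaded"
      else                 print "none"
    }'
}
```

Given the listing above, `jar tf /usr/lib/spark/lib/spark-assembly.jar | classify_futures` should print `shaded`. A result of `unshaded` or `both` means the jar exposes `com.google.common.util.concurrent.Futures` directly and can take part in the version conflict.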
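You can follow the instructions in http://arjon.es/2015/10/12/making-hadoop-2-dot-6-plus-spark-cassandra-driver-play-nice-together/ to do the same kind of Guava shading in your own assembly. With spark-shell, you may be able to get away with some changes to spark.driver.extraClassPath, though collisions may then continue to occur at various points.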
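Here is a sketch of what the `spark.driver.extraClassPath` route could look like for the command from the question. It assumes a Guava 16.0.1 jar has been copied to the same hypothetical path (`/opt/jars/guava-16.0.1.jar`) on the master and on every worker, and wraps the launch in a hypothetical shell function `spark_shell_cassandra`. `--driver-class-path` is the spark-submit option that sets `spark.driver.extraClassPath` (and the one the Spark docs recommend in client mode), while `spark.executor.extraClassPath` is the executor-side equivalent. My understanding is that Spark 1.x places these entries ahead of the Spark assembly and Hadoop jars; exporting `SPARK_PRINT_LAUNCH_COMMAND=1` before launching prints the final command so the order can be checked.

```shell
# Hypothetical location of a Guava 16.0.1 jar; it must exist at this same
# path on the driver host and on every worker node.
GUAVA_JAR="${GUAVA_JAR:-/opt/jars/guava-16.0.1.jar}"

# Launch spark-shell with GUAVA_JAR on the driver class path
# (--driver-class-path sets spark.driver.extraClassPath) and on the executor
# class path, loading the connector as in the question. Any extra arguments
# are passed through to spark-shell unchanged.
spark_shell_cassandra() {
  spark-shell \
    --driver-class-path "$GUAVA_JAR" \
    --conf "spark.executor.extraClassPath=$GUAVA_JAR" \
    --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M3 \
    "$@"
}
```

Because this puts an unshaded Guava 16 in front of Hadoop's Guava 11 for Hadoop code as well, it is exactly the kind of change where collisions may still surface later; shading in your own assembly avoids that.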