我正在运行这个片段来对点的RDD进行排序,对RDD进行排序并从给定点获取K-最近点:
def getKNN(sparkContext:SparkContext, k:Int, point2:Array[Double], pointsRDD:RDD[Array[Double]]): RDD[Array[Double]] = { val tuplePointDistanceRDD:RDD[(Double, Array[Double])] = pointsRDD.map(point => (DistanceUtils.euclidianDistance(point, point2), point)) sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
}
在我的应用程序中只使用一个SparkContext并将其作为参数传递给我的函数,我在收到KNN点org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
的时刻收到错误.sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
point2
我正在构建sparkContext
这个片段:
var sparkContext = new SparkContext("local", "")
面对这种错误的可能原因是什么?
基本上这是我的独立spark环境的LOG,其中包含此错误的堆栈跟踪:
15/12/24 11:55:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:55731] 15/12/24 11:55:29 INFO Utils: Successfully started service 'sparkDriver' on port 55731. 15/12/24 11:55:29 INFO SparkEnv: Registering MapOutputTracker 15/12/24 11:55:29 INFO SparkEnv: Registering BlockManagerMaster 15/12/24 11:55:29 INFO DiskBlockManager: Created local directory at /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/blockmgr-70e73cfe-683b-4297-aa5d-de38f98d02f1 15/12/24 11:55:29 INFO MemoryStore: MemoryStore started with capacity 491.7 MB 15/12/24 11:55:29 INFO HttpFileServer: HTTP File server directory is /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/spark-f7bc8b6f-7d4f-4c55-8dff-0fbc4f6c2532/httpd-fb502369-4c28-4585-a37e-f3645d1d55a3 15/12/24 11:55:29 INFO HttpServer: Starting HTTP Server 15/12/24 11:55:29 INFO Utils: Successfully started service 'HTTP file server' on port 55732. 15/12/24 11:55:29 INFO SparkEnv: Registering OutputCommitCoordinator 15/12/24 11:55:29 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/12/24 11:55:29 INFO SparkUI: Started SparkUI at http://localhost:4040 15/12/24 11:55:29 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 15/12/24 11:55:29 INFO Executor: Starting executor ID driver on host localhost 15/12/24 11:55:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55733. 15/12/24 11:55:29 INFO NettyBlockTransferService: Server created on 55733 15/12/24 11:55:29 INFO BlockManagerMaster: Trying to register BlockManager 15/12/24 11:55:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55733 with 491.7 MB RAM, BlockManagerId(driver, localhost, 55733) 15/12/24 11:55:29 INFO BlockManagerMaster: Registered BlockManager 15/12/24 11:55:30 INFO TorrentBroadcast: Started reading broadcast variable 0 org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) at org.apache.spark.rdd.RDD.sortBy$default$3(RDD.scala:548) at LOF$.getKNN(LOF.scala:14) at LOF$.lof(LOF.scala:25) at BehaviourActivityScoreJudgeTest$$anonfun$1.apply$mcV$sp(BehaviourActivityScoreJudgeTest.scala:14) at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11) at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683) at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644) at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656) at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656) at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683) at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714) at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714) at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683) at org.scalatest.Suite$class.run(Suite.scala:1424) at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683) at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760) at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760) at BehaviourActivityScoreJudgeTest.org$scalatest$BeforeAndAfterAll$$super$run(BehaviourActivityScoreJudgeTest.scala:4) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at BehaviourActivityScoreJudgeTest.run(BehaviourActivityScoreJudgeTest.scala:4) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55) at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563) at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557) at scala.collection.immutable.List.foreach(List.scala:318) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557) at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044) at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043) at org.scalatest.tools.Runner$.run(Runner.scala:883) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:137) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175) ... 94 more 15/12/24 11:55:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4040 15/12/24 11:55:30 INFO DAGScheduler: Stopping DAGScheduler 15/12/24 11:55:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 15/12/24 11:55:30 INFO MemoryStore: MemoryStore cleared 15/12/24 11:55:30 INFO BlockManager: BlockManager stopped 15/12/24 11:55:30 INFO BlockManagerMaster: BlockManagerMaster stopped 15/12/24 11:55:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 15/12/24 11:55:30 INFO SparkContext: Successfully stopped SparkContext 15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Saulo Ricci.. 5
刚刚发现为什么我得到这个异常:因为某个原因,我的SparkContext
对象在ScalaTest
方法之间多次启动/停止.所以,修正这种行为会让我以正常的方式工作.
刚刚发现为什么我得到这个异常:因为某个原因,我的SparkContext
对象在ScalaTest
方法之间多次启动/停止.所以,修正这种行为会让我以正常的方式工作.