I need to read JSON files from separate source directories and create a separate table for each directory. I'd like this to happen in parallel, but Spark doesn't support nested RDDs, so it currently runs sequentially. Is there a good way to read and process these directories in parallel?
Here is a snippet of what I'm trying; it doesn't work because of the nested RDDs:
def readJsonCreateTable(tableInfo: (String, String)) {
  val df = spark
    .read
    .json(tableInfo._1)
  df.createOrReplaceTempView(tableInfo._2)
}

val dirList = List(("/mnt/jsondir1", "temptable1"),
                   ("/mnt/jsondir2", "temptable2"),
                   ("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
Changing the last line to dirRDD.collect.foreach works, but then the work isn't distributed and runs sequentially, so it's very slow.
I also tried dirRDD.collect.par.foreach, but that only runs parallel threads on the driver and doesn't make use of the other nodes.
I looked at foreachAsync, but given the nesting I'm not sure whether asynchronous necessarily means parallel in this situation.
This is Spark 2.0 with Scala 2.11, running on Databricks.
===========
Update:
I tried foreachAsync, which returns a FutureAction in Spark, but it also errored out.
import scala.concurrent._
import scala.concurrent.duration._
. . .
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
And apparently SimpleFutureAction is not serializable:
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
Josh Rosen · 5
You can use Scala parallel collections or futures to parallelize code that runs on the Spark driver. The Spark driver is thread-safe, so this works as expected.
Here's an example that uses a parallel collection with an explicitly specified thread pool:
val dirList = List(
  ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
  ("dbfs:/databricks-datasets/amazon/users/", "users")
).par

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)

try {
  dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
  dirList.foreach { case (filename, tableName) =>
    println(s"Starting to create table for $tableName")
    val df = spark.read.json(filename)
    println(s"Done creating table for $tableName")
    df.createOrReplaceTempView(tableName)
  }
} finally {
  pool.shutdown() // to prevent thread leaks.
  // You could also re-use thread pools across collections.
}
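Note that the ForkJoinPool(2) only caps how many table loads run concurrently on the driver; each spark.read.json call still launches a normal Spark job whose tasks are distributed across the cluster.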
When I run this in Databricks, it produces streaming log output showing that the two tables load in parallel:
Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users
This parallelism also shows up in the Spark UI's job timeline view.
Of course, you can also use Java threads. In short, it's safe to call Spark driver APIs from multiple threads, so pick the JVM concurrency framework of your choice and issue concurrent calls to the Spark driver to create your tables.
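For completeness, here is a minimal sketch of the Futures approach mentioned above, run entirely on the driver rather than via foreachAsync. It reuses the directory list from the question; the pool size and timeout are arbitrary assumptions, and spark is the usual SparkSession available in a Databricks notebook:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Dedicated pool so we don't starve the default global execution context.
val pool = Executors.newFixedThreadPool(2) // pool size is an arbitrary choice
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val dirList = List(
  ("/mnt/jsondir1", "temptable1"),
  ("/mnt/jsondir2", "temptable2"),
  ("/mnt/jsondir3", "temptable3")
)

// Launch one driver-side Future per table; each Future issues a normal
// Spark job, so the actual reading is still distributed across the cluster.
val futures = dirList.map { case (path, tableName) =>
  Future {
    val df = spark.read.json(path)
    df.createOrReplaceTempView(tableName)
  }
}

// Block until every table has been created, then release the threads.
Await.result(Future.sequence(futures), 10.minutes) // timeout is an assumption
pool.shutdown() // prevent thread leaks, as in the parallel-collections example

Because the Futures run on the driver, nothing here needs to be serialized to the executors, which avoids the NotSerializableException seen with foreachAsync.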