当前位置:  开发笔记 > 编程语言 > 正文

通过Scala Spark读取单独的目录并并行创建单独的RDD

如何解决《通过ScalaSpark读取单独的目录并并行创建单独的RDD》经验,为你挑选了1个好方法。

我需要从单独的源目录中读取JSON文件,并为每个目录创建单独的表.我希望这是并行完成的,但Spark不支持嵌套的RDD,所以目前它正在按顺序执行.是否有一个很好的解决方案可以并行读取/处理这些目录?

这是我正在尝试的示例片段,但由于嵌套的RDD,它不起作用:

def readJsonCreateTable(tableInfo: (String, String)) {
  val df = spark
           .read
           .json(tableInfo._1)
  df.createOrReplaceTempView(tableInfo._2)
}

val dirList = List(("/mnt/jsondir1", "temptable1"),
                   ("/mnt/jsondir2", "temptable2"),
                   ("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error

将最后一行更改为dirRDD.collect.foreach有效,但随后工作不会分发并按顺序执行,因此非常慢.

还尝试了dirRDD.collect.par.foreach,但是它只在驱动程序上运行并行线程,并且不使用所有其他节点.

我查看了foreachAsync,但由于嵌套,我不确定异步在这种情况下是否必然是并行的.

这是通过Databricks使用Spark 2.0和Scala 2.11.

===========
增加:

我尝试了foreachAsync,它在Spark中返回一个FutureAction,但也出现了错误.

import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)

而且显然SimpleFutureAction不可序列化

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction

Josh Rosen.. 5

您可以使用Scala 并行集合或期货来并行化Spark驱动程序上运行的代码.Spark驱动程序是线程安全的,因此这将按预期工作.

以下是使用具有显式指定的线程池的并行集合的示例:

val dirList = List(
  ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
  ("dbfs:/databricks-datasets/amazon/users/", "users")
).par

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)

try {
  dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
  dirList.foreach { case (filename, tableName) =>
    println(s"Starting to create table for $tableName")
    val df = spark.read.json(filename)
    println(s"Done creating table for $tableName")
    df.createOrReplaceTempView(tableName)
  }
} finally {
  pool.shutdown() // to prevent thread leaks.
  // You could also re-use thread pools across collections.
}

当我在Databricks中运行它时,它产生了流日志输出,表明这两个表是并行加载的:

Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users

这种并行性也反映在Spark UI的作业时间轴视图中.

当然,您也可以使用Java线程.简而言之,从多个线程调用Spark驱动程序API是安全的,因此选择您选择的JVM并发框架并发出对Spark驱动程序的并行调用来创建表.



1> Josh Rosen..:

您可以使用Scala 并行集合或期货来并行化Spark驱动程序上运行的代码.Spark驱动程序是线程安全的,因此这将按预期工作.

以下是使用具有显式指定的线程池的并行集合的示例:

val dirList = List(
  ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
  ("dbfs:/databricks-datasets/amazon/users/", "users")
).par

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)

try {
  dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
  dirList.foreach { case (filename, tableName) =>
    println(s"Starting to create table for $tableName")
    val df = spark.read.json(filename)
    println(s"Done creating table for $tableName")
    df.createOrReplaceTempView(tableName)
  }
} finally {
  pool.shutdown() // to prevent thread leaks.
  // You could also re-use thread pools across collections.
}

当我在Databricks中运行它时,它产生了流日志输出,表明这两个表是并行加载的:

Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users

这种并行性也反映在Spark UI的作业时间轴视图中.

当然,您也可以使用Java线程.简而言之,从多个线程调用Spark驱动程序API是安全的,因此选择您选择的JVM并发框架并发出对Spark驱动程序的并行调用来创建表.


那个链接的SO问题正在讨论并行性_within_一个任务,这与在驱动程序上并行运行多个Spark作业不同.
推荐阅读
可爱的天使keven_464
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有