常规scala集合有一个漂亮的collect
方法,它允许我filter-map
使用部分函数在一个传递中执行操作.火花上有相同的操作Dataset
吗?
我想它有两个原因:
句法简洁
它将filter-map
样式操作减少到一次通过(尽管在火花中我猜测有优化可以为你发现这些东西)
这是一个显示我的意思的例子.假设我有一系列选项,我想提取并加倍定义的整数(a中的那些Some
):
val input = Seq(Some(3), None, Some(-1), None, Some(4), Some(5))
方法1 - collect
input.collect { case Some(value) => value * 2 } // List(6, -2, 8, 10)
这collect
使得这个语法非常简洁,并且一次通过.
方法2 - filter-map
input.filter(_.isDefined).map(_.get * 2)
我可以将这种模式带到火花上,因为数据集和数据框有类似的方法.
但是,我不喜欢这个这么多,因为isDefined
和get
看起来像代码异味给我.有一个隐含的假设,即地图只接收Some
s.编译器无法验证这一点.在一个更大的例子中,开发人员更难发现这种假设,开发人员可能会交换过滤器并映射,例如,不会出现语法错误.
方法3 - fold*
操作
input.foldRight[List[Int]](Nil) { case (nextOpt, acc) => nextOpt match { case Some(next) => next*2 :: acc case None => acc } }
我没有使用过足够的火花来知道折叠是否有等价,所以这可能有点切线.
无论如何,模式匹配,折叠锅炉板和列表的重建都混杂在一起,很难读.
总的来说,我发现collect
语法最好,我希望spark有这样的东西.
这里的答案是不正确的,至少在Spark的当前.
RDD实际上有一个collect方法,它采用部分函数并将过滤器和映射应用于数据.这与无参数.collect()方法完全不同.请参阅Spark源代码RDD.scala @ line 955:
/** * Return an RDD that contains all matching values by applying `f`. */ def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope { val cleanF = sc.clean(f) filter(cleanF.isDefinedAt).map(cleanF) }
这不会实现RDD中的数据,而不是RDD.scala @ line 923中的无参数.collect()方法:
/** * Return an array that contains all of the elements in this RDD. */ def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
在文档中,请注意如何
def collect[U](f: PartialFunction[T, U]): RDD[U]
方法并没有有关数据加载到驱动程序的内存有与之相关的警告:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@collect[U](f:PartialFunction[T,U])(implicitevidence$29: scala.reflect.ClassTag [U]):org.apache.spark.rdd.RDD [U]
Spark让这些重载方法做完全不同的事情让人非常困惑.
编辑:我的错!我误解了这个问题,我们谈论的是DataSets而不是RDD.仍然,接受的答案说
"然而,Spark文档指出,"只有在期望结果数组很小的情况下才能使用此方法,因为所有数据都被加载到驱动程序的内存中."
哪个不对!调用.collect()的部分功能版本时,数据不会加载到驱动程序的内存中 - 仅在调用无参数版本时.调用.collect(partial_function)应该具有与依次调用.filter()和.map()相同的性能,如上面的源代码所示.