我有两个RDD,一个(a, b, a, c, b, c, a)
和另一个 - 配对的RDD ((a, 0), (b, 1), (c, 2))
.
我想分别用0,1,2(分别是第二个RDD中的键a,b,c的值)替换a
第一个RDD中的s,b
s和c
s.我想在第一个RDD中保留事件的顺序.
如何在Spark中实现它?
例如这样:
val rdd1 = sc.parallelize(Seq("a", "b", "a", "c", "b", "c", "a"))
val rdd2 = sc.parallelize(Seq(("a", 0), ("b", 1), ("c", 2)))
rdd1
.map((_, 1)) // Map first to PairwiseRDD with dummy values
.join(rdd2)
.map { case (_, (_, x)) => x } // Drop keys and dummy values
如果映射RDD很小,它可以更快broadcast
和map
:
val bd = sc.broadcast(rdd2.collectAsMap)
// This assumes all values are present. If not use get / getOrElse
// or map withDefault
rdd1.map(bd.value)
它还将保留元素的顺序.
如果join
您可以添加增加的标识符(zipWithIndex
/ zipWithUniqueId
)以便能够恢复初始订购,但它实际上更昂贵.