据我所知,distinct()散列分区RDD以识别唯一键.但它是否优化了每个分区只移动不同的元组?
想象一下具有以下分区的RDD
[1,2,2,1,4,2,2]
[1,3,3,5,4,5,5,5]
在这个RDD的一个独特的地方,所有重复的密钥(分区1中的2s和分区2中的5s)是否会被混洗到它们的目标分区,或者只有每个分区的不同密钥被洗牌到目标?
如果所有键都被洗牌,那么带有set()操作的aggregate()将减少shuffle.
def set_update(u, v): u.add(v) return u rdd.aggregate(set(), set_update, lambda u1,u2: u1|u2)
zero323.. 6
unique
通过实现 reduceByKey
在(element, None)
对.因此,它每个分区只会刷新唯一值.如果重复数量很少,那么仍然是相当昂贵的操作.
有些情况下使用set
可能很有用.特别是如果你打电话distinct
,PairwseRDD
你可能更喜欢aggregateByKey
/ combineByKey
同时通过键同时实现重复数据删除和分区.特别考虑以下代码:
rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)]) rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")]) rdd1.distinct().join(rdd2)
它必须洗牌rdd1
两次 - 一次换distinct
一次换一次join
.相反,你可以使用combineByKey
:
def flatten(kvs): (key, (left, right)) = kvs for v in left: yield (key, (v, right)) aggregated = (rdd1 .aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2)) rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions()) (aggregated.join(rdd2_partitioned) .flatMap(flatten))
注意:
join
Scala中的逻辑与Python中的稍微不同(PySpark使用union
后跟groupByKey
,参见Spark RDD groupByKey +加入vs连接性能的Python和Scala DAG),因此我们必须RDD
在调用join之前手动分区第二个.
unique
通过实现 reduceByKey
在(element, None)
对.因此,它每个分区只会刷新唯一值.如果重复数量很少,那么仍然是相当昂贵的操作.
有些情况下使用set
可能很有用.特别是如果你打电话distinct
,PairwseRDD
你可能更喜欢aggregateByKey
/ combineByKey
同时通过键同时实现重复数据删除和分区.特别考虑以下代码:
rdd1 = sc.parallelize([("foo", 1), ("foo", 1), ("bar", 1)]) rdd2 = sc.parallelize([("foo", "x"), ("bar", "y")]) rdd1.distinct().join(rdd2)
它必须洗牌rdd1
两次 - 一次换distinct
一次换一次join
.相反,你可以使用combineByKey
:
def flatten(kvs): (key, (left, right)) = kvs for v in left: yield (key, (v, right)) aggregated = (rdd1 .aggregateByKey(set(), set_update, lambda u1, u2: u1 | u2)) rdd2_partitioned = rdd2.partitionBy(aggregated.getNumPartitions()) (aggregated.join(rdd2_partitioned) .flatMap(flatten))
注意:
join
Scala中的逻辑与Python中的稍微不同(PySpark使用union
后跟groupByKey
,参见Spark RDD groupByKey +加入vs连接性能的Python和Scala DAG),因此我们必须RDD
在调用join之前手动分区第二个.