
PySpark and broadcast join example


I am using Spark 1.3:

# Read from a text file, parse it and then do some basic filtering to get data1
data1.registerTempTable('data1')

# Read from a text file, parse it and then do some basic filtering to get data2
data2.registerTempTable('data2')

# Perform the join
data_joined = data1.join(data2, data1.id == data2.id)

My data is very skewed: data2 (a few KB) << data1 (tens of GB), and the performance of the join is terrible. I have been reading about broadcast joins, but I'm not sure how to do the same thing using the Python API.



1> zero323:

Spark 1.3 does not support broadcast joins with DataFrames. In Spark >= 1.5.0 you can use the broadcast function to apply a broadcast join:

from pyspark.sql.functions import broadcast

data1.join(broadcast(data2), data1.id == data2.id)
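
For context, here is a minimal self-contained sketch of the same call (the sqlContext handle and the toy data are assumptions for illustration, not part of the original question); explain() lets you verify that the plan actually picks a broadcast join:

from pyspark.sql.functions import broadcast

# Toy data: `large` stands in for data1, `small` for data2
large = sqlContext.createDataFrame(
    [(1, "x"), (1, "y"), (3, "z")], ["id", "v1"])
small = sqlContext.createDataFrame(
    [(1, "a"), (2, "b")], ["id", "v2"])

joined = large.join(broadcast(small), large.id == small.id)
joined.explain()  # the physical plan should mention BroadcastHashJoin

Spark SQL can also broadcast a small table automatically when its estimated size falls below spark.sql.autoBroadcastJoinThreshold; the explicit broadcast hint is mainly useful when that size estimate is off.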

For older versions, the only option is to convert to RDDs and apply the same logic as in other languages. Roughly something like this:

from pyspark.sql import Row
from pyspark.sql.types import StructType

# Create a dictionary where keys are join keys
# and values are lists of rows
data2_bd = sc.broadcast(
    data2.map(lambda r: (r.id, r)).groupByKey().collectAsMap())


# Define a new row with fields from both DFs
output_row = Row(*data1.columns + data2.columns)

# And an output schema
output_schema = StructType(data1.schema.fields + data2.schema.fields)

# Given row x, extract a list of corresponding rows from broadcast
# and output a list of merged rows
def gen_rows(x):
    return [output_row(*x + y) for y in data2_bd.value.get(x.id, [])]

# flatMap over data1 and create a new DataFrame with the combined schema
joined = data1.rdd.flatMap(gen_rows).toDF(output_schema)
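
To see the pattern in isolation, here is the same map-side join idea on plain key-value RDDs (a sketch with made-up data; sc is the usual SparkContext):

# Small side collected into a key -> value dict and shipped to every executor once
small_bd = sc.broadcast(dict([(1, "a"), (2, "b")]))

large = sc.parallelize([(1, "x"), (1, "y"), (3, "z")])

# Map-side join: look each key up in the broadcast dict and drop misses,
# so the large side is never shuffled
joined = large.flatMap(
    lambda kv: [(kv[0], kv[1], small_bd.value[kv[0]])]
    if kv[0] in small_bd.value else [])

print(joined.collect())  # [(1, 'x', 'a'), (1, 'y', 'a')]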
