我使用javaHiveContext
Spark 执行连接.
大表是1,76Gb,有1亿记录.
第二个表是273Mb,有1000万条记录.
我得到一个JavaSchemaRDD
,我打电话count()
给它:
String query="select attribute7,count(*) from ft,dt where ft.chiavedt=dt.chiavedt group by attribute7"; JavaSchemaRDD rdd=sqlContext.sql(query); System.out.println("count="+rdd.count());
如果我强制a broadcastHashJoin (SET spark.sql.autoBroadcastJoinThreshold=290000000)
并在5节点上使用5个执行器,其中包含8个核心和20Gb内存,则会在100秒内执行.如果我不强制广播,它将在30秒内执行.
NB表格存储为Parquet文件.
很可能问题的根源是广播费用.为简单起见,假设您在较大的RDD中有1800MB,在较小的RDD中有300MB.假设有5个执行器且没有先前的分区,则所有数据中的五分之一应该已经在正确的机器上.在标准连接的情况下,它可以提供~1700MB的洗牌.
对于广播加入,必须将较小的RDD转移到所有节点.这意味着要传输大约1500MB的数据.如果您使用驱动程序添加所需的通信,则意味着您必须以更昂贵的方式移动相当数量的数据.必须首先收集广播的数据,然后才能将其转发给所有工作人员.