I know we can use the Window function in pyspark to compute a cumulative sum. But Window is only supported in HiveContext, not in SQLContext. I need to use SQLContext because HiveContext cannot be run in multiple processes.
Is there an efficient way to compute a cumulative sum with SQLContext? A simple approach is to load the data into the driver's memory and use numpy.cumsum, but the downside is that the data needs to fit into memory.
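For reference, a minimal sketch of that driver-side approach (assuming a DataFrame df with a numeric column named value; both names are just placeholders) would look roughly like this:

import numpy as np

# Pull the column to the driver -- only viable when the data fits in memory.
values = [row.value for row in df.select("value").collect()]
cumulative = np.cumsum(values)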
Not sure if this is what you are looking for, but here are two examples of how to use sqlContext to compute a cumulative sum:
First, when you want to partition it by some categories:
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql import SQLContext

rdd = sc.parallelize([("Tablet", 6500),
                      ("Tablet", 5500),
                      ("Cell Phone", 6000),
                      ("Cell Phone", 6500),
                      ("Cell Phone", 5500)])
schema = StructType([StructField("category", StringType(), False),
                     StructField("revenue", LongType(), False)])
df = sqlContext.createDataFrame(rdd, schema)
df.registerTempTable("test_table")
df2 = sqlContext.sql("""
    SELECT category,
           revenue,
           sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
    FROM test_table
""")
Output:
[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]
Second, when you want to take the cumulative sum of only one variable, change df2 to:
df2 = sqlContext.sql("""
    SELECT category,
           revenue,
           sum(revenue) OVER (ORDER BY revenue, category) as cumsum
    FROM test_table
""")
Output:
[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]
Hope this helps. Using np.cumsum after collecting the data is not efficient, especially if the dataset is large. Another approach you could explore is to use simple RDD transformations such as groupByKey(), then use map to compute the cumulative sum of each group by some key, and finally reduce the results at the end, as sketched below.
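For what it's worth, a minimal sketch of that RDD-based idea (assuming (category, value) pairs and that each group's values fit in an executor's memory; the running total is computed with a plain loop here) could look like this:

rdd = sc.parallelize([("Tablet", 6500), ("Tablet", 5500),
                      ("Cell Phone", 6000), ("Cell Phone", 6500),
                      ("Cell Phone", 5500)])

def group_cumsum(pair):
    # For one key, sort its values and emit (category, value, running total) triples.
    category, values = pair
    running = 0
    out = []
    for v in sorted(values):
        running += v
        out.append((category, v, running))
    return out

cumsums = rdd.groupByKey().flatMap(group_cumsum)

Calling cumsums.collect() would then return (category, revenue, cumsum) triples comparable to the first example above.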
Here is a simple example using a window function with pyspark.sql.functions:
import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf

sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)
data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
                                   ("Cam", "F", "Cambridge", 1, 25),
                                   ("Lin", "F", "Cambridge", 1, 25),
                                   ("Cat", "M", "Boston", 1, 20),
                                   ("Sara", "F", "Cambridge", 1, 15),
                                   ("Jeff", "M", "Cambridge", 1, 25),
                                   ("Bean", "M", "Cambridge", 1, 26),
                                   ("Dave", "M", "Cambridge", 1, 21)],
                                  ["name", "gender", "city", "donation", "age"])
data.show()
which gives the output:
+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+
Define a window:
win_spec = (window.Window
            .partitionBy(['gender', 'city'])
            .rowsBetween(window.Window.unboundedPreceding, 0))
# window.Window.unboundedPreceding -- start from the first row of the group
# .rowsBetween(..., 0) -- 0 refers to the current row; if -2 were given instead,
#   the frame would include at most the 2 rows before the current row
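To illustrate that last point (a hypothetical variant, not part of this answer's example), a window covering the current row plus the two rows before it, ordered by age, could be written as:

# Hypothetical rolling window: the 2 rows before the current row plus the current row.
rolling_spec = (window.Window
                .partitionBy(['gender', 'city'])
                .orderBy('age')
                .rowsBetween(-2, 0))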
Now, here is a gotcha:
temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))
which raises an error:
TypeError                                 Traceback (most recent call last)
<ipython-input-...> in <module>()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241
    242     # string methods

TypeError: Column is not iterable
This is because Python's built-in sum function was used instead of pyspark's. The way to fix this is to use the sum function from pyspark.sql.functions:
temp = data.withColumn('CumSumDonation', sf.sum(data.donation).over(win_spec))
temp.show()
which will give:
+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+