当前位置:  开发笔记 > 编程语言 > 正文

如何使用sqlContext计算累积和

如何解决《如何使用sqlContext计算累积和》经验,为你挑选了2个好方法。

我知道我们可以在pyspark中使用Window函数来计算累积和.但是Window仅在HiveContext中支持,而不在SQLContext中支持.我需要使用SQLContext,因为HiveContext无法在多个进程中运行.

有没有有效的方法来使用SQLContext计算累积和?一种简单的方法是将数据加载到驱动程序的内存中并使用numpy.cumsum,但con是需要能够装入内存的数据



1> Dat Tran..:

不确定这是否是您正在寻找的,但这里有两个示例如何使用sqlContext来计算累积总和:

首先,当您想按某些类别对其进行分区时:

from pyspark.sql.types import StructType, StringType, LongType
from pyspark.sql import SQLContext

rdd = sc.parallelize([
    ("Tablet", 6500), 
    ("Tablet", 5500), 
    ("Cell Phone", 6000), 
    ("Cell Phone", 6500), 
    ("Cell Phone", 5500)
    ])

schema = StructType([
    StructField("category", StringType(), False),
    StructField("revenue", LongType(), False)
    ])

df = sqlContext.createDataFrame(rdd, schema)

df.registerTempTable("test_table")

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (PARTITION BY category ORDER BY revenue) as cumsum
FROM
test_table
""")

输出:

[Row(category='Tablet', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=6500, cumsum=12000),
 Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Cell Phone', revenue=6000, cumsum=11500),
 Row(category='Cell Phone', revenue=6500, cumsum=18000)]

第二,当你只想取一个变量的cumsum时.将df2更改为:

df2 = sqlContext.sql("""
SELECT
    category,
    revenue,
    sum(revenue) OVER (ORDER BY revenue, category) as cumsum
FROM
test_table
""")

输出:

[Row(category='Cell Phone', revenue=5500, cumsum=5500),
 Row(category='Tablet', revenue=5500, cumsum=11000),
 Row(category='Cell Phone', revenue=6000, cumsum=17000),
 Row(category='Cell Phone', revenue=6500, cumsum=23500),
 Row(category='Tablet', revenue=6500, cumsum=30000)]

希望这可以帮助.收集数据后使用np.cumsum效率不高,尤其是在数据集很大的情况下.您可以探索的另一种方法是使用简单的RDD转换,例如groupByKey(),然后使用map通过某个键计算每个组的累积总和,然后在最后减少它.



2> muon..:

这是一个简单的示例:

import pyspark
from pyspark.sql import window
import pyspark.sql.functions as sf


sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)

data = sqlcontext.createDataFrame([("Bob", "M", "Boston", 1, 20),
                                   ("Cam", "F", "Cambridge", 1, 25),
                                  ("Lin", "F", "Cambridge", 1, 25),
                                  ("Cat", "M", "Boston", 1, 20),
                                  ("Sara", "F", "Cambridge", 1, 15),
                                  ("Jeff", "M", "Cambridge", 1, 25),
                                  ("Bean", "M", "Cambridge", 1, 26),
                                  ("Dave", "M", "Cambridge", 1, 21),], 
                                 ["name", 'gender', "city", 'donation', "age"])


data.show()

提供输出

+----+------+---------+--------+---+
|name|gender|     city|donation|age|
+----+------+---------+--------+---+
| Bob|     M|   Boston|       1| 20|
| Cam|     F|Cambridge|       1| 25|
| Lin|     F|Cambridge|       1| 25|
| Cat|     M|   Boston|       1| 20|
|Sara|     F|Cambridge|       1| 15|
|Jeff|     M|Cambridge|       1| 25|
|Bean|     M|Cambridge|       1| 26|
|Dave|     M|Cambridge|       1| 21|
+----+------+---------+--------+---+

定义一个窗口

win_spec = (window.Window
                  .partitionBy(['gender', 'city'])
                  .rowsBetween(window.Window.unboundedPreceding, 0))

#window.Window.unboundedPreceding-组的第一行#.rowsBetween(...,0)- 0引用当前行,如果-2指定则在当前行之前最多2行

现在,这是一个陷阱:

temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

出现错误:

TypeErrorTraceback (most recent call last)
 in ()
----> 1 temp = data.withColumn('cumsum',sum(data.donation).over(win_spec))

/Users/mupadhye/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/column.pyc in __iter__(self)
    238 
    239     def __iter__(self):
--> 240         raise TypeError("Column is not iterable")
    241 
    242     # string methods

TypeError: Column is not iterable

这是由于使用python的sum函数而不是pyspark's。解决此问题的方法是使用sum来自的功能pyspark.sql.functions.sum

temp = data.withColumn('AgeSum',sf.sum(data.donation).over(win_spec))
temp.show()

会给:

+----+------+---------+--------+---+--------------+
|name|gender|     city|donation|age|CumSumDonation|
+----+------+---------+--------+---+--------------+
|Sara|     F|Cambridge|       1| 15|             1|
| Cam|     F|Cambridge|       1| 25|             2|
| Lin|     F|Cambridge|       1| 25|             3|
| Bob|     M|   Boston|       1| 20|             1|
| Cat|     M|   Boston|       1| 20|             2|
|Dave|     M|Cambridge|       1| 21|             1|
|Jeff|     M|Cambridge|       1| 25|             2|
|Bean|     M|Cambridge|       1| 26|             3|
+----+------+---------+--------+---+--------------+


@Mike我已经更新了`win_spec`,感谢您指出
推荐阅读
惬听风吟jyy_802
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有