当前位置:  开发笔记 > 编程语言 > 正文

Spark使用上一行的值向数据框添加新列

如何解决《Spark使用上一行的值向数据框添加新列》经验,为你挑选了1个好方法。

我想知道如何在Spark(Pyspark)中实现以下功能

初始数据帧:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+

结果数据帧:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+

我设法通过以下方式将新列"附加"到数据框中: df.withColumn("new_Col", df.num * 10)

但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.

任何帮助,将不胜感激.



1> zero323..:

您可以lag按如下方式使用窗口功能

from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()

## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## |  2|3.0|    5.0|
## |  3|7.0|    3.0|
## |  4|9.0|    7.0|
## +---+---+-------+

但是有一些重要的问题:

    如果你需要一个全局操作(没有被其他一些列/列分区),那么效率非常低.

    您需要一种自然的方式来订购数据.

虽然第二个问题几乎从来都不是问题,但第一个问题可能是一个交易破坏者.如果是这种情况,您应该简单地将您转换DataFrame为RDD并lag手动计算.参见例如:

如何在Pyspark中使用滑动窗口对时间序列数据进行数据转换

Apache Spark Moving Average(用Scala编写,但可以针对PySpark进行调整.请务必先阅读注释).

其他有用的链接:

https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/05_moving-average-imputation.ipynb

Spark窗口函数 - rangeBetween日期

推荐阅读
可爱的天使keven_464
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有