我想知道如何在Spark(Pyspark)中实现以下功能
初始数据帧:
+--+---+ |id|num| +--+---+ |4 |9.0| +--+---+ |3 |7.0| +--+---+ |2 |3.0| +--+---+ |1 |5.0| +--+---+
结果数据帧:
+--+---+-------+ |id|num|new_Col| +--+---+-------+ |4 |9.0| 7.0 | +--+---+-------+ |3 |7.0| 3.0 | +--+---+-------+ |2 |3.0| 5.0 | +--+---+-------+
我设法通过以下方式将新列"附加"到数据框中:
df.withColumn("new_Col", df.num * 10)
但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.
任何帮助,将不胜感激.
您可以lag
按如下方式使用窗口功能
from pyspark.sql.functions import lag, col from pyspark.sql.window import Window df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"]) w = Window().partitionBy().orderBy(col("id")) df.select("*", lag("num").over(w).alias("new_col")).na.drop().show() ## +---+---+-------+ ## | id|num|new_col| ## +---+---+-------| ## | 2|3.0| 5.0| ## | 3|7.0| 3.0| ## | 4|9.0| 7.0| ## +---+---+-------+
但是有一些重要的问题:
如果你需要一个全局操作(没有被其他一些列/列分区),那么效率非常低.
您需要一种自然的方式来订购数据.
虽然第二个问题几乎从来都不是问题,但第一个问题可能是一个交易破坏者.如果是这种情况,您应该简单地将您转换DataFrame
为RDD并lag
手动计算.参见例如:
如何在Pyspark中使用滑动窗口对时间序列数据进行数据转换
Apache Spark Moving Average(用Scala编写,但可以针对PySpark进行调整.请务必先阅读注释).
其他有用的链接:
https://github.com/UrbanInstitute/pyspark-tutorials/blob/master/05_moving-average-imputation.ipynb
Spark窗口函数 - rangeBetween日期