当前位置:  开发笔记 > 大数据 > 正文

在pyspark.ml中的多个功能上运行的变压器

如何解决《在pyspark.ml中的多个功能上运行的变压器》经验,为你挑选了1个好方法。

我想在中创建自己的功能转换器DataFrame,以便添加一列,例如,这是其他两列之间的差异。我遵循了这个问题,但是那里的变压器只能在一个列上运行。pyspark.ml.Transformer以字符串作为参数inputCol,因此我当然不能指定多个列。

因此,基本上,我想要实现的是一种_transform()类似于该方法的方法:

def _transform(self, dataset):
    out_col = self.getOutputCol()
    in_col = dataset.select([self.getInputCol()])

    # Define transformer logic
    def f(col1, col2):
        return col1 - col2
    t = IntegerType()

    return dataset.withColumn(out_col, udf(f, t)(in_col))

这怎么可能呢?



1> aliva7..:

通过首先创建Vector要操作的一组功能中的一个,然后将转换应用于新生成的矢量功能,我设法解决了该问题。下面是一个示例代码,说明如何制作不同于其他两个功能的新功能:

class MeasurementDifferenceTransformer(Transformer, HasInputCol, HasOutputCol):  

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(MeasurementDifferenceTransformer, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]

        # Define transformer logic
        def f(vector):
            return float(vector[0] - vector[1])
        t = FloatType()

        return dataset.withColumn(out_col, udf(lambda x: f(x), t)(in_col))

要使用它,我们首先实例化一个VectorAssembler以创建向量功能:

pair_assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols_vector")

然后我们实例化转换器:

pair_transformer = MeasurementDifferenceTransformer(inputCol="cols_vector", outputCol="col1_minus_col2")

最后,我们转换数据:

pairfeats = pair_assembler.transform(df)
difffeats = pait_transformer.transform(pairfeats)

推荐阅读
黄晓敏3023
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有