我想在中创建自己的功能转换器DataFrame
,以便添加一列,例如,这是其他两列之间的差异。我遵循了这个问题,但是那里的变压器只能在一个列上运行。pyspark.ml.Transformer
以字符串作为参数inputCol
,因此我当然不能指定多个列。
因此,基本上,我想要实现的是一种_transform()
类似于该方法的方法:
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = dataset.select([self.getInputCol()])
# Define transformer logic
def f(col1, col2):
return col1 - col2
t = IntegerType()
return dataset.withColumn(out_col, udf(f, t)(in_col))
这怎么可能呢?
通过首先创建Vector
要操作的一组功能中的一个,然后将转换应用于新生成的矢量功能,我设法解决了该问题。下面是一个示例代码,说明如何制作不同于其他两个功能的新功能:
class MeasurementDifferenceTransformer(Transformer, HasInputCol, HasOutputCol): @keyword_only def __init__(self, inputCol=None, outputCol=None): super(MeasurementDifferenceTransformer, self).__init__() kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only def setParams(self, inputCol=None, outputCol=None): kwargs = self.setParams._input_kwargs return self._set(**kwargs) def _transform(self, dataset): out_col = self.getOutputCol() in_col = dataset[self.getInputCol()] # Define transformer logic def f(vector): return float(vector[0] - vector[1]) t = FloatType() return dataset.withColumn(out_col, udf(lambda x: f(x), t)(in_col))
要使用它,我们首先实例化一个VectorAssembler
以创建向量功能:
pair_assembler = VectorAssembler(inputCols=["col1", "col2"], outputCol="cols_vector")
然后我们实例化转换器:
pair_transformer = MeasurementDifferenceTransformer(inputCol="cols_vector", outputCol="col1_minus_col2")
最后,我们转换数据:
pairfeats = pair_assembler.transform(df) difffeats = pait_transformer.transform(pairfeats)