I'm trying to understand Spark's normalization algorithm. My small test set contains 5 vectors:
{0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0},
{1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0},
{-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0},
{-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0},
{0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0}
I expected new Normalizer().transform(vectors) to create a JavaRDD in which every vector feature is standardized as (v - mean) / stdev over all values of feature-0, feature-1, and so on. The resulting set is:
[-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,-1.4285714276967932E-5,0.9999999993877552]
[1.357142668768307E-5,2.571428214508371E-7,0.0,3.428570952677828E-4,3.428570952677828E-4,2.057142571606697E-4,0.9999998611976999]
[-1.357142668768307E-5,2.571428214508371E-7,0.0,3.428570952677828E-4,3.428570952677828E-4,2.057142571606697E-4,0.9999998611976999]
[1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,1.4285714276967932E-5,0.9999999993877552]
[0.0,0.0,0.0,0.0,0.0,0.0,1.0]
Note that the original value 70000.0 is turned into different "normalized" values in different vectors. Also, how is a value like 1.357142668768307E-5 computed from the inputs .95, 1, -1, -.95, 0? What's more, if I remove a feature, the results change. I could not find any documentation on this issue.
In fact, my question is: how do I correctly normalize all the vectors in an RDD?
Your expectations are simply wrong. As the official documentation clearly states, Normalizer "scales individual samples to have unit L^p norm", where the default value of p is 2. It operates on each vector independently, not across a feature column. Ignoring numerical precision issues:
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.linalg.Vectors

val rdd = sc.parallelize(Seq(
  Vectors.dense(0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Vectors.dense(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0),
  Vectors.dense(-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, 70000.0),
  Vectors.dense(-0.95, 0.018, 0.0, 24.0, 24.0, 14.4, 70000.0),
  Vectors.dense(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 70000.0)))

val normalizer = new Normalizer()  // p = 2 by default
val transformed = normalizer.transform(rdd)
transformed.map(_.toArray.sum).collect
// Array[Double] = Array(1.0009051182149054, 1.000085713673417,
//   0.9999142851020933, 1.00087797536153, 1.0)
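The puzzling numbers from the question can be reproduced by hand: with the default p = 2, each vector is simply divided by its own Euclidean norm. A quick plain-Scala check (no Spark needed) on the all-ones vector:

```scala
// L2-normalize one sample by hand: divide each component by the vector's
// Euclidean norm, sqrt(1^2 * 6 + 70000^2). This is all Normalizer does.
val v = Array(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 70000.0)
val norm = math.sqrt(v.map(x => x * x).sum)
val normalized = v.map(_ / norm)
// normalized(0) ≈ 1.4285714276967932E-5 and normalized(6) ≈ 0.9999999993877552,
// matching the "normalized" values from the question
```

Because the huge 70000.0 component dominates the norm, every other component is squeezed toward zero, and the same 70000.0 maps to different values in vectors with different norms.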
MLlib doesn't provide the functionality you need, but you can use StandardScaler from ML:
import org.apache.spark.ml.feature.StandardScaler
import sqlContext.implicits._  // required for toDF and $"..."

val df = rdd.map(Tuple1(_)).toDF("features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

val transformedDF = scaler.fit(df).transform(df)
transformedDF.select($"scaledFeatures").show(5, false)
// +--------------------------------------------------------------------------------------------------------------------------+
// |scaledFeatures |
// +--------------------------------------------------------------------------------------------------------------------------+
// |[0.9740388301169303,0.015272022105217588,0.0,1.0938637007095298,1.0938637007095298,1.0910691283447955,0.0] |
// |[1.0253040317020319,1.4038947727833362,1.414213562373095,-0.6532797101459693,-0.6532797101459693,-0.6010982697825494,0.0] |
// |[-1.0253040317020319,-1.4242574689236265,-1.414213562373095,-0.805205224133404,-0.805205224133404,-0.8536605680105113,0.0]|
// |[-0.9740388301169303,0.015272022105217588,0.0,1.0938637007095298,1.0938637007095298,1.0910691283447955,0.0] |
// |[0.0,-0.010181348070145075,0.0,-0.7292424671396867,-0.7292424671396867,-0.7273794188965303,0.0] |
// +--------------------------------------------------------------------------------------------------------------------------+
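This is the column-wise (x - mean) / stdev scaling you were after, with one detail worth knowing: StandardScaler uses the sample standard deviation (division by n - 1). A plain-Scala sketch for the first feature column reproduces the first entry of scaledFeatures above:

```scala
// Column-wise z-scoring by hand for feature 0 across the 5 input vectors,
// using the sample standard deviation (divide by n - 1), as StandardScaler does.
val col = Seq(0.95, 1.0, -1.0, -0.95, 0.0)
val mean = col.sum / col.size
val std = math.sqrt(col.map(x => math.pow(x - mean, 2)).sum / (col.size - 1))
val scaled = col.map(x => (x - mean) / std)
// scaled.head ≈ 0.9740388301169303 — the first entry of scaledFeatures
```

Note also that the constant column 70000.0 comes out as 0.0 everywhere: subtracting its mean leaves nothing, which is exactly what you want from a zero-variance feature.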