当前位置:  开发笔记 > 编程语言 > 正文

如何计算spark sqlContext中的数据类型为double的列的中位数

如何解决《如何计算sparksqlContext中的数据类型为double的列的中位数》经验,为你挑选了1个好方法。

我给了样本表.我想从每个组"源"列的"值"列中获取中值.其中source列的String DataType值列为double DataType

scala> sqlContext.sql("SELECT * from tTab order by source").show

+---------------+-----+                                                         
|         Source|value|
+---------------+-----+
|131.183.222.110|  1.0|
| 131.183.222.85|  1.0|
| 131.183.222.85|  0.0|
| 131.183.222.85|  0.5|
| 131.183.222.85|  1.0|
| 131.183.222.85|  1.0|
|   43.230.146.7|  0.0|
|   43.230.146.7|  1.0|
|   43.230.146.7|  1.0|
|   43.230.146.8|  1.0|
|   43.230.146.8|  1.0| 
+---------------+-----+

scala> tTab.printSchema

root
 |-- Source: string (nullable = true)
 |-- value: double (nullable = true)

预期答案:

+---------------+-----+
|         Source|value|
+---------------+-----+
|131.183.222.110|  1.0|
| 131.183.222.85|  1.0|
|   43.230.146.7|  1.0|
|   43.230.146.8|  1.0|
+---------------+-----+

如果"value"列为Int,则查询下方有效.由于"value"的数据类型为double,因此它给出了错误:

 sqlContext.sql("SELECT source , percentile(value,0.5) OVER (PARTITION BY source) AS Median from tTab ").show

错误:

org.apache.hadoop.hive.ql.exec.NoMatchingMethodException: No matching method for class org.apache.hadoop.hive.ql.udf.UDAFPercentile with (double, double). Possible choices: _FUNC_(bigint, array)  _FUNC_(bigint, double)  
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getMethodInternal(FunctionRegistry.java:1164)
    at org.apache.hadoop.hive.ql.exec.DefaultUDAFEvaluatorResolver.getEvaluatorClass(DefaultUDAFEvaluatorResolver.java:83)
    at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge.getEvaluator(GenericUDAFBridge.java:56)
    at org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:47)
    at org.apache.spark.sql.hive.HiveWindowFunction.evaluator$lzycompute(hiveUDFs.scala:351)
    at org.apache.spark.sql.hive.HiveWindowFunction.evaluator(hiveUDFs.scala:349)
    at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector$lzycompute(hiveUDFs.scala:357)
    at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector(hiveUDFs.scala:356)
    at org.apache.spark.sql.hive.HiveWindowFunction.dataType(hiveUDFs.scala:362)
    at org.apache.spark.sql.catalyst.expressions.WindowExpression.dataType(windowExpressions.scala:313)
    at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:140)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:856)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:852)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:852)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:863)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$addWindow(Analyzer.scala:849)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:957)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:913)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:913)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:745)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916)
    at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
    at org.apache.spark.sql.DataFrame.(DataFrame.scala:132)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:20)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:25)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:27)
    at $iwC$$iwC$$iwC$$iwC$$iwC.(:29)
    at $iwC$$iwC$$iwC$$iwC.(:31)
    at $iwC$$iwC$$iwC.(:33)
    at $iwC$$iwC.(:35)
    at $iwC.(:37)
    at (:39)
    at .(:43)
    at .()
    at .(:7)
    at .()
    at $print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

非常感谢!



1> zero323..:

对于非整数值,您应该使用percentile_approxUDF:

import org.apache.spark.mllib.random.RandomRDDs

val df = RandomRDDs.normalRDD(sc, 1000, 10, 1).map(Tuple1(_)).toDF("x")
df.registerTempTable("df")
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df").show

// +--------------------+
// |                 _c0|
// +--------------------+
// |0.035379710486199915|
// +--------------------+

在一个侧面不应该使用GROUP BYPARTITION BY.后者用于窗口功能,效果与预期不同.

SELECT source, percentile_approx(value, 0.5) FROM df GROUP BY source

另请参见如何使用Spark查找中值


您还可以执行以下操作:`SELECT source,percentile_approx(value,Array(0.25,0.5,0.75)FROM df GROUP BY source` for multiple百分位数.
推荐阅读
有风吹过best
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有