I have the sample table shown below. I want to get the median of the "value" column for each group in the "Source" column, where the "Source" column is of String DataType and the "value" column is of double DataType.
scala> sqlContext.sql("SELECT * from tTab order by source").show
+---------------+-----+
|         Source|value|
+---------------+-----+
|131.183.222.110|  1.0|
| 131.183.222.85|  1.0|
| 131.183.222.85|  0.0|
| 131.183.222.85|  0.5|
| 131.183.222.85|  1.0|
| 131.183.222.85|  1.0|
|   43.230.146.7|  0.0|
|   43.230.146.7|  1.0|
|   43.230.146.7|  1.0|
|   43.230.146.8|  1.0|
|   43.230.146.8|  1.0|
+---------------+-----+

scala> tTab.printSchema
root
 |-- Source: string (nullable = true)
 |-- value: double (nullable = true)
Expected answer:
+---------------+-----+
|         Source|value|
+---------------+-----+
|131.183.222.110|  1.0|
| 131.183.222.85|  1.0|
|   43.230.146.7|  1.0|
|   43.230.146.8|  1.0|
+---------------+-----+
If the "value" column were Int, the query below would work. Since the data type of "value" is double, it gives an error:
sqlContext.sql("SELECT source , percentile(value,0.5) OVER (PARTITION BY source) AS Median from tTab ").show
Error:
org.apache.hadoop.hive.ql.exec.NoMatchingMethodException: No matching method for class org.apache.hadoop.hive.ql.udf.UDAFPercentile with (double, double). Possible choices: _FUNC_(bigint, array<double>)  _FUNC_(bigint, double)
  at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getMethodInternal(FunctionRegistry.java:1164)
  at org.apache.hadoop.hive.ql.exec.DefaultUDAFEvaluatorResolver.getEvaluatorClass(DefaultUDAFEvaluatorResolver.java:83)
  at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge.getEvaluator(GenericUDAFBridge.java:56)
  at org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:47)
  at org.apache.spark.sql.hive.HiveWindowFunction.evaluator$lzycompute(hiveUDFs.scala:351)
  at org.apache.spark.sql.hive.HiveWindowFunction.evaluator(hiveUDFs.scala:349)
  at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector$lzycompute(hiveUDFs.scala:357)
  at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector(hiveUDFs.scala:356)
  at org.apache.spark.sql.hive.HiveWindowFunction.dataType(hiveUDFs.scala:362)
  at org.apache.spark.sql.catalyst.expressions.WindowExpression.dataType(windowExpressions.scala:313)
  at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:140)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:856)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:852)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:852)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:863)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$addWindow(Analyzer.scala:849)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:957)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:913)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:913)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:745)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
  at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916)
  at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916)
  at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
  at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
  at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
  at $iwC$$iwC$$iwC.<init>(<console>:33)
  at $iwC$$iwC.<init>(<console>:35)
  at $iwC.<init>(<console>:37)
  at <init>(<console>:39)
  at .<init>(<console>:43)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
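For what it's worth, casting "value" to BIGINT should make the query run, since percentile only accepts an integral first argument (see the "Possible choices" in the error above), but it truncates the doubles, so it is not what I want:

// untested workaround: the CAST truncates 0.5 to 0, so this is not a true median
sqlContext.sql("SELECT source, percentile(CAST(value AS BIGINT), 0.5) OVER (PARTITION BY source) AS Median from tTab").show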
Many thanks!
For non-integral values you should use the percentile_approx UDF:
import org.apache.spark.mllib.random.RandomRDDs

// Generate 1000 standard-normal samples (10 partitions, seed 1)
// and register them as a temporary table with a single column "x".
val df = RandomRDDs.normalRDD(sc, 1000, 10, 1).map(Tuple1(_)).toDF("x")
df.registerTempTable("df")

sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df").show
// +--------------------+
// | _c0|
// +--------------------+
// |0.035379710486199915|
// +--------------------+
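The same call can also be expressed through the DataFrame API instead of raw SQL. A minimal sketch (my addition, not verified against the setup above): percentile_approx is a Hive UDAF, so this still requires a HiveContext, and callUDF(name, cols*) is only available from Spark 1.5 on.

import org.apache.spark.sql.functions.{callUDF, col, lit}

// Invoke the Hive UDAF by name; lit(0.5) passes the percentile as a constant column.
df.agg(callUDF("percentile_approx", col("x"), lit(0.5)).alias("median")).show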
On a side note, you should use GROUP BY, not PARTITION BY. The latter is used for window functions and has a different effect than you expect:
SELECT source, percentile_approx(value, 0.5) FROM df GROUP BY source
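Applied to the table from the question, the grouped version would look like this (a sketch, not re-run here; given the sample data, every Source should come out with a median of 1.0, matching the expected answer above):

// Grouped median per Source; percentile_approx accepts a double column directly.
sqlContext.sql("SELECT Source, percentile_approx(value, 0.5) AS Median FROM tTab GROUP BY Source").show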
See also: How to find median using Spark.