
How do I flatMap a row's array into multiple rows in Apache Spark using Java?


I have a JSON data file containing an attribute that is a string array of "tags". The Apache Spark DataFrame schema looks like this:

root
 |-- acceptedAnswerId: long (nullable = true)
 |-- answerCount: long (nullable = true)
 |-- body: string (nullable = true)
 |-- score: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- viewCount: long (nullable = true)

I want to split each row into several rows in Java. I found a similar answer using Scala, but cannot translate the solution to Java. Any suggestions?

The "tags" attribute in the JSON looks like this:

"tags":["c#",".net","compression","decompression"]

Answer by ErhWen Kuo (score 6):

To make the solution more concrete, the sample JSON data looks like this:

{"id":4,"score":358,"viewCount":24247,"answerCount":13,"commentCount":1,"favoriteCount":28,"tags":["c#","winforms","type-conversion","opacity"]}

Below is the Java snippet that reads the JSON data into a DataFrame:

JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

String jsonData = "{\"id\":4,\"score\":358,\"viewCount\":24247,\"tags\":[\"c#\",\"winforms\",\"type-conversion\",\"opacity\"]}";

// Wrap the single JSON string in a list and parallelize it into an RDD
List<String> dataSet = Arrays.asList(jsonData);
JavaRDD<String> distData = sc.parallelize(dataSet);

// Infer the schema from the JSON and load it as a DataFrame
DataFrame stackoverflow_Posts = sqlContext.read().json(distData);

stackoverflow_Posts.printSchema(); // print the DataFrame schema (Output#1)
stackoverflow_Posts.show();        // show the DataFrame content (Output#2)

Schema: Output#1 looks like this:

root
 |-- id: long (nullable = true)
 |-- score: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- viewCount: long (nullable = true)

Data: Output#2 looks like this:

+---+-----+--------------------+---------+
| id|score|                tags|viewCount|
+---+-----+--------------------+---------+
|  4|  358|[c#, winforms, ty...|    24247|
+---+-----+--------------------+---------+

Based on zero323's hint, I proceeded as follows:

// Add a "tag" column holding one array element per output row
DataFrame expanded = stackoverflow_Posts.withColumn("tag",
        org.apache.spark.sql.functions.explode(stackoverflow_Posts.col("tags")));

expanded.printSchema(); // print the DataFrame schema again (Output#3)
expanded.show();        // show the DataFrame content (Output#4)

Schema: Output#3 looks like this:

root
 |-- id: long (nullable = true)
 |-- score: long (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- viewCount: long (nullable = true)
 |-- tag: string (nullable = true)

Data after explode: Output#4:

+---+-----+--------------------+---------+---------------+
| id|score|                tags|viewCount|            tag|
+---+-----+--------------------+---------+---------------+
|  4|  358|[c#, winforms, ty...|    24247|             c#|
|  4|  358|[c#, winforms, ty...|    24247|       winforms|
|  4|  358|[c#, winforms, ty...|    24247|type-conversion|
|  4|  358|[c#, winforms, ty...|    24247|        opacity|
+---+-----+--------------------+---------+---------------+

The result looks much like joining two tables with SQL.
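Conceptually, `explode` performs exactly the row fan-out shown above: each element of the array becomes its own row, with the other columns repeated. As a minimal plain-Java sketch of that idea (no Spark required; the `TagExploder` class and `explode` method names are hypothetical, used only for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TagExploder {
    // Fan one (id, tags) record out into one "row" string per tag,
    // mirroring what Spark's explode() does to the DataFrame above.
    public static List<String> explode(long id, List<String> tags) {
        return tags.stream()
                   .map(tag -> id + "," + tag)
                   .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> rows = explode(4L,
                Arrays.asList("c#", "winforms", "type-conversion", "opacity"));
        // One output row per tag, each carrying the original id
        rows.forEach(System.out::println);
    }
}
```

Running this prints four lines, one per tag, each prefixed with the id `4`, which is the same multiplication of rows that `withColumn("tag", explode(col("tags")))` produces in the DataFrame.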



