当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Spark SQL,如何将 DataFrame 转为 json 格式

Spark SQL,如何将 DataFrame 转为 json 格式

2018年12月07日  | 移动技术网IT编程  | 我要评论

今天主要介绍一下如何将 spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。

用过 spark sql 应该知道,spark dataframe 本身有提供一个 api 可以供我们将数据转成一个 jsonarray,我们可以在 spark-shell 里头举个栗子来看一下。

import org.apache.spark.sql.sparksession
val spark = sparksession.builder().master("master").appname("test").config("spark.sql.warehouse.dir", warehouselocation).enablehivesupport().getorcreate();
//提供隐式转换功能,比如将 rdd 转为 dataframe
import spark.implicits._

val df:dataframe = sc.parallelize(array(("abc",2),("efg",4))).todf()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc|  2|
|efg|  4|
+---+---+
*/

//这里使用 dataframe api 转换成 jsonarray
val jsonstr:string = a.tojson.collectaslist.tostring
/*--------------- json string-------------
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
*/

可以发现,我们可以使用 dataframe 提供的 api 直接将 dataframe 转换成 jsonarray 的形式,但这样子却有些冗余。以上面的例子来说,很多时候我要的不是这样的形式。

[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]

而是下面这种形式。

[{"abc":2}, {"efg":4}]

这才是我们通常会使用到的 json 格式。以 dataframe 的 api 转换而成的 json 明显太过冗余。为此,我们需要借助一些 json 处理的包,本着能懒则懒的原则,直接使用 scala 提供的 json 处理包。

import org.apache.spark.sql.{dataframe, sparksession}
import org.apache.spark.sql.sparksession
val spark = sparksession.builder().master("master").appname("test").config("spark.sql.warehouse.dir", warehouselocation).enablehivesupport().getorcreate();
//提供隐式转换功能,比如将 rdd 转为 dataframe
import spark.implicits._

val df:dataframe = sc.parallelize(array(("abc",2),("efg",4))).todf()
df.show()
/*-------------show -----------
+---+---+
| _1| _2|
+---+---+
|abc|  2|
|efg|  4|
+---+---+
*/

//接下来不一样了
val df2array:array[tuple2[string,int]] = df.collect().map{case org.apache.spark.sql.row(x:string,y:int) => (x,y)}

val jsondata:array[jsonobject] = am.map{ i =>
  new jsonobject(map(i._1 -> i._2))
}

val jsonarray:jsonarray = new jsonarray(jsondata.tolist)
/*-----------jsonarray------------
[{"abc" : 2}, {"efg" : 4}]
*/

大概说明一下上述的代码,首先我们要先将 df 变量进行 collect 操作,将它转换成 array ,但是要生成 jsonobject 得是 array[tuple2[t,t]] 的格式,所以我们需要再进一步转换成对应格式。这里的 map 是函数式编程里面的 map 。

然后也是用 map 操作生成 array[jsonobject],最后再转换成 jsonarray 就可以。

将数据转换成 json 的格式通常不能太大,一般用在 spark 跑出数据结果后写入到其他数据库的时候会用到,比如 mysql 。

以上~~


欢迎关注公众号哈尔的数据城堡,里面有数据,代码,以及深度的思考。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网