1. What is Spark SQL?
2. Features of Spark SQL
3. Why learn Spark SQL?
We have already studied Hive, which translates Hive SQL into MapReduce jobs and submits them to the cluster, greatly simplifying the work of writing MapReduce programs. However, the MapReduce computation model is slow. Spark SQL arose to address this: it translates SQL into RDD operations and submits them to the cluster, so execution is much faster.
4. DataFrame
5. Programming with the Spark SQL 1.x API
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
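If the project is built with sbt rather than Maven, the equivalent dependency can be declared as follows (the `sparkVersion` value is an assumption standing in for `${spark.version}`; adjust it to your cluster's Spark release):

```scala
// build.sbt -- sketch; sparkVersion is an assumed placeholder
val sparkVersion = "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion
```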
5.1 Creating a DataFrame with SQLContext (for testing)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object Ops3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.parallelize(List(
      Person("admin1", 14, "man"),
      Person("admin2", 16, "man"),
      Person("admin3", 18, "man")))
    val df1: DataFrame = sqlContext.createDataFrame(rdd1)
    df1.show(1)
  }
}

case class Person(name: String, age: Int, sex: String)
5.2 Using the implicit conversions provided by SQLContext (for testing)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(
  Person("admin1", 14, "man"),
  Person("admin2", 16, "man"),
  Person("admin3", 18, "man")))
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()

5.3 Creating a DataFrame with SQLContext and an explicit schema (common approach)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val lineSplit: Array[String] = line.split(",")
  Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()
6. Using the new 2.x API
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
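The "data cleaning" step above simply splits each comma-separated line into a Row. That parsing logic can be exercised on its own without a cluster; here is a plain Scala sketch (the `parseLine` name and the sample lines are assumptions for illustration):

```scala
// Mirrors the cleaning step: "name,age,sex" -> (name, age.toInt, sex)
def parseLine(line: String): (String, Int, String) = {
  val splits = line.split(",")
  (splits(0), splits(1).toInt, splits(2))
}

val sample = List("admin1,14,man", "admin2,16,man")
val parsed = sample.map(parseLine)
// parsed == List(("admin1", 14, "man"), ("admin2", 16, "man"))
```

Note that `splits(1).toInt` throws on malformed input; real cleaning code would typically guard against short or non-numeric fields.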
7. Ways to work with Spark SQL
7.1 Operating on a DataFrame with SQL statements
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
// SparkSession is the new Spark 2.x entry point, equivalent to SQLContext in Spark 1.x
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
// new Spark 2.x API, equivalent to registerTempTable() in Spark 1.x
df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()
7.2 Operating on a DataFrame with the DSL
DSL (Domain Specific Language)

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
// data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.show()
8. Spark SQL output
8.1 Writing out to JSON files
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.write.json("hdfs://uplooking02:8020/sparktest1")
8.2 Writing out to a relational database (MySQL)
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
// data cleaning
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
val url = "jdbc:mysql://localhost:3306/test"
// the table is created automatically
val tbName = "person1"
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
// SaveMode defaults to ErrorIfExists
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)
That concludes this article. I hope it helps with your learning.