当前位置: 移动技术网 > IT编程>脚本编程>Python > 在python中使用pyspark读写Hive数据操作

在python中使用pyspark读写Hive数据操作

2020年06月23日  | 移动技术网IT编程  | 我要评论

优优影视,败家少奶奶,厦门二手家具市场

1、读hive表数据

pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用sql语句从hive里面查询需要的数据,代码如下:

from pyspark.sql import hivecontext,sparksession
 
_spark_host = "spark://spark-master:7077"
_app_name = "test"
spark_session = sparksession.builder.master(_spark_host).appname(_app_name).getorcreate()
 
hive_context= hivecontext(spark_session )
 
# 生成查询的sql语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
 
# 通过sql语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)

2 、将数据写入hive表

pyspark写hive表有两种方式:

(1)通过sql语句生成表

from pyspark.sql import sparksession, hivecontext
 
_spark_host = "spark://spark-master:7077"
_app_name = "test"
 
spark = sparksession.builder.master(_spark_host).appname(_app_name).getorcreate()
 
data = [
 (1,"3","145"),
 (1,"4","146"),
 (1,"5","25"),
 (1,"6","26"),
 (2,"32","32"),
 (2,"8","134"),
 (2,"8","134"),
 (2,"9","137")
]
df = spark.createdataframe(data, ['id', "test_id", 'camera_id'])
 
# method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registertemptable('test_hive')
sqlcontext.sql("create table default.write_test select * from test_hive")

(2)saveastable的方式

# method two
 
# "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
# mode("append")是在原有表的基础上进行添加数据
df.write.format("hive").mode("overwrite").saveastable('default.write_test')
 

tips:

spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:

spark-submit --conf spark.sql.catalogimplementation=hive test.py

补充知识:pyspark基于shc框架读取hbase数据并转成dataframe

一、首先需要将hbase目录lib下的jar包以及shc的jar包复制到所有节点的spark目录lib下

二、修改spark-defaults.conf 在spark.driver.extraclasspath和spark.executor.extraclasspath把上述jar包所在路径加进去

三、重启集群

四、代码

#/usr/bin/python
#-*- coding:utf-8 –*-
 
from pyspark import sparkcontext
from pyspark.sql import sqlcontext,hivecontext,sparksession
from pyspark.sql.types import row,stringtype,structfield,stringtype,integertype
from pyspark.sql.dataframe import dataframe
 
sc = sparkcontext(appname="pyspark_hbase")
sql_sc = sqlcontext(sc)
 
dep = "org.apache.spark.sql.execution.datasources.hbase"
#定义schema
catalog = """{
       "table":{"namespace":"default", "name":"teacher"},
       "rowkey":"key",
       "columns":{
            "id":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"teacherinfo", "col":"name", "type":"string"},
            "age":{"cf":"teacherinfo", "col":"age", "type":"string"},
            "gender":{"cf":"teacherinfo", "col":"gender","type":"string"},
            "cat":{"cf":"teacherinfo", "col":"cat","type":"string"},
            "tag":{"cf":"teacherinfo", "col":"tag", "type":"string"},
            "level":{"cf":"teacherinfo", "col":"level","type":"string"} }
      }"""
 
df = sql_sc.read.options(catalog = catalog).format(dep).load()
 
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
df.show()
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
sc.stop()

五、解释

数据来源参考请本人之前的文章,在此不做赘述

schema定义参考如图:

六、结果

以上这篇在python中使用pyspark读写hive数据操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网