在python中使用pyspark读写Hive数据操作

2020-10-06 0 252

1、读Hive表数据

pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可以直接使用SQL语句从hive里面查询需要的数据,代码如下:

from pyspark.sql import HiveContext,SparkSession
 
_SPARK_HOST = \"spark://spark-master:7077\"
_APP_NAME = \"test\"
spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
 
hive_context= HiveContext(spark_session )
 
# 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = \"database1\"
hive_table = \"test\"
hive_read = \"select * from {}.{}\".format(hive_database, hive_table)
 
# 通过SQL语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)

2 、将数据写入hive表

pyspark写hive表有两种方式:

(1)通过SQL语句生成表

from pyspark.sql import SparkSession, HiveContext
 
_SPARK_HOST = \"spark://spark-master:7077\"
_APP_NAME = \"test\"
 
spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
 
data = [
 (1,\"3\",\"145\"),
 (1,\"4\",\"146\"),
 (1,\"5\",\"25\"),
 (1,\"6\",\"26\"),
 (2,\"32\",\"32\"),
 (2,\"8\",\"134\"),
 (2,\"8\",\"134\"),
 (2,\"9\",\"137\")
]
df = spark.createDataFrame(data, [\'id\', \"test_id\", \'camera_id\'])
 
# method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registerTempTable(\'test_hive\')
sqlContext.sql(\"create table default.write_test select * from test_hive\")

(2)saveastable的方式

# method two
 
# \"overwrite\"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
# mode(\"append\")是在原有表的基础上进行添加数据
df.write.format(\"hive\").mode(\"overwrite\").saveAsTable(\'default.write_test\')
 

tips:

spark用上面几种方式读写hive时,需要在提交任务时加上相应的配置,不然会报错:

spark-submit –conf spark.sql.catalogImplementation=hive test.py

补充知识:PySpark基于SHC框架读取HBase数据并转成DataFrame

一、首先需要将HBase目录lib下的jar包以及SHC的jar包复制到所有节点的Spark目录lib下

二、修改spark-defaults.conf 在spark.driver.extraClassPath和spark.executor.extraClassPath把上述jar包所在路径加进去

三、重启集群

四、代码

#/usr/bin/python
#-*- coding:utf-8 ?*-
 
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext,SparkSession
from pyspark.sql.types import Row,StringType,StructField,StringType,IntegerType
from pyspark.sql.dataframe import DataFrame
 
sc = SparkContext(appName=\"pyspark_hbase\")
sql_sc = SQLContext(sc)
 
dep = \"org.apache.spark.sql.execution.datasources.hbase\"
#定义schema
catalog = \"\"\"{
       \"table\":{\"namespace\":\"default\", \"name\":\"teacher\"},
       \"rowkey\":\"key\",
       \"columns\":{
            \"id\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},
            \"name\":{\"cf\":\"teacherInfo\", \"col\":\"name\", \"type\":\"string\"},
            \"age\":{\"cf\":\"teacherInfo\", \"col\":\"age\", \"type\":\"string\"},
            \"gender\":{\"cf\":\"teacherInfo\", \"col\":\"gender\",\"type\":\"string\"},
            \"cat\":{\"cf\":\"teacherInfo\", \"col\":\"cat\",\"type\":\"string\"},
            \"tag\":{\"cf\":\"teacherInfo\", \"col\":\"tag\", \"type\":\"string\"},
            \"level\":{\"cf\":\"teacherInfo\", \"col\":\"level\",\"type\":\"string\"} }
      }\"\"\"
 
df = sql_sc.read.options(catalog = catalog).format(dep).load()
 
print (\'***************************************************************\')
print (\'***************************************************************\')
print (\'***************************************************************\')
df.show()
print (\'***************************************************************\')
print (\'***************************************************************\')
print (\'***************************************************************\')
sc.stop()

五、解释

数据来源参考请本人之前的文章,在此不做赘述

schema定义参考如图:

在python中使用pyspark读写Hive数据操作

六、结果

在python中使用pyspark读写Hive数据操作

以上这篇在python中使用pyspark读写Hive数据操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持自学编程网。

遇见资源网 Python 在python中使用pyspark读写Hive数据操作 http://www.ox520.com/25962.html

常见问题

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务