首页 > 数据库技术 > 详细

spark教程(十)-操作数据库

时间:2019-10-18 16:28:38      阅读:58      评论:0      收藏:0      [点我收藏+]

数据库也是 spark 数据源创建 df 的一种方式,因为比较重要,所以单独算一节。

本文以 postgres 为例

 

安装 JDBC

首先需要 安装 postgres 的客户端驱动,即 JDBC 驱动,这是官方下载地址,JDBC,根据数据库版本下载对应的驱动

上传至 spark 目录下的 jars 目录

技术分享图片

 并设置环境变量

export SPARK_CLASSPATH = /usr/lib/spark/jars

 

编程模板

如何操作数据库,不同的版本方法不同,网上的教程五花八门,往往尝试不成功。

其实我们可以看 spark 自带的样例, 路径为 /usr/lib/spark/examples/src/main/python/sql    【编码时,sparkSession 需要声明 spark jars 的驱动路径,代码调用 API JDBC To Other Databases

我从 datasource.py 中找到了基本的读写方法,其他自己可以看看

def jdbc_dataset_example(spark):
    # $example on:jdbc_dataset$
    # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    # Loading data from a JDBC source
    jdbcDF = spark.read         .format("jdbc")         .option("url", "jdbc:postgresql:dbserver")         .option("dbtable", "schema.tablename")         .option("user", "username")         .option("password", "password")         .load()

    jdbcDF2 = spark.read         .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
              properties={"user": "username", "password": "password"})

    # Specifying dataframe column data types on read
    jdbcDF3 = spark.read         .format("jdbc")         .option("url", "jdbc:postgresql:dbserver")         .option("dbtable", "schema.tablename")         .option("user", "username")         .option("password", "password")         .option("customSchema", "id DECIMAL(38, 0), name STRING")         .load()

# Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$

 

实战案例

仅供参考,请确保 spark 能连接上数据库

from pyspark.sql import SparkSession
import os

# 获取 环境变量 SPARK_CLASSPATH, 当然需要你事先设定了 该变量
# 如果没有设定 SPARK_CLASSPATH, 得到 后面的值 /usr/lib/spark/jars/*
sparkClassPath = os.getenv(SPARK_CLASSPATH, /usr/lib/spark/jars/*)

### 创建 sparkSession
# spark.driver.extraClassPath 设定了 jdbc 驱动的路径
spark = SparkSession     .builder     .appName("Python Spark SQL basic example")     .master("local")     .config("spark.driver.extraClassPath", sparkClassPath)     .getOrCreate()

### 连接数据库并读取表
# airDF 已经是个 DataFrame
airDF = spark.read     .format("jdbc")     .option("url", "jdbc:postgresql://172.16.89.80:5432/postgres")     .option("driver", "org.postgresql.Driver")     .option("dbtable", "road_point002")     .option("user", "postgres")     .option("password", "postgres")     .load()

### 打印schema
airDF.printSchema()     # df 的表结构,我们看到的就是 列名即格式等

### 只打印前20条 -- dsl 方式
airDF.select(id, road_number, speed_t).show() # id, road_number, speed_t 列名

### 把 df 转成 table -- sql 方式
def func(x):
    print(x)

airDF.registerTempTable(pg)
spark.sql("select * from pg limit 20").foreach(func)


### 存储为 RDBMS、xml、json等格式
## 存到数据库
airDF.write.jdbc("jdbc:postgresql://172.16.89.80:5432/postgres" ,
                 table = "test",mode="append", properties={"user": "postgres", "password": "postgres"})       # 写入数据库

## 存为 json
airDF.write.format(json).save(jsoin_path)       # 存入分区文件
airDF.coalesce(1).write.format(json).save(filtered.json)  # 存入单个文件,不建议使用

 

spark教程(十)-操作数据库

原文:https://www.cnblogs.com/yanshw/p/11697289.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!