Spark Sql 連接mysql
來自專欄 Spark
我的原創地址:
Spark Sql 連接mysql1、基本概念和用法(摘自spark官方文檔中文版)
Spark SQL 還有一個能夠使用 JDBC 從其他資料庫讀取數據的數據源。當使用 JDBC 訪問其它資料庫時,應該首選 JdbcRDD。這是因為結果是以數據框(DataFrame)返回的,且這樣 Spark SQL操作輕鬆或便於連接其它數據源。因為這種 JDBC 數據源不需要用戶提供 ClassTag,所以它也更適合使用 Java 或 Python 操作。(注意,這與允許其它應用使用 Spark SQL 執行查詢操作的 Spark SQL JDBC 伺服器是不同的)。
使用 JDBC 訪問特定資料庫時,需要在 spark classpath 上添加對應的 JDBC 驅動配置。例如,為了從 Spark Shell 連接 postgres,你需要運行如下命令 :
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
通過調用數據源API,遠程資料庫的表可以被載入為DataFrame 或Spark SQL臨時表。支持的參數有 :
屬性名 | 含義 - | :-: url | 要連接的 JDBC URL。 dbtable | 要讀取的 JDBC 表。 注意,一個 SQL 查詢的 From 分語句中的任何有效表都能被使用。例如,既可以是完整表名,也可以是括弧括起來的子查詢語句。 driver | 用於連接 URL 的 JDBC 驅動的類名。 partitionColumn, lowerBound, upperBound, numPartitions | 這幾個選項,若有一個被配置,則必須全部配置。它們描述了當從多個 worker 中並行的讀取表時,如何對它分區。partitionColumn 必須是所查詢表的一個數值欄位。注意,lowerBound 和 upperBound 都只是用於決定分區跨度的,而不是過濾表中的行。因此,表中的所有行將被分區並返回。 fetchSize | JDBC fetch size,決定每次讀取多少行數據。 默認將它設為較小值(如,Oracle上設為 10)有助於 JDBC 驅動上的性能優化。
- 其實該部分翻譯自Spark官方文檔,所以對於翻譯有疑問的可直接看官方文檔
2、scala代碼實現連接mysql
2.1 添加mysql 依賴
在sbt 配置文件里添加:
"mysql" % "mysql-connector-java" % "6.0.6"
然後執行:
sbt eclipse
2.2 建表並初始化數據
DROP TABLE IF EXISTS `USER_T`; CREATE TABLE `USER_T` ( `ID` INT(11) NOT NULL, `USER_NAME` VARCHAR(40) NOT NULL, PRIMARY KEY (`ID`) ) ENGINE=INNODB DEFAULT CHARSET=UTF8;INSERT INTO `USER_T`(`ID`,`USER_NAME`) VALUES (1,測試1);INSERT INTO `USER_T`(`ID`,`USER_NAME`) VALUES (2,測試2);
2.3 代碼
2.3.1 查詢
package com.dkl.leanring.spark.sqlimport org.apache.spark.sql.SparkSession/** * spark查詢mysql測試 */object MysqlQueryDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("MysqlQueryDemo").master("local").getOrCreate() val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "USER_T") .option("user", "root") .option("password", "Root-123456") .load() jdbcDF.show() }}
2.3.2 插入數據
新建USER_T.csv,造幾條數據如圖: (需將csv的編碼格式轉為utf-8,否則spark讀取中文亂碼,轉碼方法見:https://jingyan.baidu.com/article/fea4511a092e53f7bb912528.html)
package com.dkl.leanring.spark.sqlimport org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.SaveModeimport java.util.Properties/** * 從USER_T.csv讀取數據並插入的mysql表中 */object MysqlInsertDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("MysqlInsertDemo").master("local").getOrCreate() val df = spark.read.option("header", "true").csv("src/main/resources/scala/USER_T.csv") df.show() val url = "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8" val prop = new Properties() prop.put("user", "root") prop.put("password", "Root-123456") df.write.mode(SaveMode.Append).jdbc(url, "USER_T", prop) }}
再查詢一次,就會發現表裡多了幾條數據
3、注意(更新)
上面的代碼在本地eclipse運行是沒有問題的,如果放在伺服器上用spark-submit提交的話,可能會報異常
java.sql.SQLException:No suitable driver
解決方法是在代碼里添加 mysql:
.option("driver", "com.mysql.jdbc.Driver")
oracle:
.option("driver", "oracle.jdbc.driver.OracleDriver")
具體可參考我的另一篇博客:spark-submit報錯:Exception in thread "main" java.sql.SQLException:No suitable driver
4、其他讀取mysql的方法(更新於2018.08.22)
發現還有Spark還有其他讀取mysql的方法,其實只是上面講的快捷方式(2.3.1),只附上用法,具體看以看api或參考其他博客,如spark:scala讀取mysql的4種方法 (我就是在這篇博客看到的,該博客是Spark1.x的寫法,若用Spark2.x將sqlContext改為本文的spark即可)
spark.read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties)spark.read.jdbc(url, table, predicates, connectionProperties)spark.read.jdbc(url, table, properties)
- 只要在2.3.1的代碼里用.option(key,value)即可
5、關於讀取mysql的分區設置(更新於2018.08.22)
按照2.3.1的代碼讀取的DataFrame的分區數為1,若想改變分區數,一種方法是重分區
df.repartition(numPartitions)
另一種是在讀的時候設置分區數,在第一部分可以看到,通過numPartitions可以設置分區數,但是注意partitionColumn, lowerBound, upperBound, numPartitions需要同時設置 如:
val jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://192.168.44.128:3306/hive?useUnicode=true&characterEncoding=utf-8") .option("dbtable", "USER_T") .option("user", "root") .option("password", "Root-123456") .option("numPartitions", "160") .option("partitionColumn", "ID") .option("lowerBound", "1") .option("upperBound", "1000") .load()
推薦閱讀:
※第七章:完全分散式部署Hadoop
※spark ML ,1概述:評估器,轉換器和管道
※spark集群各角色簡介
※python操作hive實戰
※Spark Streaming入門