在Spark 2.x中使用Phoenix 4.1x
截至撰文時我們使用最新版本的Spark2.3與Phoenix1.13-HBase1.3實現對HBase的查詢。
Phoenix是個很不錯的工具,大大簡化了對HBase的操作,但使用中卻並不順利。此文簡單很個記錄,以備回顧及提醒有遇同樣問題的朋友。
先說需求:實現在一個流計算中根據不同的數據write到HBase的不同表中或是對同一表做Insert全部欄位或Update部分欄位,這一過程是無法用 phoenix-spark(http://phoenix.apache.org/phoenix_spark.html) 實現的,只能用通用的JDBC操作處理。
碩大大的驅動包
phoenix-4.13.1-HBase-1.3-client.jar 不包含其依賴就有近100MB大小,這是從舊版本一直延續到現在的通用連接方式。
此方式沒有遇到太多問題,提供下批處理的代碼:
def executeBatchUpdate(sql: String, records: Seq[Seq[Any]],batchSize: Long = 1000, conn: Connection = getConnection, autoClose: Boolean = true): Long = { var stmt: PreparedStatement = null try { conn.setAutoCommit(false) stmt = conn.prepareStatement(sql) val counter = records.zipWithIndex.map(record => { record._1.zipWithIndex.foreach(col => { packageParameters(stmt, col._2 + 1, col._1) }) stmt.execute() if (record._2 % batchSize == 0) { log.trace(s"commit>>${Thread.currentThread().getId} - ${record._2}") conn.commit() } 1 }).size log.trace(s"commit end>>${Thread.currentThread().getId} - $counter") conn.commit() counter } catch { case e: Throwable => log.error("ExecuteBatchUpdate error", e) throw e } finally { if (stmt != null && !stmt.isClosed) { stmt.close() } if (autoClose && !conn.isClosed) { conn.close() } }}
但這太不友好了,Phoenix新版提供了Query Server,可以使用輕量JDBC Client,於是我們準備切換過去,惡夢開始:
Avatica版本衝突
換到phoenix-queryserver-client-4.13.1-HBase-1.3.jar時,本地正常,Standalone 和 Yarn 中都出現了如下異常:
java.lang.RuntimeException: response code 500 at org.apache.calcite.avatica.remote.RemoteService.apply(RemoteService.java:45) at org.apache.calcite.avatica.remote.JsonService.apply(JsonService.java:247) at org.apache.calcite.avatica.remote.RemoteMeta.connectionSync(RemoteMeta.java:109) at org.apache.calcite.avatica.remote.RemoteMeta.createStatement(RemoteMeta.java:77)
經查是Spark自帶的calcite-avatica-1.2.0-incubating.jar與Phoenix的1.10.0版本衝突,前者不支持PROTOBUF。
看到這個很容易想到兩個參數 spark.driver.userClassPathFirst、spark.executor.userClassPathFirst 加上後這個問題沒了,但出現了其它莫名的問題,官方說是實驗性的,還是放棄吧,當然 spark.executor.extraClassPath、spark.executor.extraClassPath 這兩個參數也要試下,奇怪的是開發環境OK,測試、生產環境無效,三個環境的各組件版本相同,配置也近乎相同,沒有找到原因。
後來還是在官網Issue列表中找到了解決方法:java.lang.RuntimeException: response code 500 - Executing a spark job to connect to phoenix query server and load data ,看來官方也發現了這個問題,在計劃中的4.14及5.0中已解決了。但等等,細看 phoenix-queryserver-client-x.jar 實際上沒有解決,人家只是解決了phoenix-X-thin-client.jar,這又是什麼?文檔沒有提過呀。好吧,其實這個可以當 phoenix-queryserver-client-X.jar用,可以理解是後者的fatjar,並且最關鍵的是它通過 Maven maven-shade-plugin 插件實現了對可能衝突包的路徑改寫。
用了這個後問題解決了,但它依賴了Hadoop 在業務模塊中已經有了,最好排除掉。做法是clone phoenix 工程,phoenix-queryserver-client.pom中不需要的依賴改成provided並刪除對應的shdde,重新打包。我實際打出的phoenix-4.13-thin-client才6MB。
用這個包後的批處理的代碼要改成JDBC標準的:
def executeBatchUpdate(sql: String, records: Seq[Seq[Any]], batchSize: Long = 1000, conn: Connection = getConnection, autoClose: Boolean = true): Long = { var stmt: PreparedStatement = null try { conn.setAutoCommit(false) stmt = conn.prepareStatement(sql) val counter = records.zipWithIndex.map(record => { record._1.zipWithIndex.foreach(col => { packageParameters(stmt, col._2 + 1, col._1) }) stmt.addBatch() if (record._2 % batchSize == 0) { log.trace(s"commit>>${Thread.currentThread().getId} - ${record._2}") stmt.executeBatch() conn.commit() } 1 }).size log.trace(s"commit end>>${Thread.currentThread().getId} - $counter") stmt.executeBatch() conn.commit() counter } catch { case e: Throwable => log.error("ExecuteBatchUpdate error", e) throw e } finally { if (stmt != null && !stmt.isClosed) { stmt.close() } if (autoClose && !conn.isClosed) { conn.close() } }}
JDBC連接池
官方解釋過不推薦用(F.A.Q. | Apache Phoenix),但那是針對 phoenix-X-client 的,使用 thin-client 後我覺得還是可以有的,但目前沒看到有適配的連接池實現。
數據丟失
在使用 IMMUTABLE_ROWS=true 時如果針對帶索引的(無論是同步還是非同步索引、本地還是全局索引)的欄位進行更新(e.g. UPSERT INTO someTable (id, someCol) VALUES ( ? , ? ),someTable表有某些二級索引欄位)那麼在一同事務中正常,不同事務間會覆蓋之前的插入或更新操作,即除最後一次更新的欄位外其它欄位都置成null值。
雖然不可變索引性能高,但在這一問題導致我們只能選擇可變索引。
Query Server多實例
public static String getConnectionUrl(String protocol, String hostname, int port, String serialization) { String urlFmt = "jdbc:phoenix:thin:url=%s://%s:%s;serialization=%s"; return String.format(urlFmt, protocol, hostname, port, serialization);}
這是Query Server的連接配置,URL不支持多實例,需要通過Nginx等代理做負載。
推薦閱讀:
※Spark里的DAG是怎麼回事?
※hadoop 和spark如何系統的學習?
※[水] ECOOP真水+某項目真嘆氣
※內存有限的情況下 Spark 如何處理 T 級別的數據?
※矽谷之路38:深入淺出Spark(三)什麼是Standalone