Spark作為ETL工具與SequoiaDB的結合應用
來自專欄 SequoiaDB——NoSQL技術分享
一、前言
ETL一詞較常用於數據倉庫,但其對象並不僅限於數據倉庫。ETL是指將數據從源系統中經過抽取(Extract)、轉換(Transform)、載入(Load)到目標數據存儲區的過程。常見的ETL工具有Oracle Data Integrator、Informatica PowerCenter、DataStage、Kettle、DataSprider等。
在大數據應用中,海量的數據及對潛在應用的支持是非常重要的方面,並體現出與傳統應用開發的巨大不同。因此,在選擇合適的ETL工具時,除了需要考慮數據處理的正確性、完整性、工具易用性、對不同數據格式的支持程度之外,還必須考慮數據處理的效率、處理能力的可擴展、容錯性。
Spark是UC Berkeley AMP lab開源的類Hadoop MapReduce的通用的並行計算框架,是一個新興的大數據處理引擎,主要特點是提供了一個集群的分散式內存抽象。與Hadoop相比,Spark將中間數據放在內存中,避免頻繁寫盤,因此效率更高,更適合於迭代計算;在操作類型、開發語言支持上更豐富;在分散式數據集計算時通過checkpoint來實現容錯。而且,由於Spark的分散式特性,處理能力的擴展更容易,也更經濟。因此,從整體上,Spark作為ETL工具能幫助企業實現技術和財務的雙贏。
SequoiaDB是新一代NewSQL資料庫,是文檔型分散式數據的典型代表。SequoiaDB企業版通過深度集成最新的Spark內存計算框架,實現了批處理分析、流處理等貼近應用的功能。存儲層和計算層兩層分離的架構,技術互補,是矽谷大數據新架構的主流,將分散式計算與分散式存儲的能力分別發揮到了極致。在Spark最新版本中,SparkSQL對標準SQL的支持也越來越完善,更加體現出Spark產品的成熟。因此,在SequoiaDB應用中,利用Spark進行數據加工分析是理想之選。
二、功能概述
作為ETL工具,必須具備多樣數據源的支持,比如HDFS、HBase、Amazon S3、MongoDB等。在這一點上,Spark支持跟多種數據源的對接,常見的數據源包括HDFS、Cassandra、HBase、Hive、ALLUXIO(即Tachyon)、Amazon S3;Spark也能從全文檢索工具Elasticsearch中讀寫數據。Spark作為ETL工具能滿足工具功能通用性的要求。
以Spark為ETL處理的數據流圖如圖一所示:
圖一 Spark為ETL數據流圖
在以上數據流圖中,可以將存儲於HDFS、Cassandra等系統中的存量數據通過Spark提供的介面抽到Spark中,利用Spark的快速處理能力進行處理,比如數據去重、更新,最後將結構數據存儲到巨杉資料庫中。整個處理過程中,不需要將數據以數據文件的形式存檔,加快了處理速度。
對於已存儲到巨杉資料庫中的數據,也可以在Spark中處理,並將處理後的數據落到庫中。
三、環境搭建
3.1 Spark環境搭建
Spark運行模式包括Standalone、Spark on YARN、Spark on Mesos。三種模式的主要區別在於使用的資源管理調度工具不一樣。這裡以Standalone模式為例進行說明。
在部署之前,將需要部署Spark的機器兩兩之間的信任關係配置好,並根據Spark版本對JDK版本的需求安裝配置好JDK。然後就可以開始安裝Spark。
首先,從Spark官網獲取最新版本的Spark安裝文件。下載完成後將其解壓到目標文件夾。
tar -zxvf spark-2.0.0-bin-hadoop2.6.tgz
從解壓出來的文件目錄可以看到,跟1.6版本相比,2.0版本的目錄結構有一些細微變化,lib目錄被刪除,增加了jars目錄。
然後,修改配置文件。通常需要修改的配置文件包含spark-env.sh、slaves,但為了後續使用方便,還需要修改或增加hive-site.xml、spark-defaults.conf、log4j.properties。下面分別進行說明。
1. spark-env.sh
配置Spark環境變數,包括:
SPARK_MASTER_IP:Spark集群Master節點IP地址;
SPARK_MASTER_PORT:Master節點埠號,默認為7077;
SPARK_WORKER_INSTANCES:每節點啟動的Worker進程數量;
SPARK_WORKER_CORES:本機上Worker可用核數;
SPARK_WORKER_MEMORY:Worker可分配給executor使用的總內存;
SPARK_WORKER_DIR:Worker工作目錄;
SPARK_LOCAL_DIRS:節點shuffle數據存放目錄;
SPARK_CLASSPATH:Spark默認classpath。
2.slaves
配置Spark集群中運行Worker的節點,值為主機名,每一行一個主機名。
3. hive-site.xml
主要用於元資料庫的配置。Spark默認使用Derby作為資料庫管理元數據,當我們需要配置其他資料庫作為元資料庫時,需要增加並修改此配置文件。一個例子如下所示:
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:postgresql://192.168.111.129:5432/metastore</value> <description>JDBC connect string for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>org.postgresql.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>hiveuser</value> <description>Username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>mypassword</value> <description>password to use against metastore database</description> </property> <property> <name>datanucleus.autoCreateSchema</name> <value>false</value> </property></configuration>
4. spark-defaults.conf
Spark默認配置。該配置可以配置spark.master、spark.driver.memory、spark.executor.extraJavaOptions等。當我們需要通過JDBC使用SparkSQL時,需要首先啟動Thriftserver,啟動時需要指定MASTER_URL,這個MASTER_URL可以配置到spark-defaults.conf中的spark.master參數中,省去在命令行啟動時都需要輸入MASTER_URL的麻煩。
5. log4j.properties
配置Spark log日誌。
最後,啟動Spark集群。配置文件修改好後,就可以啟動Spark。由於已經配置好Master及Worker的信息,可以通過如下命令啟動Spark集群:
sbin/start-all.sh
3.2 配置Spark與SequoiaDB的連接
SequoiaDB開源了Spark連接器,可以在github網站上找到相應的代碼(SequoiaDB/spark-sequoiadb),打包後得到連接器命名為spark-sequoiadb-2.0.0.jar。將連接器和SequoiaDB Java驅動包sequoiadb.jar一起,拷貝至jars目錄,並在spark-env.sh中配置SPARK_CLASSPATH,將連接器及驅動包全路徑配置到SPARK_CLASSPATH環境變數中,如
SPARK_CLASSPATH=/opt/spark-2.0.0-bin-hadoop2.6/jars/spark-sequoiadb-2.0.0.jar:/opt/spark-2.0.0-bin-hadoop2.6/jars/sequoiadb.jar
配置完成後,通過如下命令啟動Thriftserver:
sbin/start-thriftserver.sh
啟動成功後,通過jps命令可以看到Thriftserver相關的進程:
圖二 Thriftserver相關進程
至此,Spark與SequoiaDB的環境已經搭建完成,可以開始用Spark處理數據了。
四、SequoiaDB數據處理
4.1 處理流程
SequoiaDB與Spark環境搭建完成之後,可以根據數據源的不同,採取不同的方式在Spark中創建針對不同數據源的映射,就可以將數據源與SequoiaDB通過Spark連接起來,完成ETL處理。
本節以SparkSQL對SequoiaDB中數據進行ETL為例,說明Spark與SequoiaDB的結合應用。源數據為其他時可以使用類似的處理邏輯。
在Spark中創建到SequoiaDB中集合的映射表語法為:
create table tablename (f1 string,f2 string,f3 int,f4 double,f5 string,f6 long) using com.sequoiadb.spark OPTIONS ( host sdbserver1:11810,sdbserver2:11810,sdbserver3:11810, collectionspace foo, collection bar);
其中,host為SequoiaDB的訪問地址,格式為hostname:svcname,可以包含多個地址。collectionspace及collection分別代表SequoiaDB中的集合空間及集合。
本例為利用每天增量數據對已有存量數據進行更新的場景,涉及的表為:賬戶信息表acct_info為結果表、賬戶信息中轉表repo_acct_info為每天增量數據、acct_info_his為已有存量數據。由於SparkSQL不支持UPDATE及DELETE操作,因此,涉及到UPDATE及DELETE的場景可以通過將結果數據存於新表的方式來完成。UPDATE分為兩步:
第一步:將中轉表中最新數據插入結果表。通過這一步,保證第一次進來的數據和存在更新的數據進到結果表。執行語句為:
insert into table dst.acct_infoselect * from src.repo_acct_infowhere tx_date = 2016-06-23 ;"
第二步:將未做任何更新的數據數據插入結果表。執行語句為:
insert into table dst.acct_infoselect distinct a.* from src.acct_info_his a left join src.repo_acct_info bon a.id = b.id and b.tx_date = 2016-06-23where b.id is null ;"
其中id為acct_info表的主鍵,通過id唯一標識一條記錄。通過以上兩個步驟,結果表acct_info即為經過去重後的更新數據。
而DELETE操作則只需要將不滿足刪除條件的數據插入新表即可。
4.2 性能結果
1. 系統配置
硬體環境:
軟體環境:
2. UPDATE場景
3. DELETE場景
五、結論
Spark能夠方便地讀取多樣數據源,作為一種較為成熟的新框架,Spark不僅支持HDFS、Cassandra、HIVE、Amazon S3這類相對較新的數據源,對傳統行業常見的如Oracle、DB2、Teradata,Spark也能很好地支持,且Spark支持SQL2003,應用Spark也能充分發揮傳統企業在SQL處理上的強項。在大數據應用中,以Spark為ETL工具可以充分發揮分散式計算框架Spark的處理能力的性能優勢。
作為全球獲得Databricks認證的14家發行商之一,SequoiaDB企業版深度集成最新的Spark內存計算框架,存儲層和計算層兩層分離的架構、技術互補,是矽谷大數據新架構的主流,將分散式計算與分散式存儲的能力分別發揮到了極致。如今,Spark技術已經被大量運用到實時流處理、分析等不同領域,後台數據加工也可以利用Spark技術得以實現。
SequoiaDB巨杉資料庫2.6最新版下載
SequoiaDB巨杉資料庫技術博客SequoiaDB巨杉資料庫社區推薦閱讀:
※SequoiaDB版本在線升級介紹說明
※NewSQL系統綜述——NewSQL到底New在哪裡?
※TiDB會不會出現「加拉帕戈斯」?
※TiDB RC1 Release