SequoiaDB Spark Yarn部署及案例演示

1、 背景

由於MRv1在擴展性、可靠性、資源利用率和多框架等方面存在明顯的不足,在Hadoop MRv2中引入了資源管理和調度系統YARN。YARN是 Hadoop MRv2計算機框架中構建的一個獨立的、通用的資源管理系統,可為上層應用提供統一的資源管理和調度,它的引入為集群在利用率、資源統一管理和數據共享等方面帶來了巨大好處。主要體現在以下幾個方面:

(1)資源利用率大大提高。一種計算框架一個集群,往往會由於應用程序數量和資源需求的不均衡性,使得在某段時間有些計算框架集群資源緊張,而另外一些集群資源空閑。共享集群模式則通過多種框架共享資源,使得集群中的資源得到更加充分的利用;

(2)運維成本大大降低。共享集群模式使得少數管理員就可以完成多個框架的統一管理;

(3)共享集群的模式也讓多種框架共享數據和硬體資源更為方便。

2、 產品介紹

巨杉資料庫SequoiaDB是一款分散式非關係型文檔資料庫,可以被用來存取海量非關係型的數據,其底層主要基於分散式,高可用,高性能與動態數據類型設計,它兼顧了關係型資料庫中眾多的優秀設計:如索引、動態查詢和更新等,同時以文檔記錄為基礎更好地處理了動態靈活的數據類型。並且為了用戶能夠使用常見的分散式計算框架,SequoiaDB可以和常見分散式計算框架如Spark、Hadoop、HBase進行整合。本文主要講解SequoiaDB與Spark、YARN的整合以及通過一個案例來演示MapReduce分析存儲在SequoiaDB中的業務數據。

3、 環境搭建

3.1、 伺服器分布

3.2、 軟體配置

操作系統:RedHat6.5

JDK版本:1.7.0_80 64位

Scala版本:

Hadoop版本:2.7.2

Spark版本:2.0

SequoiaDB版本:2.0

3.3、 安裝步驟

1、JDK安裝

tar -xvf jdk-7u45-linux-x64.tar.gz –C /usr/localcd /usr/local ln -s jdk1.7.0_45 jdk

配置環境變數

vim ~/.bash_profile export JAVA_HOME=/usr/local/jdk export CLASS_PATH=$JAVA_HOME/lib:$JAVA_HOME/jre/lib export PATH=$PATH:$JAVA_HOME/binsource /etc/profile

2、Scala安裝

tar -xvf scala-2.11.8.tgz –C /usr/localcd /usr/local ln -s scala-2.11.8 scala

配置環境變數

vim ~/.bash_profile export SCALA_HOME=/usr/local/scala export PATH=$PATH:$SCALA_HOME/bin

3、修改主機hosts文件配置

在每台主機上修改host文件

vim /etc/hosts192.168.1.46 node01192.168.1.47 node02192.168.1.48 master

4、 SSH免密鑰登錄

在master節點中執行ssh-keygen按回車鍵

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

將master節點中的授權文件authorized_keys傳輸到slave節點中

scp ~/.ssh/id_rsa.pub root@master:~/.ssh/

在slave節點中執行

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

在slave節點中驗證SSH免密鑰登錄

ssh master

5、Hadoop集群安裝

拷貝hadoop文件hadoop-2.7.2.tar.gz到/opt目錄中

解壓hadoop安裝包

tar –xvf hadoop-2.7.2.tar.gzmv hadoop-2.7.2 /opt/cloud/hadoop

創建hadoop數據存儲及臨時目錄

mkdir –p /opt/hadoop/datamkdir –p /opt/hadoop/tmp

配置Hadoop jdk環境變數

vim hadoop-env.shexport JAVA_HOME=/usr/local/jdk

編輯core.xml文件

<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://master:9000</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/opt/data/tmp</value> </property> <property> <name>io.file.buffer.size</name> <value>4096</value> </property></configuration>

編輯mapred-site.xml

<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapreduce.jobtracker.http.address</name> <value> master:50030</value> </property> <property> <name>mapreduce.jobhistory.address</name> <value> master:10020</value> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>master:19888</value> </property></configuration>

編輯hdfs-site.xml

<configuration> <property> <name>dfs.nameservices</name> <value>master</value> </property> <property> <name>dfs.namenode.secondary.http-address</name> <value> master:50090</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:///opt/hadoop/data/name</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:///opt/hadoop/data</value> </property> <property> <name>dfs.replication</name> <value>3</value> </property> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property></configuration>

編輯yarn-site.xml

<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.resourcemanager.address</name> <value> master:8032</value> </property> <property> <name>yarn.resourcemanager.scheduler.address</name> <value> master:8030</value> </property> <property> <name>yarn.resourcemanager.resource-tracker.address</name> <value> master:8031</value> </property> <property> <name>yarn.resourcemanager.admin.address</name> <value> master:8033</value> </property> <property> <name>yarn.resourcemanager.webapp.address</name> <value>master:8088</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>12288</value> </property> <property> <name>yarn.nodemanager.log-dirs</name> <value>/opt/hadoop/tmp/userlogs</value> </property></configuration>

啟動Hadoop

首次啟動集群時,做如下操作

進入到/opt/cloud/hadoop/bin目錄中執行./hdfs namenode –format格式化

hdfs文件系統

進入到/opt/cloud/hadoop/sbin目錄中執行./start-all.sh啟動hadoop集群

6、安裝Spark集群

拷貝Spark安裝包到/opt目錄中,解壓

tar –xvf spark-2.0.0-bin-hadoop2.7.tgzmv spark-2.0.0-bin-hadoop2.7 /opt/cloud/spark

編輯spark-env.sh

vim spark-env.shJAVA_HOME="/usr/jdk1.7"SPARK_DRIVER_MEMORY="1g"SPARK_EXECUTOR_CORES=1SPARK_EXECUTOR_MEMORY="512m"SPARK_MASTER_PORT="7077"SPARK_MASTER_WEBUI_PORT="8070"SPARK_CLASSPATH="/opt/cloud/spark/jars/sequoiadb.jar:/opt/cloud/spark/jars/spark-sequoiadb_2.11-2.6.0.jar"SPARK_MASTER_IP="node03"SPARK_WORKER_MEMORY="712m"SPARK_WORKER_CORES=1SPARK_WORKER_INSTANCES=1SPARK_WORKER_DIR="/opt/data/spark/work"SPARK_LOCAL_DIRS="/opt/data/spark/tmp"HADOOP_HOME="/opt/cloud/hadoop"HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

編輯 slaves

node02node03

啟動spark集群

進入到目錄/opt/cloud/spark/sbin目錄中

./start-all.sh

Spark成功啟動後截圖如下:

7、Spark Yarn連接SequoiaDB

在SequoiaDB中創建集合空間、集合

db.createCS(poc);db.poc.createCL(test);

進入到spark安裝目錄bin中,執行./spark-sql –master yarn啟動spark sql交互界面

創建表,映射到上述poc集合空間中test集合

CREATE TABLE `test` (`id` INT, `name` STRING)USING com.sequoiadb.sparkOPTIONS ( `collection` test, `host` node02:11810,node03:11810, `serialization.format` 1, `collectionspace` poc);

查詢表test數據,執行:

Select * from test;

進入到yarn管理頁面查看spark任務

5、 案例演示

為了配合司法部門的執法和銀行內部的風險監管,部分商業銀行對於存取款業務定製了相關預警方案,本案例以個人存取款業務高頻交易來講述MapReduce如何分析SequoiaDB中的個人交易明細數據。

具體場景為:分析同一實體櫃員辦理,1小時內同一賬戶連續3筆以上支取類金額的交易賬戶及明細。

本演示案例採用Hadoop Map Reduce實現,開發語言為Java語言。整個測試程序分為兩個部分Map演算法和Reduce演算法。演示程序中Map演算法負責將同一個賬號的所有對應交易明細歸併在一起並輸出給Reduce端,Reduce端根據Map演算法的結果運算具體的業務場景,最後將運算結果寫入到SequoiaDB中。

具體架構如下:

Reduce端具體演算法流程如下:

Map端演算法代碼如下:

static class TMapper extends Mapper<Object, BSONWritable,Text,BSONWritable>{ @Override protected void map(Object key, BSONWritable value, Context context) throws IOException, InterruptedException { BSONObject obj = value.getBson(); String acct_no=(String) obj.get("ACCT_NO"); context.write(new Text(acct_no), value); }}

Reduce端演算法代碼如下:

static class TReducer extends Reducer<Text,BSONWritable,NullWritable,NullWritable>{ private static String pattern = "yyyy-MM-dd HH:mm:ss"; private DateFormat df = new SimpleDateFormat(pattern); private static int tradeNum1 = 3; private static int tradeTime1 = 3600; private static int tradeNum2 = 2; private static int tradeTime2 = 1800; private static int tradeAll = 100000; private Sequoiadb sdb = null; private CollectionSpace cs = null; private DBCollection cl_1 = null; private DBCollection cl_2 = null; private static String CS_NAME=""; private static String CL_NAME_1=""; private static String CL_NAME_2=""; public TReducer(){ if (null == sdb) { sdb = ConnectionPool.getInstance().getConnection(); } if (sdb.isCollectionSpaceExist(CS_NAME)) { cs = sdb.getCollectionSpace(CS_NAME); } else { throw new BaseException("集合空間" + CS_NAME + "不存在!"); } if (null == cs) { throw new BaseException("集合空間不能為null!"); } else { this.cl_1 = cs.getCollection(CL_NAME_1); } if (null == cs) { throw new BaseException("集合空間不能為null!"); } else { this.cl_2 = cs.getCollection(CL_NAME_2); } } @Override protected void reduce(Text key, Iterable<BSONWritable> values, Context context) throws IOException, InterruptedException{ Iterator<BSONWritable> iterator=values.iterator(); long sum=0; List<BSONWritable> oldList = new ArrayList<BSONWritable>(); while(iterator.hasNext()){ BSONWritable bsonWritable = iterator.next(); oldList.add(bsonWritable); } //對values進行排序,排序欄位為TRN_TIME(交易時間) Collections.sort(oldList, new Comparator<BSONWritable>() { @Override public int compare(BSONWritable o1, BSONWritable o2) { String trn_time1 = (String)o1.getBson().get("TRN_TIME"); String trn_time2 = (String)o2.getBson().get("TRN_TIME"); return trn_time2.compareTo(trn_time1); } }); Map<String,BSONWritable> result = new HashMap<String,BSONWritable>(); if(oldList != null && oldList.size() > 0){ //記錄同一賬戶滿足條件的筆數 Map<String,BSONWritable> tempMap = new HashMap<String,BSONWritable>(); for(int i=0;i<oldList.size()-1;i++){ BSONWritable bSONWritable1 = oldList.get(i); //交易代碼 String trn_cd = (String)bSONWritable1.getBson().get("TRN_CD"); if(trn_cd.equals("000045") || trn_cd.equals("001045") || trn_cd.equals("021031") || trn_cd.equals("020031") || trn_cd.equals("001060") || trn_cd.equals("000060")){ //交易櫃員 String tran_teller_no1 = (String)bSONWritable1.getBson().get("TRAN_TELLER_NO"); //流水號 String jrnl_no = (String)bSONWritable1.getBson().get("JRNL_NO"); //交易日期 String trn_date1 = (String)bSONWritable1.getBson().get("TRN_DATE"); //交易時間 String trn_time1 = (String)bSONWritable1.getBson().get("TRN_TIME"); Date bigDate = null; try { bigDate = df.parse(trn_date1+" "+trn_time1); } catch (ParseException e) { e.printStackTrace(); } tempMap.put(jrnl_no,bSONWritable1); for(int j=i+1;j<oldList.size();j++){ BSONWritable bSONWritable2 = oldList.get(j); //交易代碼 String trn_cd1 = (String)bSONWritable2.getBson().get("TRN_CD"); if(trn_cd1.equals("000045") || trn_cd1.equals("001045") || trn_cd1.equals("021031") || trn_cd1.equals("020031") || trn_cd1.equals("001060") || trn_cd1.equals("000060")){ //交易櫃員 String tran_teller_no2 = (String)bSONWritable2.getBson().get("TRAN_TELLER_NO"); //流水號 String jrnl_no2 = (String)bSONWritable2.getBson().get("JRNL_NO"); //交易日期 String trn_date2 = (String)bSONWritable2.getBson().get("TRN_DATE"); //交易時間 String trn_time2 = (String)bSONWritable2.getBson().get("TRN_TIME"); Date smallDate = null; try { smallDate = df.parse(trn_date1+" "+trn_time1); } catch (ParseException e) { e.printStackTrace(); } //判斷是否是同一實體{交易櫃員}辦理 if(!tran_teller_no1.equals(tran_teller_no2)){ continue; } //判斷{交易日期}{交易時間}是否是[1小時]內 if((bigDate.getTime()-smallDate.getTime())/1000 > tradeTime1){ break; } tempMap.put(jrnl_no2,bSONWritable2); }else{ //end if TRN_CD1.equals("000045") continue; } }//end for if(tempMap.size() >= tradeNum1){ result.putAll(tempMap); tempMap.clear(); } }else{ continue; }//end if || }//end for } Map<String,BSONWritable> result2 = new HashMap<String,BSONWritable>(); List<BSONObject> cl_1_list = new ArrayList<BSONObject>(); //結果寫入sdb Iterator iter1 = result.keySet().iterator(); while(iter1.hasNext()){ String keyValue = (String)iter1.next(); BSONWritable resultValue = result.get(keyValue); cl_1_list.add(resultValue.getBson()); cl_1.insert(resultValue.getBson()); } cl_1.bulkInsert(cl_1_list, DBCollection.FLG_INSERT_CONTONDUP); cl_1_list = null; List<BSONObject> cl_2_list = new ArrayList<BSONObject>(); context.write(null,null); } }

推薦閱讀:

深入淺出hbase和bigtable
設計數據密集型應用-DDIA中文翻譯
為物聯網而生:高性能時間序列資料庫HiTSDB商業化首發!
sql中插入中文問題
基於協程和非同步IO的NoSQL資料庫AsyncDB正式發布

TAG:資料庫 | NewSQL | Spark |