從頭學習大數據培訓課程 spark 基於內存的分散式計算框架(七)spark-hbase_bulkload、spark 程序集群運行

1.hbase bulk load

pom:

代碼:

結果:

2.spark on yarn

修改如下代碼

用assembly打成jar包

常規方法

spark-submit --jars $(echo /usr/local/hbase/lib/*.jar | tr ,) --class com.hainiu.spark.hbase.SparkHbaseBulk --master yarn --queue hainiu ~/spark-1.0-hainiu.jar /data/hainiu/user_install_status/20141228/part-r-00001 /user/qingniu/task/user_install_status

加driver工具

spark-submit --jars $(echo /usr/local/hbase/lib/*.jar | tr ,) --master yarn --queue hainiu ~/spark-1.0-hainiu.jar sparkhbaseload /data/hainiu/user_install_status/20141228/part-r-00001 /user/qingniu/task/user_install_status

配置環境變數,跳過上傳hbase的jar包,好加快啟動速度

spark-submit --master yarn --queue hainiu ~/spark-1.0-hainiu.jar sparkhbaseload /data/hainiu/user_install_status/20141228/part-r-00001 /user/qingniu/task/user_install_status

spark-env.sh

把每個機器上的hbase lib下的yarn相關的移除

yarn 包衝突

mv /usr/local/hbase/lib/hadoop-yarn-api-2.5.1.jar /usr/local/hbase/lib/hadoop-yarn-api-2.5.1.jar_back

mv /usr/local/hbase/lib/hadoop-yarn-client-2.5.1.jar /usr/local/hbase/lib/hadoop-yarn-client-2.5.1.jar_back

mv /usr/local/hbase/lib/hadoop-yarn-common-2.5.1.jar /usr/local/hbase/lib/hadoop-yarn-common-2.5.1.jar_back

mv /usr/local/hbase/lib/hadoop-yarn-server-common-2.5.1.jar /usr/local/hbase/lib/hadoop-yarn-server-common-2.5.1.jar_back

3.spark streaming

概述

隨著大數據技術的不斷發展,人們對於大數據的實時性處理要求也在不斷提高,傳統的 MapReduce 等批處理框架在某些特定領域,例如實時用戶推薦、用戶行為分析這些應用場景上逐漸不能滿足人們對實時性的需求,因此誕生了一批如 S3、Storm 這樣的流式分析、實時計算框架。Spark 由於其內部優秀的調度機制、快速的分散式計算能力,所以能夠以極快的速度進行迭代計算。正是由於具有這樣的優勢,Spark 能夠在某些程度上進行實時處理,Spark Streaming 正是構建在此之上的流式框架。

Spark Streaming可以實現高吞吐量的、具備容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括Kafka、ZeroMQ等消息隊列以及TCP sockets或者目錄文件從數據源獲取數據之後,可以使用諸如map、reduce、join和window等高級函數進行複雜演算法的處理。最後還可以將處理結果存儲到文件系統,資料庫

Spark Streaming接收這些實時輸入數據流,會將它們按批次劃分,然後交給Spark引擎處理,生成按照批次劃分的結果流。

Spark的各個子框架,都是基於核心Spark的,Spark Streaming在內部的處理機制是,接收實時流的數據,並根據一定的時間間隔拆分成一批批的數據,然後通過Spark Engine處理這些批數據,最終得到處理後的一批批結果數據。

對應的批數據,在Spark內核對應一個RDD實例,因此,對應流數據的DStream可以看成是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分成一批一批後,通過一個先進先出的隊列,然後 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,然後進行處理,這是一個典型的生產者消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。

Storm與Spark Streming比較

1.處理模型以及延遲

雖然兩框架都提供了可擴展性(scalability)和可容錯性(fault tolerance),但是它們的處理模型從根本上說是不一樣的。Storm可以實現亞秒級時延的處理,而每次只處理一條event,而Spark Streaming可以在一個短暫的時間窗口裡面處理多條(batches)Event。所以說Storm可以實現亞秒級時延的處理,而Spark Streaming則有一定的時延。

2.容錯和數據保證

然而兩者的代價都是容錯時候的數據保證,Spark Streaming的容錯為有狀態的計算提供了更好的支持。在Storm中,每條記錄在系統的移動過程中都需要被標記跟蹤,所以Storm只能保證每條記錄最少被處理一次,但是允許從錯誤狀態恢復時被處理多次。這就意味著可變更的狀態可能被更新兩次從而導致結果不正確。

任一方面,Spark Streaming僅僅需要在批處理級別對記錄進行追蹤,所以他能保證每個批處理記錄僅僅被處理一次,即使是node節點掛掉。

3.批處理框架集成

Spark Streaming的一個很棒的特性就是它是在Spark框架上運行的。這樣你就可以想使用其他批處理代碼一樣來寫Spark Streaming程序,或者是在Spark中交互查詢。這就減少了單獨編寫流批量處理程序和歷史數據處理程序。

4.生產支持

兩者都可以在各自的集群框架中運行,但是Storm可以在Mesos上運行, 而Spark Streaming可以在YARN和Mesos上運行。

Storm已經出現好多年了,而且自從2011年開始就在Twitter內部生產環境中使用,還有其他一些公司。而據twitter跳槽過來的同事說,現在他們內部已經不使用storm了,轉而使用Spark Streaming,可現Spark Streaming正漸漸成為主流

Spark Streaming優缺點

優點:

吞吐量大、速度快。

容錯:SparkStreaming在沒有額外代碼和配置的情況下可以恢復丟失的工作。checkpoint。

社區活躍度高。生態圈強大。因為後台是Spark

數據源廣泛。

缺點:

延遲。500毫秒已經被廣泛認為是最小批次大小,這個相對storm來說,還是大很多。所以實際場景中應注意該問題,就像標題分類場景,設定的0.5s一批次,加上處理時間,分類介面會佔用1s的響應時間。實時要求高的可選擇使用其他框架。

數據處理流程

Spark Streaming接收這些實時輸入數據流,會將它們按批次劃分,然後交給Spark引擎處理,生成按照批次劃分的結果流。

Spark Streaming提供了表示連續數據流的、高度抽象的被稱為離散流的DStream。DStream本質上表示RDD的序列。任何對DStream的操作都會轉變為對底層RDD的操作。

Spark Streaming使用數據源產生的數據流創建DStream,也可以在已有的DStream上使用一些操作來創建新的DStream。

架構與抽象

Spark Streaming使用「微批次」的架構,把流試計算當使得一系列連接的小規模批處理來對待,Spark Streaming從各種輸入源中讀取數據,並把數據分成小組的批次,新的批次按均勻的時間間隔創建出來,在每個時間區間開始的時候,一個新的批次就創建出來,在該區間內收到的數據都會被添加到這個批次中,在時間區間結束時,批次停止增長。時間區間的大小是由批次間隔這個參數決定的,批次間隔一般設在500毫秒到幾秒之間,由應用開發者配置,每個輸出批次都會形成一個RDD,以Spark作業的方式處理並生成其他的RDD。並能將處理結果按批次的方式傳給外部系統。

接收器用於接收外部輸入數據,然後Spark分批次生成新的RDD去處理,然後把批次處理的結果存儲到外部系統中。如HDFS、HBASE、MYSQL

接受器會佔用一個executor,所以在local[n]模式下,n必須大於1

DStream 操作

DStream 上的原語與 RDD 的類似,分為 Transformations(轉換)和 Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform() 以及各種 Window 相關的原語。

UpdateStateByKey 原語用於記錄歷史記錄,上文中 Word Count 示例中就用到了該特性。若不用 UpdateStateByKey 來更新狀態,那麼每次數據進來後分析完成,結果輸出後將不再保存。如輸入:hellow world,結果則為:(hello,1)(world,1),然後輸入 hello spark,結果則為 (hello,1)(spark,1)。也就是不會保留上一次數據處理的結果。

使用 UpdateStateByKey 原語用於需要記錄的 State,可以為任意類型,如上例中即為 Optional類型。

Transform() 原語允許 DStream 上執行任意的 RDD-to-RDD 函數,通過該函數可以方便的擴展 Spark API。

普通的轉換操作如下表所示:

OLDRDD -> NEWRDD

updateStateByKey

v1:當前批次數據

V2:上次數據

返回結果本次匯總數據,或者下次的V2數據

窗口轉換操作

在Spark Streaming中,數據處理是按批進行的,而數據採集是逐條進行的,因此在Spark Streaming中會先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的數據匯總起來成為一批數據交給系統去處理。

對於窗口操作而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續時間,在窗口操作中,只有窗口的長度滿足了才會觸發批數據的處理。除了窗口的長度,窗口操作還有另一個重要的參數就是滑動間隔(slide duration),它指的是經過多長時間窗口滑動一次形成新的窗口,滑動窗口默認情況下和批次間隔的相同,而窗口間隔一般設置的要比它們兩個大。在這裡必須注意的一點是滑動間隔和窗口間隔的大小一定得設置為批處理間隔的整數倍。

如圖所示,批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。對於初始的窗口time 1-time 3,只有窗口間隔滿足了才觸發數據的處理。這裡需要注意的一點是,初始的窗口有可能流入的數據沒有撐滿,但是隨著時間的推進,窗口最終會被撐滿。當每個"2"個時間單位,窗口滑動一次後,會有新的數據流入窗口,這時窗口會移去最早的兩個時間單位的數據,而與最新的兩個時間單位的數據進行匯總形成新的窗口(time3-time5)。

對於窗口操作,批處理間隔、窗口間隔和滑動間隔是非常重要的三個時間概念,是理解窗口操作的關鍵所在。

Spark Streaming 還提供了窗口的計算,它允許你通過滑動窗口對數據進行轉換,窗口轉換操作如下:

輸出操作

Spark Streaming允許DStream的數據被輸出到外部系統,如資料庫或文件系統。由於輸出操作實際上使transformation操作後的數據可以通過外部系統被使用,同時輸出操作觸發所有DStream的transformation操作的實際執行(類似於RDD操作)。以下表列出了目前主要的輸出操作:

foreachRDD

foreachRDD,是一個非常強大的輸出操作,它允將許數據輸出到外部系統。就看你怎麼使用裡面的RDD了

DStreams通過輸出操作進行延遲執行,就像RDD由RDD操作懶惰地執行。具體來說,DStream輸出操作中的RDD動作強制處理接收到的數據。因此,如果您的應用程序沒有任何輸出操作,或者具有輸出操作,比如dstream.foreachRDD()沒有任何RDD操作,那麼任何操作都不會被執行。系統將簡單地接收數據並將其丟棄。

第一個SparkStreaming程序

socket

代碼

ncat使用,對應linux上的命令就是nc

結果

版權聲明:原創作品,允許轉載,轉載時務必以超鏈接的形式表明出處和作者信息。否則將追究法律責任。來自海牛學院-青牛


推薦閱讀:

Spark特點及缺點?
想研讀下spark的源碼,怎麼搭閱讀和調試的環境呢?
spark中的RDD叫做彈性分布數據集,如何理解彈性兩個字?
Spark SQL生成的代碼怎麼調試?
如何評價spark的機器學習框架 和 tensorflow的機器學習系統?

TAG:大數據 | Spark | 大數據處理 |