Spark Streaming入門
5 人贊了文章
歡迎大家前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~
本文將幫助您使用基於HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一個擴展,支持連續的數據流處理。
什麼是Spark Streaming?
首先,什麼是流(streaming)?數據流是連續到達的無窮序列。流處理將不斷流動的輸入數據分成獨立的單元進行處理。流處理是對流數據的低延遲處理和分析。Spark Streaming是Spark API核心的擴展,可實現實時數據的快速擴展,高吞吐量,高容錯處理。Spark Streaming適用於大量數據的快速處理。實時處理用例包括:
- 網站監控,網路監控
- 欺詐識別
- 網頁點擊
- 廣告
- 物聯網感測器
Spark Streaming支持如HDFS目錄,TCP套接字,Kafka,Flume,Twitter等數據源。數據流可以用Spark 的核心API,DataFrames SQL,或機器學習的API進行處理,並且可以被保存到HDFS,databases或Hadoop OutputFormat提供的任何文件系統中去。
Spark Straming如何工作
Spark Streaming將數據流每X秒分作一個集合,稱為Dstreams,它在內部是一系列RDD。您的Spark應用程序使用Spark API處理RDD,並且批量返回RDD操作的結果。
示例應用程序的體系結構
Spark Streaming示例代碼執行以下操作:
- 讀取流式數據。
- 處理流數據。
- 將處理後的數據寫入HBase表。
其他Spark示例代碼執行以下操作:
- 讀取流媒體代碼編寫的HBase Table數據
- 計算每日匯總的統計信息
- 將匯總統計信息寫入HBase表
示例數據集
油泵感測器數據文件放入目錄中(文件是以逗號為分隔符的CSV)。Spark Streaming將監視目錄並處理在該目錄中創建的所有文件。(如前所述,Spark Streaming支持不同的流式數據源;為簡單起見,此示例將使用CSV。)
以下是帶有一些示例數據的csv文件示例:
我們使用Scala案例類來定義與感測器數據csv文件相對應的感測器模式,並使用parseSensor函數將逗號分隔值解析到感測器案例類中。
HBase表格模式
流數據的HBase表格模式如下:
- 泵名稱日期和時間戳的複合行鍵
- 可以設置報警列簇,來監控數據。請注意,數據和警報列簇可能會設為在一段時間後失效。
日常統計匯總的模式如下所示:
- 泵名稱和日期的複合行鍵
- 列簇統計
- 最小值,最大值和平均值。
下面的函數將Sensor對象轉換為HBase Put對象,該對象用於將數據行插入到HBase中。
寫HBase表的配置
您可以使用Spark 的TableOutputFormat類寫入HBase表,這與您從MapReduce寫入HBase表的方式類似。下面我們使用TableOutputFormat類設置HBase的配置。
Spark Streaming示例代碼
這些是Spark Streaming代碼的基本步驟:
- 初始化Spark StreamingContext對象。
- 將轉換和輸出操作應用於DStream。
- 開始接收數據並使用streamingContext.start()處理它。
- 等待streamingContext.awaitTermination()的返回從而停止處理。
我們將通過示例應用程序代碼完成這些步驟。
初始化StreamingContext
首先,我們創建一個StreamingContext,這是流式傳輸的主要入口點(2秒間隔時間)。
val sparkConf = new SparkConf ( ) . setAppName ( "HBaseStream" )// 創建 StreamingContext, 流式函數的主要入口val ssc = new StreamingContext ( sparkConf , Seconds ( 2 ) )
接下來,我們使用StreamingContext textFileStream(directory)方法創建一個輸入流,該輸入流監視Hadoop兼容的文件系統以獲取新文件,並處理在該目錄中創建的所有文件。
// 創建代表數據 DStream對象val linesDStream = ssc . textFileStream ( "/user/user01/stream" )
linesDStream代表數據流,每個記錄都是一行文本。內部DStream是一系列RDD,每個批處理間隔一個RDD。
將轉換和輸出操作應用於DStream
接下來,我們將數據行解析為Sensor對象,並使用DStream行上的map操作。
// 把lineDSream的每一行解析為Sensor對象val sensorDStream = linesDStream . map ( Sensor . parseSensor )
map操作在linesDStream中的RDD上使用Sensor.parseSensor函數,從而生成Sensor對象(RDD)。
接下來,我們使用DStream foreachRDD方法將處理應用於此DStream中的每個RDD。我們過濾低psi感測器對象以創建警報,然後我們通過將感測器和警報數據轉換為Put對象並使用PairRDDFunctions saveAsHadoopDataset方法將感測器和警報數據寫入HBase ,該方法使用Hadoop將RDD輸出到任何支持Hadoop的存儲系統,該存儲系統的配置對象(請參閱上面的HBase的Hadoop配置)。
// 對每一個RDD. sensorRDD . foreachRDD { rdd =>// 低psi的感測器過濾器 val alertRDD = rdd . filter ( sensor => sensor . psi < 5.0 )// 把感測器數據轉為對象並寫入HDrdd . map ( Sensor . convertToPut ) . saveAsHadoopDataset (jobConfig )// 把警報轉為對象並寫入HDrdd . map ( Sensor . convertToPutAlert ) . saveAsHadoopDataset (jobConfig )}
sensorRDD對象被轉換並寫入HBase。
開始接收數據
要開始接收數據,我們必須在StreamingContext上顯式調用start(),然後調用awaitTermination來等待計算完成。
// 開始計算ssc . start ( )// 等待計算完成ssc . awaitTermination ( )
Spark R寫入HBase
現在我們要讀取HBase感測器表數據,計算每日摘要統計信息並將這些統計信息寫入。
以下代碼讀取HBase表,感測器表,psi列數據,使用StatCounter計算此數據的統計數據,然後將統計數據寫入感測器統計數據列。
// HBase的讀取設置 val conf = HBaseConfiguration . create ( )conf . set ( TableInputFormat . INPUT_TABLE , HBaseSensorStream . tableName )// 掃描數據conf . set ( TableInputFormat . SCAN_COLUMNS , "data:psi" ) // 載入RDD (row key, row Result)元組val hBaseRDD = sc . newAPIHadoopRDD ( conf , classOf [TableInputFormat ] ,classOf [ org . apache . hadoop . hbase . io . ImmutableBytesWritable ] ,classOf [ org . apache . hadoop . hbase . client . Result ] )// 把(row key, row Result) 元組為RDDval resultRDD = hBaseRDD.map(tuple => tuple._2)// 轉為 RDD (RowKey, ColumnValue), 移除Timeval keyValueRDD = resultRDD. map(result => (Bytes.toString(result.getRow()). split(" ")(0), Bytes.toDouble(result.value)))// 分組,得到統計數據val keyStatsRDD = keyValueRDD. groupByKey(). mapValues(list => StatCounter(list))// 轉碼rowkey,統計信息放入並寫入hbasekeyStatsRDD.map { case (k, v) => convertToPut(k, v)}.saveAsHadoopDataset(jobConfig)
下圖顯示newAPIHadoopRDD的輸出。PairRDDFunctions saveAsHadoopDataset將Put對象保存到HBase。
軟體
- 本教程將在MapR Sandbox上運行 ,其中包括Spark。
- 您可以從這裡下載代碼和數據以運行這些例子:
- 代碼:https://github.com/caroljmcdonald/SparkStreamingHBaseExample
運行程序
您可以將代碼作為獨立應用程序運行,如「MapR Sandbox上的Spark入門教程」中所述。
以下是總的步驟:
- 按照MapR沙箱入門Spark中的介紹,用戶ID user01,密碼mapr。
- 使用maven構建應用程序。
- 使用scp將jar文件和數據文件複製到沙盒主目錄/ user / user01。
- 運行應用程序:/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -path
hbase classpath
--class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar - 將流式數據文件複製到流目錄中:
cp sensordata.csv /user/user01/stream/
- 讀取數據並計算一列的數據/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -path hbase classpath - --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar
- 計算整行的統計信息/ opt / mapr / spark / spark- <version> / bin / spark-submit --driver-class -path hbase classpath - --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar
總結
這就結束了關於使用HBase進行Spark Streaming的教程。您可以在相關閱讀部分找到更多信息。
問答
如何使用MySQL和ApacheSPark?相關閱讀Spark Streaming編程指南Spark Streaming應用與實戰全攻略簡談Spark Streaming的實時計算整合
此文已由作者授權騰訊雲+社區發布,原文鏈接:https://cloud.tencent.com/developer/article/1123173?fromSource=waitui
http://weixin.qq.com/r/6TlxaU-EaRIWrQsK92z7 (二維碼自動識別)
推薦閱讀:
※【HBase從入門到精通系列】誤刪數據如何搶救?
※高手如何實踐HBase?不容錯過的滴滴內部技巧
※1. 從 MySql 到 Hbase (集群化方案)
※HBase最佳實踐-讀性能優化策略
※一種HBase的表region切分和rowkey設計方案