Spark學習總結(一)
來自專欄大數據分析挖掘37 人贊了文章
最近一周學習了scala編程基礎和spark核心編程,現主要對所學的spark部分做個總結。
RDD及其特點
1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分散式數據集。
2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分布在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分散式數據集)
3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。
4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。
5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁碟。(彈性)
創建RDD
進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation運算元,對該RDD進行轉換,來獲取其他的RDD。
Spark Core提供了三種創建RDD的方式:
1.使用程序中的集合創建RDD(主要用於測試)
List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)
SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();JavaRDD<String> lines = spark.read().textFile("D:\Users\Administrator\Desktop\spark.txt").javaRDD();
3.使用HDFS文件創建RDD(生產環境的常用方式)
SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();
使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:
第一,將SparkSession對象的master("local")方法去掉第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件操作RDD
Spark支持兩種RDD操作:transformation和action。
transformation操作
transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。
常用的transformation介紹:
map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。
filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。
flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。
groupByKey:根據key進行分組,每個key對應一個Iterable<value>。
reduceByKey:對每個key對應的value進行reduce操作。
sortByKey:對每個key對應的value進行排序操作。
join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。
cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。
action操作
action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。
常用的action介紹:
reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。
collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。
count:獲取RDD元素總數。
take(n):獲取RDD中前n個元素。
saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。
countByKey:對每個key對應的值進行count計數。
foreach:遍歷RDD中的每個元素。
RDD持久化
要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。
如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。
val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()
Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。
通用的持久化級別的選擇建議:
1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。
2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。
3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。
4、能不使用DISK相關的策略,就不用使用,有的時候,從磁碟讀取數據,還不如重新計算一次。
共享變數
Spark提供了兩種共享變數:Broadcast Variable(廣播變數)和Accumulator(累加變數)。
BroadcastVariable會將使用到的變數,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網路傳輸以及內存消耗。廣播變數是只讀的。
val factor = 3val broadcastVars = sc.broadcast(factor);val numberList = Array(1,2,3,4,5)val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變數讀值broadcastVars.value
Accumulator則可以讓多個task共同操作一份變數,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。
val numberList = Array(1,2,3,4,5)val numberRDD = sc.parallelize(numberList,1)val sum = sc.accumulator(0)numberRDD.foreach{m => sum += m}
小案例實戰1
案例需求:
1、對文本文件內的每個單詞都統計出其出現的次數。
2、按照每個單詞出現次數的數量,降序排序。步驟:
- 1.創建RDD
- 2.將文本進行拆分 (flatMap)
- 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
- 4.反轉鍵值對 (mapToPair)
- 5.按鍵升序排序 (sortedByKey)
- 6.再次反轉鍵值對 (mapToPair)
- 7.列印輸出(foreach)
Java版本jdk1.8以下
public class SortWordCount { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 創建lines RDD JavaRDD<String> lines = sc.textFile("D:\Users\Administrator\Desktop\spark.txt"); // 將文本分割成單詞RDD JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //將單詞RDD轉換為(單詞,1)鍵值對RDD JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() { @Override public Tuple2<String,Integer> call(String s) throws Exception { return new Tuple2<String,Integer>(s,1); } }); //對wordPair 進行按鍵計數 JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer +integer2; } }); // 到這裡為止,就得到了每個單詞出現的次數 // 我們的新需求,是要按照每個單詞出現次數的順序,降序排序 // wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2) // 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序 // 進行key-value的反轉映射 JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception { return new Tuple2<Integer, String>(s._2,s._1); } }); // 按照key進行排序 JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false); // 再次將value-key進行反轉映射 JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception { return new Tuple2<String, Integer>(s._2,s._1); } }); // 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數 // 列印出來 sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println("word ""+s._1+"" appears "+ s._2+" times."); } }); sc.close(); }}
Java版本jdk1.8
可以使用lambda表達式,簡化代碼:
public class SortWordCount { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 創建lines RDD JavaRDD<String> lines = sc.textFile("D:\Users\Administrator\Desktop\spark.txt"); JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1)); JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b)); JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1)); JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false); JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1)); sortedWordCount.foreach(s->System.out.println("word ""+s._1+"" appears "+ s._2+" times.")); sc.close(); }}
scala版本
由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。
package cn.spark.study.coreimport org.apache.spark.sql.SparkSessionobject SortWordCount { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\Users\Administrator\Desktop\spark.txt") val words = lines.flatMap{line => line.split(" ")} val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _) val countWord = wordCounts.map{word =>(word._2,word._1)} val sortedCountWord = countWord.sortByKey(false) val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)} sortedWordCount.foreach(s=> { println("word ""+s._1+ "" appears "+s._2+" times.") }) spark.stop() }}
小案例實戰2
需求:
1、按照文件中的第一列排序。
2、如果第一列相同,則按照第二列排序。實現步驟:
- 1、實現自定義的key,要實現Ordered介面和Serializable介面,在key中實現自己對多個列的排序演算法
- 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
- 3、使用sortByKey運算元按照自定義的key進行排序(sortByKey)
- 4、再次映射,剔除自定義的key,只保留文本行(map)
- 5、列印輸出(foreach)
這裡主要用scala編寫
class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{ override def compare(that: SecondSortKey): Int = { if(this.first - that.first !=0){ this.first-that.first }else{ this.second-that.second } }}object SecondSort { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\sort.txt") val pairs = lines.map{line => ( new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line )} val sortedParis = pairs.sortByKey() val sortedLines = sortedParis.map(pairs => pairs._2) sortedLines.foreach(s => println(s)) spark.stop() }}
小案例實戰3
需求:
對每個班級內的學生成績,取出前3名。(分組取topn)
實現步驟:
1.創建初始RDD
2.對初始RDD的文本行按空格分割,映射為key-value鍵值對
3.對鍵值對按鍵分組
4.獲取分組後每組前3的成績:
- 4.1 遍歷每組,獲取每組的成績
- 4.2 將一組成績轉換成一個數組緩衝
- 4.3 將數組緩衝按從大到小排序
- 4.4 對排序後的數組緩衝取其前三
5.列印輸出
以下是使用scala實現:
object GroupTop3 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate() //創建初始RDD val lines = spark.sparkContext.textFile("D:\score.txt") //對初始RDD的文本行按空格分割,映射為key-value鍵值對 val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt)) //對pairs鍵值對按鍵分組 val groupedPairs = pairs.groupByKey() //獲取分組後每組前3的成績 val top3Score = groupedPairs.map(classScores => { var className = classScores._1 //獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三 var top3 = classScores._2.toBuffer.sortWith(_>_).take(3) Tuple2(className,top3) }) top3Score.foreach(m => { println(m._1) for(s <- m._2) println(s) println("------------------") }) }}
以上三個小案例都用Scala實現了,用到了Scala中的集合的操作、高階函數、鏈式調用、隱式轉換等知識,自己動手實現,對Scala有個比較好的理解和掌握。
作者:簡單的happy Python愛好者社區專欄作者
博客專欄:簡單的happy
推薦閱讀:
※Hive中Beeline提交的SQL查詢同Hive客戶端提交的SQL查詢處理流程有何異同?
※第九章:HDFS概述
※Hadoop環境搭建筆記整理(七)——Hadoop集群環境搭建以及節點的增刪
※想開發Angular項目,但是沒有開發環境?使用Docker So Easy!
※Hadoop生態圈技術