Spark學習總結(一)

Spark學習總結(一)

來自專欄大數據分析挖掘37 人贊了文章

最近一周學習了scala編程基礎和spark核心編程,現主要對所學的spark部分做個總結。

RDD及其特點

1、RDD是Spark的核心數據模型,但是個抽象類,全稱為Resillient Distributed Dataset,即彈性分散式數據集。

2、RDD在抽象上來說是一種元素集合,包含了數據。它是被分區的,分為多個分區,每個分區分布在集群中的不同節點上,從而讓RDD中的數據可以被並行操作。(分散式數據集)

3、RDD通常通過Hadoop上的文件,即HDFS文件或者Hive表,來進行創建;有時也可以通過應用程序中的集合來創建。

4、RDD最重要的特性就是,提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDDpartition,因為節點故障,導致數據丟了,那麼RDD會自動通過自己的數據來源重新計算該partition。這一切對使用者是透明的。

5、RDD的數據默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數據寫入磁碟。(彈性)

創建RDD

進行Spark核心編程的第一步就是創建一個初始的RDD。該RDD,通常就代表和包含了Spark應用程序的輸入源數據。然後通過Spark Core提供的transformation運算元,對該RDD進行轉換,來獲取其他的RDD。

Spark Core提供了三種創建RDD的方式:

1.使用程序中的集合創建RDD(主要用於測試)

List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);

2.使用本地文件創建RDD(主要用於臨時性處理有大量數據的文件)

SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();JavaRDD<String> lines = spark.read().textFile("D:\Users\Administrator\Desktop\spark.txt").javaRDD();

3.使用HDFS文件創建RDD(生產環境的常用方式)

SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();

使用HDFS文件創建RDD對比使用本地文件創建RDD,需要修改的,只有兩個地方:

第一,將SparkSession對象的master("local")方法去掉

第二,我們針對的不是本地文件了,修改為hadoop hdfs上的真正的存儲大數據的文件

操作RDD

Spark支持兩種RDD操作:transformation和action。

transformation操作

transformation操作會針對已有的RDD創建一個新的RDD。transformation具有lazy特性,即transformation不會觸發spark程序的執行,它們只是記錄了對RDD所做的操作,不會自發的執行。只有執行了一個action,之前的所有transformation才會執行。

常用的transformation介紹:

map :將RDD中的每個元素傳人自定義函數,獲取一個新的元素,然後用新的元素組成新的RDD。

filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。

flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。

groupByKey:根據key進行分組,每個key對應一個Iterable<value>。

reduceByKey:對每個key對應的value進行reduce操作。

sortByKey:對每個key對應的value進行排序操作。

join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數進行處理。

cogroup:同join,但是每個key對應的Iterable<value>都會傳入自定義函數進行處理。

action操作

action操作主要對RDD進行最後的操作,比如遍歷,reduce,保存到文件等,並可以返回結果給Driver程序。action操作執行,會觸發一個spark job的運行,從而觸發這個action之前所有的transformation的執行,這是action的特性。

常用的action介紹:

reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。

collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。

count:獲取RDD元素總數。

take(n):獲取RDD中前n個元素。

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法。

countByKey:對每個key對應的值進行count計數。

foreach:遍歷RDD中的每個元素。

RDD持久化

要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。但是cache()或者persist()的使用是有規則的,必須在transformation或者textFile等創建了一個RDD之後,直接連續調用cache()或persist()才可以。

如果你先創建一個RDD,然後單獨另起一行執行cache()或persist()方法,是沒有用的,而且會報錯,大量的文件會丟失。

val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()

Spark提供的多種持久化級別,主要是為了在CPU和內存消耗之間進行取捨。

通用的持久化級別的選擇建議:

1、優先使用MEMORY_ONLY,如果可以緩存所有數據的話,那麼就使用這種策略。因為純內存速度最快,而且沒有序列化,不需要消耗CPU進行反序列化操作。

2、如果MEMORY_ONLY策略,無法存儲所有數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操作還是非常快,只是要消耗CPU進行反序列化。

3、如果需要進行快速的失敗恢復,那麼就選擇帶後綴為_2的策略,進行數據的備份,這樣在失敗時,就不需要重新計算了。

4、能不使用DISK相關的策略,就不用使用,有的時候,從磁碟讀取數據,還不如重新計算一次。

共享變數

Spark提供了兩種共享變數:Broadcast Variable(廣播變數)和Accumulator(累加變數)。

BroadcastVariable會將使用到的變數,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網路傳輸以及內存消耗。廣播變數是只讀的。

val factor = 3val broadcastVars = sc.broadcast(factor);val numberList = Array(1,2,3,4,5)val number = sc.parallelize(numberList).map( num => num * broadcastVars.value) //廣播變數讀值broadcastVars.value

Accumulator則可以讓多個task共同操作一份變數,主要可以進行累加操作。task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程序可以讀取Accumulator的值。

val numberList = Array(1,2,3,4,5)val numberRDD = sc.parallelize(numberList,1)val sum = sc.accumulator(0)numberRDD.foreach{m => sum += m}

小案例實戰1

案例需求:

1、對文本文件內的每個單詞都統計出其出現的次數。

2、按照每個單詞出現次數的數量,降序排序。

步驟:

  • 1.創建RDD
  • 2.將文本進行拆分 (flatMap)
  • 3.將拆分後的單詞進行統計 (mapToPair,reduceByKey)
  • 4.反轉鍵值對 (mapToPair)
  • 5.按鍵升序排序 (sortedByKey)
  • 6.再次反轉鍵值對 (mapToPair)
  • 7.列印輸出(foreach)

Java版本jdk1.8以下

public class SortWordCount { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 創建lines RDD JavaRDD<String> lines = sc.textFile("D:\Users\Administrator\Desktop\spark.txt"); // 將文本分割成單詞RDD JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //將單詞RDD轉換為(單詞,1)鍵值對RDD JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() { @Override public Tuple2<String,Integer> call(String s) throws Exception { return new Tuple2<String,Integer>(s,1); } }); //對wordPair 進行按鍵計數 JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer +integer2; } }); // 到這裡為止,就得到了每個單詞出現的次數 // 我們的新需求,是要按照每個單詞出現次數的順序,降序排序 // wordCounts RDD內的元素是這種格式:(spark, 3) (hadoop, 2) // 因此我們需要將RDD轉換成(3, spark) (2, hadoop)的這種格式,才能根據單詞出現次數進行排序 // 進行key-value的反轉映射 JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception { return new Tuple2<Integer, String>(s._2,s._1); } }); // 按照key進行排序 JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false); // 再次將value-key進行反轉映射 JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception { return new Tuple2<String, Integer>(s._2,s._1); } }); // 到此為止,我們獲得了按照單詞出現次數排序後的單詞計數 // 列印出來 sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> s) throws Exception { System.out.println("word ""+s._1+"" appears "+ s._2+" times."); } }); sc.close(); }}

Java版本jdk1.8

可以使用lambda表達式,簡化代碼:

public class SortWordCount { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); // 創建lines RDD JavaRDD<String> lines = sc.textFile("D:\Users\Administrator\Desktop\spark.txt"); JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1)); JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b)); JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1)); JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false); JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1)); sortedWordCount.foreach(s->System.out.println("word ""+s._1+"" appears "+ s._2+" times.")); sc.close(); }}

scala版本

由於spark2 有了統一切入口SparkSession,在這裡就使用了SparkSession。

package cn.spark.study.coreimport org.apache.spark.sql.SparkSessionobject SortWordCount { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\Users\Administrator\Desktop\spark.txt") val words = lines.flatMap{line => line.split(" ")} val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _) val countWord = wordCounts.map{word =>(word._2,word._1)} val sortedCountWord = countWord.sortByKey(false) val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)} sortedWordCount.foreach(s=> { println("word ""+s._1+ "" appears "+s._2+" times.") }) spark.stop() }}

小案例實戰2

需求:

1、按照文件中的第一列排序。

2、如果第一列相同,則按照第二列排序。

實現步驟:

  • 1、實現自定義的key,要實現Ordered介面和Serializable介面,在key中實現自己對多個列的排序演算法
  • 2、將包含文本的RDD,映射成key為自定義key,value為文本的JavaPairRDD(map)
  • 3、使用sortByKey運算元按照自定義的key進行排序(sortByKey)
  • 4、再次映射,剔除自定義的key,只保留文本行(map)
  • 5、列印輸出(foreach)

這裡主要用scala編寫

class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{ override def compare(that: SecondSortKey): Int = { if(this.first - that.first !=0){ this.first-that.first }else{ this.second-that.second } }}object SecondSort { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate() val lines = spark.sparkContext.textFile("D:\sort.txt") val pairs = lines.map{line => ( new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line )} val sortedParis = pairs.sortByKey() val sortedLines = sortedParis.map(pairs => pairs._2) sortedLines.foreach(s => println(s)) spark.stop() }}

小案例實戰3

需求:

對每個班級內的學生成績,取出前3名。(分組取topn)

實現步驟:

1.創建初始RDD

2.對初始RDD的文本行按空格分割,映射為key-value鍵值對

3.對鍵值對按鍵分組

4.獲取分組後每組前3的成績:

  • 4.1 遍歷每組,獲取每組的成績
  • 4.2 將一組成績轉換成一個數組緩衝
  • 4.3 將數組緩衝按從大到小排序
  • 4.4 對排序後的數組緩衝取其前三

5.列印輸出

以下是使用scala實現:

object GroupTop3 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate() //創建初始RDD val lines = spark.sparkContext.textFile("D:\score.txt") //對初始RDD的文本行按空格分割,映射為key-value鍵值對 val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt)) //對pairs鍵值對按鍵分組 val groupedPairs = pairs.groupByKey() //獲取分組後每組前3的成績 val top3Score = groupedPairs.map(classScores => { var className = classScores._1 //獲取每組的成績,將其轉換成一個數組緩衝,並按從大到小排序,取其前三 var top3 = classScores._2.toBuffer.sortWith(_>_).take(3) Tuple2(className,top3) }) top3Score.foreach(m => { println(m._1) for(s <- m._2) println(s) println("------------------") }) }}

以上三個小案例都用Scala實現了,用到了Scala中的集合的操作、高階函數、鏈式調用、隱式轉換等知識,自己動手實現,對Scala有個比較好的理解和掌握。

作者:簡單的happy Python愛好者社區專欄作者

博客專欄:簡單的happy

推薦閱讀:

Hive中Beeline提交的SQL查詢同Hive客戶端提交的SQL查詢處理流程有何異同?
第九章:HDFS概述
Hadoop環境搭建筆記整理(七)——Hadoop集群環境搭建以及節點的增刪
想開發Angular項目,但是沒有開發環境?使用Docker So Easy!
Hadoop生態圈技術

TAG:Hadoop | MapReduce | Spark |