Hadoop
Hadoop簡述
其實Hadoop誕生至今已經十年了,網路上也充斥著關於Hadoop相關知識的的海量資源。但是,有時還是會使剛剛接觸大數據領域的童鞋分不清hadoop、hdfs、Yarn和MapReduce等等技術辭彙。
Hadoop是ASF(Apache軟體基金會)開源的,根據Google開源的三篇大數據論文設計的,一個能夠允許大量數據在計算機集群中,通過使用簡單的編程模型進行分散式處理的框架。其設計的規模可從單一的伺服器到數千台伺服器,每一個均可提供局部運算和存儲功能。Hadoop並不依賴昂貴的硬體以支持高可用性。Hadoop可以檢測並處理應用層上的錯誤,並可以把錯誤轉移到其他伺服器上(讓它錯誤,我在用別的伺服器頂上就可以了),所以Hadoop提供一個基於計算機集群的、高效性的服務。
經過十年的發展,Hadoop這個名詞的本身也在不斷進化者,目前我們提到Hadoop大多是指大數據的生態圈,這個生態圈包括眾多的軟體技術(e.g. HBase、Hive和Spark等等)。
有如Spring框架有著最基礎的幾個模塊Context、Bean和Core,其他的模塊和項目都是基於這些基礎模塊構建。Hadoop與之一樣,也有最基礎的幾個模塊。
- Common : 支持其他模塊的公用工具包。
- HDFS : 一個可高吞吐訪問應用數據的分散式文件系統。
- YARN : 一個管理集群伺服器資源和任務調度的框架。
- MapReduce : 基於Yarn對大數據集進行並行計算的系統。
其它的,像HBase、Hive等等不過在這幾個基礎模塊上的高級抽象。另外Hadoop也不是目前大數據的唯一解決方案,像Amazon的大數據技術方案等等。目前筆者接觸比較多的是Apache原生軟體和Cloudera的商業版軟體。在以後的章節中會以Cloudera版為藍本,向大家講解。
Common
Common模塊是Hadoop最為基礎的模塊,他為其他模塊提供了像操作文件系統、I/O、序列化和遠程方法調用等最為基礎的實現。如果想深入的了解Hadoop的具體實現,可以閱讀一下Common的源碼。
HDFS
HDFS是「Hadoop Distributed File System」的首字母縮寫,是一種設計運行在一般硬體條件(不需要一定是伺服器級別的設備,但更好的設備能發揮更大的作用)下的分散式文件系統. 他和現有的其他分散式文件系統(e.g. RAID)有很多相似的地方。和其他分散式文件系統的不同之處是HDFS設計為運行在低成本的硬體上(e.g. 普通的PC機),且提供高可靠性的伺服器. HDFS設計滿足大數據量,大吞吐量的應用情況。
為了更好的理解分散式文件系統,我們先從文件講起。
文件
文件這個詞,恐怕只要是現代人都不會陌生。但是在不同行業中,文件有著不同的意義。在計算機科學領域,文件是什麼呢?文件是可以在目錄中看的見的圖標么?當然不是。文件在存儲設備時,是個N長的位元組序列。而在一個計算機使用者的角度而言,文件是對所有I/O設備的抽象。每個I/O設備都可以視為文件,包括磁碟、鍵盤和網路等。文件這個簡單而精緻的概念其內涵是十分豐富的,它嚮應用程序提供了一個統一的視角,來看待系統中可能含有的各式各樣的I/O設備。
文件系統
那麼一台計算機上肯定不止一個文件,成千上萬的文件怎麼管理呢?因此需要我們需要一種對文件進行管理的東西,即文件系統。文件系統是一種在計算機上存儲和組織數據的方法,它使得對其訪問和查找變得容易,文件系統使用文件和樹形目錄的抽象邏輯概念代替了硬碟和光碟等物理設備使用數據塊的概念,用戶使用文件系統來保存數據而不必關心數據實際保存在硬碟的地址為多少的數據塊上,只需要記住這個文件的所屬目錄和文件名。在寫入新數據之前,用戶不必關心硬碟上的那個塊地址沒有被使用,硬碟上的存儲空間管理(分配和釋放)功能由文件系統自動完成,用戶只需要記住數據被寫入到了哪個文件中即可。
分散式文件系統
相對於單機的文件系統而言,分散式文件系統(Distributed file system)。是一種允許文件通過網路在多台主機上分享的文件系統,可讓多計算機上的多用戶分享文件和存儲空間。
在這樣的文件系統中,客戶端並非直接訪問底層的數據存儲區塊和磁碟。而是通過網路,基於單機文件系統並藉由特定的通信協議的幫助,來實現對於文件系統的讀寫。
分散式文件系統需要擁有的最基本的能力是通過暢通網路I/O來實現數據的複製與容錯。也就是說,一方面一個文件是分為多個數據塊分布在多個設備中。另一方面,數據塊有多個副本分布在不同的設備上。即使有一小部分的設備出現離線和宕機等情況,整體來說文件系統仍然可以持續運作而不會有數據損失。
注意:分散式文件系統和分散式數據存儲的界線是模糊的,但一般來說,分散式文件系統是被設計用在區域網,比較強調的是傳統文件系統概念的延伸,並通過軟體方法來達成容錯的目的。而分散式數據存儲,則是泛指應用分散式運算技術的文件和資料庫等提供數據存儲服務的系統。
HDFS
HDFS正是Hadoop中負責分散式文件系統的。HDFS採用master/slave架構。一個HDFS集群是由一個Namenode和一定數目的Datanodes組成。Namenode是一個中心伺服器,負責管理文件系統的命名空間以及文件的訪問控制。集群中的Datanode一般是一個設備上部署一個,負責管理它所在節點上的存儲。HDFS暴露了文件系統的命名空間,用戶能夠以文件的形式在上面存儲數據。實際上,一個文件會被分成一個或多個數據塊,這些塊存儲在一組Datanode上。Namenode執行文件系統的命名空間操作,比如打開、關閉、重命名文件或目錄。它也負責確定數據塊到具體Datanode設備的映射。Datanode負責處理文件系統客戶端的讀寫請求。在Namenode的統一調度下進行數據塊的創建、刪除和複製。為了保證文件系統的高可靠,往往需要另一個Standby的Namenode在Actived Namenode出現問題後,立刻接管文件系統。
網路上有很多關於hdfs的安裝配置手冊,本文就不再複述。只提供一個以前項目中應用過的部署架構僅供大家參考。
這個高可用的HDFS架構是由3台zookeeper設備、2台域名服務(DNS)和時間服務(NTP)設備、2台Namenode設備(如果必要Standby可以更多)、一個共享存儲設備(NFS)和N個DataNode組成。Zookeeper負責接受NameNode的心跳,當Actived namenode不向zookeeper報告心跳時,Standby Namenode的監控進程會收到這個消息,從而激活Standby NameNode並接管Active NameNode的工作。
NFS負責為2個NameNode存儲EditLog文件,(NameNode 在執行 HDFS 客戶端提交的創建文件或者移動文件這樣的寫操作時,會首先把這些操作記錄在 EditLog 文件之中,然後再更新內存中的文件系統鏡像,最終再刷新到磁碟。 EditLog 只是在數據恢復的時候起作用。記錄在 EditLog 之中的每一個操作又稱為一個事務,每個事務有一個整數形式的事務 id 作為編號。EditLog 會被切割為很多段,每一段稱為一個 Segment)當發生NameNode切換的情況時,Standby NameNode接管後,會根據EditLog中把未完成的寫操作繼續下去並開使向EditLog寫入新的寫操作記錄。(此外,hadoop還提供了另一種QJM的EditLog方案)
DNS&NTP分布負責整個系統的(包括客戶端)域名服務和時間服務。這個在集群部署中是非常有必要的兩個存在。首先說一下DNS的必要性,一、Hadoop是極力提倡用機器名作為在HDFS環境中的標識。二、當然可以在/etc/hosts文件中去標明機器名和IP的映射關係,可是請想想如果在一個數千台設備的集群中添加一個設備時,負責系統維護的夥伴會不會罵集群的設計者呢?其次是NTP的必要性,在剛剛開始接觸Hadoop集群時我遇到的大概90%的問題是由於各個設備時間不一致導致的。各個設備的時間同步是數據一致性和管理一致性的一個基本保障。
MapReduce
MapReduce是一個使用簡單的軟體框架,基於它寫出來的應用程序能夠運行在由上千個商用機器組成的大型集群上,並以一種可靠容錯的方式並行處理上T級別的數據集。
一個MapReduce 作業(job)通常會把輸入的數據集切分為若干獨立的數據塊,由 map任務(task)以完全並行的方式處理它們。框架會對map的輸出先進行排序, 然後把結果輸入給reduce任務。通常作業的輸入和輸出都會被存儲在文件系統中。 整個框架負責任務的調度和監控,以及重新執行已經失敗的任務。
通常,MapReduce框架和HDFS是運行在一相同的設備集群上的,也就是說,計算設備和存儲設備通常在一起。這種配置允許框架在那些已經存好數據的設備上高效地調度任務,這可以使整個集群的網路帶寬被非常高效地利用。
MapReduce框架由一個單獨的master JobTracker 和每個集群設備一個slave TaskTracker共同組成。master負責調度構成一個作業的所有任務,這些任務分布在不同的slave上,master監控它們的執行,重新執行已經失敗的任務。而slave僅負責執行由master指派的任務。
用戶編寫的MapReduce應用程序應該指明輸入/輸出的文件位置(路徑),並通過實現合適的介面或抽象類提供map和reduce函數。再加上其他作業的參數,就構成了作業配置(job configuration)。然後,job client提交作業(jar包/可執行程序等)和配置信息給JobTracker,後者負責分發這些軟體和配置信息給slave、調度任務並監控它們的執行,同時提供狀態和診斷信息給job-client。
在抽象的層面上MapReduce是由兩個函數Map和Reduce組成的。簡單來說,一個Map函數就是對一些獨立元素組成的概念上的列表的每一個元素進行指定的操作。事實上,每個元素都是被獨立操作的,而原始列表沒有被更改,因為這裡創建了一個新的列表來保存操作結果。這就是說,Map操作是可以高度並行的。而Reduce函數指的是對Map函數的結果(中間經過洗牌的過程,會把map的結果進行分組)分組後多個列表的元素進行適當的歸併。注意:雖然Hadoop框架是用JavaTM實現的,但MapReduce應用程序則不一定要用 Java來寫 。至少Scala是可以寫的喲。
附上Scala實現的計算詞頻的Scala源碼
import java.io.IOExceptionnimport java.util.StringTokenizernnimport org.apache.hadoop.conf.Configurationnimport org.apache.hadoop.fs.Pathnimport org.apache.hadoop.io.{IntWritable, Text}nimport org.apache.hadoop.mapreduce.lib.input.FileInputFormatnimport org.apache.hadoop.mapreduce.lib.output.FileOutputFormatnimport org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}nnimport scala.collection.JavaConversionsnn/**n * Created by Stanley on 2017/3/14.n */nobject WordCount {ndef main(args: Array[String]): Unit = {nval job = new Job(new Configuration(), "WordCount")n job.setJarByClass(classOf[WordMapper]);n job.setMapperClass(classOf[WordMapper]);n job.setCombinerClass(classOf[WordReducer]);n job.setReducerClass(classOf[WordReducer]);n job.setOutputKeyClass(classOf[Text]);n job.setOutputValueClass(classOf[IntWritable]);n job.setNumReduceTasks(1)n FileInputFormat.addInputPath(job, new Path(args(0)));n FileOutputFormat.setOutputPath(job, new Path(args(1)));n System.exit(job.waitForCompletion(true) match { case true => 0n case false => 1n });n }n}nnclass WordMapper extends Mapper[Object, Text, Text, IntWritable] {nval one = new IntWritable(1)nn@throws[IOException]n @throws[InterruptedException]n override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context) = {nval stringTokenizer = new StringTokenizer(value.toString());nwhile (stringTokenizer.hasMoreTokens()) {n context.write(new Text(stringTokenizer.nextToken()), one);n }n }n}nnclass WordReducer extends Reducer[Text, IntWritable, Text, IntWritable] {n@throws[IOException]n @throws[InterruptedException]n override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {nimport JavaConversions.iterableAsScalaIterablen context.write(key, new IntWritable(values.map(x=>x.get()).reduce(_+_)));n }n}n
Yarn
YARN(Yet Another Resource Negotiator)是Hadoopd的設備資源管理器,它是一個通用資源管理系統,MapReduce和其他上層應用提供統一的資源管理和調度,它為集群在利用率、資源統一管理和數據共享等方面提供了巨大的幫助。
Yarn由ResourceManager、NodeManager、ApplicationMaster和Containe四個概念構成。
ResourceManager是一個全局的資源管理器,負責整個系統的資源管理和分配。它主要由兩個組件構成:調度器(Scheduler)和應用程序管理器(Applications Manager)。調度器根據容量、隊列等限制條件,將系統中的資源分配給各個正在運行的MapReduce程序。應用程序管理器負責管理整個系統中所有MapReduce程序,包括提交、與調度器協商資源以啟動ApplicationMaster、監控ApplicationMaster運行狀態並在失敗時重新啟動它等。
用戶提交的每個MapReduce程序均包含一個ApplicationMaster,主要功能包括:與ResourceManager調度器協商以獲取資源(用Container表示);將得到的任務進一步分配給內部的任務(資源的二次分配);與NodeManager通信以啟動/停止任務;監控所有任務運行狀態,並在任務運行失敗時重新為任務申請資源以重啟任務。
NodeManager是每個設備上的資源和任務管理器,一方面,它會定時地向ResourceManager彙報本設備上的資源使用情況和各個Container的運行狀態;另一方面,它接收並處理來自ApplicationMaster的Container啟動/停止等各種請求。
Container是YARN中的資源抽象,它封裝了某個設備上的多維度資源,如內存、CPU、磁碟、網路等,當AM向RM申請資源時,RM為AM返回的資源便是用Container表示。
結語
本文走馬觀花的介紹了Hadoop相關內容。文章的主要目的是給大家一個對大數據的分散式解決方案的感官印象,為以後的大數據相關文章提供一個基礎的理解。最後要強調的是,思考大數據方向的問題是一定要記住分散式的概念,因為你的數據並不在一個設備中甚至不再一個集群中,而且計算也是分布的。所以在設計大數據應用程序時,要花時間思考程序和演算法在單機應用和分散式應用所產生的不同(e.g. 加權平均值)。
推薦閱讀:
※嫌棄Hadoop?可能是你的打開方式有問題
※數據分析利器之hive優化十大原則
※既然Spark比Hadoop性能好很多,Hadoop未來發展方向是什麼?
※大數據2016年大事記
※大數據實驗手冊怎麼自己搭環境?