hadoop分散式緩存

hadoop分散式緩存

來自專欄 大象跳舞

版權申明:轉載請註明出處。

文章來源:大數據隨筆

1.前言

DistributedCache是hadoop框架提供的一種機制,可以將job指定的文件,在job執行前,先行分發到task執行的機器上,並有相關機制對cache文件進行管理。

DistributedCache 可將具體應用相關的、大尺寸的、只讀的文件有效地分布放置。DistributedCache 是Map/Reduce框架提供的功能,能夠緩存應用程序所需的文件 (包括文本,檔案文件,jar文件等)。

Map-Redcue框架在作業所有任務執行之前會把必要的文件拷貝到slave節點上。 它運行高效是因為每個作業的文件只拷貝一次並且為那些沒有文檔的slave節點緩存文檔。

DistributedCache 根據緩存文檔修改的時間戳進行追蹤。 在作業執行期間,當前應用程序或者外部程序不能修改緩存文件。

distributedCache可以分發簡單的只讀數據或文本文件,也可以分發複雜類型的文件例如歸檔文件和jar文件。歸檔文件(zip,tar,tgz和tar.gz文件)在slave節點上會被解檔(un-archived)。 這些文件可以設置執行許可權。

用戶可以通過設置mapred.cache.{files|archives}來分發文件。 如果要分發多個文件,可以使用逗號分隔文件所在路徑。

DistributedCache可在map/reduce任務中作為 一種基礎軟體分發機制使用。它可以被用於分發jar包和本地庫(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能夠被用於 緩存文件和jar包,並把它們加入子jvm的classpath。也可以通過設置配置文檔里的屬性 mapred.job.classpath.{files|archives}達到相同的效果。緩存文件可用於分發和裝載本地庫。

添加緩存文件:

DistributedCache.addCacheFile(URI,conf)DistributedCache.addCacheArchive(URI,conf) DistributedCache.setCacheFiles(URIs,conf)DistributedCache.setCacheArchives(URIs,conf)其中URI的形式是 hdfs://host:port/absolute-path#link-name

緩存Jar:

DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能夠被用於 緩存文件和jar包,並把它們加入子jvm的classpath。

DistributedCache.createSymlink(Configuration)方法讓DistributedCache 在當前工作目錄下創建到緩存文件的符號鏈接。或者通過設置配置文件屬性mapred.create.symlink為yes。 分散式緩存會截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so,則在task當前工作目錄會有名為lib.so的鏈接,它會鏈接分散式緩存中的lib.so.1。

2.常見應用場景

(1)分發第三方庫(jar,so等);

(2)分發演算法需要的詞典文件;

(3)分發程序運行需要的配置;

(4)分發多表數據join時小表數據簡便處理等

3.注意事項

1.DistributedCache只能應用於分散式的情況,包括偽分散式,完全分散式.有些api在這2種情況下有移植性問題.

2.需要分發的文件,必須提前放到hdfs上.默認的路徑前綴是hdfs://的,不是file://

3.需要分發的文件,最好在運行期間是只讀的.

4.不建議分發較大的文件,比如壓縮文件,可能會影響task的啟動速度.

5.注意在Driver中創建Job實例時一定要把Configuration類型的參數傳遞進去,否則在Mapper或Reducer中調用DistributedCache.getLocalCacheFiles(conf);返回值就為null。因為空構造函數的Job採用的Configuration是從hadoop的配置文件中讀出來的(使用new Configuration()創建的Configuration就是從hadoop的配置文件中讀出來的),請注意在main()函數中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此時的Configuration中多了一個DistributedCacheFile,所以你需要把這個Configuration傳遞給Job構造函數,如果傳遞默認的Configuration,那在Job中當然不知道DistributedCacheFile的存在了。

4.基本流程

1.每個tasktracker啟動時,都會產生一個TrackerDistributedCacheManager對象,用來管理該tt機器上所有的task的cache文件.

2.在客戶端提交job時,在JobClient內,對即將cache的文件,進行校驗

以確定文件是否存在,文件的大小,文件的修改時間,以及文件的許可權是否是private or public.

3.當task在tt初始化job時,會由TrackerDistributedCacheManager產生一個TaskDistributedCacheManager對象,來管理本task的cache文件.

4.和本task相關聯的TaskDistributedCacheManager,獲取並解壓相關cache文件到本地相應目錄如果本tt機器上已經有了本job的其他task,並已經完成了相應cache文件的獲取和解壓工作,則不會重複進行。如果本地已經有了cache文件,則比較修改時間和hdfs上的文件是否一致,如果一致則可以使用.

5.當task結束時,會對該cache進行ref減一操作.

6.TrackerDistributedCacheManager有一個clearup線程,每隔1min會去處理那些無人使用的,目錄大小大於local.cache.size或者子目錄個數大於mapreduce.tasktracker.cache.local.numberdirectories的cache目錄.

5.應用實例

public class MapJoinByCache { public static class MapJoiner extends Mapper<LongWritable,Text,Text,Text> { static Map<String,String> movies=new HashMap<String,String>(); public void setup(Context context) { try { FileReader reader = new FileReader("movies.dat"); BufferedReader br = new BufferedReader(reader); String s1 = null; while ((s1 = br.readLine()) != null) { System.out.println(s1); String[] splits= s1.split("::"); String movieId=splits[0]; String movieName =splits[1]; movies.put(movieId, movieName); } br.close(); reader.close(); } catch (Exception e) { e.printStackTrace(); } } private Text outKey=new Text(); private Text outVal=new Text(); public void map(LongWritable key,Text value,Context context)throws IOException, InterruptedException { if(value!=null||value.toString()!=null) { String[] splits = value.toString().split("::"); String movieId =splits[1]; String movieName= movies.get(movieId); outKey.set(movieId); outVal.set(movieName+"::"+value.toString()); context.write(outKey, outVal); } } } public static class DirectReducer extends Reducer<Text,Text,NullWritable,Text> { NullWritable outKey=NullWritable.get(); public void reduce(Text key,Iterable<Text> values,Context context)throws IOException, InterruptedException { for(Text value :values) {context.write(outKey, value); } } } public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException, ClassNotFoundException { Configuration conf =new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); DistributedCache.createSymlink(conf); DistributedCache.addCacheFile(new URI("hdfs://mylinux:9000/data/exam/movie/movies.dat#movies.dat"), conf); Job job=new Job(conf); job.setJobName("Join on Map Side"); job.setJarByClass(MapJoinByCache.class); job.setMapperClass(MapJoiner.class); job.setReducerClass(DirectReducer.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }

更多文章請關注微信公眾號:bigdataer


推薦閱讀:

從頭學習大數據培訓課程 Hadoop 系列教程(七)yarn 原理以及日常使用
如何評價小米團隊擁有4個hbase committer?
MapReduce和Spark主要解決哪些方面的問題?
Azkaban入門篇
大數據那些事(31):母親Stratosphere

TAG:分散式系統 | Hadoop |