Azkaban二次開發3-Hadoop任務提交方式改造

一、需求說明

原生的Azkaban系統提交HadoopMR任務的時候,是通過在run方法中編寫以下代碼進行任務的提交

public void run() throws Exception {nttnttlogger.info(String.format("Starting %s by yannhuang", new Object[] { getClass().getSimpleName() }));nnttJobConf jobconf = getJobConf();nttjobconf.setJarByClass(WordCount.class);nnttjobconf.setOutputKeyClass(Text.class);nttjobconf.setOutputValueClass(IntWritable.class);nnttjobconf.setMapperClass(WordCountMap.class);nttjobconf.setReducerClass(WordCountReduce.class);nnttjobconf.setInputFormat(TextInputFormat.class);nttjobconf.setOutputFormat(TextOutputFormat.class);nnttFileInputFormat.addInputPath(jobconf, new Path(this.inputPath));nttFileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));nnttif (this.forceOutputOverrite) {ntttFileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);ntttfs.delete(FileOutputFormat.getOutputPath(jobconf), true);ntt}nttnttsuper.run();nt}n

能不能簡化這種重複的工作呢?根據《Azkaban源碼分析2》中對azkaban-plugins模塊的代碼分析,可以把這塊代碼遷移到plugins代碼中,以配置的方式運行HadoopJob任務。

二、開發目標

運行一個azkaban節點,只需要在該節點的job配置文件中配置以下信息

mapred.output.key.class=org.apache.hadoop.io.Textnmapred.output.value.class=org.apache.hadoop.io.IntWritablenmapred.mapper.class=com.dataeye.mr.maper.WordCountMapnmapred.reducer.class=com.dataeye.mr.reducer.WordCountReducenmapred.input.format.class=org.apache.hadoop.mapred.TextInputFormatnmapred.output.format.class=org.apache.hadoop.mapred.TextOutputFormatn

這樣MR運行時需要的參數都是以配置的方式指定,不再需要編寫自己的類繼承AbstractHadoopJob抽象類了。

三、開發步驟

1、編寫GeneralHadoopJob類,內容其實就是原生azkaban要求的主類,繼承AbstractHadoopJob並提供run方法

package azkaban.jobtype.extend;nnimport azkaban.jobtype.javautils.AbstractHadoopJob;nimport azkaban.utils.Props;nimport org.apache.hadoop.fs.FileSystem;nimport org.apache.hadoop.fs.Path;nimport org.apache.hadoop.mapred.*;nimport org.apache.log4j.Logger;nnimport java.util.HashSet;nimport java.util.Map;nimport java.util.Set;nn/**n * 通用的hadoop job,通過用戶提供的參數配置來構造具體的map-reduce任務,n * 並進行提交和狀態監控n * <p>n * Created by shelocks on 16/6/21.n */npublic class GeneralHadoopJob extends AbstractHadoopJob {n private static final Logger logger = Logger.getLogger(GeneralHadoopJob.class);nn private static Set<String> MARKED_KEY = new HashSet<String>();nn static {n MARKED_KEY.add("job.class");n MARKED_KEY.add("mapred.output.key.class");n MARKED_KEY.add("mapred.output.value.class");n MARKED_KEY.add("mapred.mapper.class");n MARKED_KEY.add("mapred.reducer.class");n MARKED_KEY.add("mapred.input.format.class");n MARKED_KEY.add("mapred.output.format.class");n MARKED_KEY.add("input.path");n MARKED_KEY.add("output.path");n MARKED_KEY.add("force.output.overwrite");n }nn public GeneralHadoopJob(String name, Props props) {n super(name, props);n }nn public void run() throws Exception {n Props props = this.getProps();n //set up confn JobConf jobConf = getJobConf();nn jobConf.setJarByClass(props.getClass("job.class"));nn jobConf.setOutputKeyClass(props.getClass("mapred.output.key.class"));nn jobConf.setOutputValueClass(props.getClass("mapred.output.value.class"));nn jobConf.setMapperClass((Class<? extends Mapper>) props.getClass("mapred.mapper.class"));n logger.info("Mapper class:" + props.getClass("mapred.mapper.class"));n jobConf.setReducerClass((Class<? extends Reducer>) props.getClass("mapred.reducer.class"));n logger.info("Reducer class:" + props.getClass("mapred.reducer.class"));nn jobConf.setInputFormat((Class<? extends InputFormat>) props.getClass("mapred.input.format.class"));n jobConf.setOutputFormat((Class<? extends OutputFormat>) props.getClass("mapred.output.format.class"));nn FileInputFormat.addInputPath(jobConf, new Path(props.getString("input.path")));n logger.info("Input path:" + props.getString("input.path"));n FileOutputFormat.setOutputPath(jobConf, new Path(props.get("output.path")));n logger.info("Output path:" + props.getString("output.path"));nn if (props.getBoolean("force.output.overwrite")) {n FileSystem fs =n FileOutputFormat.getOutputPath(jobConf).getFileSystem(jobConf);n fs.delete(FileOutputFormat.getOutputPath(jobConf), true);n }nn //set other map-reduce job config parametersn for (Map.Entry<Object, Object> entry : props.toProperties().entrySet()) {n if (!MARKED_KEY.contains(entry.getKey())) {n jobConf.set((String) entry.getKey(), (String) entry.getValue());n }n }nn super.run();n }n}n

以上代碼把MR任務的conf參數以配置的方式獨立出來了

2、在MR節點的配置文件中加入自定義配置項

job.extend=true

3、修改HadoopJavaJobRunnerMain類的getJobClass方法,加入配置項判斷

private String getJobClass(Properties props) {n if(props.getProperty("job.extend","true").equals("true")){n return GeneralHadoopJob.class.getCanonicalName();n }n return props.getProperty(JOB_CLASS);n }n

這樣用戶只需要在job文件里配置job.extend=true,就不需要自己編寫MR主類了。如果用戶不需要這個功能,只需要配置job.extend=false即可 。

推薦閱讀:

SparkSQL中的Sort實現(二)
5分鐘 Hadoop Shuffle 優化
嫌棄Hadoop?可能是你的打開方式有問題
大數據那些事(8):HIVE之初期起
大數據那些事(28):卡夫卡們的故事

TAG:任务系统 | Hadoop |