Writing Azkaban Task Flows

In Azkaban, a project contains one or more flows, and a flow contains multiple jobs. A job is a process you want to run in Azkaban; it can be a command or a Hadoop task. If you install the relevant plugins, you can also run plugin job types. A job can depend on another job, and the graph formed by multiple jobs and their dependencies is called a flow. This article describes how to write four common types of task flows in Azkaban: Command, Hive, Java, and Hadoop.

1. Writing Command Jobs

Here we simulate a workflow that covers the whole process of collecting data, uploading it, and finally loading it into the warehouse. The jobs involved are:

create_dir.job: creates the required directory

get_data1.job: fetches data source 1

get_data2.job: fetches data source 2

upload_to_hdfs.job: uploads the data to HDFS

insert_to_hive.job: loads the data from HDFS into Hive

  • create_dir.job

    type=command
    command=echo "create directory before get data"

  • get_data1.job

    type=command
    command=echo "get data from logserver"
    dependencies=create_dir

  • get_data2.job

    type=command
    command=echo "get data from ftp"
    dependencies=create_dir

  • upload_to_hdfs.job

    type=command
    command=echo "upload to hdfs"
    dependencies=get_data1,get_data2

The resulting directory looks like this:
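A minimal sketch of the expected layout (the folder name demo is an assumption; what matters is that the four .job files sit together at the top level of the zip):

    demo/
    ├── create_dir.job
    ├── get_data1.job
    ├── get_data2.job
    └── upload_to_hdfs.job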

Pack these files into demo.zip and upload it to Azkaban; in the web UI you can then see the dependency graph.
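In text form, the graph follows directly from the dependencies declared in the job files above:

    create_dir ─┬─> get_data1 ─┬─> upload_to_hdfs
                └─> get_data2 ─┘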

Click Execute to run the flow.

The Job List page shows the status of each job.

Click Details to view the execution log of each job.

Other job configuration options

  • A job can be defined to depend on another flow:

    type=flow
    flow.name=first_flow

  • A job can run multiple commands:

    type=command
    command=echo "hello"
    command.1=echo "world"
    command.2=echo "azkaban"

  • A job can be configured with a retry count and a retry interval on failure. For example, for the FTP log fetch above we could retry 12 times, once every 5 minutes:

    type=command
    command=echo "retry test"
    retries=12
    # interval in milliseconds
    retry.backoff=300000

2. Writing a Hive Job

Writing a Hive job is fairly simple. In a new directory, create a hive.job file with the following content:

# job type
type=hive
# user to run the Hive SQL as
user.to.proxy=azkaban
# fixed value
azk.hive.action=execute.query
hive.query.01=drop table words;
hive.query.02=create table words (freq int, word string) row format delimited fields terminated by '\t' stored as textfile;
hive.query.03=describe words;
hive.query.04=load data local inpath "res/input" into table words;
hive.query.05=select * from words limit 10;
hive.query.06=select freq, count(1) as f2 from words group by freq sort by f2 desc limit 10;

The fourth statement above references a data file, so create a res folder in the same directory and add an input file with the following content:

11	and
10	the
9	to
9	in
9	of
9	is
9	CLAUDIUS
8	KING
8	this
8	we
7	what
7	us
7	GUILDENSTERN
6	And
5	d
4	ROSENCRANTZ
3	a
2	his
1	QUEEN
1	he

Then pack everything into a zip file and upload it to Azkaban to run.
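A minimal sketch of the layout (the folder name hive-demo is an assumption):

    hive-demo/
    ├── hive.job
    └── res/
        └── input

Package it so that hive.job ends up at the root of the archive, for example by running zip -r hive-demo.zip hive.job res from inside the hive-demo folder.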

3. Writing a Java Job

A Java job is straightforward: the class only needs to provide a run method. If parameters are required, accept a Props object in the constructor and configure the parameters in the job file.

The Java class is as follows:

package com.dataeye.java;

import org.apache.log4j.Logger;

import azkaban.utils.Props;

public class JavaMain {

    private static final Logger logger = Logger.getLogger(JavaMain.class);

    private final int fileRows;
    private final int fileLine;

    public JavaMain(String name, Props props) throws Exception {
        this.fileRows = props.getInt("file.rows");
        this.fileLine = props.getInt("file.line");
    }

    public void run() throws Exception {
        logger.info(" ### this is JavaMain method ###");
        logger.info("fileRows value is ==> " + fileRows);
        logger.info("fileLine value is ==> " + fileLine);
    }

}

The java.job file is as follows:

type=java
# fully qualified class name
job.class=com.dataeye.java.JavaMain
# classpath of the jar to run
classpath=lib/*
# user parameter 1
file.rows=10
# user parameter 2
file.line=50

Create a new directory and copy java.job into it, then create a lib folder, package the Java class above into a jar file, and place the jar under lib. Finally, pack everything into a zip file and upload it to Azkaban (a rough packaging sketch is shown after the log). The log after a successful run looks like this:

31-08-2016 14:41:15 CST simple INFO - INFO Running job simple
31-08-2016 14:41:15 CST simple INFO - INFO Class name com.dataeye.java.JavaMain
31-08-2016 14:41:15 CST simple INFO - INFO Constructor found public com.dataeye.java.JavaMain(java.lang.String,azkaban.utils.Props) throws java.lang.Exception
31-08-2016 14:41:15 CST simple INFO - INFO Invoking method run
31-08-2016 14:41:15 CST simple INFO - INFO Proxy check failed, not proxying run.
31-08-2016 14:41:15 CST simple INFO - INFO ### this is JavaMain method ###
31-08-2016 14:41:15 CST simple INFO - INFO fileRows value is ==> 10
31-08-2016 14:41:15 CST simple INFO - INFO fileLine value is ==> 50
31-08-2016 14:41:15 CST simple INFO - INFO Apparently there isn't a method[getJobGeneratedProperties] on object[com.dataeye.java.JavaMain@591f989e], using empty Props object instead.
31-08-2016 14:41:15 CST simple INFO - INFO Outputting generated properties to /home/hadoop/azkaban/azkaban-solo-server-3.0.0/executions/339/simple_output_6034902760752438337_tmp
31-08-2016 14:41:15 CST simple INFO - Process completed successfully in 0 seconds.
31-08-2016 14:41:15 CST simple INFO - Finishing job simple attempt: 0 at 1472625675501 with status SUCCEEDED

The log shows the parameter values printed in the run method.
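For reference, a rough sketch of the packaging steps described above (the directory name java-demo, the jar and zip names, and the compiled-classes path target/classes are all assumptions):

    mkdir -p java-demo/lib
    cp java.job java-demo/
    # package the compiled JavaMain class into a jar under lib/
    jar cf java-demo/lib/java-demo.jar -C target/classes .
    # zip the job file and lib/ so they sit at the root of the archive
    cd java-demo && zip -r ../java-demo.zip .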

4. Writing a Hadoop Job

Hadoop jobs are a bit more involved than the three types above. The points to note are:

  • The job class must extend AbstractHadoopJob:

    public class WordCount extends AbstractHadoopJob

  • It must have a constructor that takes a String and a Props, and it must call super:

    public WordCount(String name, Props props) {
        super(name, props);
        // other code
    }

  • It must provide a run method and call super.run() at the end of it:

    public void run() throws Exception {
        // other code
        super.run();
    }

Below is an example of a WordCount job.

WordCount.java:

package com.dataeye.mr;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;

import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;

import com.dataeye.mr.maper.WordCountMap;
import com.dataeye.mr.reducer.WordCountReduce;

public class WordCount extends AbstractHadoopJob {

    private static final Logger logger = Logger.getLogger(WordCount.class);

    private final String inputPath;
    private final String outputPath;
    private boolean forceOutputOverrite;

    public WordCount(String name, Props props) {
        super(name, props);
        this.inputPath = props.getString("input.path");
        this.outputPath = props.getString("output.path");
        this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false);
    }

    public void run() throws Exception {

        logger.info(String.format("Hadoop job, class is %s", new Object[] { getClass().getSimpleName() }));

        JobConf jobconf = getJobConf();
        jobconf.setJarByClass(WordCount.class);

        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(IntWritable.class);

        jobconf.setMapperClass(WordCountMap.class);
        jobconf.setReducerClass(WordCountReduce.class);

        jobconf.setInputFormat(TextInputFormat.class);
        jobconf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
        FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));

        if (this.forceOutputOverrite) {
            FileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
            fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
        }

        super.run();
    }

}

WordCountMap.java:

package com.dataeye.mr.maper;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordCountMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private long numRecords = 0L;

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            this.word.set(tokenizer.nextToken());
            output.collect(this.word, one);
            reporter.incrCounter(Counters.INPUT_WORDS, 1L);
        }

        if (++this.numRecords % 100L == 0L)
            reporter.setStatus("Finished processing " + this.numRecords + " records " + "from the input file");
    }

    static enum Counters {
        INPUT_WORDS;
    }
}

WordCountReduce.java:

package com.dataeye.mr.reducer;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += ((IntWritable) values.next()).get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

Below is the wc.job configuration file:

type=hadoopJava

job.class=com.dataeye.mr.WordCount

classpath=lib/*

force.output.overwrite=true

input.path=/tmp/azkaban/wordcountjavain

output.path=/tmp/azkaban/wordcountjavaout

Note that the input path /tmp/azkaban/wordcountjavain must already exist in HDFS before the job runs.
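For example, the input directory could be prepared with standard HDFS commands (the local file name words.txt is an assumption):

    # create the input directory and upload a sample text file to it
    hdfs dfs -mkdir -p /tmp/azkaban/wordcountjavain
    hdfs dfs -put words.txt /tmp/azkaban/wordcountjavain/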

Create a new directory and copy wc.job into it, then create a lib directory and package the Java code above into a jar file placed under lib. Finally, compress everything into a zip file and upload it to Azkaban to run, just as in the Java example above.

This article covered how to write the four common types of Azkaban jobs. For other job types, see the official Azkaban site: Azkaban 3.0 Documentation.
