
An Introduction to Apache Flume

Flume is a log collection system open-sourced by Cloudera. Earlier versions depended on ZooKeeper; the current Flume NG has dropped that dependency. I never used the older releases, and losing the global view of the whole log collection system does seem a pity, but Flume NG is easy to pick up and run, and paired with a monitoring system it works well enough, so it cuts both ways :-)

The figure below shows a common Flume deployment: servers send events to a local Flume agent, or the local Flume agent runs tail -f on the log files; the logs are forwarded to several downstream Flume agents in the same data center, and those downstream agents write the data to HDFS while keeping a short-lived copy on local disk for debugging.

Flume's configuration file is quite readable, and the official documentation describes it in detail. Structurally it has two parts: declare which sources, channels and sinks a Flume agent runs, then configure the properties of each source, channel and sink and how they are wired together (a minimal sketch follows the list below).

  • source: where the logs come from. A source can invoke an external command such as tail -f, or listen on a port and accept logs in Avro, Thrift or plain-text-line format. Sources and channels are many-to-many; a source writes into its channels either replicating (the default) or multiplexing. In the figure above, for instance, the source on the log collector replicates each event into two channels.

  • channel: personally I think "queue" would be a better name, to avoid confusion with what a sink does. A channel buffers and distributes events. Roughly speaking a channel feeds multiple sinks, and events move from a channel to its sinks in one of three ways: default, failover or load_balance. The documentation calls this piece both "sink processor" and "sink group"; I find "sink group" easier to reason about: a channel really delivers events to a sink group, so channel and sink group are one-to-one (which is probably why a sink group is also called a sink processor, i.e. the process of moving events from the channel to the sinks), and a sink group holds one or more sinks. With default, a sink group may contain only a single sink; with failover, events always go to the highest-priority sink first and fall back to the next-highest when it fails; load_balance is self-explanatory: events are written to the sinks in turn.
  • sink: writes the logs out. Note that one sink writes to exactly one destination, such as a local file or the network port of a single remote host; the sink itself does no load balancing, which is why in the figure the log emitter needs a separate sink for each log collector. Flume ships with plenty of sinks out of the box: local file system, HDFS, HBase, Solr, Elasticsearch, Avro, Thrift and more. Remarkably, the HDFS and HBase sinks both support Kerberos authentication; as you would expect from something built by Cloudera, the Hadoop integration is excellent.
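
To make the structure concrete, here is a minimal sketch in the same properties format, modelled on the emitter configuration generated by the script further down (the agent name a1 and the component names r1/c1/s1/s2/g1 are made up for illustration): the declarations come first, then the per-component properties and the wiring, with both avro sinks draining the same channel through a load_balance sink group.

a1.sources = r1
a1.channels = c1
a1.sinks = s1 s2
a1.sinkgroups = g1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 3002
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

a1.sinks.s1.type = avro
a1.sinks.s1.hostname = collector1
a1.sinks.s1.port = 3000
a1.sinks.s1.channel = c1

a1.sinks.s2.type = avro
a1.sinks.s2.hostname = collector2
a1.sinks.s2.port = 3000
a1.sinks.s2.channel = c1

a1.sinkgroups.g1.sinks = s1 s2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin

Each source lists the channels it feeds (plural), while each sink names exactly one channel, mirroring the source-to-channel many-to-many and channel-to-sink one-to-many relationships described above.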

A single Flume agent process can contain multiple sources, channels and sinks, and the flows they form need not be related: one source-channel-sink chain can collect access.log while another collects error.log, with no data exchanged between the two. Several Flume agent processes can also run on the same machine. Note that within one agent process the events held in memory channels are shared, but Flume ignores that sharing when estimating memory consumption.
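
As a sketch of two unrelated flows inside one agent (all names here are hypothetical), nothing connects the access flow to the error flow except that they share the process:

a1.sources = access error
a1.channels = c_access c_error
a1.sinks = s_access s_error

a1.sources.access.type = exec
a1.sources.access.command = tail -F /tmp/access.log
a1.sources.access.channels = c_access
a1.sources.error.type = exec
a1.sources.error.command = tail -F /tmp/error.log
a1.sources.error.channels = c_error

a1.channels.c_access.type = memory
a1.channels.c_error.type = memory

a1.sinks.s_access.type = file_roll
a1.sinks.s_access.sink.directory = /tmp/out/access
a1.sinks.s_access.channel = c_access
a1.sinks.s_error.type = file_roll
a1.sinks.s_error.sink.directory = /tmp/out/error
a1.sinks.s_error.channel = c_error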

A Flume agent process checks its configuration file every thirty seconds and reloads it when it changes, so even without ZooKeeper providing centralized configuration management, getting by with Puppet/Chef plus Nagios/Ganglia and the like is not much of a problem.

Flume does not support a category directly the way Scribe does; instead you attach headers to each event, and a multiplexing channel selector can map events to different channels based on a header value, so the logs can be split apart at the far end of the flow. If you use the hdfs sink, the HDFS file path can interpolate event header values, so you can get per-category splitting without a multiplexing channel selector at all (sketches of both options follow).
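
Hedged sketches of both options (agent, channel and sink names here are hypothetical; the category header matches the static interceptor configured in the script below):

# option 1: a multiplexing channel selector keyed on the category header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = category
a1.sources.r1.selector.mapping.access = c_access
a1.sources.r1.selector.mapping.error = c_error
a1.sources.r1.selector.default = c_other

# option 2: let the hdfs sink split by category via header interpolation in the path
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://namenode1:8020/user/gg/data/%{category}/%Y%m%d/%H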

Flume's design is flexible and simple. In my small-scale tests it was stable, but performance was underwhelming (my testing may not have been rigorous), especially with the file channel. Flume buffers events inside the JVM, a design that is neither as clever nor as efficient as Kafka's. Another worry is that, unlike Kafka, replication is not a core part of the design: you have to configure it explicitly at every stage of the event flow, for example adding a memory channel and an avro sink on each log collector to forward to another log collector, and without ZooKeeper's help that has little practical value. If the project schedule allows, I think building sinks on top of Kafka is a more efficient, convenient and reliable way to collect logs, much like LinkedIn's data pipeline architecture.

Below is a Perl script that generates the Flume configuration. The reason for not writing the configuration by hand is that many of the source, channel and sink sections are nearly identical, and maintaining them manually gets tedious. The tail line in the script is rather long and gets truncated in the rendered page; it should read

tail -F -n 0 --pid `ps -o ppid= $$` $log_file | sed -e "s/^/host=`hostname --fqdn` category=$category:/"
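
A note on that command: --pid `ps -o ppid= $$` tells tail to exit once the process whose PID is the parent of the wrapping shell, i.e. the Flume agent itself, goes away (the exec source runs the command via /bin/sh -c), and the sed prefix stamps every line with the sending host and its category. Inside the Perl heredoc below, the $$ is written as \$\$ so that the literal characters end up in the generated file rather than the Perl process's own PID.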

Honestly, why Scribe, Flume and their ilk don't just ship tail -f functionality out of the box is beyond me...

#!/usr/bin/perl
#
# Emitter:
# Server -> access.log -> tail -F -> Flume exec source(access) ->
# Flume file channel(c1) -> Flume Avro Sinks with load balancing sink processor(g1: s1 s2 s3)
#
# Collector:
# Flume Avro source with replicating channel selector(source1) ->
# Flume memory channel(file1) -> Flume file roll sink(file1)
# Flume memory channel(hdfs1) -> Flume HDFS sink(hdfs1)
# Flume memory channel(hdfs2) -> Flume HDFS sink(hdfs2), in another data center
# Flume memory channel(hbase1) -> Flume HBase sink(hbase1), the standard HBase sink uses
# hbase-site.xml to get the server address, so we can't use two HBase sinks
# without starting another Flume agent process.

use strict;
use warnings;
use Getopt::Long;

my %g_log_files = (
"access" => [ qw( /tmp/access.log ) ],
);
my @g_collector_hosts = qw( collector1 collector2 collector3 );
my $g_emitter_avro_port = 3000;
my $g_emitter_thrift_port = 3001;
my $g_emitter_nc_port = 3002;
my $g_collector_avro_port = 3000;
my $g_flume_work_dir = "/tmp/flume";
my $g_data_dir = "/tmp/log-data";
my %g_hdfs_paths = (
"hdfs1" => "hdfs://namenode1:8020/user/gg/data",
"hdfs2" => "hdfs://namenode2:8020/user/gg/data",
);
my $g_emitter_conf = "emitter.properties";
my $g_collector_conf = "collector.properties";
my $g_overwrite_conf = 0;

GetOptions("force!" => \$g_overwrite_conf);

generate_emitter_config();
generate_collector_config();

exit(0);

#######################################
sub generate_emitter_config {
my $conf = "";
my $sources = join(" ", sort(keys %g_log_files));
my $sinks = join(" ", map { "s$_" } (1 .. @g_collector_hosts));

$conf .= <<EOF;
emitter.sources = $sources avro1 thrift1 nc1
emitter.channels = c1
emitter.sinks = $sinks
emitter.sinkgroups = g1

EOF

    for my $category ( sort keys %g_log_files ) {
        my $log_files = $g_log_files{$category};

        for my $log_file ( @$log_files ) {
            $conf .= <<EOF;
emitter.sources.$category.channels = c1
emitter.sources.$category.type = exec
emitter.sources.$category.command = tail -F -n 0 --pid `ps -o ppid= \$\$` $log_file | sed -e "s/^/host=`hostname --fqdn` category=$category:/"
emitter.sources.$category.shell = /bin/sh -c
emitter.sources.$category.restartThrottle = 5000
emitter.sources.$category.restart = true
emitter.sources.$category.logStdErr = true
emitter.sources.$category.interceptors = i1 i2 i3
emitter.sources.$category.interceptors.i1.type = timestamp
emitter.sources.$category.interceptors.i2.type = host
emitter.sources.$category.interceptors.i2.useIP = false
emitter.sources.$category.interceptors.i3.type = static
emitter.sources.$category.interceptors.i3.key = category
emitter.sources.$category.interceptors.i3.value = $category

EOF
        }
    }

    $conf .= <<EOF;
emitter.sources.avro1.channels = c1
emitter.sources.avro1.type = avro
emitter.sources.avro1.bind = localhost
emitter.sources.avro1.port = $g_emitter_avro_port
emitter.sources.avro1.interceptors = i1 i2 i3
emitter.sources.avro1.interceptors.i1.type = timestamp
emitter.sources.avro1.interceptors.i2.type = host
emitter.sources.avro1.interceptors.i2.useIP = false
emitter.sources.avro1.interceptors.i3.type = static
emitter.sources.avro1.interceptors.i3.key = category
emitter.sources.avro1.interceptors.i3.value = default

emitter.sources.thrift1.channels = c1
emitter.sources.thrift1.type = thrift
emitter.sources.thrift1.bind = localhost
emitter.sources.thrift1.port = $g_emitter_thrift_port
emitter.sources.thrift1.interceptors = i1 i2 i3
emitter.sources.thrift1.interceptors.i1.type = timestamp
emitter.sources.thrift1.interceptors.i2.type = host
emitter.sources.thrift1.interceptors.i2.useIP = false
emitter.sources.thrift1.interceptors.i3.type = static
emitter.sources.thrift1.interceptors.i3.key = category
emitter.sources.thrift1.interceptors.i3.value = default

emitter.sources.nc1.channels = c1
emitter.sources.nc1.type = netcat
emitter.sources.nc1.bind = localhost
emitter.sources.nc1.port = $g_emitter_nc_port
emitter.sources.nc1.max-line-length = 20480
emitter.sources.nc1.interceptors = i1 i2 i3
emitter.sources.nc1.interceptors.i1.type = timestamp
emitter.sources.nc1.interceptors.i2.type = host
emitter.sources.nc1.interceptors.i2.useIP = false
emitter.sources.nc1.interceptors.i3.type = static
emitter.sources.nc1.interceptors.i3.key = category
emitter.sources.nc1.interceptors.i3.value = default

emitter.channels.c1.type = file
emitter.channels.c1.checkpointDir = $g_flume_work_dir/emitter-c1/checkpoint
#emitter.channels.c1.useDualCheckpoints = true
#emitter.channels.c1.backupCheckpointDir = $g_flume_work_dir/emitter-c1/checkpointBackup
emitter.channels.c1.dataDirs = $g_flume_work_dir/emitter-c1/data

EOF

    my $i = 0;
    my $port = $g_collector_avro_port;
    my $onebox = is_one_box();
    for my $host ( sort @g_collector_hosts ) {
        ++$i;
        $port += 1000 if $onebox;

        $conf .= <<EOF;
emitter.sinks.s$i.channel = c1
emitter.sinks.s$i.type = avro
emitter.sinks.s$i.hostname = $host
emitter.sinks.s$i.port = $port
emitter.sinks.s$i.batch-size = 100
#emitter.sinks.s$i.reset-connection-interval = 600
emitter.sinks.s$i.compression-type = deflate

EOF
    }

    $conf .= <<EOF;

emitter.sinkgroups.g1.sinks = $sinks
emitter.sinkgroups.g1.processor.type = load_balance
emitter.sinkgroups.g1.processor.backoff = true
emitter.sinkgroups.g1.processor.selector = round_robin

EOF

    $conf =~ s/^ +//mg;

    die "$g_emitter_conf already exists!\n" if ! $g_overwrite_conf && -e $g_emitter_conf;
    open my $fh, ">", $g_emitter_conf or die "Can't write $g_emitter_conf: $!\n";
    print $fh $conf;
    close $fh;
}

sub generate_collector_config {
my $conf = "";
my @sinks = qw(file1 hdfs1 hdfs2 hbase1);
my $sinks = join(" ", @sinks);

my $port = $g_collector_avro_port;
my $onebox = is_one_box();
$port += 1000 if $onebox;

$conf .= <<EOF;
collector.sources = source1
collector.channels = $sinks
collector.sinks = $sinks

collector.sources.source1.channels = $sinks
collector.sources.source1.type = avro
collector.sources.source1.bind = 0.0.0.0
collector.sources.source1.port = $port
collector.sources.source1.compression-type = deflate
collector.sources.source1.interceptors = i1 i2 i3 i4

collector.sources.source1.interceptors.i1.type = timestamp
collector.sources.source1.interceptors.i1.preserveExisting = true

collector.sources.source1.interceptors.i2.type = host
collector.sources.source1.interceptors.i2.preserveExisting = true
collector.sources.source1.interceptors.i2.useIP = false

collector.sources.source1.interceptors.i3.type = static
collector.sources.source1.interceptors.i3.preserveExisting = true
collector.sources.source1.interceptors.i3.key = category
collector.sources.source1.interceptors.i3.value = default

collector.sources.source1.interceptors.i4.type = host
collector.sources.source1.interceptors.i4.preserveExisting = false
collector.sources.source1.interceptors.i4.useIP = false
collector.sources.source1.interceptors.i4.hostHeader = collector

EOF

    for my $sink (@sinks) {
        $conf .= <<EOF;
collector.channels.$sink.type = memory
collector.channels.$sink.capacity = 10000
collector.channels.$sink.transactionCapacity = 100
collector.channels.$sink.byteCapacityBufferPercentage = 20
collector.channels.$sink.byteCapacity = 0

EOF
    }

    $conf .= <<EOF;

collector.sinks.file1.channel = file1
collector.sinks.file1.type = file_roll
collector.sinks.file1.sink.directory = $g_data_dir/collector-$port-file1
collector.sinks.file1.sink.rollInterval = 3600
collector.sinks.file1.batchSize = 100
collector.sinks.file1.sink.serializer = text
collector.sinks.file1.sink.serializer.appendNewline = true
#collector.sinks.file1.sink.serializer = avro_event
#collector.sinks.file1.sink.serializer.syncIntervalBytes = 2048000
#collector.sinks.file1.sink.serializer.compressionCodec = snappy

collector.sinks.hdfs1.channel = hdfs1
collector.sinks.hdfs1.type = hdfs
collector.sinks.hdfs1.hdfs.path = $g_hdfs_paths{hdfs1}/%{category}/%Y%m%d/%H
collector.sinks.hdfs1.hdfs.filePrefix = %{collector}-$port
collector.sinks.hdfs1.hdfs.rollInterval = 600
collector.sinks.hdfs1.hdfs.rollSize = 0
collector.sinks.hdfs1.hdfs.rollCount = 0
collector.sinks.hdfs1.hdfs.idleTimeout = 0
collector.sinks.hdfs1.hdfs.batchSize = 100
collector.sinks.hdfs1.hdfs.codeC = snappy
collector.sinks.hdfs1.hdfs.fileType = SequenceFile
#collector.sinks.hdfs1.serializer = text
#collector.sinks.hdfs1.serializer.appendNewline = true
collector.sinks.hdfs1.serializer = avro_event
collector.sinks.hdfs1.serializer.syncIntervalBytes = 2048000
collector.sinks.hdfs1.serializer.compressionCodec = null
#collector.sinks.hdfs2.serializer.compressionCodec = snappy

collector.sinks.hdfs2.channel = hdfs2
collector.sinks.hdfs2.type = hdfs
collector.sinks.hdfs2.hdfs.path = $g_hdfs_paths{hdfs2}/%{category}/%Y%m%d/%H
collector.sinks.hdfs2.hdfs.filePrefix = %{collector}-$port
collector.sinks.hdfs2.hdfs.rollInterval = 600
collector.sinks.hdfs2.hdfs.rollSize = 0
collector.sinks.hdfs2.hdfs.rollCount = 0
collector.sinks.hdfs2.hdfs.idleTimeout = 0
collector.sinks.hdfs2.hdfs.batchSize = 100
collector.sinks.hdfs2.hdfs.codeC = snappy
collector.sinks.hdfs2.hdfs.fileType = SequenceFile
#collector.sinks.hdfs2.serializer = text
#collector.sinks.hdfs2.serializer.appendNewline = true
collector.sinks.hdfs2.serializer = avro_event
collector.sinks.hdfs2.serializer.syncIntervalBytes = 2048000
collector.sinks.hdfs2.serializer.compressionCodec = null
#collector.sinks.hdfs2.serializer.compressionCodec = snappy

collector.sinks.hbase1.channel = hbase1
collector.sinks.hbase1.type = hbase
collector.sinks.hbase1.table = log
collector.sinks.hbase1.columnFamily = log

EOF

    $conf =~ s/^ +//mg;

    die "$g_collector_conf already exists!\n" if ! $g_overwrite_conf && -e $g_collector_conf;
    open my $fh, ">", $g_collector_conf or die "Can't write $g_collector_conf: $!\n";
    print $fh $conf;
    close $fh;
}

sub is_one_box {
    # If the collector host list contains duplicate names, assume everything
    # runs on one box and offset ports so the agents do not collide.
    my %h = map { $_ => 1 } @g_collector_hosts;
    return keys %h < @g_collector_hosts;
}
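
Running the script produces emitter.properties and collector.properties in the current directory (pass --force to overwrite existing files). Each agent is then started with the stock flume-ng launcher, something along the lines of flume-ng agent -n emitter -c <conf-dir> -f emitter.properties on the emitters and flume-ng agent -n collector -c <conf-dir> -f collector.properties on the collectors; the name given to -n has to match the prefix (emitter or collector) used inside the generated file.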
