分布式日志收集框架Flume

面对以上的问题,我们如何将这些日志移动到hdfs集群上尼????

第一种方案:使用shell脚本cp 文件,然后通过hdfs fs -put 源文件  hdfs目录。

此方案可行,但是优缺点:如下

1)缺乏监控机制、如果某台日志服务器宕机了怎么办?

2)如果使用脚本,肯定要对脚本做定时触发的机制,每隔一分钟或者两分钟,这个时效性就会打折扣

3)文件都是存放在服务器磁盘上的,如果要移动日志文件,肯定会产生较大的磁盘IO,必须要压缩传输

4)业务服务器一般都是多节点部署的,肯定会产生多份日志文件,如何把这些日志聚合在一起?

因此这时候就出现了一个较为可靠的解决方案Flume框架


Flume概述

官网地址:http://flume.apache.org/

介绍:他就是把我们的日志,从a地方搬运到b地方的服务框架


Flume核心组件Source、Channel、Sink

Source:收集

详细介绍:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources

Channel:相当于通道,是数据临时存放的地方,聚合、缓冲区

详细介绍:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources

Sink:输出

详细介绍:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources

Flume三种流行的使用方案

第一种:

第二种:(最常用的)


第三种:


Flume部署

前置条件:

1)Java Runtime Environment - Java 1.8 or later

2)Memory - Sufficient memory for configurations used by sources, channels or sinks

3)Disk Space - Sufficient disk space for configurations used by channels or sinks

4)Directory Permissions - Read/Write permissions for directories used by agent

第一步:

下载jdk

安装jdk

配置环境变量

第二步:

下载flume,地址为http://archive.cloudera.com/cdh5/cdh/5/

weget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.7.0.tar.gz

解压:tar -zvxf  flume-ng-1.6.0-cdh5.7.0.tar.gz  -C ~/usr/目录下


解压后flume的目录结构

配置环境变量:

1、vim /etc/profile

export FLUME_HOME=/usr/apache-flume-1.6.0-cdh5.7.0-bin

export PATH=$PATH:$FLUME_HOME/bin

2、//使环境变量配置生效

source /etc/profile

3、配置flume根目录中conf目录中文件flume-env.sh的配置参数,复制flume-env.sh.template成flume-env.sh在flume-env.sh中设置flume依赖的jdk。


4、检测flume是否安装成功,切换到flume的bin目录/usr/apache-flume-1.6.0-cdh5.7.0-bin/bin 


执行flume-ng version命令查看flume的版本,如果出现下图,则表示安装成功


如何配置Agent?

# example.conf: A single-node Flume configuration

###自己的理解:使用Flume的关键就是写配置文件

A)配置Source

B)配置Channel

C)配置Sink

D)把以上三个组件串联起来

a1:agent的名称

r1:source的名称

k1:sink的名称

c1:channel的名称

# Name the components on this agent

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# Describe/configure the  source

a1.sources.r1.type=netcat    ##固定的类型

a1.sources.r1.bind=localhost ##绑定的IP地址

a1.sources.r1.port=44444  ##需要监听的服务端口,如果这个端口的服务有内容输出,就会被监控到,并把数据通过flume的事件发送过来。

解析如下:

官方网站对四个属性的解释

# Describe the sink

a1.sinks.k1.type=logger  ##指定数据输出形式


# Use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

# Bind the source and sink to the channel

###把source-channel-sink建立联系

a1.sources.r1.channels=c1  ##一个agent的source可以指定多个channel,因此这里时channels

a1.sinks.k1.channel=c1 ##而一个agent的sink只能对应一个channel。

把这个配置内容保存在一个文件里面,存放在flume的根目录conf文件夹下,我自己喜欢命名:

flume-setting.conf

启动flume

bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume-setting.conf --name a1 -Dflume.root.logger=INFO,console

测试是否能正常监听端口

使用telnet ip 监控的端口


上图有Event内容,这里面有很多含义,咱们拿一个样例来说下。

Event:{headers:{} body: 68 65 6C 6C 6F 0D       hello.}为例子。

Event:是Flume数据传输的基本单元

Event:是可选的headers+ byte array

至此,从指定的网络端口采集日志数据输出到控制台已经完成了。


第二个需求

要实现这个需求,我们就需要再重flume中做适合该需求的Agent的选项

在官网查找下来,定下了这个选型:exec Source  +  memory Channel  +  logger Sink


我们把上面的需求的配置文件改动下:

# Name the components on this agent

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# Describe/configure the  source

a1.sources.r1.type=exec    ##shell命令类型的

a1.sources.r1.command=tail -F /usr/test.log ##绑定监控的日志文件

a1.sources.r1.shell=/bin/bash -c    ##固定填写

# Describe the sink

a1.sinks.k1.type=logger  ##指定数据输出形式

# Use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

# Bind the source and sink to the channel

###把source-channel-sink建立联系

a1.sources.r1.channels=c1  ##一个agent的source可以指定多个channel,因此这里时channels

a1.sinks.k1.channel=c1 ##而一个agent的sink只能对应一个channel。

把这个配置内容保存在一个文件里面,存放在flume的根目录conf文件夹下,我自己喜欢命名:

exec-memory-logger.conf

然后启动

bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-logger.conf --name a1 -Dflume.root.logger=INFO,console

这时候我们往/usr/test.log中添加点内容,看看是否能否被监控到

echo 你好吗 >> test.log

echo yes >> test.log

echo 我非常好 >> test.log


我们把监控到的数据打印到控制台,是没有实际意义的,我们如果想把监控到的数据写入到我们的hdfs上去,改怎么处理?

继续查找flume的官方开发文档:

既然是写入到hdfs,我们就必须去设置agent的sink配置,来我们来找一下。


sink的配置样例

# Name the components on this agent

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# Describe/configure the  source

a1.sources.r1.type=exec    ##shell命令类型的

a1.sources.r1.command=tail -F /usr/test.log ##绑定监控的日志文件

a1.sources.r1.shell=/bin/bash -c    ##固定填写

# Describe the sink

a1.sinks.k1.type=logger  ##指定数据输出形式

# Use a channel which buffers events in memory

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

# Bind the source and sink to the channel

###把source-channel-sink建立联系

a1.sources.r1.channels=c1  ##一个agent的source可以指定多个channel,因此这里时channels

a1.sinks.k1.channel=c1       ##而一个agent的sink只能对应一个channel。

a1.sinks.k1.hdfs.path = hdfs://flume/xcx/%y-%m-%d/%H%M/%S ###定义再hdfs的存储目录格式

a1.sinks.k1.hdfs.filePrefix = xcx-                                          ###存储文件前缀

a1.sinks.k1.hdfs.fileSuffix =.lzo                                            ###存储文件后缀

a1.sinks.k1.hdfs.codeC=lzo                                               ###压缩格式可选值gzip, bzip2, lzo, lzop, snappy

a1.sinks.k1.hdfs.writeFormat=Text                                        ####存储文件类型Text or Writable

a1.sinks.k1.hdfs.round = true                                                   ###是否根据时间间隔滚动产生文件


a1.sinks.k1.hdfs.roundValue = 10                                            ###10

a1.sinks.k1.hdfs.roundUnit = minute                                       ###单位  可选值 second, minute or hour.

把这个配置内容保存在一个文件里面,存放在flume的根目录conf文件夹下,我自己喜欢命名:

exec-memory-hdfs.conf

这里我有个疑问,flume是如何知道我的hdfs服务器位置的尼?拿到只用一个 a1.sinks.k1.hdfs.path = hdfs://flume/xcx/%y-%m-%d/%H%M/%S ###定义再hdfs的存储目录格式就可以做到了吗?如果flume不跟hdfs部署在同一台服务器,这种方式还有效吗?

咱们就引出来了第三个需求:



我们的技术选型:这里的avro sink就是用来跨机器的传输格式

第一组:exec source + memory channel + avro sink

第二组:avro source + memory channel + logger sink

有几组就要编写几个agent配置文件

第一组agent文件名:exec-memory-avro.conf

######begin######

# Name the components on this agent

exec-memory-avro.sources=exec-source

exec-memory-avro.sinks=avro-sink

exec-memory-avro.channels=memroy-channel


# Describe/configure the  source

exec-memory-avro.sources.exec-source.type=exec    ##shell命令类型的

exec-memory-avro.sources.exec-source.command=tail -F /usr/test.log ##绑定监控的日志文件

exec-memory-avro.sources.exec-source.shell=/bin/bash -c    ##固定填写


exec-memory-avro.sinks.avro-sink.type=avro ##指定数据输出形式

exec-memory-avro.sinks.avro-sink.hostname=192.168.32.129 ####数据需要写入的目标服务器ip

exec-memory-avro.sinks.avro-sink.port=44444   ####所在端口

exec-memory-avro.channels.memroy-channel.type=memory

exec-memory-avro.channels.memroy-channel.capacity=1000

exec-memory-avro.channels.memroy-channel.transactionCapacity=100

###把source-channel-sink建立联系

exec-memory-avro.sources.exec-source.channels=memroy-channel ##一个agent的source可以指定多个channel,因此这里时channels

exec-memory-avro.sinks.avro-sink.channel=memroy-channel ##而一个agent的sink只能对应一个channel。

######end######

第二组agent文件名:avro-memory-logger.conf

######begin######

# Name the components on this agent

avro-memory-logger.sources=avro-source

avro-memory-logger.sinks=logger-sink

avro-memory-logger.channels=memroy-channel

# Describe/configure the  source

avro-memory-logger.sources.avro-source.type=avro 

avro-memory-logger.sources.avro-source.bind=192.168.32.72

avro-memory-logger.sources.avro-source.port=44444


avro-memory-logger.sinks.logger-sink.type=logger ##指定数据输出形式

avro-memory-logger.channels.memroy-channel.type=memory

avro-memory-logger.channels.memroy-channel.capacity=1000

avro-memory-logger.channels.memroy-channel.transactionCapacity=100

###把source-channel-sink建立联系

avro-memory-logger.sources.avro-source.channels=memroy-channel ##一个agent的source可以指定多个channel,因此这里时channels

avro-memory-logger.sinks.logger-sink.channel=memroy-channel ##而一个agent的sink只能对应一个channel。

######end######


启动Flume

这里要由于有两个agent,所以要有个启动顺序

需要先启动:avro-memory-logger

bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-logger.conf --name avro-memory-logger -Dflume.root.logger=INFO,console

然后再启动:exec-memory-avro

bin/flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf --name exec-memory-avro -Dflume.root.logger=INFO,console

关闭flume

ps -ef|grep flume,查出进程号,然后kill pid即可

注意:改动flume的配置之后,重启flume,重启前未被发出来的消息还依然会重发。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容