前言:flume通过使用Interceptors(拦截器)实现修改和过滤事件的功能。举个栗子,一个网站每天产生海量数据,但是可能会有很多数据是不完整的(缺少重要字段),或冗余的,如果不对这些数据进行特殊处理,那么会降低系统的效率。这时候拦截器就派上用场了。
一、flume内置的拦截器
先列个flume内置拦截器的表:
由于拦截器一般针对Event的Header进行处理,那我先介绍一Event吧。Event结构如下图:
- event是flume中处理消息的基本单元,由零个或者多个header和正文body组成。
- Header 是 key/value 形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和 HTTP 头一样提供相同的功能——通过该方法来传输正文之外的额外信息。
- Body是一个字节数组,包含了实际的内容。
- flume提供的不同source会给其生成的event添加不同的header。
这些拦截器实际都是往Event的header里插数据,比如Timestamp Interceptor拦截器就是可以往event的header中插入关键词为timestamp的时间戳。
1.1 timestamp拦截器
下面是官网的timestamp的配置:
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
测试例子:
在flume的conf目录下新建timestamp.conf文件,输入以下代码:
#配置文件:timestamp.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting= false
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
打开一个终端,进入flume的bin目录,输入./flume-ng agent -c ../conf -f ../conf/timestamp.conf -Dflume.root.logger=INFO,console -n a1
启动成功后,flume开始监控本主机上的所有IP地址,再开一个终端,向50000端口发TCP数据,命令如下:echo "TimestampInterceptor" | nc 127.0.0.1 50000
。
此时,flume的终端会接收到这个消息,如下:
我们可以清楚地看到header中有timestamp的影子。成功了。
1.2 host拦截器
再说一个host interceptor,该拦截器可以往event的header中插入关键词默认为host的主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)。官网配置如下:
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
测试例子
#配置文件:host.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i2
# a1.sources.r1.interceptors.i1.preserveExisting= false
# a1.sources.r1.interceptors.i1.type =timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader =hostname
a1.sources.r1.interceptors.i2.useIP = false
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
进入flume的bin目录,执行./flume-ng agent -c ../conf -f ../conf/host.conf -Dflume.root.logger=INFO,console -n a1
然后在新的终端输入echo "HostInterceptor" | nc 127.0.0.1 50000
结果如下:
可以发现header中增加了一个hostname的值:192.168.1.105。使用ifconfig命令,正是我电脑的IP地址,因为flume的agent运行在这个IP上嘛。
1.3 Regex Filtering Interceptor拦截器
还有比较重要的Regex Filtering Interceptor,Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。
测试例子:
#配置文件:regex_filter.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
#全部是数字的数据
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
#排除符合正则表达式的数据
a1.sources.r1.interceptors.i1.excludeEvents =true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这样的配置将会过滤掉所有纯数字的数据。
进入bin目录启动flume,执行./flume-ng agent -c ../conf -f ../conf/regex_filter.conf -Dflume.root.logger=INFO,console -n a1
新开终端输入
echo "1234" | nc 127.0.0.1 50000
echo "1234" | nc 127.0.0.1 50000
echo "1234" | nc 127.0.0.1 50000
在flume的终端会输出如下内容,但是没有数据,因为被我们过滤掉了。
二、总结
本文介绍了flume中的interceptor拦截器,并针对具体的三个拦截器进行了测试,验证其功能。后面我们还会编写符合自己需求的自定义拦截器。敬请期待。