1、默认拦截器
官网提供了几个默认拦截器,具体使用可查官方文档
image.png
2、自定义拦截器(实现MapReduce中的日志清洗功能)
a. 编写代码(实现Interceptor接口,并实现内部接口Builder)
Interceptor接口负责实现拦截器的具体功能;内部接口Builder负责实例化Interceptor,并读取配置文件中的参数传给Interceptor
package top.gujm.flume;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
public class LoggerFilterInterceptor implements Interceptor {
public void initialize() {
}
public Event intercept(Event event) {
String line = new String(event.getBody());
String[] fields = line.split(" ");
// 字段长度大于11
if(fields.length <= 11){
return null;
}
// 校验状态码小于400
if(Integer.parseInt(fields[8]) >= 400){
return null;
}
return event;
}
public List<Event> intercept(List<Event> events) {
List<Event> list = new ArrayList<Event>();
for (Event event : events) {
Event e = intercept(event);
if (e != null)
list.add(e);
}
return list;
}
public void close() {
}
public static class Builder implements Interceptor.Builder {
public Interceptor build() {
return new LoggerFilterInterceptor();
}
public void configure(Context context) {
}
}
}
b. 使用maven打包,并将jar包放入lib文件夹中
c. conf
# Agent a1: one source (r1), one sink (k1) and one channel (c1).
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Source r1: spooling-directory source reading from /opt/module/testData.
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/testData
# Store the absolute path of the source file in the event header.
a1.sources.r1.fileHeader = true
# Attach the custom interceptor; the type must name the nested Builder class (note the "$").
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = top.gujm.flume.LoggerFilterInterceptor$Builder
# Channel c1: memory channel and its capacity limits.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
# Sink k1: write to HDFS under a path built from date/time escape sequences.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume/log/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File roll thresholds (134217728 = 128 * 1024 * 1024).
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 1000
# Roll the file once every hour.
a1.sinks.k1.hdfs.rollInterval = 3600
# Wire the source and the sink to channel c1.
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
image
image
image