Flume拦截器-应用与配置

Flume最重要的组件是Source、Channel和Sink,另外,Flume Agent还有一些使Flume更加灵活的组件,如拦截器,Channel选择器,Sink组和Sink选择器。本文将讨论一下拦截器的应用。

拦截器

拦截器(Interceptor)是简单的插入式组件,设置在Source和Source写入数据的Channel之间,Source接收到的事件在写入到Channel之前,拦截器都可以对时间进行拦截,转换或删除这些时间。每个拦截器实例只处理同一个Source接受到的事件。

可以添加任意数量的拦截器去处理从单个Source传来的事件,Source将同一个事务中的所有事件传递给Channel处理器,进而传递给拦截器链,然后事件被传递给拦截器链的第一个拦截器,之后对事件进行转换处理,往下一个拦截器传递,依次直到最后一个拦截器返回的事件写入到Channel中。

拦截器必须在事件写入到Channel之前完成处理,因此在拦截器中进行大量的耗时处理不太合适,如果拦截器的处理非常耗时,需要相应调整响应超时时间。防止由于长时间没有响应发送事件的客户端或者Sink,而导致超时。

拦截器是需要命名的组件,每个拦截器都需要限定一个名字。拦截器的配置需要以interceptor开头、后面跟着拦截器的名称,以及配置项名称。

下面是拦截器配置示例

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

时间戳拦截器

Flume中最常用的拦截器是时间戳拦截器,该拦截器将时间戳插入到Flume事件的报头,附带的timeStamp是HDFS Sink用来分桶的报头。如果时间戳报头已经存在,则会替换该时间戳报头,除非preserveExisting参数设置为true。该拦截器经常用在第一层agent,用于从客户端接受数据。

参数 描述
type timestamp
preserveExisting 默认值false。如果设置为true,若时间戳报头以及存在,则不会替换该时间戳报头

配置示例

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

主机拦截器

主机拦截器插入服务器的IP地址或者主机名,Agent将这些内容注入到Flume的事件报头中,事件报头中的键使用hostHeader配置,默认值为host。如果事件报头在事件中已经存在,则会替换该事件报头,除非preserveExisting参数设置为true。将useIP参数设置为false,插入的主机名会替换ip地址。

参数 描述
type host
hostHeader 默认host,事件的头,用于插入ip地址或者主机名
useIP 如果设置为true,host键插入的是IP地址
preserveExisting 默认值false。如果设置为true,若报头存在,则不会替换该报头
a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname

静态拦截器

静态拦截器可以简单的将固定报头的键和值插入拦截的每个事件中。

参数 描述
type static
key 默认key,报头的键
value 默认value,报头键对应的值
preserveExisting 默认值false。如果设置为true,若该报头已经存在,则不会替换该报头
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

正则过滤拦截器

该拦截器可以用于过滤事件,每个正则过滤器拦截将事件体转换为UTF-8的字符串,使用该字符串基于配置的正则表达式去匹配,如果匹配成功,则通过该事件或者抛弃该事件。

参数 描述
type regex_filter
regex 默认.* 正则表达式
excludeEvents 默认false,如果为true,匹配上的事件会丢弃。

示例

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.excludeEvents= true

Regex Extractor Interceptor

此拦截器使用指定的正则表达式提取regex捕获组,并将匹配组追加到事件的报头。它还支持在将匹配组添加为事件报头之前对其进行格式化。

参数 描述
type regex_extractor
regex 默认.* 正则表达式
serializers 空格分割的名称,对应正则匹配的捕获组
serializers.<s1>.name 报头的键名
serializers.<s1>.type 默认default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer),还有org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer,或者自定义实现org.apache.flume.interceptor.RegexExtractorInterceptorSerializer

默认的序列化器,org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer,只将匹配项映射到指定的头名称,并在regex提取值时传递该值。可以自定义序列化器实现更多功能,以任意方式格式化匹配项。自定义的类需要实现org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 接口。

示例一

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

示例二

a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

UUID拦截器

拦截器可以为每个事件生成唯一的标识符,生成的UUID可以设置为可配置的参数,还可以为UUID生成相应的前缀。

参数 描述
type org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName 报头名称
preserveExisting 默认true,如果UUID已存在,保留不覆盖。
prefix 生成UUID的前缀
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = prefix-
a1.sources.r1.interceptors.i1.preserveExisting = false

自定义拦截器

拦截器是Flume中最容易编写的组件,只需要实现Interceptor接口。该接口本身非常简单,Flume本身要求所有的拦截器必须有一个实现Interceptor$Builder接口的Builder类。所有的Builde类必须有一个公共的无参构造方法。Flume使用该方法完成实例化,可以使用传递到Builder类的Context实例配置拦截器,所有需要的参数都传递到Context实例。

拦截器一般用于拦截,转换事件,通常给拦截的事件插入事件报头,这些事件用于后续的HDFS Sink(用于时间戳或者基于报头的分桶),Hbase Sink(用于行键)等。这些事件报头也经常在复杂的Channel处理器中用于将流分为多个流的分支,或者基于优先级将事件发送到不同的Sink中。

Interceptor接口

public interface Interceptor {
    void initialize();

    Event intercept(Event var1);

    List<Event> intercept(List<Event> var1);

    void close();

    public interface Builder extends Configurable {
        Interceptor build();
    }
}

可以看到有两种处理事件的方法,第一种方法接受一个事件返回一个事件列表,第二种方法 可以接受一个事件列表并返回一个事件列表。这两个方法必须都是线程安全的,因为如果Source运行在多线程环境下,这些方法可能被多个线程调用。

自定义代码示例

public class CounterInterceptor implements Interceptor {
    private final String headerKey;
    private final AtomicInteger count;

    public CounterInterceptor(Context context) {
        headerKey = context.getString("header","count");
        count = new AtomicInteger(0);
    }

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        int i = count.incrementAndGet();
        event.getHeaders().put(headerKey, String.valueOf(i));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            Event e = intercept(event);

        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class CounterInterceptorBuilder implements Interceptor.Builder {
        private Context context;
        @Override
        public Interceptor build() {
            return new MyInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }
}

CounterInterceptor 类的拦截方法是线程安全的,因为变量是由final修饰的,或者是Atomic原子操作。如果需要拦截该事件,则返回null即可,如果是事件列表,则必须返回一个事件列表,即使为空,也必须返回列表。

拦截器的调用是由Channel处理器来完成的,Channel处理器会首先实例化Builder类,然后调用Builder类的configure方法,该方法用于传递包含配置拦截器的Context实例。然后Channel处理器调用build方法,该方法返回拦截器。Channel处理器通过调用拦截器实例的Initialize方法初始化拦截器。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,743评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,296评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,285评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,485评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,581评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,821评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,960评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,719评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,186评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,516评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,650评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,329评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,936评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,757评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,991评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,370评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,527评论 2 349

推荐阅读更多精彩内容