flumenote3

源将事件写到一个多或者多个通道中。
接收器只从一个通道接收事件。
代理可能会有多个源、通道与接收器。
flume-ng-channels
里面包含了filechannel,jdbcchannel,kafkachannel,memorychannel通道的实现。
flume-ng-clients
实现了log4j相关的几个Appender,使得log4j的日志输出可以直接发送给flume-agent;其中有一个LoadBalancingLog4jAppender的实现,提供了多个flume-agent的load balance和ha功能,采用flume作为日志收集的可以考虑将这个appender引入内部的log4j中。
flume-ng-configuration
这个主要就是Flume配置信息相关的类,包括载入flume-config.properties配置文件并解析。其中包括了Source的配置,Sink的配置,Channel的配置,在阅读源码前推荐先梳理这部分关系再看其他部分的。
flume-ng-core
flume整个核心框架,包括了各个模块的接口以及逻辑关系实现。其中instrumentation是flume内部实现的一套metric机制,metric的变化和维护,其核心也就是在MonitoredCounterGroup中通过一个Map来实现metric的计量。ng-core下几乎大部分代码任然几种在channel、sink、source几个子目录下,其他目录基本完成一个util和辅助的功能。
flume-ng-node
实现启动flume的一些基本类,包括main函数的入口(Application.java中)。在理解configuration之后,从application的main函数入手,可以较快的了解整个flume的代码。

flume-ng启动文件介绍

################################
# constants
################################

#设置常量值,主要是针对不同的参数执行相应的类,以启动Flume环境
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

#真正启动Flume环境的方法
run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi

  #执行这一行命令,执行相应的启动类,比如org.apache.flume.node.Application
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}


################################
# main
################################

# set default params
# 在启动的过程中使用到的参数
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
#默认占用堆空间大小,这一块都可以根据JVM进行重新设置
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""

opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
arr_java_props=()
arr_java_props_ct=0
opt_dryrun=""

# 根据不同的参数,执行不同的启动类,每个常量所对应的类路径在代码前面有过介绍。
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

主要是run-flume语句以及指定的启动类
从bin/flume-ng这个shell脚本可以看到Flume的起始于org.apache.flume.node.Application类,这是flume的main函数所在。main方法首先会先解析shell命令,如果指定的配置文件不存在就抛出异常。
main方法首先校验shell命令行的代码,解析
./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console
语句,然后加载配置文件。
根据命令中含有"no-reload-conf"参数,决定采用那种加载配置文件方式:
一、没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件;
二、有此参数,则只在启动时加载一次配置文件。实现动态加载功能采用了发布订阅模式,使用guava中的EventBus实现。
三、PropertiesFileConfigurationProvider这个类是配置文件加载类,PollingPropertiesFileConfigurationProvider类中,它实现了LifecycleAware接口,而这个接口是掌管整个Flume生命周期的一个核心接口,LifecycleSupervisor实现了这个接口,通过上面代码中application.start方法触发LifecyleAware的start方法,LifecycleAware接口定义了start和stop方法。
时序调用图:


PollingPropertiesFileConfigurationProvider.start()方法会启动一个单线程FileWatcherRunnable每隔30s去加载一次配置文件。
如何解析文件?
1、用agent.sources.s1.command=s1来举例:变量prefix指的是:sink,source,channel等关键字。
2、parseConfigKey方法,首先根据prefix判断prefix的后面,有少多字符。比如:sources.s1.command,在sources后面s1.command一共有10个字符。
3、解析出name变量,如s1,这个是自己定义的。
4、解析出configKey固定关键字,如command,这个是系统定义的。
5、封装new ComponentNameAndConfigKey(name, configKey)返回。
6、将sources、channel、sink配置信息,分别存放到sourceContextMap、channelConfigMap、sinkConfigMap三个HashMap,最后统一封装到AgentConfiguration对象中,然后再把AgentConfiguration存放到agentConfigMap中,key是agentName。

图片.png

source - channelProcessor - channel - sink -sinkgroup

Source  {
    ChannelProcessor  {
             Channel  ch1
             Channel  ch2
             …
    }
} 
Sink  {
   Channel  ch; 
} 
SinkGroup {
   Channel ch;
   Sink s1;
   Sink s2;
   …
}

1、Source组件
Source是数据源的总称,设定好源后,数据将源源不断的被抓取或者被推送。常见的数据源有:ExecSource,KafkaSource,HttpSource,NetcatSource,JmsSource,AvroSource等等。所有的数据源统一实现一个接口类如下:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Source extends LifecycleAware, NamedComponent {

  /**
   * Specifies which channel processor will handle this source's events.
   *
   * @param channelProcessor
   */
  public void setChannelProcessor(ChannelProcessor channelProcessor);

  /**
   * Returns the channel processor that will handle this source's events.
   */
  public ChannelProcessor getChannelProcessor();

}

Source提供了两种机制: PollableSource(轮询拉取)和EventDrivenSource(事件驱动):


上图展示的Source继承关系类图。通过类图我们可以看到NetcatSource,ExecSource和HttpSource属于事件驱动模型。KafkaSource,SequenceGeneratorSource和JmsSource属于轮询拉取模型。Source接口继承了LifecycleAware接口,它的的所有逻辑的实现在接口的start和stop方法中进行。下图是类关系方法图:

Source接口定义的是最终的实现过程,如抓取日志,这个抓取的过程和实际操作就是在对应的Source实现中,比如:ExecSource。那么这些Source实现由谁来驱动的呢?现在我们将介绍SourceRunner类。看一下类继承结构图:


我们看一下PollableSourceRunner和EventDrivenSourceRunner的具体实现:

  @Override //PollableSourceRunner:
  public void start() {
    PollableSource source = (PollableSource) getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();

    runner = new PollingRunner();

    runner.source = source;
    runner.counterGroup = counterGroup;
    runner.shouldStop = shouldStop;

    runnerThread = new Thread(runner);
    runnerThread.setName(getClass().getSimpleName() + "-" + 
        source.getClass().getSimpleName() + "-" + source.getName());
    runnerThread.start();

    lifecycleState = LifecycleState.START;
  }

  @Override //EventDrivenSourceRunner
  public void start() {
    Source source = getSource();
    ChannelProcessor cp = source.getChannelProcessor();
    cp.initialize();
    source.start();
    lifecycleState = LifecycleState.START;
  }

注:其实所有的Source实现类内部都维护着线程,执行source.start()其实就是启动了相应的线程。
2、Channel组件
Channel用于连接Source和Sink,Source将日志信息发送到Channel,Sink从Channel消费日志信息;Channel是中转日志信息的一个临时存储,保存有Source组件传递过来的日志信息。先看代码如下:AbstractConfigurationProvider的loadSources()

          ChannelSelectorConfiguration selectorConfig =
              config.getSelectorConfiguration();

          ChannelSelector selector = ChannelSelectorFactory.create(
              sourceChannels, selectorConfig);

          ChannelProcessor channelProcessor = new ChannelProcessor(selector);
          Configurables.configure(channelProcessor, config);

          source.setChannelProcessor(channelProcessor);

ChannelSelectorFactory.create方法实现如下:

  public static ChannelSelector create(List<Channel> channels,
      ChannelSelectorConfiguration conf) {
    String type = ChannelSelectorType.REPLICATING.toString();
    if (conf != null) {
      type = conf.getType();
    }
    ChannelSelector selector = getSelectorForType(type);
    selector.setChannels(channels);
    Configurables.configure(selector, conf);
    return selector;
  }

其中我们看一下ChannelSelectorType这个枚举类,包括了几种类型:

public enum ChannelSelectorType {

  /**
   * Place holder for custom channel selectors not part of this enumeration.
   */
  OTHER(null),

  /**
   * Replicating channel selector.
   */
  REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"),

  /**
   * Multiplexing channel selector.
   */
  MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector");

ChannelSelector的类结构图如下所示:


注:RelicatingChannelSelector和MultiplexingChannelSelector是二个通道选择器,第一个是复用型通道选择器,也就是的默认的方式,会把接收到的消息发送给其他每个channel。第二个是多路通道选择器,这个会根据消息header中的参数进行通道选择。
Channel是什么?:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {

  /**
   * <p>Puts the given event into the channel.</p>
   * <p><strong>Note</strong>: This method must be invoked within an active
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
   * results.</p>
   * @param event the event to transport.
   * @throws ChannelException in case this operation fails.
   * @see org.apache.flume.Transaction#begin()
   */
  public void put(Event event) throws ChannelException;

  /**
   * <p>Returns the next event from the channel if available. If the channel
   * does not have any events available, this method must return {@code null}.
   * </p>
   * <p><strong>Note</strong>: This method must be invoked within an active
   * {@link Transaction} boundary. Failure to do so can lead to unpredictable
   * results.</p>
   * @return the next available event or {@code null} if no events are
   * available.
   * @throws ChannelException in case this operation fails.
   * @see org.apache.flume.Transaction#begin()
   */
  public Event take() throws ChannelException;

  /**
   * @return the transaction instance associated with this channel.
   */
  public Transaction getTransaction();
}

注:put方法是用来发送消息,take方法是获取消息,transaction是用于事务操作。类结构图如下:

3、Sink组件
Sink负责取出Channel中的消息数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
Sink接口类内容如下:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Sink extends LifecycleAware, NamedComponent {
  /**
   * <p>Sets the channel the sink will consume from</p>
   * @param channel The channel to be polled
   */
  public void setChannel(Channel channel);

  /**
   * @return the channel associated with this sink
   */
  public Channel getChannel();

  /**
   * <p>Requests the sink to attempt to consume data from attached channel</p>
   * <p><strong>Note</strong>: This method should be consuming from the channel
   * within the bounds of a Transaction. On successful delivery, the transaction
   * should be committed, and on failure it should be rolled back.
   * @return READY if 1 or more Events were successfully delivered, BACKOFF if
   * no data could be retrieved from the channel feeding this sink
   * @throws EventDeliveryException In case of any kind of failure to
   * deliver data to the next hop destination.
   */
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}

Sink是通过如下代码进行的创建:
Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());

DefaultSinkFactory.create方法如下:

  @Override
  public Sink create(String name, String type) throws FlumeException {
    Preconditions.checkNotNull(name, "name");
    Preconditions.checkNotNull(type, "type");
    logger.info("Creating instance of sink: {}, type: {}", name, type);
    Class<? extends Sink> sinkClass = getClass(type);
    try {
      Sink sink = sinkClass.newInstance();
      sink.setName(name);
      return sink;
    } catch (Exception ex) {
      throw new FlumeException("Unable to create sink: " + name
          + ", type: " + type + ", class: " + sinkClass.getName(), ex);
    }
  }

**注:Sink是通过SinkFactory工厂来创建,提供了DefaultSinkFactory默认工厂,程序会查找org.apache.flume.conf.sink.SinkType这个枚举类找到相应的Sink处理类,比如:org.apache.flume.sink.LoggerSink,如果没找到对应的处理类,直接通过Class.forName(className)进行直接查找实例化实现类。 **
Sink的类结构图如下:


与ChannelProcessor处理类对应的是SinkProcessor,由SinkProcessorFactory工厂类负责创建,SinkProcessor的类型由一个枚举类提供,看下面代码:

public enum SinkProcessorType {
  /**
   * Place holder for custom sinks not part of this enumeration.
   */
  OTHER(null),

  /**
   * Failover processor
   *
   * @see org.apache.flume.sink.FailoverSinkProcessor
   */
  FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"),

  /**
   * Standard processor
   *
   * @see org.apache.flume.sink.DefaultSinkProcessor
   */
  DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"),


  /**
   * Load balancing processor
   *
   * @see org.apache.flume.sink.LoadBalancingSinkProcessor
   */
  LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor");

SinkProcessor的类结构图如下:


说明:1、FailoverSinkProcessor是故障转移处理器,当sink从通道拿数据信息时出错进行的相关处理,代码如下:

 @Override
  public Status process() throws EventDeliveryException {
    // Retry any failed sinks that have gone through their "cooldown" period
    Long now = System.currentTimeMillis();
    while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
      FailedSink cur = failedSinks.poll();
      Status s;
      try {
        s = cur.getSink().process();
        if (s  == Status.READY) {
          liveSinks.put(cur.getPriority(), cur.getSink());
          activeSink = liveSinks.get(liveSinks.lastKey());
          logger.debug("Sink {} was recovered from the fail list",
                  cur.getSink().getName());
        } else {
          // if it's a backoff it needn't be penalized.
          failedSinks.add(cur);
        }
        return s;
      } catch (Exception e) {
        cur.incFails();
        failedSinks.add(cur);
      }
    }

    Status ret = null;
    while (activeSink != null) {
      try {
        ret = activeSink.process();
        return ret;
      } catch (Exception e) {
        logger.warn("Sink {} failed and has been sent to failover list",
                activeSink.getName(), e);
        activeSink = moveActiveToDeadAndGetNext();
      }
    }

    throw new EventDeliveryException("All sinks failed to process, " +
        "nothing left to failover to");
  }

2、LoadBalancingSinkProcessor是负载Sink处理器首先我们和ChannelProcessor一样,我们也要重点说明一下SinkSelector这个选择器。先看一下LoadBalancingSinkProcessor extends AbstractSinkProcessor的configure方法的部分代码:

  public interface SinkSelector extends Configurable, LifecycleAware {
    void setSinks(List<Sink> sinks);
    Iterator<Sink> createSinkIterator();
    void informSinkFailed(Sink failedSink);
  }

    if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
      selector = new RoundRobinSinkSelector(shouldBackOff);
    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
      selector = new RandomOrderSinkSelector(shouldBackOff);
    } else {
      try {
        @SuppressWarnings("unchecked")
        Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>)
            Class.forName(selectorTypeName);
        selector = klass.newInstance();
      } catch (Exception ex) {
        throw new FlumeException("Unable to instantiate sink selector: "
            + selectorTypeName, ex);
      }
    }

结合上面的代码,再看类结构图如下:


注:RoundRobinSinkSelector是轮询选择器,RandomOrderSinkSelector是随机分配选择器。
以KafkaSink为例看一下Sink里面的具体实现:

public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = null;
    Event event = null;
    String eventTopic = null;
    String eventKey = null;

    try {
      long processedEvents = 0;

      transaction = channel.getTransaction();
      transaction.begin();

      messageList.clear();
      for (; processedEvents < batchSize; processedEvents += 1) {
        event = channel.take();

        if (event == null) {
          // no events available in channel
          break;
        }

        byte[] eventBody = event.getBody();
        Map<String, String> headers = event.getHeaders();

        if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
          eventTopic = topic;
        }

        eventKey = headers.get(KEY_HDR);

        if (logger.isDebugEnabled()) {
          logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
            + new String(eventBody, "UTF-8"));
          logger.debug("event #{}", processedEvents);
        }

        // create a message and add to buffer
        KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
        messageList.add(data);

      }

      // publish batch and commit.
      if (processedEvents > 0) {
        long startTime = System.nanoTime();
        producer.send(messageList);
        long endTime = System.nanoTime();
        counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
        counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
      }

      transaction.commit();

    } catch (Exception ex) {
      String errorMsg = "Failed to publish events";
      logger.error("Failed to publish events", ex);
      result = Status.BACKOFF;
      if (transaction != null) {
        try {
          transaction.rollback();
          counter.incrementRollbackCount();
        } catch (Exception e) {
          logger.error("Transaction rollback failed", e);
          throw Throwables.propagate(e);
        }
      }
      throw new EventDeliveryException(errorMsg, ex);
    } finally {
      if (transaction != null) {
        transaction.close();
      }
    }

    return result;
  }

注:方法从channel中不断的获取数据,然后通过Kafka的producer生产者将消息发送到Kafka里面

主要参考://www.greatytc.com/p/0187459831af

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容

  • 博客原文 翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档 引言 概述 Apache Flume...
    rabbitGYK阅读 11,456评论 13 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 介绍 概述 Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用...
    ximengchj阅读 3,518评论 0 13
  • 介绍 概述 Apache Flume是一个分布式的,可靠的,高可用的系统,用于高效地从多个不同的数据源收集,汇总及...
    steanxy阅读 1,058评论 0 1
  • //聊聊Flume和Logstash的那些事儿 - 简书//www.greatytc.com/p/349e...
    葡萄喃喃呓语阅读 5,379评论 1 12