Overview
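In the previous post, "flume - startup process analysis (1)", we saw how Flume loads the configuration of its components and how they are started. That post skipped component initialization, i.e. the step that builds the components from the parsed configuration. This post fills that gap.
The goal is to understand how the source, channel and sink components are created from the parsed configuration.
A follow-up post will walk through one example per component to show how it starts, which completes the picture of initialization and startup. This post stays focused on initialization.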
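Configuration loading
- To recap configuration loading: as the comments in the source show, parsing happens in two steps. Step1 parses the raw properties. Step2 parses the step1 result a second time to build the configuration specific to each source, channel and sink.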
public FlumeConfiguration(Map<String, String> properties) {
agentConfigMap = new HashMap<>();
errors = new LinkedList<>();
//step1 parses the configuration properties and builds the raw source, channel and sink configuration.
for (Entry<String, String> entry : properties.entrySet()) {
if (!addRawProperty(entry.getKey(), entry.getValue())) {
LOGGER.warn("Configuration property ignored: {} = {}", entry.getKey(), entry.getValue());
}
}
//step2 validates the step1 result and parses it further per source, channel and sink.
validateConfiguration();
}
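To make step1 more concrete, here is a minimal sketch, assuming keys shaped either like `<agent>.<kind>` (a component list such as `a1.channels = c1 c2`) or like `<agent>.<kind>.<component>.<property>` (such as `a1.channels.c1.type = memory`). The class `RawPropertySketch` and its two maps are hypothetical stand-ins for the step1 string fields and the `*ContextMap` fields of AgentConfiguration; this is not Flume's addRawProperty.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of step1 (not Flume's addRawProperty):
// group flat keys of the form <agent>.<kind>[.<component>.<property>].
class RawPropertySketch {
  // component lists, e.g. "a1.channels" -> "c1 c2"
  final Map<String, String> lists = new HashMap<>();
  // per-component properties, e.g. "a1.channels.c1" -> {type=memory}
  final Map<String, Map<String, String>> contexts = new HashMap<>();

  void add(String rawKey, String rawValue) {
    String key = rawKey.trim();
    String value = rawValue.trim();
    // limit 4 keeps nested property names intact, e.g. "selector.type"
    String[] parts = key.split("\\.", 4);
    if (parts.length == 2) {
      lists.put(key, value);
    } else if (parts.length == 4) {
      String component = parts[0] + "." + parts[1] + "." + parts[2];
      contexts.computeIfAbsent(component, k -> new HashMap<>()).put(parts[3], value);
    }
    // any other key shape is ignored in this sketch
  }
}
```

Splitting with a limit of 4 keeps a nested property such as `a1.sources.r1.selector.type` under the single key `selector.type`.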
- Compare this with Flume's AgentConfiguration class. I annotated which fields are populated in step1 and which in step2, which should make the split easy to see.
public static class AgentConfiguration {
//populated in step1
private final String agentName;
private String configFilters;
private String sources;
private String sinks;
private String channels;
private String sinkgroups;
//populated in step2
private final Map<String, ComponentConfiguration> sourceConfigMap;
private final Map<String, ComponentConfiguration> sinkConfigMap;
private final Map<String, ComponentConfiguration> channelConfigMap;
private final Map<String, ComponentConfiguration> sinkgroupConfigMap;
private final Map<String, ComponentConfiguration> configFilterConfigMap;
//populated in step1
private Map<String, Context> configFilterContextMap;
private Map<String, Context> sourceContextMap;
private Map<String, Context> sinkContextMap;
private Map<String, Context> channelContextMap;
private Map<String, Context> sinkGroupContextMap;
//populated in step2
private Set<String> sinkSet;
private Set<String> configFilterSet;
private Set<String> sourceSet;
private Set<String> channelSet;
private Set<String> sinkgroupSet;
private final List<FlumeConfigurationError> errorList;
private List<ConfigFilter> configFiltersInstances;
private Map<String, Pattern> configFilterPatternCache;
// ... constructor and methods omitted
}
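- Now let's break down step2 (validateConfiguration), because it sets up the variables that component initialization relies on later. agentConfigMap maps each agent name to its AgentConfiguration. Each AgentConfiguration is validated in turn via aconf.isValid(), and agents that fail validation are removed. Let's follow that logic.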
private void validateConfiguration() {
Set<Entry<String, AgentConfiguration>> entries = agentConfigMap.entrySet();
Iterator<Entry<String, AgentConfiguration>> it = entries.iterator();
while (it.hasNext()) {
Entry<String, AgentConfiguration> next = it.next();
String agentName = next.getKey();
AgentConfiguration aconf = next.getValue();
//todo aconf is the agent's configuration; validate the whole thing
if (!aconf.isValid()) {
LOGGER.warn("Agent configuration invalid for agent '{}'. It will be removed.", agentName);
addError(agentName, AGENT_CONFIGURATION_INVALID, ERROR);
it.remove();
}
LOGGER.debug("Channels:{}\n", aconf.channels);
LOGGER.debug("Sinks {}\n", aconf.sinks);
LOGGER.debug("Sources {}\n", aconf.sources);
}
}
- In aconf.isValid() we can see configFilterSet, channelSet, sourceSet, sinkSet and sinkgroupSet being initialized, which confirms that these are the step2 variables. Next we follow validateChannels, validateSources and validateSinks. The reason to focus on these three should be obvious: channel, source and sink are Flume's core components.
private boolean isValid() {
LOGGER.debug("Starting validation of configuration for agent: {}", agentName);
if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
LOGGER.debug("Initial configuration: {}", getPrevalidationConfig());
}
configFilterSet = validateConfigFilterSet();
createConfigFilters();
runFiltersThroughConfigs();
// Make sure that at least one channel is specified
if (channels == null || channels.trim().isEmpty()) {
LOGGER.warn(
"Agent configuration for '{}' does not contain any channels. Marking it as invalid.",
agentName
);
addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
return false;
}
//todo parse all channel names; \\s+ matches runs of whitespace
channelSet = new HashSet<>(Arrays.asList(channels.split("\\s+")));
//todo the key step: validate each channel
channelSet = validateChannels(channelSet);
if (channelSet.isEmpty()) {
LOGGER.warn(
"Agent configuration for '{}' does not contain any valid channels. " +
"Marking it as invalid.",
agentName
);
addError(CONFIG_CHANNELS, PROPERTY_VALUE_NULL, ERROR);
return false;
}
//todo core logic: validate sources, sinks and sink groups against the valid channels
sourceSet = validateSources(channelSet);
sinkSet = validateSinks(channelSet);
sinkgroupSet = validateGroups(sinkSet);
// If no sources or sinks are present, then this is invalid
if (sourceSet.isEmpty() && sinkSet.isEmpty()) {
LOGGER.warn(
"Agent configuration for '{}' has no sources or sinks. Will be marked invalid.",
agentName
);
addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, ERROR);
addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, ERROR);
return false;
}
// Now rewrite the sources/sinks/channels
this.configFilters = getSpaceDelimitedList(configFilterSet);
sources = getSpaceDelimitedList(sourceSet);
channels = getSpaceDelimitedList(channelSet);
sinks = getSpaceDelimitedList(sinkSet);
sinkgroups = getSpaceDelimitedList(sinkgroupSet);
if (LOGGER.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
LOGGER.debug("Post validation configuration for {}", agentName);
LOGGER.debug(getPostvalidationConfig());
}
return true;
}
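isValid() turns the whitespace-separated `channels` string into a set and later rewrites it with getSpaceDelimitedList. A pair of hypothetical helpers (`ComponentListSketch.parse` and `join`, not Flume APIs) sketches that round trip; a LinkedHashSet is used here only to make the output order deterministic, whereas the code above uses HashSet.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helpers mirroring split("\\s+") and getSpaceDelimitedList in isValid().
class ComponentListSketch {
  static Set<String> parse(String list) {
    if (list == null || list.trim().isEmpty()) {
      return new LinkedHashSet<>(); // isValid() treats a missing channel list as invalid
    }
    // trim first: " c1 c2".split("\\s+") would otherwise yield a leading ""
    return new LinkedHashSet<>(Arrays.asList(list.trim().split("\\s+")));
  }

  static String join(Set<String> names) {
    return String.join(" ", names);
  }
}
```

Duplicated names collapse into one entry, and trimming before splitting avoids a spurious empty component name.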
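- Channel validation: follow validateChannels. The key point is that a channel whose type has a dedicated configuration class ends up in channelConfigMap as a ChannelConfiguration, while every other channel keeps its raw Context in channelContextMap. ComponentConfigurationFactory.create builds the configuration from the channel's type (the types are listed in the enum below). If the configuration class for that type cannot be found, create still returns a ChannelConfiguration, but it is flagged isNotFoundConfigClass, so the channel is put into channelContextMap instead.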
private Set<String> validateChannels(Set<String> channelSet) {
Iterator<String> iter = channelSet.iterator();
Map<String, Context> newContextMap = new HashMap<>();
ChannelConfiguration conf = null;
//针对每个channel进行分析
while (iter.hasNext()) {
String channelName = iter.next();
//todo channelContextMap保存了所有的配置信息
Context channelContext = channelContextMap.get(channelName);
// Context exists in map.
if (channelContext != null) {
//todo normally the type is a built-in one such as file, so getKnownChannel returns non-null and the else branch runs
ChannelType chType = getKnownChannel(channelContext.getString(
BasicConfigurationConstants.CONFIG_TYPE));
boolean configSpecified = false;
String config = null;
if (chType == null) {
// less important code omitted (handles types that are not built in)
} else {
config = chType.toString().toUpperCase(Locale.ENGLISH);
configSpecified = true;
}
try {
// create the ChannelConfiguration matching the ChannelType, then initialize it from channelContext
conf =
(ChannelConfiguration) ComponentConfigurationFactory.create(
channelName, config, ComponentType.CHANNEL);
// initialize conf from the original channelContext
if (conf != null) {
conf.configure(channelContext);
}
// channels without a dedicated configuration class keep their raw Context in newContextMap
if ((configSpecified && conf.isNotFoundConfigClass()) ||
!configSpecified) {
newContextMap.put(channelName, channelContext);
} else if (configSpecified) {
// channels with a dedicated configuration class go into channelConfigMap
channelConfigMap.put(channelName, conf);
}
if (conf != null) {
errorList.addAll(conf.getErrors());
}
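} catch (ConfigurationException e) {
// could not configure this channel: skip it
if (conf != null) {
errorList.addAll(conf.getErrors());
}
iter.remove();
LOGGER.warn("Could not configure channel {} due to: {}",
new Object[]{channelName, e.getMessage(), e});
}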
}
}
//channelContextMap now holds channels without a dedicated configuration class; channelConfigMap holds the rest.
channelContextMap = newContextMap;
Set<String> tempchannelSet = new HashSet<String>();
tempchannelSet.addAll(channelConfigMap.keySet());
tempchannelSet.addAll(channelContextMap.keySet());
channelSet.retainAll(tempchannelSet);
return channelSet;
}
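The partition performed by validateChannels can be reduced to a few lines. In this minimal sketch `ChannelPartitionSketch` is hypothetical, the upper-cased type string stands in for a ChannelConfiguration, plain maps stand in for Context, and `typesWithConfigClass` plays the role of "a configuration class was found for this type" (the opposite of isNotFoundConfigClass).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

class ChannelPartitionSketch {
  // name -> upper-cased type; stand-in for channelConfigMap's ChannelConfiguration values
  final Map<String, String> configMap = new HashMap<>();
  // name -> raw properties; stand-in for channelContextMap's Context values
  Map<String, Map<String, String>> contextMap = new HashMap<>();

  Set<String> validate(Set<String> names, Set<String> typesWithConfigClass) {
    Map<String, Map<String, String>> newContextMap = new HashMap<>();
    for (String name : names) {
      Map<String, String> ctx = contextMap.get(name);
      if (ctx == null) {
        continue; // no properties at all: dropped by retainAll() below
      }
      String type = ctx.getOrDefault("type", "OTHER").toUpperCase(Locale.ENGLISH);
      if (typesWithConfigClass.contains(type)) {
        configMap.put(name, type);    // a dedicated configuration class exists
      } else {
        newContextMap.put(name, ctx); // keep the raw properties instead
      }
    }
    contextMap = newContextMap;
    // a channel survives only if it landed in one of the two maps
    Set<String> known = new HashSet<>(configMap.keySet());
    known.addAll(contextMap.keySet());
    Set<String> valid = new HashSet<>(names);
    valid.retainAll(known);
    return valid;
  }
}
```

Both kinds of channel stay valid; they only differ in which map later drives their initialization in loadChannels.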
- All supported channel types. I won't go through them one by one here; each channel will be analyzed in a later post.
public enum ChannelType implements ComponentWithClassName {
OTHER(null),
FILE("org.apache.flume.channel.file.FileChannel"),
MEMORY("org.apache.flume.channel.MemoryChannel"),
JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");
// constructor and accessors omitted
}
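- Source validation: follow validateSources. The key point is that a source whose type has a dedicated configuration class ends up in sourceConfigMap, while every other source keeps its raw Context in sourceContextMap. ComponentConfigurationFactory.create builds the configuration from the source's type (the types are listed in the enum below). If the configuration class for that type cannot be found, create still returns a SourceConfiguration, but it is flagged isNotFoundConfigClass, so the source is put into sourceContextMap instead.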
private Set<String> validateSources(Set<String> channelSet) {
//Arrays.split() call will throw NPE if the sources string is empty
if (sources == null || sources.isEmpty()) {
LOGGER.warn("Agent configuration for '{}' has no sources.", agentName);
addError(CONFIG_SOURCES, PROPERTY_VALUE_NULL, WARNING);
return new HashSet<String>();
}
//todo sources are separated by whitespace
Set<String> sourceSet =
new HashSet<String>(Arrays.asList(sources.split("\\s+")));
Map<String, Context> newContextMap = new HashMap<String, Context>();
Iterator<String> iter = sourceSet.iterator();
SourceConfiguration srcConf = null;
//todo parse the configuration of each source
while (iter.hasNext()) {
String sourceName = iter.next();
Context srcContext = sourceContextMap.get(sourceName);
String config = null;
boolean configSpecified = false;
if (srcContext != null) {
//todo look up the source type
SourceType srcType = getKnownSource(srcContext.getString(
BasicConfigurationConstants.CONFIG_TYPE));
if (srcType == null) {
config = srcContext.getString(
CONFIG_CONFIG);
if (config == null || config.isEmpty()) {
config = "OTHER";
} else {
configSpecified = true;
}
} else {
config = srcType.toString().toUpperCase(Locale.ENGLISH);
configSpecified = true;
}
//todo build the new source configuration
try {
// Possible reason the configuration can fail here:
// Old component is configured directly using Context
srcConf =
(SourceConfiguration) ComponentConfigurationFactory.create(
sourceName, config, ComponentType.SOURCE);
//todo initialize the new srcConf from the old Context
if (srcConf != null) {
srcConf.configure(srcContext);
Set<String> channels = new HashSet<String>();
if (srcConf.getChannels() != null) {
channels.addAll(srcConf.getChannels());
}
channels.retainAll(channelSet);
if (channels.isEmpty()) {
throw new ConfigurationException(
"No Channels configured for " + sourceName);
}
srcContext.put(CONFIG_CHANNELS,
this.getSpaceDelimitedList(channels));
}
if ((configSpecified && srcConf.isNotFoundConfigClass()) ||
!configSpecified) {
newContextMap.put(sourceName, srcContext);
} else if (configSpecified) {
//todo put the new configuration into sourceConfigMap
sourceConfigMap.put(sourceName, srcConf);
}
if (srcConf != null) errorList.addAll(srcConf.getErrors());
} catch (ConfigurationException e) {
if (srcConf != null) errorList.addAll(srcConf.getErrors());
iter.remove();
LOGGER.warn(
"Could not configure source {} due to: {}",
new Object[]{sourceName, e.getMessage(), e}
);
}
} else {
iter.remove();
addError(sourceName, CONFIG_ERROR, ERROR);
LOGGER.warn("Configuration empty for: {}.Removed.", sourceName);
}
}
// validateComponent(sourceSet, sourceConfigMap, CLASS_SOURCE, ATTR_TYPE,
// ATTR_CHANNELS);
sourceContextMap = newContextMap;
Set<String> tempsourceSet = new HashSet<String>();
tempsourceSet.addAll(sourceContextMap.keySet());
tempsourceSet.addAll(sourceConfigMap.keySet());
sourceSet.retainAll(tempsourceSet);
return sourceSet;
}
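The part of validateSources that differs from validateChannels is the channel check: a source keeps only those of its declared channels that survived channel validation, and is rejected if none are left. A minimal sketch with a hypothetical helper; IllegalStateException stands in for Flume's ConfigurationException so the example runs without Flume on the classpath.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the channel check inside validateSources().
class SourceChannelCheckSketch {
  static Set<String> activeChannelsFor(String sourceName, Set<String> declared,
                                       Set<String> activeChannels) {
    Set<String> channels = new HashSet<>();
    if (declared != null) {
      channels.addAll(declared);
    }
    // keep only channels that passed validateChannels()
    channels.retainAll(activeChannels);
    if (channels.isEmpty()) {
      // Flume throws ConfigurationException here, which removes the source
      throw new IllegalStateException("No Channels configured for " + sourceName);
    }
    return channels;
  }
}
```

In the original, the surviving set is also written back into the source's Context as its channels property, so later stages see only valid channels.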
- The source types Flume supports. Skim them for now; a few key ones will be analyzed later.
public enum SourceType implements ComponentWithClassName {
OTHER(null),
SEQ("org.apache.flume.source.SequenceGeneratorSource"),
NETCAT("org.apache.flume.source.NetcatSource"),
EXEC("org.apache.flume.source.ExecSource"),
AVRO("org.apache.flume.source.AvroSource"),
SYSLOGTCP("org.apache.flume.source.SyslogTcpSource"),
MULTIPORT_SYSLOGTCP("org.apache.flume.source.MultiportSyslogTCPSource"),
SYSLOGUDP("org.apache.flume.source.SyslogUDPSource"),
SPOOLDIR("org.apache.flume.source.SpoolDirectorySource"),
HTTP("org.apache.flume.source.http.HTTPSource"),
THRIFT("org.apache.flume.source.ThriftSource"),
JMS("org.apache.flume.source.jms.JMSSource"),
TAILDIR("org.apache.flume.source.taildir.TaildirSource"),
NETCATUDP("org.apache.flume.source.NetcatUdpSource");
// constructor and accessors omitted
}
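- Sink validation: follow validateSinks. The key point is that a sink whose type has a dedicated configuration class ends up in sinkConfigMap, while every other sink keeps its raw Context in sinkContextMap. One more point worth noting: ComponentConfigurationFactory.create builds the configuration from the sink's type (the types are listed in the enum below). If the configuration class for that type cannot be found, create still returns a SinkConfiguration, but it is flagged isNotFoundConfigClass, so the sink is put into sinkContextMap instead.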
private Set<String> validateSinks(Set<String> channelSet) {
// Preconditions.checkArgument(channelSet != null && channelSet.size() >
// 0);
Map<String, Context> newContextMap = new HashMap<String, Context>();
Set<String> sinkSet;
SinkConfiguration sinkConf = null;
if (sinks == null || sinks.isEmpty()) {
LOGGER.warn("Agent configuration for '{}' has no sinks.", agentName);
addError(CONFIG_SINKS, PROPERTY_VALUE_NULL, WARNING);
return new HashSet<String>();
} else {
sinkSet =
new HashSet<String>(Arrays.asList(sinks.split("\\s+")));
}
Iterator<String> iter = sinkSet.iterator();
while (iter.hasNext()) {
//todo iterate over all sink names
String sinkName = iter.next();
Context sinkContext = sinkContextMap.get(sinkName.trim());
if (sinkContext == null) {
iter.remove();
LOGGER.warn("no context for sink{}", sinkName);
addError(sinkName, CONFIG_ERROR, ERROR);
} else {
String config = null;
boolean configSpecified = false;
SinkType sinkType = getKnownSink(sinkContext.getString(
BasicConfigurationConstants.CONFIG_TYPE));
if (sinkType == null) {
config = sinkContext.getString(
CONFIG_CONFIG);
if (config == null || config.isEmpty()) {
config = "OTHER";
} else {
configSpecified = true;
}
} else {
config = sinkType.toString().toUpperCase(Locale.ENGLISH);
configSpecified = true;
}
try {
LOGGER.debug("Creating sink: {} using {}", sinkName, config);
//todo create the SinkConfiguration object
sinkConf =
(SinkConfiguration) ComponentConfigurationFactory.create(
sinkName, config, ComponentType.SINK);
if (sinkConf != null) {
//todo initialize the sink configuration
sinkConf.configure(sinkContext);
}
if (!channelSet.contains(sinkConf.getChannel())) {
throw new ConfigurationException("Channel " +
sinkConf.getChannel() + " not in active set.");
}
if ((configSpecified && sinkConf.isNotFoundConfigClass()) ||
!configSpecified) {
newContextMap.put(sinkName, sinkContext);
} else if (configSpecified) {
//todo sinkConfigMap holds sinks that have a dedicated configuration class
sinkConfigMap.put(sinkName, sinkConf);
}
if (sinkConf != null) errorList.addAll(sinkConf.getErrors());
} catch (ConfigurationException e) {
iter.remove();
if (sinkConf != null) errorList.addAll(sinkConf.getErrors());
LOGGER.warn(
"Could not configure sink {} due to: {}",
new Object[]{sinkName, e.getMessage(), e}
);
}
}
// Filter out any sinks that have invalid channel
}
//todo replace sinkContextMap with the new map
sinkContextMap = newContextMap;
Set<String> tempSinkset = new HashSet<String>();
tempSinkset.addAll(sinkConfigMap.keySet());
tempSinkset.addAll(sinkContextMap.keySet());
sinkSet.retainAll(tempSinkset);
return sinkSet;
}
- The sink types Flume supports. Skim them for now; a few key ones will be analyzed later.
public enum SinkType implements ComponentWithClassName {
OTHER(null),
NULL("org.apache.flume.sink.NullSink"),
LOGGER("org.apache.flume.sink.LoggerSink"),
FILE_ROLL("org.apache.flume.sink.RollingFileSink"),
HDFS("org.apache.flume.sink.hdfs.HDFSEventSink"),
IRC("org.apache.flume.sink.irc.IRCSink"),
AVRO("org.apache.flume.sink.AvroSink"),
THRIFT("org.apache.flume.sink.ThriftSink"),
ELASTICSEARCH("org.apache.flume.sink.elasticsearch.ElasticSearchSink"),
HBASE("org.apache.flume.sink.hbase.HBaseSink"),
ASYNCHBASE("org.apache.flume.sink.hbase.AsyncHBaseSink"),
MORPHLINE_SOLR("org.apache.flume.sink.solr.morphline.MorphlineSolrSink"),
HIVE("org.apache.flume.sink.hive.HiveSink"),
HTTP("org.apache.flume.sink.http.HttpSink");
// constructor and accessors omitted
}
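- ComponentConfigurationFactory.create deserves a closer look, otherwise the object-creation logic is easy to get lost in. The type passed in can be either a fully qualified class name or a type name such as FILE. create first tries to load it as a class; if that fails, it falls into the Exception branch and creates the configuration from the type name.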
public class ComponentConfigurationFactory {
@SuppressWarnings("unchecked")
public static ComponentConfiguration create(String name, String type, ComponentType component)
throws ConfigurationException {
Class<? extends ComponentConfiguration> confType = null;
if (type == null) {
throw new ConfigurationException(
"Cannot create component without knowing its type!");
}
try {
//todo if type names a class that loads successfully, use it; if the class does not exist or type is a type name, fall into the Exception branch
confType = (Class<? extends ComponentConfiguration>) Class.forName(type);
return confType.getConstructor(String.class).newInstance(type);
} catch (Exception ignored) {
try {
//todo the usual case (type=file and the like) takes this branch
type = type.toUpperCase(Locale.ENGLISH);
switch (component) {
case SOURCE:
return SourceConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
.getConfiguration(name);
case CONFIG_FILTER:
return ConfigFilterConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
.getConfiguration(name);
case SINK:
return SinkConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
.getConfiguration(name);
case CHANNEL:
return ChannelConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
.getConfiguration(name);
case SINK_PROCESSOR:
return SinkProcessorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
.getConfiguration(name);
case CHANNELSELECTOR:
return ChannelSelectorConfigurationType.valueOf(type.toUpperCase(Locale.ENGLISH))
.getConfiguration(name);
case SINKGROUP:
return new SinkGroupConfiguration(name);
default:
throw new ConfigurationException(
"Cannot create configuration. Unknown Type specified: " + type);
}
} catch (ConfigurationException e) {
throw e;
} catch (Exception e) {
throw new ConfigurationException("Could not create configuration! " +
" Due to " + e.getClass().getSimpleName() + ": " + e.getMessage(),
e);
}
}
}
}
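The two-stage lookup in create can be sketched as follows. `TwoStageFactorySketch` and its `Alias` enum are hypothetical (the enum stands in for SourceConfigurationType, ChannelConfigurationType and the others), and the method returns a descriptive string rather than a configuration object to keep the example small. Unlike the original, which catches any Exception in the first stage, this sketch only falls back on ClassNotFoundException.

```java
import java.util.Locale;

// Hypothetical sketch of the two-stage lookup in ComponentConfigurationFactory.create().
class TwoStageFactorySketch {
  enum Alias { MEMORY, FILE } // stand-in for the *ConfigurationType enums

  static String create(String type) {
    if (type == null) {
      throw new IllegalArgumentException("Cannot create component without knowing its type!");
    }
    // stage 1: treat type as a fully qualified class name
    try {
      Class<?> clazz = Class.forName(type);
      return "class:" + clazz.getName();
    } catch (ClassNotFoundException ignored) {
      // fall through to stage 2, like the catch (Exception ignored) branch
    }
    // stage 2: treat type as an enum alias, case-insensitively
    try {
      return "alias:" + Alias.valueOf(type.toUpperCase(Locale.ENGLISH));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unknown type: " + type, e);
    }
  }
}
```

A value like `memory` is not a loadable class, so it always reaches the enum branch, which is the path ordinary configurations take.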
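- Now look at getConfiguration, which the Exception branch above calls. This is another core piece. We use SourceConfigurationType.getConfiguration as the example; the others work the same way. As the code shows, the class named by srcConfigurationName may well be missing. When srcConfigurationName is null or the class cannot be loaded, a plain SourceConfiguration is created and flagged with setNotFoundConfigClass. Channels and sinks behave the same way.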
public SourceConfiguration getConfiguration(String name)
throws ConfigurationException {
if (this == OTHER) {
return new SourceConfiguration(name);
}
Class<? extends SourceConfiguration> clazz = null;
SourceConfiguration instance = null;
try {
if (srcConfigurationName != null) {
clazz =
(Class<? extends SourceConfiguration>) Class
.forName(srcConfigurationName);
instance = clazz.getConstructor(String.class).newInstance(name);
} else {
// Could not find the configuration stub, do basic validation
instance = new SourceConfiguration(name);
// Let the caller know that this was created because of this exception.
instance.setNotFoundConfigClass();
}
} catch (ClassNotFoundException e) {
//todo none of the classes above were found, so this branch is normally taken: create a SourceConfiguration and call setNotFoundConfigClass
// Could not find the configuration stub, do basic validation
instance = new SourceConfiguration(name);
// Let the caller know that this was created because of this exception.
instance.setNotFoundConfigClass();
} catch (Exception e) {
throw new ConfigurationException("Error creating configuration", e);
}
return instance;
}
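The fallback in getConfiguration boils down to "if the configuration class is missing, return a generic configuration and raise a flag". A minimal sketch, assuming a hypothetical `GenericConfig` in place of SourceConfiguration; the found-class branch is simplified to return an unflagged GenericConfig instead of instantiating the class reflectively.

```java
// Hypothetical sketch of the NotFoundConfigClass fallback in getConfiguration().
class NotFoundFallbackSketch {
  static class GenericConfig {
    final String name;
    private boolean notFoundConfigClass;

    GenericConfig(String name) { this.name = name; }
    void setNotFoundConfigClass() { notFoundConfigClass = true; }
    boolean isNotFoundConfigClass() { return notFoundConfigClass; }
  }

  static GenericConfig getConfiguration(String name, String configClassName) {
    if (configClassName == null) {
      // no configuration stub declared for this type
      GenericConfig c = new GenericConfig(name);
      c.setNotFoundConfigClass();
      return c;
    }
    try {
      Class.forName(configClassName); // the original instantiates this class here
      return new GenericConfig(name); // simplified: unflagged generic config
    } catch (ClassNotFoundException e) {
      GenericConfig c = new GenericConfig(name);
      c.setNotFoundConfigClass(); // tells validate*() to keep the raw Context
      return c;
    }
  }
}
```

The flag is exactly what validateChannels, validateSources and validateSinks test via isNotFoundConfigClass() to decide which map a component lands in.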
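Flume object creation flow
- loadChannels, loadSources and loadSinks create the objects; addChannel, addSourceRunner and addSinkRunner add them to conf; finally all services are started from conf. Here we focus on how the three kinds of objects are loaded.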
public MaterializedConfiguration getConfiguration() {
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
FlumeConfiguration fconfig = getFlumeConfiguration();
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
if (agentConf != null) {
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
//todo load channel objects
loadChannels(agentConf, channelComponentMap);
//todo load source objects
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
//todo load sink objects
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
for (String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.get(channelName);
if (channelComponent.components.isEmpty()) {
LOGGER.warn(String.format("Channel %s has no components connected" +
" and has been removed.", channelName));
channelComponentMap.remove(channelName);
Map<String, Channel> nameChannelMap =
channelCache.get(channelComponent.channel.getClass());
if (nameChannelMap != null) {
nameChannelMap.remove(channelName);
}
} else {
LOGGER.info(String.format("Channel %s connected to %s",
channelName, channelComponent.components.toString()));
conf.addChannel(channelName, channelComponent.channel);
}
}
for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}
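After loading, getConfiguration drops every channel that no source or sink attached itself to. The pruning step can be sketched like this; `ChannelPruneSketch` is hypothetical, and a map from channel name to the set of attached component names stands in for ChannelComponent.components.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of pruning unconnected channels in getConfiguration().
class ChannelPruneSketch {
  static List<String> prune(Map<String, Set<String>> channelToComponents) {
    List<String> kept = new ArrayList<>();
    // iterate over a copy of the keys so entries can be removed safely
    for (String channel : new HashSet<>(channelToComponents.keySet())) {
      if (channelToComponents.get(channel).isEmpty()) {
        channelToComponents.remove(channel); // nothing reads from or writes to it
      } else {
        kept.add(channel); // the original calls conf.addChannel(...) here
      }
    }
    return kept;
  }
}
```

Copying the key set first plays the same role as `new HashSet<String>(channelComponentMap.keySet())` in the original: it avoids a ConcurrentModificationException while removing.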
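Channel initialization
- The logic has three steps: create channels from channelConfigMap, create channels from channelContextMap, and remove channels that are no longer used (this last step supports dynamic configuration changes in Flume). Let's look at how a single channel is created.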
private void loadChannels(AgentConfiguration agentConf,
Map<String, ChannelComponent> channelComponentMap)
throws InstantiationException {
LOGGER.info("Creating channels");
//todo channelsNotReused tracks the existing (cached) channel objects
ListMultimap<Class<? extends Channel>, String> channelsNotReused =
ArrayListMultimap.create();
// assume all channels will not be re-used
for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
channelCache.entrySet()) {
Class<? extends Channel> channelKlass = entry.getKey();
Set<String> channelNames = entry.getValue().keySet();
channelsNotReused.get(channelKlass).addAll(channelNames);
}
//todo get all channel names
Set<String> channelNames = agentConf.getChannelSet();
//todo channelConfigMap: channels that have a dedicated configuration object are initialized first
Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
/*
* Components which have a ComponentConfiguration object
*/
for (String chName : channelNames) {
//todo look up this channel's ComponentConfiguration in compMap
ComponentConfiguration comp = compMap.get(chName);
if (comp != null) {
//todo a reused channel is removed from channelsNotReused; otherwise a new channel is created
Channel channel = getOrCreateChannel(channelsNotReused,
comp.getComponentName(), comp.getType());
try {
//todo the key step: load the configuration into the channel, i.e. initialize the channel object
Configurables.configure(channel, comp);
//todo channelComponentMap holds all channels
channelComponentMap.put(comp.getComponentName(),
new ChannelComponent(channel));
LOGGER.info("Created channel " + chName);
} catch (Exception e) {
String msg = String.format("Channel %s has been removed due to an " +
"error during configuration", chName);
LOGGER.error(msg, e);
}
}
}
//todo now configure the channels that only have a raw Context (reused channels included)
for (String chName : channelNames) {
//todo look up the channel's Context in channelContextMap
Context context = agentConf.getChannelContext().get(chName);
if (context != null) {
Channel channel = getOrCreateChannel(channelsNotReused, chName,
context.getString(BasicConfigurationConstants.CONFIG_TYPE));
try {
//todo the key step: load the configuration into the channel, i.e. initialize the channel object
Configurables.configure(channel, context);
channelComponentMap.put(chName, new ChannelComponent(channel));
LOGGER.info("Created channel " + chName);
} catch (Exception e) {
String msg = String.format("Channel %s has been removed due to an " +
"error during configuration", chName);
LOGGER.error(msg, e);
}
}
}
//todo remove cached channels that are no longer used
for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
Map<String, Channel> channelMap = channelCache.get(channelKlass);
if (channelMap != null) {
for (String channelName : channelsNotReused.get(channelKlass)) {
if (channelMap.remove(channelName) != null) {
LOGGER.info("Removed {} of type {}", channelName, channelKlass);
}
}
if (channelMap.isEmpty()) {
channelCache.remove(channelKlass);
}
}
}
}
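The "assume nothing is reused, then evict the leftovers" bookkeeping in loadChannels is what lets a configuration reload keep existing channel instances. A minimal sketch, assuming a type string instead of the channel Class as the cache key and a bare Object instead of a Channel; `ChannelCacheSketch` and `reload` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ChannelCacheSketch {
  // type -> (channel name -> instance); mirrors channelCache keyed by Class
  final Map<String, Map<String, Object>> cache = new HashMap<>();

  /** desired maps channel name -> type; returns the names evicted from the cache. */
  List<String> reload(Map<String, String> desired) {
    // 1) assume every cached channel is not reused
    Map<String, Set<String>> notReused = new HashMap<>();
    for (Map.Entry<String, Map<String, Object>> e : cache.entrySet()) {
      notReused.put(e.getKey(), new HashSet<>(e.getValue().keySet()));
    }
    // 2) get or create each desired channel and mark it as reused
    for (Map.Entry<String, String> e : desired.entrySet()) {
      String name = e.getKey(), type = e.getValue();
      cache.computeIfAbsent(type, k -> new HashMap<>()).computeIfAbsent(name, k -> new Object());
      notReused.getOrDefault(type, new HashSet<>()).remove(name);
    }
    // 3) evict whatever is still marked as not reused
    List<String> removed = new ArrayList<>();
    for (Map.Entry<String, Set<String>> e : notReused.entrySet()) {
      Map<String, Object> byName = cache.get(e.getKey());
      for (String name : e.getValue()) {
        if (byName.remove(name) != null) {
          removed.add(name);
        }
      }
      if (byName.isEmpty()) {
        cache.remove(e.getKey());
      }
    }
    return removed;
  }
}
```

Reloading with only c1 keeps the same c1 instance and evicts c2; the now-empty per-type map is dropped too, mirroring channelCache.remove(channelKlass).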
- Creating a single channel takes two steps, creating the channel and configuring it, done by getOrCreateChannel and configure respectively.
- channelFactory.getClass(type) resolves the channel class, and channelFactory.create(name, type) creates the instance.
private Channel getOrCreateChannel(
ListMultimap<Class<? extends Channel>, String> channelsNotReused,
String name, String type)
throws FlumeException {
Class<? extends Channel> channelClass = channelFactory.getClass(type);
/*
* Channel has requested a new instance on each re-configuration
*/
if (channelClass.isAnnotationPresent(Disposable.class)) {
Channel channel = channelFactory.create(name, type);
channel.setName(name);
return channel;
}
//todo channelCache is keyed by channel class; the value maps channel name to channel instance
Map<String, Channel> channelMap = channelCache.get(channelClass);
if (channelMap == null) {
channelMap = new HashMap<String, Channel>();
channelCache.put(channelClass, channelMap);
}
//todo name is the channel's name
Channel channel = channelMap.get(name);
if (channel == null) {
//todo create the channel object
channel = channelFactory.create(name, type);
channel.setName(name);
channelMap.put(name, channel);
}
//todo remove this channel from channelsNotReused, since it is being (re)used
channelsNotReused.get(channelClass).remove(name);
return channel;
}
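getOrCreateChannel bypasses the cache entirely for channel classes annotated with @Disposable. A minimal sketch of that annotation-driven branch, assuming a hypothetical runtime annotation `FreshEachReload` in place of Flume's @Disposable and two dummy classes in place of real channels.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flume's @Disposable; RUNTIME retention is required
// for isAnnotationPresent() to see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface FreshEachReload {}

class PooledChannel {}

@FreshEachReload
class FreshChannel {}

class AnnotationCacheSketch {
  final Map<Class<?>, Map<String, Object>> cache = new HashMap<>();

  Object getOrCreate(Class<?> clazz, String name) {
    if (clazz.isAnnotationPresent(FreshEachReload.class)) {
      return newInstance(clazz); // never cached: a new instance on every call
    }
    Map<String, Object> byName = cache.computeIfAbsent(clazz, k -> new HashMap<>());
    return byName.computeIfAbsent(name, k -> newInstance(clazz));
  }

  private static Object newInstance(Class<?> clazz) {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
    }
  }
}
```

The annotation lets a channel class opt out of reuse across reconfigurations without any change to the cache logic itself.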
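- channelFactory.getClass maps the type to a class via ChannelType (a type that is not a ChannelType name is treated as a custom class name), and create then instantiates that class with Class.newInstance().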
@Override
public Channel create(String name, String type) throws FlumeException {
Preconditions.checkNotNull(name, "name");
Preconditions.checkNotNull(type, "type");
logger.info("Creating instance of channel {} type {}", name, type);
Class<? extends Channel> channelClass = getClass(type);
try {
return channelClass.newInstance();
} catch (Exception ex) {
throw new FlumeException("Unable to create channel: " + name
+ ", type: " + type + ", class: " + channelClass.getName(), ex);
}
}
@SuppressWarnings("unchecked")
@Override
public Class<? extends Channel> getClass(String type) throws FlumeException {
String channelClassName = type;
ChannelType channelType = ChannelType.OTHER;
try {
channelType = ChannelType.valueOf(type.toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException ex) {
logger.debug("Channel type {} is a custom type", type);
}
if (!channelType.equals(ChannelType.OTHER)) {
channelClassName = channelType.getChannelClassName();
}
try {
return (Class<? extends Channel>) Class.forName(channelClassName);
} catch (Exception ex) {
throw new FlumeException("Unable to load channel type: " + type
+ ", class: " + channelClassName, ex);
}
}
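ChannelFactory.getClass first tries the type as a ChannelType alias and otherwise treats it as a fully qualified class name; create then instantiates the class reflectively. A sketch of the same resolution, assuming a hypothetical `KnownType` enum that maps aliases to JDK collection classes so the example runs without Flume. It uses getDeclaredConstructor().newInstance(), the usual replacement for Class.newInstance(), which has been deprecated since Java 9.

```java
import java.util.Locale;

class TypeResolverSketch {
  // Hypothetical alias table standing in for ChannelType
  enum KnownType {
    OTHER(null),
    LIST("java.util.ArrayList"),
    MAP("java.util.HashMap");

    final String className;

    KnownType(String className) { this.className = className; }
  }

  static Class<?> resolve(String type) {
    String className = type; // custom types are taken as a fully qualified class name
    try {
      KnownType known = KnownType.valueOf(type.toUpperCase(Locale.ENGLISH));
      if (known != KnownType.OTHER) {
        className = known.className;
      }
    } catch (IllegalArgumentException notAnAlias) {
      // custom type: keep className == type
    }
    try {
      return Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Unable to load type: " + type + ", class: " + className, e);
    }
  }

  static Object create(String type) {
    Class<?> clazz = resolve(type);
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Unable to create: " + type, e);
    }
  }
}
```

This is why both `type = memory` and `type = com.example.MyChannel` style values work in a Flume configuration: the alias lookup simply fails over to a class-name lookup.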
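- For reference, ChannelType (listed earlier) is defined as follows:
public enum ChannelType implements ComponentWithClassName {
OTHER(null),
FILE("org.apache.flume.channel.file.FileChannel"),
MEMORY("org.apache.flume.channel.MemoryChannel"),
JDBC("org.apache.flume.channel.jdbc.JdbcChannel"),
SPILLABLEMEMORY("org.apache.flume.channel.SpillableMemoryChannel");
// constructor and accessors omitted
}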
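- Configurables.configure configures the channel by calling the channel's own configure method; which overload runs depends on whether a Context or a ComponentConfiguration is passed in. JdbcChannel serves as the example below.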
---------------------*****step1********--------------------------------------
public static boolean configure(Object target, Context context) {
if (target instanceof Configurable) {
((Configurable) target).configure(context);
return true;
}
return false;
}
public static boolean configure(Object target, ComponentConfiguration conf) {
if (target instanceof ConfigurableComponent) {
((ConfigurableComponent) target).configure(conf);
return true;
}
return false;
}
---------------------*****step2********--------------------------------------
@Override
public void configure(Context context) {
provider = JdbcChannelProviderFactory.getProvider(context, getName());
LOG.info("JDBC Channel initialized: " + getName());
}
---------------------*****step3********--------------------------------------
public static synchronized JdbcChannelProvider getProvider(
Context context, String name) {
if (PROVIDER == null) {
PROVIDER = new JdbcChannelProviderImpl();
PROVIDER.initialize(context);
}
if (!INSTANCES.add(name)) {
throw new JdbcChannelException("Attempt to initialize multiple "
+ "channels with same name: " + name);
}
return PROVIDER;
}
---------------------*****step4********--------------------------------------
public void initialize(Context context) {
LOGGER.debug("Initializing JDBC Channel provider");
initializeSystemProperties(context);
initializeDataSource(context);
initializeSchema(context);
initializeChannelState(context);
}
private void initializeSystemProperties(Context context) { /* details omitted */ }
private void initializeChannelState(Context context) { /* details omitted */ }
private void initializeSchema(Context context) { /* details omitted */ }
private void initializeDataSource(Context context) { /* details omitted */ }
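The chain above is instanceof dispatch (step1), a channel delegating to a shared provider (step2), and a lazily created, synchronized singleton that rejects duplicate channel names (step3). A self-contained sketch of that chain, assuming hypothetical stand-ins: `ConfigurableSketch` for Configurable, a Map for Context, and a bare Object for JdbcChannelProvider.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Configurable
interface ConfigurableSketch {
  void configure(Map<String, String> context);
}

// Hypothetical stand-in for JdbcChannelProviderFactory (step3)
class ProviderSketch {
  private static Object provider; // shared by every channel instance
  private static final Set<String> NAMES = new HashSet<>();

  static synchronized Object getProvider(Map<String, String> context, String name) {
    if (provider == null) {
      provider = new Object(); // the original initializes the provider from this first context
    }
    if (!NAMES.add(name)) {
      throw new IllegalStateException("Attempt to initialize multiple channels with same name: " + name);
    }
    return provider;
  }
}

// Hypothetical stand-in for JdbcChannel (step2)
class JdbcLikeChannel implements ConfigurableSketch {
  final String name;
  Object provider;

  JdbcLikeChannel(String name) { this.name = name; }

  @Override
  public void configure(Map<String, String> context) {
    provider = ProviderSketch.getProvider(context, name);
  }
}

// Hypothetical stand-in for Configurables.configure (step1)
class ConfigurablesSketch {
  static boolean configure(Object target, Map<String, String> context) {
    if (target instanceof ConfigurableSketch) {
      ((ConfigurableSketch) target).configure(context);
      return true;
    }
    return false; // not configurable: silently skipped
  }
}
```

Because the provider is created only once, only the first JDBC channel's context actually initializes it; contexts passed by later JDBC channels are not used for initialization.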
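Source initialization
- Creating a source is very similar to creating a channel: sourceFactory.create creates the source object and Configurables.configure configures it. The only extra step is connecting the source to its channels. Since the rest is so similar, I won't go into further detail.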
private void loadSources(AgentConfiguration agentConf,
Map<String, ChannelComponent> channelComponentMap,
Map<String, SourceRunner> sourceRunnerMap)
throws InstantiationException {
Set<String> sourceNames = agentConf.getSourceSet();
Map<String, ComponentConfiguration> compMap =
agentConf.getSourceConfigMap();
/*
* Components which have a ComponentConfiguration object
*/
for (String sourceName : sourceNames) {
ComponentConfiguration comp = compMap.get(sourceName);
if (comp != null) {
SourceConfiguration config = (SourceConfiguration) comp;
//todo create the source object
Source source = sourceFactory.create(comp.getComponentName(),
comp.getType());
try {
//todo configure the source
Configurables.configure(source, config);
//todo get the source's channels from its config, i.e. its connections
Set<String> channelNames = config.getChannels();
List<Channel> sourceChannels = new ArrayList<Channel>();
for (String chName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.get(chName);
if (channelComponent != null) {
sourceChannels.add(channelComponent.channel);
}
}
//todo the source is not connected to any channel
if (sourceChannels.isEmpty()) {
String msg = String.format("Source %s is not connected to a " +
"channel", sourceName);
throw new IllegalStateException(msg);
}
//todo the source config contains a ChannelSelectorConfiguration
ChannelSelectorConfiguration selectorConfig =
config.getSelectorConfiguration();
//todo create the ChannelSelector; the default is the replicating selector
ChannelSelector selector = ChannelSelectorFactory.create(
sourceChannels, selectorConfig);
//todo the ChannelProcessor wraps the selector and the interceptors, and is configured from config
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, config);
//todo attach the channelProcessor to the source
source.setChannelProcessor(channelProcessor);
//todo wrap the source in a SourceRunner
sourceRunnerMap.put(comp.getComponentName(),
SourceRunner.forSource(source));
//todo record the reverse link: each channel remembers this source
for (Channel channel : sourceChannels) {
ChannelComponent channelComponent =
Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
String.format("Channel %s", channel.getName()));
channelComponent.components.add(sourceName);
}
} catch (Exception e) {
String msg = String.format("Source %s has been removed due to an " +
"error during configuration", sourceName);
LOGGER.error(msg, e);
}
}
}
/*
* Components which DO NOT have a ComponentConfiguration object
* and use only Context
*/
Map<String, Context> sourceContexts = agentConf.getSourceContext();
for (String sourceName : sourceNames) {
Context context = sourceContexts.get(sourceName);
if (context != null) {
Source source =
sourceFactory.create(sourceName,
context.getString(BasicConfigurationConstants.CONFIG_TYPE));
try {
Configurables.configure(source, context);
List<Channel> sourceChannels = new ArrayList<Channel>();
String[] channelNames = context.getString(
BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
for (String chName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.get(chName);
if (channelComponent != null) {
sourceChannels.add(channelComponent.channel);
}
}
if (sourceChannels.isEmpty()) {
String msg = String.format("Source %s is not connected to a " +
"channel", sourceName);
throw new IllegalStateException(msg);
}
Map<String, String> selectorConfig = context.getSubProperties(
BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
ChannelSelector selector = ChannelSelectorFactory.create(
sourceChannels, selectorConfig);
ChannelProcessor channelProcessor = new ChannelProcessor(selector);
Configurables.configure(channelProcessor, context);
source.setChannelProcessor(channelProcessor);
sourceRunnerMap.put(sourceName,
SourceRunner.forSource(source));
for (Channel channel : sourceChannels) {
ChannelComponent channelComponent =
Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
String.format("Channel %s", channel.getName()));
channelComponent.components.add(sourceName);
}
} catch (Exception e) {
String msg = String.format("Source %s has been removed due to an " +
"error during configuration", sourceName);
LOGGER.error(msg, e);
}
}
}
}
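The wiring at the heart of loadSources (resolve the source's channels, fail if none resolve, build a processor over them, and record the reverse link on each channel) can be sketched as follows. Everything here is hypothetical: `ChannelComponent` is a simplified stand-in that also buffers events, and `ReplicatingProcessor` only imitates the replicating selector's behaviour of sending every event to every channel; interceptors and optional channels are not modelled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SourceWiringSketch {
  static class ChannelComponent {
    final String name;
    final List<String> events = new ArrayList<>();   // stand-in for the channel's storage
    final Set<String> components = new HashSet<>();  // sources/sinks attached to it

    ChannelComponent(String name) { this.name = name; }
  }

  // Replicating behaviour: every event goes to every attached channel
  static class ReplicatingProcessor {
    final List<ChannelComponent> channels;

    ReplicatingProcessor(List<ChannelComponent> channels) { this.channels = channels; }

    void process(String event) {
      for (ChannelComponent c : channels) {
        c.events.add(event);
      }
    }
  }

  static ReplicatingProcessor wire(String sourceName, List<String> channelNames,
                                   Map<String, ChannelComponent> channelMap) {
    List<ChannelComponent> attached = new ArrayList<>();
    for (String ch : channelNames) {
      ChannelComponent c = channelMap.get(ch);
      if (c != null) {
        attached.add(c); // unknown channel names are skipped
      }
    }
    if (attached.isEmpty()) {
      throw new IllegalStateException("Source " + sourceName + " is not connected to a channel");
    }
    for (ChannelComponent c : attached) {
      c.components.add(sourceName); // reverse link, later used to prune unused channels
    }
    return new ReplicatingProcessor(attached);
  }
}
```

The reverse link is what makes the "Channel %s has no components connected" pruning in getConfiguration possible.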
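Sink initialization
- Creating a sink is very similar to creating a channel: sinkFactory.create creates the sink object and Configurables.configure configures it. The only extra step is connecting the sink to its channel. Since the rest is so similar, I won't go into further detail.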
private void loadSinks(AgentConfiguration agentConf,
Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
throws InstantiationException {
//todo get the set of sink names
Set<String> sinkNames = agentConf.getSinkSet();
Map<String, ComponentConfiguration> compMap =
agentConf.getSinkConfigMap();
Map<String, Sink> sinks = new HashMap<String, Sink>();
/*
* Components which have a ComponentConfiguration object
*/
for (String sinkName : sinkNames) {
//todo create the sink object from its configuration
ComponentConfiguration comp = compMap.get(sinkName);
if (comp != null) {
SinkConfiguration config = (SinkConfiguration) comp;
Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
try {
//todo configure the sink object
Configurables.configure(sink, config);
//todo find the channel the sink is attached to
ChannelComponent channelComponent = channelComponentMap.get(config.getChannel());
if (channelComponent == null) {
String msg = String.format("Sink %s is not connected to a " +
"channel", sinkName);
throw new IllegalStateException(msg);
}
//todo set the channel on the sink
sink.setChannel(channelComponent.channel);
sinks.put(comp.getComponentName(), sink);
channelComponent.components.add(sinkName);
} catch (Exception e) {
String msg = String.format("Sink %s has been removed due to an " +
"error during configuration", sinkName);
LOGGER.error(msg, e);
}
}
}
/*
* Components which DO NOT have a ComponentConfiguration object
* and use only Context
* (sinks without a dedicated configuration object are handled here)
*/
Map<String, Context> sinkContexts = agentConf.getSinkContext();
for (String sinkName : sinkNames) {
Context context = sinkContexts.get(sinkName);
if (context != null) {
Sink sink = sinkFactory.create(sinkName, context.getString(
BasicConfigurationConstants.CONFIG_TYPE));
try {
Configurables.configure(sink, context);
ChannelComponent channelComponent =
channelComponentMap.get(
context.getString(BasicConfigurationConstants.CONFIG_CHANNEL));
if (channelComponent == null) {
String msg = String.format("Sink %s is not connected to a " +
"channel", sinkName);
throw new IllegalStateException(msg);
}
sink.setChannel(channelComponent.channel);
sinks.put(sinkName, sink);
channelComponent.components.add(sinkName);
} catch (Exception e) {
String msg = String.format("Sink %s has been removed due to an " +
"error during configuration", sinkName);
LOGGER.error(msg, e);
}
}
}
loadSinkGroups(agentConf, sinks, sinkRunnerMap);
}
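loadSinks follows the same pattern as loadSources, except that a sink is bound to exactly one channel (a single channel property rather than a channels list), and a sink whose channel is missing is dropped. A minimal sketch with hypothetical names; maps of names stand in for Sink and ChannelComponent, and sink groups (loadSinkGroups) are not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the sink-to-channel wiring in loadSinks().
class SinkWiringSketch {
  /** Returns sink -> channel for every sink that could be wired. */
  static Map<String, String> wire(Map<String, String> sinkToChannel,
                                  Map<String, Set<String>> channelComponents) {
    Map<String, String> wired = new HashMap<>();
    for (Map.Entry<String, String> e : sinkToChannel.entrySet()) {
      Set<String> attached = channelComponents.get(e.getValue());
      if (attached == null) {
        // the original throws IllegalStateException, logs "Sink %s has been removed ..." and moves on
        continue;
      }
      attached.add(e.getKey()); // reverse link from the channel to this sink
      wired.put(e.getKey(), e.getValue());
    }
    return wired;
  }
}
```

A sink pointing at an unknown channel is simply left out, so one bad sink does not prevent the rest of the agent from starting.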