前言
Source是负责接收数据到Flume Agent的组件。Source可以从其他系统接收数据。Source也可以用于接受其他Flume Agent的Sink通过RPC发送来的数据。毫不夸张的说,Source可以接受任何来源的数据。
Source的基本配置
Source像所有的Fluem组件一样,需要在配置文件中指定它的类型,可以是FQCN或者内置Source的别名,所有的Source都至少有一个用于写入的Channel。因此Channel的列表也是对应Source合理配置的必要参数。
配置的例子如下:
//source的别名
usingFlume.sources = usingFlumeSource
usingFlume.channels = memory
usingFlume.sources.usingFlumeSource.type = avro
usingFlume.sources.usingFlumeSource.channels = memory
usingFlume.sources.usingFlumeSource.port = 7777
usingFlume.sources.usingFlumeSource.bind = 0.0.0.0
当然Source也可以有一些可选的配置参数,可以用来配置拦截器和选择器。
参数 | 描述 |
---|---|
Interceptors | 代表一连串拦截器的名单 |
Interceptors.<Interceptors.name>.* | 传递给指定名称拦截器的参数 |
Selector | Channel选择器使用的别名 |
Selector.* | 传递给Channel选择器的配置参数 |
配置拦截器实例
usingFlume.source.avro.interceptor = i1 i2
usingFlume.source.avro.interceptor.i1.type = host
usingFlume.source.avro.interceptor.i1.preserveExsiting= true
usingFlume.source.avro.interceptor.i2.type = static
usingFlume.source.avro.interceptor.i2.key= header
usingFlume.source.avro.interceptor.i2.value= staticValue
配置选择器实例
usingFlume.source.avro.selector.type = multiplexing
usingFlume.source.avro.selector.header = priority
usingFlume.source.avro.selector.mapping.1 = channel1
usingFlume.source.avro.selector.mapping.2 = channel2
usingFlume.source.avro.selector.default = channel2
Sink-Source通信
Flume灵活性的重点就是Fluem的RPC Sink-to-Source的结合,RPC sink用来被设计给RPC source发送事件。Source能够接受大量的Sink或者RPC客户单发送的数据,尽管每个RPC Sink只能发送数据给一个RPC Source,但是每个Agent可以配置使用Sink组和Sink处理器,来发送数据给多个其他的Agent。
Avro Source
Flume主要的RPC-Source是Avro-Source。Avro-Source被设计为高扩展的RPC服务器端,能从其他的Flume Agent的Avro Sink或者使用Flume的SDK发送数据的客户端应用,接受数据到一个Flume Agent中。
Flume的Avro Source使用Netty-Avro inter-process的通信协议。
Avro-Source可以配置用来接受解压缩的Avro-Sink发送的压缩过的事件。也可以接受SSL加密后的数据。
Avro-Source配置
参数 | 描述 |
---|---|
type | avro,也可以用完整类别名称 |
bind | IP地址,或者主机名,0.0.0.0绑定机器的所有接口 |
threads | infinity,接受从客户端 或者Avro-Sink传入数据的最大工作线程数量 |
ssl | true/false是否启动SSL,如果启用需要keystore和keystore-password |
keystore | 使用ssl的keystore 路径 |
keystore-password | 打开keystore 的密码 |
keystore-type | 默认JKS0,使用keystore类型 |
compression-type | 解压缩数据的压缩格式,如zlib。如果要接受zlib压缩的数据,设置为deflate |
Avro-Source使用Netty服务器来处理请求,Netty服务器使用Java的非阻塞I/O,这保证了Netty服务器使用相对较少的线程处理高的请求数。Avro-Source允许配置线程的最大数量。
如果数据在广域网数据中心传播,配置compression-type是非常有用的,能够减少使用的带宽。如果这个参数没有设置,或者设置为none,而接受的数据都是压缩的,这可能会导致事件积压在前一阶段,因为Source将不能解析压缩数据,会给前一阶段返回错误,将导致前一阶段一直重试。
下面配置了一个SSL和压缩的Avro Source的例子
usingFlume.sources = avroSrc
usingFlume.channels = memChannel
usingFlume.sources.avroSrc.type = avro
usingFlume.sources.avroSrc.channels = memChannel
# 绑定到所有的接口
usingFlume.sources.avroSrc.bind = 0.0.0.0
usingFlume.sources.avroSrc.port = 4353
# SSL
usingFlume.sources.avroSrc.ssl = true
usingFlume.sources.avroSrc.keystore = /tem/keystore.jks
usingFlume.sources.avroSrc.keystore-password = usingFlume
usingFlume.sources.avroSrc.keystore-type = jks
# 解压缩zlib
usingFlume.sources.avroSrc.compression-type = deflate
# channel配置
usingFlume.channels.memChannel.type = memory
Avro Sink的配置
usingFlume.sinks = avroSink
usingFlume.channels = memChannel
# channel配置
usingFlume.channels.memChannel.type = memory
# sink配置
usingFlume.sinks.avroSink.type = avro
usingFlume.sinks.avroSink.channels = memChannel
usingFlume.sinks.avroSink.hostname = node001.com
usingFlume.sinks.avroSink.port= 4353
# SSL
usingFlume.sinks.avroSink.ssl = true
usingFlume.sinks.avroSink.trust-all-certs = /tem/keystore.jks
usingFlume.sinks.avroSink.truststore = /path/keystore
usingFlume.sinks.avroSink.truststore-password = usingFlume
usingFlume.sinks.avroSink.truststore-type= jks
# 解压缩 zlib
usingFlume.sinks.avroSink.compression-type = deflate
RPC Source的失败处理
如果Source配置用来写入的其中一个Channel由于写满,会抛出ChannelException异常,或者由于这次事务量太大,Source会给Sink返回一个失败的状态,来回调它并期望他重试,因为RPC Source通过线程池的线程接受数据,线程失败j只能导致线程终止。
这些异常或者失败,记录在异常的日志文件中,有时候这些异常可能指明了一个大问题,例如资源耗尽异常,OutOfMemoryError,如果频繁抛出ChannelException异常,意味着Channel分配的速率远小于写入的速率,或者Sink清理Channel中数据的速度不够快,如果Sink数量不足可以增加Sink数量,但是如果最终的目的地不能处理负载,则需要重新考虑这些问题。但不管在什么情况下,错误只会导致程序重复执行,但不会造成真正的数据丢失,因为只有当数据真正的写入到下一阶段,事件才会从Channel中移除。
Spooling Directory Source
在很多场景中,应用程序写入数据到文件,通常这些文件不是简单的文件,或者每一行转换为一个事件没有意义,例如堆栈跟踪,这种情况下,Flume的Spooling Directory Source可以被用来从这些文件中读取数据。
Spooling Directory Source监视读取事件的目录,文件一旦被移动到该目录,就不应该再被写入数据,同时目录中的文件名也不能重复,如果这两种情况发生,Source会抛出异常终止,此时只能重启Agent的Source。
Spooling Directory Source是使用tail -f 的Exec Source的一种好的替换方案。但是这种方式不能保证实时跟踪,只能在文件关闭移动到坚实目录才能读取文件,文件一旦被Source完全使用,且所有的事件都被成功写入Channel中,Source就可以基于配置重命名文件或者删除文件。重命名文件指的是给文件名添加一个后缀,如.COMPLETE,这个后缀是可以配置的。
Spooling Directory Source使用追踪器持久化到磁盘,定位每个文件在哪个位置成功将事件写入到Channel,如果Agent挂了,重启时,Source就知道从这个位置开始读取数据。由于Source重启时,从文件的上次处理位置开始,这也就是Source不允许文件重用的原因。
Spooling Directory Source 参数
参数 | 描述 |
---|---|
type | spooldir,也可以用完整类别名称 |
spoolDir | 监控的和读取文件的目录 |
batchSize | 默认值100,每次写入Channel的最大事件数量 |
ignorePatter | 默认值^& 正则表达式,文件名称匹配的正则表达式会被忽略 |
deletePolicy | 默认never |
fileSuffix | 后缀,默认.COMPLETE |
fileHeader | false 如果为true,完整路径/文件名会被添加到header |
fileHeaderKey | file文件路径 如果文件名被添加到header,此参数为header中使用的密钥。 |
trackerDir | .flumespool Spooling Directory Source存储元数据的目录,用来source中断时重启source |
deserializer | 默认line,行序列化器,Bulider类的别名或者完整类别名称,用来创建或读取自定义格式数据的反序列化器 |
inputCharste | 默认utf8,当反序列化器调用readChar方法使用的编码 |
Spooling Directory Source的类型是spooldir,Source读取指定目录的文件并逐个处理它们,处理目录的完整名称通过spoolDir 指定。由于性能的原因,Source批处理的最大数量由batchSize 指定,Source尽可能多的从文件中读取事件,直至达到批大小,如果文件中事件的数量不足批大小,一旦文件读取完成,就尽快提交事务。
Spooling Directory Source能从他中断的位置恢复,所以能避免重复消耗文件的数据,文件读取处理信息持久化到 到追踪目录,追踪目录一直在Source的监控目录中,目录名默认为.flumespool 。需要注意的是,一旦追踪目录的名称设置好,如果这个参数的值发生改变,Source将不能再定位文件处理的位置,可能导致重复消费数据。
Spooling Directory Source配置实例
agent.sources = spool
agent.channels = memChannel
agent.source.spool.type = spooldir
agent.source.spool.channels = memChannel
agent.source.spool.spoolDir = /data/flume/spool
agent.source.spool.batchSize = 250
agent.source.spool.deletePolicy = immediate
agent.source.spool.fileHeader = true
agent.source.spool.fileHeaderKey = usingFlumeFiles
agent.source.spool.deserializer = usingflume.ch03.ProtobufDeserializer$ProtobufDeserializerBuilder
agent.channel.memChannel.type = memory
agent.channel.memChannel.capacity = 10000
agent.channel.memChannel.transactionCapacity = 500
Spooling Directory Source的性能
Spooling Directory Source是IO密集型的,可以通过使用多个线程读取数据,更多的使用可用的CPU来提高性能。提高读取文件性能的一种方法是轮流写文件到不同的目录,并有Spooling Directory Source处理每一个目录,如果数据都传入到相同的目的地,则写入到相同的Channel,
Exec Source
Exec Source执行用户配置的命令,且基于命令的标准输出来生成事件,他还可以从命令中读取错误流,转换成Flume事件。Source希望命令不断的产生数据,并且吸收其输出,只要命令开始运行,Source就要不停的去运行处理。
接着输出流的每一行将被编码为字节数组,编码为UTF-8,然后每个字节数组用作Flume事件的Body,把他们写入到Channel,如果Channel已满,Source可以配置成停止读取流输出和错误流。
Exec Source配置
参数 | 描述 |
---|---|
type | exec,也可以用完整类别名称org.apache.flume.source.ExecSource |
command | source运行的命令 |
restart | 默认值false,设置为true时,如果流程死亡,Source将重启流程 |
restartThrottle | 默认值10000,重启命令需要等待的毫秒值,如果restart为false,则无影响 |
logStdErr | 默认false,如果设置为true,错误流也会被读取并转换为Flume事件 |
batchSize | 默认20,批处理,写入到Channel中的最大事件数量 |
batchTimeout | 批处理超时时间,如果长时间未写入到channel,则该配置会生效,将缓存的数据写入到channel中 |
charset | UTF-8,编码输入流或错误流为Flume事件的字符集 |
shell | 用于运行该命令的shell或者命令处理器 |
配置示例
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
shell运行命令
a1.sources = tailsource-1
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
Exec Source 丢失数据的可能性?
Exec Source是异步Source的一个例子,即如果有失败可能通知不到数据生产者,因此,重新启动Agent或者机器会导致数据丢失。
Exec Source在Flume中最常用来追踪文件,利用tail -f 命令使用Exec Source来追踪文件,近乎实时的将数据放到Flume中,但存在丢失数据的风险。如果Flume Agent挂了,重新启动Agent或者机器,Exec Source将在启动时重新运行command,在这种情况下,因为tail 命令只会拉取新数据,因此任何在Agent死亡和Source启动期间的写入文件的数据都会丢失。
由于这个原因,如果要求比较严格,可以采用Spooling Dircetory Source处理写入文件的数据。
即使使用其他一些命令,Exec Source 在将事件写入到Channel之前,也会缓存一些事件,直至到达batch大小,如果Agent重启,这些事件也可能会丢失。
Kafka Source
Kafka Source配置
参数 | 描述 |
---|---|
type | 完整类别名称org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | kafka集群 |
kafka.topics | kafka消费者将从逗号分隔的主题列表中读取消息 |
kafka.topics.regex | 正则订阅topic,该属性比kafka.topics优先级更高,如果存在,将覆盖kafka.topics。 |
kafka.consumer.group.id | 默认flume,消费者组Id |
batchSize | 默认1000,批处理,写入到Channel中的最大事件数量 |
batchDurationMillis | 默认1000,批处理超时时间,如果长时间未写入到channel,则该配置会生效,将缓存的数据写入到channel中 |
backoffSleepIncrement | 1000 初始和增量等待时间当kafka主题显示为空时触发,该参数会减少kafka一直去ping一个空主题的topic |
maxBackoffSleep | 5000 kafka主题显示为空时触发的最长等待时间 |
useFlumeEventFormat | false 设置为true以Flume Avro二进制格式读取事件 |
migrateZookeeperOffsets | true |
kafka.consumer.security.protocol | SASL_PLAINTEXT, SASL_SSL 或者 SSL |
配置示例
逗号分隔的主题列表
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
正则订阅Topic
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
自定义Source
Flume有两种类型的Source,EventDriven Source和Pollable Source.
PollableSource
1.当一个agent 启动之后,就会不断循环调用 process 以获取数据
2.当 process 返回 READY,表示数据产生正常,如果是 BACKOFF 则表示异常,当产生异常时候,agent 会等待一段时间再来调用 process,异常次数越多,间隔时间越长,最长不超过 5s。
3.自带一个线程,工作都是在自己的独立线程之内的
EventDrivenSource
1.当一个agent启动时候,会开始执行 application 的 main() 方法
2.进程启动之后,会通过 AbstractConfigurationProvider$getConfiguration解析配置文件中的各个组件和属性
3.针对 source 会生成 sourceRunner 通过 supervisor 来运行和管理其生命周期。
4.source 的生命周期 start 方法正式开始执行,这样也就到了我们将要自定义代码的实现执行了。
创建一个类,继承自 AbstractSource 并实现 Configurable 和( EventDrivenSource 或者PollableSource )
实现相关方法,以下是简单的一个生成序列的source
package com.inveno.flume;
import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.ImmutableMap;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SequenceSource extends AbstractSource implements Configurable ,EventDrivenSource {
private static final Logger logger = LoggerFactory
.getLogger(SequenceSource.class);
private long seq;
private int batchSize = 10;
private List<Event> batchArrayList = new ArrayList<>();
@Override
public void configure(Context context) {
//自定义配置属性
batchSize = context.getInteger("batchSize", 1);
//打印自定义属性
ImmutableMap<String, String> map = context.getParameters();
for (String s : map.keySet()) {
logger.warn(s + "==============configure=============================" + map.get(s));
}
}
private void process(){
try {
batchArrayList.add(EventBuilder.withBody(String.valueOf(seq++).getBytes()));
if(batchArrayList.size()>=batchSize){
getChannelProcessor().processEventBatch(batchArrayList);
batchArrayList.clear();
}
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void start() {
super.start();
//开启一个线程来生产数据,当然你也可以整个线程池
new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
//这里有个java知识点 ,InterruptedException捕获后,
// 这个标记点会被重置 ,需要再次 interrupt才能正确退出
Thread.currentThread().interrupt();
}
process();
}
}
}).start();
logger.debug("==========================start");
}
@Override
public void stop() {
super.stop();
logger.info("==========================stop", getName());
}
}