1. 前言
最近有需求要定时监控文件,如果文件内容发生变化,就要动态地获取新内容,于是就准备使用 env.readFile方法,
(1)当你监控一个文件时,当文件内容发生变化,会将文件的整个内容作为流输出。
(2)当你监控一个目录的时候,发现当你复制一个文件,而没有改变这个文件的内容,并不会监控到也不会输出内容,只有某文件内容发生改变时,才会将该文件的所有内容输出(也不是这个目录下的所有文件内容输出)
因此,上述内容底层到底是如何实现的呢?是根据什么判定该文件或者该目录发生了变化呢?
2.源码分析
(1) env.readFile方法
readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation)
具体实现中是调用方法
createFileInput(FileInputFormat<OUT> inputFormat, TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode monitoringMode, long interval)
(2)ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction(inputFormat, monitoringMode, this.getParallelism(), interval);
ContinuousFileMonitoringFunction类的作用:
这是一个单个(非并行)监视任务,主要负责:
a、监视用户提供的路径
b、确定应该进一步读取和处理哪些文件
c、创建与那些文件相对应的文件输入片
d、将它们分配给下游任务进一步处理(分片分配到下游任务,可以超过单个并行度)
注意:分片被转发到下游时,基于它们所属文件的修改时间,以便按修改时间升序进行读取。
2.1 该类的构造器
其中,定义了分片后下游读取的并行度readerParallelism(算子定义的并行度和1中取最大值);
还有默认的全局修改时间globalModificationTime(初始是无穷小值)
另外,一次只能监控一个路径,不能同时监控多个不同的路径。
2.2 run方法
run方法是实现接口SourceFunction实现的方法,是用来向下游输出元素的。
a、首先是根据文件路径进行初始化,获取checkpoint锁
这个锁的作用是保证对state进行checkpoint和update的操作,与元素的输出不是同步完成的,需要在synchronized块中。
@Override
public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
Path p = new Path(path);
FileSystem fileSystem = FileSystem.get(p.toUri());
if (!fileSystem.exists(p)) {
throw new FileNotFoundException("The provided file path " + path + " does not exist.");
}
b、根据watchType,决定如何向下游输出元素。
PROCESS_CONTINUOUSLY类型:
while循环,在获得checkpointLock时,调用monitorDirAndForwardSplits方法,并且线程睡眠interval毫秒,再去再次调用monitorDirAndForwardSplits方法。PROCESS_ONCE类型:
调用一次monitorDirAndForwardSplits方法,将isRunning置为false。
switch (watchType) {
case PROCESS_CONTINUOUSLY:
while (isRunning) {
synchronized (checkpointLock) {
monitorDirAndForwardSplits(fileSystem, context);
}
Thread.sleep(interval);
}
break;
case PROCESS_ONCE:
synchronized (checkpointLock) {
if (globalModificationTime == Long.MIN_VALUE) {
monitorDirAndForwardSplits(fileSystem, context);
globalModificationTime = Long.MAX_VALUE;
}
isRunning = false;
}
break;
default:
isRunning = false;
throw new RuntimeException("Unknown WatchType" + watchType);
2.3 monitorDirAndForwardSplits方法
Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(path));
Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime = getInputSplitsSortedByModTime(eligibleFiles);
for (Map.Entry<Long, List<TimestampedFileInputSplit>> splits: splitsSortedByModTime.entrySet()) {
long modificationTime = splits.getKey();
for (TimestampedFileInputSplit split: splits.getValue()) {
LOG.info("Forwarding split: " + split);
context.collect(split);
}
// update the global modification time
globalModificationTime = Math.max(globalModificationTime, modificationTime);
}
a、首先调用listEligibleFiles方法
FileStatus[] statuses=fileSystem.listStatus(path)
获取路径下所有文件夹/文件的状态;
文件类型,获取最新的修改时间,调用shouldIgnore方法(该方法里判定modificationTime <= globalModificationTime时,返回true,应该忽略)
就是说某个文件的修改时间,是小于等于全局最近修改时间的时候,就会忽略了,认为该文件并没有再次被修改。
所以说,在路径下新复制一个文件,取决于这个被复制的文件的上次修改时间,与全局修改时间的大小,如果更大,则不会被忽略,相反会被忽略。-
目录类型,递归调用listEligibleFiles方法
b、从a步骤中获取内容又发生变化的文件后,调用getInputSplitsSortedByModTime
- 其中有createInputSplits方法,该方法比较复杂,定义在对应的FileInputFormat类中,该方法的作用主要是给文件计算分片,返回的是FileInputSplit列表。
详细内容请参考另一篇文章 Flink在加载文件数据源时,如何创建分片呢?
注意这里使用的数据结果为TreeMap结构,key就是修改时间的时间戳,value是文件输入分片 TimestampedFileInputSplit实例的列表。
因为TreeMap存储,所以会将最新时间戳放在最前面。
c、获取到TreeMap之后,进行遍历,将分片分配给下游,更新全局更新时间。
(3)根据文件格式创建ContinuousFileReaderOperator实例
读取从前面ContinuousFileMonitoringFunction返回的的修改的文件分片(TimestampedFileInputSplit实例);
与ContinuousFileMonitoringFunction相反并行度为1,此运算符的并行度>1;
一旦收到文件splits描述符,它就会被放入队列中,并具有另一个线程读取拆分的实际数据。 这种架构允许将从发出检查点障碍的那一侧读取线程,从而消除任何潜在的背压。
(4)最后返回数据源
3.总结
flink定时监控文件在某些场景十分有用,比如说动态修改某配置文件,在风险监控场景动态修改告警规则等,再与流广播配合使用那就更香了,达到不需要停止实时任务而能够改变某些配置。
至于问题所说,作者在进行实践中,复制某个文件之后,目录下明明多了一个文件,却并没有将该文件的内容读取输出,相信大家看了源码分析的内容应该有了答案:
因为复制的文件,该文件的最新修改时间是与源文件保持一致的,并不是你复制的时间,所以这个修改时间可能就小于全局更新时间(globalModificationTime),而不被认为是新文件,所以在listEligibleFiles那一步就被忽略了。
如果你是
(1)新创建的文件,
(2)或者说复制的源文件的修改时间是大于globalModificationTime,
(3)再或者你复制文件后,编辑下文件,更新下文件修改时间
最后肯定会输出该文件的内容。