让Flume的TaildirSource支持Windows操作系统

最近准备重写以前的监控日志文件变动的项目,以前监控日志文件变动使用的技术基础是jdk1.7新出的WatchService,但是使用了接近一年以后发现了几个问题:

1.无法指定专门文件监听,只能对整个目录的所有文件进行监听,然后自己过滤
2.当修改了目录或者文件的任何metadata后,就不会再传入任何变动事件给监听者了
3.无法很好的控制监控频率

对于以上的三个问题,使用WatchService技术无法很好的解决他们,所以到处找别的解决方案。
因为我使用的是flume, 版本是1.6,当我看到flume1.7使用的TaildirSource的时候,忍不住眼前一亮,这就是我需要的解决方案。但是使用之前我要判断下是否有坑,于是去读了下他的源代码。

TaildirSource的代码很简单,主要就是五个类:
ReliableTaildirEventReader类
TaildirMatcher类
TaildirSource类
TaildirSourceConfigurationConstants类
TailFile类

这五个类之间的关系描述如下:
TailSource不用说,Flume的启动点
在其process方法中
TailSource调用ReliableTaildirEventReader的updateTailFiles方法去获取所有需要关注的文件,并读取解析
而为了只关注需要关注的文件,在ReliableTaildirEventReader的updateTailFiles方法中使用了TaildirMatcher去过滤出需要的文件
而TaildirMatcher和文件系统的目录是一一对应的,一个目录可以抽象为一个TaildirMatcher,而被关注的文件被抽象为TailFile对象
TaildirSourceConfigurationConstants类不用多说,一些配置常量而已

理解了类和代码,我们就可以明白整套TaildirSource动态监听文件变化的技术基础就是获取文件的inode,建立inode和文件之间的一一对应关系,利用RandomAccessFile去读取文件,并将inode和读取的位置以及文件位置保存成json文件进行持久化,以便后续的继续跟踪。
但是,注意,inode是linux文件的概念,而获取inode是在ReliableTaildirEventReader的getInode方法里,在这个方法里,我们将受到一万点暴击:(long) Files.getAttribute(file.toPath(), "unix:ino");
这段代码无耻的排除了windows操作系统的存在。

看明白了TaildirSource的代码,发现TaildirSource是不支持Windows的。怎么办呢?首先想想解决思路:
我们知道整体TaildirSource的思想是获取一个文件的标识(linux里inode可以作为文件的标识使用,当系统读取文件时,其实就是根据文件路径转换成对应的inode值来做的操作)并记录对应的文件路径,那么我们首先要确认在Windows中是否是有类似于inode这种东西的存在。这个回答解答了这个问题,windows中是有file id这种类似于inode的存在的。

那么继续,这个file id是否是有什么限制或者有什么特性呢?
参考这个回答 我们知道file id是跟文件系统有关的, 在FAT系统中,如果修改的名字长于旧名字,file id可能会发生改变,但是在NTFS系统中,在删除之前file id都是稳定的。

ok,方案可以确定了,如果是windows系统 并且文件系统是ntfs,那么我们就使用file id去获取文件,剩下的逻辑几乎和linux是一模一样。
如果是fat系统(在我们的工作环境中不可能出现),我们暂时不做支持。
(另外提一句,在windows 2012中新增加了一个Refs 这个由于我们基本不用,所以没有做考虑,参考这个)
方案落地如下:
使用java的jna
Kernel32的代码:

package com.creditease.ns.jna;/* Copyright (c) 2007, 2013 Timothy Wall, Markus Karg, All Rights Reserved
 *
 * The contents of this file is dual-licensed under 2 
 * alternative Open Source/Free licenses: LGPL 2.1 or later and 
 * Apache License 2.0. (starting with JNA version 4.0.0).
 * 
 * You can freely decide which license you want to apply to 
 * the project.
 * 
 * You may obtain a copy of the LGPL License at:
 * 
 * http://www.gnu.org/licenses/licenses.html
 * 
 * A copy is also included in the downloadable source code package
 * containing JNA, in file "LGPL2.1".
 * 
 * You may obtain a copy of the Apache License at:
 * 
 * http://www.apache.org/licenses/
 * 
 * A copy is also included in the downloadable source code package
 * containing JNA, in file "AL2.0".
 */

import com.sun.jna.Native;
import com.sun.jna.Structure;
import com.sun.jna.platform.win32.WinBase;
import com.sun.jna.platform.win32.WinNT;
import com.sun.jna.platform.win32.Wincon;
import com.sun.jna.win32.StdCallLibrary;
import com.sun.jna.win32.W32APIOptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Interface definitions for <code>kernel32.dll</code>. Includes additional
 * alternate mappings from {@link WinNT} which make use of NIO buffers,
 * {@link Wincon} for console API.
 */
public interface Kernel32 extends StdCallLibrary, WinNT, Wincon {

    /**
     * The instance.
     */
    Kernel32 INSTANCE = (Kernel32) Native.loadLibrary("kernel32", Kernel32.class, W32APIOptions.DEFAULT_OPTIONS);

    /**
     * The GetLastError function retrieves the calling thread's last-error code
     * value. The last-error code is maintained on a per-thread basis. Multiple
     * threads do not overwrite each other's last-error code.
     *
     * @return The return value is the calling thread's last-error code value.
     */
    int GetLastError();

    /**
     * The CreateFile function creates or opens a file, file stream, directory,
     * physical disk, volume, console buffer, tape drive, communications
     * resource, mailslot, or named pipe. The function returns a handle that can
     * be used to access an object.
     *
     * @param lpFileName            A pointer to a null-terminated string that specifies the name
     *                              of an object to create or open.
     * @param dwDesiredAccess       The access to the object, which can be read, write, or both.
     * @param dwShareMode           The sharing mode of an object, which can be read, write, both,
     *                              or none.
     * @param lpSecurityAttributes  A pointer to a SECURITY_ATTRIBUTES structure that determines
     *                              whether or not the returned handle can be inherited by child
     *                              processes. If lpSecurityAttributes is NULL, the handle cannot
     *                              be inherited.
     * @param dwCreationDisposition An action to take on files that exist and do not exist.
     * @param dwFlagsAndAttributes  The file attributes and flags.
     * @param hTemplateFile         Handle to a template file with the GENERIC_READ access right.
     *                              The template file supplies file attributes and extended
     *                              attributes for the file that is being created. This parameter
     *                              can be NULL.
     * @return If the function succeeds, the return value is an open handle to a
     * specified file. If a specified file exists before the function
     * call and dwCreationDisposition is CREATE_ALWAYS or OPEN_ALWAYS, a
     * call to GetLastError returns ERROR_ALREADY_EXISTS, even when the
     * function succeeds. If a file does not exist before the call,
     * GetLastError returns 0 (zero). If the function fails, the return
     * value is INVALID_HANDLE_VALUE. To get extended error information,
     * call GetLastError.
     */
    HANDLE CreateFile(String lpFileName, int dwDesiredAccess, int dwShareMode,
        WinBase.SECURITY_ATTRIBUTES lpSecurityAttributes,
        int dwCreationDisposition, int dwFlagsAndAttributes,
        HANDLE hTemplateFile);


    /**
     * Closes an open object handle.
     *
     * @param hObject Handle to an open object. This parameter can be a pseudo
     *                handle or INVALID_HANDLE_VALUE.
     * @return If the function succeeds, the return value is nonzero. If the
     * function fails, the return value is zero. To get extended error
     * information, call {@code GetLastError}.
     * @see <A HREF="https://msdn.microsoft.com/en-us/library/windows/desktop/ms724211(v=vs.85).aspx">CloseHandle</A>
     */
    boolean CloseHandle(HANDLE hObject);


    class BY_HANDLE_FILE_INFORMATION extends Structure {
        public DWORD dwFileAttributes;
        public FILETIME ftCreationTime;
        public FILETIME ftLastAccessTime;
        public FILETIME ftLastWriteTime;
        public DWORD dwVolumeSerialNumber;
        public DWORD nFileSizeHigh;
        public DWORD nFileSizeLow;
        public DWORD nNumberOfLinks;
        public DWORD nFileIndexHigh;
        public DWORD nFileIndexLow;

        public static class ByReference extends BY_HANDLE_FILE_INFORMATION
            implements Structure.ByReference {

        }


        public static class ByValue extends BY_HANDLE_FILE_INFORMATION
            implements Structure.ByValue {

        }

        @Override
        protected List getFieldOrder() {
            List fields = new ArrayList();
            fields.addAll(Arrays.asList(new String[]{"dwFileAttributes",
                "ftCreationTime", "ftLastAccessTime", "ftLastWriteTime",
                "dwVolumeSerialNumber", "nFileSizeHigh", "nFileSizeLow",
                "nNumberOfLinks", "nFileIndexHigh", "nFileIndexLow"}));
            return fields;

        }
    }

    boolean GetFileInformationByHandle(HANDLE hFile,
        BY_HANDLE_FILE_INFORMATION lpFileInformation);
}
package com.creditease.ns.jna;

import com.sun.jna.platform.win32.WinBase;
import com.sun.jna.platform.win32.WinNT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;

public class WinFileUtils {
    private static Logger logger = LoggerFactory.getLogger(WinFileUtils.class);

    private final static int GENERIC_READ = 0x80000000;
    private final static int FILE_SHARE_READ = 0x00000001;
    private static final WinBase.SECURITY_ATTRIBUTES SECURITY_ATTRIBUTES = null;
    private static final int OPEN_EXISTING = 3;
    private static final int FILE_FLAG_BACKUP_SEMANTICS = 0x02000000;
    public static final String IO_ERROR = "_ERROR_";

    public static String getUniqueFileId(Path file) {
        Kernel32.BY_HANDLE_FILE_INFORMATION nfo = new Kernel32.BY_HANDLE_FILE_INFORMATION();
        WinNT.HANDLE handle = Kernel32.INSTANCE.CreateFile(file.toString(), GENERIC_READ, FILE_SHARE_READ,
            SECURITY_ATTRIBUTES, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS, null);
        int errorRet = Kernel32.INSTANCE.GetLastError();
        String identifier;
        if (errorRet != 0) {
            logger.error("创建/打开文件时发生错误 错误级别:{} 文件路径:{}", errorRet, file);
            identifier = IO_ERROR;
        } else {
            Kernel32.INSTANCE.GetFileInformationByHandle(handle, nfo);
            errorRet = Kernel32.INSTANCE.GetLastError();
            if (errorRet != 0) {
                logger.error("获取文件信息时发生错误 错误级别:{} 文件路径:{}", errorRet, file);
                identifier = IO_ERROR;
            } else {
                identifier = nfo.nFileIndexHigh + nfo.nFileIndexLow.toString() + Integer.toHexString(nfo
                    .dwVolumeSerialNumber.intValue());
            }
        }
        Kernel32.INSTANCE.CloseHandle(handle);
        return identifier;
    }
}

以上代码综合参考了这里这里
再次强调下,以上获取FileId的代码不适用于Refs系统,如果想支持的更完善,请使用JNA 4.4.0版本 这个版本里的kernel32有一个方法叫做GetFileInformationByHandleEx 它是支持最新的Refs系统的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容

  • Linux系统一般有4个主要部分: 内核、shell、文件系统和应用程序。内核、shell和文件系统一起形成了基本...
    偷风筝的人_阅读 3,247评论 1 17
  • Linux系统一般有4个主要部分:内核、shell、文件系统和应用程序。 内核、shell和文件系统一起形成了基本...
    请爱护小动物阅读 2,556评论 0 22
  • 一个基本的计算机系统由“硬件”和“软件”组成,一台Linux设备,主要的组成如下图所示: 一般情况下,我们所说的L...
    时待吾阅读 1,636评论 0 16
  • 1. 硬链接和软连接区别 硬连接-------指通过索引节点来进行连接。在Linux的文件系统中,保存在磁盘分区...
    杰伦哎呦哎呦阅读 2,239评论 0 2
  • 烟雨灰蒙,檐青瓦黛。我撑着纸伞沿着青石板街徐徐走着,双眼望着前路,空洞迷惘。 时不时有路过的公子,皆玉带锦...
    知子当如阅读 234评论 0 0