从elasticsearch源码看节点启动流程

本文尝试通过阅读源码的途径,了解elasticsearch节点启动的大体流程。

1. 读取配置创建运行环境

运行环境,这里指的是Environment对象,这个对象封装了Settings对象(es配置),data路径,plugins路径,modules路径,bin路径,libs路径,log路径等。

相关源码:

    /** Create an {@link Environment} for the command to use. Overrideable for tests. */
    protected Environment createEnv(final Terminal terminal, final Map<String, String> settings) throws UserException {
        final String esPathConf = System.getProperty("es.path.conf");
        if (esPathConf == null) {
            throw new UserException(ExitCodes.CONFIG, "the system property [es.path.conf] must be set");
        }
        return InternalSettingsPreparer.prepareEnvironment(Settings.EMPTY, terminal, settings, getConfigPath(esPathConf));
    }

可以看到这里读取了es.path.conf的配置,即es的各种配置文件所在路径。

个人认为这里没有必要读取es.path.conf的配置,这个配置实际上可以不要的,因为已经有es.home的配置了,并且Environment类的构造函数里有这样的代码:

Environment(final Settings settings, final Path configPath, final Path tmpPath)
    ...
    if (configPath != null) {
        configFile = configPath.normalize();
    } else {
        configFile = homeFile.resolve("config");
    }
    ...
}

默认的配置文件位置就是home路径下的config里面。

elasticsearch.yml配置文件的加载在下面这个方法里面:

public static Environment  prepareEnvironment(Settings input, Terminal terminal, Map<String, String> properties, Path configPath) {
    ...
    output = Settings.builder(); // start with a fresh output
    Path path = environment.configFile().resolve("elasticsearch.yml");
    if (Files.exists(path)) {
        try {
            output.loadFromPath(path);
        } catch (IOException e) {
            throw new SettingsException("Failed to load settings from " + path.toString(), e);
        }
    }
    ...
    return new Environment(output.build(), configPath);
}

加载出来的配置会封装成Settings对象设置到Environment里面。

2. 初始化节点

这块的代码都在Node类的下面这个构造函数里面

protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
    ...
    logger.info("initializing ...");
    ...
    logger.info("initialized");
    ...
}

把一些细节省略之后,能很明显看到两条我们启动es的时候经常看到的日志。

下面去看里面一些关键性的细节,先整体看下这个构造函数:

    protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
        final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
        boolean success = false;
        {
            // use temp logger just to say we are starting. we can't use it later on because the node name might not be set
            Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
            logger.info("initializing ...");
        }
        try {
            ...
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to bind service", ex);
        } finally {
            if (!success) {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    }

具体细节在这个try里面,里面会打开很多资源,这些资源都会被添加到一个List里面,如果初始化失败,在finally里面会将这些资源关闭。

然后进入try里面去看具体初始化节点的流程。

配置节点

        originalSettings = environment.settings();
        Settings tmpSettings = Settings.builder().put(environment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

        // create the node environment as soon as possible, to recover the node id and enable logging
        try {
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
            resourcesToClose.add(nodeEnvironment);
        } catch (IOException ex) {
            throw new IllegalStateException("Failed to create node environment", ex);
        }

读取Environment对象里面的配置,创建节点自己的配置NodeEnvironment,里面的配置包括但不限于以下这些信息:

  • node.max_local_storage_nodes,本地最大存储节点数量,默认1,即本地最多只能同时启动一个能存储数据的节点;
  • node.id.seed,生成节点id的随机数种子,默认是0;
  • 节点数据存放路径nodePath,默认是{esHomePath}/data/nodes/0,{esHomePath}/data/nodes/1等;
  • indice数据存放路径,默认是{nodePath}/indices;

同时这里面随机生成了节点id,读取并打印出了磁盘、jvm信息,这里的heap size是默认值1/4物理内存:

[2018-10-19T17:52:35,480][INFO ][o.e.e.NodeEnvironment    ] [0aIXj0y] using [1] data paths, mounts [[(F:)]], net usable_space [215gb], net total_space [231gb], types [NTFS]
[2018-10-19T17:53:12,209][INFO ][o.e.e.NodeEnvironment    ] [0aIXj0y] heap size [7.9gb], compressed ordinary object pointers [true]

生成节点名称

        final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
        final String nodeId = nodeEnvironment.nodeId();
        tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);

检查node.name是否有配置,如果用户没有配置,则取nodeId前7位作为节点名称。

加载plugins和modules

this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);

PluginService的构造函数里加载了所有的pluginsmodules,过程中会打印出如下日志:

[2018-10-22T15:42:43,978][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [aggs-matrix-stats]
[2018-10-22T15:42:43,980][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [analysis-common]
[2018-10-22T15:42:43,981][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [ingest-common]
[2018-10-22T15:42:43,982][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [lang-expression]
[2018-10-22T15:42:43,983][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [lang-mustache]
[2018-10-22T15:42:43,983][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [lang-painless]
[2018-10-22T15:42:43,984][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [mapper-extras]
[2018-10-22T15:42:43,985][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [parent-join]
[2018-10-22T15:42:43,986][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [percolator]
[2018-10-22T15:42:43,987][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [rank-eval]
[2018-10-22T15:42:43,988][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [reindex]
[2018-10-22T15:42:43,989][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [repository-url]
[2018-10-22T15:42:43,990][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [transport-netty4]
[2018-10-22T15:42:43,991][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [tribe]
[2018-10-22T15:42:43,992][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-core]
[2018-10-22T15:42:43,993][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-deprecation]
[2018-10-22T15:42:43,994][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-graph]
[2018-10-22T15:42:43,994][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-logstash]
[2018-10-22T15:42:43,995][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-ml]
[2018-10-22T15:42:43,996][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-monitoring]
[2018-10-22T15:42:43,998][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-rollup]
[2018-10-22T15:42:43,999][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-security]
[2018-10-22T15:42:44,000][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-sql]
[2018-10-22T15:42:44,001][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-upgrade]
[2018-10-22T15:42:44,002][INFO ][o.e.p.PluginsService     ] [localhost-debug] loaded module [x-pack-watcher]
[2018-10-22T15:42:45,130][INFO ][o.e.p.PluginsService     ] [localhost-debug] no plugins loaded

初始化各模块

后面长达几百行的代码都是在构造各个模块及组件(AbstractComponent类),如ScriptModuleAnalysisModuleSearchModuleIndicesServiceClusterServiceTransportService等。

打印日志如下:

[2018-10-24T16:10:08,595][DEBUG][o.e.a.ActionModule       ] Using REST wrapper from plugin org.elasticsearch.xpack.security.Security
[2018-10-24T16:10:08,856][INFO ][o.e.d.DiscoveryModule    ] [localhost-debug] using discovery type [zen]

各个模块及组件分别承担什么功能,此处暂时不深究,实际上根据类名大致也能猜测出其作用。

3. 启动节点

这块代码在Node类的start()方法里面。

启动各组件

    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        if (!lifecycle.moveToStarted()) {
            return this;
        }

        Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
        logger.info("starting ...");
        pluginLifecycleComponents.forEach(LifecycleComponent::start);

        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();
        injector.getInstance(IndicesClusterStateService.class).start();
        injector.getInstance(SnapshotsService.class).start();
        injector.getInstance(SnapshotShardsService.class).start();
        injector.getInstance(RoutingService.class).start();
        injector.getInstance(SearchService.class).start();
        nodeService.getMonitorService().start();
                final ClusterService clusterService = injector.getInstance(ClusterService.class);

        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
        nodeConnectionsService.start();
        clusterService.setNodeConnectionsService(nodeConnectionsService);

        injector.getInstance(ResourceWatcherService.class).start();
        injector.getInstance(GatewayService.class).start();
        Discovery discovery = injector.getInstance(Discovery.class);
        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        TransportService transportService = injector.getInstance(TransportService.class);
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
            : "transportService has a different local node than the factory provided";
        final MetaData onDiskMetadata;
        try {
            // we load the global state here (the persistent part of the cluster state stored on disk) to
            // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
            if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
                onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
            } else {
                onDiskMetadata = MetaData.EMPTY_META_DATA;
            }
            assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        ...
    }

这块start了很多组件(Service,都是AbstractComponent的子类),具体逻辑暂时不深究,打印日志如下:

[2018-10-24T16:10:16,609][INFO ][o.e.n.Node               ] [localhost-debug] starting ...
[2018-10-24T16:11:14,864][INFO ][o.e.t.TransportService   ] [localhost-debug] publish_address {xx.xx.xx.xx:9300}, bound_addresses {xx.xx.xx.xx:9300}

验证node信息

    validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService
            .filterPlugins(Plugin
            .class)
            .stream()
            .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

这块代码对node进行启动验证,日志如下:

[2018-10-24T16:33:31,983][INFO ][o.e.b.BootstrapChecks    ] [localhost-debug] bound or publishing to a non-loopback address, enforcing bootstrap checks

如果修改了elasticsearch.yml里面network.host或者其它方式(如http.host,transport.host)的host的配置,而不是默认的127.0.0.1,那么bootstrap check如果检测到问题就不是warning而是error了。

这里可能会有一些问题报出来。比如配置-Xms和-Xmx的值不相同就会有问题,还有一些其它如file descriptors之类的问题,网上都能找到解决办法。

join cluster

discovery.startInitialJoin();

这行代码尝试把当前node加入cluster,不同的Discovery有不同的实现,默认实现是ZenDiscovery

        if (initialStateTimeout.millis() > 0) {
            final ThreadPool thread = injector.getInstance(ThreadPool.class);
            ClusterState clusterState = clusterService.state();
            ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
            if (clusterState.nodes().getMasterNodeId() == null) {
                logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
                final CountDownLatch latch = new CountDownLatch(1);
                observer.waitForNextChange(new ClusterStateObserver.Listener() {
                    @Override
                    public void onNewClusterState(ClusterState state) { latch.countDown(); }

                    @Override
                    public void onClusterServiceClose() {
                        latch.countDown();
                    }

                    @Override
                    public void onTimeout(TimeValue timeout) {
                        logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                            initialStateTimeout);
                        latch.countDown();
                    }
                }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
                }
            }
        }

加入cluster需要先找到master节点,找到master节点需要时间,这里会进行等待,等到master节点出现,或超时(默认30秒)结束。

启用http

        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            injector.getInstance(HttpServerTransport.class).start();
        }

这里根据配置启用http,默认是开启的,打印日志如下:

[2018-10-24T19:13:07,544][INFO ][o.e.x.s.t.n.SecurityNetty4HttpServerTransport] [localhost-debug] publish_address {xx.xx.xx.xx:9200}, bound_addresses {xx.xx.xx.xx:9200}

到这里当前节点就已经成功启动了,此时仍然有可能还没有选出master节点,没有加入cluster,但不要紧,虽然不能提供集群服务,但凭借此单节点还是能提供一些服务的(可以尝试发送http请求试验)。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,653评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,321评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,833评论 0 324
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,472评论 1 266
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,306评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,274评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,658评论 3 385
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,335评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,638评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,697评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,454评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,311评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,699评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,986评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,254评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,647评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,847评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,502评论 18 139
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 3,799评论 0 5
  • ElasticSearch优化系列一:集群节点规划 - 简书//www.greatytc.com/p/4c...
    葡萄喃喃呓语阅读 7,161评论 1 19
  • 今天才知道,因百度关闭了空间,我多年前在百度空间写的文章全没了。那些饱含情感的文章,再也找不回来了。或许是时候放下...
    晴小菡阅读 123评论 0 0
  • 最初知道这本书是当时某童司机在看的时候提起过,于是也顺手购入了这本书,直到最近才翻阅。这本书其实很简单,没有设计的...
    可飞阅读 723评论 0 2