本篇将根据启动类MycatStartup.java来概览MyCAT的启动流程。
为了简明清晰,所有源码只列出关键代码加以说明,只做NIO网络框架的相关分析。
Mycat的主要启动流程在单例MycatServer.java中,我们先看下构造方法。
private MycatServer() {
//读取文件配置
this.config = new MycatConfig();
//缓存服务初始化
cacheService = new CacheService();
//路由计算初始化
routerService = new RouteService(cacheService);
//SQL解析器
sqlInterceptor = (SQLInterceptor) Class.forName(
config.getSystem().getSqlInterceptor()).newInstance();
}
主要逻辑说明:
1、读取配置文件。
即schema.xml、server.xml、rule.xml。mycat支持两种配置,zk或者本地文件,方式配置项在myid.properties文件中的loadZk=false or true。基于zk的配置方式会从zk将约定路径上的配置内容读取到本地文件,在此行代码解析到MycatConfig.java类中。
2、缓存和路由计算初始化。
缓存的配置信息在cacheservice.properties,默认实现为factory.encache=io.mycat.cache.impl.EnchachePooFactory。支持可配各个场景的缓存实现,缓存空间的大小,以及缓存的超时时间。例:
pool.SQLRouteCache=encache,10000,1800
layedpool.TableID2DataNodeCache=encache,10000,18000
配置了RouteService的本地缓存池。
备注:
SQLRouteCache的缓存key为 schema+SQL语句,value为 RouteResultset路由信息
TableID2DataNodeCache的缓存key为 id 的值,value为 节点名
接下来我们分析MycatServer.startup()方法,该方法中完成了Mycat网络通信框架中前端、后端通信的关键组件的初始化。
public void startup() throws IOException {
//创建对SocketChannel进行封装的工厂类
ServerConnectionFactory sf = new ServerConnectionFactory();
//创建缓存块pool,支持可配,默认使用直接内存
bufferPool = new DirectByteBufferPool(bufferPoolPageSize,bufferPoolChunkSize,
bufferPoolPageNumber,system.getFrontSocketSoRcvbuf());
//创建业务线程池和异步处理器
processors = new NIOProcessor[processorCount];
//业务线程池的大小可以适当小一些,Mycat主要处理流程都是在reactor线程组中完成的
businessExecutor = ExecutorUtil.create("BusinessExecutor",
threadPoolSize);
for (int i = 0; i < processors.length; i++) {
processors[i] = new NIOProcessor("Processor" + i, bufferPool,
businessExecutor);
}
//创建NIOReactorPool, 默认大小为机器cpu核心线程数
NIOReactorPool reactorPool = new NIOReactorPool(
DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + "NIOREACTOR",
processors.length);
//创建NIOConnector
connector = new NIOConnector(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + "NIOConnector", reactorPool);
((NIOConnector) connector).start();
//创建并启动NIOAcceptor
server = new NIOAcceptor(DirectByteBufferPool.LOCAL_BUF_THREAD_PREX + NAME
+ "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool);
server.start();
//初始化数据库连接,调度心跳监听任务
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
for (PhysicalDBPool node : dataHosts.values()) {
String index = dnIndexProperties.getProperty(node.getHostName(),"0");
if (!"0".equals(index)) {
LOGGER.info("init datahost: " + node.getHostName() + " to use datasource index:" + index);
}
node.init(Integer.parseInt(index));
node.startHeartbeat();
}
scheduler.scheduleAtFixedRate(dataNodeHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod(),TimeUnit.MILLISECONDS);
}
主要逻辑说明:
1、ServerConnectionFactory
封装SocketChannel为ServerConnection, 设置数据包handler处理器,创建当前连接的NonBlockingSession
2、DirectByteBufferPool
预创建缓存块pool
3、NIOReactorPool-线程组
NIOReactor线程组,用来监听并处理前端和后端的SelectionKey.OP_READ读事件,可以看到NIOConnector和NIOAcceptor都持有了reactorPool,创建连接后都交给reactorPool去处理。
4、NIOConnector-单线程
创建数据库连接,监听并处理SelectionKey.OP_CONNECT事件
5、NIOAcceptor-单线程
接收客户端(也就是业务服务)连接请求,监听并处理SelectionKey.OP_ACCEPT事件
本篇分析到此结束。
转载请备注原文链接。