前言
本篇从上边说的方式,自顶向下,从粗到细的思路来进行学习。 知道druid核心功能、每个功能大概实现原理。
学习步骤 - 包结构
druid
|
--src
--main
-- java
-- com.alibaba.druid
-- filter (过滤器相关)
-- mock (猜测测试相关)
-- pool (数据库连接池相关)
-- proxy (jdbc API相关代理实现)
-- sql (sql解析相关)
-- stat (增强功能:统计类信息)
-- support (支持类)
-- util (工具类)
-- wall (增强功能:防sql注入相关)
-- META-INF
-- resources
-- scripts
-- test
-- pom.xml
看完包结构,以及大概猜测的含义,接下来,进入每个包,看下具体含义,以及猜测是否正确。这里的顺序呢,我们以理解的重要程度来作分析。
本篇目的:分析Pool包
打开pool包,我们发现有三个子包ha、vendor、xa,以及其他的针对jdbcApi的池化实现(datasource、connection、statement、resultset等)
ha子包
从字面意思理解,是高可用(high availability)
- selector包
这里主要是围绕都数据源的时候,具体使用哪个数据源的选举展开的,所以,我们弄明白他的设计就可以了。(抓住主干)
-
DataSourceSelector:数据源选举的抽象方法定义(接口)
DataSourceSelector的实现类.png
我们看到他的实现类有三个,分别是按名称、随机、粘性选举。
NamedDataSourceSelector
按名称来选举。大致实现逻辑:使用方通过ThreadLocal传递需要使用的数据源的名称来获取对应的数据源。如果使用方有传递这个名称,则按照这个名来从数据源map(这个是构造时入参HighAvailableDataSource自带的)中取出来。如果没有这个名称,则会取默认的名称,然后再取数据源。RandomDataSourceSelector
随机选取数据源。从数据源map(同上解释)中,根据总大小使用随机函数(Random.nextInt())选取一个数据源返回。StickyRandomDataSourceSelector
粘性选取数据源。这是一个比较特殊的选取器。它继承随机选取数据源,也就是说如果是第一次进来,就是走的随机选取。那区别在哪呢?在选取后,它会把这个数据源跟绑定到当前线程(ThreadLocal),会持续一段时间(这个时间可以配置)。
2.HighAvailableDataSource:DataSource class which contains multiple DataSource objects. 包含多个数据源对象的数据源类
刚刚上面有提到过,三个选取器的构造入参就是这个类的示例。它实现了javax.sql.DataSource,意味着它对外就是一个datasource示例。它的实例化工作在getConnection()方法,调用的时候,会初始化一个数据源集合(初始化逻辑通过DataSourceCreator),然后初始化数据源选取器,可以设置选取器类型(就是上面提到的三种),如果没有设置,使用随机类型。初始化完毕后,使用数据源选取器获取数据源,然后返回该数据源的一个链接(这个是正常的jdbc获取链接的思路)。
3.DataSourceCreator:An utility class to create DruidDataSource dynamically. 动态创建DruidDataSource 的工具类。
上面有提到过,高可用数据源的底层初始化数据源集合就是使用该类。通过构造进来配置文件,以及配置参数的前缀,做解析,紧接着创建数据源集合。具体创建过程就是拿解析到的name、url、username、psw,直接构建DruidDataSource,除了这几个基本参数外,其他参数使用高可用数据源实例自己的默认值(外面也可以设置)
总结下ha包,使用的场景就是你有多数据源的时候。通过统一的高可用数据源对外充当只有一个数据源(具体实现逻辑对使用方不可见)。
比如:
- 按名称选举:主从数据源。通过提前设置数据源名称(高可用数据源设置方法),做到数据源的切换。类似思路其实在sharding-jdbc的读写分离中有用到。
- 随机选举:多个从数据源,之前无差别。
- 粘性选择:我没想到好的场景,可以看看上面的差别,自己想想。
建议:高可用的数据源这里,我提一种场景 -- 多个从库需要最近路由来做数据查询。其实现在的选举器是满足不了的。现在的选举受DataSourceSelectorEnum枚举的限制,不具有扩展能力。自定义规则无法接入。为啥提这个呢,其实就是想实现类似dubbo(rpc框架)的最近路由或者按group路由。达到性能最高的目的。
vendor子包
google一下,是供应商、小摊小贩的意思。其实点一点里面的类,大概值围绕两种接口的实现来做的。
-
ExceptionSorter
An interface to allow for exception evaluation. 允许评估异常的接口。直接这么看是比较生硬的,看下它定义的方法就知道啥意思了,boolean isExceptionFatal(SQLException e);该包下罗列了各种db的实现类以及mock等异常的判断。
-
ValidConnectionChecker
顾名思义,检查连接是否正常有效。这里以MySqlValidConnectionChecker为例,就是判断判断connection是否正常,两种方式,1.使用connection的pingInternal进行反射调用。 2.使用connection执行自定义查询语句。 如果没有异常抛出,就认为是有效的connection。
总结:这个包名字起的也算到位,就是小功能的集合包。这与这个包下的功能何时使用,我们后面看其他包的时候再看下。
xa子包
用于兼容分布式事务相关。这里可参考维基百科介绍的分布式事务概念。
DruidXADataSource
extends DruidDataSource implements XADataSource,这里其实就是扩展了普通的datasource,以满足分布式接口的需要。比如getXAConnection,就是先获取普通的链接,然后根据不同的数据库方言,使用对应的工具类进行创建XAConnection,对外再统一包装一层DruidPooledXAConnection。目前支持的方言有:Mysql、postgresql、h2、还有下面扩展的jtds。DruidPooledXAConnection
implements XAConnection,这里是对不同的方言对应的XAConnection做统一的包装。JtdsXAResource
implements XAResource,commit、end、rollback、start等方法,都是包装在XASupport之上。自实现了isSameRMJtdsXAConnection
implements XAConnection ,包装普通的connection(创建JtdsXAResource、获取分布式链接id)
总结:这里就是对xa的支持,包装逻辑类似于前面介绍的selector,可以对比着理解。
包外的核心类:这里才是重点。
假如之前没有使用过druid很可能会一脸懵逼,不知道从哪入手。不过呢,我们一般可以从一个框架它的核心功能入手,然后一个点切进去,你就会发现不知不觉就都串联起来了。我们这里以DruidDataSource会切入点,我们一般使用的时候,会直接配置DruidDataSource(spring xml配置方式),并且指定它的初始化方法为init方法。接下我们就从这个方法入手,看一看究竟干了啥,又是怎样把这部分核心代码给串联起来的。
/**
* 数据源的初始化入口
*
* @throws SQLException
*/
public void init() throws SQLException {
// 成员变量,如果已经初始化,直接return
if (inited) {
return;
}
// bug fixed for dead lock, for issue #2980
// 这里主要是初始化这个类,利用静态方法注册自己的驱动类
DruidDriver.getInstance();
final ReentrantLock lock = this.lock;
try {
// 尝试拿锁
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
boolean init = false;
try {
// 双重检查用于更安全(常用于同步后,再做层检查)
if (inited) {
return;
}
// 调用栈信息
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
// 内存级别的id原子加1
this.id = DruidDriver.createDataSourceId();
// 如果不是第一个数据源,则它下面的几个属性做内存指令级别的直接更新,跨度10万个
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
}
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
// 如果是druid自定义包装的jdbcurl做解析
initFromWrapDriverUrl();
}
/**
* 通过filter(通过datasource的property参数注入)去装饰datasource,每个filter关注的功能不同,比如configFilter(去解密秘钥、设置其他属性)
*/
for (Filter filter : filters) {
filter.init(this);
}
if (this.dbType == null || this.dbType.length() == 0) {
this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
}
if (JdbcConstants.MYSQL.equals(this.dbType)
|| JdbcConstants.MARIADB.equals(this.dbType)
|| JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
}
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
if (this.driverClass != null) {
this.driverClass = driverClass.trim();
}
//初始化spi的fitler
initFromSPIServiceLoader();
// 如果没有直接设置,会从url中识别出dbType,然后返回定义好的DriverClassName
if (this.driver == null) {
if (this.driverClass == null || this.driverClass.isEmpty()) {
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
if (MockDriver.class.getName().equals(driverClass)) {
driver = MockDriver.instance;
} else {
if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
throw new SQLException("url not set");
}
// 加载真实驱动
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
}
} else {
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName();
}
}
// 一些基本的简单检查,比如oracle某些版本不支持等
initCheck();
// 根据驱动类型,初始化异常的排序器,在 pool-ventor包下,主要是用来判断异常的严重性
initExceptionSorter();
// 初始化链接的检查器,同上ExceptionSorter同一包下,用于检查链接是否正常
initValidConnectionChecker();
// 检查校验链接是否有效的sql语句是否设定。
validationQueryCheck();
if (isUseGlobalDataSourceStat()) {
// 使用全局的数据源统计
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbType);
}
} else {
// 不使用全局数据源统计,新建
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
}
// 设置数据源统计是否支持重置,可以追下代码,发现是servlet调用过来进行reset。
dataSourceStat.setResetStatEnable(this.resetStatEnable);
// 链接持有者 数组
connections = new DruidConnectionHolder[maxActive];
// 被驱逐链接持有者 数组
evictConnections = new DruidConnectionHolder[maxActive];
// 保活链接持有者 数组
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
// 创建指定的初始化个数的链接,并与holder绑定
if (createScheduler != null && asyncInit) {
// 如果是异步创建
for (int i = 0; i < initialSize; ++i) {
submitCreateTask(true);
}
} else if (!asyncInit) {
// 同步创建
// init connections
while (poolingCount < initialSize) {
try {
// 创建物理连接,里面会调用driver的连接方法,然后就会静态缓存datasource信息,供其他地方使用
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000);
}
}
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
}
// 创建数据源打印统计信息的线程并启动开始打印
createAndLogThread();
// 创建并启动链接的创建线程(异步并且没有调度线程池的时候使用,不会走上面两种判断的初始化)
createAndStartCreatorThread();
// 用于检查最大生命周期、驱逐链接、最大超时时间等配置,用于剔除或者标记链接
createAndStartDestroyThread();
// 保证前面两个线程的创建
initedLatch.await();
init = true;
initedTime = new Date();
// 注册Mbean,用于对外提供监控等功能,可参考(https://blog.csdn.net/u013256816/article/details/52800742)
registerMbean();
if (connectError != null && poolingCount == 0) {
throw connectError;
}
if (keepAlive) {
// async fill to minIdle
if (createScheduler != null) {
for (int i = 0; i < minIdle; ++i) {
submitCreateTask(true);
}
} else {
this.emptySignal();
}
}
} catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (Error e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} finally {
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg += ",";
msg += this.name;
}
msg += "} inited";
LOG.info(msg);
}
}
}
如上就是整个datasource的初始化过程,具体直接看注释,我觉得还是够呛,还是建议自己读一遍代码,到具体方法了,就点进去,看下一下方法干什么。这样整个就了解了。
其实整个过程也就是:校验基本参数、初始化链接、校验链接、输出统计信息等。
如上就是整个Pool包的包结构情况。我们可以看出来,主要是围绕数据源、链接相关,展开一系列的周边建设。
ok,pool包分析就到这里。有什么疑问,可以下面留言给我。