前言
上一篇博文中,我们分析了Mybatis中提供的第一种数据源实现UnpooledDataSource
,本文我们将分析另外一种数据源实现类PooledDataSource
首先说下背景,为什么需要一个池化的数据源呢?我们知道在JDBC编程中,数据库连接的创建
过程是十分耗时
的,且数据库能够建立的连接数
也是有限
的。因此,数据库连接是一种极其珍贵
的系统资源
,也就有了池化
的必要,从而保证数据库连接的重用,提高系统响应速度,防止数据库连接泄露等
PooledDataSource
Mybatis使用PooledDataSource来实现数据源连接池的功能,其内部依赖了上文分析的
UnplooledDataSource
组件,因为池化的数据源也是需要创建连接的,其创建连接的部分功能就由内部持有的UnpooledDataSource
来完成
接下来我们直接看源码
// 一个简单,同步,线程安全的数据库连接池
public class PooledDataSource implements DataSource {
// 省略其他属性...
// 内部持有一个非池化的数据源,用来创建连接对象
private final UnpooledDataSource dataSource;
// 有关池相关的可选配置属性
// 池最大活跃连接数, 默认值10
protected int poolMaximumActiveConnections = 10;
// 池最大空闲连接数,默认值5,超过该值配置的空闲连接会被直接关闭,而不是再将其放入池中维护
protected int poolMaximumIdleConnections = 5;
// 池最大检查时间,即强制收回时间,获取连接时如果没有空闲连接可用且不满足创建新连接的条件,就会从活跃连接集合中获取第一个连接,并判断其上次检查时间到当前时间的差,如果大于该配置值就满足强制收回条件,配置默认值为20秒
protected int poolMaximumCheckoutTime = 20000;
// 池获取连接等待时间,当获取连接时没有空闲连接,且不能创建新连接,且没有活跃连接到达强制收回条件,则当前获取连接的线程就会阻塞等待该配置的时间,默认20秒
protected int poolTimeToWait = 20000;
// 如果获取到的是一个坏的连接,那么重新获取一个新的连接,次数最多为 poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 一下配置和valid方法相关 ---------------------------
// 心跳相关配置属性 心跳查询sql
protected String poolPingQuery = "NO PING QUERY SET";
// 是否允许心跳查询
protected boolean poolPingEnabled;
// 执行心跳查询频率
protected int poolPingConnectionsNotUsedFor;
// 另一个重要属性是PoolState,该类用来存放数据库状态,包括我们的池相关功能(活动连接,空闲练级的保存...)都是通过该类实现的,当我们使用Mybatis的连接池出现问题或者需要优化时,可以利用该属性获取到连接池的各种状态来做参考
private final PoolState state = new PoolState(this);
}
以上就是PooledDataSource
类的重要属性分析,既然是属性,那就需要有响应的设置配置属性的方法提供
// 设置池最大活动连接属
public void setPoolMaximumActiveConnections(int poolMaximumActiveConnections) {
this.poolMaximumActiveConnections = poolMaximumActiveConnections;
forceCloseAll();
}
// 设置池最大空闲连接数
public void setPoolMaximumIdleConnections(int poolMaximumIdleConnections) {
this.poolMaximumIdleConnections = poolMaximumIdleConnections;
forceCloseAll();
}
// 省略其他设置方法...
-------------------------------------------------------------------------
以上是数据源连接池提供的设置自身池相关属性的方法,由于其内部持有了一个非池化数据源,因此Mybatis也提供了一些方法用来设置非池化数据源的配置属性,如`driver`,`url`,`username`,`password`等
// 设置驱动
public void setDriver(String driver) {
dataSource.setDriver(driver);
forceCloseAll();
}
// 设置url
public void setUrl(String url) {
dataSource.setUrl(url);
forceCloseAll();
}
// 设置username
public void setUsername(String username) {
dataSource.setUsername(username);
forceCloseAll();
}
// 设置password
public void setPassword(String password) {
dataSource.setPassword(password);
forceCloseAll();
}
// 省略其他非池化属性设置方法...
// 除了提供了相应的setter方法,也提供了对应的getter方法,这里就不一一列举了...
通过以上设置相关配置属性的方法我们可以看到,基本上所有的属性配置方法都会执行一个forceCloseAll()
方法,下面我们看下这个方法做了什么事
// 代码逻辑不复杂,就是利用for循环,关闭池中所有活跃和空闲的连接
public void forceCloseAll() {
synchronized (state) {
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
for (int i = state.activeConnections.size(); i > 0; i--) {
try {
// 从PoolState对象持有的活跃连接集合中移除活跃连接
PooledConnection conn = state.activeConnections.remove(i - 1);
// 失效该活跃连接对象
conn.invalidate();
// 获取PooledConnection对象持有的真实连接对象Connection
Connection realConn = conn.getRealConnection();
// 如果真实的连接对象是手动提交事务的,这里需要回滚事务
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
// 关闭连接对象
realConn.close();
} catch (Exception e) {
// ignore
}
}
for (int i = state.idleConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.idleConnections.remove(i - 1);
conn.invalidate();
Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
}
if (log.isDebugEnabled()) {
log.debug("PooledDataSource forcefully closed/removed all connections.");
}
}
下面我们来分析数据源连接池的重要的基本功能:连接获取
,缓存
,释放
等功能
首先看连接获取
PooledDataSource
类实现了DataSource
接口,自然就需要重写其两个getConnection
方法
@Override
public Connection getConnection() throws SQLException {
return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return popConnection(username, password).getProxyConnection();
}
以上两个重载的方法都转到了popConnection
方法上,我们直接看popConnection方法
的源码,该方法代码量比较多,毕竟是数据源连接池的核心逻辑所在,重点分析这个方法
private PooledConnection popConnection(String username, String password) throws SQLException {...}
首先看下方法声明,该方法并不是直接返回一个Connection对象,而是一个PooledConnection
对象,下面我们先看下这个类的定义
PooledConnection
不仅是一个
InvocationHandler
,还持有数据源dataSource
,真实的连接对象realConnection
,以及JDK动态代理连接对象proxyConnection
,以及连接对象的创建时间
,上次使用时间
,连接是否有效
等属性...
class PooledConnection implements InvocationHandler {
private static final String CLOSE = "close";
// 创建代理类时传入,作为代理的接口类型
private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
private final PooledDataSource dataSource;
private final Connection realConnection;
private final Connection proxyConnection;
private long checkoutTimestamp;
private long createdTimestamp;
private long lastUsedTimestamp;
private int connectionTypeCode;
private boolean valid;
// 构造函数
public PooledConnection(Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
// 利用JDK动态代理,创建出代理对象proxyConnection,并传入当前实例this作为InvocationHandler
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}
}
那既然是一个InvocationHandler,那重要的方法就是invoke啦
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
dataSource.pushConnection(this);
return null;
} else {
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
checkConnection();
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}
可以看到实际上就是代理了Connection对象的close方法,如果不是池化的数据源,那这里也就没必要作什么代理,连接对象的close方法执行就是真实的连接对象的关闭操作,而如果是一个池化的数据源,那一个真实连接对象的close方法就不能简单的直接关闭就完事了,到底关不关,需要交给池化的dataSource决定,内部处理逻辑是:如果空闲的连接已经超过了配置设定的最大空闲连接数,那就关闭真实连接对象,否则就加到空闲连接集合中缓存起来...
这就是PooledConnection存在的作用,分析到了这里,是不是优点迫不及待的想看看到底PooledDataSource是怎么获取连接以及释放连接的呢?
继续上面的获取连接的方法popConnection分析,核心地方到了
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
// 利用一个while循环获取连接,文章开头说的同步的数据源连接池,原因就在这里,你可以把PoolState理解为一个大的池,里面有存放活跃/空闲连接的集合,每次获取连接也好,释放也好,都需要加锁同步处理,否则会出现并发安全问题
synchronized (state) {
if (!state.idleConnections.isEmpty()) {
// Pool has available connection
// 1. 如果池的空闲连接集合不为空,则代表有可用的连接,则取出一个空闲连接
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
// 打印取出连接日志
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else {
// Pool does not have available connection
// 2. 池中没有空闲连接,则说明没有连接可用,则判断是否满足可以新建连接,即判断池当前活跃连接数是否小于配置的最大活跃连接数
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// Can create new connection
// 2.1. 允许创建新连接,调用了内部持有的UnpooledDataSource实例的getConnection方法创建新的对象,并封装成PooledConnection对象返回
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
// 打印新建连接日志
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
} else {
// Cannot create new connection
// 不允许创建新连接,则尝试获取最新创建的活跃连接
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
// 检测该连接是否超时
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// Can claim overdue connection
// 统计超时连接
state.claimedOverdueConnectionCount++;// 超时连接对象数量+1
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
// 将超时连接移出活跃连接集合
state.activeConnections.remove(oldestActiveConnection);
// 如果超时连接是手动提交的方式
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
// 则尝试回滚该超时连接
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
/*
Just log a message for debug and continue to execute the following
statement like nothing happend.
Wrap the bad connection with a new PooledConnection, this will help
to not intterupt current executing thread and give current thread a
chance to join the next competion for another valid/good database
connection. At the end of this loop, bad {@link @conn} will be set as null.
*/
log.debug("Bad connection. Could not roll back");
}
}
// 利用超时连接的真实Connection连接对象创建一个新的PooledConnection对象,并设置创建时间为之前超时连接的创建时间
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
// 设置上次使用时间为超时连接的上次使用时间
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
// 超时连接对象置为无效状态
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
} else {
// Must wait
// 执行到这里,等于没有空闲连接,且无法创建新连接,且无超时活跃连接,则只能等待,等待时间是poolTimeToWait属性配置项
try {
if (!countedWait) {
// 统计池状态,增加等待数量
state.hadToWaitCount++;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait);
// 统计累计等待时间
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
// 如果是等待后执行这里,则不会进入if,直接下一次循环获取连接对象,如果不是等待后执行的这里,则会进入if内
if (conn != null) {
// ping to server and check the connection is valid or not
// 检测获取到的连接对象是否有效
if (conn.isValid()) {
// 如果这里的连接真实对象是手动提交事务的,就回滚事务
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
// 重置连接对象的上次使用时间和检查时间(调用valid)为当前时间戳
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
// 将其添加到活跃连接集合中
state.activeConnections.add(conn);
// 增加池请求连接成功数量
state.requestCount++;
// 累计请求时间
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
// 无效连接对象,则打印日志
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
// 统计无效连接数量
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
// 如果获取到的无效连接数量超过poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance之和,则会执行这里,抛出SQLException异常...
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
// while循环结束,返回pooledConnection对象
return conn;
}
以上就是整个数据源连接池中关于获取连接的代码逻辑分析,下面用一个流程图总结一下
当我们获取到连接对象PooledConnection
之后,还需要调用其getProxyConnection()
方法获取
PooledConnection对象中封装的JDK代理对象
返回,Mybatis拿到这个Connection代理对象后就可以使用了,比如获取预处理语句
,执行
等方法...
介绍完了整个获取连接的流程,我们下面分析一下释放的逻辑,比如Mybatis拿到代理对象Connection之后执行完一整套SQL相关方法之后,需要调用代理对象的close方法,其close方法调用之后就会进入到PooledConnection(InvocationHandler)的invoke方法中,invoke中我们上面看到,当执行方法为close时会直接调用dataSource.pushConnection(this);
,因此我们看PooledDataSource的pushConnection
方法源码
protected void pushConnection(PooledConnection conn) throws SQLException {
// 同样需要加锁同步
synchronized (state) {
// 将连接对象PooledConnection对象从池的活跃集合中移除
state.activeConnections.remove(conn);
if (conn.isValid()) {
// 移除的活跃连接有效,就判断当前池的空闲连接数以达到上限,以及PooledConnection 是否为该连接池的连接
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
// 统计累计校验时间
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
// 为返还连接生成一个新的PooledConnection对象,只是使用了老的真实连接对象
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
// 增加池空闲连接对象
state.idleConnections.add(newConn);
// 新连接对象使用老连接对象的创建时间和上次使用时间
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
// 老的连接对象失效处理
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
// 唤醒阻塞在等待空闲连接的线程
state.notifyAll();
} else {
// 空闲连接数已达上限,则关闭真实的数据库连接,并失效连接对象PooledConnection
// 统计累计校验时间
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
conn.invalidate();
}
} else {
// 移除的连接对象无效了,打印log,并统计池状态的badConnectionCount数量
if (log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
}
state.badConnectionCount++;
}
}
}
源码相对popConnection来说比较少,逻辑比较简单
需要注意的是PooledConnection的isValid方法,该方法不仅会检测boolean属性valid,而且会尝试调用PooledDataSource的pingConnection(this)方法让数据库执行poolPingQuery属性中保存的测试SQL语句,从而确保该真实连接对象的真实可用
public boolean isValid() {
return valid && realConnection != null && dataSource.pingConnection(this);
}
pingConnection(PooledConnection conn)
protected boolean pingConnection(PooledConnection conn) {
// 记录ping 操作是否成功
boolean result = true;
try {
result = !conn.getRealConnection().isClosed();
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
result = false;
}
// 当真实连接对象确实没有关闭
if (result) {
// 校验是否启动Ping
if (poolPingEnabled) {
// 只有长时间(超过poolPingConnectionsNotUsedFor指定的时长)未使用的连接才需要Ping操作
if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
// conn.getTimeElapsedSinceLastUse()的时间就是采用当前时间 - lastUsedTimestamp时间得来的
try {
if (log.isDebugEnabled()) {
log.debug("Testing connection " + conn.getRealHashCode() + " ...");
}
// 1. 获取真实连接对象
Connection realConn = conn.getRealConnection();
// 2. 获取预处理语句
Statement statement = realConn.createStatement();
// 3. 执行poolPingQuery SQL
ResultSet rs = statement.executeQuery(poolPingQuery);
// 关闭ResultSet
rs.close();
// 关闭预处理语句
statement.close();
if (!realConn.getAutoCommit()) {
// 回滚事务
realConn.rollback();
}
// 执行完毕返回true
result = true;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
}
} catch (Exception e) {
log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
try {
// 执行SQL发生异常,则关闭真实连接对象
conn.getRealConnection().close();
} catch (Exception e2) {
//ignore
}
// 返回false
result = false;
if (log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
}
}
}
}
}
return result;
}
PooledDataSource如何创建
PooledDataSource和上文中非池化数据源一样,也是采用了工厂方法模式创建数据源实例,因此,Mybatis额外提供一个工厂实现类PooledDataSourceFactory
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
public PooledDataSourceFactory() {
this.dataSource = new PooledDataSource();
}
}
可以看出这里继承了UnpooledDataSourceFactory
,只是提供了自己的构造函数用来初始化dataSource属性为PooledDataSource
而PooledDataSource的构造函数实际上就是new了一个UnPooledDataSource
public PooledDataSource() {
dataSource = new UnpooledDataSource();
}
总结
以上就是关于Mybatis中PooledDataSource数据源的所有源码分析