一、背景
sentinel推模式源码中的数据源是可扩展的,源码中只有几种数据源:redis、Apollo、Nacos、Zookeeper等;
拉模式中只有一种
二、具体实现
以mysql为例,作为数据库的数据源
1、定义一个MysqlDataSource
需要继承AutoRefreshDataSource<S,T>,然后这是一个模板类,第一个参数表示从DataSource中读取的数据格式是什么类型,第二个参数表示最后需要转换成什么类型(在这里分别是从数据库读取出来的List, 和转换后的List)
package com.高振芳.sentinel.controller;
import com.alibaba.csp.sentinel.datasource.AutoRefreshDataSource;
import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.高振芳.sentinel.entity.ResourceRoleQps;
import com.高振芳.sentinel.config.DataSourceUtils;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
/**
* @program: sentinel
* @description: sentinel集成sql规则数据源
* @author: Gaozf
* @create: 2020-08-11 10:38
**/
public class MysqlRefreshableDataSource extends AutoRefreshDataSource<List<ResourceRoleQps>,List<FlowRule>> {
private static final long DEFAULT_REFRESH_MS = 3000;
public MysqlRefreshableDataSource(Converter<List<ResourceRoleQps>, List<FlowRule>> configParser) {
super(configParser, DEFAULT_REFRESH_MS);
firstLoad();
}
private void firstLoad() {
try {
List<FlowRule> newValue = loadConfig();
getProperty().updateValue(newValue);
} catch (Throwable e) {
System.out.println(e);
RecordLog.info("loadConfig exception", e);
}
}
@Override
public List<ResourceRoleQps> readSource() throws Exception {
JdbcTemplate jdbcTemplate = new JdbcTemplate(DataSourceUtils.getDataSource());
return jdbcTemplate.query("select id, app_id, api, limit_qps, create_at from resource_role_qps"
,new RowMapper<ResourceRoleQps>(){
@Override
public ResourceRoleQps mapRow(ResultSet resultSet, int i) throws SQLException {
return new ResourceRoleQps(resultSet.getLong("id"),
resultSet.getString("app_id"),
resultSet.getString("api"),
resultSet.getLong("limit_qps"),
resultSet.getLong("create_at"));
}
});
}
@Override
public void close() throws Exception {
super.close();
}
}
readSource()方法就是为了从数据源读取所需数据
这里需要注意了:不可以使用注解的方式通过mybatis从数据库拉取数据,因为这个类时不能通过注解进行注入,没有无参构造函数,所以通过注解注入Mapper类去对数据库进行查找是不可以的。我这里实现是通过jdbc的方式。
2、调用、实现
@GetMapping("/gzf1111")
@SentinelResource(value = "gzf1111",blockHandler = "blockMehord")
public String hello(){
ReadableDataSource readableDataSource = new MysqlRefreshableDataSource(source ->
source.stream().map(openApiAppIdApiQps -> {
FlowRule flowRule = new FlowRule();
flowRule.setResource(openApiAppIdApiQps.getApi());
flowRule.setCount(openApiAppIdApiQps.getLimitQps());
flowRule.setLimitApp(openApiAppIdApiQps.getAppId());
flowRule.setGrade(1);
flowRule.setStrategy(0);
flowRule.setControlBehavior(0);
return flowRule;
}).collect(Collectors.toList())
);
// 自定义拉取数据源
FlowRuleManager.register2Property(readableDataSource.getProperty());
return "hello world";
}
public String blockMehord(BlockException ex) {
return "熔断";
}
3、JdbcTemplate实现方式
maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.16</version>
</dependency>
配置文件datasource.properties中数据库配置
driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://xxx.xxx.xxx.xxx:3306/sentinel?characterEncoding=utf-8
username=root
password=root
initialSize=5
maxActive=10
maxWait=2000
配置参数说明
Jdbc工具类
public class DataSourceUtils {
private static DataSource dataSource;
private static ThreadLocal<Connection> tl = new ThreadLocal<Connection>();
//在静态代码块中初始化连接池
static {
Properties properties = new Properties();
try {
//getResourceAsStream(String name) 在模块中查找指定名称的文件,返回该文件的输入流
properties.load(DataSourceUtils.class.getClassLoader().getResourceAsStream("datasource.properties"));
dataSource = DruidDataSourceFactory.createDataSource(properties);
} catch (Exception e) {
e.printStackTrace();
}
}
// 直接可以获取一个连接池
public static DataSource getDataSource() {
return dataSource;
}
// 获取连接对象
public static Connection getConnection() throws SQLException {
Connection con = tl.get();
if (con == null) {
con = dataSource.getConnection();
tl.set(con);
}
return con;
}
// 开启事务
public static void startTransaction() throws SQLException {
Connection con = getConnection();
if (con != null) {
con.setAutoCommit(false);
}
}
// 事务回滚
public static void rollback() throws SQLException {
Connection con = getConnection();
if (con != null) {
con.rollback();
}
}
// 提交并且 关闭资源及从ThreadLocall中释放
public static void commitAndRelease() throws SQLException {
Connection con = getConnection();
if (con != null) {
con.commit(); // 事务提交
con.close();// 关闭资源
tl.remove();// 从线程绑定中移除
}
}
// 关闭资源方法
public static void closeConnection() throws SQLException {
Connection con = getConnection();
if (con != null) {
con.close();
tl.remove();// 从线程绑定中移除
}
}
public static void closeStatement(Statement st) throws SQLException {
if (st != null) {
st.close();
}
}
public static void closeResultSet(ResultSet rs) throws SQLException {
if (rs != null) {
rs.close();
}
}
}
实现原理看另一篇DataSource源码解析