SpringBoot 2.2.5 整合Sharding-JDBC 4.1.1 自定义复合分片算法进行分库分表,并配置主从分离

说明

  1. 顶顶大名的分库分表中间件,废话不多说,官网地址:https://shardingsphere.apache.org/
  2. 本文中数据库用的是mysql5.7,并且实现了一主一从。
  3. 场景是订单表的分表,并且要支持只根据user_id进行查询的场景,所以要将用户的标识信息放到主键order_id中,这样才能既能只根据主键order_id进行查询,又能只根据user_id进行查询。
  4. 顺便支持一下主从分离,这个比较简单,加一下配置即可
  5. 完整代码地址在结尾!!

官方简介

  1. 定位为轻量级Java框架,在Java的JDBC层提供的额外服务。 它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。
  2. 适用于任何基于JDBC的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。
  3. 支持任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
  4. 支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer,PostgreSQL以及任何遵循SQL92标准的数据库。
image.png

第一步,在pom.xml加入依赖,如下

<!-- MySQL驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<!-- mybatisPlus 核心库 -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.3.1</version>
</dependency>
<!-- sharding-jdbc -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.1.1</version>
</dependency>
<!-- hutool工具 -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.5</version>
</dependency>

注:

  1. 本文的ORM框架使用的是MyBatis-Plus。
  2. hutool工具用于生成自定义id。

第二步,在application.yml配置shardingsphere,mybatis-plus相关配置

spring:
  application:
    name: shardingjdbc-demo-server
  shardingsphere:
    datasource:
      # 数据源
      names: master,salve
      master:
        driver-class-name: com.mysql.cj.jdbc.Driver
        password: root
        type: com.zaxxer.hikari.HikariDataSource
        jdbc-url: jdbc:mysql://xxx:3306/db1
        username: root
      salve:
        driver-class-name: com.mysql.cj.jdbc.Driver
        password: root
        type: com.zaxxer.hikari.HikariDataSource
        jdbc-url: jdbc:mysql://xxx:3306/db2
        username: root
    sharding:
      # 主从分离
      master-slave-rules:
        master:
          master-data-source-name: master
          slave-data-source-names: salve
      # 表分片
      tables:
        my_order:
          # 主表分片规则表名
          actual-data-nodes: master.my_order_$->{0..3}
          # 主键策略
#          key-generator:
#            column: id
#            type: MyShardingKeyGenerator
          table-strategy:
            # 行表达式分片
#            inline:
#              algorithm-expression: order_$->{id.longValue() % 4}
#              sharding-column: id
            # 标准分片
#            standard:
#              sharding-column: id
              # 指定自定义分片算法类的全路径
#              precise-algorithm-class-name: com.jinhx.shardingjdbc.config.MyPreciseShardingAlgorithm
            # 复合分片
            complex:
              # 分片键
              sharding-columns: order_id,user_id
              # 指定自定义分片算法类的全路径
              algorithm-class-name: com.jinhx.shardingjdbc.config.MyComplexKeysShardingAlgorithm
#          defaultTableStrategy:
    # 打开sql控制台输出日志
    props:
      sql:
        show: true

# mybatis-plus相关配置
mybatis-plus:
  # xml扫描,多个目录用逗号或者分号分隔(告诉 Mapper 所对应的 XML 文件位置)
  mapper-locations: classpath:com/jinhx/shardingjdbc/mapper/xml/*.xml
  # 别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.jinhx.shardingjdbc.entity
  configuration:
    # 不开启二级缓存
    cache-enabled: false
    # 是否开启自动驼峰命名规则映射:从数据库列名到Java属性驼峰命名的类似映射
    map-underscore-to-camel-case: true
    # 如果查询结果中包含空值的列,则 MyBatis 在映射的时候,不会映射这个字段
    call-setters-on-nulls: true
    # 这个配置会将执行的sql打印出来,在开发或测试的时候可以用
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

server:
  port: 8093

第三步,在主库创建数据库db1,创建订单表,如下

说明

  1. 本文不进行分库,只分4个表,分别为my_order_0,my_order_1,my_order_2,my_order_3

sql

CREATE DATABASE db1;

use db1;

create table my_order_0
(
    order_id bigint not null comment '订单id主键'
        primary key,
    user_id  bigint not null comment '用户id',
    money    bigint not null comment '金额'
)
    comment '用户订单表';

 其他表结构一致,此处省略

第四步,创建表操作相应类

  1. 使用mybatis-plus的代码生成器对数据库的表生成相应的类,不懂的请参考另外一篇文章-SpringBoot 2.2.5 整合MyBatis-Plus 3.3.1 教程,配置多数据源并支持事务,附带代码生成器使用教程
  2. 手动创建,包括Order,IOrderService,OrderServiceImpl等,如下

Order

package com.jinhx.shardingjdbc.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.jinhx.shardingjdbc.util.SnowFlakeUtil;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

import java.io.Serializable;
import java.util.Objects;

/**
 * Order
 *
 * @author jinhx
 * @since 2021-07-27
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("my_order")
public class Order implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 分表的数量,一定要2的n次方
     */
    public static final int TABLE_COUNT = 4;

    /**
     * 订单id主键
     */
    @TableId(type = IdType.INPUT)
    private Long orderId;

    /**
     * 用户id
     */
    private Long userId;

    /**
     * 金额
     */
    private Long money;

    public void buildOrderId(){
        if (Objects.isNull(this.userId)){
            throw new RuntimeException("userId为空,无法生成orderId");
        }
        this.orderId = SnowFlakeUtil.getSnowflakeId(SnowFlakeUtil.getDataCenterId(this.userId) & (TABLE_COUNT - 1));
    }

    public void buildUserId(Integer dataCenterId){
        if (Objects.isNull(dataCenterId)){
            throw new RuntimeException("dataCenterId为空,无法生成userId");
        }
        this.userId = SnowFlakeUtil.getSnowflakeId(dataCenterId & (TABLE_COUNT - 1));
    }

}

IOrderService

package com.jinhx.shardingjdbc.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.jinhx.shardingjdbc.entity.Order;

import java.util.List;

/**
 * IOrderService
 *
 * @author jinhx
 * @since 2021-07-27
 */
public interface IOrderService extends IService<Order> {

    /**
     * 根据orderIds查询
     *
     * @param orderIds orderIds
     * @return List<Order>
     */
    List<Order> selectByOrderIds(List<Long> orderIds);

    /**
     * 根据userIds查询
     *
     * @param userIds userIds
     * @return List<Order>
     */
    List<Order> selectByUserIds(List<Long> userIds);

    /**
     * 批量插入
     *
     * @param orders orders
     */
    void insertOrders(List<Order> orders);

}

OrderServiceImpl

package com.jinhx.shardingjdbc.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.jinhx.shardingjdbc.entity.Order;
import com.jinhx.shardingjdbc.mapper.OrderMapper;
import com.jinhx.shardingjdbc.service.IOrderService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

/**
 * OrderServiceImpl
 *
 * @author jinhx
 * @since 2021-07-27
 */
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {

    /**
     * 根据orderIds查询
     *
     * @param orderIds orderIds
     * @return List<Order>
     */
    @Override
    public List<Order> selectByOrderIds(List<Long> orderIds) {
        return baseMapper.selectBatchIds(orderIds);
    }

    /**
     * 根据userIds查询
     *
     * @param userIds userIds
     * @return List<Order>
     */
    @Override
    public List<Order> selectByUserIds(List<Long> userIds) {
        return baseMapper.selectList(new LambdaQueryWrapper<Order>()
                .in(CollectionUtils.isNotEmpty(userIds), Order::getUserId, userIds));
    }

    /**
     * 批量插入
     *
     * @param orders orders
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void insertOrders(List<Order> orders) {
        if (CollectionUtils.isNotEmpty(orders)){
            if (orders.stream().mapToInt(item -> baseMapper.insert(item)).sum() != orders.size()){
                log.error("批量插入order表失败 orders={}" + orders);
                throw new RuntimeException("批量插入order表失败");
            }
        }
    }

}

第五步,配置MybatisPlus,如下

1. 在启动类MybatisplusApplication新增@MapperScan注解,里面写入生成的文件中的mapper存放的路径,用于扫描mapper文件。

package com.jinhx.shardingjdbc;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@MapperScan("com.jinhx.shardingjdbc.mapper")
@SpringBootApplication
public class ShardingjdbcApplication {

    public static void main(String[] args) {
        SpringApplication.run(ShardingjdbcApplication.class, args);
    }

}

2. 创建MybatisPlus配置类,MybatisPlusConfig,主要是配置一些插件的使用,此步可省略

package com.jinhx.shardingjdbc.config;

import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.plugins.pagination.optimize.JsqlParserCountOptimize;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * Mybatis-Plus配置类
 *
 * @author jinhx
 * @since 2021-07-27
 */
@EnableTransactionManagement
@Configuration
public class MybatisPlusConfig {

    /**
     * mybatis-plus SQL执行效率插件【生产环境可以关闭】,设置 dev test 环境开启
     */
//    @Bean
//    @Profile({"dev", "qa"})
//    public PerformanceInterceptor performanceInterceptor() {
//        PerformanceInterceptor performanceInterceptor = new PerformanceInterceptor();
//        performanceInterceptor.setMaxTime(1000);
//        performanceInterceptor.setFormat(true);
//        return performanceInterceptor;
//    }

    /**
     * 分页插件
     */
    @Bean
    public PaginationInterceptor paginationInterceptor() {
        PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
        // 设置请求的页面大于最大页后操作, true调回到首页,false 继续请求  默认false
         paginationInterceptor.setOverflow(false);
        // 设置最大单页限制数量,默认 500 条,-1 不受限制
         paginationInterceptor.setLimit(500);
        // 开启 count 的 join 优化,只针对部分 left join
        paginationInterceptor.setCountSqlParser(new JsqlParserCountOptimize(true));
        return paginationInterceptor;
    }
}

第六步,创建自定义复合分片算法类MyComplexKeysShardingAlgorithm,注意自己替换application.yml里面的全路径

package com.jinhx.shardingjdbc.config;

import com.jinhx.shardingjdbc.util.SnowFlakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * 配置Sharding-JDBC复合分片算法
 * 根据id和age计算,来确定是路由到那个表中
 * 目前处理 = 和 in 操作,其余的操作,比如 >、< 等范围操作均不支持。
 *
 * @author jinhx
 * @since 2021-07-27
 */
@Slf4j
public class MyComplexKeysShardingAlgorithm implements ComplexKeysShardingAlgorithm<Long> {

    /**
     * orderId
     */
    private static final String COLUMN_ORDER_ID = "order_id";

    /**
     * userId
     */
    private static final String COLUMN_USER_ID = "user_id";

    /**
     * 重写复合分片算法
     */
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames, ComplexKeysShardingValue<Long> shardingValue) {
        if (!shardingValue.getColumnNameAndRangeValuesMap().isEmpty()) {
            throw new RuntimeException("条件全部为空,无法路由到具体的表,暂时不支持范围查询");
        }

        // 获取orderId
        Collection<Long> orderIds = shardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_ORDER_ID, new ArrayList<>(1));
        // 获取userId
        Collection<Long> userIds = shardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_USER_ID, new ArrayList<>(1));

        if (CollectionUtils.isEmpty(orderIds) && CollectionUtils.isEmpty(userIds)) {
            throw new RuntimeException("orderId,userId字段同时为空,无法路由到具体的表,暂时不支持范围查询");
        }

        // 获取最终要查询的表后缀序号的集合,入参顺序不能颠倒
        List<Integer> tableNos = getTableNoList(orderIds, userIds);

        return tableNos.stream()
                // 对可用的表数量求余数,获取到真实的表的后缀
//                .map(idSuffix -> String.valueOf(idSuffix % availableTargetNames.size()))
                // 拼接获取到真实的表
                .map(tableSuffix -> availableTargetNames.stream().filter(targetName -> targetName.endsWith(String.valueOf(tableSuffix))).findFirst().orElse(null))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * 获取最终要查询的表后缀序号的集合
     *
     * @param orderIds orderId字段集合
     * @param userIds userId字段集合
     * @return 最终要查询的表后缀序号的集合
     */
    private List<Integer> getTableNoList(Collection<Long> orderIds, Collection<Long> userIds) {
        List<Integer> result = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(orderIds)){
            // 获取表位信息
            result.addAll(orderIds.stream()
                    .filter(item -> Objects.nonNull(item) && item > 0)
                    .map(item -> (int) SnowFlakeUtil.getDataCenterId(item))
                    .collect(Collectors.toList()));
        }

        if (CollectionUtils.isNotEmpty(userIds)) {
            // 获取表位信息
            result.addAll(userIds.stream().filter(item -> Objects.nonNull(item) && item > 0)
                    .map(item -> (int) SnowFlakeUtil.getDataCenterId(item))
                    .collect(Collectors.toList()));
        }

        if (CollectionUtils.isNotEmpty(result)) {
            log.info("SharingJDBC解析路由表后缀成功 redEnvelopeIds={} uids={} 路由表后缀列表={}", orderIds, userIds, result);
            // 合并去重
            return result.stream().distinct().collect(Collectors.toList());
        }
        log.error("SharingJDBC解析路由表后缀失败 redEnvelopeIds={} uids={}", orderIds, userIds);
        throw new RuntimeException("orderId,userId解析路由表后缀为空,无法路由到具体的表,暂时不支持范围查询");
    }

}

第七步,编写单元测试类,ShardingjdbcApplicationTests,并进行测试

测试步骤

  1. 先运行insertOrdersTest方法向数据库插入数据,跑完分别查看四个表,是否都有数据,且理论上来说应该是数据均匀
  2. 分别从4个表里面随机捞几条数据的order_id出来,然后运行selectByOrderIdsTest,看是否都能查出数据,此步骤是为了验证只根据order_id进行路由查询是否正常
  3. 分别从4个表里面随机捞几条数据的user_id出来,然后运行selectByUserIdsTest,看是否都能查出数据,此步骤是为了验证只根据user_id进行路由查询是否正常

ShardingjdbcApplicationTests

package com.jinhx.shardingjdbc;

import com.jinhx.shardingjdbc.entity.Order;
import com.jinhx.shardingjdbc.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.List;

@Slf4j
// 获取启动类,加载配置,确定装载 Spring 程序的装载方法,它回去寻找 主配置启动类(被 @SpringBootApplication 注解的)
@SpringBootTest
class ShardingjdbcApplicationTests {

    @Autowired
    private IOrderService iOrderService;

    @Test
    void selectByOrderIdsTest() {
        List<Long> orderIds = Lists.newArrayList(1443844581547311109L, 1443844581547442181L, 1443844581547573255L, 1443844581547704327L);
        log.info(iOrderService.selectByOrderIds(orderIds).toString());
    }

    @Test
    void selectByUserIdsTest() {
        List<Long> userIds = Lists.newArrayList(1443844581547311108L, 1443844581547311106L, 1443844581547442180L, 1443844581547704326L);
        log.info(iOrderService.selectByUserIds(userIds).toString());
    }

    @Test
    void insertOrdersTest() {
        List<Order> orders = Lists.newArrayList();
        for (int i = 1;i < 100;i++){
            Order order = new Order();
            order.buildUserId(i);
            order.setMoney(i * 1000L);
            order.buildOrderId();
            orders.add(order);
        }
        log.info("orders={}", orders);
        iOrderService.insertOrders(orders);
    }

    @BeforeEach
    void testBefore(){
        log.info("测试开始!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    }

    @AfterEach
    void testAfter(){
        log.info("测试结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    }

}

完整代码地址:https://github.com/Jinhx128/springboot-demo

注:此工程包含多个module,本文所用代码均在shardingjdbc-demo模块下

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容