MongoDB自增序列实现 - Java多线程同步 synchronized 用法

在使用MongoDB的时候 (基于spring-mongo) ,我想在插入对象时获取有序自增的主键 ,但是MongoDB的默认规则是生成一串无序 (大致有序) 的字串 .而Spring Data提供的主键生成方法也是随机的 String/BigInteger.

因为分布式情况下 ,有序ID会变得困难 ( ID中心/分布式锁 )

同步问题

获取有序ID的通常做法是 :

  • 创建sequence : key-start-end-step-current 标识/起值/止值/步长/当前值
  • 获取sequence ,current作为主键值
  • 保存current = current + step到数据库作为下一个主键值

但是在多线程情况下 , 2-3可能被多个线程同时运行 ,导致sequence还未保存成功就被下一个获取.

模拟数据库Sequence模型
@AllArgsConstructor
public class Sequence {

  @Setter
  long current;

  public long getCurrent() {
    transTime();
    return current;
  }

  public void setCurrent(long current) {
    transTime();
    this.current = current;
  }

  private void transTime() {
    try {
      Thread.sleep(3);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

public class SequenceServiceTest {

  Sequence A;//A型sequence
  Sequence B;//B型sequence

  /**
   * 初始化数据
   */
  @Before
  public void before() {
    A = new Sequence(1L);
    B = new Sequence(1L);
  }

  /**
   * 模拟数据库取值
   */
  private Sequence getSequence(String name) {
    switch (name) {
      case "A":
        return A;
      case "B":
        return B;
      default:
        return null;
    }
  }

  private void waitService(ExecutorService executorService) {
    executorService.shutdown();
    try {
      while (!executorService.awaitTermination(1000, TimeUnit.SECONDS)) {
      }
      System.out.println("final A:" + A.getCurrent());
      System.out.println("final B:" + B.getCurrent());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
非同步代码读写sequence
  @Test
  public void noSynchronizedTest() {
    //10个用户同时需要获取id
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    //一部分需要A ,一部分需要B
    for (int i = 0; i < 1000; i++) {
      executorService.submit(() -> System.out.println("A:" + getNextWithNoSync("A")));
      executorService.submit(() -> System.out.println("B:" + getNextWithNoSync("B")));
    }
    waitService(executorService);
  }

  private long getNextWithNoSync(String name) {
    Sequence sequence = getSequence(name);
    long current = sequence.getCurrent();
    long next = current + 1;
    sequence.setCurrent(next);
    return current;
  }
运行时间 : 1s369ms
...
B:360
A:332
B:361
B:361
A:332
A:332
final A:333
final B:362

可以从结果看出 ,最终的sequence不是1001(从1开始取1000个) ,说明中途有重复的sequence产生 .原因就是在getNextWithNoSync函数 , get取值 在上一个set回写 之前运行 ,造成混乱. (网络传输越慢 ,影响越大)

同步代码读写sequence

因此set/get必须在一个同步代码块中 ,这段代码每次只能被一个线程访问 .

  @Test
  public void synchronizedMethodTest() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 1000; i++) {
      executorService.submit(() -> System.out.println("A:" + getNextWithSync("A")));
      executorService.submit(() -> System.out.println("B:" + getNextWithSync("B")));
    }
    waitService(executorService);
  }

  private synchronized long getNextWithSync(String name) {
    Sequence sequence = getSequence(name);
    long current = sequence.getCurrent();
    long next = current + 1;
    sequence.setCurrent(next);
    return current;
  }
运行时间 : 13s277ms
...
A:998
B:999
A:999
B:1000
A:1000
final A:1001
final B:1001

这次结果正确 ,所有ID都是有序取用的 ,但是运行的时间足足慢了10倍 !! 因为 synchronized 标记加在了getNextWithSync方法上 ,线程会等待只有一个线程能运行这个函数 .差值 = 数据库读写时间*方法执行数量 : (3+3) * 1000 * 2 = 12,000 ms .

当然 ,同步代码约束了多线程的运行 ,效率上自然有所下降 .不过我们发现 ,我们所取的AB两个Sequence是互不影响的 ,当 A.get/B.get 同时发生也是允许的 ,所以需要调整同步规则 ,只对同一个sequence取用进行同步.

同步指定对象运行代码读写sequence
  @Test
  public void synchronizedObjectTest() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 1000; i++) {
      executorService.submit(() -> System.out.println("A:" + getNextWithSyncObject("A")));
      executorService.submit(() -> System.out.println("B:" + getNextWithSyncObject("B")));
    }
    waitService(executorService);
  }

  private long getNextWithSyncObject(String name) {
    Sequence sequence = getSequence(name);
    synchronized (sequence) {
      long current = sequence.getCurrent();
      long next = current + 1;
      sequence.setCurrent(next);
      return current;
    }
  }
运行时间 : 6s869ms
...
A:997
B:999
A:998
A:999
B:1000
A:1000
final A:1001
final B:1001

很容易看出 ,运行时间比上一个少了一半 .因为我们对sequence对象进行同步 ,不同的sequence对象就像多个锁 ,只有争抢同一个锁的人才需要进行等待 .

实践

@Service
public class SequenceServiceImpl
    implements SequenceService, ApplicationListener<ContextRefreshedEvent> {

  final Map<String, SequenceLock> collection2SequenceLock = new ConcurrentHashMap<>();

  @Autowired
  SequenceRepository sequenceRepository;

  /**
   * 初始化各sequence的同步锁
   */
  @Override
  public void onApplicationEvent(ContextRefreshedEvent event) {
    //避免spring-mvc中的servlet context事件
    if (event.getApplicationContext().getParent() == null) {
      for (SysSequence sequence : sequenceRepository.findAll()) {
        collection2SequenceLock.put(sequence.getCollectionName(), new SequenceLock(sequence));
        log.info("初始化sequence lock列表 : {} - {}",
            sequence.getCollectionName(), sequence.getCurrent());
      }
    }
  }

  @Override
  public void register(SysSequence sequence) {
    sequenceRepository.insert(sequence);
  }

  /**
   * 获取主键
   *
   * @param entity collection对象
   */
  @Override
  public Long getNext(Class entity) {
    String collection = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, entity.getSimpleName());
    Document docAnnotation = (Document) entity.getAnnotation(Document.class);
    if (docAnnotation != null) {
      collection = docAnnotation.collection();
    }
    return getNext(collection);
  }

  @Override
  public Long getNext(String collection) {
    if (!collection2SequenceLock.containsKey(collection)) {
      throw CheckedException.of("找不到%s表的主键sequence", collection);
    }
    return getNextWithLock(collection);
  }


  /**
   * 多线程情况下获取同一个collection的id时 ,可能出现同步问题 ;但又不能影响不同的collection的流程
   */
  private Long getNextWithLock(String collection) {
    SequenceLock lock = collection2SequenceLock.get(collection);
    synchronized (lock) {
      //已载入的批量主键
      if (lock.isBatchInit) {
        long current = lock.current;
        long next = current + lock.step;
        lock.current = next;
        if (current < lock.max) {
          log.debug("批量序列分发 - {}:{}", collection, current);
          return current;
        }
      }

      //获取主键记录值
      SysSequence sequence = sequenceRepository.findOne(collection);
      long current = sequence.getCurrent();
      long next;
      if (sequence.isBatch()) {
        //未载入的批量主键 : 直接取一段sequence
        next = current + sequence.getStep() * sequence.getBatchCount();
        //缓存到lock中
        long batchNext = current + sequence.getStep();
        lock.init(batchNext, sequence.getStep(), next);
      } else {
        next = current + sequence.getStep();
      }

      if (next > sequence.getEnd()) {
        throw CheckedException
            .of("%s表的sequence超限 - next[%s] max[%s]", collection, next, sequence.getEnd());
      }
      sequence.setCurrent(next);
      sequenceRepository.save(sequence);

      log.debug("数据库获取 - {}:{}", collection, current);
      return current;
    }
  }

  /**
   * 获取sequence的同步lock ,同时用来分发批量主键
   */
  class SequenceLock {

    String collection;
    boolean isBatchInit;

    long current = -1;
    long step = -1;
    long max = -1;

    SequenceLock(SysSequence sequence) {
      this.collection = sequence.getCollectionName();
    }

    void init(long current, long step, long max) {
      this.isBatchInit = true;
      this.current = current;
      this.step = step;
      this.max = max;
      log.debug("初始化批量序列 - {} ,当前 - {} ,步长 - {} ,分发最大值 - {}", collection, current, step, max);
    }
  }
}
测试服务
@SpringBootTest(classes = {Application.class})
@RunWith(SpringRunner.class)
public class SequenceServiceTest {

  @Autowired
  SequenceService sequenceService;
  @Autowired
  SequenceRepository sequenceRepository;

  @Before
  public void before() {
    sequenceRepository.save(
        new SysSequence()
            .setCollectionName("sys_user")
            .setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L)
            .setCurrent(0L)
    );
    sequenceRepository.save(
        new SysSequence()
            .setCollectionName("sys_user2")
            .setStart(0L).setEnd(Long.MAX_VALUE).setStep(10L)
            .setCurrent(0L)
    );
    sequenceRepository.save(
        new SysSequence()
            .setCollectionName("sys_user3")
            .setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L)
            .setCurrent(0L)
            .setBatch(true).setBatchCount(10)
    );
  }

  @Test
  public void getSequence1() {
    for (int i = 0; i < 1000; i++) {
      sequenceService.getNext(SysUser.class);
    }
    Assert.assertEquals(sequenceService.getNext(SysUser.class), Long.valueOf(1000L));
  }

  @Test
  public void getSequence2() {
    for (int i = 0; i < 1000; i++) {
      sequenceService.getNext("sys_user2");
    }
    Assert.assertEquals(sequenceService.getNext("sys_user2"), Long.valueOf(10000L));
  }

  @Test
  public void getSequence3() {
    for (int i = 0; i < 1000; i++) {
      sequenceService.getNext("sys_user3");
    }
    Assert.assertEquals(sequenceService.getNext("sys_user3"), Long.valueOf(1000L));
  }
}
测试结果
image.png

可以看到测试成功 ,已经可以获取到有序自增的sequence . 而批量主键的执行速度明显优于数据库存取 ,而且可以减轻数据库压力 ,在不需要严格连续的主键时 ,应该采用批量获取 ,内存发放的方式 ( 分布式ID实现方式之一 ) .

代码见 todolist-server / SequenceServiceTest.java.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容

  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,234评论 11 349
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,621评论 18 399
  • 引用自多线程编程指南应用程序里面多个线程的存在引发了多个执行线程安全访问资源的潜在问题。两个线程同时修改同一资源有...
    Mitchell阅读 1,988评论 1 7
  • 每个人心中,都或多或少做过一个这样的梦,自己拥有一家店铺,要布满鲜花,四季芳香,设计有格调,给客人最好的服务,无论...
    张非一阅读 678评论 0 3
  • 大概因为从小就没有运动细胞,夏曼曼格外抵触运动会,毕竟在坐在露天的操场看台上,承受着阳光的暴晒,不时地还有学生会人...
    就是宁姐姐呀阅读 1,122评论 4 6