在使用MongoDB的时候 (基于spring-mongo) ,我想在插入对象时获取有序自增的主键 ,但是MongoDB的默认规则是生成一串无序 (大致有序) 的字串 .而Spring Data提供的主键生成方法也是随机的 String/BigInteger.
因为分布式情况下 ,有序ID会变得困难 ( ID中心/分布式锁 )
同步问题
获取有序ID的通常做法是 :
- 创建sequence :
key-start-end-step-current
标识/起值/止值/步长/当前值 - 获取sequence ,
current
作为主键值 - 保存
current = current + step
到数据库作为下一个主键值
但是在多线程情况下 , 2-3可能被多个线程同时运行 ,导致sequence还未保存成功就被下一个获取.
模拟数据库Sequence模型
@AllArgsConstructor
public class Sequence {
@Setter
long current;
public long getCurrent() {
transTime();
return current;
}
public void setCurrent(long current) {
transTime();
this.current = current;
}
private void transTime() {
try {
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class SequenceServiceTest {
Sequence A;//A型sequence
Sequence B;//B型sequence
/**
* 初始化数据
*/
@Before
public void before() {
A = new Sequence(1L);
B = new Sequence(1L);
}
/**
* 模拟数据库取值
*/
private Sequence getSequence(String name) {
switch (name) {
case "A":
return A;
case "B":
return B;
default:
return null;
}
}
private void waitService(ExecutorService executorService) {
executorService.shutdown();
try {
while (!executorService.awaitTermination(1000, TimeUnit.SECONDS)) {
}
System.out.println("final A:" + A.getCurrent());
System.out.println("final B:" + B.getCurrent());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
非同步代码读写sequence
@Test
public void noSynchronizedTest() {
//10个用户同时需要获取id
ExecutorService executorService = Executors.newFixedThreadPool(10);
//一部分需要A ,一部分需要B
for (int i = 0; i < 1000; i++) {
executorService.submit(() -> System.out.println("A:" + getNextWithNoSync("A")));
executorService.submit(() -> System.out.println("B:" + getNextWithNoSync("B")));
}
waitService(executorService);
}
private long getNextWithNoSync(String name) {
Sequence sequence = getSequence(name);
long current = sequence.getCurrent();
long next = current + 1;
sequence.setCurrent(next);
return current;
}
运行时间 : 1s369ms
...
B:360
A:332
B:361
B:361
A:332
A:332
final A:333
final B:362
可以从结果看出 ,最终的sequence不是1001(从1开始取1000个) ,说明中途有重复的sequence产生 .原因就是在getNextWithNoSync
函数 , get取值 在上一个set回写 之前运行 ,造成混乱. (网络传输越慢 ,影响越大)
同步代码读写sequence
因此set/get必须在一个同步代码块中 ,这段代码每次只能被一个线程访问 .
@Test
public void synchronizedMethodTest() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.submit(() -> System.out.println("A:" + getNextWithSync("A")));
executorService.submit(() -> System.out.println("B:" + getNextWithSync("B")));
}
waitService(executorService);
}
private synchronized long getNextWithSync(String name) {
Sequence sequence = getSequence(name);
long current = sequence.getCurrent();
long next = current + 1;
sequence.setCurrent(next);
return current;
}
运行时间 : 13s277ms
...
A:998
B:999
A:999
B:1000
A:1000
final A:1001
final B:1001
这次结果正确 ,所有ID都是有序取用的 ,但是运行的时间足足慢了10倍 !! 因为 synchronized 标记加在了getNextWithSync
方法上 ,线程会等待只有一个线程能运行这个函数 .差值 = 数据库读写时间*方法执行数量 : (3+3) * 1000 * 2 = 12,000 ms
.
当然 ,同步代码约束了多线程的运行 ,效率上自然有所下降 .不过我们发现 ,我们所取的AB两个Sequence是互不影响的 ,当 A.get/B.get 同时发生也是允许的 ,所以需要调整同步规则 ,只对同一个sequence取用进行同步.
同步指定对象运行代码读写sequence
@Test
public void synchronizedObjectTest() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.submit(() -> System.out.println("A:" + getNextWithSyncObject("A")));
executorService.submit(() -> System.out.println("B:" + getNextWithSyncObject("B")));
}
waitService(executorService);
}
private long getNextWithSyncObject(String name) {
Sequence sequence = getSequence(name);
synchronized (sequence) {
long current = sequence.getCurrent();
long next = current + 1;
sequence.setCurrent(next);
return current;
}
}
运行时间 : 6s869ms
...
A:997
B:999
A:998
A:999
B:1000
A:1000
final A:1001
final B:1001
很容易看出 ,运行时间比上一个少了一半 .因为我们对sequence对象进行同步 ,不同的sequence对象就像多个锁 ,只有争抢同一个锁的人才需要进行等待 .
实践
@Service
public class SequenceServiceImpl
implements SequenceService, ApplicationListener<ContextRefreshedEvent> {
final Map<String, SequenceLock> collection2SequenceLock = new ConcurrentHashMap<>();
@Autowired
SequenceRepository sequenceRepository;
/**
* 初始化各sequence的同步锁
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//避免spring-mvc中的servlet context事件
if (event.getApplicationContext().getParent() == null) {
for (SysSequence sequence : sequenceRepository.findAll()) {
collection2SequenceLock.put(sequence.getCollectionName(), new SequenceLock(sequence));
log.info("初始化sequence lock列表 : {} - {}",
sequence.getCollectionName(), sequence.getCurrent());
}
}
}
@Override
public void register(SysSequence sequence) {
sequenceRepository.insert(sequence);
}
/**
* 获取主键
*
* @param entity collection对象
*/
@Override
public Long getNext(Class entity) {
String collection = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, entity.getSimpleName());
Document docAnnotation = (Document) entity.getAnnotation(Document.class);
if (docAnnotation != null) {
collection = docAnnotation.collection();
}
return getNext(collection);
}
@Override
public Long getNext(String collection) {
if (!collection2SequenceLock.containsKey(collection)) {
throw CheckedException.of("找不到%s表的主键sequence", collection);
}
return getNextWithLock(collection);
}
/**
* 多线程情况下获取同一个collection的id时 ,可能出现同步问题 ;但又不能影响不同的collection的流程
*/
private Long getNextWithLock(String collection) {
SequenceLock lock = collection2SequenceLock.get(collection);
synchronized (lock) {
//已载入的批量主键
if (lock.isBatchInit) {
long current = lock.current;
long next = current + lock.step;
lock.current = next;
if (current < lock.max) {
log.debug("批量序列分发 - {}:{}", collection, current);
return current;
}
}
//获取主键记录值
SysSequence sequence = sequenceRepository.findOne(collection);
long current = sequence.getCurrent();
long next;
if (sequence.isBatch()) {
//未载入的批量主键 : 直接取一段sequence
next = current + sequence.getStep() * sequence.getBatchCount();
//缓存到lock中
long batchNext = current + sequence.getStep();
lock.init(batchNext, sequence.getStep(), next);
} else {
next = current + sequence.getStep();
}
if (next > sequence.getEnd()) {
throw CheckedException
.of("%s表的sequence超限 - next[%s] max[%s]", collection, next, sequence.getEnd());
}
sequence.setCurrent(next);
sequenceRepository.save(sequence);
log.debug("数据库获取 - {}:{}", collection, current);
return current;
}
}
/**
* 获取sequence的同步lock ,同时用来分发批量主键
*/
class SequenceLock {
String collection;
boolean isBatchInit;
long current = -1;
long step = -1;
long max = -1;
SequenceLock(SysSequence sequence) {
this.collection = sequence.getCollectionName();
}
void init(long current, long step, long max) {
this.isBatchInit = true;
this.current = current;
this.step = step;
this.max = max;
log.debug("初始化批量序列 - {} ,当前 - {} ,步长 - {} ,分发最大值 - {}", collection, current, step, max);
}
}
}
测试服务
@SpringBootTest(classes = {Application.class})
@RunWith(SpringRunner.class)
public class SequenceServiceTest {
@Autowired
SequenceService sequenceService;
@Autowired
SequenceRepository sequenceRepository;
@Before
public void before() {
sequenceRepository.save(
new SysSequence()
.setCollectionName("sys_user")
.setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L)
.setCurrent(0L)
);
sequenceRepository.save(
new SysSequence()
.setCollectionName("sys_user2")
.setStart(0L).setEnd(Long.MAX_VALUE).setStep(10L)
.setCurrent(0L)
);
sequenceRepository.save(
new SysSequence()
.setCollectionName("sys_user3")
.setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L)
.setCurrent(0L)
.setBatch(true).setBatchCount(10)
);
}
@Test
public void getSequence1() {
for (int i = 0; i < 1000; i++) {
sequenceService.getNext(SysUser.class);
}
Assert.assertEquals(sequenceService.getNext(SysUser.class), Long.valueOf(1000L));
}
@Test
public void getSequence2() {
for (int i = 0; i < 1000; i++) {
sequenceService.getNext("sys_user2");
}
Assert.assertEquals(sequenceService.getNext("sys_user2"), Long.valueOf(10000L));
}
@Test
public void getSequence3() {
for (int i = 0; i < 1000; i++) {
sequenceService.getNext("sys_user3");
}
Assert.assertEquals(sequenceService.getNext("sys_user3"), Long.valueOf(1000L));
}
}
测试结果
可以看到测试成功 ,已经可以获取到有序自增的sequence . 而批量主键的执行速度明显优于数据库存取 ,而且可以减轻数据库压力 ,在不需要严格连续的主键时 ,应该采用批量获取 ,内存发放的方式 ( 分布式ID实现方式之一 ) .