Snowflake算法
snowflake是Twitter开源的分布式ID生成算法,保证业务集群中所有机器在某个时间点都能生成一个64 bits的唯一ID(long)。如下图所示,sign为固定1bit符号标识,即生成的ID为正数,时间戳描述了ID生成时间,工作节点用来区分集群不同机器,并发序列保证相同时间、相同机器ID递增。
本文主要讲解国内比较流行的两种开源分布式ID生成方法:百度UidGenerator和美团Leaf。
百度UidGenerator
UidGenerator提供了两种生成方案:DefaultUidGenerator、CachedUidGenerator。
DefaultUidGenerator
核心代码如下:
protected synchronized long nextId() {
long currentSecond = getCurrentSecond();
// Clock moved backwards, refuse to generate uid
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}
// At the same second, increase sequence
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
// Exceed the max sequence, we wait the next second to generate uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond);
}
// At the different second, sequence restart from zero
} else {
sequence = 0L;
}
lastSecond = currentSecond;
// Allocate bits for UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}
该方法简单粗暴,上来就是synchronized锁,保证多线程获取id的情况下只有一个线程能进入此方法,其中工作节点workerId是每次业务启动时向数据库表WORKER_NODE插入一条数据后拿到的最新记录自增ID。
public long assignWorkerId() {
// build worker node entity
WorkerNodeEntity workerNodeEntity = buildWorkerNode();
// add worker node for new (ignore the same IP + PORT)
workerNodeDAO.addWorkerNode(workerNodeEntity);
LOGGER.info("Add worker node:" + workerNodeEntity);
return workerNodeEntity.getId();
}
缺点很明显:1、锁粒度比较大、容易造成多线程并发时等待;2、每次都要计算才能拿到最新ID。
CachedUidGenerator
CachedUidGenerator采用了预取的方式,使用两个循环队列,Uid-RingBuffer用于存储预取的Uid、Flag-RingBuffer用于存储Uid状态(是否可填充、是否可消费)。
循环队列用数组实现,由于数组元素在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题,为此在Tail、Cursor指针、Flag-RingBuffer中采用了CacheLine 补齐方式。
RingBuffer填充时机
初始化预填充:RingBuffer初始化时,预先填充满整个RingBuffer。
即时填充:Take消费时,即时检查剩余可用slot量(tail - cursor),如小于设定阈值,则补全空闲slots。
周期填充:通过Schedule线程,定时补全空闲slots。
核心代码如下:
public long take() {
// spin get next available cursor
long currentCursor = cursor.get();
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
// check for safety consideration, it never occurs
Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
// trigger padding in an async-mode if reach the threshold
long currentTail = tail.get();
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}
// cursor catch the tail, means that there is no more available UID to take
if (nextCursor == currentCursor) {
rejectedTakeHandler.rejectTakeBuffer(this);
}
// 1. check next slot flag is CAN_TAKE_FLAG
int nextCursorIndex = calSlotIndex(nextCursor);
Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
// 2. get UID from next slot
// 3. set next slot flag as CAN_PUT_FLAG.
long uid = slots[nextCursorIndex];
flags[nextCursorIndex].set(CAN_PUT_FLAG);
// Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
// slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
return uid;
}
CachedUidGenerator官方统计能提供600万/s的稳定吞吐量。缺点就是由于采用了预取的方式,ID中的时间信息意义丢失,不能表示ID生成的真实时间。
github地址:https://github.com/baidu/uid-generator
美团Leaf
Leaf 提供两种生成的ID的方式:号段模式和snowflake模式。
号段模式
业务每次获取一个号段(step决定大小)的值。用完之后再去数据库获取新的号段,各个业务不同的发号需要用biz_tag字段来区分,每个biz-tag的ID获取相互隔离,互不影响。
为了解决在号段消费完,因为等待最新号段、阻塞业务线程的问题,号段模式采用两个号段缓存——双buffer的方式。当前号段已下发10%时,如果下一个号段未更新,则另启一个更新线程去更新下一个号段。当前号段全部下发完后,如果下个号段准备好了则切换到下个号段为当前segment接着下发,循环往复。
核心代码如下:
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
while (true) {
try {
// 获取buffer的读锁
buffer.rLock().lock();
// 获取当前的号段
final Segment segment = buffer.getCurrent();
if ( // nextReady is false (下一个号段没有初始化.)
!buffer.isNextReady()
// idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 )
&& (segment.getIdle() < 0.9 * segment.getStep())
// buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作.
// 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段.
&& buffer.getThreadRunning().compareAndSet(false, true)
) {
// 放入线程池进行异步更新.
service.execute(new Runnable() {
@Override
public void run() {
Segment next = buffer.getSegments()[buffer.nextPos()];
boolean updateOk = false;
try {
updateSegmentFromDb(buffer.getKey(), next);
// 更新成功,设置标记位为true
updateOk = true;
logger.info("update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) {
logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} finally {
if (updateOk) {
// 获取buffer 的写锁
buffer.wLock().lock();
// next准备完成
buffer.setNextReady(true);
// next运行标记位设置为false
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
} else {
buffer.getThreadRunning().set(false);
}
}
}
});
}
// 获取value
long value = segment.getValue().getAndIncrement();
// value < 当前号段的最大值,则返回改值
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
} finally {
buffer.rLock().unlock();
}
// 等待下一个号段执行完成,执行代码在-> execute()
// buffer.setNextReady(true);
// buffer.getThreadRunning().set(false);
waitAndSleep(buffer);
try {
// buffer 级别加写锁.
buffer.wLock().lock();
final Segment segment = buffer.getCurrent();
// 获取value -> 为什么重复获取value, 多线程执行时,在进行waitAndSleep() 后,
// 当前Segment可能已经被调换了.直接进行一次获取value的操作,可以提高id下发的速度(没必要再走一次循环),并且防止出错(在交换Segment前进行一次检查)
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
// 执行到这里, 其他的线程没有进行号段的调换,并且当前号段所有号码已经下发完成.
// 判断nextReady是否为true.
if (buffer.isNextReady()) {
// 调换segment
buffer.switchPos();
// 调换完成后, 设置nextReady为false
buffer.setNextReady(false);
} else {
// 进入这里的条件
// 1. 当前号段获取到的值大于maxValue
// 2. 另外一个号段还没有准备好
// 3. 等待时长大于waitAndSleep中的时间.
logger.error("Both two segments in {} are not ready!", buffer);
return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
}
} finally {
// finally代码块中释放写锁.
buffer.wLock().unlock();
}
}
}
该方案的缺点是ID是趋势递增的,同时ID号是可计算的,不适用于订单ID生成场景,比如竞对在两天中午12点分别下单,通过订单id号相减就能大致计算出公司一天的订单量,这个是不能忍受的。
snowflake方案
核心代码如下:
public synchronized Result get(String key) {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
wait(offset << 1);
timestamp = timeGen();
if (timestamp < lastTimestamp) {
return new Result(-1, Status.EXCEPTION);
}
} catch (InterruptedException e) {
LOGGER.error("wait interrupted");
return new Result(-2, Status.EXCEPTION);
}
} else {
return new Result(-3, Status.EXCEPTION);
}
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 为0的时候表示是下一毫秒时间开始对seq做随机
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//如果是新的ms开始
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
return new Result(id, Status.SUCCESS);
}
通过代码可以发现生成方式和百度的DefaultUidGenerator类似,不同的地方是使用Zookeeper持久顺序节点的特性自动对snowflake节点配置wokerID。
业务会周期性向ZooKeeper上报当前机器节点时间,服务每次重启时会校验当前机器时间是否大于上次上传的机器时间。
该方案的缺点是引入ZooKeeper,维护成本比较高。为了降低对ZooKeeper的依赖,除了每次会去ZooKeeper拿数据以外,也会在本机文件系统上缓存一个workerID文件,当ZooKeeper出现问题,恰好机器出现问题需要重启时,能保证服务能够正常启动。