聊聊CRDT

本文主要研究一下CRDT

CRDT

CRDT是Conflict-free Replicated Data Type的简称,也称为a passive synchronisation,即免冲突的可复制的数据类型,这种数据类型可以用于数据跨网络复制并且可以自动解决冲突达到一致,非常适合使用AP架构的系统在各个partition之间复制数据时使用;具体实现上可以分为State-based的CvRDT、Operation-based的CmRDT、Delta-based、Pure operation-based等

Consistency without Consensus,guarantee convergence to the same value in spite of network delays, partitions and message reordering

State-based(CvRDT)

  • CvRDT即Convergent Replicated Data Type的简称,也称为an active synchronisation,通常在诸如NFS, AFS, Coda的文件系统以及诸如Riak, Dynamo的KV存储中使用
  • 这种方式是通过传递整个object的states来完成,需要定义一个merge函数来合并输入的object states
  • 该merge函数需要满足commutative及idempotent,即monotonically increasing,以做到可以重试及与order无关

Operation-based(CmRDT)

  • CmRDT即Commutative Replicated Data Type的简称,通常在诸如Bayou, Rover, IceCube, Telex的cooperative systems中使用
  • 这种方式是通过传递operations来完成,需要prepare方法生成operations,以及effect方法将输入的operations表示的变更作用在local state中
  • 这里要求传输协议是可靠的,如果可能重复传输则要求effect是幂等的,而且对order有一定要求,如果不能保证order则需要effect叠加在一起是or的效果

Delta-based

Delta-based可以理解为是结合State-based及Operation-based的一种改进,它通过delta-mutators来进行replicate

Pure operation-based

通常Operation-based的方式需要prepare方法生成operations,这里可能存在延时,Pure operation-based是指prepare的实现不是通过对比state生成operations,而是仅仅返回现成的operations,这就需要记录每一步对object state操作的operations

Convergent Operations

对于CRDT来说,为了实现Conflict-free Replicated对数据结构的一些操作需要满足如下条件:

  • Associative

(a+(b+c)=(a+b)+c),即grouping没有影响

  • Commutative

(a+b=b+a),即order没有影响

  • Idempotent

(a+a=a),即duplication没有影响(幂等)

基本数据类型

CRDT的基本数据类型有Counters、Registers、Sets

Counters

  • Grow-only counter(G-Counter)

使用max函数来进行merge

  • Positive-negative counter(PN-Counter)

使用两个G-Counter来实现,一个用于递增,一个用于递减,最后取值进行sum

Registers

register有assign()及value()两种操作

  • Last Write Wins -register(LWW-Register)

给每个assign操作添加unique ids,比如timestamps或者vector clock,使用max函数进行merge

  • Multi-valued -register(MV-Register)

类似G-Counter,每次assign都会新增一个版本,使用max函数进行merge

Sets

  • Grow-only set(G-Set)

使用union操作进行merge

  • Two-phase set(2P-Set)

使用两个G-Set来实现,一个addSet用于添加,一个removeSet用于移除

  • Last write wins set(LWW-element Set)

类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了timestamp信息,且timestamp较高的add及remove优先

  • Observed-remove set(OR-Set)

类似2P-Set,有一个addSet,一个removeSet,不过对于元素增加了tag信息,对于同一个tag的操作add优先于remove

其他数据类型

Array

关于Array有Replicated Growable Array(RGA),支持addRight(v, a)操作

Graph

Graph可以基于Sets结构实现,不过需要处理并发的addEdge(u, v)、removeVertex(u)操作

Map

Map需要处理并发的put、rmv操作

实例

这里使用wurmloch-crdt的实现

GCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GCounter.java

public class GCounter extends AbstractCrdt<GCounter, GCounter.UpdateCommand> {

    // fields
    private Map<String, Long> entries = HashMap.empty();


    // constructor
    public GCounter(String nodeId, String crdtId) {
        super(nodeId, crdtId, BehaviorProcessor.create());
    }


    // crdt
    @Override
    protected Option<UpdateCommand> processCommand(UpdateCommand command) {
        final Map<String, Long> oldEntries = entries;
        entries = entries.merge(command.entries, Math::max);
        return entries.equals(oldEntries)? Option.none() : Option.of(new UpdateCommand(crdtId, entries));
    }


    // core functionality
    public long get() {
        return entries.values().sum().longValue();
    }

    public void increment() {
        increment(1L);
    }

    public void increment(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        entries = entries.put(nodeId, entries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                entries
        ));
    }

    //......
}
  • 这里GCounter使用HashMap来实现,其processCommand接收UpdateCommand,采用HashMap的merge方法进行合并,其中BiFunction为Math::max;get()方法对entries.values()进行sum得出结果

PNCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/PNCounter.java

public class PNCounter extends AbstractCrdt<PNCounter, PNCounter.UpdateCommand> {

    // fields
    private Map<String, Long> pEntries = HashMap.empty();
    private Map<String, Long> nEntries = HashMap.empty();


    // constructor
    public PNCounter(String nodeId, String crtdId) {
        super(nodeId, crtdId, BehaviorProcessor.create());
    }


    // crdt
    protected Option<UpdateCommand> processCommand(PNCounter.UpdateCommand command) {
        final Map<String, Long> oldPEntries = pEntries;
        final Map<String, Long> oldNEntries = nEntries;
        pEntries = pEntries.merge(command.pEntries, Math::max);
        nEntries = nEntries.merge(command.nEntries, Math::max);
        return pEntries.equals(oldPEntries) && nEntries.equals(oldNEntries)? Option.none()
                : Option.of(new UpdateCommand(crdtId, pEntries, nEntries));
    }


    // core functionality
    public long get() {
        return pEntries.values().sum().longValue() - nEntries.values().sum().longValue();
    }

    public void increment() {
        increment(1L);
    }

    public void increment(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        pEntries = pEntries.put(nodeId, pEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    public void decrement() {
        decrement(1L);
    }

    public void decrement(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        nEntries = nEntries.put(nodeId, nEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    //......
}
  • 这里PNCounter使用了两个HashMap来实现,其中pEntries用于递增,nEntries用于递减;processCommand采用HashMap的merge方法分别对pEntries及nEntries进行合并,其中BiFunction为Math::max;get()方法则使用pEntries.values()的sum减去nEntries.values()的sum

LWWRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/LWWRegister.java

public class LWWRegister<T> extends AbstractCrdt<LWWRegister<T>, LWWRegister.SetCommand<T>> {

    // fields
    private T value;
    private StrictVectorClock clock;


    // constructor
    public LWWRegister(String nodeId, String crdtId) {
        super(nodeId, crdtId, BehaviorProcessor.create());
        this.clock = new StrictVectorClock(nodeId);
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
        if (clock.compareTo(command.getClock()) < 0) {
            clock = clock.merge(command.getClock());
            doSet(command.getValue());
            return Option.of(command);
        }
        return Option.none();
    }


    // core functionality
    public T get() {
        return value;
    }

    public void set(T newValue) {
        if (! Objects.equals(value, newValue)) {
            doSet(newValue);
            commands.onNext(new SetCommand<>(
                    crdtId,
                    value,
                    clock
            ));
        }
    }


    // implementation
    private void doSet(T value) {
        this.value = value;
        clock = clock.increment();
    }

    //......
}
  • 这里LWWRegister使用了StrictVectorClock,其processCommand接收SetCommand,它在本地clock小于command.getClock()会先merge clock,然后执行doSet更新value,同时更新本地为clock.increment()

MVRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/MVRegister.java

public class MVRegister<T> extends AbstractCrdt<MVRegister<T>, MVRegister.SetCommand<T>> {

    // fields
    private Array<Entry<T>> entries = Array.empty();


    // constructor
    public MVRegister(String nodeId, String crdtId) {
        super(nodeId, crdtId, ReplayProcessor.create());
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
        final Entry<T> newEntry = command.getEntry();
        if (!entries.exists(entry -> entry.getClock().compareTo(newEntry.getClock()) > 0
                || entry.getClock().equals(newEntry.getClock()))) {
            final Array<Entry<T>> newEntries = entries
                    .filter(entry -> entry.getClock().compareTo(newEntry.getClock()) == 0)
                    .append(newEntry);
            doSet(newEntries);
            return Option.of(command);
        }
        return Option.none();
    }


    // core functionality
    public Array<T> get() {
        return entries.map(Entry::getValue);
    }

    public void set(T newValue) {
        if (entries.size() != 1 || !Objects.equals(entries.head().getValue(), newValue)) {
            final Entry<T> newEntry = new Entry<>(newValue, incVV());
            doSet(Array.of(newEntry));
            commands.onNext(new SetCommand<>(
                    crdtId,
                    newEntry
            ));
        }
    }


    // implementation
    private void doSet(Array<Entry<T>> newEntries) {
        entries = newEntries;
    }

    private VectorClock incVV() {
        final Array<VectorClock> clocks = entries.map(Entry::getClock);
        final VectorClock mergedClock = clocks.reduceOption(VectorClock::merge).getOrElse(new VectorClock());
        return mergedClock.increment(nodeId);
    }

    //......
}
  • 这里LWWRegister使用了Array以及StrictVectorClock,其processCommand接收SetCommand,它在没有entry的clock大于或者equals newEntry.getClock()时会创建新的newEntries,该newEntries不包含clock与newEntry.getClock()等值的entry,同时加入了newEntry,最后使用doSet赋值给本地的entries

GSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GSet.java

public class GSet<E> extends AbstractSet<E> implements Crdt<GSet<E>, GSet.AddCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Processor<AddCommand<E>, AddCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public GSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super AddCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends AddCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<AddCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);
        });
    }

    private Option<AddCommand<E>> processCommand(AddCommand<E> command) {
        return doAdd(command.getElement())? Option.of(command) : Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return elements.size();
    }

    @Override
    public Iterator<E> iterator() {
        return new GSetIterator();
    }

    @Override
    public boolean add(E element) {
        commands.onNext(new AddCommand<>(crdtId, element));
        return doAdd(element);
    }


    // implementation
    private synchronized boolean doAdd(E element) {
        return elements.add(element);
    }

    //......
}
  • 这里GSet使用Set来实现,其processCommand接收AddCommand,其doAdd方法使用Set的add进行合并

TwoPhaseSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/TwoPSet.java

public class TwoPSet<E> extends AbstractSet<E> implements Crdt<TwoPSet<E>, TwoPSet.TwoPSetCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Set<E> tombstone = new HashSet<>();
    private final Processor<TwoPSetCommand<E>, TwoPSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public TwoPSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "CrdtId must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super TwoPSetCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends TwoPSetCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<TwoPSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<TwoPSetCommand<E>> processCommand(TwoPSetCommand<E> command) {
        if (command instanceof TwoPSet.AddCommand) {
            return doAdd(((TwoPSet.AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
        } else if (command instanceof TwoPSet.RemoveCommand) {
            return doRemove(((TwoPSet.RemoveCommand<E>) command).getElement())? Option.of(command) : Option.none();
        }
        return Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return elements.size();
    }

    @Override
    public Iterator<E> iterator() {
        return new TwoPSetIterator();
    }

    @Override
    public boolean add(E value) {
        final boolean changed = doAdd(value);
        if (changed) {
            commands.onNext(new TwoPSet.AddCommand<>(crdtId, value));
        }
        return changed;
    }


    // implementation
    private boolean doAdd(E value) {
        return !tombstone.contains(value) && elements.add(value);
    }

    private boolean doRemove(E value) {
        return tombstone.add(value) | elements.remove(value);
    }

    //......
}
  • 这里TwoPSet使用了两个Set来实现,其中elements用于add,tombstone用于remove;其processCommand方法接收TwoPSetCommand,它有TwoPSet.AddCommand及TwoPSet.RemoveCommand两个子类,两个command分别对应doAdd及doRemove方法;doAdd要求tombstone不包含该元素并往elements添加元素;doRemove往tombstone添加元素并从elements移除元素

ORSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/ORSet.java

public class ORSet<E> extends AbstractSet<E> implements Crdt<ORSet<E>, ORSet.ORSetCommand<E>> /*, ObservableSet<E> */ {

    // fields
    private final String crdtId;
    private final Set<Element<E>> elements = new HashSet<>();
    private final Set<Element<E>> tombstone = new HashSet<>();
    private final Processor<ORSetCommand<E>, ORSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public ORSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super ORSetCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends ORSetCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<ORSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<ORSetCommand<E>> processCommand(ORSetCommand<E> command) {
        if (command instanceof AddCommand) {
            return doAdd(((AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
        } else if (command instanceof RemoveCommand) {
            return doRemove(((RemoveCommand<E>) command).getElements())? Option.of(command) : Option.none();
        }
        return Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return doElements().size();
    }

    @Override
    public Iterator<E> iterator() {
        return new ORSetIterator();
    }

    @Override
    public boolean add(E value) {
        final boolean contained = doContains(value);
        prepareAdd(value);
        return !contained;
    }


    // implementation
    private static <U> Predicate<Element<U>> matches(U value) {
        return element -> Objects.equals(value, element.getValue());
    }

    private synchronized boolean doContains(E value) {
        return elements.parallelStream().anyMatch(matches(value));
    }

    private synchronized Set<E> doElements() {
        return elements.parallelStream().map(Element::getValue).collect(Collectors.toSet());
    }

    private synchronized void prepareAdd(E value) {
        final Element<E> element = new Element<>(value, UUID.randomUUID());
        commands.onNext(new AddCommand<>(getCrdtId(), element));
        doAdd(element);
    }

    private synchronized boolean doAdd(Element<E> element) {
        return (elements.add(element) | elements.removeAll(tombstone)) && (!tombstone.contains(element));
    }

    private synchronized void prepareRemove(E value) {
        final Set<Element<E>> removes = elements.parallelStream().filter(matches(value)).collect(Collectors.toSet());
        commands.onNext(new RemoveCommand<>(getCrdtId(), removes));
        doRemove(removes);
    }

    private synchronized boolean doRemove(Collection<Element<E>> removes) {
        return elements.removeAll(removes) | tombstone.addAll(removes);
    }

    //......
}
  • 这里ORSet使用了两个Set来实现,其中elements用于add,tombstone用于remove;其processCommand方法接收ORSetCommand,它有ORSet.AddCommand及ORSet.RemoveCommand两个子类,两个command分别对应doAdd及doRemove方法;doAdd方法首先执行prepareAdd使用UUID创建element,然后往elements添加元素,移除tombstone;doRemove方法首先执行prepareRemove找出需要移除的element集合removes,然后从elements移除removes并往tombstone添加removes

小结

  • CRDT是Conflict-free Replicated Data Type的简称,也称为a passive synchronisation,即免冲突的可复制的数据类型;具体实现上可以分为State-based的CvRDT、Operation-based的CmRDT、Delta-based、Pure operation-based等
  • CvRDT即Convergent Replicated Data Type的简称,也称为an active synchronisation,通常在诸如NFS, AFS, Coda的文件系统以及诸如Riak, Dynamo的KV存储中使用;CvRDT即Convergent Replicated Data Type的简称,也称为an active synchronisation,通常在诸如NFS, AFS, Coda的文件系统以及诸如Riak, Dynamo的KV存储中使用
  • 对于CRDT来说,为了实现Conflict-free Replicated要求对数据结构的操作是Convergent,即需要满足Associative、Commutative及Idempotent;CRDT的基本数据类型有Counters(G-Counter、PN-Counter)、Registers(LWW-Register、MV-Register)、Sets(`G-Set、2P-Set、LWW-element Set、OR-Set``)

doc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352