Redisson 中的 CommandExecutor

转自:https://github.com/angryz/my-blog/issues/6

上篇Redisson 分布式锁实现分析中提到了RedissonLock中的redis命令都是通过CommandExecutor来发送到redis服务执行的,本篇就来了解一下它的实现方式。

先来看其源码

public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {
}

可以看到它同时继承了 同步和异步(sync/async) 两种调用方式。

Note:

  • 在分布式锁的实现中是用了同步的 CommandExecutor,是因为锁的获取和释放是有强一致性要求的,需要实时知道结果方可进行下一步操作。
  • 上篇分布式锁分析中我提到 Redisson 的同步实现实际上是基于异步实现的,这在下文中也会得到解释。

在Redisson中,除了提供同步和异步的方式执行命令之外,还通过 Reactive Streams 实现了 Reactive 方式的命令执行器。见下图

预备知识

Redisson 大量使用了 Redis 的 EVAL 命令来执行 Lua 脚本,所以先要对 EVAL 有所了解。

EVAL命令格式和示例

EVAL script numkeys key [key ...] arg [arg ...]

> eval "return redis.call('set',KEYS[1],ARGV[1])" 1 foo bar
OK

从 Redis 2.6.0 版本开始,通过内置的 Lua 解释器,可以使用 EVAL 命令对 Lua 脚本进行求值。

参数的说明本文不再详述,可查阅 Redis命令参考

重点是这个:Redis 使用单个 Lua 解释器去运行所有脚本,并且 Redis 也保证脚本会以原子性(atomic)的方式执行,当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。所以 Redisson 中使用了 EVAL 来保证执行命令操作数据时的安全性。

例子

这里就使用 Redisson 参考文档中的一个 RAtomicLong 对象的例子吧。

RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// 同步方式
longObject.compareAndSet(3, 401);
// 异步方式
longObject.compareAndSetAsync(3, 401);

RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive方式
longObject.compareAndSet(3, 401);

根据此例,我们分别来看 compareAndSet/compareAndSetAsync 的实现,其他命令原理都一样。

异步

既然同步和Reactive的实现都继承了异步的实现,那我们就先来看看CommandAsyncService吧。

例子中的 longObject.compareAndSetAsync(3, 401); 实际执行的是 RedissonAtomicLong 实现类的 compareAndSetAsync 方法,如下

public Future<Boolean> compareAndSetAsync(long expect, long update) {
    return commandExecutor.evalWriteAsync(getName(),
                                          StringCodec.INSTANCE,
                                          RedisCommands.EVAL_BOOLEAN,
                                          "...此处省略...",
                                          Collections.<Object>singletonList(getName()),
                                          expect, update);
}

此处的 evalWriteAsync 就是在 CommandAsyncService 中实现的,如下

public <T, R> Future<R> evalWriteAsync(String key,
                                       Codec codec,
                                       RedisCommand<T> evalCommandType,
                                       String script,
                                       List<Object> keys,
                                       Object ... params) {
    NodeSource source = getNodeSource(key);
    return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}

private <T, R> Future<R> evalAsync(NodeSource nodeSource,
                                   boolean readOnlyMode,
                                   Codec codec,
                                   RedisCommand<T> evalCommandType,
                                   String script,
                                   List<Object> keys,
                                   Object ... params) {
    Promise<R> mainPromise = connectionManager.newPromise();
    List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
    args.add(script);
    args.add(keys.size());
    args.addAll(keys);
    args.addAll(Arrays.asList(params));
    async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
    return mainPromise;
}

追根溯源,最后来看看 async 方法的实现,

protected <V, R> void async(final boolean readOnlyMode,
                            final NodeSource source,
                            final Codec codec,
                            final RedisCommand<V> command,
                            final Object[] params,
                            final Promise<R> mainPromise,
                            final int attempt) {
    // ....省略部分代码....
    // AsyncDetails 是一个包装对象,用来将异步调用过程中的对象引用包装起来方便使用
    final AsyncDetails<V, R> details = AsyncDetails.acquire();
    details.init(connectionFuture, attemptPromise,
            readOnlyMode, source, codec, command, params, mainPromise, attempt);

    // retryTimerTask 用来实现 Redisson 提供的重试机制
    final TimerTask retryTimerTask = new TimerTask() {

        @Override
        public void run(Timeout t) throws Exception {
            // ....省略部分代码....
            int count = details.getAttempt() + 1;
            // ....省略部分代码....
            async(details.isReadOnlyMode(), details.getSource(),
                    details.getCodec(), details.getCommand(),
                    details.getParams(), details.getMainPromise(), count);
            AsyncDetails.release(details);
        }
    };
    // 启用重试机制
    Timeout timeout = connectionManager.newTimeout(retryTimerTask,
            connectionManager.getConfig().getRetryInterval(),
            TimeUnit.MILLISECONDS);
    details.setTimeout(timeout);

    // checkConnectionFuture 用于检查客户端是否与服务端集群建立连接,如果连接建立
    // 则可发送命令到服务端执行
    if (connectionFuture.isDone()) {
        checkConnectionFuture(source, details);
    } else {
        connectionFuture.addListener(new FutureListener<RedisConnection>() {
            @Override
            public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
                checkConnectionFuture(source, details);
            }
        });
    }

    // ....省略部分代码....
}

private <R, V> void checkConnectionFuture(final NodeSource source,
        final AsyncDetails<V, R> details) {
    // ....省略部分代码....
    // 获取客户端与服务端集群建立的连接
    final RedisConnection connection = details.getConnectionFuture().getNow();

    if (details.getSource().getRedirect() == Redirect.ASK) {
        // 客户端接收到 ASK 转向, 先发送一个 ASKING 命令,然后再发送真正的命令请求
        // ....省略部分代码....
    } else {
        // ....省略部分代码....
        // 客户端发送命令到服务端
        ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
                details.getCodec(), details.getCommand(), details.getParams()));
        details.setWriteFuture(future);
    }
    // ....省略部分代码....
    // 释放本次连接
    releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
            details.getAttemptPromise(), details);
}

由于代码太长,我只贴出了和执行命令有关的部分代码,我们可以从上面代码中看到

  • Redisson 对每次操作都提供了重试机制,可配置 retryAttempts 来控制重试次数(缺省为3次),可配置 retryInterval 来控制重试间隔(缺省为 1000 ms)。Redisson 中使用了 Netty 的 TimerTaskTimeout 工具来实现其重试机制。
  • Redisson 中也大量使用了 Netty 实现的异步工具 FutureFutureListener,使得异步调用执行完成后能够立刻做出对应的操作。
  • RedissonConnection 是基于 Netty 实现的,发送命令的 send 方法实现是使用 Netty 的 Channel.writeAndFlush 方法。

以上便是 Redisson 的异步实现。

同步

Redisson 里的同步都是基于异步来实现的,为什么这么说,来看看 RedissonAtomicLongcompareAndSet方法,

public boolean compareAndSet(long expect, long update) {
    return get(compareAndSetAsync(expect, update));
}

可见是在之前的异步方法外套了一个 get 方法,而这个 get 方法实际上也是在异步实现类 CommandAsyncService 中实现的,至于同步的实现类 CommandSyncService 有兴趣大家可以去看看,基本上都是在异步实现返回的 Future 外套了一个 get 方法。那么我们就看看 get 的实现,

public <V> V get(Future<V> future) {
    final CountDownLatch l = new CountDownLatch(1);
    future.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> future) throws Exception {
            l.countDown();
        }
    });
    try {
        // 阻塞当前线程
        l.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    if (future.isSuccess()) {
        return future.getNow();
    }
    throw convertException(future);
}

原来是利用了 CountDownLatch 在异步调用结果返回前将当前线程阻塞,然后通过 Netty 的 FutureListener在异步调用完成后解除阻塞,并返回调用结果。

Reactive

从例子中可以看到,Reactive 的客户端和对象实现都是独立的,先来看看 RedissonAtomicLongReactivecompareAndSet 方法,

public Publisher<Boolean> compareAndSet(long expect, long update) {
    return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
                 + "redis.call('set', KEYS[1], ARGV[2]); "
                 + "return 1 "
               + "else "
                 + "return 0 end",
            Collections.<Object>singletonList(getName()), expect, update);
}

它调用的是 CommandReactiveService 中实现的 evalWriteReactive 方法,

public <T, R> Publisher<R> evalWriteReactive(String key, Codec codec,
        RedisCommand<T> evalCommandType, String script, List<Object> keys,
        Object... params) {
  Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
  return new NettyFuturePublisher<R>(f);
}

我们看到这里还是基于异步调用实现的,只是将异步调用返回的 Future 封装在了一个 NettyFuturePublisher 对象中返回,这个对象继承了 Reactive Streams 中的 Stream,所以我的解读也只能到此为止了,Reactive Streams 的相关知识目前我还不具备。

总结

  • Redisson 提供了 同步、异步 和 Reactive 三种命令执行方式。
  • 同步 和 Reactive 的实现是基于 异步 的实现的。
  • Redisson 使用 Netty 连接 Redis 服务,并依赖 Netty 异步工具类来实现异步通信、重试机制、阻塞等特性。
  • Redisson 使用 Reactive Streams 来实现 Reactive 特性。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351