目标
1、学习 Redis 的一些高级特性,包括发布订阅、事务、Lua 脚本
1、发布订阅模式
1.1列表的局限
前面我们说通过队列的 rpush 和 lpop 可以实现消息队列(队尾进队头出),但是消费者需要不停地调用 lpop 查看 List 中是否有等待处理的消息(比如写一个 while 循环)。
为了减少通信的消耗,可以 sleep()一段时间再消费,但是会有两个问题:
- 如果生产者生产消息的速度远大于消费者消费消息的速度,List 会占用大量的内存。
- 消息的实时性降低
list 还提供了一个阻塞的命令:blpop,没有任何元素可以弹出的时候,连接会被阻塞。blpop queue 5
,阻塞5秒。
基于 list 实现的消息队列,不支持一对多的消息分发。
1.2发布订阅模式
除了通过 list 实现消息队列之外,Redis 还提供了一组命令实现发布/订阅模式。
这种方式,发送者和接收者没有直接关联(实现了解耦),接收者也不需要持续尝试获取消息。
1.2.1 订阅频道
可以订阅一个或者多个频道。消息的发布者(生产者)可以给指定的频道发布消息。只要有消息到达了频道,所有订阅了这个频道的订阅者都会收到这条消息。
需要注意的注意是,发出去的消息不会被持久化,因为它已经从队列里面移除了,所以消费者只能收到它开始订阅这个频道之后发布的消息。
下面我们来看一下发布订阅命令的使用方法。
订阅者订阅频道:可以一次订阅多个,比如这个客户端订阅了 3 个频道。
subscribe channel-1 channel-2 channel-3
发布者可以向指定频道发布消息(并不支持一次向多个频道发送消息):
publish channel-1 2673
取消订阅(不能在订阅状态下使用):
unsubscribe channel-1
1.2.2 按规则(Pattern) 订阅频道
支持?和占位符。?代表一个字符,代表 0 个或者多个字符。
消费端 1,关注运动信息:
psubscribe *sport
消费端 2,关注所有新闻:
psubscribe news*
消费端 3,关注天气新闻:
psubscribe news-weather
生产者,发布 3 条信息
publish news-sport yaoming
publish news-music jaychou
publish news-weather rain
java 代码
package pubsub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class PublishSubscribe {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.0.224", 6379);
jedis.subscribe(new Subscriber(),"channel");
}
}
class Subscriber extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
System.out.println("接收到的消息:"+message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
}
@Override
public void unsubscribe(String... channels) {
super.unsubscribe(channels);
}
}
package pubsub;
import redis.clients.jedis.Jedis;
public class PublishTest {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.0.224", 6379);
jedis.publish("channel","你好呀");
jedis.close();
}
}
2 、Redis 事务
2.1 为什么要用事务
我们知道 Redis 的单个命令是原子性的(比如 get set mget mset),如果涉及到多个命令的时候,需要把多个命令作为一个不可分割的处理序列,就需要用到事务。
例如我们之前说的用 setnx 实现分布式锁,我们先 set,然后设置对 key 设置 expire,防止 del 发生异常的时候锁不会被释放,业务处理完了以后再 del,这三个动作我们就希望它们作为一组命令执行。
Redis 的事务有两个特点:
- 按进入队列的顺序执行。
- 不会受到其他客户端的请求的影响。
Redis 的事务涉及到四个命令:multi(开启事务),exec(执行事务),discard(取消事务),watch(监视)
2.2 事务的用法
案例场景:tom 和 mic 各有 1000 元,tom 需要向 mic 转账 100 元。
tom 的账户余额减少 100 元,mic 的账户余额增加 100 元。
127.0.0.1:6379> set tom 1000
OK
127.0.0.1:6379> set mic 1000
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby tom 100
QUEUED
127.0.0.1:6379> incrby mic 100
QUEUED
127.0.0.1:6379> exec
1) (integer) 900
2) (integer) 1100
127.0.0.1:6379> get tom
"900"
127.0.0.1:6379> get mic
"1100"
通过 multi 的命令开启事务。事务不能嵌套,多个 multi 命令效果一样。
multi 执行后,客户端可以继续向服务器发送任意多条命令, 这些命令不会立即被执行, 而是被放到一个队列中, 当 exec 命令被调用时, 所有队列中的命令才会被执行。
通过 exec 的命令执行事务。如果没有执行 exec,所有的命令都不会被执行。如果中途不想执行事务了,怎么办?可以调用 discard 可以清空事务队列,放弃执行。
multi
set k1 1
set k2 2
set k3 3
discard
2.3 watch 命令
在 Redis 中还提供了一个 watch 命令。
它可以为 Redis 事务提供 CAS 乐观锁行为(Check and Set / Compare and Swap),也就是多个线程更新变量的时候,会跟原值做比较,只有它没有被其他线程修改的情况下,才更新成新的值。
我们可以用 watch 监视一个或者多个 key,如果开启事务之后,至少有一个被监视key 键在 exec 执行之前被修改了, 那么整个事务都会被取消(key 提前过期除外)。可以用 unwatch 取消。
2.4 事务可能遇到的问题
我们把事务执行遇到的问题分成两种,一种是在执行 exec 之前发生错误,一种是在执行 exec 之后发生错误。
2.4.1 在执行 exec 之前发生错误
比如:入队的命令存在语法错误,包括参数数量,参数名等等(编译器错误)。
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set gupao 666
QUEUED
127.0.0.1:6379> hset qingshan 2673
(error) ERR wrong number of arguments for 'hset' command
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
在这种情况下事务会被拒绝执行,也就是队列中所有的命令都不会得到执行。
2.4.2 在执行 exec 之后发生错误
比如,类型错误,比如对 String 使用了 Hash 的命令,这是一种运行时错误。
127.0.0.1:6379> flushall
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 1
QUEUED
127.0.0.1:6379> hset k1 a b
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> get k1
"1"
最后我们发现 set k1 1 的命令是成功的,也就是在这种发生了运行时异常的情况下,只有错误的命令没有被执行,但是其他命令没有受到影响。
这个显然不符合我们对原子性的定义,也就是我们没办法用 Redis 的这种事务机制来实现原子性,保证数据的一致。
思考(作业):
为什么在一个事务中存在错误,Redis 不回滚?
3 Lua 脚本
Lua是一种轻量级脚本语言,它是用 C 语言编写的,跟数据的存储过程有点类似。 使用 Lua 脚本来执行 Redis 命令的好处:
- 一次发送多个命令,减少网络开销。
- Redis 会将整个脚本作为一个整体执行,不会被其他请求打断,保持原子性。
- 对于复杂的组合命令,我们可以放在文件中,可以实现程序之间的命令集复用。
3.1 在 Redis 中调用 Lua 脚本
使用 eval 方法,语法格式:
redis> eval lua-script key-num [key1 key2 key3 ....] [value1 value2 value3 ....]
eval 代表执行 Lua 语言的命令。
lua-script 代表 Lua 语言脚本内容。
key-num 表示参数中有多少个 key, 需要注意的是 Redis 中 key 是从 1 开始的, 如果没有 key 的参数, 那么写 0。
[key1 key2 key3…]是 key 作为参数传递给 Lua 语言, 也可以不填, 但是需要和 key-num 的个数对应起来。
[value1 value2 value3 ….]这些参数传递给 Lua 语言, 它们是可填可不填的。
示例,返回一个字符串,0 个参数:
redis> eval "return 'Hello World'" 0
3.2 在 Lua 脚本中调用 Redis 命令
使用 redis.call(command, key [param1, param2…])进行操作。语法格式:
redis> eval "redis.call('set',KEYS[1],ARGV[1])" 1 lua-key lua-value
command 是命令, 包括 set、 get、 del 等。
key 是被操作的键。
param1,param2…代表给 key 的参数。
注意跟 Java 不一样,定义只有形参,调用只有实参。
Lua 是在调用时用 key 表示形参,argv 表示参数值(实参)。
3.2.1 设置键值对
在 Redis 中调用 Lua 脚本执行 Redis 命令
以上命令等价于 set gupao 2673
在 redis-cli 中直接写 Lua 脚本不够方便,也不能实现编辑和复用,通常我们会把脚本放在文件里面,然后执行这个文件。
3.2.2 在 Redis 中调用 Lua 脚本文件中的命令, 操作 Redis
创建 Lua 脚本文件:
cd /usr/local/soft/redis5.0.5/src
vim gupao.lua
Lua 脚本内容,先设置,再取值:
redis.call('set','gupao','lua666')
return redis.call('get','gupao')
在 Redis 客户端中调用 Lua 脚本
cd /usr/local/soft/redis5.0.5/src
redis-cli --eval gupao.lua 0
得到返回值:
[root@localhost src]# redis-cli --eval gupao.lua 0
"lua666"
3.2.3 案例: 对 IP 进行限流
需求:某个IP,在 X 秒内只能访问 Y 次。
设计思路:用 key 记录 IP,用 value 记录访问次数。拿到 IP 以后,对 IP+1。如果是第一次访问,对 key 设置过期时间(参数 1)。否则判断次数,超过限定的次数(参数 2),返回 0。如果没有超过次数则返回 1。超过时间,key 过期之后,可以再次访问。KEY[1]是 IP, ARGV[1]是过期时间 X,ARGV[2]是限制访问的次数 Y。
-- ip_limit.lua
-- IP 限流, 对某个 IP 频率进行限制 , 6 秒钟访问 10 次
local num=redis.call('incr',KEYS[1])
if tonumber(num)==1 then
redis.call('expire',KEYS[1],ARGV[1])
return 1
elseif tonumber(num)>tonumber(ARGV[2]) then
return 0
else
return 1
end
6 秒钟内限制访问 10 次,调用测试(连续调用 10 次):
./redis-cli --eval "ip_limit.lua" app:ip:limit:192.168.8.111 , 6 10
app:ip:limit:192.168.8.111 是 key 值 ,后面是参数值,中间要加上一个空格 和
一个逗号,再加上一个 空格 。
即:./redis-cli –eval [lua 脚本] [key…]空格,空格[args…]
多个参数之间用一个 空格 分割
3.2.4 缓存 Lua 脚本
为什么要缓存
在脚本比较长的情况下,如果每次调用脚本都需要把整个脚本传给 Redis 服务端,会产生比较大的网络开销。为了解决这个问题,Redis 提供了 EVALSHA 命令,允许开发者通过脚本内容的 SHA1 摘要来执行脚本。
如何缓存
Redis 在执行 script load 命令时会计算脚本的 SHA1 摘要并记录在脚本缓存中,执行 EVALSHA 命令时 Redis 会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了则执行脚本,否则会返回错误:"NOSCRIPT No matching script. Please use EVAL."
127.0.0.1:6379> script load "return 'Hello World'"
"470877a599ac74fbfda41caa908de682c5fc7d4b"
127.0.0.1:6379> evalsha "470877a599ac74fbfda41caa908de682c5fc7d4b" 0
"Hello World"
自乘案例
Redis 有 incrby 这样的自增命令,但是没有自乘,比如乘以 3,乘以 5。
我们可以写一个自乘的运算,让它乘以后面的参数:
local curVal = redis.call("get", KEYS[1])
if curVal == false then
curVal = 0
else
curVal = tonumber(curVal)
end
curVal = curVal * tonumber(ARGV[1])
redis.call("set", KEYS[1], curVal)
return curVal
把这个脚本变成单行,语句之间使用分号隔开
local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal = tonumber(curVal) end; curVal
= curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal
script load '命令'
127.0.0.1:6379> script load 'local curVal = redis.call("get", KEYS[1]); if curVal == false then curVal = 0 else curVal =
tonumber(curVal) end; curVal = curVal * tonumber(ARGV[1]); redis.call("set", KEYS[1], curVal); return curVal'
"be4f93d8a5379e5e5b768a74e77c8a4eb0434441"
调用:
127.0.0.1:6379> set num 2
OK
127.0.0.1:6379> evalsha be4f93d8a5379e5e5b768a74e77c8a4eb0434441 1 num 6
(integer) 12
3.2.5 脚本超时
Redis 的指令执行本身是单线程的,这个线程还要执行客户端的 Lua 脚本,如果 Lua脚本执行超时或者陷入了死循环,是不是没有办法为客户端提供服务了呢?
eval 'while(true) do end' 0
为 了防 止 某个 脚本 执 行时 间 过长 导 致 Redis 无 法提 供 服务 , Redis 提 供 了lua-time-limit 参数限制脚本的最长运行时间,默认为 5 秒钟。
lua-time-limit 5000(redis.conf 配置文件中)
当脚本运行时间超过这一限制后,Redis 将开始接受其他命令但不会执行(以确保脚本的原子性,因为此时脚本并没有被终止),而是会返回“BUSY”错误。
Redis 提供了一个 script kill 的命令来中止脚本的执行。新开一个客户端:
script kill
如果当前执行的 Lua 脚本对 Redis 的数据进行了修改(SET、DEL 等),那么通过script kill 命令是不能终止脚本运行的。
127.0.0.1:6379> eval "redis.call('set','gupao','666') while true do end" 0
因为要保证脚本运行的原子性,如果脚本执行了一部分终止,那就违背了脚本原子性的要求。最终要保证脚本要么都执行,要么都不执行。
127.0.0.1:6379> script kill
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
遇到这种情况,只能通过 shutdown nosave 命令来强行终止 redis。
shutdown nosave 和 shutdown 的区别在于 shutdown nosave 不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。
总结:如果我们有一些特殊的需求,可以用 Lua 来实现,但是要注意那些耗时的操作。