前段时间做的开发有缓存数据持久化的需求,最终采用了redis,实行效果还不错,聊做总结。
〇、技术选型
要解决问题首先要分析问题。应用角色是移动物联网平台OneNET和公司SCADA系统MQ中间件的数据透传worker,但是从MQ到OneNET的命令需要暂存,等OneNET平台推送设备上线消息后才能下发,OneNET上挂设备数量可能会有数十万只,且设备上线后1~2分钟未收到命令就会下线。可以看出这些暂存命令需要极高的数据一致性,不然给用户多充了几次钱或者没充上、说好了要开阀结果没开或者关阀没关上,很容易影响用户体验和公司利益。
鉴于透传命令是结构简单的二进制字符串,且对于每个设备都需要有设备信息哈希表和暂存命令队列,但既不需要做复杂的SQL查询也不需要事务等特性支持,因此
根据实际情况,考虑以下几点:
- 每个设备需要有对应的设备信息哈希表和暂存命令队列,有嵌套数据结构,透传命令是二进制字符串;
- 数据需要备份容灾,且有一致性需求;
- 不需要进行复杂SQL查询也不需要ACID事务等RDMS特性支持。
可以得出redis十分适合此场景的结论:
- redis具备较多的数据结构,其中list和hash可以联合解决实际的嵌套数据结构问题,只需要做一次映射即可,不用序列化;
- redis具备AOF数据持久化策略,采用always选项可以获得极高的一致性保证;
- redis是NoSQL内存数据库,抛弃掉冗余的RDMS特性可以获得更高的性能。
一、redis的编译和部署
由于生产环境是Windows,开发语言采用C++,因此采用微软改写存档的hiredis on Windows,版本采用的是3.0,发布时采用的是vs2013,我用vs2015直接迁移可以成功编译,十分顺利,但是我的机器是win10,后续发布到xp时才发现其源码里有致命的逻辑错误,我的另一篇文章里面进行了详细说明,如果你需要xp兼容的话可以去看一下:windows xp下redis客户端无法连接服务端。
由于redis自己的依赖库都是静态库,因此易于使用,编译后把redis-server.exe和redis-cli.exe拷到部署环境即可,可以选择命令行方式运行或者Windows服务方式运行,通过xxx.conf选项来选择要加载的redis配置文件。
e.g. redis-server redis.conf
or redis-server --service-install redis.service.conf
。
- NOTE:说到配置文件,有一个地方需要特别说明,微软存档的conf文件里logfile选项值的格式是错误的,这会导致加载配置文件时redis无法正常启动,需要手动修改conf文件的logfile选项为合法路径。
二、redis的API使用
可调用的API都在hiredis.h里了,命名都很清晰,可以顾名思义,需要注意的有以下几点:
- 注意多线程客户端场景下每个线程需要单独拥有一个redis连接(redisContext);
- 建议不要用redisConnect而是redisConnectWithTimeout;
- 返回的void*类型的redisReply*对象需要手动释放(freeReplyObject);
- redisContext*对象使用完毕后也需要手动释放;
- 如果返回的redisReply*对象为空,说明与redis服务器的连接中断,需要重连或排查问题;
- redisReply::str可能为空,在访问之前注意判定;
- 多条redis指令在不是特别在意事务性的情况下建议采用pipeline的方式提高效率。
贴一些自己的实际应用代码(有部分Qt API)以供参考。
Redis连接池单例类:
#pragma once
/* Redis connection pool S/ingleT/on
* @file RedisST.h
* @brief Redis连接池单例类
* @author S.K.
* @date 6.11.2018 */
#include <queue>
#include <mutex>
#include <memory>
#include <functional>
#include <WinSock2.h>
#include "./redis/include/hiredis.h"
#include "./redis/include/win32_types_hiredis.h"
#ifdef _DEBUG
#pragma comment(lib,"./redis/x86d/hiredis.lib")
#pragma comment(lib,"./redis/x86d/Win32_Interop.lib")
#else
#pragma comment(lib,"./redis/x86/select/hiredis.lib")
#pragma comment(lib,"./redis/x86/select/Win32_Interop.lib")
#endif // _DEBUG
class RedisST
{
public:
/* 用于管理单例、连接、连接池和命令响应资源的智能指针类型
* @usage
* RedisST::SmartST inst = RedisST::instance();
* RedisST::SmartConn conn(inst->get_conn(), [inst](redisContext* ctx){ inst->idle_conn(ctx); });
* RedisST::SmartReply reply(RedisST::exec_cmd(conn.get(), "set k v"), RedisST::free_reply); */
using SmartST = std::shared_ptr<RedisST>;
using SmartConn = std::unique_ptr<redisContext, std::function<void(redisContext*)>>;
using SmartReply = std::unique_ptr<redisReply, std::function<void(redisReply*)>>;
RedisST(const RedisST&) = delete;
RedisST& operator=(const RedisST&) = delete;
~RedisST();
/* 初始化唯一实例和redis地址
* @param redis服务器IP和端口 */
static void init(const char* ip, int port);
static SmartST instance();
redisContext* get_conn();
/* 将使用完毕的redis连接放回连接池 */
void idle_conn(redisContext* ctx);
static redisReply* exec_cmd(redisContext* ctx, const char* cmd);
/* @note 当reply为空时说明连接异常 */
static bool validate_reply(SmartReply& reply);
static void free_reply(redisReply* reply);
private:
static SmartST st_;
RedisST();
private:
static char ip_[16];
static int port_;
std::mutex mutex_;
std::queue<redisContext*> pool_;
};
#include "RedisST.h"
RedisST::SmartST RedisST::st_;
char RedisST::ip_[16] = "127.0.0.1";
int RedisST::port_ = 6379;
RedisST::RedisST()
{
}
RedisST::~RedisST()
{
while (!pool_.empty())
{
redisFree(pool_.front());
pool_.pop();
}
}
void RedisST::init(const char * ip, int port)
{
strcpy_s(ip_, ip);
port_ = port;
if (!st_) st_.reset(new RedisST());
}
RedisST::SmartST RedisST::instance()
{
return st_;
}
redisContext * RedisST::get_conn()
{
redisContext* ctx = nullptr;
if (!pool_.empty())
{// double check
mutex_.lock();
if (!pool_.empty())
{
ctx = pool_.front();
pool_.pop();
}
mutex_.unlock();
}
if (!ctx || ctx->err)
{
if (ctx) redisFree(ctx);
timeval tv{ 3,0 };
ctx = redisConnectWithTimeout(ip_, port_, tv);
}
return ctx;
}
void RedisST::idle_conn(redisContext * ctx)
{
mutex_.lock();
pool_.push(ctx);
mutex_.unlock();
}
redisReply * RedisST::exec_cmd(redisContext* ctx, const char * cmd)
{
return static_cast<redisReply*>(redisCommand(ctx, cmd));
}
bool RedisST::validate_reply(SmartReply & reply)
{
return reply && (reply->type != REDIS_REPLY_ERROR);
}
void RedisST::free_reply(redisReply * reply)
{
freeReplyObject(reply);
}
部分应用:
...
// some sample code
// redis data structure:
// hash:{key:dev_id,status:online or offline,imei:imei}
// list:{key:imei,value:commands}
// list key(imei) is mapped to hash(key is dev id) field(imei) value
// one imei is singly corresponding to one dev id and both of them are unique
void HttpWorker::HandleCMRCommand(QString imei, QString obj_id, QByteArray cmd)
{
qInfo("###Received new command:###\n--> %s %s %s", qPrintable(imei), qPrintable(obj_id), cmd.constData());
// m_inst is a member instance of class HttpWorker
// m_inst = RedisST::instance();
RedisST::SmartConn conn(m_inst->get_conn(), [this](redisContext* ctx) { m_inst->idle_conn(ctx); });
// device exists?->add property table->command enqueue->cache eliminate
QString redis_cmd = QString("hexists %1 imei").arg(obj_id);
RedisST::SmartReply reply(RedisST::exec_cmd(conn.get(), qPrintable(redis_cmd)), RedisST::free_reply);
if (!RedisST::validate_reply(reply)) goto _err_;
if (!reply->integer)
{
redis_cmd = QString("hset %1 imei %2").arg(obj_id, imei);
reply.reset(RedisST::exec_cmd(conn.get(), qPrintable(redis_cmd)));
if (!RedisST::validate_reply(reply)) goto _err_;
}
redis_cmd = QString("rpush %1 %2").arg(imei, (QString)cmd);
reply.reset(RedisST::exec_cmd(conn.get(), qPrintable(redis_cmd)));
if (!RedisST::validate_reply(reply)) goto _err_;
redis_cmd = QString("llen %1").arg(imei);
reply.reset(RedisST::exec_cmd(conn.get(), qPrintable(redis_cmd)));
if (!RedisST::validate_reply(reply)) goto _err_;
if (reply->integer > m_maxCmds)
{
redis_cmd = QString("lpop %1").arg(imei);
reply.reset(RedisST::exec_cmd(conn.get(), qPrintable(redis_cmd)));
if (!RedisST::validate_reply(reply) || !reply->str) goto _err_;
qWarning("Eliminate a cmd for device %s: %s", qPrintable(obj_id), reply->str);
}
return;
_err_:
qWarning("Perform redis cmd failed[%s]: %s", reply ? reply->str : "connect interrupted", qPrintable(redis_cmd));
}
三、redis的持久化策略与并发处理
redis的持久化策略有两种,一是RDB方式(Redis DataBase),二是AOF方式(AppendOnly File),前者每隔一段时间或每经一定数量的数据库更改操作fork一个子进程备份当前时间点的快照,后者把对数据库的每次更改操作写入AOF文件,当AOF文件过大时会fork子进程进行重写。
- RDB优势:
更好的性能,更快的加载速度(启动时),按计划备份。 - RDB劣势:
较差的数据安全性,最多可能会丢失两次快照间的所有数据更改;
每次备份时fork出的子进程会导致redis短暂的服务无响应状态(依据数据量和CPU性能从毫秒级到秒级)。 - 相关配置:
# 900/300/60秒内如果有至少1/10/10000次更改时进行备份
save 900 1
save 300 10
save 60 10000
# 当无法正常备份时是否允许客户端进行更改redis数据的操作
stop-writes-on-bgsave-error yes
# 是否压缩
rdbcompression yes
# 是否校验RDB文件
rdbchecksum yes
# RDB文件名
dbfilename dump.rdb
# RDB文件路径
dir ./
- AOF优势:
更好的数据安全性,容灾效果好,极小的fork影响(重写AOF文件的频率很低)。 - AOF劣势:
性能较差,这在每次更改都fsync时影响最为明显,但这也带来了最高的安全性;
AOF文件总是要比RDB大。 - 相关配置:
# 是否开启AOF
appendonly yes
# AOF策略(always/everysec/no:安全性递减,性能递增)
appendfsync everysec
# 在重写AOF文件时是否允许主进程调用fsync
# (因为重写AOF文件时会有大量I/O,
# 如果此时主进程也写入AOF文件会造成阻塞,
# 如果不写入则有可能造成数据丢失)
no-appendfsync-on-rewrite no
# 当超过min-size且当前大小超过上一次重写时大小的percentage%时自动重写AOF
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
# 当AOF文件尾部出错时是否尽可能多地加载正确部分(加载前建议先redis-check-aof)
aof-load-truncated yes
可以通过官方文档和antirez的blog对它们更进一步地详细了解:
https://redis.io/topics/persistence
http://oldblog.antirez.com/post/redis-persistence-demystified.html
关于并发问题,redis的数据库操作是单线程的,因此可以不必担心它的线程安全问题,至于性能,I/O本身就是最大的开销不是吗。
四、其他配置项
用到的几个比较关键的配置项有:
- bind:如果配置此项的话,非bind ip无法访问redis,为了数据库安全性建议配置;
- loglevel:一般在调试开发的时候用debug,部署的时候用notice;
- maxclients:允许的最大客户端连接数;
- maxmemory:允许使用的最大内存,单位为字节,建议配置以防内存溢出;
- maxmemory-policy:内存淘汰策略,这部分我是自己写的,因为redis提供的不满足需求,如果符合需求想省事儿的话可以在这里采用redis提供的策略。
其他还有许多配置项就不一一列举了,需要的时候可以自己查看conf文件,里面的注释也十分详尽,注意秉承用不到就不修改的原则。
一些感想:
- 一个程序员最根本的武器还是是数据结构和算法,当有数据库的需求时,如果它内置了熟悉的数据结构或算法就可以直接拿来用,否则需要重新审视一下技术选型或者是自己造一个好用的轮子来解决。
- 在时间不宽裕的情况下不要过早过多地考虑效率、性能等问题,要相信编译器,不纠结于细节,可以在需要时再行优化。
- 如果对程序有更高的性能需求,任何组件当做黑盒来用都是不合适的,必须先了解它的基本原理才能得心应手。