Redis集群是Redis提供的分布式数据库方案,集群通过分片来进行数据共享,并提供复制和故障转移功能,需要注意的是redis cluster默认 使用的是0号数据库,并且不能更改
1、启动节点
一个节点就是一个运行在集群模式下的Redis服务器,Redis服务器在启动时会根据cluster-enable配置选项是否为yes来决定是否开启服务器 集群模式
在redis.c/initServer()方法中进行cluster初始化,代码1892行
if (server.cluster_enabled) clusterInit();
//初始化成功之后
if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == REDIS_ERR)
{
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
redisPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}
2、集群数据结构
clusterNode结构保存了一个节点的当前状态,比如节点的创建时间,节点的名字,节点当前配置纪元,节点的ip和port等等
每个节点都会使用clusterNode结构来记录自己的状态,并为集群中其他node节点创建一个clusterNode结构,以此来记录其他节点的状态:
#define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */
#define REDIS_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */
struct clusterNode {
//创建节点的时间
mstime_t ctime;
//节点的名字
char name[REDIS_CLUSTER_NAMELEN];
//节点标识,角色,主节点或者从节点
int flag;
//配置纪元,实现故障转移
uint64_t configEpoch;
//节点的ip地址
char ip[REDIS_IP_STR_LEN];
//节点端口
int port;
//保存了连接节点的所有信息
clusterLink *link;
}
struct clusterLink {
//连接创建时间
mstime_t ctime;
//tcp套接字描述符
int fd;
//输入缓冲区,保存着从其他节点接受到的信息
sds rcvbuf;
//与这个连接相关联的节点,如果没有就为null
struct clusterNode *node;
}
最终rediserver是与cluster属性关联,cluster结构为clusterState
struct clusterState {
//当前节点
clusterNode *myself;
//配置纪元,故障转移
uint64_t currentEpoch;
//节点状态 REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL
int state;
//集群至少处理一个槽的节点的数量
int size;
//集群中所有节点
dict *nodes;
...
} clusterState;
3、cluster meet命令解析
//向一个节点node发送cluster meet命令,可以让node节点与ip和port所指定的节点握手,
//握手成功后,就归到当前所在的集群中
cluster meet <ip> <port>
//查看当前cluster的节点
cluster nodes
{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}
//当cluster meet操作时源码
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
long long port;
if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s",
(char*)c->argv[3]->ptr);
return;
}
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
addReply(c,shared.ok);
}
}
clusterStartHandshake也就是正式进行节点之间的通信,返回2种形式结果
EAGAIN - There is already an handshake in progress for this address.
EINVAL - IP or port are not valid.
/* Add the node with a random address (NULL as first argument to
* createClusterNode()). Everything will be fixed during the
* handshake. */
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);
clusterAddNode添加node节点到hash table中
//在server.c中的serverCron方法中,执行具体的cluster逻辑
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
//连接创建成功之后,出发了读事件,如果有数据返回的话
aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
//主节点返回MEET信息给从节点
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
//发送数据到从节点
clusterSendMessage(link,buf,totlen);
4、槽指派
Redis集群通过分片的方式保存数据,集群的整个数据库被分为16384个slot,数据库中的每个key都属于这16384个slot中的其中一个,集群中的每个节点可以处理0个或者16384个slot
当数据库中的16384个slot都有节点在处理的时候,集群处于上线状态,否则处理fail状态
//查看集群状态
> cluster info
//命令将槽0到5000指派给当前节点负责
> cluster addslots 0 1 2 ... 5000
//槽的指派在clusterNode节点中
struct clusterNode {
unsigned char slots[16384/8];
int numslots;
}
slots属性是一个二进制位数组,这个数组的长度为16384/8=2048个字节,共包含16384个二进制位
如下图:
字节 | slots[0] | slots[1]-slots[2047] |
索引 |0|1|2|3|4|5|6|7 |8|9|10|11|12...|16383 |
值 |1|1|1|1|1|1|1|1 |0|0|0|0|0|0...| 0 |
如果索引的值为1,表示当前节点处理那些卡槽,当然卡槽的信息会被传播到各个节点
在对数据库中的16384个槽都分配之后,集群就会进入上线状态,这时候客户端就可以向集群中的节点发送数据
计算key在那个槽
追溯源码是在redis client连接server之后从client模块读取命令数据
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
==>int thisslot = keyHashSlot((char*)thiskey->ptr,(int)sdslen(thiskey->ptr));
==>if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
逻辑流程为:当节点计算出来所属的槽
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
//如果当前客户端请求路由的key是由本server执行,那么返回当前节点,否则返回
//if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
//表示当前客户端需要请求到另外一个节点MOVE操作
n = server.cluster->slots[slot];
if (c->flags & REDIS_READONLY &&
cmd->flags & REDIS_CMD_READONLY &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}
//在客户端的逻辑中则会判断
/* Check if we need to connect to a different node and reissue the
* request. */
if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR &&
(!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK")))
MOVE返回格式为 MOVED <slot> <ip>:<port>