20 Nov 2021

redis源码解析之集群模式

Redis集群是Redis提供的分布式数据库方案,集群通过分片来进行数据共享,并提供复制和故障转移功能,需要注意的是redis cluster默认 使用的是0号数据库,并且不能更改

1、启动节点
一个节点就是一个运行在集群模式下的Redis服务器,Redis服务器在启动时会根据cluster-enable配置选项是否为yes来决定是否开启服务器 集群模式

在redis.c/initServer()方法中进行cluster初始化,代码1892行
if (server.cluster_enabled) clusterInit();

//初始化成功之后
if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == REDIS_ERR)
{
    exit(1);
} else {
    int j;

    for (j = 0; j < server.cfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
            clusterAcceptHandler, NULL) == AE_ERR)
                redisPanic("Unrecoverable error creating Redis Cluster "
                            "file event.");
    }
}

2、集群数据结构
clusterNode结构保存了一个节点的当前状态,比如节点的创建时间,节点的名字,节点当前配置纪元,节点的ip和port等等
每个节点都会使用clusterNode结构来记录自己的状态,并为集群中其他node节点创建一个clusterNode结构,以此来记录其他节点的状态:

#define REDIS_CLUSTER_NAMELEN 40    /* sha1 hex length */
#define REDIS_IP_STR_LEN 46 /* INET6_ADDRSTRLEN is 46, but we need to be sure */
struct clusterNode {

    //创建节点的时间
    mstime_t ctime;
    
    //节点的名字
    char name[REDIS_CLUSTER_NAMELEN];
    
    //节点标识,角色,主节点或者从节点
    int flag;
    
    //配置纪元,实现故障转移
    uint64_t configEpoch;
    
    //节点的ip地址
    char ip[REDIS_IP_STR_LEN];
    
    //节点端口
    int port;
    
    //保存了连接节点的所有信息
    clusterLink *link;

}

struct clusterLink {

    //连接创建时间
    mstime_t ctime;

    //tcp套接字描述符
    int fd;
    
    //输入缓冲区,保存着从其他节点接受到的信息
    sds rcvbuf;
    
    //与这个连接相关联的节点,如果没有就为null
    struct clusterNode *node;

} 

最终rediserver是与cluster属性关联,cluster结构为clusterState
struct clusterState {
    
    //当前节点
    clusterNode *myself;
    
    //配置纪元,故障转移
    uint64_t currentEpoch;
    
    //节点状态 REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL
    int state;     
    
    //集群至少处理一个槽的节点的数量 
    int size;            
    
    //集群中所有节点
    dict *nodes;         
    
    ...
    
} clusterState;

3、cluster meet命令解析

//向一个节点node发送cluster meet命令,可以让node节点与ip和port所指定的节点握手,
//握手成功后,就归到当前所在的集群中
cluster meet <ip> <port>

//查看当前cluster的节点
cluster nodes

{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}

//当cluster meet操作时源码
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
    long long port;

    if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
        addReplyErrorFormat(c,"Invalid TCP port specified: %s",
                            (char*)c->argv[3]->ptr);
        return;
    }

    if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
        errno == EINVAL)
    {
        addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                        (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
    } else {
        addReply(c,shared.ok);
    }
}

clusterStartHandshake也就是正式进行节点之间的通信,返回2种形式结果
EAGAIN - There is already an handshake in progress for this address.
EINVAL - IP or port are not valid.

/* Add the node with a random address (NULL as first argument to
     * createClusterNode()). Everything will be fixed during the
     * handshake. */
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);

clusterAddNode添加node节点到hash table中

//在server.c中的serverCron方法中,执行具体的cluster逻辑
run_with_period(100) {
    if (server.cluster_enabled) clusterCron();
}

//连接创建成功之后,出发了读事件,如果有数据返回的话
aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link);
//主节点返回MEET信息给从节点
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
//发送数据到从节点
clusterSendMessage(link,buf,totlen);

4、槽指派
Redis集群通过分片的方式保存数据,集群的整个数据库被分为16384个slot,数据库中的每个key都属于这16384个slot中的其中一个,集群中的每个节点可以处理0个或者16384个slot
当数据库中的16384个slot都有节点在处理的时候,集群处于上线状态,否则处理fail状态

//查看集群状态
> cluster info
   
//命令将槽0到5000指派给当前节点负责   
> cluster addslots 0 1 2 ... 5000

//槽的指派在clusterNode节点中
struct clusterNode {

    unsigned char slots[16384/8];
    
    int numslots;

}

slots属性是一个二进制位数组,这个数组的长度为16384/8=2048个字节,共包含16384个二进制位
如下图:
字节 |    slots[0]    |  slots[1]-slots[2047] |  
索引 |0|1|2|3|4|5|6|7 |8|9|10|11|12...|16383  |
 值  |1|1|1|1|1|1|1|1 |0|0|0|0|0|0...| 0      |

如果索引的值为1,表示当前节点处理那些卡槽,当然卡槽的信息会被传播到各个节点

在对数据库中的16384个槽都分配之后,集群就会进入上线状态,这时候客户端就可以向集群中的节点发送数据
计算key在那个槽

追溯源码是在redis client连接server之后从client模块读取命令数据
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
    ==>int thisslot = keyHashSlot((char*)thiskey->ptr,(int)sdslen(thiskey->ptr)); 
        ==>if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
    
逻辑流程为:当节点计算出来所属的槽
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);

robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
                           sdslen(thiskey->ptr));

//如果当前客户端请求路由的key是由本server执行,那么返回当前节点,否则返回 
//if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
//表示当前客户端需要请求到另外一个节点MOVE操作
n = server.cluster->slots[slot];
if (c->flags & REDIS_READONLY &&
    cmd->flags & REDIS_CMD_READONLY &&
    nodeIsSlave(myself) &&
    myself->slaveof == n)
{
    return myself;
}

//在客户端的逻辑中则会判断
/* Check if we need to connect to a different node and reissue the
     * request. */
if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR &&
    (!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK")))
    
MOVE返回格式为 MOVED <slot> <ip>:<port>    

Tags:
0 comments