19 Oct 2021

redis源码解析之哨兵

Sentinel是Redis高可用性的解决方案:由一个或多个Sentinel实例组成的Sentinel系统可以监控多个主服务器,以及这些主服务器下的所有 从服务器,如下图所示:

从图中可以看到,当前主服务器server1,三个从服务是server2,server3,server4,而Sentinel系统监视所有四个服务器

假设这时,主服务器server1进入下线状态,那么从服务器server2、server3、server4对主服务器的复制会被终止,并且Sentinel系统会察觉server1已经下线
会执行故障转移操作:
1)、Sentinel系统会挑选server1属下的一个从服务器,并将这个被选中的从服务器升级为新的主服务器
2)、Sentinel系统会向server1下所有的从服务器发送新的复制指令,让他们成为新的主服务器的从服务器,当所有从服务器都开始复制新的主服务器时,故障转移操作执行完毕
3)、另外,Sentinel还会继续监视已下线的server1,并在他重新上线时,将他设置为新的主服务器的从服务器

启动初始化Sentinel

$ redis-sentinel /path/to/your/sentinel.conf
或者命令
$ redis-server /path/to/your/sentinel.conf --sentinel

源码在初始化server的时候可以看到
server.sentinel_mode = checkForSentinelMode(argc,argv);
=====>
    int checkForSentinelMode(int argc, char **argv) {
        int j;
    
        if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
        for (j = 1; j < argc; j++)
            if (!strcmp(argv[j],"--sentinel")) return 1;
        return 0;
    }

具体初始化逻辑在于:

/* We need to init sentinel right now as parsing the configuration file
     * in sentinel mode will have the effect of populating the sentinel
     * data structures with master nodes to monitor. */
if (server.sentinel_mode) {
    initSentinelConfig();
    initSentinel();
}    

在哨兵模式中,执行的语句有限制
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};

初始化Sentinel时候,需要初始化Sentinel状态
struct sentinelState {

    uint64_t current_epoch;     /* 当前纪元,用于实现故障转移 */
    
    // 保存了所有被这个sentinel监视的主服务器,key为主服务器的name,value为指向sentinelRedisInstance结构的指针
    dict *masters;

} 

typedef struct sentinelRedisInstance {

    //标识符,实例当前的状态
    int flags;
    
    //实例的名称
    char* name;
    
    //实例的地址
    sentinelAddr* addr;

    ...
}   

如下如所示,结构:

初始化sentinel的最后一步就是创建连向监视主服务器的网络连接,sentinel将成为主服务器的客户端,它可以向主服务器发送命令,该部分在serverCron中

/* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
    if (server.sentinel_mode) sentinelTimer();
}

===>
    void sentinelTimer(void) {
        sentinelCheckTiltCondition();
        sentinelHandleDictOfRedisInstances(sentinel.masters);
        sentinelRunPendingScripts();
        sentinelCollectTerminatedScripts();
        sentinelKillTimedoutScripts();
    
        /* We continuously change the frequency of the Redis "timer interrupt"
         * in order to desynchronize every Sentinel from every other.
         * This non-determinism avoids that Sentinels started at the same time
         * exactly continue to stay synchronized asking to be voted at the
         * same time again and again (resulting in nobody likely winning the
         * election because of split brain voting). */
        server.hz = REDIS_DEFAULT_HZ + rand() % REDIS_DEFAULT_HZ;
    }
    
    ===>
        sentinelHandleDictOfRedisInstances(ri);
        
        ===>
            sentinelReconnectInstance(ri);
            
            ===>
                ri->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
                连接成功之后发送"ping"
                /* Send a PING ASAP when reconnecting. */
                sentinelSendPing(ri);

说到Redis的同步异步流程,需要了解redis的Hiredis模块,它涵盖了所有的同步异步操作
所谓的同步操作,就是以阻塞的方式向Redis服务器建链,发送命令,接收命令回复。使用同步操作API,主要涉及以下三个API函数:

//创建连接
redisContext *redisConnect(const char *ip, int port);
//发送redis命令
void *redisCommand(redisContext *c, const char *format, ...);
//接收返回的数据
void freeReplyObject(void *reply);

异步操作流程:

redisContext *redisConnectBindNonBlock(const char *ip, int port, const char *source_addr); 

在sentinelReconnectInstance中可以看到后续的事件绑定操作
redisAeAttach(server.el,ri->cc);
==>
    redisContext *c = &(ac->c);
    redisAeEvents *e;

    /* Nothing should be attached when something is already attached */
    if (ac->ev.data != NULL)
        return REDIS_ERR;

    /* Create container for context and r/w events */
    e = (redisAeEvents*)zmalloc(sizeof(*e));
    e->context = ac;
    e->loop = loop;
    e->fd = c->fd;
    e->reading = e->writing = 0;

    /* Register functions to start/stop listening for events */
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;
    ac->ev.data = e;
    ===>
        redisAeAddRead方法中创建了文件事件,从而和之前的epoll挂上钩了

其实除了上面建立通信的连接之外,还有一个订阅连接,这个链接专门用于订阅主服务器的__sentinel__:hello频道

#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
//绑定流程和之前一样
ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
retval = redisAsyncCommand(ri->pc,
    sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
        SENTINEL_HELLO_CHANNEL);
if (retval != REDIS_OK) {
    /* If we can't subscribe, the Pub/Sub connection is useless
     * and we can simply disconnect it and try again. */
    sentinelKillLink(ri,ri->pc);
    return;
}          

当2个连接都创建成功后,需要去获取服务器相关的信息了

sentinelHandleRedisInstance.c/sentinelSendPeriodicCommands
/* Send INFO to masters and slaves, not sentinels. */
retval = redisAsyncCommand(ri->cc,
    sentinelInfoReplyCallback, NULL, "INFO");
if (retval == REDIS_OK) ri->pending_commands++;  

//回调方法sentinelInfoReplyCallback中会处理redis server返回的数据信息并且处理
//一方面是服务器的信息,如果run_id,记录的是服务器的运行id
//另一方面,从服务器信息,从服务器都是以"slave" 字符串开头记录行记录

频道信息发送

else if ((now - ri->last_pong_time) > ping_period) {
    /* Send PING to all the three kinds of instances. */
    sentinelSendPing(ri);
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
    /* PUBLISH hello messages to all the three kinds of instances. */
    sentinelSendHello(ri);
} 

频道信息的主体是payload,包括了当前sentinel的信息

接下来的部分

//检测每个实例下是否出现了断开的情况
sentinelCheckSubjectivelyDown(ri);    

/* Check if we are in need for a reconnection of one of the
 * links, because we are detecting low activity.
 *
 * 1) Check if the command link seems connected, was connected not less
 *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have a
 *    pending ping for more than half the timeout. */
if (ri->cc &&
    (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
    ri->last_ping_time != 0 && /* Ther is a pending ping... */
    /* The pending ping is delayed, and we did not received
     * error replies as well. */
    (mstime() - ri->last_ping_time) > (ri->down_after_period/2) &&
    (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
{
    sentinelKillLink(ri,ri->cc);
}

/* 2) Check if the pubsub link seems connected, was connected not less
 *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
 *    activity in the Pub/Sub channel for more than
 *    SENTINEL_PUBLISH_PERIOD * 3.
 */
if (ri->pc &&
    (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
    (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{
    sentinelKillLink(ri,ri->pc);
}

对于不断重连在于外层轮询中去创建连接,轮询中已经包含了slaves,sentinels模块等:

void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}   

故障转移的源码逻辑

1)、从已下线的主服务器属下的所有从服务器里面,挑选一个从服务器,并转换为主服务器
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
     sentinelFailoverSelectSlave(ri);
break;

//此处从 从服务器中选择一个节点作为主服务器
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

    /* We don't handle the timeout in this state as the function aborts
     * the failover or go forward in the next state. */
    if (slave == NULL) {
        sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}

2)、让已下线主服务器下的所有从服务器改为复制新的主服务器
sentinel.c/sentinelFailoverReconfNextSlave

/* Send SLAVEOF <new master>. */
retval = sentinelSendSlaveOf(slave,
        master->promoted_slave->addr->ip,
        master->promoted_slave->addr->port);
            

3)、已下线的主服务器设置为新的主服务器的从服务器     

Tags:
0 comments