在Redis中,用户可以通过slaveof命令或者设置slaveof选项,让一个服务器去复制另一个服务器,我们称被复制的服务器为主服务器
,假设我们现在有2个Redis服务器,地址为127.0.0.1:6379和127.0.0.1:16379,如果我们向服务器127.0.0.1:16379发送以下命令:
127.0.0.1:16379> slaveof 127.0.0.1:6379
OK
那么服务器127.0.0.1:16379就是127.0.0.1:6379的从服务器,当然所有在主服务器上的操作,在从服务器中也会执行一次
Redis的复制分为同步和命令传播2个操作,同步操作用于将从服务器的数据库状态更新至主服务器当前所处的状态
命令传播则是用于在主服务器的数据库状态被修改,出现主从数据不一致时,让主从服务器重新处于一致状态
同步操作步骤:
1、从服务向主服务器发送SYNC命令
2、收到SYNC命令的主服务器执行BGSAVE命令,在后台生成一个RDB文件,并使用缓冲区记录从现在开始执行的所有写命令
3、当主服务器的BGSAVE命令执行完毕后,主服务器会将BGSAVE生成的RDB文件传输给从服务器,从服务器载入RDB文件,开始执行写操作
4、主服务器将记录在缓冲区的所有写操作命令发送给从服务器,从服务器执行命令
我们知道在redis.c中定义了命令需要执行的方法{"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0}
void slaveofCommand(redisClient *c) {
/* 表示集群模式下该节点不能作为从节点 */
if (server.cluster_enabled) {
addReplyError(c,"SLAVEOF not allowed in cluster mode.");
return;
}
/* 如果当前是SLAVEOF NO ONE命令,则表示取消复制模式 */
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
if (server.masterhost) {
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
redisLog(REDIS_NOTICE,
"MASTER MODE enabled (user request from '%s')",client);
sdsfree(client);
}
} else {
PORT_LONG port;
if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
return;
/* 校验当前是否已经进入复制模式了,则直接返回 */
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
return;
}
/* 该方法中会set主服务器ip与host,需要注意的是SYNC命令是异步的,所有会直接返回连接成功,实际的复制工作在返回OK之后 */
replicationSetMaster(c->argv[1]->ptr, (int)port); WIN_PORT_FIX /* cast (int) */
sds client = catClientInfoString(sdsempty(),c);
redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",
server.masterhost, server.masterport, client);
sdsfree(client);
}
addReply(c,shared.ok);
}
server中保存结构
struct redisServer {
char *masterhost;
int masterport;
}
真正的任务调度是在serverCron中操作:
/* Replication cron function -- used to reconnect to master and
* to detect transfer failures. */
run_with_period(1000) replicationCron(); //表示每1s执行复制操作
套接字的连接
/* Check if we should connect to a MASTER */
if (server.repl_state == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
}
}
在connectWithMaster方法中aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL)创建了建立的事件
发送一个PING来检查主机是否能够无错误地回复
if (server.repl_state == REDIS_REPL_CONNECTING) {
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
server.repl_state = REDIS_REPL_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
if (err) goto write_error;
WIN32_ONLY(WSIOCP_QueueNextRead(fd);)
return;
}
主服务器在接到 PING 信息后 会返回 +PONG\r\n,触发 从服务器写事件,调用 之前我们新建的从服务器读事件,回调 syncWithMaster() 进行处理
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
redisLog(REDIS_NOTICE,
"Master replied to PING, replication can continue...");
}
sdsfree(err);
server.repl_state = REDIS_REPL_SEND_AUTH;
}
鉴权部分
if (server.repl_state == REDIS_REPL_SEND_AUTH) {
if (server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
if (err) goto write_error;
server.repl_state = REDIS_REPL_RECEIVE_AUTH;
WIN32_ONLY(WSIOCP_QueueNextRead(fd);)
return;
} else {
server.repl_state = REDIS_REPL_SEND_PORT;
}
}
/* Receive AUTH reply. */
if (server.repl_state == REDIS_REPL_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (err[0] == '-') {
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
server.repl_state = REDIS_REPL_SEND_PORT;
}
从服务器上的端口信息发送给主服务器
if (server.repl_state == REDIS_REPL_SEND_PORT) {
sds port = sdsfromlonglong(server.port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"listening-port",port, NULL);
sdsfree(port);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REDIS_REPL_RECEIVE_PORT;
return;
}
/* Receive REPLCONF listening-port reply. */
if (server.repl_state == REDIS_REPL_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
redisLog(REDIS_NOTICE,"(Non critical) Master does not understand "
"REPLCONF listening-port: %s", err);
}
sdsfree(err);
server.repl_state = REDIS_REPL_SEND_CAPA;
}
PSYNC命令,全量同步与增量同步
基本上后续的复制操作都是基于PSYNC实现
1)、如果从服务器没有复制过主服务器,或者没有执行过slaveof no one命令,那么从服务器在开始新的复制时将向主服务器
发送PSYNC ? -1 命令,主动请求主服务器进行完整重同步
//从此处可以看出来,节点之间的数据复制都是传递的rdb文件
if (slaves_waiting && max_idle > server.repl_diskless_sync_delay) {
/* Start a BGSAVE. Usually with socket target, or with disk target
* if there was a recent socket -> disk config change. */
startBgsaveForReplication(mincapa);
}
2)、如果从服务器是第一次复制,发送PSYNC ? -1,否则发送PSYNC <runid> <offset>
if (server.cached_master) {
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
} else {
redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_runid = "?";
memcpy(psync_offset,"-1",3);
}
3)、后续基于前面操作是否执行重同步
4)、检测命令丢失
replicationFeedSlaves():会将执行的命令以协议的传输格式写到从服务器 client 的输出缓冲区中