以从节点的角度看看Redis主从复制的实现
# 写在文章开头
近期工作遇到比较奇葩的规划,个人思绪略有惆怅,希望通过整理一些技术小文缓解心中的压抑,我们都知道redis为了保证可靠性设计出了主从复制的架构模式,而本文将从从库的角度深入分析redis如何通过状态机事件驱动扭转主从复制期间的状态。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 详解redis主从复制的设计与实现
# 主从复制几个核心变量
我们可以通过replicaof 主库ip 主库端口号命令或者配置等方式为当前从库设置主库的IP和端口号,由此完成主从关系配置:

对应我们也给出从库的redisServer 中对于这些信息维护的字段,位于redis.h这个头文件中,可以看到该结构体记录了主库的认证信息、ip端口号等、以及当前从库与主库的同步状态:
struct redisServer {
//......
/* Replication (slave) */
//主库验证的密码等信息
char *masterauth; /* AUTH with this password with master */
//主机名
char *masterhost; /* Hostname of master */
//端口号
int masterport; /* Port of master */
int repl_timeout; /* Timeout after N seconds of master idle */
//和主库建立连接信息的客户端
redisClient *master; /* Client that is master for this slave */
//从库上缓存的主库信息
redisClient *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
//从库复制的状态机
int repl_state;
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 节点初始化
服务器初始化时,从库对应的redisServer会初始化主从复制的第一个状态即REDIS_REPL_NONE,还完全未开始的零态,即未开始任何主从同步的工作,仅仅做了状态初始化:

对应我们给出这一段初始化函数initServerConfig,它是redis入口函数main方法必须执行的函数,可以看到其内部对于主从复制状态初始化的操作:
void initServerConfig(void) {
int j;
//......
/* Replication related */
server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.repl_master_initial_offset = -1;
//1. 初始化主从复制的状态
server.repl_state = REDIS_REPL_NONE;
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 发起连接
当我们通过Slaveof 指令设置当前节点为其他节点的从节点时,redis会解析这条指令并调用slaveofCommand,其内部会为redisServer配置主节点信息,并将状态更新为REDIS_REPL_CONNECT,意为待待建立连接:

对应我们给出slaveofCommand的核心代码,可以看到其内部本质是调用replicationSetMaster设置主节点ip和端口号并更新主从同步状态:
void slaveofCommand(redisClient *c) {
//......
/* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */
if (!strcasecmp(c->argv[1]->ptr,"no") &&
//......
} else {
long port;
if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
return;
//......
//基于参数设置主库的信息
replicationSetMaster(c->argv[1]->ptr, port);
//......
}
addReply(c,shared.ok);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
我们步入replicationSetMaster印证我们所说的ip端口号配置以及主从状态更新:
void replicationSetMaster(char *ip, int port) {
//......
//设置ip和端口号
server.masterhost = sdsnew(ip);
server.masterport = port;
//......
//2. 主从库连接状态变为REDIS_REPL_CONNECT
server.repl_state = REDIS_REPL_CONNECT;
server.master_repl_offset = 0;
server.repl_down_since = 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
# 建立连接
我们都知道redis会定期执行时间事件,所以每次执行定时任务的方法serverCron时,就会检查每隔1s执行一次replicationCron,其内部就会检查redis的状态是否是待连接,如果是则通过主库的ip及端口号信息发起连接,并在连接期间将状态设置为REDIS_REPL_CONNECTING:

对应我们给出redis服务器定时任务方法serverCron,可以看到其内部本质就是每1s执行一次replicationCron:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//......
//1s执行一次,调用replicationCron执行主从复制事件
run_with_period(1000) replicationCron();
//......
}
2
3
4
5
6
replicationCron会检查状态是否是待连接REDIS_REPL_CONNECT,如果是则调用connectWithMaster尝试和master建立连接:
//定期执行的主从命令
void replicationCron(void) {
//......
//3. 运用状态机模式,如果连接状态为REDIS_REPL_CONNECT,则调用connectWithMaster和主库发起连接
if (server.repl_state == REDIS_REPL_CONNECT) {
redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == REDIS_OK) {
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
步入connectWithMaster即可看到核心代码段,可以看到其内部通过anetTcpNonBlockBindConnect发起非阻塞连接,并创建一个连接成功后的回调处理事件syncWithMaster,最后将REDIS_REPL_CONNECTING连接中,为当前从库的socket的读写事件设置syncWithMaster这个回调,方便后续收到主库响应进行进一步处理:
int connectWithMaster(void) {
int fd;
//发起非阻塞连接
fd = anetTcpNonBlockBindConnect(NULL,
server.masterhost,server.masterport,REDIS_BIND_ADDR);
//......
//注册连接后的回调处理方法syncWithMaster
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
return REDIS_ERR;
}
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
// 连接成功后,状态变为REDIS_REPL_CONNECTING
server.repl_state = REDIS_REPL_CONNECTING;
return REDIS_OK;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 协定复制模式
主库对从库进行回复后,syncWithMaster被轮询到之后,如果发现当前从库处于REDIS_REPL_CONNECTING状态,说明此时处于建立连接,从库就会发送一个ping的消息,等待pong,所以将状态改为REDIS_REPL_RECEIVE_PONG。

收到master的pong之后,即说明当前通信已建立建立,从库就会向主库发起认证并发送自己的端口号信息方便主库可以通过info指令获取到所有从库的信息,最后非阻塞的请求master获取同步模式,对应的同步模式为:
PSYNC_CONTINUE:增量同步PSYNC_FULLRESYNC:全量同步PSYNC_NOT_SUPPORTED:不支持PSYNC。
最后从库会根据这个返回结果完成主从复制工作,并将状态设置为REDIS_REPL_TRANSFER即收到master的rdb文件:

对应我们给出该事件函数syncWithMaster的实现,可以看到该分发会根据状态repl_state 做出不同的处理,如果是REDIS_REPL_CONNECTING则发起ping建立连接,然后将状态修改为REDIS_REPL_RECEIVE_PONG等待master的pong响应。
下次执行读事件处理时再次来到syncWithMaster函数,发现状态为等待pong即REDIS_REPL_RECEIVE_PONG,则看看master发送的是不是pong,如果是则执行如下步骤:
- 发送
autn认证。 - 发送端口信息方便
master通过info可以看到从库的信息。 - 非阻塞再限时内获取同步模式。
- 注册文件读取事件,进行远程数据读取并写到
rdb文件中。
完成上述步骤后,将从库的状态机状态设置为REDIS_REPL_TRANSFER即收到master的rdb文件,对此我们给出这段描述的核心函数syncWithMaster:
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
//连接状态变是REDIS_REPL_CONNECTING后则发送ping,并将状态设置为等待pong REDIS_REPL_RECEIVE_PONG
if (server.repl_state == REDIS_REPL_CONNECTING) {
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
//因为后续代码段发送ping了,所以可以删除对于写事件的处理,留着读事件和syncWithMaster的绑定关系
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
//状态更新为等待pong
server.repl_state = REDIS_REPL_RECEIVE_PONG;
//发送ping并返回
syncWrite(fd,"PING\r\n",6,100);
return;
}
//下次收到当前socket读事件走到syncWithMaster回调,发现当前从库socket状态为REDIS_REPL_RECEIVE_PONG,则说明我们需要查看是否收到pong响应,该逻辑进行字符串校验
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
char buf[1024];
//删除syncWithMaster读写事件处理
aeDeleteFileEvent(server.el,fd,AE_READABLE);
//判断收到的是不是pong如果不是则抛异常
buf[0] = '\0';
if (syncReadLine(fd,buf,sizeof(buf),
server.repl_syncio_timeout*1000) == -1)
{
//......
}
}
/* AUTH with the master if required. */
// 发起认证
if(server.masterauth) {
err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
//......
}
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
{
//发送从库端口等信息
sds port = sdsfromlonglong(server.port);
err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
NULL);
//......
}
//获取同步模式
psync_result = slaveTryPartialResynchronization(fd);
//若是增量复制则是直接执行早期注册过的读事件回调函数readSyncBulkPayload进行增量同步
if (psync_result == PSYNC_CONTINUE) {
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
return;
}
//不支持PSYNC发送SYNC进行同步
if (psync_result == PSYNC_NOT_SUPPORTED) {
redisLog(REDIS_NOTICE,"Retrying with SYNC...");
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
//......
}
}
//创建临时文件并作为同步文件
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
//......
//注册读取master rdb数据到从库的读事件以及处理函数readSyncBulkPayload
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
//......
}
//设置为传输完成
server.repl_state = REDIS_REPL_TRANSFER;
//repl_transfer_fd 指向文件的句柄
server.repl_transfer_fd = dfd;
//......
return;
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# 写入同步结果
上一步完成readSyncBulkPayload函数注册之后,后续从库就会调用readSyncBulkPayload读取master的数据并写入到repl_transfer_fd 所指向的文件上:
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
//读取数据到buf
nread = read(fd,buf,readlen);
//......
//写入数据到repl_transfer_fd指向的文件中
if (write(server.repl_transfer_fd,buf,nread) != nread) {
//......
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
# 小结
自此我们将redis基于状态机对主从复制模式的实现的源码分析就完成了,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。 因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
