来聊聊Redis持久化AOF管道通信的设计
# 写在文章开头
最近遇到很多烦心事,希望通过技术来得以放松,今天这篇文章笔者希望会通过源码的方式分析一下AOF如何通过Linux父子进程管道通信的方式保证进行AOF异步重写时还能实时接收用户处理的指令生成的AOF字符串,从而保证尽可能的可靠性。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 详解AOF管道通信的设计
# Linux管道通信进程
在进程AOF重写时,redis会fork出一个子进程,让子进程进行异步重写机制,避免AOF文件重写的耗时导致redis执行性能下降。由此也诞生了另外一个问题,AOF子进程异步重写期间,用户最新发送的指令能否被AOF子进程接收并持久化到文件中。
对此redis借助Linux管道通信的方式实现,通过管道通信的方式实现实时数据发送,对应子进程收到这些指令对应的字符串之后,就会将其写入AOF重写文件。

需要注意的是Linux管道通信通常都是单向的,即收发通道需要交由两个数组空间才能实现,例如父进程写入客户端实时指令到通道只能通过数组0空间完成发送,而客户端也只能通过数组1空间完成数组接收。同理要实现通道上客户端向服务端写数据和服务端读取数据就需要在新建相同的2长度的数组了。

我们给出创建AOF子进程的核心代码,即位于aof.c的rewriteAppendOnlyFileBackground,可以看到在创建子进程之前,redis会通过aofCreatePipes函数创建管道为后续的重写子进程以及父进程提供条件:
int rewriteAppendOnlyFileBackground(void) {
//......
if (aofCreatePipes() != REDIS_OK) return REDIS_ERR;//创建管道
start = ustime();
if ((childpid = fork()) == 0) {//fork子进程进行aof重写
char tmpfile[256];
//......
//生成一个tmp文件
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {//重写aof
size_t private_dirty = zmalloc_get_private_dirty();
//......
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
//......
}
return REDIS_OK;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
步入aofCreatePipes我们就可以看到笔者上文所介绍的管道pipes的创建逻辑,可以看到其内部初始化一个长度为6的数组空间,两两构成一个逻辑上的通道,按序通道依次是:
- 父进程写数据到子进程的收发通道。
- 子进程向父进程发送确保
ACK信号的通道。 - 父进程向子进程发送
ACK确认信号的通道。

对应的我们给出创建管道的核心代码即位于aof.c的aofCreatePipes,可以看到其通道本质就是通过创建一个长度为6的数组fds,按照笔者上文所说构成父进程发、子进程确认、父进程确认的通道,这其中父进程会调用anetNonBlock方法将该通道设置为写入时非阻塞以保证主进程写入最新数据时不会阻塞整个流程:
int aofCreatePipes(void) {
//创建3个管道
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* children -> parent ack. */
/* Parent -> children data is non blocking. */
//父进程写到子进程的管道设置为非阻塞
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
//设置读事件监听
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
//将管道复制给各个成员遍历
//主进程向子进程读写数据的通道
server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
//子进程向父进程发送ack的通道
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
//父进程向子进程发送ack通道的
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return REDIS_OK;
error:
redisLog(REDIS_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
return REDIS_ERR;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# AOF重写如何接收父进程数据
后续的父进程一旦收到客户端实时传入的指令例如set k v之后,其核心流程就会传播该事件到AOF链路上,将用户指令的字符串转为RESP格式(redis协议要求的格式)写入到父进程发送数据到子进程即第一个通道上,后续的子进程就会通过该通道的索引1数组获取这个最新的数据:

当服务端接收到客户端指令后就会执行call方法执行解析并执行客户端指令,然后通过propagate方法将客户端指令传播到AOF函数上并写入到通道中:
void call(redisClient *c, int flags) {
//......
//基于命令者模式执行客户端传入的指令
c->cmd->proc(c);
//......
//将指令传播到aof链路
if (flags & REDIS_CALL_PROPAGATE) {
int flags = REDIS_PROPAGATE_NONE;
//......
if (flags != REDIS_PROPAGATE_NONE)
//将指令cmd和键值对argv传入交由aof事件执行
propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
我们步入propagate即可看到其内部发现如果AOF非关闭状态且允许传播事件,则调用feedAppendOnlyFile追加客户端指令和键值对到通道中:
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
//如果aof非关闭且允许传播aof事件则调用feedAppendOnlyFile
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
//......
}
2
3
4
5
6
7
8
再次步入feedAppendOnlyFile就可以看到redis解析指令生成RESP字符串写入aof缓冲区之后再调用aofRewriteBufferAppend注册一个将缓冲区数据写入通道中的事件:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
//......
//基于当前数据库生成select指令字符串
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
//基于命令和参数调用catAppendOnlyExpireAtCommand生成命令的字符串
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
//......
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
//......
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else {
//......
}
//如果开启aof则将buf写入aof_buf
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
if (server.aof_child_pid != -1)//存在aof子进程且上述操作存在写入新数据,下面的调用会注册的事件告知子进程当前有新数据写入
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
sdsfree(buf);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
最终我们可以看到aofRewriteBufferAppend函数可以看到该方法会将上一步写入aof缓冲区的数据写入到10M的数据块,再判断当前aof_pipe_write_data_to_child是否为0(默认为-1,0说明没有任何事件,可以写入数据)则注册一个aofChildWriteDiffData方法将这些数据写入到通道中:
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;
while(len) {
/* If we already got at least an allocated block, try appending
* at least some piece into it. */
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
//将数据追加到aof_rewrite_buf_blocks中一个10M的数据块
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}
//......
//查看aof_pipe_write_data_to_child是否有事件
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
//如果没有,则注册一个写事件调用aofChildWriteDiffData写入缓冲区
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
最后redis定时任务即定时的时间时间会轮询到注册的事件aofChildWriteDiffData,将数据块的数据取出并写入到aof_pipe_write_data_to_child所指向的即父进程写数据到子进程的数组中:
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
//......
while(1) {
//取出数据块
ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL;
if (server.aof_stop_sending_diff || !block) {
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE);
return;
}
if (block->used > 0) {
//将数据写入1通道传给子进程
nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return;
memmove(block->buf,block->buf+nwritten,block->used-nwritten);
block->used -= nwritten;
}
if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 子进程如何保证可靠接收
后续的AOF重写的异步子进程会调用rewriteAppendOnlyFile遍历数据库键值完成重写之后,等到通道数据并完成写入后,双方各自发送确认ACK之后,再次将父进程写入通道的数据持久化到文件后,将数据刷盘:
int rewriteAppendOnlyFile(char *filename) {
//......
fp = fopen(tmpfile,"w");
//......
//读取内存数据写入aof缓冲区
//......
//等待父进程写入通道数据到来
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
//等待父进程从通道传来的数据
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0;
//从通道拿数据写入文件中
aofReadDiffFromParent();
}
//通过通道发送,告知主进程停止发送新信号进行重写
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
//收到parent确认信号后,确认收到后进行后续的最后数据写入和刷盘
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
redisLog(REDIS_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
//再一次通道中拿到父进程的数据
aofReadDiffFromParent();
//......
//刷盘,将文件数据持久化到硬盘中
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
最后我们给出aofReadDiffFromParent方法,可以看到AOF重写子进程本质就是通过read方法获取aof_pipe_read_data_from_parent数组中父进程写入的数据到aof缓冲区buf中,最后回到外层函数完成数据写入,由此完成一次完整的可靠AOF重写:
//AOF重写时调用这个函数
ssize_t aofReadDiffFromParent(void) {
char buf[65536]; /* Default pipe buffer size on most Linux systems. */
ssize_t nread, total = 0;
//读取数据到buf然后写入到aof_child_diff
while ((nread =
read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
total += nread;
}
return total;
}
2
3
4
5
6
7
8
9
10
11
12
# 小结
自此我们通过三篇文章完整的介绍了AOF写入和重写的完整的流程,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。 因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。