来聊聊redis的发布订阅设计与实现
# 写在文章开头
借一个午休的时光整理一下关于redis发布订阅源码的设计与实现,通过本文的阅读,你将会对发布订阅模型的设计思想以及对哨兵间选举通信的流程有着更底层的视角。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 详解redis中发布订阅的设计
# channel的设计
redis服务端启动时,会初始化一个记录channel以及channel订阅者的键值对结构,它用channel的名称作为key,用一个链表记录这个订阅这个channel的客户端:

对此我们给出redis初始化函数initServer的代码片段,可以看到其内部调用dictCreate方法为pubsub_channels 这个记录channel和channel订阅者的指针初始化了一个频道名称为key,链表为value的字典:
void initServer(void) {
//......
// 初始化pubsub_channels存储频道信息,keylistDictType用频道名称作为key,订阅者list作为value
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
//......
}
2
3
4
5
6
# pub/sub的实现
当客户端1通过SUBSCRIBE mychannel订阅mychannel这个频道,本质上就是redis服务端解析SUBSCRIBE指令并调用subscribeCommand函数,该方法会检查这个channel是否存在,如果不存在则则以channel名称为key,初始化一个链表作为value,将订阅这个channel的客户端追加到链表中。反之,如果channel存在则直接将客户端信息存入链表即可:

对此我们给出对应的源码实现,该函数subscribeCommand位于pubsub.c可以看到其入口逻辑就是遍历参数得到当前客户端想订阅的频道,然后调用pubsubSubscribeChannel将该客户端追加到这个频道的链表上:
void subscribeCommand(redisClient *c) {
int j;
//遍历频道将该客户端存入
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
//将当前客户端标识为做了发布订阅
c->flags |= REDIS_PUBSUB;
}
2
3
4
5
6
7
8
我们步入pubsubSubscribeChannel方法即可看到上图所说明的逻辑,如果对应的频道不存在,则初始化然后将客户端追加到链表中,反之直接追加到链表中:
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
//频道添加到pubsub_channels中
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
//查看这个频道的订阅者链表是否存在
de = dictFind(server.pubsub_channels,channel);
//如果频道不存在,则直接初始化链表
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
//将客户端追加的链表尾巴
listAddNodeTail(clients,c);
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
同理,当我们通过redis客户端键入publish mychannel "hello"向mychannel 发送一个hello消息时,redis服务端会解析这条publish指令并调用publishCommand完成消息发布,通知到各个订阅者:

我们给出publishCommand的源码,位于pubsub.c这个源代码文件中,可以看到这段代码会将channel和对应的消息传入pubsubPublishMessage方法中,并返回接收者数:
void publishCommand(redisClient *c) {
//发布消息返回接收者 PUBLISH <channel> <message>,返回接收者的数量
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
//......
}
2
3
4
5
步入pubsubPublishMessage即可看到发布消息的核心逻辑,可以看到这个方法用receivers来记录接收的通知者,它会先进行精准匹配,到pubsub_channels找到和channel名字一致的channel并向该channel的订阅者发布消息,然后在进行模糊匹配,遍历所有的channel找到模糊匹配上的channel并向订阅者发布消息:
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
//查找名字相同的channel
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
//......
//移动至订阅者链表首部
listRewind(list,&li);
//遍历并向这些订阅者发布消息
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
//......
//发布消息
addReplyBulk(c,message);
//接收数++
receivers++;
}
}
if (listLength(server.pubsub_patterns)) {
//移动至channel链表首部
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
//遍历channel
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
//找到匹配的channel并发布消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
//......
addReplyBulk(pat->client,message);
receivers++;
}
}
}
return receivers;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 哨兵如何利用发布订阅完成消息通信的
关于pub/sub模式,redis中的哨兵就很好的利用这种模式进行沟通和选举等各个工作,当我们的redis以哨兵的方式启动时,redis会定期执行哨兵的定时任务,该任务会在检查连接时检查发布订阅master的连接是否为空,若为空则调用异步连接绑定的方式订阅master的"__sentinel__:hello"频道,而该频道主要负责下面这些工作:
Sentinel 实例的发现与信息交换:每个 Sentinel 实例会定期通过
__sentinel__:hello频道发布自己的信息,包括Sentinel的IP 地址、端口、运行 ID、当前配置的纪元(epoch)等。 其他 Sentinel 实例会订阅这个频道,从而感知到其他 Sentinel 的存在,并获取它们的信息。 监控主从节点的状态:Sentinel 实例通过
__sentinel__:hello频道共享它们对 Redis 主节点和从节点的监控信息:例如,某个 Sentinel 实例检测到主节点不可用时,会通过这个频道通知其他 Sentinel 实例,以便它们确认并共同决定是否进行故障转移。 故障转移的协调:在故障转移过程中,
Sentinel实例会通过__sentinel__:hello频道交换信息,协调谁来执行故障转移操作,并确保只有一个 Sentinel 实例负责执行。

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
//......
//检查发布订阅是否为空
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
//若为空则pc指针指向异步连接
ri->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,REDIS_BIND_ADDR);
if (ri->pc->err) {
//......
} else {//如果没有报错,则订阅__sentinel__:hello频道
int retval;
//......
//哨兵订阅 __sentinel__:hello 频道(也就是下面的常量SENTINEL_HELLO_CHANNEL),通过sentinelReceiveHelloMessages处理回调
retval = redisAsyncCommand(ri->pc,
sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
//......
}
}
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
通过master的hello频道,哨兵会定期publish自己的信息到hello频道,其他哨兵就可以基于这个频道发现其他的哨兵由此完成通信:

对此我们给出哨兵定期发送hello的函数入口sentinelSendPeriodicCommands,这个方法会被定期执行,其内部逻辑一旦检查到pub/sub时间间隔过长时就会发送调用sentinelSendHello向hello频道发送当前哨兵的信息让其他哨兵感知:
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
//......
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
//......
} else if ((now - ri->last_pong_time) > ping_period) {//超过ping间隔发ping
//......
} else if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
//超过pub最大间隔SENTINEL_PUBLISH_PERIOD则发送发送哨兵自身ip端口等信息到hello频道
sentinelSendHello(ri);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
步入sentinelSendHello即可看到我们上文所说的逻辑,可以看到当前哨兵会组装个人信息通过异步连接cc指针维护的连接信息PUBLISH 个人信息到hello频道:
int sentinelSendHello(sentinelRedisInstance *ri) {
//......
//获取当前哨兵ip
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return REDIS_ERR;
announce_ip = ip;
}
//获取当前哨兵端口
announce_port = sentinel.announce_port ?
sentinel.announce_port : server.port;
//将数据拼接到payload中
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, server.runid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
//将组装的哨兵信息publish到hello频道(SENTINEL_HELLO_CHANNEL就是hello频道的常量变量值)
retval = redisAsyncCommand(ri->cc,
sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
SENTINEL_HELLO_CHANNEL,payload);
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 小结
自此我们们将redis发布订阅的设计与实现,本质上就是通过一个个链表管理订阅者,通过pub指令定位到channel后将消息遍历发送到对应客户端socket上,这里笔者也简单的补充一句,从源码中我们可以看到redis的发布订阅模型没有持久化机制,所以对于可靠性要求高的场景笔者还是不太建议使用pub/sub。
我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
同时也非常欢迎你star我的开源项目mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
# 参考
Redis发布订阅模式(publish/subscribe):https://blog.csdn.net/Jason_Dom/article/details/108599666 (opens new window)
PSUBSCRIBE:https://redis.io/docs/latest/commands/psubscribe/ (opens new window)
Redis发布订阅模式(publish/subscribe):https://blog.csdn.net/qq_50921201/article/details/140345433 (opens new window)