mini-redis如何解析处理客户端请求
# 写在文章开头
在之前的引子中笔者大体介绍了一下实现mini-redis过程中一些网络读写模块的处理过程,而这篇文章将参考redis源码完成command指令和ping指令的解析和回复,希望对你有所帮助。

Hi,我是 sharkChili ,是个不断在硬核技术上作死的技术人,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

# 详解mini-redis协议解析和实现思路
# 抓包了解RESP协议
了解redis协议的本质最简单的方式就是通过抓包的方式查看协议的基本格式和建立连接时的交互逻辑,所以这里笔者也贴出自己在研究redis源码时所抓的网络包。
如下所示,可以看到在三次握手之后,redis客户端就会向服务端发送一个command的指令,而该指令会返回服务端所支持的指令集:

仔细观察该协议的内容,按照RESP的协议,实际上command的对应的字符串转为思路为:
command只有一个字符串,所以长度为1,按照RESP协议规范就是*1,即一个长度为1的数组。command长度为7,也就是$7,然后拼接换行符\r\n,再追加command,再\r\n。
最终我们得到的字符串就是*1\r\n$7\r\ncommand\r\n,基于这条协议,redis会得到参数数量即argc为1,而对应数组argv值为command,基于这些参数redis最终会走到redis的command指令处理函数,并将处理结果返回给客户端。
这一点,我们也可以通过抓包的内容印证:

由于本篇文章,着重说明笔者的实现mini-redis的思路,更多关于RESP的具体协议解析流程,读者可参考笔者之前写的这一篇文章:
硬核详解redis客户端指令与服务端传输协议RESP (opens new window)
# 实现思路
由于go语言是多协程的语言,所以为了保证mini-redis能够像redis一样单线程顺序处理所有请求,笔者基于go语言中的go-routine-per-connection基础上增加一个channel有序将所有客户端传入指令交给redis服务端处理:

基于上述思路,笔者将介绍mini-redis解析RESP的协议交互基调以及完成生成的参数通过命令模式走到对应处理函数的核心逻辑开发,首先自然是协议解析的步骤,笔者以RESP规范介绍一下set指令实际的样子。
我们都知道存储字符串时,都是通过set指令存储键值对,假设我们要存储一个key为k,value值为v的字符串,对应的指令就是set k v,按照RESP协议规范,对应的生成的字符串过程为:
- 因为带有
set、k、v这3个字符串,对应*后面跟着3,因为这个指令的数组长度为3。 - 第1个字符串为
set,长度为3,所以得到set的块字符串$3\r\nset\r\n。 - 第2个字符串为
k,长度为1,所以得到set的块字符串$1\r\nk\r\n。 - 第3个字符串为
v,长度为1,所以得到v的块字符串$1\r\nv\r\n。
于是对应的完整的字符串为:*3\r\n$3\r\nset\r\n$1\r\nk\r\n$1\r\nv\r\n,由此可知我们后续的解析思路就是:
- 通过
*获取字符串个数。 - 通过
$获取每个字符串的长度,然后跳过换行符\r\n获取对应长度的字符串。
# 协议解析代码落地
经过上述的思路分析相信读者已经了解了笔者的大体实现思路,接下来笔者就来讲述了对应的实现细节,如下这段位于client.go的readQueryFromClient函数,其无论函数名还是执行逻辑都和原生redis作用一致,都是用于解析redis-cli传入的指令RESP字符串。
唯一的区别就是笔者用go语言的实现中,通过redisClient指针拿到当前客户端的连接对象net.Conn的reader ,并将其传给processInputBuffer让该函数完成指令解析并将参数传到commandCh中:
func readQueryFromClient(c *redisClient, CloseClientCh chan redisClient, commandCh chan redisClient) {
//get the network reader through the redis client's connection.
reader := bufio.NewReader(c.conn)
//parse the string through the reader, and pass the parsing result to commandCh for Redis server to parse and execute.
processInputBuffer(c, reader, CloseClientCh, commandCh)
}
2
3
4
5
6
7
步入processInputBuffer即可看到笔者的处理逻辑,笔者通过\n截取每一段字符串,如果是*开头的字符串,笔者会拿到*后面的数字n,并基于这个n创建长度为n的数组,然后调用processMultibulkBuffer将解析到字符串存到这个数组argv中:

对应笔者贴出processInputBuffer的实现代码,和上述所说基本一致将*后面的值存到multibulklen,然后创建指定大小的数组argv,调用processMultibulkBuffer将解析结果存入argv中,然后将当前客户端对象的指针传入commandCh 这个channel,让服务端处理解析的指令:
func processInputBuffer(c *redisClient, reader *bufio.Reader, CloseClientCh chan redisClient, commandCh chan redisClient) {
for {
//initialize the array length to -1.
c.multibulklen = -1
//split each string by '\n'
bytes, err := reader.ReadBytes('\n')
c.queryBuf = bytes
if err != nil {
log.Println("the redis client has been closed")
CloseClientCh <- *c
break
}
//throw an exception if '\n' is not preceded by '\r'.
if len(c.queryBuf) == 0 || (len(c.queryBuf) >= 2 && c.queryBuf[len(c.queryBuf)-2] != '\r') {
_, _ = c.conn.Write([]byte("-ERR unknown command\r\n"))
log.Println("ERR unknown command")
continue
}
//If it starts with "*", it indicates a multiline string.
if c.queryBuf[0] == '*' && c.multibulklen == -1 {
//set the request type to multiline
c.reqType = REDIS_REQ_MULTIBULK
//get the length of the array based on the number following '*'
c.multibulklen, err = strconv.ParseInt(string(c.queryBuf[1:len(c.queryBuf)-2]), 10, 32)
//......
//based on the parsed length, initialize the size of the array.
c.argv = make([]string, c.multibulklen)
//based on the length indicated by "*", start parsing the string.
e := processMultibulkBuffer(c, reader, CloseClientCh)
if e != nil {
_, _ = c.conn.Write([]byte("-ERR unknown command\r\n"))
log.Println("ERR unknown command")
continue
} else {
commandCh <- *c
}
} //......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
最后步入processMultibulkBuffer即可看到笔者所说的通过$获取字符串长度,然后基于这个长度得到对应字符串存入argv中,并且每次还会自增一下argc的值,表示数组长度+1:
func processMultibulkBuffer(c *redisClient, reader *bufio.Reader, CloseClientCh chan redisClient) error {
c.argc = 0
//initialize "ll" to record the length following each "$", then fetch the string based on this length.
ll := int64(-1)
//perform a for loop based on "multibulklen".
for i := 0; i < int(c.multibulklen); i++ {
bytes, e := reader.ReadBytes('\n')
c.queryBuf = bytes
//......
//......
//if a "$" is intercepted in this line, store the following numerical value in "ll".
if c.queryBuf[0] == '$' {
ll, e = strconv.ParseInt(string(c.queryBuf[1:len(c.queryBuf)-2]), 10, 32)
if e != nil || ll <= 0 {
return e
}
strBytes, e := reader.ReadBytes('\n')
c.queryBuf = strBytes
//......
//parse and extract a string of specified length based on the value of "ll", store it in "argv", and then increment "argc".
c.argv[c.argc] = string(c.queryBuf[0 : len(c.queryBuf)-2])
c.argc++
} else if c.queryBuf[0] != '$' && ll < 0 { //未解析到长度就遇到其他的字符
return errors.New("ERR unknown command")
}
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 用一个ping指令了解mini-redis指令处理过程
如果客户都安传入command指令那么argv数组存入的就是ping指令,上一步步骤通过channel传入交由redis server解析,而redis server的解析逻辑比较简单,它会调用processCommand方法拿到数组中的第一个值,然后到命令表commands中找到是否有对应的处理函数并调用,将结果直接响应给客户端:

对应我们给出服务端指令的入口代码,即位于main.go的的main方法,可以看到笔者创建了一个协程处理commandCh中那些需要进行指令处理的客户端,将其指针传入processCommand进行处理:
go func(s *redisServer) {
//retrieve the Redis client from "commandCh" and call "processCommand" to handle the instructions parsed from the array.
for redisClient := range s.commandCh {
processCommand(&redisClient)
}
}(&server)
2
3
4
5
6
走到processCommand即可看到笔者所说的到命令表中获取指令对应的函数,并将参数传入处理的逻辑,同时笔者也给出目前命令表redisCommandTable :
var redisCommandTable = []redisCommand{
{name: "COMMAND", proc: commandCommand, sflag: "rlt", flag: 0},
{name: "PING", proc: pingCommand, sflag: "rtF", flag: 0},
{name: "SET", proc: setCommand, sflag: "rtF", flag: 0},
{name: "GET", proc: getCommand, sflag: "rtF", flag: 0},
}
func processCommand(c *redisClient) {
//check the command table to see if the specified command exists.
redisCommand, exists := server.commands[strings.ToUpper(c.argv[0])]
if !exists {
c.conn.Write([]byte("-ERR unknown command\r\n"))
return
}
//assign the function of the command to "cmd".
c.cmd = redisCommand
c.lastCmd = redisCommand
//invoke "call" to pass the parameters to the function pointed to by "cmd" for processing.
call(c, REDIS_CALL_FULL)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
这里笔者就以ping指令为例,收到客户端的ping之后,对应处理函数pingCommand从常量池shared中拿到pong的字符串返回给客户端:
func pingCommand(c *redisClient) {
addReply(c, shared.pong)
}
func addReply(c *redisClient, reply string) {
c.conn.Write([]byte(reply))
}
2
3
4
5
6
7
对应的常量池的字符串,笔者这里也给出,读者可自行参阅了解:
func createSharedObjects() {
shared = sharedObjectsStruct{
crlf: "\r\n",
ok: "+OK\r\n",
err: "-ERR\r\n",
pong: "+PONG\r\n",
syntaxerr: "-ERR syntax error\r\n",
nullbulk: "$-1\r\n",
}
}
2
3
4
5
6
7
8
9
10
11
# 小结
自此笔者通过一篇文章, 介绍了笔者所实现的mini-redis中协议解析和指令处理的核心流程,希望对你理解笔者的源码有所帮助。
我是 sharkchili ,CSDN Java 领域博客专家,mini-redis的作者,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。 因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
