一、线程模型(网络连接库的整体实现框架)
1、文件事件处理器
Redis基于Reactor模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以Redis才叫单线程模型。
2、消息处理流程
文件事件处理器使用I/O多路复用(multiplexing)程序来同时监听多个套接字,并根据套接字目前执行的任务来为套接字关联不同的事件处理器。
3、I/O 多路复用程序的实现
Redis的I/O多路复用程序的所有功能是通过包装select、epoll、evport和kqueue这些I/O多路复用函数库来实现的,每个I/O多路复用函数库在Redis源码中都对应一个单独的文件,比如ae_select.c、ae_epoll.c、ae_kqueue.c等。
因为Redis为每个I/O多路复用函数库都实现了相同的API,所以I/O多路复用程序的底层实现是可以互换的,如下图所示。
Redis在I/O多路复用程序的实现源码ae.c中用#include宏定义了相应的规则,程序会在编译时自动选择系统中性能最好的I/O多路复用函数库来作为Redis的I/O多路复用程序的底层实现:
1 /**
2 * Include the best multiplexing layer supported by this system.
3 * The following should be ordered by performances, descending.
4 * 包括该系统支持的最佳复用层。性能依次下降
5 */
6 #ifdef HAVE_EVPORT
7 #include "ae_evport.c"
8 #else
9 #ifdef HAVE_EPOLL
10 #include "ae_epoll.c"
11 #else
12 #ifdef HAVE_KQUEUE
13 #include "ae_kqueue.c"
14 #else
15 #include "ae_select.c"
16 #endif
17 #endif
18 #endif
4、文件事件的类型
I/O 多路复用程序可以监听多个套接字的ae.h/AE_READABLE事件和ae.h/AE_WRITABLE事件,这两类事件和套接字操作之间的对应关系如下:
事件类型
1 #define AE_NONE 0 /* No events registered. 没有事件*/
2 #define AE_READABLE 1 /* Fire when descriptor is readable. 当描述符可读时触发AE_READABLE事件。*/
3 #define AE_WRITABLE 2 /* Fire when descriptor is writable. 当描述符可写时触发。*/
4 /**
5 * With WRITABLE, never fire the event if the READABLE event already fired in the same event
6 * loop iteration. Useful when you want to persist things to disk before sending replies, and want
7 * to do that in a group fashion.
8 * 如果 READABLE 事件已在同一事件循环迭代中触发,则此时绝不会触发WRITABLE事件,
9 * 当您想在发送回复之前将内容持久保存到磁盘时很有用,并且希望以组方式进行。
10 *
11 * 但是,网络 IO 事件注册的时候,除了正常的读写事件外,还可以注册一个 AE_BARRIER 事件,
12 * 这个事件就是会影响到先读后写的处理顺序。
13 *
14 * 如果某个 fd 的 mask 包含了 AE_BARRIER,那它的处理顺序会是 先写后读。
15 *
16 * 针对这个场景,redis 举的例子是,如果在 beforesleep 回调中进行了 fsync 动作,
17 * 然后需要把结果快速回复给 client。这个情况下就需要用到 AE_BARRIER 事件,用来翻转处理事件顺序了。
18 */
19 #define AE_BARRIER 4
5、文件事件的处理器
Redis为文件事件编写了多个处理器,这些事件处理器分别用于实现不同的网络通讯需求,常用的处理器如下:
6、连接应答处理器
networking.c中acceptTcpHandler函数是Redis的连接应答处理器,这个处理器用于对连接服务器监听套接字的客户端进行应答,具体实现为sys/socket.h/accept函数的包装。
当Redis服务器进行初始化的时候,程序会将这个连接应答处理器和服务器监听套接字的AE_READABLE事件关联起来,当有客户端用sys/socket.h/connect函数连接服务器监听套接字的时候, 套接字就会产生AE_READABLE 事件, 引发连接应答处理器执行, 并执行相应的套接字应答操作,如图所示。
7、命令请求处理器
networking.c中readQueryFromClient函数是Redis的命令请求处理器,这个处理器负责从套接字中读入客户端发送的命令请求内容, 具体实现为unistd.h/read函数的包装。
当一个客户端通过连接应答处理器成功连接到服务器之后, 服务器会将客户端套接字的AE_READABLE事件和命令请求处理器关联起来,当客户端向服务器发送命令请求的时候,套接字就会产生 AE_READABLE事件,引发命令请求处理器执行,并执行相应的套接字读入操作,如图所示。
在客户端连接服务器的整个过程中,服务器都会一直为客户端套接字的AE_READABLE事件关联命令请求处理器。
8、命令回复处理器
networking.c中sendReplyToClient函数是Redis的命令回复处理器,这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端,具体实现为unistd.h/write函数的包装。
当服务器有命令回复需要传送给客户端的时候,服务器会将客户端套接字的AE_WRITABLE事件和命令回复处理器关联起来,当客户端准备好接收服务器传回的命令回复时,就会产生AE_WRITABLE事件,引发命令回复处理器执行,并执行相应的套接字写入操作, 如图所示。
当命令回复发送完毕之后, 服务器就会解除命令回复处理器与客户端套接字的 AE_WRITABLE 事件之间的关联。
9、一次完整的客户端与服务器连接事件示例
假设Redis服务器正在运作,那么这个服务器的监听套接字的AE_READABLE事件应该正处于监听状态之下,而该事件所对应的处理器为连接应答处理器。
如果这时有一个Redis客户端向Redis服务器发起连接,那么监听套接字将产生AE_READABLE事件, 触发连接应答处理器执行:处理器会对客户端的连接请求进行应答, 然后创建客户端套接字,以及客户端状态,并将客户端套接字的 AE_READABLE 事件与命令请求处理器进行关联,使得客户端可以向主服务器发送命令请求。
之后,客户端向Redis服务器发送一个命令请求,那么客户端套接字将产生 AE_READABLE事件,引发命令请求处理器执行,处理器读取客户端的命令内容, 然后传给相关程序去执行。
执行命令将产生相应的命令回复,为了将这些命令回复传送回客户端,服务器会将客户端套接字的AE_WRITABLE事件与命令回复处理器进行关联:当客户端尝试读取命令回复的时候,客户端套接字将产生AE_WRITABLE事件, 触发命令回复处理器执行, 当命令回复处理器将命令回复全部写入到套接字之后, 服务器就会解除客户端套接字的AE_WRITABLE事件与命令回复处理器之间的关联。
二、实现
1. Redis网络连接库介绍
Redis网络连接库对应的文件是networking.c。这个文件主要负责
Redis 命令执行过程:
我们接下来就这几块内容分别列出源码,进行剖析。
2. 客户端的创建与释放
redis 网络链接库的源码详细注释,链接:https://github.com/menwenjun/redis_source_annotation/blob/master/networking.c
2.1客户端的创建
Redis 服务器是一个同时与多个客户端建立连接的程序。当客户端连接上服务器时,服务器会建立一个server.h/client
结构来保存客户端的状态信息。所以在客户端创建时,就会初始化这样一个结构,客户端的创建源码如下:
1 client *createClient(int fd) {
2 //分配空间
3 client *c = zmalloc(sizeof(client));
4
5 /**
6 * passing -1 as fd it is possible to create a non connected client.
7 * This is useful since all the commands needs to be executed
8 * in the context of a client. When commands are executed in other
9 * contexts (for instance a Lua script) we need a non connected client.
10 *
11 * 如果fd为-1,表示创建的是一个无网络连接的伪客户端,用于执行lua脚本的时候。
12 * 如果fd不等于-1,表示创建一个有网络连接的客户端
13 */
14 if (fd != -1) {
15 // 设置fd为非阻塞模式
16 anetNonBlock(NULL,fd);
17 // 禁止使用 Nagle 算法,client向内核递交的每个数据包都会立即发送给server出去,TCP_NODELAY
18 anetEnableTcpNoDelay(NULL,fd);
19 // 如果开启了tcpkeepalive,则设置 SO_KEEPALIVE
20 if (server.tcpkeepalive)
21 anetKeepAlive(NULL,fd,server.tcpkeepalive);// 设置tcp连接的keep alive选项
22 /**
23 * 使能AE_READABLE事件,readQueryFromClient是该事件的回调函数
24 *
25 * 创建一个文件事件状态el,且监听读事件,开始接受命令的输入
26 */
27 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
28 readQueryFromClient, c) == AE_ERR)
29 {
30 close(fd);
31 zfree(c);
32 return NULL;
33 }
34 }
35
36 // 默认选0号数据库
37 selectDb(c,0);
38 uint64_t client_id;
39 // 设置client的ID
40 atomicGetIncr(server.next_client_id,client_id,1);
41 c->id = client_id;
42 // client的套接字
43 c->fd = fd;
44 // client的名字
45 c->name = NULL;
46 // 回复固定(静态)缓冲区的偏移量
47 c->bufpos = 0;
48 c->qb_pos = 0;
49 // 输入缓存区
50 c->querybuf = sdsempty();
51 c->pending_querybuf = sdsempty();
52 // 输入缓存区的峰值
53 c->querybuf_peak = 0;
54 // 请求协议类型,内联或者多条命令,初始化为0
55 c->reqtype = 0;
56 // 参数个数
57 c->argc = 0;
58 // 参数列表
59 c->argv = NULL;
60 // 当前执行的命令和最近一次执行的命令
61 c->cmd = c->lastcmd = NULL;
62 // 查询缓冲区剩余未读取命令的数量
63 c->multibulklen = 0;
64 // 读入参数的长度
65 c->bulklen = -1;
66 // 已发的字节数
67 c->sentlen = 0;
68 // client的状态
69 c->flags = 0;
70 // 设置创建client的时间和最后一次互动的时间
71 c->ctime = c->lastinteraction = server.unixtime;
72 // 认证状态
73 c->authenticated = 0;
74 // replication复制的状态,初始为无
75 c->replstate = REPL_STATE_NONE;
76 // 设置从节点的写处理器为ack,是否在slave向master发送ack
77 c->repl_put_online_on_ack = 0;
78 // replication复制的偏移量
79 c->reploff = 0;
80 c->read_reploff = 0;
81 // 通过ack命令接收到的偏移量
82 c->repl_ack_off = 0;
83 // 通过ack命令接收到的偏移量所用的时间
84 c->repl_ack_time = 0;
85 // 从节点的端口号
86 c->slave_listening_port = 0;
87 // 从节点IP地址
88 c->slave_ip[0] = ‘\0‘;
89 // 从节点的功能
90 c->slave_capa = SLAVE_CAPA_NONE;
91 // 回复链表
92 c->reply = listCreate();
93 // 回复链表的字节数
94 c->reply_bytes = 0;
95 // 回复缓冲区的内存大小软限制
96 c->obuf_soft_limit_reached_time = 0;
97 // 回复链表的释放和复制方法
98 listSetFreeMethod(c->reply,freeClientReplyValue);
99 listSetDupMethod(c->reply,dupClientReplyValue);
100 // 阻塞类型
101 c->btype = BLOCKED_NONE;
102 // 阻塞超过时间
103 c->bpop.timeout = 0;
104 // 造成阻塞的键字典
105 c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
106 // 存储解除阻塞的键,用于保存PUSH入元素的键,也就是dstkey
107 c->bpop.target = NULL;
108 c->bpop.xread_group = NULL;
109 c->bpop.xread_consumer = NULL;
110 c->bpop.xread_group_noack = 0;
111 // 阻塞状态
112 c->bpop.numreplicas = 0;
113 // 要达到的复制偏移量
114 c->bpop.reploffset = 0;
115 // 全局的复制偏移量
116 c->woff = 0;
117 // 监控的键
118 c->watched_keys = listCreate();
119 // 订阅频道
120 c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
121 // 订阅模式
122 c->pubsub_patterns = listCreate();
123 // 被缓存的peerid,peerid就是 ip:port
124 c->peerid = NULL;
125 c->client_list_node = NULL;
126 // 订阅发布模式的释放和比较方法
127 listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
128 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
129 // 将真正的client放在服务器的客户端链表中
130 if (fd != -1)
131 linkClient(c);//将当前客户端加入全局的链表中
132 // 初始化client的事物状态
133 initClientMultiState(c);
134 return c;
135 }
根据传入的文件描述符fd,可以创建用于不同情景下的client。这个fd就是服务器接收客户端connect后所返回的文件描述符。
其中,Nagle
算法能自动连接许多的小缓冲器消息,这一过程(称为nagling)通过减少必须发送包的个数来增加网络软件系统的效率。但是服务器和客户端的对即使通信性有很高的要求,因此禁止使用 Nagle 算法,客户端向内核递交的每个数据包都会立即发送给服务器。
创建客户端的过程,会将server.h/client结构的所有成员初始化,接下里会介绍部分重点的成员。
用于保存服务器接受客户端命令的成员:
用于保存服务器给客户端回复的成员:
客户端结构体,位于server.h
1 /**
2 * With multiplexing we need to take per-client state.
3 * Clients are taken in a linked list.
4 * 对于多路复用,我们需要获取每个客户端的状态。 客户端被放入一个链表中。
5 * 封装的客户端结构体
6 */
7 typedef struct client {
8 uint64_t id; /* Client incremental unique ID. */
9 //客户端的套接字,伪客户端的为-1(AOF,Lua脚本),其它为普通客户端
10 int fd; /* Client socket. */ 11 //客户端正在使用的redis数据库指针
12 redisDb *db; /* Pointer to currently SELECTed DB. */
13 //客户端的名字,使用client setname 来设置名称
14 robj *name; /* As set by CLIENT SETNAME. */
15 //输入缓存区
16 sds querybuf; /* Buffer we use to accumulate client queries. 我们用来存储客户端请求的缓冲区*/
17 size_t qb_pos; /* The position we have read in querybuf. 我们在 querybuf 中已经读取的位置。*/
18 sds pending_querybuf; /* If this client is flagged as master, this buffer
19 represents the yet not applied portion of the
20 replication stream that we are receiving from
21 the master.如果此客户端被标记为 master,
22 则此缓冲区表示我们从 master 接收的复制流中尚未应用的部分。 */
23 size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. 查询缓冲区的最近(100 毫秒或更多)峰值*/
24 int argc; /* Num of arguments of current command. 命令参数个数*/
25 robj **argv; /* Arguments of current command. 命令参数*/
26 // 记录被客户端执行的命令,最后一个被执行的命令
27 struct redisCommand *cmd, *lastcmd; /* Last command executed. */
28 /**
29 * telnet命令还是redis-cli命令
30 */
31 int reqtype; /* Request protocol type: PROTO_REQ_* */
32 // 剩下要读取的所有参数的数量。
33 int multibulklen; /* Number of multi bulk arguments left to read. */
34 // 命令内容的长度
35 long bulklen; /* Length of bulk argument in multi bulk request. */
36 // 回复链表
37 list *reply; /* List of reply objects to send to the client. */
38 // 回复链表中对象的总大小
39 unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
40 // 已发送字节,处理 short write 用
41 size_t sentlen; /* Amount of bytes already sent in the current
42 buffer or object being sent. */
43 //创建客户端的时间
44 time_t ctime; /* Client creation time. */
45 //最后一次交互时间,可以用来计算该连接的空转时间
46 time_t lastinteraction; /* Time of the last interaction, used for timeout */
47 //缓冲区第一次达到软性限制的时间
48 time_t obuf_soft_limit_reached_time;
49 //redis标志位
50 int flags; /* Client flags: CLIENT_* macros. */
51 /**
52 * 身份验证,0表示未通过身份验证,1表示通过身份验证。仅在服务端启用了安全验证时才用,若启用
53 * 未通过验证,则只能使用AUTH命令,其它均无法使用
54 */
55 int authenticated; /* When requirepass is non-NULL. */
56 // 如果是从站,代表的是复制状态
57 int replstate; /* Replication state if this is a slave. */
58
59 int repl_put_online_on_ack; /* Install slave write handler on ACK. */
60 // 用于保存主服务器传来的 RDB 文件的文件描述符
61 int repldbfd; /* Replication DB file descriptor. */
62 // 读取主服务器传来的 RDB 文件的偏移量
63 off_t repldboff; /* Replication DB file offset. */
64 // 主服务器传来的 RDB 文件的大小
65 off_t repldbsize; /* Replication DB file size. */
66 sds replpreamble; /* Replication DB preamble. */
67 long long read_reploff; /* Read replication offset if this is a master. */
68 // 主服务器的复制偏移量
69 long long reploff; /* Applied replication offset if this is a master. */
70 // 从服务器最后一次发送 REPLCONF ACK 时的偏移量
71 long long repl_ack_off; /* Replication ack offset, if this is a slave. */
72 // 从服务器最后一次发送 REPLCONF ACK 的时间
73 long long repl_ack_time;/* Replication ack time, if this is a slave. */
74 long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
75 copying this slave output buffer
76 should use. */
77 /**
78 * 主服务器的 master run ID
79 * 保存在客户端,用于执行部分重同步
80 */
81 char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
82 // 从服务器的监听端口号
83 int slave_listening_port; /* As configured with: SLAVECONF listening-port */
84 // 从服务器的监听IP
85 char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
86 // 从节点的功能
87 int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
88 // 事务状态
89 multiState mstate; /* MULTI/EXEC state */
90 // 阻塞类型
91 int btype; /* Type of blocking op if CLIENT_BLOCKED. */
92 // 阻塞状态
93 blockingState bpop; /* blocking state */
94 // 最后被写入的全局复制偏移量
95 long long woff; /* Last write global replication offset. */
96 // 被监视的键
97 list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
98 /**
99 * 这个字典记录了客户端所有订阅的频道
100 * 键为频道名字,值为 NULL
101 * 也即是,一个频道的集合
102 */
103 dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
104 /**
105 * 链表,包含多个 pubsubPattern 结构
106 * 记录了所有订阅频道的客户端的信息
107 * 新 pubsubPattern 结构总是被添加到表尾
108 */
109 list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
110 sds peerid; /* Cached peer ID. */
111 listNode *client_list_node; /* list node in client list */
112
113 /* Response buffer */
114 // 回复偏移量
115 int bufpos;
116 //默认大小为16k,回复缓冲区
117 char buf[PROTO_REPLY_CHUNK_BYTES];
118 } client;
1 /**
2 * This structure holds the blocking operation state for a client.
3 * The fields used depend on client->btype.
4 * 此结构保存客户端的阻塞操作状态。 使用的字段取决于 client->btype。
5 */
6 typedef struct blockingState {
7 /* Generic fields. */
8 // 阻塞超过时间
9 mstime_t timeout; /* Blocking operation timeout. If UNIX current time
10 * is > timeout then the operation timed out. */
11
12 /* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */
13 // 造成阻塞的键字典
14 dict *keys; /* The keys we are waiting to terminate a blocking
15 * operation such as BLPOP or XREAD. Or NULL. */
16 // 存储解除阻塞的键,用于保存PUSH入元素的键,也就是dstkey
17 robj *target; /* The key that should receive the element,
18 * for BRPOPLPUSH. */
19
20 /* BLOCK_STREAM */
21 size_t xread_count; /* XREAD COUNT option. */
22 robj *xread_group; /* XREADGROUP group name. */
23 robj *xread_consumer; /* XREADGROUP consumer name. */
24 mstime_t xread_retry_time, xread_retry_ttl;
25 int xread_group_noack;
26
27 /* BLOCKED_WAIT */
28 // 阻塞状态
29 int numreplicas; /* Number of replicas we are waiting for ACK. */
30 // 要达到的复制偏移量
31 long long reploffset; /* Replication offset to reach. */
32
33 /* BLOCKED_MODULE */
34 void *module_blocked_handle; /* RedisModuleBlockedClient structure.
35 which is opaque for the Redis core, only
36 handled in module.c. */
37 } blockingState;
2.2 客户端的释放
客户端的释放freeClient()
函数主要就是释放各种数据结构和清空一些缓冲区等等操作,这里就不列出源码。但是我们关注一下异步释放客户端。源码如下:
1 /**
2 * Schedule a client to free it at a safe time in the serverCron() function.
3 * This function is useful when we need to terminate a client but we are in
4 * a context where calling freeClient() is not possible, because the client
5 * should be valid for the continuation of the flow of the program.
6 * 异步释放client
7 */
8 void freeClientAsync(client *c) {
9 // 如果是已经即将关闭或者是lua脚本的伪client,则直接返回
10 if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
11 c->flags |= CLIENT_CLOSE_ASAP;
12 // 将client加入到即将关闭的client链表中
13 listAddNodeTail(server.clients_to_close,c);
14 }
其中server.clients_to_close:是服务器保存所有待关闭的client链表,redisServer结构体如下,位于server.h
1 struct redisServer { 2 /* General */ 3 pid_t pid; /* Main process pid. */ 4 // 配置文件的绝对路径 5 char *configfile; /* Absolute config file path, or NULL */ 6 char *executable; /* Absolute executable file path. */ 7 char **exec_argv; /* Executable argv vector (copy). */ 8 int dynamic_hz; /* Change hz value depending on # of clients. */ 9 int config_hz; /* Configured HZ value. May be different than 10 the actual ‘hz‘ field value if dynamic-hz 11 is enabled. */ 12 // serverCron() 每秒调用的次数 13 int hz; /* serverCron() calls frequency in hertz */ 14 // 数据库 15 redisDb *db; 16 // 命令表(受到 rename 配置选项的作用) 17 dict *commands; /* Command table */ 18 // 命令表(无 rename 配置选项的作用) 19 dict *orig_commands; /* Command table before command renaming. */ 20 // 事件状态,网络模型底层结构封装 21 aeEventLoop *el; 22 // 最近一次使用时钟 23 unsigned int lruclock; /* Clock for LRU eviction */ 24 // 关闭服务器的标识 25 int shutdown_asap; /* SHUTDOWN needed ASAP */ 26 // 在执行 serverCron() 时进行渐进式 rehash 27 int activerehashing; /* Incremental rehash in serverCron() */ 28 int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ 29 // 是否设置了密码 30 char *requirepass; /* Pass for AUTH command, or NULL */ 31 // PID 文件 32 char *pidfile; /* PID file path */ 33 // 架构类型 34 int arch_bits; /* 32 or 64 depending on sizeof(long) */ 35 // serverCron() 函数的运行次数计数器 36 int cronloops; /* Number of times the cron function run */ 37 // 本服务器的 RUN ID 38 char runid[CONFIG_RUN_ID_SIZE+1]; /* ID always different at every exec. */ 39 // 服务器是否运行在 SENTINEL 模式 40 int sentinel_mode; /* True if this instance is a Sentinel. */ 41 size_t initial_memory_usage; /* Bytes used after initialization. */ 42 int always_show_logo; /* Show logo even for non-stdout logging. */ 43 /* Modules */ 44 dict *moduleapi; /* Exported APIs dictionary for modules. */ 45 list *loadmodule_queue; /* List of modules to load at startup. */ 46 int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a 47 client blocked on a module command needs 48 to be processed. */ 49 /* Networking */ 50 // TCP 监听端口 51 int port; /* TCP listening port */ 52 // 允许监听的文件描述符 53 int tcp_backlog; /* TCP listen() backlog */ 54 // 地址 55 char *bindaddr[CONFIG_BINDADDR_MAX]; /* Addresses we should bind to */ 56 // 地址数量 57 int bindaddr_count; /* Number of addresses in server.bindaddr[] */ 58 // UNIX 套接字 59 char *unixsocket; /* UNIX socket path */ 60 // UNIX 套接字权限 61 mode_t unixsocketperm; /* UNIX socket permission */ 62 // 描述符 63 int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ 64 // 描述符数量 65 int ipfd_count; /* Used slots in ipfd[] */ 66 int sofd; /* Unix socket file descriptor */ 67 int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */ 68 int cfd_count; /* Used slots in cfd[] */ 69 // 一个链表,保存了所有客户端状态结构 70 list *clients; /* List of active clients */ 71 // 链表,保存了所有待关闭的客户端 72 list *clients_to_close; /* Clients to close asynchronously */ 73 list *clients_pending_write; /* There is to write or install handler. */ 74 // 链表,保存了所有从服务器,以及所有监视器 75 list *slaves, *monitors; /* List of slaves and MONITORs */ 76 // 服务器的当前客户端,仅用于崩溃报告 77 client *current_client; /* Current client, only used on crash report */ 78 // 基于客户端ID的激活客户端字典 79 rax *clients_index; /* Active clients dictionary by client ID. */ 80 //如果当前的客户端正处于阻塞状态,就将其设置为true 81 int clients_paused; /* True if clients are currently paused */ 82 mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ 83 // 网络错误 84 char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ 85 // MIGRATE 缓存 86 dict *migrate_cached_sockets;/* MIGRATE cached sockets */ 87 uint64_t next_client_id; /* Next client unique ID. Incremental. */ 88 int protected_mode; /* Don‘t accept external connections. */ 89 /* RDB / AOF loading information */ 90 int loading; /* We are loading data from disk if true */ 91 off_t loading_total_bytes; 92 off_t loading_loaded_bytes; 93 time_t loading_start_time; 94 off_t loading_process_events_interval_bytes; 95 /* Fast pointers to often looked up command */ 96 struct redisCommand *delCommand, *multiCommand, *lpushCommand, 97 *lpopCommand, *rpopCommand, *zpopminCommand, 98 *zpopmaxCommand, *sremCommand, *execCommand, 99 *expireCommand, *pexpireCommand, *xclaimCommand, 100 *xgroupCommand; 101 /* Fields used only for stats */ 102 time_t stat_starttime; /* Server start time */ 103 long long stat_numcommands; /* Number of processed commands */ 104 long long stat_numconnections; /* Number of connections received */ 105 long long stat_expiredkeys; /* Number of expired keys */ 106 double stat_expired_stale_perc; /* Percentage of keys probably expired */ 107 long long stat_expired_time_cap_reached_count; /* Early expire cylce stops.*/ 108 long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ 109 long long stat_keyspace_hits; /* Number of successful lookups of keys */ 110 long long stat_keyspace_misses; /* Number of failed lookups of keys */ 111 long long stat_active_defrag_hits; /* number of allocations moved */ 112 long long stat_active_defrag_misses; /* number of allocations scanned but not moved */ 113 long long stat_active_defrag_key_hits; /* number of keys with moved allocations */ 114 long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */ 115 long long stat_active_defrag_scanned; /* number of dictEntries scanned */ 116 size_t stat_peak_memory; /* Max used memory record */ 117 long long stat_fork_time; /* Time needed to perform latest fork() */ 118 double stat_fork_rate; /* Fork rate in GB/sec. */ 119 long long stat_rejected_conn; /* Clients rejected because of maxclients */ 120 long long stat_sync_full; /* Number of full resyncs with slaves. */ 121 long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ 122 long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */ 123 list *slowlog; /* SLOWLOG list of commands */ 124 long long slowlog_entry_id; /* SLOWLOG current entry ID */ 125 long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ 126 unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ 127 struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ 128 long long stat_net_input_bytes; /* Bytes read from network. */ 129 long long stat_net_output_bytes; /* Bytes written to network. */ 130 size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ 131 size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ 132 /* The following two are used to track instantaneous metrics, like 133 * number of operations per second, network traffic. */ 134 struct { 135 long long last_sample_time; /* Timestamp of last sample in ms */ 136 long long last_sample_count;/* Count in last sample */ 137 long long samples[STATS_METRIC_SAMPLES]; 138 int idx; 139 } inst_metric[STATS_METRIC_COUNT]; 140 /* Configuration */ 141 int verbosity; /* Loglevel in redis.conf */ 142 int maxidletime; /* Client timeout in seconds */ 143 int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ 144 int active_expire_enabled; /* Can be disabled for testing purposes. */ 145 int active_defrag_enabled; 146 size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ 147 int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */ 148 int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ 149 int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ 150 int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ 151 unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ 152 size_t client_max_querybuf_len; /* Limit for client query buffer length */ 153 int dbnum; /* Total number of configured DBs */ 154 int supervised; /* 1 if supervised, 0 otherwise. */ 155 int supervised_mode; /* See SUPERVISED_* */ 156 int daemonize; /* True if running as a daemon */ 157 clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT]; 158 /* AOF persistence */ 159 // AOF 状态(开启/关闭/可写) 160 int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ 161 // 所使用的 fsync 策略(每个写入/每秒/从不) 162 int aof_fsync; /* Kind of fsync() policy */ 163 char *aof_filename; /* Name of the AOF file */ 164 int aof_no_fsync_on_rewrite; /* Don‘t fsync if a rewrite is in prog. */ 165 int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */ 166 off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ 167 // 最后一次执行 BGREWRITEAOF 时, AOF 文件的大小 168 off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ 169 // AOF 文件的当前字节大小 170 off_t aof_current_size; /* AOF current size. */ 171 int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ 172 // 负责进行 AOF 重写的子进程 ID 173 pid_t aof_child_pid; /* PID if rewriting process */ 174 // AOF 重写缓存链表,链接着多个缓存块 175 list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ 176 // AOF 缓冲区 177 sds aof_buf; /* AOF buffer, written before entering the event loop */ 178 // AOF 文件的描述符 179 int aof_fd; /* File descriptor of currently selected AOF file */ 180 // AOF 的当前目标数据库 181 int aof_selected_db; /* Currently selected DB in AOF */ 182 // 推迟 write 操作的时间 183 time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ 184 // 最后一直执行 fsync 的时间 185 time_t aof_last_fsync; /* UNIX time of last fsync() */ 186 time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ 187 // AOF 重写的开始时间 188 time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ 189 // 最后一次执行 BGREWRITEAOF 的结果 190 int aof_lastbgrewrite_status; /* C_OK or C_ERR */ 191 // 记录 AOF 的 write 操作被推迟了多少次 192 unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ 193 // 指示是否需要每写入一定量的数据,就主动执行一次 fsync() 194 int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */ 195 int rdb_save_incremental_fsync; /* fsync incrementally while rdb saving? */ 196 int aof_last_write_status; /* C_OK or C_ERR */ 197 int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */ 198 int aof_load_truncated; /* Don‘t stop on unexpected AOF EOF. */ 199 int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */ 200 /* AOF pipes used to communicate between parent and child during rewrite. */ 201 int aof_pipe_write_data_to_child; 202 int aof_pipe_read_data_from_parent; 203 int aof_pipe_write_ack_to_parent; 204 int aof_pipe_read_ack_from_child; 205 int aof_pipe_write_ack_to_child; 206 int aof_pipe_read_ack_from_parent; 207 int aof_stop_sending_diff; /* If true stop sending accumulated diffs 208 to child process. */ 209 sds aof_child_diff; /* AOF diff accumulator child side. */ 210 /* RDB persistence */ 211 // 自从上次 SAVE 执行以来,数据库被修改的次数 212 long long dirty; /* Changes to DB from the last save */ 213 // BGSAVE 执行前的数据库被修改次数 214 long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ 215 /** 216 * 负责执行 BGSAVE 的子进程的 ID 217 * 没在执行 BGSAVE 时,设为 -1 218 */ 219 pid_t rdb_child_pid; /* PID of RDB saving child */ 220 struct saveparam *saveparams; /* Save points array for RDB */ 221 int saveparamslen; /* Number of saving points */ 222 char *rdb_filename; /* Name of RDB file */ 223 int rdb_compression; /* Use compression in RDB? */ 224 int rdb_checksum; /* Use RDB checksum? */ 225 // 最后一次完成 SAVE 的时间 226 time_t lastsave; /* Unix time of last successful save */ 227 // 最后一次尝试执行 BGSAVE 的时间 228 time_t lastbgsave_try; /* Unix time of last attempted bgsave */ 229 // 最近一次 BGSAVE 执行耗费的时间 230 time_t rdb_save_time_last; /* Time used by last RDB save run. */ 231 // 数据库最近一次开始执行 BGSAVE 的时间 232 time_t rdb_save_time_start; /* Current RDB save start time. */ 233 int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */ 234 int rdb_child_type; /* Type of save by active child. */ 235 // 最后一次执行 SAVE 的状态 236 int lastbgsave_status; /* C_OK or C_ERR */ 237 int stop_writes_on_bgsave_err; /* Don‘t allow writes if can‘t BGSAVE */ 238 int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ 239 int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ 240 /* Pipe and data structures for child -> parent info sharing. */ 241 int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ 242 struct { 243 int process_type; /* AOF or RDB child? */ 244 size_t cow_size; /* Copy on write size. */ 245 unsigned long long magic; /* Magic value to make sure data is valid. */ 246 } child_info_data; 247 /* Propagation of commands in AOF / replication */ 248 redisOpArray also_propagate; /* Additional command to propagate. */ 249 /* Logging */ 250 char *logfile; /* Path of log file */ 251 int syslog_enabled; /* Is syslog enabled? */ 252 char *syslog_ident; /* Syslog ident */ 253 int syslog_facility; /* Syslog facility */ 254 /* Replication (master) */ 255 char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ 256 char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ 257 // 全局复制偏移量(一个累计值) 258 long long master_repl_offset; /* My current replication offset */ 259 long long second_replid_offset; /* Accept offsets up to this for replid2. */ 260 int slaveseldb; /* Last SELECTed DB in replication output */ 261 // 主服务器发送 PING 的频率 262 int repl_ping_slave_period; /* Master pings the slave every N seconds */ 263 // backlog 本身 264 char *repl_backlog; /* Replication backlog for partial syncs */ 265 // backlog 的长度 266 long long repl_backlog_size; /* Backlog circular buffer size */ 267 // backlog 中数据的长度 268 long long repl_backlog_histlen; /* Backlog actual data length */ 269 // backlog 的当前索引 270 long long repl_backlog_idx; /* Backlog circular buffer current offset, 271 that is the next byte will‘ll write to.*/ 272 // backlog 中可以被还原的第一个字节的偏移量 273 long long repl_backlog_off; /* Replication "master offset" of first 274 byte in the replication backlog buffer.*/ 275 // backlog 的过期时间 276 time_t repl_backlog_time_limit; /* Time without slaves after the backlog 277 gets released. */ 278 // 距离上一次有从服务器的时间 279 time_t repl_no_slaves_since; /* We have no slaves since that time. 280 Only valid if server.slaves len is 0. */ 281 // 是否开启最小数量从服务器写入功能 282 int repl_min_slaves_to_write; /* Min number of slaves to write. */ 283 // 定义最小数量从服务器的最大延迟值 284 int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */ 285 // 延迟良好的从服务器的数量 286 int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ 287 int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ 288 int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ 289 /* Replication (slave) */ 290 // 主服务器的验证密码 291 char *masterauth; /* AUTH with this password with master */ 292 // 主服务器的地址 293 char *masterhost; /* Hostname of master */ 294 // 主服务器的端口 295 int masterport; /* Port of master */ 296 // 超时时间 297 int repl_timeout; /* Timeout after N seconds of master idle */ 298 // 主服务器所对应的客户端 299 redisClient *master; /* Client that is master for this slave */ 300 // 被缓存的主服务器,PSYNC 时使用 301 redisClient *cached_master; /* Cached master to be reused for PSYNC. */ 302 int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ 303 // 复制的状态(服务器是从服务器时使用) 304 int repl_state; /* Replication status if the instance is a slave */ 305 // RDB 文件的大小 306 off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ 307 // 已读 RDB 文件内容的字节数 308 off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ 309 // 最近一次执行 fsync 时的偏移量 310 // 用于 sync_file_range 函数 311 off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ 312 // 主服务器的套接字 313 int repl_transfer_s; /* Slave -> Master SYNC socket */ 314 // 保存 RDB 文件的临时文件的描述符 315 int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ 316 // 保存 RDB 文件的临时文件名字 317 char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ 318 // 最近一次读入 RDB 内容的时间 319 time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ 320 int repl_serve_stale_data; /* Serve stale data when link is down? */ 321 // 是否只读从服务器? 322 int repl_slave_ro; /* Slave is read only? */ 323 int repl_slave_ignore_maxmemory; /* If true slaves do not evict. */ 324 // 连接断开的时长 325 time_t repl_down_since; /* Unix time at which link with master went down */ 326 // 是否要在 SYNC 之后关闭 NODELAY ? 327 int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ 328 // 从服务器优先级 329 int slave_priority; /* Reported in INFO and used by Sentinel. */ 330 int slave_announce_port; /* Give the master this listening port. */ 331 char *slave_announce_ip; /* Give the master this ip address. */ 332 /* The following two fields is where we store master PSYNC replid/offset 333 * while the PSYNC is in progress. At the end we‘ll copy the fields into 334 * the server->master client structure. */ 335 // 本服务器(从服务器)当前主服务器的 RUN ID 336 char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ 337 // 初始化偏移量 338 long long master_initial_offset; /* Master PSYNC offset. */ 339 int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ 340 /* Replication script cache. */ 341 // 复制脚本缓存 342 // 字典 343 dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ 344 // FIFO 队列 345 list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ 346 // 缓存的大小 347 int repl_scriptcache_size; /* Max number of elements. */ 348 /* Synchronous replication. */ 349 list *clients_waiting_acks; /* Clients waiting in WAIT command. */ 350 int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */ 351 /* Limits */ 352 unsigned int maxclients; /* Max number of simultaneous clients */ 353 unsigned long long maxmemory; /* Max number of memory bytes to use */ 354 int maxmemory_policy; /* Policy for key eviction */ 355 int maxmemory_samples; /* Pricision of random sampling */ 356 int lfu_log_factor; /* LFU logarithmic counter factor. */ 357 int lfu_decay_time; /* LFU counter decay factor. */ 358 long long proto_max_bulk_len; /* Protocol bulk length maximum size. */ 359 /* Blocked clients */ 360 unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/ 361 unsigned int blocked_clients_by_type[BLOCKED_NUM]; 362 list *unblocked_clients; /* list of clients to unblock before next loop */ 363 list *ready_keys; /* List of readyList structures for BLPOP & co */ 364 /* Sort parameters - qsort_r() is only available under BSD so we 365 * have to take this state global, in order to pass it to sortCompare() */ 366 int sort_desc; 367 int sort_alpha; 368 int sort_bypattern; 369 int sort_store; 370 /* Zip structure config, see redis.conf for more information */ 371 size_t hash_max_ziplist_entries; 372 size_t hash_max_ziplist_value; 373 size_t set_max_intset_entries; 374 size_t zset_max_ziplist_entries; 375 size_t zset_max_ziplist_value; 376 size_t hll_sparse_max_bytes; 377 size_t stream_node_max_bytes; 378 int64_t stream_node_max_entries; 379 /* List parameters */ 380 int list_max_ziplist_size; 381 int list_compress_depth; 382 /* time cache */ 383 time_t unixtime; /* Unix time sampled every cron cycle. */ 384 time_t timezone; /* Cached timezone. As set by tzset(). */ 385 int daylight_active; /* Currently in daylight saving time. */ 386 long long mstime; /* Like ‘unixtime‘ but with milliseconds resolution. */ 387 /* Pubsub */ 388 // 字典,键为频道,值为链表 389 // 链表中保存了所有订阅某个频道的客户端 390 // 新客户端总是被添加到链表的表尾 391 dict *pubsub_channels; /* Map channels to list of subscribed clients */ 392 // 这个链表记录了客户端订阅的所有模式的名字 393 list *pubsub_patterns; /* A list of pubsub_patterns */ 394 int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an 395 xor of NOTIFY_... flags. */ 396 /* Cluster */ 397 int cluster_enabled; /* Is cluster enabled? */ 398 mstime_t cluster_node_timeout; /* Cluster node timeout. */ 399 char *cluster_configfile; /* Cluster auto-generated config file name. */ 400 struct clusterState *cluster; /* State of the cluster */ 401 int cluster_migration_barrier; /* Cluster replicas migration barrier. */ 402 int cluster_slave_validity_factor; /* Slave max data age for failover. */ 403 int cluster_require_full_coverage; /* If true, put the cluster down if 404 there is at least an uncovered slot.*/ 405 int cluster_slave_no_failover; /* Prevent slave from starting a failover 406 if the master is in failure state. */ 407 char *cluster_announce_ip; /* IP address to announce on cluster bus. */ 408 int cluster_announce_port; /* base port to announce on cluster bus. */ 409 int cluster_announce_bus_port; /* bus port to announce on cluster bus. */ 410 int cluster_module_flags; /* Set of flags that Redis modules are able 411 to set in order to suppress certain 412 native Redis Cluster features. Check the 413 REDISMODULE_CLUSTER_FLAG_*. */ 414 /* Scripting */ 415 // Lua 环境 416 lua_State *lua; /* The Lua interpreter. We use just one for all clients */ 417 // 复制执行 Lua 脚本中的 Redis 命令的伪客户端 418 redisClient *lua_client; /* The "fake client" to query Redis from Lua */ 419 // 当前正在执行 EVAL 命令的客户端,如果没有就是 NULL 420 redisClient *lua_caller; /* The client running EVAL right now, or NULL */ 421 // 一个字典,值为 Lua 脚本,键为脚本的 SHA1 校验和 422 dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ 423 unsigned long long lua_scripts_mem; /* Cached scripts‘ memory + oh */ 424 // Lua 脚本的执行时限 425 mstime_t lua_time_limit; /* Script timeout in milliseconds */ 426 // 脚本开始执行的时间 427 mstime_t lua_time_start; /* Start time of script, milliseconds time */ 428 // 脚本是否执行过写命令 429 int lua_write_dirty; /* True if a write command was called during the 430 execution of the current script. */ 431 // 脚本是否执行过带有随机性质的命令 432 int lua_random_dirty; /* True if a random command was called during the 433 execution of the current script. */ 434 int lua_replicate_commands; /* True if we are doing single commands repl. */ 435 int lua_multi_emitted;/* True if we already proagated MULTI. */ 436 int lua_repl; /* Script replication flags for redis.set_repl(). */ 437 // 脚本是否超时 438 int lua_timedout; /* True if we reached the time limit for script 439 execution. */ 440 // 是否要杀死脚本 441 int lua_kill; /* Kill the script if true. */ 442 int lua_always_replicate_commands; /* Default replication type. */ 443 /* Lazy free */ 444 int lazyfree_lazy_eviction; 445 int lazyfree_lazy_expire; 446 int lazyfree_lazy_server_del; 447 /* Latency monitor */ 448 long long latency_monitor_threshold; 449 dict *latency_events; 450 /* Assert & bug reporting */ 451 const char *assert_failed; 452 const char *assert_file; 453 int assert_line; 454 int bug_report_start; /* True if bug report header was already logged. */ 455 int watchdog_period; /* Software watchdog period in ms. 0 = off */ 456 /* System hardware info */ 457 size_t system_memory_size; /* Total memory in system as reported by OS */ 458 459 /* Mutexes used to protect atomic variables when atomic builtins are 460 * not available. */ 461 pthread_mutex_t lruclock_mutex; 462 pthread_mutex_t next_client_id_mutex; 463 pthread_mutex_t unixtime_mutex; 464 };
设置异步释放客户端的目的主要是:防止底层函数正在向客户端的输出缓冲区写数据的时候,关闭客户端,这样是不安全的。Redis会安排客户端在serverCron()
函数的安全时间释放它。
当然也可以取消异步释放,那么就会调用freeClient()
函数立即释放。源码如下:
1 // 取消设置异步释放的client
2 void freeClientsInAsyncFreeQueue(void) {
3 // 遍历所有即将关闭的client
4 while (listLength(server.clients_to_close)) {
5 listNode *ln = listFirst(server.clients_to_close);
6 client *c = listNodeValue(ln);
7
8 // 取消立即关闭的标志
9 c->flags &= ~CLIENT_CLOSE_ASAP;
10 //释放客户端
11 freeClient(c);
12 // 从即将关闭的client链表中删除
13 listDelNode(server.clients_to_close,ln);
14 }
15 }
两种关闭客户端方式的底层都是调用freeClient,源码如下
1 void freeClient(client *c) {
2 listNode *ln;
3
4 /* If a client is protected, yet we need to free it right now, make sure
5 * to at least use asynchronous freeing. */
6 if (c->flags & CLIENT_PROTECTED) {
7 freeClientAsync(c);
8 return;
9 }
10
11 /* If it is our master that‘s beging disconnected we should make sure
12 * to cache the state to try a partial resynchronization later.
13 *
14 * Note that before doing this we make sure that the client is not in
15 * some unexpected state, by checking its flags. */
16 if (server.master && c->flags & CLIENT_MASTER) {
17 serverLog(LL_WARNING,"Connection with master lost.");
18 if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
19 CLIENT_CLOSE_ASAP|
20 CLIENT_BLOCKED)))
21 {
22 replicationCacheMaster(c);
23 return;
24 }
25 }
26
27 /* Log link disconnection with slave */
28 if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
29 serverLog(LL_WARNING,"Connection with replica %s lost.",
30 replicationGetSlaveName(c));
31 }
32
33 /* Free the query buffer */
34 sdsfree(c->querybuf);
35 sdsfree(c->pending_querybuf);
36 c->querybuf = NULL;
37
38 /* Deallocate structures used to block on blocking ops. */
39 if (c->flags & CLIENT_BLOCKED) unblockClient(c);
40 dictRelease(c->bpop.keys);
41
42 /* UNWATCH all the keys */
43 unwatchAllKeys(c);
44 listRelease(c->watched_keys);
45
46 /* Unsubscribe from all the pubsub channels */
47 pubsubUnsubscribeAllChannels(c,0);
48 pubsubUnsubscribeAllPatterns(c,0);
49 dictRelease(c->pubsub_channels);
50 listRelease(c->pubsub_patterns);
51
52 /* Free data structures. */
53 listRelease(c->reply);
54 freeClientArgv(c);
55
56 /* Unlink the client: this will close the socket, remove the I/O
57 * handlers, and remove references of the client from different
58 * places where active clients may be referenced. */
59 unlinkClient(c);
60
61 /* Master/slave cleanup Case 1:
62 * we lost the connection with a slave. */
63 if (c->flags & CLIENT_SLAVE) {
64 if (c->replstate == SLAVE_STATE_SEND_BULK) {
65 if (c->repldbfd != -1) close(c->repldbfd);
66 if (c->replpreamble) sdsfree(c->replpreamble);
67 }
68 list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
69 ln = listSearchKey(l,c);
70 serverAssert(ln != NULL);
71 listDelNode(l,ln);
72 /* We need to remember the time when we started to have zero
73 * attached slaves, as after some time we‘ll free the replication
74 * backlog. */
75 if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
76 server.repl_no_slaves_since = server.unixtime;
77 refreshGoodSlavesCount();
78 }
79
80 /* Master/slave cleanup Case 2:
81 * we lost the connection with the master. */
82 if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
83
84 /* If this client was scheduled for async freeing we need to remove it
85 * from the queue. */
86 if (c->flags & CLIENT_CLOSE_ASAP) {
87 ln = listSearchKey(server.clients_to_close,c);
88 serverAssert(ln != NULL);
89 listDelNode(server.clients_to_close,ln);
90 }
91
92 /* Release other dynamically allocated client structure fields,
93 * and finally release the client structure itself. */
94 if (c->name) decrRefCount(c->name);
95 zfree(c->argv);
96 freeClientMultiState(c);
97 sdsfree(c->peerid);
98 zfree(c);
99 }
3. 命令接收与命令回复
3.1 命令接收
当客户端连接上Redis服务器后,服务器会得到一个文件描述符fd,而且服务器会监听该文件描述符的读事件,这些在createClient()函数中,我们有分析。那么当客户端发送了命令,触发了AE_READABLE事件,那么就会调用回调函数readQueryFromClient()来从文件描述符fd中读发来的命令,并保存在输入缓冲区中querybuf。而这个回调函数就是我们在Redis 事件处理实现一文中所提到的指向回调函数的指针rfileProc和wfileProc。那么,我们先来分析sendReplyToClient()函数。
1 /** 2 * 为了接收客户端传来的命令请求, 服务器要为客户端套接字关联命令请求处理器。 3 * 4 * readQueryFromClient函数是Redis的命令请求处理器,这个处理器负责从套接字中读入客户端发送的命令请求内容, 5 * 具体实现为unistd.h/read函数的包装。 6 * 7 * 当一个客户端通过连接应答处理器成功连接到服务器之后, 8 * 服务器会将客户端套接字的AE_READABLE事件和命令请求处理器关联起来,当客户端向服务器发送命令请求的时候, 9 * 套接字就会产生 AE_READABLE事件,引发命令请求处理器执行,并执行相应的套接字读入操作, 10 * 11 * 在客户端连接服务器的整个过程中,服务器都会一直为客户端套接字的AE_READABLE事件关联命令请求处理器。 12 */ 13 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { 14 //指向之前设置的对象指针 15 client *c = (client*) privdata; 16 //readlen:REDIS_IOBUF_LEN 17 int nread, readlen; 18 //指示之前已经读的数据 19 size_t qblen; 20 //设置几个变量 21 UNUSED(el); 22 UNUSED(mask); 23 24 //每次想读的数据长度16K 25 readlen = PROTO_IOBUF_LEN; 26 /* If this is a multi bulk request, and we are processing a bulk reply 27 * that is large enough, try to maximize the probability that the query 28 * buffer contains exactly the SDS string representing the object, even 29 * at the risk of requiring more read(2) calls. This way the function 30 * processMultiBulkBuffer() can avoid copying buffers to create the 31 * Redis Object representing the argument. */ 32 // 如果是多条请求,根据请求的大小,设置读入的长度readlen 33 if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 34 && c->bulklen >= PROTO_MBULK_BIG_ARG) 35 { 36 ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); 37 38 /* Note that the ‘remaining‘ variable may be zero in some edge case, 39 * for example once we resume a blocked client after CLIENT PAUSE. */ 40 if (remaining > 0 && remaining < readlen) readlen = remaining; 41 } 42 43 //之前缓冲区里已经存在的数据的长度 44 qblen = sdslen(c->querybuf); 45 // 更新缓冲区的峰值 46 if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; 47 //保证有足够的空间 48 c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); 49 // 从 fd 对应的socket中读取到 client 中的 querybuf 输入缓冲区 50 nread = read(fd, c->querybuf+qblen, readlen); 51 // 读操作出错 52 if (nread == -1) { 53 if (errno == EAGAIN) { 54 return; 55 } else { 56 // 出错释放 client 57 serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); 58 freeClient(c); 59 return; 60 } 61 } else if (nread == 0) { 62 // 读操作完成 63 // 客户端主动关闭 connection 64 serverLog(LL_VERBOSE, "Client closed connection"); 65 freeClient(c); 66 return; 67 } else if (c->flags & CLIENT_MASTER) { 68 /** 69 * Append the query buffer to the pending (not applied) buffer 70 * of the master. We‘ll use this buffer later in order to have a 71 * copy of the string applied by the last command executed. 72 * 73 * 当这个client代表主从的master节点时,将query buffer和 pending_querybuf结合 74 * 用于主从复制中的命令传播???? 75 * 76 * 将查询缓冲区附加到 master 的挂起(未应用)缓冲区。 稍后我们将使用此缓冲区, 77 * 以便获得执行的最后一个命令所应用的字符串的副本。 78 */ 79 c->pending_querybuf = sdscatlen(c->pending_querybuf, 80 c->querybuf+qblen,nread); 81 } 82 83 // 更新输入缓冲区的已用大小和未用大小。 84 sdsIncrLen(c->querybuf,nread); 85 // 设置最后一次服务器和client交互的时间 86 c->lastinteraction = server.unixtime; 87 // 如果是主节点,则更新复制操作的偏移量 88 if (c->flags & CLIENT_MASTER) c->read_reploff += nread; 89 // 更新从网络输入的字节数 90 server.stat_net_input_bytes += nread; 91 // 如果大于系统配置的最大客户端缓存区大小,也就是配置文件中的client-query-buffer-limit:1G 92 if (sdslen(c->querybuf) > server.client_max_querybuf_len) { 93 // 将client信息转换为sds 94 sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); 95 96 // 返回错误信息,并且关闭client 97 bytes = sdscatrepr(bytes,c->querybuf,64); 98 // 打印到日志 99 serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); 100 // 释放空间 101 sdsfree(ci); 102 sdsfree(bytes); 103 freeClient(c); 104 return; 105 } 106 107 /* Time to process the buffer. If the client is a master we need to 108 * compute the difference between the applied offset before and after 109 * processing the buffer, to understand how much of the replication stream 110 * was actually applied to the master state: this quantity, and its 111 * corresponding part of the replication stream, will be propagated to 112 * the sub-slaves and to the replication backlog. */ 113 // 处理client输入的命令内容 114 processInputBufferAndReplicate(c); 115 }
实际上,这个readQueryFromClient()函数是read函数的封装,从文件描述符fd中读出数据到输入缓冲区querybuf中,并更新输入缓冲区的峰值querybuf_peak,而且会检查读的长度,如果大于了server.client_max_querybuf_len则会退出,而这个阀值在服务器初始化为PROTO_MAX_QUERYBUF_LEN (1024*1024*1024)也就是1G大小。
最后一是调用processInputBufferAndReplicate()函数对客户端角色进行判断,然后执行相应的操作,源码如下:
1 /**
2 * This is a wrapper for processInputBuffer that also cares about handling
3 * the replication forwarding to the sub-slaves, in case the client ‘c‘
4 * is flagged as master. Usually you want to call this instead of the
5 * raw processInputBuffer().
6 *
7 * 这是 processInputBuffer 的一个包装器,它也关心处理复制转发到子从站,
8 * 以防客户端“c”被标记为主站。 通常你想调用它而不是原始的 processInputBuffer()。
9 */
10 void processInputBufferAndReplicate(client *c) {
11 if (!(c->flags & CLIENT_MASTER)) {
12 // processInputBuffer 处理输入缓冲区,解析获取命令
13 processInputBuffer(c);
14 } else {
15 // 如果client是master的连接
16 size_t prev_offset = c->reploff;
17 processInputBuffer(c);
18 // 判断是否同步偏移量发生变化,则通知到后续的slave
19 size_t applied = c->reploff - prev_offset;
20 if (applied) {
21 replicationFeedSlavesFromMasterStream(server.slaves,
22 c->pending_querybuf, applied);
23 sdsrange(c->pending_querybuf,applied,-1);
24 }
25 }
26 }
但是不管客户端是什么角色,都是通过client的argv和argc这两个成员来处理的。因此,服务器还需要将输入缓冲区querybuf中的数据,处理成参数列表的对象,也就是上面的processInputBuffer()函数。源码如下:
1 /**
2 * This function is called every time, in the client structure ‘c‘, there is
3 * more query buffer to process, because we read more data from the socket
4 * or because a client was blocked and later reactivated, so there could be
5 * pending query buffer, already representing a full command, to process.
6 *
7 * 在客户端结构“c”中,每次调用此函数时,有更多的查询缓冲区要处理,因为我们从套接字读取了更多数据,
8 * 或者因为客户端被阻塞并稍后重新激活,因此可能已经有一个要处理完整的命令位待处理的查询缓冲区。
9 *
10 * processInputBuffer 主要是将输入缓冲区中的数据解析成对应的命令,
11 * 根据命令类型是 PROTO_REQ_MULTIBULK 还是 PROTO_REQ_INLINE,来分别调用 processInlineBuffer 和
12 * processMultibulkBuffer 方法来解析命令。
13 *
14 * 然后调用 processCommand 方法来执行命令。执行成功后,如果是主从客户端,
15 * 还需要更新同步偏移量 reploff 属性,然后重置 client,让client可以接收一条命令。
16 */
17 void processInputBuffer(client *c) {
18 server.current_client = c;
19
20 /* Keep processing while there is something in the input buffer */
21 /* 当缓冲区中还有数据时就一直处理 */
22 while(c->qb_pos < sdslen(c->querybuf)) {
23 /* Return if clients are paused. */
24 // 如果处于暂停状态,直接返回
25 if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
26
27 // 处理 client 的各种状态
28
29 /**
30 * Immediately abort if the client is in the middle of something.
31 * 如果client处于被阻塞状态,直接返回
32 */
33 if (c->flags & CLIENT_BLOCKED) break;
34
35 /**
36 * Don‘t process input from the master while there is a busy script
37 * condition on the slave. We want just to accumulate the replication
38 * stream (instead of replying -BUSY like we do with other clients) and
39 * later resume the processing.
40 * 当从站上有繁忙的脚本条件时,不要处理来自主站的输入。
41 * 我们只想累积复制流(而不是像我们对其他客户端那样回复 -BUSY)并且稍后恢复处理。
42 */
43 if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
44
45 /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
46 * written to the client. Make sure to not let the reply grow after
47 * this flag has been set (i.e. don‘t process more commands).
48 *
49 * The same applies for clients we want to terminate ASAP.
50 *
51 * 一旦回复写入客户端,CLIENT_CLOSE_AFTER_REPLY 将关闭连接。
52 * 确保在设置此标志后不要再次回复(即不要处理更多命令)。 这同样适用于我们希望尽快终止的客户。
53 *
54 * 如果client处于关闭状态,则直接返回
55 */
56 if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
57
58 /* Determine request type when unknown. */
59 // 如果是未知的请求类型,则判定请求类型
60 if (!c->reqtype) {
61 if (c->querybuf[c->qb_pos] == ‘*‘) {
62 // 如果是"*"开头,则是多条请求,是client发来的
63 c->reqtype = PROTO_REQ_MULTIBULK;
64 } else {
65 // 否则就是内联请求,是Telnet发来的
66 c->reqtype = PROTO_REQ_INLINE;
67 }
68 }
69
70 // 如果是Telnet内联请求
71 if (c->reqtype == PROTO_REQ_INLINE) {
72 // 处理Telnet发来的内联命令,并创建成对象,保存在client的参数列表中
73 if (processInlineBuffer(c) != C_OK) break;
74 } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
75 // 将client的querybuf中的协议内容转换为client的参数列表中的对象
76 if (processMultibulkBuffer(c) != C_OK) break;
77 } else {
78 serverPanic("Unknown request type");
79 }
80
81 /* Multibulk processing could see a <= 0 length. */
82 // 如果参数为0,则重置client
83 if (c->argc == 0) {
84 resetClient(c);
85 } else {
86 /* Only reset the client when the command was executed. */
87 // 只有执行命令成功后才会重置client
88 if (processCommand(c) == C_OK) {
89 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
90 /* Update the applied replication offset of our master. */
91 c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
92 }
93
94 /**
95 * Don‘t reset the client structure for clients blocked in a
96 * module blocking command, so that the reply callback will
97 * still be able to access the client argv and argc field.
98 * The client will be reset in unblockClientFromModule().
99 * 如果当前客户端是非阻塞的或者当前客户端的命令是非阻塞的就重置客户端
100 */
101 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
102 resetClient(c);
103 }
104 /* freeMemoryIfNeeded may flush slave output buffers. This may
105 * result into a slave, that may be the active client, to be
106 * freed. */
107 if (server.current_client == NULL) break;
108 }
109 }
110
111 /* Trim to pos */
112 if (c->qb_pos) {
113 sdsrange(c->querybuf,c->qb_pos,-1);
114 c->qb_pos = 0;
115 }
116
117 // 执行成功,则将用于崩溃报告的client设置为NULL
118 server.current_client = NULL;
119 }
这个processInputBuffer()函数只要根据reqtype来判断和设置请求的类型,之前提过,因为Redis服务器支持Telnet的连接,因此Telnet命令请求协议类型是PROTO_REQ_INLINE,进而调用processInlineBuffer()函数处理,而redis-cli命令请求的协议类型是PROTO_REQ_MULTIBULK,进而调用processMultibulkBuffer()函数来处理。我们只要看processMultibulkBuffer()函数,是如果将Redis协议的命令,处理成参数列表的对象的。源码如下:
1 /**
2 * Process the query buffer for client ‘c‘, setting up the client argument
3 * vector for command execution. Returns C_OK if after running the function
4 * the client has a well-formed ready to be processed command, otherwise
5 * C_ERR if there is still to read more buffer to get the full command.
6 * The function also returns C_ERR when there is a protocol error: in such a
7 * case the client structure is setup to reply with the error and close
8 * the connection.
9 *
10 * This function is called if processInputBuffer() detects that the next
11 * command is in RESP format, so the first byte in the command is found
12 * to be ‘*‘. Otherwise for inline commands processInlineBuffer() is called.
13 * 将client的querybuf中的协议内容转换为client的参数列表中的对象
14 */
15 int processMultibulkBuffer(client *c) {
16 char *newline = NULL;
17 int ok;
18 long long ll;
19
20 // 参数列表中命令数量为0
21 if (c->multibulklen == 0) {
22 /* The client should have been reset */
23 serverAssertWithInfo(c,NULL,c->argc == 0);
24
25 /* Multi bulk length cannot be read without a \r\n */
26 // 查询第一个换行符
27 newline = strchr(c->querybuf+c->qb_pos,‘\r‘);
28 // 没有找到\r\n,表示不符合协议,返回错误
29 if (newline == NULL) {
30 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
31 addReplyError(c,"Protocol error: too big mbulk count string");
32 setProtocolError("too big mbulk count string",c);
33 }
34 return C_ERR;
35 }
36
37 /* Buffer should also contain \n 检查格式,缓存区也应该包含\n*/
38 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
39 return C_ERR;
40
41 /* We know for sure there is a whole line since newline != NULL,
42 * so go ahead and find out the multi bulk length. */
43 // 保证第一个字符为‘*‘
44 serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == ‘*‘);
45 // 将‘*‘之后的数字转换为整数。*3\r\n
46 ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
47 if (!ok || ll > 1024*1024) {
48 addReplyError(c,"Protocol error: invalid multibulk length");
49 setProtocolError("invalid mbulk count",c);
50 return C_ERR;
51 }
52
53 // 指向"*3\r\n"的"\r\n"之后的位置
54 c->qb_pos = (newline-c->querybuf)+2;
55
56 // 空白命令,则将之前的删除,保留未阅读的部分
57 if (ll <= 0) return C_OK;
58
59 // 参数数量
60 c->multibulklen = ll;
61
62 /* Setup argv array on client structure */
63 // 分配client参数列表的空间
64 if (c->argv) zfree(c->argv);
65 c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
66 }
67
68 serverAssertWithInfo(c,NULL,c->multibulklen > 0);
69 // 读入multibulklen个参数,并创建对象保存在参数列表中
70 while(c->multibulklen) {
71 /* Read bulk length if unknown */
72 // 读入参数的长度
73 if (c->bulklen == -1) {
74 // 找到换行符,确保"\r\n"存在
75 newline = strchr(c->querybuf+c->qb_pos,‘\r‘);
76 if (newline == NULL) {
77 if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
78 addReplyError(c,
79 "Protocol error: too big bulk count string");
80 setProtocolError("too big bulk count string",c);
81 return C_ERR;
82 }
83 break;
84 }
85
86 /* Buffer should also contain \n */
87 if (newline-(c->querybuf+c->qb_pos) > (ssize_t)(sdslen(c->querybuf)-c->qb_pos-2))
88 break;
89
90 // $3\r\nSET\r\n...,确保是‘$‘字符,保证格式
91 if (c->querybuf[c->qb_pos] != ‘$‘) {
92 addReplyErrorFormat(c,
93 "Protocol error: expected ‘$‘, got ‘%c‘",
94 c->querybuf[c->qb_pos]);
95 setProtocolError("expected $ but got something else",c);
96 return C_ERR;
97 }
98
99 // 将命令长度保存到ll。
100 ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
101 if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
102 addReplyError(c,"Protocol error: invalid bulk length");
103 setProtocolError("invalid bulk length",c);
104 return C_ERR;
105 }
106
107 // 定位第一个参数的位置,也就是SET的S
108 c->qb_pos = newline-c->querybuf+2;
109 // 参数太长,进行优化
110 if (ll >= PROTO_MBULK_BIG_ARG) {
111 /* If we are going to read a large object from network
112 * try to make it likely that it will start at c->querybuf
113 * boundary so that we can optimize object creation
114 * avoiding a large copy of data.
115 *
116 * But only when the data we have not parsed is less than
117 * or equal to ll+2. If the data length is greater than
118 * ll+2, trimming querybuf is just a waste of time, because
119 * at this time the querybuf contains not only our bulk. */
120 /**
121 * 如果我们要从网络中读取一个大的对象,尝试使它可能从c-> querybuf边界开始,
122 * 以便我们可以优化对象创建,避免大量的数据副本
123 *
124 * 保存未读取的部分
125 */
126 if (sdslen(c->querybuf)-c->qb_pos <= (size_t)ll+2) {
127 sdsrange(c->querybuf,c->qb_pos,-1);
128 // 重置偏移量
129 c->qb_pos = 0;
130 /* Hint the sds library about the amount of bytes this string is
131 * going to contain. */
132 // 扩展querybuf的大小
133 c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2);
134 }
135 }
136 // 保存参数的长度
137 c->bulklen = ll;
138 }
139
140 /* Read bulk argument */
141 // 因为只读了multibulklen字节的数据,读到的数据不够,则直接跳出循环,执行processInputBuffer()函数循环读取
142 if (sdslen(c->querybuf)-c->qb_pos < (size_t)(c->bulklen+2)) {
143 /* Not enough data (+2 == trailing \r\n) */
144 break;
145 } else {
146 // 为参数创建了对象
147 /* Optimization: if the buffer contains JUST our bulk element
148 * instead of creating a new object by *copying* the sds we
149 * just use the current sds string. */
150 // 如果读入的长度大于32k
151 if (c->qb_pos == 0 &&
152 c->bulklen >= PROTO_MBULK_BIG_ARG &&
153 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
154 {
155 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
156 // 跳过换行
157 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
158 /* Assume that if we saw a fat argument we‘ll see another one
159 * likely... */
160 // 设置一个新长度
161 c->querybuf = sdsnewlen(SDS_NOINIT,c->bulklen+2);
162 sdsclear(c->querybuf);
163 } else {
164 // 创建对象保存在client的参数列表中
165 c->argv[c->argc++] =
166 createStringObject(c->querybuf+c->qb_pos,c->bulklen);
167 c->qb_pos += c->bulklen+2;
168 }
169 // 清空命令内容的长度
170 c->bulklen = -1;
171 // 未读取命令参数的数量,读取一个,该值减1
172 c->multibulklen--;
173 }
174 }
175
176 /* We‘re done when c->multibulk == 0 */
177 // 删除已经读取的,保留未读取的
178 if (c->multibulklen == 0) return C_OK;
179
180 /* Still not ready to process the command */
181 return C_ERR;
182 }
我们结合一个多条批量回复进行分析。一个多条批量回复以 *<argc>\r\n
为前缀,后跟多条不同的批量回复,其中 argc
为这些批量回复的数量。那么SET nmykey nmyvalue
命令转换为Redis协议内容如下:
1 "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
当进入processMultibulkBuffer()函数之后,如果是第一次执行该函数,那么argv中未读取的命令数量为0,也就是说参数列表为空,那么会执行if (c->multibulklen == 0)的代码,这里的代码会解析*3\r\n,将3保存到multibulklen中,表示后面的参数个数,然后根据参数个数,为argv分配空间。
接着,执行multibulklen次while循环,每次读一个参数,例如$3\r\nSET\r\n,也是先读出参数长度,保存在bulklen中,然后将参数SET保存构建成对象保存到参数列表中。每次读一个参数,multibulklen就会减1,当等于0时,就表示命令的参数全部读取到参数列表完毕。
于是命令接收的整个过程完成。
3.2 命令回复
命令回复的函数,也是事件处理程序的回调函数之一。当服务器的client的回复缓冲区有数据,那么就会调用aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c)函数,将文件描述符fd和AE_WRITABLE事件关联起来,当客户端可写时,就会触发事件,调用sendReplyToClient()函数,执行写事件。我们重点看这个函数的代码:
1 /**
2 * Write event handler. Just send data to the client.
3 * 为了向客户端返回命令的执行结果, 服务器要为客户端套接字关联命令回复处理器。
4 *
5 * sendReplyToClient函数是Redis的命令回复处理器,
6 * 这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端,具体实现为unistd.h/write函数的包装。
7 *
8 * 当服务器有命令回复需要传送给客户端的时候,服务器会将客户端套接字的AE_WRITABLE事件和命令回复处理器关联起来,
9 * 当客户端准备好接收服务器传回的命令回复时,就会产生AE_WRITABLE事件,引发命令回复处理器执行,
10 * 并执行相应的套接字写入操作,
11 *
12 * 当命令回复发送完毕之后, 服务器就会解除命令回复处理器与客户端套接字的 AE_WRITABLE 事件之间的关联。
13 */
14 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
15 UNUSED(el);
16 UNUSED(mask);
17 // 发送完数据会删除fd的可读事件
18 writeToClient(fd,privdata,1);
19 }
这个函数直接调用了writeToClient()
函数,该函数源码如下:
1 /** 2 * Write data in output buffers to client. Return C_OK if the client 3 * is still valid after the call, C_ERR if it was freed. 4 * 这个函数实际上是对write()函数的封装,将静态回复缓冲区buf或回复链表reply中的数据循环写到文件描述符fd中。 5 * 如果写完了,则将当前客户端的AE_WRITABLE事件删除。 6 * 7 * 将输出缓冲区的数据写给client,如果client被释放则返回C_ERR,没被释放则返回C_OK 8 */ 9 int writeToClient(int fd, client *c, int handler_installed) { 10 ssize_t nwritten = 0, totwritten = 0; 11 size_t objlen; 12 clientReplyBlock *o; 13 14 // 如果指定的client的回复缓冲区中还有数据,则返回真,表示可以写socket 15 while(clientHasPendingReplies(c)) { 16 // 固定缓冲区发送未完成 17 if (c->bufpos > 0) { 18 // 将缓冲区的数据写到fd中 19 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); 20 // 写失败跳出循环 21 if (nwritten <= 0) break; 22 // 更新发送的数据计数器 23 c->sentlen += nwritten; 24 totwritten += nwritten; 25 26 /* If the buffer was sent, set bufpos to zero to continue with 27 * the remainder of the reply. */ 28 // 如果发送的数据等于buf的偏移量,表示发送完成 29 if ((int)c->sentlen == c->bufpos) { 30 // 则将其重置 31 c->bufpos = 0; 32 c->sentlen = 0; 33 } 34 } else {// 固定缓冲区发送完成,发送回复链表的内容 35 // 回复链表的第一条回复对象,和对象值的长度和所占的内存 36 o = listNodeValue(listFirst(c->reply)); 37 objlen = o->used; 38 39 // 跳过空对象,并删除这个对象 40 if (objlen == 0) { 41 c->reply_bytes -= o->size; 42 listDelNode(c->reply,listFirst(c->reply)); 43 continue; 44 } 45 46 // 将当前节点的值写到fd中 47 nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); 48 // 写失败跳出循环 49 if (nwritten <= 0) break; 50 // 更新发送的数据计数器 51 c->sentlen += nwritten; 52 totwritten += nwritten; 53 54 /* If we fully sent the object on head go to the next one */ 55 // 发送完成,则删除该节点,重置发送的数据长度,更新回复链表的总字节数 56 if (c->sentlen == objlen) { 57 c->reply_bytes -= o->size; 58 listDelNode(c->reply,listFirst(c->reply)); 59 c->sentlen = 0; 60 /* If there are no longer objects in the list, we expect 61 * the count of reply bytes to be exactly zero. */ 62 if (listLength(c->reply) == 0) 63 serverAssert(c->reply_bytes == 0); 64 } 65 } 66 /** 67 * Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT 68 * bytes, in a single threaded server it‘s a good idea to serve 69 * other clients as well, even if a very large request comes from 70 * super fast link that is always able to accept data (in real world 71 * scenario think about ‘KEYS *‘ against the loopback interface). 72 * 73 * However if we are over the maxmemory limit we ignore that and 74 * just deliver as much data as it is possible to deliver. 75 * 76 * Moreover, we also send as much as possible if the client is 77 * a slave (otherwise, on high-speed traffic, the replication 78 * buffer will grow indefinitely) 79 * 80 * 如果这次写的总量大于NET_MAX_WRITES_PER_EVENT的限制,则会中断本次的写操作, 81 * 将处理时间让给其他的client,以免一个非常的回复独占服务器,剩余的数据下次继续在写 82 * 83 * 但是,如果当服务器的内存数已经超过maxmemory,即使超过最大写NET_MAX_WRITES_PER_EVENT的限制, 84 * 也会继续执行写入操作,是为了尽快写入给客户端 85 */ 86 if (totwritten > NET_MAX_WRITES_PER_EVENT && 87 (server.maxmemory == 0 || 88 zmalloc_used_memory() < server.maxmemory) && 89 !(c->flags & CLIENT_SLAVE)) break; 90 } 91 // 更新写到网络的字节数 92 server.stat_net_output_bytes += totwritten; 93 // 处理写入失败 94 if (nwritten == -1) { 95 if (errno == EAGAIN) { 96 nwritten = 0; 97 } else { 98 serverLog(LL_VERBOSE, 99 "Error writing to client: %s", strerror(errno)); 100 freeClient(c); 101 return C_ERR; 102 } 103 } 104 // 写入成功 105 if (totwritten > 0) { 106 /** 107 * For clients representing masters we don‘t count sending data 108 * as an interaction, since we always send REPLCONF ACK commands 109 * that take some time to just fill the socket output buffer. 110 * We just rely on data / pings received for timeout detection. 111 * 如果不是主节点服务器,则更新最近和服务器交互的时间 112 */ 113 if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime; 114 } 115 // 如果指定的client的回复缓冲区中已经没有数据,发送完成 116 if (!clientHasPendingReplies(c)) { 117 c->sentlen = 0; 118 // 删除当前client的可读事件的监听 119 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); 120 121 /* Close connection after entire reply has been sent. */ 122 // 如果指定了写入按成之后立即关闭的标志,则释放client 123 if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { 124 freeClient(c); 125 return C_ERR; 126 } 127 } 128 return C_OK; 129 }
这个函数实际上是对write()
函数的封装,将静态回复缓冲区buf
或回复链表reply
中的数据循环写到文件描述符fd
中。如果写完了,则将当前客户端的AE_WRITABLE
事件删除。
至此,命令回复就执行完毕。
3.3 服务器连接应答函数
在上面的分析中,将文件事件的两种处理程序,命令接受和命令回复分别分析了,那么就干脆将剩下的服务器连接应答函数的源码也列出来,可以根据Redis 事件处理实现源码剖析来一起学习。
连接应答函数分两种,分别是本地和TCP连接,但是都是对accept()函数的封装。
原文:https://www.cnblogs.com/MrLiuZF/p/14999986.html