nginx upstream机制使得nginx可以成为一个反向代理服务器,nginx一方面从下游客户端接收http请求,处理请求,并根据请求发送tcp报文到上游服务器,根据上游服务器的返回报文,来向下游客户端发送请求响应报文。
ngx_http_proxy_handler(ngx_http_request_t *r) { : : : //设置http proxy使用到的upstream机制的各种方法 //设置创建请求报文的回调函数 u->create_request = ngx_http_proxy_create_request; //设置当链接失败时,需要执行的动作 u->reinit_request = ngx_http_proxy_reinit_request; //设置处理上游服务器的响应头回调函数 u->process_header = ngx_http_proxy_process_status_line; //当前无意义 u->abort_request = ngx_http_proxy_abort_request; //请求结束后会调用该方法 u->finalize_request = ngx_http_proxy_finalize_request; //设置upstream的buffer标志位,为0时,以下游网速优先, //不会使用文件缓存响应包体,为1时,有多个buffer,并且 //可以使用文件来缓存响应包体 u->buffering = plcf->upstream.buffering; //当buffering为1时会使用到该pipe结构,即下游网速优先,需要使用更多的buffer和临时文件缓存响应 u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t)); if (u->pipe == NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } u->pipe->input_filter = ngx_event_pipe_copy_input_filter; u->accel = 1; //开始读取请求包体,读取结束后,开始调用ngx_http_upstream_init, //开始upstream的流程 rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init); : : return NGX_DONE; }
static void ngx_http_upstream_check_broken_connection(ngx_http_request_t *r, ngx_event_t *ev) { int n; char buf[1]; ngx_err_t err; ngx_int_t event; ngx_connection_t *c; ngx_http_upstream_t *u; c = r->connection; u = r->upstream; //若连接已终止的话,该recv返回值会为0,MSG_PEEK表示会去 //读取数据,但不会减少接收缓存中的数据,在这里读取1 //个字节,来判断读方向能否正常工作 n = recv(c->fd, buf, 1, MSG_PEEK); err = ngx_socket_errno; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, err, "http upstream recv(): %d", n); //ev->write表明是写方向触发的事件,读方向能读到数据, //或者返回码为NGX_eagain,表明应该没有问题 if (ev->write && (n >= 0 || err == NGX_EAGAIN)) { return; } if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) { event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT; if (ngx_del_event(ev, event, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } //能用该socket读出数据,说明连接没有问题 if (n > 0) { return; } //返回值为-1,但错误为NGX_EAGAIN,表明recv超时时间到了 if (n == -1) { if (err == NGX_EAGAIN) { return; } //其他情况表明发生错误 ev->error = 1; } else { //n=0,一般表示连接已经结束 err = 0; } //设置事件的标记位,标记已经结束了 ev->eof = 1; c->error = 1; if (!u->cacheable && u->peer.connection) { ngx_log_error(NGX_LOG_INFO, ev->log, err, "client prematurely closed connection, " "so upstream connection is closed too"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_CLIENT_CLOSED_REQUEST); return; } ngx_log_error(NGX_LOG_INFO, ev->log, err, "client prematurely closed connection"); if (u->peer.connection == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_CLIENT_CLOSED_REQUEST); } } 2 ngx_http_upstream_connect static void ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u) { ngx_int_t rc; ngx_time_t *tp; ngx_connection_t *c; r->connection->log->action = "connecting to upstream"; r->connection->single_connection = 0; //记录下当前的响应秒数和毫秒数 if (u->state && u->state->response_sec) { tp = ngx_timeofday(); u->state->response_sec = tp->sec - u->state->response_sec; u->state->response_msec = tp->msec - u->state->response_msec; } u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); //记录下当前的响应秒数和毫秒数 tp = ngx_timeofday(); u->state->response_sec = tp->sec; u->state->response_msec = tp->msec; //开始连接上游服务器 rc = ngx_event_connect_peer(&u->peer); //printf("@@@@####rc is %d\n", (int)rc); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http upstream connect: %i", rc); if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->state->peer = u->; //在busy或者declined的情况下,会调用ngx_http_upstream_next,该函数会 //多次尝试调用connect试图与上游服务器连接,多次连接失败后, //才会调用ngx_http_upstream_finalize_request if (rc == NGX_BUSY) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams"); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE); return; } if (rc == NGX_DECLINED) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } /* rc == NGX_OK || rc == NGX_AGAIN */ c = u->peer.connection; c->data = r; //将客户端与上游服务器的连接的读写事件的处理回调设置为 //ngx_http_upstream_handler c->write->handler = ngx_http_upstream_handler; c->read->handler = ngx_http_upstream_handler; //ngx_http_upstream_handler最后会调用u->write_event_handler或者read_event_handler u->write_event_handler = ngx_http_upstream_send_request_handler; u->read_event_handler = ngx_http_upstream_process_header; c->sendfile &= r->connection->sendfile; u->output.sendfile = c->sendfile; c->pool = r->pool; c->log = r->connection->log; c->read->log = c->log; c->write->log = c->log; /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */ u->writer.out = NULL; u->writer.last = &u->writer.out; u->writer.connection = c; u->writer.limit = 0; if (u->request_sent) { if (ngx_http_upstream_reinit(r, u) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } if (r->request_body && r->request_body->buf && r->request_body->temp_file && r == r->main) { /* * the r->request_body->buf can be reused for one request only, * the subrequests should allocate their own temporay bufs */ u-> = ngx_alloc_chain_link(r->pool); if (u-> == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->>buf = r->request_body->buf; u->>next = NULL; u->output.allocated = 1; r->request_body->buf->pos = r->request_body->buf->start; r->request_body->buf->last = r->request_body->buf->start; r->request_body->buf->tag = u->output.tag; } u->request_sent = 0; //与上游连接尚未建立起来,加入定时器,返回 //当与上游服务器连接建立成功会调用相关的处理函数 if (rc == NGX_AGAIN) { ngx_add_timer(c->write, u->conf->connect_timeout); return; } #if (NGX_HTTP_SSL) if (u->ssl && c->ssl == NULL) { ngx_http_upstream_ssl_init_connection(r, u, c); return; } #endif //已经建立连接,向上游服务器发送请求内容 ngx_http_upstream_send_request(r, u); } 3 ngx_http_upstream_init_request static void ngx_http_upstream_init_request(ngx_http_request_t *r) { ngx_str_t *host; ngx_uint_t i; ngx_resolver_ctx_t *ctx, temp; ngx_http_cleanup_t *cln; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; ngx_http_upstream_srv_conf_t *uscf, **uscfp; ngx_http_upstream_main_conf_t *umcf; if (r->aio) { return; } u = r->upstream; u->store = (u->conf->store || u->conf->store_lengths); //ignore_client_abort为0标志着需要关注nginx和客户端的连接是否稳定 if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->write_event_handler = ngx_http_upstream_wr_check_broken_connection; } //从代码来看,request_bufs貌似是在create_request中设置的 if (r->request_body) { u->request_bufs = r->request_body->bufs; } //调用用户设置的create_request函数 if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //u->conf->local中保存的是与上游服务建立连接的本地地址 u->peer.local = u->conf->local; //得到http core模块在该loc下的配置 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); //设置upstream向下游客户端转发数据的各种参数,主要和 //buf相关 u->output.alignment = clcf->directio_alignment; u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; //往下游客户端写数据的接口 u->output.output_filter = ngx_chain_writer; u->output.filter_ctx = &u->writer; u->writer.pool = r->pool; if (r->upstream_states == NULL) { r->upstream_states = ngx_array_create(r->pool, 1, sizeof(ngx_http_upstream_state_t)); if (r->upstream_states == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } else { u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); } //将ngx_http_upstream_cleanup函数加入到request的cleanup链表中, //当request被删除时,会调用该函数 cln = ngx_http_cleanup_add(r, 0); if (cln == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //ngx_http_upstream_cleanup主要释放resolve数据结构,执行ngx_http_upstream_finalize cln->handler = ngx_http_upstream_cleanup; cln->data = r; u->cleanup = &cln->handler; //u->resolved中保存了用于与上游服务器建立连接的信息, //可以由开发人员在代码中设置,不设置的话,从配置文件中 //去获取 if (u->resolved == NULL) { uscf = u->conf->upstream; } else { //upstream中直接指定了相关的服务器地址,建立连接就ok了 if (u->resolved->sockaddr) { if (ngx_http_upstream_create_round_robin_peer(r, u->resolved) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_http_upstream_connect(r, u); return; } //在这里host应该为一个upstream组的名字 host = &u->resolved->host; umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; //遍历系统中的upstream数组,找到匹配的upstream for (i = 0; i < umcf->upstreams.nelts; i++) { uscf = uscfp[i]; if (uscf->host.len == host->len && ((uscf->port == 0 && u->resolved->no_port) || uscf->port == u->resolved->port) && ngx_memcmp(uscf->, host->data, host->len) == 0) { goto found; } } if (u->resolved->port == 0) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no port in upstream \"%V\"", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } = *host; //下面这部分需要进行域名解析 ctx = ngx_resolve_start(clcf->resolver, &temp); if (ctx == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (ctx == NGX_NO_RESOLVER) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no resolver defined to resolve %V", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY); return; } //ngx_http_upstream_resolve_handler是域名解析后的回调函数 ctx->name = *host; ctx->type = NGX_RESOLVE_A; ctx->handler = ngx_http_upstream_resolve_handler; ctx->data = r; ctx->timeout = clcf->resolver_timeout; u->resolved->ctx = ctx; if (ngx_resolve_name(ctx) != NGX_OK) { u->resolved->ctx = NULL; ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } found: //peer.init()方法中会根据upstream的算法去选择一个服务器,来进行发送 //for example:us->peer.init = ngx_http_upstream_init_ip_hash_peer; if (uscf->peer.init(r, uscf) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //与上游服务器建立连接 ngx_http_upstream_connect(r, u); }2.3.2往上游发送请求
static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) { ngx_int_t rc; ngx_connection_t *c; //peer.connection中是nginx与上游服务器建立的connection c = u->peer.connection; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream send request"); if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } c->log->action = "sending request to upstream"; //通过ngx_output_chain向上游服务器发送请求报文,request_sent //用来表示是否已经发送请求头了,发送了的话,继续发送 //剩余未发的就OK了,剩余未发送的数据保存在了u->output里面 rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs); //设置request_sent标志,表明已经发送过请求 u->request_sent = 1; if (rc == NGX_ERROR) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } //若写事件已经被加入到了定时器中,删除它,为后面的 //添加做准备 if (c->write->timer_set) { ngx_del_timer(c->write); } //NGX_AGAIN表明数据尚未发送完毕,需要将其加入到定时器中 //当发送事件触发时,会继续调用该函数。 if (rc == NGX_AGAIN) { ngx_add_timer(c->write, u->conf->send_timeout); //主要是设置发送缓存的事件唤醒下限 if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } /* rc == NGX_OK */ if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { if (ngx_tcp_push(c->fd) == NGX_ERROR) { ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, ngx_tcp_push_n " failed"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } c->tcp_nopush = NGX_TCP_NOPUSH_UNSET; } //数据发送成功,添加一个读事件定时器 ngx_add_timer(c->read, u->conf->read_timeout); #if 1 //写事件已经发出,判断读事件是否ready if (c->read->ready) { /* post aio operation */ /* * TODO comment * although we can post aio operation just in the end * of ngx_http_upstream_connect() CHECK IT !!! * it's better to do here because we postpone header buffer allocation */ //读事件已经ready了,处理返回的报文头 ngx_http_upstream_process_header(r, u); return; } #endif //将写事件处理函数置为dummy的话,表明在读完相应之前,不允许 //接着写了 u->write_event_handler = ngx_http_upstream_dummy_handler; if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } }
static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u) { ssize_t n; ngx_int_t rc; ngx_connection_t *c; c = u->peer.connection; c->log->action = "reading response header from upstream"; //读事件超时 if (c->read->timedout) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT); return; } //测试与upstream服务器的连通性 if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } //尚未实质分配数据缓冲区 if (u->buffer.start == NULL) { //分配数据缓冲区 u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size); if (u->buffer.start == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //对缓冲区描述符进行初始化 u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; u->buffer.end = u->buffer.start + u->conf->buffer_size; u->buffer.temporary = 1; u->buffer.tag = u->output.tag; //为收到的请求头们创建ngx_list结构,用来存贮解析到的 //请求头的名值对 if (ngx_list_init(&u->headers_in.headers, r->pool, 8, sizeof(ngx_table_elt_t)) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } //准备接受数据吧!!!狂奔的小怪兽 for ( ;; ) { n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last); //数据尚未接收完毕 if (n == NGX_AGAIN) { if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } //返回值为0,标志upstream服务器关闭了连接 if (n == 0) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "upstream prematurely closed connection"); } if (n == NGX_ERROR || n == 0) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } u->buffer.last += n; //处理接收到响应头数据 rc = u->process_header(r); //响应头尚未接收完毕 if (rc == NGX_AGAIN) { //buffer已经满了,无法容纳更多的响应头部 if (u->buffer.last == u->buffer.end) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "upstream sent too big header"); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER); return; } continue; } break; } //解析到了无效错误头,真真苦逼啊 if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER); return; } if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } /* rc == NGX_OK */ //头部处理完毕,头部返回码大于300 if (u->headers_in.status_n > NGX_HTTP_SPECIAL_RESPONSE) { if (r->subrequest_in_memory) { u->buffer.last = u->buffer.pos; } if (ngx_http_upstream_test_next(r, u) == NGX_OK) { return; } //处理错误码大于300的错误情况,比如404错误, //页面没找到 if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) { return; } } //对u->headers_in中的头部进行处理过滤,把u->headers_in中的 //各个头部信息挪到r->headers_out里面,以便于发送 if (ngx_http_upstream_process_headers(r, u) != NGX_OK) { return; } //不是子请求,需要转发响应体 if (!r->subrequest_in_memory) { //调用该函数,先转发响应头,再转发响应体 ngx_http_upstream_send_response(r, u); return; } /* subrequest content in memory */ //以下为子请求的处理流程,当子请求的input_filter未设置时, //其默认的input_filter方法为ngx_http_upstream_non_buffered_filter, //即不转发收到的响应 if (u->input_filter == NULL) { u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //buffer.last和buffer.pos之间是多余的包体 n = u->buffer.last - u->buffer.pos; //下面对这段头部以外的包体进行处理 if (n) { u->buffer.last -= n; u->state->response_length += n; if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } //表明包体已经全部处理完毕,可以结束请求类 if (u->length == 0) { ngx_http_upstream_finalize_request(r, u, 0); return; } } //设置接收事件的处理函数 u->read_event_handler = ngx_http_upstream_process_body_in_memory; //在该函数中调用u->input_filter对后续包体进行处理, //该函数是针对子请求来说的,不转发包体,在内存中 //对包体进行处理 ngx_http_upstream_process_body_in_memory(r, u);
1ngx_http_upstream_process_body_in_memory static void ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r, ngx_http_upstream_t *u) { size_t size; ssize_t n; ngx_buf_t *b; ngx_event_t *rev; ngx_connection_t *c; //c是nginx与upstream上游服务器之间建立的连接 c = u->peer.connection; rev = c->read; //读事件超时,结束upstream if (rev->timedout) { return; } //u->buffer用来保存读取的数据 b = &u->buffer; for ( ;; ) { size = b->end - b->last; if (size == 0) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } //读取相关数据到u->buffer中 n = c->recv(c, b->last, size); //没有数据可读了,等待下一次处理 if (n == NGX_AGAIN) { break; } //对端已经结束了该连接或者发生了错误 if (n == 0 || n == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, n); return; } //response_length记录了已接收相应的长度 u->state->response_length += n; //对接收到的数据进行处理,一般子请求会重置该方法, //未设置的话,则会默认为ngx_http_upstream_non_buffered_filter,该 //方法仅仅是设置下该buffer以便继续接收数据 if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } //接收方向未ready退出 if (!rev->ready) { break; } } //设置读事件 if (ngx_handle_read_event(rev, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_ERROR); return; } if (rev->active) { ngx_add_timer(rev, u->conf->read_timeout); } else if (rev->timer_set) { ngx_del_timer(rev); } } 2 ngx_http_upstream_send_response 该函数会往客户端发送响应头及转发响应体,根据不同的设置来调用不同的包体转发。 static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u) { int tcp_nodelay; ssize_t n; ngx_int_t rc; ngx_event_pipe_t *p; ngx_connection_t *c; ngx_http_core_loc_conf_t *clcf; //发送回应头部,回应头部存放在request的headers_in里面, //在这里,有可能头部没有发送完毕,没关系,未发送 //完的数据在request的out链表里面放着呢,接着处理下面的 //响应包体即可 rc = ngx_http_send_header(r); if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) { ngx_http_upstream_finalize_request(r, u, rc); return; } //c是客户端与nginx之间建立的连接 c = r->connection; if (r->header_only) { if (u->cacheable || u->store) { if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) { ngx_connection_error(c, ngx_socket_errno, ngx_shutdown_socket_n " failed"); } r->read_event_handler = ngx_http_request_empty_handler; r->write_event_handler = ngx_http_request_empty_handler; c->error = 1; } else { ngx_http_upstream_finalize_request(r, u, rc); return; } } //将header_sent置位,表示响应头部已经发送了 u->header_sent = 1; //请求中带有包体,且包体被保存在了临时文件里面, //现在这些临时文件没有用了,可以清理掉了,OK, //毕竟,服务器的回应都来了,应该没问题了 if (r->request_body && r->request_body->temp_file) { ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd); r->request_body->temp_file->file.fd = NGX_INVALID_FILE; } //获得http core在该loc下的配置 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); //u->buffering为0表示下游网速优先,不需要开辟更多的缓存区 //来存放相关回应报文 if (!u->buffering) { //未设置input_filter的话,设置默认的处理函数,input_filter是对 //在buffer中接收到的数据进行相应处理,感觉主要有两个功能 //一是把相关buffer挂到out链表,一是对内容进行过滤 if (u->input_filter == NULL) { //啥都不做 u->input_filter_init = ngx_http_upstream_non_buffered_filter_init; //该函数试图在buffer中缓存所有的数据,会操作设置ngx_buf中的 //各个字段 u->input_filter = ngx_http_upstream_non_buffered_filter; u->input_filter_ctx = r; } //设置upstream读事件的处理回调函数 u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream; //设置request写事件的处理回调函数 r->write_event_handler = ngx_http_upstream_process_non_buffered_downstream; r->limit_rate = 0; //调用input_filter之前进行初始化 if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay"); tcp_nodelay = 1; if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, (const void *) &tcp_nodelay, sizeof(int)) == -1) { ngx_connection_error(c, ngx_socket_errno, "setsockopt(TCP_NODELAY) failed"); ngx_http_upstream_finalize_request(r, u, 0); return; } c->tcp_nodelay = NGX_TCP_NODELAY_SET; } //buffer.last与buffer.pos之间是剩余未被处理的数据 n = u->buffer.last - u->buffer.pos; //n>0,说明buffer中有未被转发的响应包体 if (n) { //在这里设置该last是为了在input_filter中处理时,对其 //进行重置 u->buffer.last = u->buffer.pos; //将响应包体的长度加上n u->state->response_length += n; //在input_filter中处理此次接收到的数据 if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } //在该函数中,开始向下游客户端发送响应包体, //发送完数据还会从上游接收包体 ngx_http_upstream_process_non_buffered_downstream(r); } else { //该buffer中目前仅有头部,没有回应包体,那下次 //从头部接收就可以了 u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } //有数据可以进行处理,处理上游数据,在该函数中 //收完上游包体也会往下游发送相应。 if (u->peer.connection->read->ready) { ngx_http_upstream_process_non_buffered_upstream(r, u); } } return; } /* TODO: preallocate event_pipe bufs, look "Content-Length" */ //下面这部分是buffer为1的情况,该情况允许nginx使用更多的buffer //去缓存包体数据,或者使用文件来进行缓存 p = u->pipe; //对pipe结构进行初始化,该结构专用于上游网速优先的情况 //设置向下游发送响应的调用函数 p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter; p->output_ctx = r; p->tag = u->output.tag; //设置可以使用的缓冲区的个数 p->bufs = u->conf->bufs; //设置busy缓冲区中待发送的响应长度触发值 p->busy_size = u->conf->busy_buffers_size; p->upstream = u->peer.connection; p->downstream = c; p->pool = r->pool; p->log = c->log; p->cacheable = u->cacheable || u->store; p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t)); if (p->temp_file == NULL) { ngx_http_upstream_finalize_request(r, u, 0); return; } //设置缓存数据的临时文件的路径,fd等 p->temp_file->file.fd = NGX_INVALID_FILE; p->temp_file->file.log = c->log; p->temp_file->path = u->conf->temp_path; p->temp_file->pool = r->pool; if (p->cacheable) { p->temp_file->persistent = 1; } else { p->temp_file->log_level = NGX_LOG_WARN; p->temp_file->warn = "an upstream response is buffered " "to a temporary file"; } //设置缓冲数据的临时文件的最大大小 p->max_temp_file_size = u->conf->max_temp_file_size; p->temp_file_write_size = u->conf->temp_file_write_size; p->preread_bufs = ngx_alloc_chain_link(r->pool); if (p->preread_bufs == NULL) { ngx_http_upstream_finalize_request(r, u, 0); return; } //初始化预读缓冲区链表,预读缓冲区是在读响应头 //时同时读到的响应体 p->preread_bufs->buf = &u->buffer; p->preread_bufs->next = NULL; u->buffer.recycled = 1; p->preread_size = u->buffer.last - u->buffer.pos; if (u->cacheable) { p->buf_to_file = ngx_calloc_buf(r->pool); if (p->buf_to_file == NULL) { ngx_http_upstream_finalize_request(r, u, 0); return; } p->buf_to_file->pos = u->buffer.start; p->buf_to_file->last = u->buffer.pos; p->buf_to_file->temporary = 1; } if (ngx_event_flags & NGX_USE_AIO_EVENT) { /* the posted aio operation may corrupt a shadow buffer */ p->single_buf = 1; } /* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */ p->free_bufs = 1; u->buffer.last = u->buffer.pos; if (u->conf->cyclic_temp_file) { p->cyclic_temp_file = 1; c->sendfile = 0; } else { p->cyclic_temp_file = 0; } //设置接收,发送的超时时间,设置发送的低水位 p->read_timeout = u->conf->read_timeout; p->send_timeout = clcf->send_timeout; p->send_lowat = clcf->send_lowat; //设置upstreadm的读事件处理方法,设置request的写时间处理方法 u->read_event_handler = ngx_http_upstream_process_upstream; r->write_event_handler = ngx_http_upstream_process_downstream; //处理上游数据,同时会往下游发送数据 ngx_http_upstream_process_upstream(r, u); }
2使用了多个buffer及临时文件来接收包体报文:读取上游报文用ngx_http_upstream_pr ocess_upstream,往下游写报文使用ngx_http_upstream_process_downstream。两个函数最后都是调用ngx_event_pipe,通过一个参数标记来控制读写方向。
3 ngx_http_upstream_process_non_buffered_request static void ngx_http_upstream_process_non_buffered_request(ngx_http_request_t *r, ngx_uint_t do_write) { size_t size; ssize_t n; ngx_buf_t *b; ngx_int_t rc; ngx_connection_t *downstream, *upstream; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; u = r->upstream; //downstream是下游与nginx的连接,upstream是上游与nginx的连接 downstream = r->connection; upstream = u->peer.connection; //buffer用来存放响应数据 b = &u->buffer; //do_write为1,或者所有的数据都已经发送完毕 do_write = do_write || u->length == 0; for ( ;; ) { //do_write标志代表需要进行数据发送操作 if (do_write) { //如果out_bufs或者busy_bufs中有数据,busy_bufs记录的是 //未能发送完毕的数据 if (u->out_bufs || u->busy_bufs) { rc = ngx_http_output_filter(r, u->out_bufs); if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } //该函数主要是置u->out_bufs为0,清理busy_bufs上已经被 //发送完毕的数据buf,将不用的数据buf放入到free_bufs里面 ngx_chain_update_chains(&u->free_bufs, &u->busy_bufs, &u->out_bufs, u->output.tag); } //busy_bufs中没有buf了,说明已经没有未发送完的数据了 if (u->busy_bufs == NULL) { //数据已经发送完毕,或者出错 if (u->length == 0 || upstream->read->eof || upstream->read->error) { ngx_http_upstream_finalize_request(r, u, 0); return; } //重置数据buffer的pos,last以便下次进行数据接收 b->pos = b->start; b->last = b->start; } } //last与end之间是可以用于接收数据的buffer大小 size = b->end - b->last; //确定下次要接收的数据大小 if (size > u->length) { size = u->length; } //read方向可读 if (size && upstream->read->ready) { //从upstream方向读取数据,放入到buffer里面 n = upstream->recv(upstream, b->last, size); //未能读取到数据,需要下次再读 if (n == NGX_AGAIN) { break; } //读取到了相关数据 if (n > 0) { u->state->response_length += n; //调用input_filter进行处理 if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, 0); return; } } //读取到了数据,开始往下游写 do_write = 1; continue; } break; } //得到core module在loc下的配置 clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); if (downstream->data == r) { //设置downstream的写事件,添加到epoll中 if (ngx_handle_write_event(downstream->write, clcf->send_lowat) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, 0); return; } } //downstream connection写方向未ready,设置写 if (downstream->write->active && !downstream->write->ready) { ngx_add_timer(downstream->write, clcf->send_timeout); } else if (downstream->write->timer_set) { ngx_del_timer(downstream->write); } //设置upstream的读事件到epoll中 if (ngx_handle_read_event(upstream->read, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, 0); return; } //设置读超时 if (upstream->read->active && !upstream->read->ready) { ngx_add_timer(upstream->read, u->conf->read_timeout); } else if (upstream->read->timer_set) { ngx_del_timer(upstream->read); } } 4 ngx_event_pipe ngx_int_t ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write) { u_int flags; ngx_int_t rc; ngx_event_t *rev, *wev; for ( ;; ) { //do_write标记表明此时是向下游客户端写响应 if (do_write) { p->log->action = "sending to client"; //调用pipe_write_to_downstream往下游写响应 rc = ngx_event_pipe_write_to_downstream(p); if (rc == NGX_ABORT) { return NGX_ABORT; } if (rc == NGX_BUSY) { return NGX_OK; } } //p->read应该表明了有没有从上游读取到数据 p->read = 0; //upstream_blocked表明了是否可以继续从上游接收数据, //因为有可能缓冲区和缓存文件都满了,无法继续接收了 p->upstream_blocked = 0; p->log->action = "reading upstream"; //调用pipe_read_upstream从上游接收数据 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) { return NGX_ABORT; } //在允许读取数据的情况下,没有从上游读取到数据 if (!p->read && !p->upstream_blocked) { break; } //读取到数据,往下游写 do_write = 1; } //将上游的读事件插入到定时器,将上游的读事件 //加入到epoll中,p->upstream是上游的connection. if (p->upstream->fd != -1) { rev = p->upstream->read; flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0; if (ngx_handle_read_event(rev, flags) != NGX_OK) { return NGX_ABORT; } //receive方向处于active状态,但当前没有数据可读 if (rev->active && !rev->ready) { ngx_add_timer(rev, p->read_timeout); } else if (rev->timer_set) { ngx_del_timer(rev); } } //将下游的读事件插入到定时器,将下游的写事件 //加入到epoll中,p->downstream是下游的connection if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) { wev = p->downstream->write; if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) { return NGX_ABORT; } //当发送的响应数据包速率超过了限制,就会置delayed标记 //,表明被延迟 if (!wev->delayed) { if (wev->active && !wev->ready) { ngx_add_timer(wev, p->send_timeout); } else if (wev->timer_set) { ngx_del_timer(wev); } } } return NGX_OK; } 从ngx_event_pipe的代码中可以看出,从上游接收及往下游发送分别调用了两个不同的函数: ngx_event_pipe_read_upstream:从上游读取报文的处理函数。 ngx_event_pipe_write_downstream:往下游发送数据。 4ngx_event_pipe_read_upstream static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p) { ssize_t n, size; ngx_int_t rc; ngx_buf_t *b; ngx_chain_t *chain, *cl, *ln; if (p->upstream_eof || p->upstream_error || p->upstream_done) { return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe read upstream: %d", p->upstream->read->ready); for ( ;; ) { if (p->upstream_eof || p->upstream_error || p->upstream_done) { break; } //upstream读事件没有数据可读,且预读buf不存在, //退出循环,预读buf中在读取响应头时可能会顺带着 //读出一部分的响应包体 if (p->preread_bufs == NULL && !p->upstream->read->ready) { break; } //预读的buf存在 if (p->preread_bufs) { //有preread_bufs存在,那么已经读取了一些响应数据,需要把 //这部分数据挂入到chain里面。 chain = p->preread_bufs; p->preread_bufs = NULL; //n是已经预读出来的数据,预读buf中已经存在了部分读取数据 n = p->preread_size; ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe preread: %z", n); //p->read表明读取出了响应,有preread_bufs,那么 if (n) { p->read = 1; } } else { //preread_bufs使用之后,就被置空了,如上面的代码所示 //下面就是找个buf去进行数据接收 if (p->free_raw_bufs) { /* use the free bufs if they exist */ chain = p->free_raw_bufs; if (p->single_buf) { p->free_raw_bufs = p->free_raw_bufs->next; chain->next = NULL; } else { p->free_raw_bufs = NULL; } //free_raw_bufs不存在,需要分配一个新的buf用,不能超过 //buf的个数限制 } else if (p->allocated < p->bufs.num) { /* allocate a new buf if it's still allowed */ //分配新的buffer,包括实际的缓冲区和buf描述符 b = ngx_create_temp_buf(p->pool, p->bufs.size); if (b == NULL) { return NGX_ABORT; } p->allocated++; //分配一个ngx_chain,指向刚分配的buf chain = ngx_alloc_chain_link(p->pool); if (chain == NULL) { return NGX_ABORT; } chain->buf = b; chain->next = NULL; } else if (!p->cacheable && p->downstream->data == p->output_ctx && p->downstream->write->ready && !p->downstream->write->delayed) {//该条件表明,downstream方向write已经准备好,可以发送数据, //且不存在发送速率超标的情况,那么将upstream_blocked置1, //阻止上游继续接收数据包,然后,尽快发送,以腾出buffer p->upstream_blocked = 1; break; } else if (p->cacheable || p->temp_file->offset < p->max_temp_file_size) {//到了这里,buffer都已经使用光了,且下游现在不能写, //那么只能缓存到临时文件里面去了,HOHO /* * if it is allowed, then save some bufs from r->in * to a temporary file, and add them to a r->out chain */ rc = ngx_event_pipe_write_chain_to_temp_file(p); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe temp offset: %O", p->temp_file->offset); if (rc == NGX_BUSY) { break; } if (rc == NGX_AGAIN) { if (ngx_event_flags & NGX_USE_LEVEL_EVENT && p->upstream->read->active && p->upstream->read->ready) { if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0) == NGX_ERROR) { return NGX_ABORT; } } } if (rc != NGX_OK) { return rc; } //p->free_raw_bufs链表中的buf是p->in链表中把数据写入临时文件后 //空闲出来的buf chain = p->free_raw_bufs; if (p->single_buf) { p->free_raw_bufs = p->free_raw_bufs->next; chain->next = NULL; } else { p->free_raw_bufs = NULL; } } else { /* there are no bufs to read in */ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "no pipe bufs to read in"); break; } //读取数据到buf n = p->upstream->recv_chain(p->upstream, chain); ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe recv chain: %z", n); //后面读取的数据居然在链表的前列,无语了。。。 if (p->free_raw_bufs) { chain->next = p->free_raw_bufs; } //重置free_raw_bufs p->free_raw_bufs = chain; //读取数据发生错误,置upstream_error if (n == NGX_ERROR) { p->upstream_error = 1; return NGX_ERROR; } //没有读取出数据,HOHO,退出就好了 if (n == NGX_AGAIN) { if (p->single_buf) { ngx_event_pipe_remove_shadow_links(chain->buf); } break; } //到这里,说明n>0,已经读取到了数据 p->read = 1; //n==0,说明上游服务器已经关闭了该连接,置upstream_eof标记 if (n == 0) { p->upstream_eof = 1; break; } } //read_length记录了从上游服务器读取的数据的长度 p->read_length += n; //c1其实记录了以前和本次free_raw_bufs的数据的链表 cl = chain; p->free_raw_bufs = NULL; //该段代码中会不停的把读取到的数据buf挂入到p->in的链表中 while (cl && n > 0) { ngx_event_pipe_remove_shadow_links(cl->buf); //size是该buf能读取的数据的大小 size = cl->buf->end - cl->buf->last; //该buf全部用来了读取数据,没有残余空间 if (n >= size) { cl->buf->last = cl->buf->end; /* STUB */ cl->buf->num = p->num++; //调用input_filter对刚刚接收到的数据进行处理,默认为 //ngx_event_pipe_copy_input_filter if (p->input_filter(p, cl->buf) == NGX_ERROR) { return NGX_ABORT; } n -= size; ln = cl; cl = cl->next; //释放相关chain的数据结构 ngx_free_chain(p->pool, ln); } else { //该cl的buf上面尚有部分数据未被挂入到in链表中 cl->buf->last += n; n = 0; } } //c1存在,应该说明其buf上尚有部分空间可以读入数据 if (cl) { for (ln = cl; ln->next; ln = ln->next) { /* void */ } ln->next = p->free_raw_bufs; p->free_raw_bufs = cl; } } if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) { /* STUB */ p->free_raw_bufs->buf->num = p->num++; if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) { return NGX_ABORT; } p->free_raw_bufs = p->free_raw_bufs->next; if (p->free_bufs && p->buf_to_file == NULL) { for (cl = p->free_raw_bufs; cl; cl = cl->next) { if (cl->buf->shadow == NULL) { ngx_pfree(p->pool, cl->buf->start); } } } } if (p->cacheable && p->in) { if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) { return NGX_ABORT; } } return NGX_OK; } 5 ngx_event_pipe_write_to_downstream static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p) { u_char *prev; size_t bsize; ngx_int_t rc; ngx_uint_t flush, flushed, prev_last_shadow; ngx_chain_t *out, **ll, *cl, file; ngx_connection_t *downstream; //downstream表示nginx与下游客户端之间的连接 downstream = p->downstream; flushed = 0; for ( ;; ) { if (p->downstream_error) { return ngx_event_pipe_drain_chains(p); } //上游服务器关闭了连接,或者出错的情况 if (p->upstream_eof || p->upstream_error || p->upstream_done) { for (cl = p->busy; cl; cl = cl->next) { cl->buf->recycled = 0; } //要发送的话,先发送p->out链表上的,再发送p->in链表上的 if (p->out) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write downstream flush out"); for (cl = p->out; cl; cl = cl->next) { cl->buf->recycled = 0; } //将p->out链表上的buf发送出去 rc = p->output_filter(p->output_ctx, p->out); if (rc == NGX_ERROR) { p->downstream_error = 1; return ngx_event_pipe_drain_chains(p); } p->out = NULL; } //后发送p->in链表上的buf,OK if (p->in) { ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write downstream flush in"); for (cl = p->in; cl; cl = cl->next) { cl->buf->recycled = 0; } //将p->in链表上的buf发送出去 rc = p->output_filter(p->output_ctx, p->in); if (rc == NGX_ERROR) { p->downstream_error = 1; return ngx_event_pipe_drain_chains(p); } p->in = NULL; } if (p->cacheable && p->buf_to_file) { file.buf = p->buf_to_file; = NULL; if (ngx_write_chain_to_temp_file(p->temp_file, &file) == NGX_ERROR) { return NGX_ABORT; } } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write downstream done"); /* TODO: free unused bufs */ //置downstream_done标记,表明downstream写操作处理完成 p->downstream_done = 1; break; } if (downstream->data != p->output_ctx || !downstream->write->ready || downstream->write->delayed) { break; } prev = NULL; bsize = 0; //计算已有busy buffer的大小 for (cl = p->busy; cl; cl = cl->next) { if (cl->buf->recycled) { if (prev == cl->buf->start) { continue; } bsize += cl->buf->end - cl->buf->start; prev = cl->buf->start; } } out = NULL; //已有的busy链表中数据长度已经超出了限制,需先 //flush掉该部分数据,才能接着写入out与in链表的数据、 if (bsize >= (size_t) p->busy_size) { flush = 1; goto flush; } flush = 0; ll = NULL; prev_last_shadow = 1; //下面是遍历out链表和in链表,将out和in链表中不超过busy_size //的buf串起来组成一个临时的out链表,调用output_filter函数去发送 for ( ;; ) { if (p->out) { cl = p->out; if (cl->buf->recycled && bsize + cl->buf->last - cl->buf->pos > p->busy_size) { flush = 1; break; } p->out = p->out->next; ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf); } else if (!p->cacheable && p->in) { cl = p->in; ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe write buf ls:%d %p %z", cl->buf->last_shadow, cl->buf->pos, cl->buf->last - cl->buf->pos); if (cl->buf->recycled && cl->buf->last_shadow && bsize + cl->buf->last - cl->buf->pos > p->busy_size) { if (!prev_last_shadow) { p->in = p->in->next; cl->next = NULL; if (out) { *ll = cl; } else { out = cl; } } flush = 1; break; } prev_last_shadow = cl->buf->last_shadow; p->in = p->in->next; } else { break; } if (cl->buf->recycled) { bsize += cl->buf->last - cl->buf->pos; } cl->next = NULL; if (out) { *ll = cl; } else { out = cl; } ll = &cl->next; } flush: if (out == NULL) { if (!flush) { break; } /* a workaround for AIO */ if (flushed++ > 10) { return NGX_BUSY; } } //将临时组成的包含了p->out,p->in中buf的链表,往下游发送 rc = p->output_filter(p->output_ctx, out); //更新busy,free链表 ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag); if (rc == NGX_ERROR) { p->downstream_error = 1; return ngx_event_pipe_drain_chains(p); } //释放free链表中的缓冲区 for (cl = p->free; cl; cl = cl->next) { if (cl->buf->temp_file) { if (p->cacheable || !p->cyclic_temp_file) { continue; } /* reset p->temp_offset if all bufs had been sent */ if (cl->buf->file_last == p->temp_file->offset) { p->temp_file->offset = 0; } } if (cl->buf->last_shadow) { if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) { return NGX_ABORT; } cl->buf->last_shadow = 0; } cl->buf->shadow = NULL; } } return NGX_OK; }