[转][Nginx 源码分析] Fastcgi 模块(下)
本文主要分析从upstream
读取数据、数据报文解析和向客户端输出这三部分功能。
从 upstream 读取的 FASTCGI 报文主要有三种:
- upstream 处理正常的话,返回 FCGI_STDOUT 报文
- 有错误,则返回 FASTCGI_STDERR 报文。
- 结尾会有个 FCGI_END_REQUEST 报文
将 upstream 请求报文发送出去后,Nginx 会随即绑定 read event,等待处理 upstream
的返回。
u->read_event_handler = ngx_http_upstream_process_header
Http Header 部分处理流程大概如下:
1) 调用 c->recv 从 upstream 读取数据。
2) 调用 u->process_header
处理 Fastcgi 报文。process_header
为函数指针,指向ngx_http_fastcgi_process_header。
3) ngx_http_fastcgi_process_header 解析完报文,调用
ngx_http_parse_header_line 解析 HTTP 报文头。
首先是ngx_http_upstream_process_header
static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t*u) { // ... for ( ;; ) { // 从 upstream 读取数据 n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last); if (n == NGX_AGAIN) { #if 0 ngx_add_timer(rev, u->read_timeout); #endif // 未完成,待下次继续读取 if (ngx_handle_read_event(c->read, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } if (n == 0) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "upstream prematurely closed connection"); } if (n == NGX_ERROR || n == 0) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } u->buffer.last += n; #if 0 u->valid_header_in = 0; u->peer.cached = 0; #endif // 处理 fastcgi 报文 // 这里实际调用的是 ngx_http_fastcgi_process_header rc = u->process_header(r); // Fastcgi 报文未完,继续读 if (rc == NGX_AGAIN) { if (u->buffer.pos == u->buffer.end) { ngx_log_error(NGX_LOG_ERR, c->log, 0, "upstream sent too big header"); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER); return; } continue; } break; } // .... // copy http header 到 headers_out 域 if (ngx_http_upstream_process_headers(r, u) != NGX_OK) { return; } // ... // 解析完 HTTP 报文头,开始向客户端发送 response,后面详细分析 if (!r->subrequest_in_memory) { ngx_http_upstream_send_response(r, u); return; } // ...
处理 FASTCGI 报文的状态机
该状态机有以下几种状态:
typedef enum { ngx_http_fastcgi_st_version = 0, ngx_http_fastcgi_st_type, ngx_http_fastcgi_st_request_id_hi, ngx_http_fastcgi_st_request_id_lo, ngx_http_fastcgi_st_content_length_hi, ngx_http_fastcgi_st_content_length_lo, ngx_http_fastcgi_st_padding_length, ngx_http_fastcgi_st_reserved, ngx_http_fastcgi_st_data, ngx_http_fastcgi_st_padding } ngx_http_fastcgi_state_e;
FASTCGI Header 报文的每个字段都算一个状态。
该状态机具体实现如下:
static ngx_int_t ngx_http_fastcgi_process_record(ngx_http_request_t *r, ngx_http_fastcgi_ctx_t *f) { u_char ch, *p; ngx_http_fastcgi_state_e state; state = f->state; for (p = f->pos; p < f->last; p++) { ch = *p; switch (state) { case ngx_http_fastcgi_st_version: if (ch != 1) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "upstream sent unsupported FastCGI " "protocol version: %d", ch); return NGX_ERROR; } // 转向 ngx_http_fastcgi_st_type 状态 state = ngx_http_fastcgi_st_type; break; case ngx_http_fastcgi_st_type: switch (ch) { case NGX_HTTP_FASTCGI_STDOUT: case NGX_HTTP_FASTCGI_STDERR: case NGX_HTTP_FASTCGI_END_REQUEST: // 保存下报文类型 f->type = (ngx_uint_t) ch; break; default: ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "upstream sent invalid FastCGI " "record type: %d", ch); return NGX_ERROR; } // 进入 ngx_http_fastcgi_st_request_id_hi 状态 state = ngx_http_fastcgi_st_request_id_hi; break; // ... case ngx_http_fastcgi_st_reserved: state = ngx_http_fastcgi_st_data; ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http fastcgi record length: %z", f->length); // 如果到 ngx_http_fastcgi_st_reserved 状态,则下一个是 data 域,返回 ok。 f->pos = p + 1; f->state = state; return NGX_OK;
该自动机处理完成后,可以得到该 FASTCGI 报文的类型,data 域的长度,data
域的开始位置。
ngx_http_fastcgi_process_header 函数
static ngx_int_t ngx_http_fastcgi_process_header(ngx_http_request_t *r) { //............ f = ngx_http_get_module_ctx(r, ngx_http_fastcgi_module); umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module); u = r->upstream; for ( ;; ) { // 调用上面的状态机分析 Fastcgi Header,最终得到报文类型、长度、data 域的起始位置 if (f->state < ngx_http_fastcgi_st_data) { f->pos = u->buffer.pos; f->last = u->buffer.last; rc = ngx_http_fastcgi_process_record(r, f); u->buffer.pos = f->pos; u->buffer.last = f->last; if (rc == NGX_AGAIN) { return NGX_AGAIN; } //.... } // ... // 忽略 FASTCGI_STDERR 报文处理,只分析 FASTCGI_STDOUT f->fastcgi_stdout = 1; start = u->buffer.pos; // 下面英文注释写的很清楚 if (u->buffer.pos + f->length < u->buffer.last) { /* * set u->buffer.last to the end of the FastCGI record data * for ngx_http_parse_header_line() */ last = u->buffer.last; u->buffer.last = u->buffer.pos + f->length; } else { last = NULL; } //开始解析 HTTP header for ( ;; ) { part_start = u->buffer.pos; part_end = u->buffer.last; // ngx_http_parse_header_line 又是一个状态机 // 解析出 Header 的 name 和 value 这两部分的 pos 和 length rc = ngx_http_parse_header_line(r, &u->buffer, 1); if (rc == NGX_AGAIN) { break; } // 解析 HTTP Header 成功 if (rc == NGX_OK) { /* a header line has been parsed successfully */ // 创建一个 ngx_table_elt_t 来存储解析的 Header h = ngx_list_push(&u->headers_in.headers); if (h == NULL) { return NGX_ERROR; } // ... //... 设置 header key 和 value 的值 h->key.len = r->header_name_end - r->header_name_start; h->value.len = r->header_end - r->header_start; // 为 key 和 value 分配存储空间 // key 和 value 连续存储 h->key.data = ngx_pnalloc(r->pool, h->key.len + 1 + h->value.len +1 + h->key.len); if (h->key.data == NULL) { return NGX_ERROR; } h->value.data = h->key.data + h->key.len + 1; h->lowcase_key = h->key.data + h->key.len + 1 + h->value.len + 1; // 复制 key 和 value 数据 ngx_cpystrn(h->key.data, r->header_name_start, h->key.len + 1); ngx_cpystrn(h->value.data, r->header_start, h->value.len + 1); //... // ... hh = ngx_hash_find(&umcf->headers_in_hash, h->hash, h->lowcase_key, h->key.len); // 设置 r->headers_in 中该 field(key) 域的值 // hh->handler 函数指针,具体每个 header 的实现,在 ngx_http_upstream.c 中 if (hh && hh->handler(r, h, hh->offset) != NGX_OK) { return NGX_ERROR; } //... } // 解析 HTTP Header 结束 // 根据结果设置 http status 和 status_line if (rc == NGX_HTTP_PARSE_HEADER_DONE) { /* a whole header has been parsed successfully */ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http fastcgi header done"); // 根据结果设置 http status 和 status_line if (u->headers_in.status) { status_line = &u->headers_in.status->value; status = ngx_atoi(status_line->data, 3); if (status == NGX_ERROR) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "upstream sent invalid status \"%V\"", status_line); return NGX_HTTP_UPSTREAM_INVALID_HEADER; } u->headers_in.status_n = status; u->headers_in.status_line = *status_line; } else if (u->headers_in.location) { u->headers_in.status_n = 302; ngx_str_set(&u->headers_in.status_line, "302 Moved Temporarily"); } else { u->headers_in.status_n = 200; ngx_str_set(&u->headers_in.status_line, "200 OK"); } if (u->state) { u->state->status = u->headers_in.status_n; } break; } } //.... // 设置 last 的值 if (last) { u->buffer.last = last; } f->length -= u->buffer.pos - start; if (f->length == 0) { if (f->padding) { f->state = ngx_http_fastcgi_st_padding; } else { f->state = ngx_http_fastcgi_st_version; } } // 如果解析 HTTP Header 结束,直接返回 if (rc == NGX_HTTP_PARSE_HEADER_DONE) { return NGX_OK; } if (rc == NGX_OK) { continue; }
Header 解析部分到此结束。
向客户端发送 Response
部分
上面分析过的 ngx_http_upstream_process_header
函数最后,调用ngx_http_upstream_send_response
开始向客户端发送 Response。
Response 的发送,分两部分进行。首先调用 ngx_http_send_header
发送 http header;然后建立一个 nginx event
pipe,一边从 upstream 读取数据,一边调用 output filter
,向客户端写入。
先看 ngx_http_upstream_send_response 函数
static void ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t*u) { //... // 先发送 http header,进入 ngx_http_top_header_filter 链 rc = ngx_http_send_header(r); // ... u->header_sent = 1; // ... p = u->pipe; // 设置好 ngx_http_output_filter 链,准备处理 upstream body p->output_filter = (ngx_event_pipe_output_filter_pt)ngx_http_output_filter; p->output_ctx = r; p->tag = u->output.tag; p->bufs = u->conf->bufs; p->busy_size = u->conf->busy_buffers_size; p->upstream = u->peer.connection; p->downstream = c; p->pool = r->pool; p->log = c->log; // ... // 设置 read event 来读取 upstream 数据 u->read_event_handler = ngx_http_upstream_process_upstream; // 设置 write event 来处理 upstream 返回的数据,并进行 output_filter 系列处理输出 r->write_event_handler = ngx_http_upstream_process_downstream; // 先从 upstream 读取数据 ngx_http_upstream_process_upstream(r, u); }
ngx_http_upstream_process_upstream 和 ngx_http_upstream_process_downstream
最终都是调用 ngx_event_pipe
这个函数进行读取和写入的,下面分析下 ngx_event_pipe。
ngx_int_t ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write) { u_int flags; ngx_int_t rc; ngx_event_t *rev, *wev; for ( ;; ) { // do_write 为 1,处理写事件 if (do_write) { p->log->action = "sending to client"; // 进入 output_filter 链,向客户端写数据 rc = ngx_event_pipe_write_to_downstream(p); if (rc == NGX_ABORT) { return NGX_ABORT; } if (rc == NGX_BUSY) { return NGX_OK; } } // do_write 为 0,处理读事件,从 upstream 读数据 p->read = 0; p->upstream_blocked = 0; p->log->action = "reading upstream"; // 从 upstream 读数据 if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) { return NGX_ABORT; } if (!p->read && !p->upstream_blocked) { break; } do_write = 1; } // 如果 upstream 未准备好,设置 read event,待下次读 if (p->upstream->fd != -1) { rev = p->upstream->read; flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0; if (ngx_handle_read_event(rev, flags) != NGX_OK) { return NGX_ABORT; } if (rev->active && !rev->ready) { ngx_add_timer(rev, p->read_timeout); } else if (rev->timer_set) { ngx_del_timer(rev); } } // 设置 write event if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) { wev = p->downstream->write; if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) { return NGX_ABORT; } if (!wev->delayed) { if (wev->active && !wev->ready) { ngx_add_timer(wev, p->send_timeout); } else if (wev->timer_set) { ngx_del_timer(wev); } } } return NGX_OK; }
ngx_event_pipe_read_upstream 做很多 buffer
分配和管理工作,最终调用下面函数完成读取
n = p->upstream->recv_chain(p->upstream, chain);
ngx_event_pipe_write_to_downstream 最终调用 output_filter,进入 filter 链,进行输出
rc = p->output_filter(p->output_ctx, out);