nginx的mirror与subrequest源码分析

    技术2023-06-19  84

    mirror配置

        server {

            listen 0.0.0.0:978;

            location / {

                mirror /mirror;

                proxy_pass http://vpc_support;                                                                                                                           

                session_sticky_hide_cookie  upstream=vpc_support;

            }   

            location = /mirror {

                internal;

                proxy_pass http://test_backend$request_uri;

            }   

        }   

     

    核心数据结构

    两个核心数据结构:

    第一个是每个请求都有的postponed链表,一般情况下每个链表节点保存了该请求的一个子请求

    struct ngx_http_postponed_request_s {

        ngx_http_request_t               *request;

        ngx_chain_t                      *out;

        ngx_http_postponed_request_t     *next;

    };

    第二个是posted_requests链表,它挂载了当前需要遍历的请求(节点), 该链表保存在主请求(根节点)的posted_requests字段

    struct ngx_http_posted_request_s {

        ngx_http_request_t               *request;

        ngx_http_posted_request_t        *next;

    };

     

    ngx_http_mirror_handler

    handler处于precontent阶段

    static ngx_int_t

    ngx_http_mirror_handler(ngx_http_request_t *r)

    {

    ...

    mlcf = ngx_http_get_module_loc_conf(r, ngx_http_mirror_module);

    // 接收包体打开

    if (mlcf->request_body) {

    rc = ngx_http_read_client_request_body(r, ngx_http_mirror_body_handler);

    }

    return ngx_http_mirror_handler_internal(r);

    ...

    }

     

    // 该handler是接收完包体的回调函数

    static void

    ngx_http_mirror_body_handler(ngx_http_request_t *r)

    {

        ngx_http_mirror_ctx_t  *ctx;

        ctx = ngx_http_get_module_ctx(r, ngx_http_mirror_module);

    // 子请求在这个函数中被创建,与主请求关联

        ctx->status = ngx_http_mirror_handler_internal(r);

        r->preserve_body = 1;

    // 主请求继续run phases

        r->write_event_handler = ngx_http_core_run_phases;

        ngx_http_core_run_phases(r);

    }

     

    static ngx_int_t

    ngx_http_mirror_handler_internal(ngx_http_request_t *r)

    {

    ...

    for (i = 0; i < mlcf->mirror->nelts; i++) {

    // 遍历mirror数组,生成subrequest

            if (ngx_http_subrequest(r, &name[i], &r->args, &sr, NULL,

                                    NGX_HTTP_SUBREQUEST_BACKGROUND)

                != NGX_OK)

            {

                return NGX_HTTP_INTERNAL_SERVER_ERROR;

            }

            sr->header_only = 1;

            sr->method = r->method;

            sr->method_name = r->method_name;

        }

        return NGX_DECLINED;

    ...

    }

     

    ngx_http_subrequest

    ngx_int_t

    ngx_http_subrequest(ngx_http_request_t *r,

        ngx_str_t *uri, ngx_str_t *args, ngx_http_request_t **psr,

        ngx_http_post_subrequest_t *ps, ngx_uint_t flags)

    {

    ...

    // 申请一个新的request

    sr = ngx_pcalloc(r->pool, sizeof(ngx_http_request_t));

        if (sr == NULL) {

            return NGX_ERROR;

        }

    // 如果不是background模式(mirror默认使用这个模式),会申请ngx_http_postponed_request_t结构体(包装子请求)

    //,将sr赋值给pr->request,并把pr挂到r->postponed的链表里(sr设置为r的孩子)

    if (!sr->background) {

            if (c->data == r && r->postponed == NULL) {

                c->data = sr;

            }

            pr = ngx_palloc(r->pool, sizeof(ngx_http_postponed_request_t));

            if (pr == NULL) {

                return NGX_ERROR;

            }

            pr->request = sr;

            pr->out = NULL;

            pr->next = NULL;

            if (r->postponed) {

                for (p = r->postponed; p->next; p = p->next) { /* void */ }

                p->next = pr;

            } else {

                r->postponed = pr;

            }

        }

    sr->internal = 1;

    sr->method = NGX_HTTP_GET;

    sr->read_event_handler = ngx_http_request_empty_handler;

        sr->write_event_handler = ngx_http_handler;

    r->main->count++;

    *psr = sr;

    // 将当前这个sr挂到main request的posted_requests中。

    return ngx_http_post_request(sr, NULL);

    ...

    }

     

    ngx_int_t

    ngx_http_post_request(ngx_http_request_t *r, ngx_http_posted_request_t *pr)

    {

    ...

    // pr是null,申请一个pr

    if (pr == NULL) {

            pr = ngx_palloc(r->pool, sizeof(ngx_http_posted_request_t));

            if (pr == NULL) {

                return NGX_ERROR;

            }

        }

    // 将当前的子请求赋值给pr->request

        pr->request = r;

        pr->next = NULL;

     

        for (p = &r->main->posted_requests; *p; p = &(*p)->next) { /* void */ }

    // 找到main request的posted_requests,给挂到末尾

        *p = pr;

    ...

    }

     

    ngx_http_run_posted_requests

    到这时,子请求创建完毕,一般来说子请求的创建都发生在某个请求的content handler或者某个filter内。

    从上面的函数可以看到子请求并没有马上被执行,只是被挂载在了主请求的posted_requests链表中。

    posted_requests链表是在ngx_http_run_posted_requests函数中遍历。在某个请求的读(写)事件的handler中,执行完该请求相关的处理后被调用。

    比如主请求在走完一遍PHASE的时候会调用ngx_http_run_posted_requests。

    这时子请求得以运行。

    void

    ngx_http_run_posted_requests(ngx_connection_t *c)

    {

        ngx_http_request_t         *r;

        ngx_http_posted_request_t  *pr;

     

        for ( ;; ) {

            if (c->destroyed) {

                return;

            }

            r = c->data;

    // 从main开始遍历他的所有子请求

            pr = r->main->posted_requests;

            if (pr == NULL) {

                return;

            }

            r->main->posted_requests = pr->next;

            r = pr->request;

            ngx_http_set_log_request(c->log, r);

            ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,

                           "http posted request: \"%V?%V\"", &r->uri, &r->args);

    // 调用write event handller, mirror的handler就是ngx_http_handler

            r->write_event_handler(r);

        }

    }

     

    void

    ngx_http_handler(ngx_http_request_t *r)

    {

        ngx_http_core_main_conf_t  *cmcf;

        r->connection->log->action = NULL;

        if (!r->internal) {

            switch (r->headers_in.connection_type) {

            case 0:

                r->keepalive = (r->http_version > NGX_HTTP_VERSION_10);

                break;

            case NGX_HTTP_CONNECTION_CLOSE:

                r->keepalive = 0;

                break;

            case NGX_HTTP_CONNECTION_KEEP_ALIVE:

                r->keepalive = 1;

                break;

            }

            r->lingering_close = (r->headers_in.content_length_n > 0

                                  || r->headers_in.chunked);

            r->phase_handler = 0;

        } else {

    // 对于mirror来说,因为是interal,从server_rewrite开始

            cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);

            r->phase_handler = cmcf->phase_engine.server_rewrite_index;

        }

        r->valid_location = 1;

    #if (NGX_HTTP_GZIP)

        r->gzip_tested = 0;

        r->gzip_ok = 0;

        r->gzip_vary = 0;

    #endif

    // 从头开始执行phases

        r->write_event_handler = ngx_http_core_run_phases;

        ngx_http_core_run_phases(r);

    }

     

    ngx_http_process_request_headers,处理请求,

    static void

    ngx_http_process_request_headers(ngx_event_t *rev)

    {

    ...

    /* a whole header has been parsed successfully */

    rc = ngx_http_process_request_header(r);

     

        if (rc != NGX_OK) {

             break;

        }

        ngx_http_process_request(r);

    // 运行子请求

    ngx_http_run_posted_requests(c);

    ...

    }

     

    void

    ngx_http_process_request(ngx_http_request_t *r)

    {

    ...

    c->read->handler = ngx_http_request_handler;

        c->write->handler = ngx_http_request_handler;

        r->read_event_handler = ngx_http_block_reading;

        ngx_http_handler(r);

    ...

    }

     

    void

    ngx_http_handler(ngx_http_request_t *r)

    {

    ...

    if (!r->internal) {

            switch (r->headers_in.connection_type) {

            case 0:

                r->keepalive = (r->http_version > NGX_HTTP_VERSION_10);

                break;

            case NGX_HTTP_CONNECTION_CLOSE:

                r->keepalive = 0;

                break;

            case NGX_HTTP_CONNECTION_KEEP_ALIVE:

                r->keepalive = 1;

                break;

            }

            r->lingering_close = (r->headers_in.content_length_n > 0

                                  || r->headers_in.chunked);

            r->phase_handler = 0;

        } else {

            cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);

            r->phase_handler = cmcf->phase_engine.server_rewrite_index;

        }

     

    r->write_event_handler = ngx_http_core_run_phases;

        ngx_http_core_run_phases(r);

    ...

    }

    finalize:这块对于mirror来说,是从server rewrite开始一直到upstream,结束之后,对这个request进行finalize

    void

    ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc)

    {

    ...

    // 子请求,且有回调函数,执行

    if (r != r->main && r->post_subrequest) {

            rc = r->post_subrequest->handler(r, r->post_subrequest->data, rc);

        }

    // 子请求

    if (r != r->main) {

    // background直接关闭request

    if (r->background) {

                r->done = 1;

                ngx_http_finalize_connection(r);

                return;

            }

    /* 该子请求还有未处理完的数据或者子请求 */

    if (r->buffered || r->postponed) {

                /* 添加一个该子请求的写事件,并设置合适的write event hander,

                   以便下次写事件来的时候继续处理,这里实际上下次执行时会调用ngx_http_output_filter函数,

                   最终还是会进入ngx_http_postpone_filter进行处理 */

                if (ngx_http_set_write_handler(r) != NGX_OK) {

                    ngx_http_terminate_request(r, 0);

                }

                return;

            }

    pr = r->parent;

    /* 该子请求已经处理完毕,如果它拥有发送数据的权利,则将权利移交给父请求,c->data指向正在处理的请求,和当前request相同说明,是子请求移交过来的 */

            if (r == c->data) {

                r->main->count--;

                r->done = 1;

                /* 如果该子请求不是提前完成,则从父请求的postponed链表中删除 */

                if (pr->postponed && pr->postponed->request == r) {

                    pr->postponed = pr->postponed->next;

                }

    // 将c->data指向parent

    c->data = pr;

    } else {

                /* 到这里其实表明该子请求提前执行完成,而且它没有产生任何数据,则它下次再次获得

                   执行机会时,将会执行ngx_http_request_finalzier函数,它实际上是执行

                   ngx_http_finalzie_request(r,0),也就是什么都不干,直到轮到它发送数据时,

                   ngx_http_finalzie_request函数会将它从父请求的postponed链表中删除 */

                r->write_event_handler = ngx_http_request_finalizer;

                if (r->waited) {

                    r->done = 1;

                }

            }

    /* 将父请求加入posted_request队尾,获得一次运行机会???????这个运行机会是啥意思 */

            if (ngx_http_post_request(pr, NULL) != NGX_OK) {

                r->main->count++;

                ngx_http_terminate_request(r, 0);

                return;

            }

    }

    /* 这里是处理主请求结束的逻辑,如果主请求有未发送的数据或者未处理的子请求,

           则给主请求添加写事件,并设置合适的write event hander,

           以便下次写事件来的时候继续处理 */

        if (r->buffered || c->buffered || r->postponed || r->blocked) {

     

            if (ngx_http_set_write_handler(r) != NGX_OK) {

                ngx_http_terminate_request(r, 0);

            }

            return;

        }

    ...

    }

     

    body response

    我们nginx做得是反向代理,upstream处理完请求后,开始发送response

    static void

    ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)

    {

    ...

    ngx_http_upstream_send_response(r, u);

    ...

    }

    static void

    ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)

    {

    ...

    rc = ngx_http_send_header(r);

    p->output_filter = ngx_http_upstream_output_filter;

    ...

    }

     

    static ngx_int_t

    ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)

    {

    ...

    rc = ngx_http_output_filter(r, chain);

    ...

    }

     

    ngx_int_t

    ngx_http_send_header(ngx_http_request_t *r)

    {

    return ngx_http_top_header_filter(r);

    }

     

    ngx_int_t

    ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)

    {

    rc = ngx_http_top_body_filter(r, in);

    }

     

    output filter是发送output的filter,里面会调用top_body_filter,这个链表里面有ngx_http_postpone_filter,这个是组织回包格式的filter

    ngx_int_t

    ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)

    {

        ngx_int_t          rc;

        ngx_connection_t  *c;

        c = r->connection;

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,

                       "http output filter \"%V?%V\"", &r->uri, &r->args);

     

        rc = ngx_http_top_body_filter(r, in);

        if (rc == NGX_ERROR) {

            /* NGX_ERROR may be returned by any filter */

            c->error = 1;

        }

        return rc;

    }

    static ngx_int_t

    ngx_http_postpone_filter(ngx_http_request_t *r, ngx_chain_t *in)

    {

    ...

    ngx_connection_t              *c;  

        ngx_http_postponed_request_t  *pr;  

    //取得当前的链接  

        c = r->connection;  

      

    //如果r不等于c->data,前面的分析知道c->data保存的是最新的一个sub request(同级的话,是第一个),因此不等于则说明是需要保存数据的父request。  

        if (r != c->data) {  

            if (in) {  

    //保存数据  

                ngx_http_postpone_filter_add(r, in);  

    //这里注意不发送任何数据,直接返回OK。而最终会在finalize_request中处理。  

                return NGX_OK;  

            }  

            return NGX_OK;  

        }

    //如果r->postponed为空,则说明是最后一个sub request,也就是最新的那个,因此需要将它先发送出去。  

        if (r->postponed == NULL) {  

    //如果in存在,则发送出去  

            if (in || c->buffered) {  

                return ngx_http_next_filter(r->main, in);  

            }  

            return NGX_OK;  

        }

    ...

    }

    子请求

    1,子请求不存在响应头部的概念。

            2,子请求读事件设置为空,子请求不受前段控制。

    Processed: 0.013, SQL: 9