Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Apache/modules/http2/   (Apache Software Stiftung Version 2.4.65©)  Datei vom 7.6.2025 mit Größe 63 kB image not shown  

Quelle  h2_proxy_session.c   Sprache: C

 
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

 
#include <stddef.h>
#include <apr_strings.h>
#include <nghttp2/nghttp2.h>

#include <mpm_common.h>
#include <httpd.h>
#include <http_protocol.h>
#include <mod_proxy.h>

#include "mod_http2.h"
#include "h2.h"
#include "h2_proxy_util.h"
#include "h2_proxy_session.h"

APLOG_USE_MODULE(proxy_http2);

typedef struct h2_proxy_stream {
    int id;
    apr_pool_t *pool;
    h2_proxy_session *session;

    const char *url;
    request_rec *r;
    conn_rec *cfront;
    h2_proxy_request *req;
    const char *real_server_uri;
    const char *p_server_uri;
    int standalone;

    h2_proxy_stream_state_t state;
    unsigned int suspended : 1;
    unsigned int waiting_on_100 : 1;
    unsigned int waiting_on_ping : 1;
    unsigned int headers_ended : 1;
    uint32_t error_code;

    apr_bucket_brigade *input;
    apr_off_t data_sent;
    apr_bucket_brigade *output;
    apr_off_t data_received;
    
    apr_table_t *saves;
} h2_proxy_stream;


static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
                           int arg, const char *msg);
static void ping_arrived(h2_proxy_session *session);
static apr_status_t check_suspended(h2_proxy_session *session);
static void stream_resume(h2_proxy_stream *stream);
static apr_status_t submit_trailers(h2_proxy_stream *stream);

/*
 * The H2_PING connection sub-state: a state independant of the H2_SESSION state
 * of the connection:
 * - H2_PING_ST_NONE: no interference with request handling, ProxyTimeout in effect.
 *   When entered, all suspended streams are unsuspended again.
 * - H2_PING_ST_AWAIT_ANY: new requests are suspended, a possibly configured "ping"
 *   timeout is in effect. Any frame received transits to H2_PING_ST_NONE.
 * - H2_PING_ST_AWAIT_PING: same as above, but only a PING frame transits 
 *   to H2_PING_ST_NONE.
 *
 * An AWAIT state is entered on a new connection or when re-using a connection and
 * the last frame received has been some time ago. The latter sends a PING frame
 * and insists on an answer, the former is satisfied by any frame received from the
 * backend.
 *
 * This works for new connections as there is always at least one SETTINGS frame
 * that the backend sends. When re-using connection, we send a PING and insist on
 * receiving one back, as there might be frames in our connection buffers from
 * some time ago. Since some servers have protections against PING flooding, we
 * only ever have one PING unanswered.
 *
 * Requests are suspended while in a PING state, as we do not want to send data
 * before we can be reasonably sure that the connection is working (at least on
 * the h2 protocol level). This also means that the session can do blocking reads
 * when expecting PING answers.
 */

static void set_ping_timeout(h2_proxy_session *session)
{
    if (session->ping_timeout != -1 && session->save_timeout == -1) {
        apr_socket_t *socket = NULL;

        socket = ap_get_conn_socket(session->c);
        if (socket) {
            apr_socket_timeout_get(socket, &session->save_timeout);
            apr_socket_timeout_set(socket, session->ping_timeout);
        }
    }
}

static void unset_ping_timeout(h2_proxy_session *session)
{
    if (session->save_timeout != -1) {
        apr_socket_t *socket = NULL;

        socket = ap_get_conn_socket(session->c);
        if (socket) {
            apr_socket_timeout_set(socket, session->save_timeout);
            session->save_timeout = -1;
        }
    }
}

static void enter_ping_state(h2_proxy_session *session, h2_ping_state_t state)
{
    if (session->ping_state == state) return;
    switch (session->ping_state) {
    case H2_PING_ST_NONE:
        /* leaving NONE, enforce timeout, send frame maybe */
        if (H2_PING_ST_AWAIT_PING == state) {
            unset_ping_timeout(session);
            nghttp2_submit_ping(session->ngh2, 0, (const uint8_t *)"nevergonnagiveyouup");
        }
        set_ping_timeout(session);
        session->ping_state = state;
        break;
    default:
        /* no switching between the != NONE states */
        if (H2_PING_ST_NONE == state) {
            session->ping_state = state;
            unset_ping_timeout(session);
            ping_arrived(session);
        }
        break;
    }
}

static void ping_new_session(h2_proxy_session *session, proxy_conn_rec *p_conn)
{
    session->save_timeout = -1;
    session->ping_timeout = (p_conn->worker->s->ping_timeout_set?
                             p_conn->worker->s->ping_timeout : -1);
    session->ping_state = H2_PING_ST_NONE;
    enter_ping_state(session, H2_PING_ST_AWAIT_ANY);
}

static void ping_reuse_session(h2_proxy_session *session)
{
    if (H2_PING_ST_NONE == session->ping_state) {
        apr_interval_time_t age = apr_time_now() - session->last_frame_received;
        if (age > apr_time_from_sec(1)) {
            enter_ping_state(session, H2_PING_ST_AWAIT_PING);
        }
    }
}

static void ping_ev_frame_received(h2_proxy_session *session, const nghttp2_frame *frame)
{
    session->last_frame_received = apr_time_now();
    switch (session->ping_state) {
    case H2_PING_ST_NONE:
        /* nop */
        break;
    case H2_PING_ST_AWAIT_ANY:
        enter_ping_state(session, H2_PING_ST_NONE);
        break;
    case H2_PING_ST_AWAIT_PING:
        if (NGHTTP2_PING == frame->hd.type) {
            enter_ping_state(session, H2_PING_ST_NONE);
        }
        /* we may receive many other frames while we are waiting for the
         * PING answer. They may come all from our connection buffers and
         * say nothing about the current state of the backend. */

        break;
    }
}

static apr_status_t proxy_session_pre_close(void *theconn)
{
    proxy_conn_rec *p_conn = (proxy_conn_rec *)theconn;
    h2_proxy_session *session = p_conn->data;

    if (session && session->ngh2) {
        ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
                      "proxy_session(%s): pool cleanup, state=%d, streams=%d",
                      session->id, session->state, 
                      (int)h2_proxy_ihash_count(session->streams));
        session->aborted = 1;
        dispatch_event(session, H2_PROXYS_EV_PRE_CLOSE, 0, NULL);
        nghttp2_session_del(session->ngh2);
        session->ngh2 = NULL;
        p_conn->data = NULL;
    }
    return APR_SUCCESS;
}

static int proxy_pass_brigade(apr_bucket_alloc_t *bucket_alloc,
                              proxy_conn_rec *p_conn,
                              conn_rec *origin, apr_bucket_brigade *bb,
                              int flush)
{
    apr_status_t status;
    apr_off_t transferred;

    if (flush) {
        apr_bucket *e = apr_bucket_flush_create(bucket_alloc);
        APR_BRIGADE_INSERT_TAIL(bb, e);
    }
    apr_brigade_length(bb, 0, &transferred);
    if (transferred != -1)
        p_conn->worker->s->transferred += transferred;
    status = ap_pass_brigade(origin->output_filters, bb);
    /* Cleanup the brigade now to avoid buckets lifetime
     * issues in case of error returned below. */

    apr_brigade_cleanup(bb);
    if (status != APR_SUCCESS) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, origin, APLOGNO(03357)
                      "pass output failed to %pI (%s)",
                      p_conn->addr, p_conn->hostname);
    }
    return status;
}

static ssize_t raw_send(nghttp2_session *ngh2, const uint8_t *data,
                        size_t length, int flags, void *user_data)
{
    h2_proxy_session *session = user_data;
    apr_bucket *b;
    apr_status_t status;
    int flush = 1;

    if (data) {
        b = apr_bucket_transient_create((const char*)data, length, 
                                        session->c->bucket_alloc);
        APR_BRIGADE_INSERT_TAIL(session->output, b);
    }

    status = proxy_pass_brigade(session->c->bucket_alloc,  
                                session->p_conn, session->c, 
                                session->output, flush);
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
                  "h2_proxy_sesssion(%s): raw_send %d bytes, flush=%d"
                  session->id, (int)length, flush);
    if (status != APR_SUCCESS) {
        return NGHTTP2_ERR_CALLBACK_FAILURE;
    }
    return length;
}

static int on_frame_recv(nghttp2_session *ngh2, const nghttp2_frame *frame,
                         void *user_data) 
{
    h2_proxy_session *session = user_data;
    h2_proxy_stream *stream;
    request_rec *r;
    int n;
    
    if (APLOGcdebug(session->c)) {
        char buffer[256];
        
        h2_proxy_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03341)
                      "h2_proxy_session(%s): recv FRAME[%s]",
                      session->id, buffer);
    }

    ping_ev_frame_received(session, frame);
    /* Action for frame types: */
    switch (frame->hd.type) {
        case NGHTTP2_HEADERS:
            stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
            if (!stream) {
                return NGHTTP2_ERR_CALLBACK_FAILURE;
            }
            r = stream->r;
            if (r->status >= 100 && r->status < 200) {
                /* By default, we will forward all interim responses when
                 * we are sitting on a HTTP/2 connection to the client */

                int forward = session->h2_front;
                switch(r->status) {
                    case 100:
                        if (stream->waiting_on_100) {
                            stream->waiting_on_100 = 0;
                            r->status_line = ap_get_status_line(r->status);
                            forward = 1;
                        } 
                        break;
                    case 103:
                        /* workaround until we get this into http protocol base
                         * parts. without this, unknown codes are converted to
                         * 500... */

                        r->status_line = "103 Early Hints";
                        break;
                    default:
                        r->status_line = ap_get_status_line(r->status);
                        break;
                }
                ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03487) 
                              "h2_proxy_session(%s): got interim HEADERS, "
                              "status=%d, will forward=%d",
                              session->id, r->status, forward);
                if (forward) {
                    ap_send_interim_response(r, 1);
                }
            }
            stream_resume(stream);
            break;
        case NGHTTP2_PING:
            break;
        case NGHTTP2_PUSH_PROMISE:
            break;
        case NGHTTP2_SETTINGS:
            if (frame->settings.niv > 0) {
                n = nghttp2_session_get_remote_settings(ngh2, NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
                if (n > 0) {
                    session->remote_max_concurrent = n;
                }
            }
            break;
        case NGHTTP2_GOAWAY:
            /* we expect the remote server to tell us the highest stream id
             * that it has started processing. */

            session->last_stream_id = frame->goaway.last_stream_id;
            dispatch_event(session, H2_PROXYS_EV_REMOTE_GOAWAY, 0, NULL);
            break;
        default:
            break;
    }
    return 0;
}

static int before_frame_send(nghttp2_session *ngh2,
                             const nghttp2_frame *frame, void *user_data)
{
    h2_proxy_session *session = user_data;
    if (APLOGcdebug(session->c)) {
        char buffer[256];

        h2_proxy_util_frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0]));
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03343)
                      "h2_proxy_session(%s): sent FRAME[%s]",
                      session->id, buffer);
    }
    return 0;
}

static int add_header(void *table, const char *n, const char *v)
{
    apr_table_add(table, n, v);
    return 1;
}

static void process_proxy_header(apr_table_t *headers, h2_proxy_stream *stream, 
                                 const char *n, const char *v)
{
    static const struct {
        const char *name;
        ap_proxy_header_reverse_map_fn func;
    } transform_hdrs[] = {
        { "Location", ap_proxy_location_reverse_map },
        { "Content-Location", ap_proxy_location_reverse_map },
        { "URI", ap_proxy_location_reverse_map },
        { "Destination", ap_proxy_location_reverse_map },
        { "Set-Cookie", ap_proxy_cookie_reverse_map },
        { NULL, NULL }
    };
    request_rec *r = stream->r;
    proxy_dir_conf *dconf;
    int i;
    
    dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
    if (!dconf->preserve_host) {
        for (i = 0; transform_hdrs[i].name; ++i) {
            if (!ap_cstr_casecmp(transform_hdrs[i].name, n)) {
                apr_table_add(headers, n, (*transform_hdrs[i].func)(r, dconf, v));
                return;
            }
        }
        if (!ap_cstr_casecmp("Link", n)) {
            dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
            apr_table_add(headers, n, h2_proxy_link_reverse_map(r, dconf, 
                            stream->real_server_uri, stream->p_server_uri, v));
            return;
        }
    }
    apr_table_add(headers, n, v);
}

static apr_status_t h2_proxy_stream_add_header_out(h2_proxy_stream *stream,
                                                   const char *n, apr_size_t nlen,
                                                   const char *v, apr_size_t vlen)
{
    if (n[0] == ':') {
        if (!stream->data_received && !strncmp(":status", n, nlen)) {
            char *s = apr_pstrndup(stream->r->pool, v, vlen);
            
            apr_table_setn(stream->r->notes, "proxy-status", s);
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
                          "h2_proxy_stream(%s-%d): got status %s"
                          stream->session->id, stream->id, s);
            stream->r->status = (int)apr_atoi64(s);
            if (stream->r->status <= 0) {
                stream->r->status = 500;
                return APR_EGENERAL;
            }
        }
        return APR_SUCCESS;
    }
    
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
                  "h2_proxy_stream(%s-%d): on_header %s: %s"
                  stream->session->id, stream->id, n, v);
    if (!h2_proxy_res_ignore_header(n, nlen)) {
        char *hname, *hvalue;
        apr_table_t *headers = (stream->headers_ended? 
                               stream->r->trailers_out : stream->r->headers_out);
    
        hname = apr_pstrndup(stream->pool, n, nlen);
        h2_proxy_util_camel_case_header(hname, nlen);
        hvalue = apr_pstrndup(stream->pool, v, vlen);
        
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
                      "h2_proxy_stream(%s-%d): got header %s: %s"
                      stream->session->id, stream->id, hname, hvalue);
        process_proxy_header(headers, stream, hname, hvalue);
    }
    return APR_SUCCESS;
}

static int log_header(void *ctx, const char *key, const char *value)
{
    h2_proxy_stream *stream = ctx;
    ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
                  "h2_proxy_stream(%s-%d), header_out %s: %s"
                  stream->session->id, stream->id, key, value);
    return 1;
}

static void h2_proxy_stream_end_headers_out(h2_proxy_stream *stream) 
{
    h2_proxy_session *session = stream->session;
    request_rec *r = stream->r;
    apr_pool_t *p = r->pool;
    const char *buf;
    
    /* Now, add in the cookies from the response to the ones already saved */
    apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);
    
    /* and now load 'em all in */
    if (!apr_is_empty_table(stream->saves)) {
        apr_table_unset(r->headers_out, "Set-Cookie");
        r->headers_out = apr_table_overlay(p, r->headers_out, stream->saves);
    }

    if ((buf = apr_table_get(r->headers_out, "Content-Type"))) {
        ap_set_content_type(r, apr_pstrdup(p, buf));
    }
    
    /* handle Via header in response */
    if (session->conf->viaopt != via_off 
        && session->conf->viaopt != via_block) {
        const char *server_name = ap_get_server_name(stream->r);
        apr_port_t port = ap_get_server_port(stream->r);
        char portstr[32];
        
        /* If USE_CANONICAL_NAME_OFF was configured for the proxy virtual host,
         * then the server name returned by ap_get_server_name() is the
         * origin server name (which doesn't make sense with Via: headers)
         * so we use the proxy vhost's name instead.
         */

        if (server_name == stream->r->hostname) {
            server_name = stream->r->server->server_hostname;
        }
        if (ap_is_default_port(port, stream->r)) {
            portstr[0] = '\0';
        }
        else {
            apr_snprintf(portstr, sizeof(portstr), ":%d", port);
        }

        /* create a "Via:" response header entry and merge it */
        apr_table_add(r->headers_out, "Via",
                       (session->conf->viaopt == via_full)
                       ? apr_psprintf(p, "%d.%d %s%s (%s)",
                                      HTTP_VERSION_MAJOR(r->proto_num),
                                      HTTP_VERSION_MINOR(r->proto_num),
                                      server_name, portstr,
                                      AP_SERVER_BASEVERSION)
                       : apr_psprintf(p, "%d.%d %s%s",
                                      HTTP_VERSION_MAJOR(r->proto_num),
                                      HTTP_VERSION_MINOR(r->proto_num),
                                      server_name, portstr)
                       );
    }
    if (r->status >= 200) stream->headers_ended = 1;
    
    if (APLOGrtrace2(stream->r)) {
        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, 0, stream->r, 
                      "h2_proxy_stream(%s-%d), header_out after merging"
                      stream->session->id, stream->id);
        apr_table_do(log_header, stream, stream->r->headers_out, NULL);
    }
}

static int stream_response_data(nghttp2_session *ngh2, uint8_t flags,
                                int32_t stream_id, const uint8_t *data,
                                size_t len, void *user_data) 
{
    h2_proxy_session *session = user_data;
    h2_proxy_stream *stream;
    apr_bucket *b;
    apr_status_t status;
    
    stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
    if (!stream) {
        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03358)
                     "h2_proxy_session(%s): recv data chunk for "
                     "unknown stream %d, ignored"
                     session->id, stream_id);
        return 0;
    }
    
    if (!stream->data_received) {
        /* last chance to manipulate response headers.
         * after this, only trailers */

        h2_proxy_stream_end_headers_out(stream);
    }
    stream->data_received += len;
    b = apr_bucket_transient_create((const char*)data, len,
                                    stream->cfront->bucket_alloc);
    APR_BRIGADE_INSERT_TAIL(stream->output, b);
    /* always flush after a DATA frame, as we have no other indication
     * of buffer use */

    b = apr_bucket_flush_create(stream->cfront->bucket_alloc);
    APR_BRIGADE_INSERT_TAIL(stream->output, b);

    status = ap_pass_brigade(stream->r->output_filters, stream->output);
    ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03359)
                  "h2_proxy_session(%s): stream=%d, response DATA %ld, %ld"
                  " total", session->id, stream_id, (long)len,
                  (long)stream->data_received);
    if (status != APR_SUCCESS) {
        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03344)
                      "h2_proxy_session(%s): passing output on stream %d"
                      session->id, stream->id);
        nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE,
                                  stream_id, NGHTTP2_STREAM_CLOSED);
        return NGHTTP2_ERR_STREAM_CLOSING;
    }
    return 0;
}

static int on_stream_close(nghttp2_session *ngh2, int32_t stream_id,
                           uint32_t error_code, void *user_data) 
{
    h2_proxy_session *session = user_data;
    h2_proxy_stream *stream;
    if (!session->aborted) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03360)
                      "h2_proxy_session(%s): stream=%d, closed, err=%d"
                      session->id, stream_id, error_code);
        stream = h2_proxy_ihash_get(session->streams, stream_id);
        if (stream) {
            stream->error_code = error_code;
        }
        dispatch_event(session, H2_PROXYS_EV_STREAM_DONE, stream_id, NULL);
    }
    return 0;
}

static int on_header(nghttp2_session *ngh2, const nghttp2_frame *frame,
                     const uint8_t *namearg, size_t nlen,
                     const uint8_t *valuearg, size_t vlen, uint8_t flags,
                     void *user_data) 
{
    h2_proxy_session *session = user_data;
    h2_proxy_stream *stream;
    const char *n = (const char*)namearg;
    const char *v = (const char*)valuearg;
    
    (void)session;
    if (frame->hd.type == NGHTTP2_HEADERS && nlen) {
        stream = nghttp2_session_get_stream_user_data(ngh2, frame->hd.stream_id);
        if (stream) {
            if (h2_proxy_stream_add_header_out(stream, n, nlen, v, vlen)) {
                return NGHTTP2_ERR_CALLBACK_FAILURE;
            }
        }
    }
    else if (frame->hd.type == NGHTTP2_PUSH_PROMISE) {
    }
    
    return 0;
}

static ssize_t stream_request_data(nghttp2_session *ngh2, int32_t stream_id, 
                                   uint8_t *buf, size_t length,
                                   uint32_t *data_flags, 
                                   nghttp2_data_source *source, void *user_data)
{
    h2_proxy_stream *stream;
    apr_status_t status = APR_SUCCESS;
    
    *data_flags = 0;
    stream = nghttp2_session_get_stream_user_data(ngh2, stream_id);
    if (!stream) {
        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(03361)
                     "h2_proxy_stream(NULL): data_read, stream %d not found"
                     stream_id);
        return NGHTTP2_ERR_CALLBACK_FAILURE;
    }
    
    if (stream->session->ping_state != H2_PING_ST_NONE) {
        /* suspend until we hear from the other side */
        stream->waiting_on_ping = 1;
        status = APR_EAGAIN;
    }
    else if (stream->r->expecting_100) {
        /* suspend until the answer comes */
        stream->waiting_on_100 = 1;
        status = APR_EAGAIN;
    }
    else if (APR_BRIGADE_EMPTY(stream->input)) {
        status = ap_get_brigade(stream->r->input_filters, stream->input,
                                AP_MODE_READBYTES, APR_NONBLOCK_READ,
                                H2MAX(APR_BUCKET_BUFF_SIZE, length));
        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
                      "h2_proxy_stream(%s-%d): request body read"
                      stream->session->id, stream->id);
    }

    if (status == APR_SUCCESS) {
        size_t readlen = 0;
        while (status == APR_SUCCESS 
               && (readlen < length)
               && !APR_BRIGADE_EMPTY(stream->input)) {
            apr_bucket* b = APR_BRIGADE_FIRST(stream->input);
            if (APR_BUCKET_IS_METADATA(b)) {
                if (APR_BUCKET_IS_EOS(b)) {
                    *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                }
                else {
                    /* we do nothing more regarding any meta here */
                }
            }
            else {
                const char *bdata = NULL;
                apr_size_t blen = 0;
                status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
                
                if (status == APR_SUCCESS && blen > 0) {
                    size_t copylen = H2MIN(length - readlen, blen);
                    memcpy(buf, bdata, copylen);
                    buf += copylen;
                    readlen += copylen;
                    if (copylen < blen) {
                        /* We have data left in the bucket. Split it. */
                        status = apr_bucket_split(b, copylen);
                    }
                }
            }
            apr_bucket_delete(b);
        }

        stream->data_sent += readlen;
        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(03468) 
                      "h2_proxy_stream(%d): request DATA %ld, %ld"
                      " total, flags=%d", stream->id, (long)readlen, (long)stream->data_sent,
                      (int)*data_flags);
        if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && !apr_is_empty_table(stream->r->trailers_in)) {
            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, stream->r, APLOGNO(10179) 
                          "h2_proxy_stream(%d): submit trailers", stream->id);
            *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
            submit_trailers(stream);
        } 
        return readlen;
    }
    else if (APR_STATUS_IS_EAGAIN(status)) {
        /* suspended stream, needs to be re-awakened */
        ap_log_rerror(APLOG_MARK, APLOG_TRACE2, status, stream->r, 
                      "h2_proxy_stream(%s-%d): suspending"
                      stream->session->id, stream_id);
        stream->suspended = 1;
        h2_proxy_iq_add(stream->session->suspended, stream->id, NULL, NULL);
        return NGHTTP2_ERR_DEFERRED;
    }
    else {
        nghttp2_submit_rst_stream(ngh2, NGHTTP2_FLAG_NONE, 
                                  stream_id, NGHTTP2_STREAM_CLOSED);
        return NGHTTP2_ERR_STREAM_CLOSING;
    }
}

#ifdef H2_NG2_INVALID_HEADER_CB
static int on_invalid_header_cb(nghttp2_session *ngh2,
                                const nghttp2_frame *frame, 
                                const uint8_t *name, size_t namelen, 
                                const uint8_t *value, size_t valuelen, 
                                uint8_t flags, void *user_data)
{
    h2_proxy_session *session = user_data;
    if (APLOGcdebug(session->c)) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03469)
                      "h2_proxy_session(%s-%d): denying stream with invalid header "
                      "'%s: %s'", session->id, (int)frame->hd.stream_id,
                      apr_pstrndup(session->pool, (const char *)name, namelen),
                      apr_pstrndup(session->pool, (const char *)value, valuelen));
    }
    return nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE,
                                     frame->hd.stream_id, 
                                     NGHTTP2_PROTOCOL_ERROR);
}
#endif

h2_proxy_session *h2_proxy_session_setup(const char *id, proxy_conn_rec *p_conn,
                                         proxy_server_conf *conf,
                                         int h2_front, 
                                         unsigned char window_bits_connection,
                                         unsigned char window_bits_stream,
                                         h2_proxy_request_done *done)
{
    if (!p_conn->data) {
        apr_pool_t *pool = p_conn->scpool;
        h2_proxy_session *session;
        nghttp2_session_callbacks *cbs;
        nghttp2_option *option;

        session = apr_pcalloc(pool, sizeof(*session));
        apr_pool_pre_cleanup_register(pool, p_conn, proxy_session_pre_close);
        p_conn->data = session;
        
        session->id = apr_pstrdup(p_conn->scpool, id);
        session->c = p_conn->connection;
        session->p_conn = p_conn;
        session->conf = conf;
        session->pool = p_conn->scpool;
        session->state = H2_PROXYS_ST_INIT;
        session->h2_front = h2_front;
        session->window_bits_stream = window_bits_stream;
        session->window_bits_connection = window_bits_connection;
        session->streams = h2_proxy_ihash_create(pool, offsetof(h2_proxy_stream, id));
        session->suspended = h2_proxy_iq_create(pool, 5);
        session->done = done;
    
        session->input = apr_brigade_create(session->pool, session->c->bucket_alloc);
        session->output = apr_brigade_create(session->pool, session->c->bucket_alloc);
    
        nghttp2_session_callbacks_new(&cbs);
        nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
        nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, stream_response_data);
        nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
        nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
        nghttp2_session_callbacks_set_before_frame_send_callback(cbs, before_frame_send);
        nghttp2_session_callbacks_set_send_callback(cbs, raw_send);
#ifdef H2_NG2_INVALID_HEADER_CB
        nghttp2_session_callbacks_set_on_invalid_header_callback(cbs, on_invalid_header_cb);
#endif
        nghttp2_option_new(&option);
        nghttp2_option_set_peer_max_concurrent_streams(option, 100);
        nghttp2_option_set_no_auto_window_update(option, 0);
        
        nghttp2_session_client_new2(&session->ngh2, cbs, session, option);
        
        nghttp2_option_del(option);
        nghttp2_session_callbacks_del(cbs);

        ping_new_session(session, p_conn);
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03362)
                      "setup session for %s", p_conn->hostname);
    }
    else {
        h2_proxy_session *session = p_conn->data;
        ping_reuse_session(session);
    }
    return p_conn->data;
}

static apr_status_t session_start(h2_proxy_session *session) 
{
    nghttp2_settings_entry settings[2];
    int rv, add_conn_window;
    apr_socket_t *s;
    
    s = ap_get_conn_socket(session->c);
#if !defined(WIN32) && !defined(NETWARE)
    if (s) {
        ap_sock_disable_nagle(s);
    }
#endif
    
    settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
    settings[0].value = 0;
    settings[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
    settings[1].value = (1 << session->window_bits_stream) - 1;
    
    rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, 
                                 H2_ALEN(settings));
    
    /* If the connection window is larger than our default, trigger a WINDOW_UPDATE */
    add_conn_window = ((1 << session->window_bits_connection) - 1 -
                       NGHTTP2_INITIAL_CONNECTION_WINDOW_SIZE);
    if (!rv && add_conn_window != 0) {
        rv = nghttp2_submit_window_update(session->ngh2, NGHTTP2_FLAG_NONE, 0, add_conn_window);
    }
    return rv? APR_EGENERAL : APR_SUCCESS;
}

static apr_status_t open_stream(h2_proxy_session *session, const char *url,
                                request_rec *r, int standalone,
                                h2_proxy_stream **pstream)
{
    h2_proxy_stream *stream;
    apr_uri_t puri;
    const char *authority, *scheme, *path, *orig_host;
    apr_status_t status;
    proxy_dir_conf *dconf;

    stream = apr_pcalloc(r->pool, sizeof(*stream));

    stream->pool = r->pool;
    stream->url = url;
    stream->r = r;
    stream->cfront = r->connection;
    stream->standalone = standalone;
    stream->session = session;
    stream->state = H2_STREAM_ST_IDLE;
    
    stream->input = apr_brigade_create(stream->pool, stream->cfront->bucket_alloc);
    stream->output = apr_brigade_create(stream->pool, stream->cfront->bucket_alloc);
    
    stream->req = h2_proxy_req_create(1, stream->pool);

    status = apr_uri_parse(stream->pool, url, &puri);
    if (status != APR_SUCCESS)
        return status;
    
    scheme = (strcmp(puri.scheme, "h2")? "http" : "https");
    orig_host = apr_table_get(r->headers_in, "Host");
    if (orig_host == NULL) {
        orig_host = r->hostname;
    }

    dconf = ap_get_module_config(r->per_dir_config, &proxy_module);
    if (dconf->preserve_host) {
        authority = orig_host;
        if (!authority) {
            /* Duplicate mod_proxy behaviour if ProxyPreserveHost is
             * used but an "HTTP/0.9" request is received without a
             * Host: header */

            authority = r->server->server_hostname;
            ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, APLOGNO(10511)
                          "HTTP/0.9 request (with no host line) "
                          "on incoming request and preserve host set "
                          "forcing hostname to be %s for uri %s",
                          authority, r->uri);
            apr_table_setn(r->headers_in, "Host", authority);
        }
    }
    else {
        authority = puri.hostname;
        if (!ap_strchr_c(authority, ':') && puri.port
            && apr_uri_port_of_scheme(scheme) != puri.port) {
            /* port info missing and port is not default for scheme: append */
            authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port);
        }
        ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
                      "authority=%s from uri.hostname=%s and uri.port=%d",
                      authority, puri.hostname, puri.port);
    }
    /* See #235, we use only :authority when available and remove Host:
     * since differing values are not acceptable, see RFC 9113 ch. 8.3.1 */

    if (authority && strlen(authority)) {
        apr_table_unset(r->headers_in, "Host");
    }

    /* we need this for mapping relative uris in headers ("Link") back
     * to local uris */

    stream->real_server_uri = apr_psprintf(stream->pool, "%s://%s", scheme, authority);
    stream->p_server_uri = apr_psprintf(stream->pool, "%s://%s", puri.scheme, authority);
    path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART);

    h2_proxy_req_make(stream->req, stream->pool, r->method, scheme,
                authority, path, r->headers_in);

    if (dconf->add_forwarded_headers) {
        if (PROXYREQ_REVERSE == r->proxyreq) {
            /* Add X-Forwarded-For: so that the upstream has a chance to
             * determine, where the original request came from.
             */

            apr_table_mergen(stream->req->headers, "X-Forwarded-For",
                             r->useragent_ip);

            /* Add X-Forwarded-Host: so that upstream knows what the
             * original request hostname was.
             */

            if (orig_host) {
                apr_table_mergen(stream->req->headers, "X-Forwarded-Host",
                                 orig_host);
            }

            /* Add X-Forwarded-Server: so that upstream knows what the
             * name of this proxy server is (if there are more than one)
             * XXX: This duplicates Via: - do we strictly need it?
             */

            apr_table_mergen(stream->req->headers, "X-Forwarded-Server",
                             r->server->server_hostname);
        }
    }

    /* Tuck away all already existing cookies */
    stream->saves = apr_table_make(r->pool, 2);
    apr_table_do(add_header, stream->saves, r->headers_out, "Set-Cookie", NULL);

    *pstream = stream;
    
    return APR_SUCCESS;
}

static apr_status_t submit_stream(h2_proxy_session *session, h2_proxy_stream *stream)
{
    h2_proxy_ngheader *hd;
    nghttp2_data_provider *pp = NULL;
    nghttp2_data_provider provider;
    int rv, may_have_request_body = 1;
    apr_status_t status;

    hd = h2_proxy_util_nghd_make_req(stream->pool, stream->req);
    
    /* If we expect a 100-continue response, we must refrain from reading
       any input until we get it. Reading the input will possibly trigger
       HTTP_IN filter to generate the 100-continue itself. */

    if (stream->waiting_on_100 || stream->waiting_on_ping) {
        /* make a small test if we get an EOF/EOS immediately */
        status = ap_get_brigade(stream->r->input_filters, stream->input,
                                AP_MODE_READBYTES, APR_NONBLOCK_READ,
                                APR_BUCKET_BUFF_SIZE);
        may_have_request_body = APR_STATUS_IS_EAGAIN(status)
                                || (status == APR_SUCCESS 
                                    && !APR_BUCKET_IS_EOS(APR_BRIGADE_FIRST(stream->input)));
    }
    
    if (may_have_request_body) {
        provider.source.fd = 0;
        provider.source.ptr = NULL;
        provider.read_callback = stream_request_data;
        pp = &provider;
    }

    rv = nghttp2_submit_request(session->ngh2, NULL, 
                                hd->nv, hd->nvlen, pp, stream);
                                
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->cfront, APLOGNO(03363)
                  "h2_proxy_session(%s): submit %s%s -> %d"
                  session->id, stream->req->authority, stream->req->path,
                  rv);
    if (rv > 0) {
        stream->id = rv;
        stream->state = H2_STREAM_ST_OPEN;
        h2_proxy_ihash_add(session->streams, stream);
        dispatch_event(session, H2_PROXYS_EV_STREAM_SUBMITTED, rv, NULL);
        
        return APR_SUCCESS;
    }
    return APR_EGENERAL;
}

static apr_status_t submit_trailers(h2_proxy_stream *stream)
{
    h2_proxy_ngheader *hd;
    int rv;

    hd = h2_proxy_util_nghd_make(stream->pool, stream->r->trailers_in);
    rv = nghttp2_submit_trailer(stream->session->ngh2, stream->id, hd->nv, hd->nvlen);
    return rv == 0? APR_SUCCESS: APR_EGENERAL;
}

static apr_status_t feed_brigade(h2_proxy_session *session, apr_bucket_brigade *bb)
{
    apr_status_t status = APR_SUCCESS;
    apr_size_t readlen = 0;
    ssize_t n;

    while (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(bb)) {
        apr_bucket* b = APR_BRIGADE_FIRST(bb);
        
        if (APR_BUCKET_IS_METADATA(b)) {
            /* nop */
        }
        else {
            const char *bdata = NULL;
            apr_size_t blen = 0;
            
            status = apr_bucket_read(b, &bdata, &blen, APR_BLOCK_READ);
            if (status == APR_SUCCESS && blen > 0) {
                n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)bdata, blen);
                ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, 
                              "h2_proxy_session(%s): feeding %ld bytes -> %ld"
                              session->id, (long)blen, (long)n);
                if (n < 0) {
                    if (nghttp2_is_fatal((int)n)) {
                        status = APR_EGENERAL;
                    }
                }
                else {
                    size_t rlen = (size_t)n;
                    readlen += rlen;
                    if (rlen < blen) {
                        apr_bucket_split(b, rlen);
                    }
                }
            }
        }
        apr_bucket_delete(b);
    }
    
    ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, 
                  "h2_proxy_session(%s): fed %ld bytes of input to session"
                  session->id, (long)readlen);
    if (readlen == 0 && status == APR_SUCCESS) {
        return APR_EAGAIN;
    }
    return status;
}

static apr_status_t h2_proxy_session_read(h2_proxy_session *session, int block, 
                                          apr_interval_time_t timeout)
{
    apr_status_t status = APR_SUCCESS;
    
    if (APR_BRIGADE_EMPTY(session->input)) {
        apr_socket_t *socket = NULL;
        apr_time_t save_timeout = -1;
        
        if (block && timeout > 0) {
            socket = ap_get_conn_socket(session->c);
            if (socket) {
                apr_socket_timeout_get(socket, &save_timeout);
                apr_socket_timeout_set(socket, timeout);
            }
            else {
                /* cannot block on timeout */
                ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, session->c, APLOGNO(03379)
                              "h2_proxy_session(%s): unable to get conn socket"
                              session->id);
                return APR_ENOTIMPL;
            }
        }
        
        status = ap_get_brigade(session->c->input_filters, session->input, 
                                AP_MODE_READBYTES, 
                                block? APR_BLOCK_READ : APR_NONBLOCK_READ, 
                                64 * 1024);
        ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c, 
                      "h2_proxy_session(%s): read from conn", session->id);
        if (socket && save_timeout != -1) {
            apr_socket_timeout_set(socket, save_timeout);
        }
    }
    
    if (status == APR_SUCCESS) {
        status = feed_brigade(session, session->input);
    }
    else if (APR_STATUS_IS_TIMEUP(status)) {
        /* nop */
    }
    else if (!APR_STATUS_IS_EAGAIN(status)) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, APLOGNO(03380)
                      "h2_proxy_session(%s): read error", session->id);
        dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
    }

    return status;
}

apr_status_t h2_proxy_session_submit(h2_proxy_session *session, 
                                     const char *url, request_rec *r,
                                     int standalone)
{
    h2_proxy_stream *stream;
    apr_status_t status;
    
    status = open_stream(session, url, r, standalone, &stream);
    if (status == APR_SUCCESS) {
        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03381)
                      "process stream(%d): %s %s%s, original: %s"
                      stream->id, stream->req->method, 
                      stream->req->authority, stream->req->path, 
                      r->the_request);
        status = submit_stream(session, stream);
    }
    return status;
}

static void stream_resume(h2_proxy_stream *stream)
{
    h2_proxy_session *session = stream->session;
    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->cfront,
                  "h2_proxy_stream(%s-%d): resuming"
                  session->id, stream->id);
    stream->suspended = 0;
    h2_proxy_iq_remove(session->suspended, stream->id);
    nghttp2_session_resume_data(session->ngh2, stream->id);
    dispatch_event(session, H2_PROXYS_EV_STREAM_RESUMED, 0, NULL);
}

static int is_waiting_for_backend(h2_proxy_session *session)
{
    return ((session->ping_state != H2_PING_ST_NONE) 
            || ((session->suspended->nelts <= 0)
                && !nghttp2_session_want_write(session->ngh2)
                && nghttp2_session_want_read(session->ngh2)));
}

static apr_status_t check_suspended(h2_proxy_session *session)
{
    h2_proxy_stream *stream;
    int i, stream_id;
    apr_status_t status;
    
    for (i = 0; i < session->suspended->nelts; ++i) {
        stream_id = session->suspended->elts[i];
        stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
        if (stream) {
            if (stream->waiting_on_100 || stream->waiting_on_ping) {
                status = APR_EAGAIN;
            }
            else {
                status = ap_get_brigade(stream->r->input_filters, stream->input,
                                        AP_MODE_READBYTES, APR_NONBLOCK_READ,
                                        APR_BUCKET_BUFF_SIZE);
            }
            if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->input)) {
                stream_resume(stream);
                check_suspended(session);
                return APR_SUCCESS;
            }
            else if (status != APR_SUCCESS && !APR_STATUS_IS_EAGAIN(status)) {
                ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, stream->cfront,
                              APLOGNO(03382) "h2_proxy_stream(%s-%d): check input"
                              session->id, stream_id);
                stream_resume(stream);
                check_suspended(session);
                return APR_SUCCESS;
            }
        }
        else {
            /* gone? */
            h2_proxy_iq_remove(session->suspended, stream_id);
            check_suspended(session);
            return APR_SUCCESS;
        }
    }
    return APR_EAGAIN;
}

static apr_status_t session_shutdown(h2_proxy_session *session, int reason, 
                                     const char *msg)
{
    apr_status_t status = APR_SUCCESS;
    const char *err = msg;
    
    ap_assert(session);
    if (!err && reason) {
        err = nghttp2_strerror(reason);
    }
    nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0,
                          reason, (uint8_t*)err, err? strlen(err):0);
    status = nghttp2_session_send(session->ngh2);
    dispatch_event(session, H2_PROXYS_EV_LOCAL_GOAWAY, reason, err);
    return status;
}


static const char *StateNames[] = {
    "INIT",      /* H2_PROXYS_ST_INIT */
    "DONE",      /* H2_PROXYS_ST_DONE */
    "IDLE",      /* H2_PROXYS_ST_IDLE */
    "BUSY",      /* H2_PROXYS_ST_BUSY */
    "WAIT",      /* H2_PROXYS_ST_WAIT */
    "LSHUTDOWN"/* H2_PROXYS_ST_LOCAL_SHUTDOWN */
    "RSHUTDOWN"/* H2_PROXYS_ST_REMOTE_SHUTDOWN */
};

static const char *state_name(h2_proxys_state state)
{
    if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) {
        return "unknown";
    }
    return StateNames[state];
}

static int is_accepting_streams(h2_proxy_session *session)
{
    switch (session->state) {
        case H2_PROXYS_ST_IDLE:
        case H2_PROXYS_ST_BUSY:
        case H2_PROXYS_ST_WAIT:
            return 1;
        default:
            return 0;
    }
}

static void transit(h2_proxy_session *session, const char *action, 
                    h2_proxys_state nstate)
{
    ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03345)
                  "h2_proxy_session(%s): transit [%s] -- %s --> [%s]", session->id,
                  state_name(session->state), action, state_name(nstate));
    session->state = nstate;
}

static void ev_init(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_INIT:
            if (h2_proxy_ihash_empty(session->streams)) {
                transit(session, "init", H2_PROXYS_ST_IDLE);
            }
            else {
                transit(session, "init", H2_PROXYS_ST_BUSY);
            }
            break;

        default:
            /* nop */
            break;
    }
}

static void ev_local_goaway(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
            /* already did that? */
            break;
        case H2_PROXYS_ST_IDLE:
        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
            /* all done */
            transit(session, "local goaway", H2_PROXYS_ST_DONE);
            break;
        default:
            transit(session, "local goaway", H2_PROXYS_ST_LOCAL_SHUTDOWN);
            break;
    }
}

static void ev_remote_goaway(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
            /* already received that? */
            break;
        case H2_PROXYS_ST_IDLE:
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
            /* all done */
            transit(session, "remote goaway", H2_PROXYS_ST_DONE);
            break;
        default:
            transit(session, "remote goaway", H2_PROXYS_ST_REMOTE_SHUTDOWN);
            break;
    }
}

static void ev_conn_error(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_INIT:
        case H2_PROXYS_ST_DONE:
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
            /* just leave */
            transit(session, "conn error", H2_PROXYS_ST_DONE);
            break;
        
        default:
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, arg, session->c,
                          "h2_proxy_session(%s): conn error -> shutdown", session->id);
            session_shutdown(session, arg, msg);
            break;
    }
}

static void ev_proto_error(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_DONE:
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
            /* just leave */
            transit(session, "proto error", H2_PROXYS_ST_DONE);
            break;
        
        default:
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                          "h2_proxy_session(%s): proto error -> shutdown", session->id);
            session_shutdown(session, arg, msg);
            break;
    }
}

static void ev_conn_timeout(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
            transit(session, "conn timeout", H2_PROXYS_ST_DONE);
            break;
        default:
            session_shutdown(session, arg, msg);
            transit(session, "conn timeout", H2_PROXYS_ST_DONE);
            break;
    }
}

static void ev_no_io(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_BUSY:
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
            /* nothing for input and output to do. If we remain
             * in this state, we go into a tight loop and suck up
             * CPU cycles. Ideally, we'd like to do a blocking read, but that
             * is not possible if we have scheduled tasks and wait
             * for them to produce something. */

            if (h2_proxy_ihash_empty(session->streams)) {
                if (!is_accepting_streams(session)) {
                    /* We are no longer accepting new streams and have
                     * finished processing existing ones. Time to leave. */

                    session_shutdown(session, arg, msg);
                    transit(session, "no io", H2_PROXYS_ST_DONE);
                }
                else {
                    /* When we have no streams, no task events are possible,
                     * switch to blocking reads */

                    transit(session, "no io", H2_PROXYS_ST_IDLE);
                }
            }
            else {
                /* Unable to do blocking reads, as we wait on events from
                 * task processing in other threads. Do a busy wait with
                 * backoff timer. */

                transit(session, "no io", H2_PROXYS_ST_WAIT);
            }
            break;
        default:
            /* nop */
            break;
    }
}

static void ev_stream_submitted(h2_proxy_session *session, int stream_id, 
                                const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_IDLE:
        case H2_PROXYS_ST_WAIT:
            transit(session, "stream submitted", H2_PROXYS_ST_BUSY);
            break;
        default:
            /* nop */
            break;
    }
}

static void ev_stream_done(h2_proxy_session *session, int stream_id, 
                           const char *msg)
{
    h2_proxy_stream *stream;
    apr_bucket *b;

    stream = nghttp2_session_get_stream_user_data(session->ngh2, stream_id);
    if (stream) {
        /* if the stream's connection is aborted, do not send anything
         * more on it. */

        apr_status_t status = (stream->error_code == 0)? APR_SUCCESS : APR_EINVAL;
        int touched = (stream->data_sent || stream->data_received ||
                       stream_id <= session->last_stream_id);
        if (!stream->cfront->aborted) {
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, stream->cfront, APLOGNO(03364)
                          "h2_proxy_sesssion(%s): stream(%d) closed "
                          "(touched=%d, error=%d)",
                          session->id, stream_id, touched, stream->error_code);

            if (status != APR_SUCCESS) {
              /* stream failed. If we have received (and forwarded) response
               * data already, we need to append an error buckt to inform
               * consumers.
               * Otherwise, we have an early fail on the connection and may
               * retry this request on a new one. In that case, keep the
               * output virgin so that a new attempt can be made. */

              if (stream->data_received) {
                int http_status = ap_map_http_request_error(status, HTTP_BAD_REQUEST);
                b = ap_bucket_error_create(http_status, NULL, stream->r->pool,
                                           stream->cfront->bucket_alloc);
                APR_BRIGADE_INSERT_TAIL(stream->output, b);
                b = apr_bucket_eos_create(stream->cfront->bucket_alloc);
                APR_BRIGADE_INSERT_TAIL(stream->output, b);
                ap_pass_brigade(stream->r->output_filters, stream->output);
              }
            }
            else if (!stream->data_received) {
                /* if the response had no body, this is the time to flush
                 * an empty brigade which will also write the response headers */

                h2_proxy_stream_end_headers_out(stream);
                stream->data_received = 1;
                b = apr_bucket_flush_create(stream->cfront->bucket_alloc);
                APR_BRIGADE_INSERT_TAIL(stream->output, b);
                b = apr_bucket_eos_create(stream->cfront->bucket_alloc);
                APR_BRIGADE_INSERT_TAIL(stream->output, b);
                ap_pass_brigade(stream->r->output_filters, stream->output);
            }
        }

        stream->state = H2_STREAM_ST_CLOSED;
        h2_proxy_ihash_remove(session->streams, stream_id);
        h2_proxy_iq_remove(session->suspended, stream_id);
        if (session->done) {
            session->done(session, stream->r, status, touched, stream->error_code);
        }
    }
    
    switch (session->state) {
        default:
            /* nop */
            break;
    }
}

static void ev_stream_resumed(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_WAIT:
            transit(session, "stream resumed", H2_PROXYS_ST_BUSY);
            break;
        default:
            /* nop */
            break;
    }
}

static void ev_data_read(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_IDLE:
        case H2_PROXYS_ST_WAIT:
            transit(session, "data read", H2_PROXYS_ST_BUSY);
            break;
        default:
            /* nop */
            break;
    }
}

static void ev_ngh2_done(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_DONE:
            /* nop */
            break;
        default:
            transit(session, "nghttp2 done", H2_PROXYS_ST_DONE);
            break;
    }
}

static void ev_pre_close(h2_proxy_session *session, int arg, const char *msg)
{
    switch (session->state) {
        case H2_PROXYS_ST_DONE:
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
            /* nop */
            break;
        default:
            session_shutdown(session, arg, msg);
            break;
    }
}

static void dispatch_event(h2_proxy_session *session, h2_proxys_event_t ev, 
                           int arg, const char *msg)
{
    switch (ev) {
        case H2_PROXYS_EV_INIT:
            ev_init(session, arg, msg);
            break;            
        case H2_PROXYS_EV_LOCAL_GOAWAY:
            ev_local_goaway(session, arg, msg);
            break;
        case H2_PROXYS_EV_REMOTE_GOAWAY:
            ev_remote_goaway(session, arg, msg);
            break;
        case H2_PROXYS_EV_CONN_ERROR:
            ev_conn_error(session, arg, msg);
            break;
        case H2_PROXYS_EV_PROTO_ERROR:
            ev_proto_error(session, arg, msg);
            break;
        case H2_PROXYS_EV_CONN_TIMEOUT:
            ev_conn_timeout(session, arg, msg);
            break;
        case H2_PROXYS_EV_NO_IO:
            ev_no_io(session, arg, msg);
            break;
        case H2_PROXYS_EV_STREAM_SUBMITTED:
            ev_stream_submitted(session, arg, msg);
            break;
        case H2_PROXYS_EV_STREAM_DONE:
            ev_stream_done(session, arg, msg);
            break;
        case H2_PROXYS_EV_STREAM_RESUMED:
            ev_stream_resumed(session, arg, msg);
            break;
        case H2_PROXYS_EV_DATA_READ:
            ev_data_read(session, arg, msg);
            break;
        case H2_PROXYS_EV_NGH2_DONE:
            ev_ngh2_done(session, arg, msg);
            break;
        case H2_PROXYS_EV_PRE_CLOSE:
            ev_pre_close(session, arg, msg);
            break;
        default:
            ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c,
                          "h2_proxy_session(%s): unknown event %d"
                          session->id, ev);
            break;
    }
}

static int send_loop(h2_proxy_session *session)
{
    while (nghttp2_session_want_write(session->ngh2)) {
        int rv = nghttp2_session_send(session->ngh2);
        if (rv < 0 && nghttp2_is_fatal(rv)) {
            ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
                          "h2_proxy_session(%s): write, rv=%d", session->id, rv);
            dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, rv, NULL);
            break;
        }
        return 1;
    }
    return 0;
}

apr_status_t h2_proxy_session_process(h2_proxy_session *session)
{
    apr_status_t status;
    int have_written = 0, have_read = 0;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, 
                  "h2_proxy_session(%s): process", session->id);
           
run_loop:
    switch (session->state) {
        case H2_PROXYS_ST_INIT:
            status = session_start(session);
            if (status == APR_SUCCESS) {
                dispatch_event(session, H2_PROXYS_EV_INIT, 0, NULL);
                goto run_loop;
            }
            else {
                dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
            }
            break;
            
        case H2_PROXYS_ST_BUSY:
        case H2_PROXYS_ST_LOCAL_SHUTDOWN:
        case H2_PROXYS_ST_REMOTE_SHUTDOWN:
            have_written = send_loop(session);
            
            if (nghttp2_session_want_read(session->ngh2)) {
                status = h2_proxy_session_read(session, 0, 0);
                if (status == APR_SUCCESS) {
                    have_read = 1;
                }
            }
            
            if (!have_written && !have_read 
                && !nghttp2_session_want_write(session->ngh2)) {
                dispatch_event(session, H2_PROXYS_EV_NO_IO, 0, NULL);
                goto run_loop;
            }
            break;
            
        case H2_PROXYS_ST_WAIT:
            if (is_waiting_for_backend(session)) {
                /* we can do a blocking read with the default timeout (as
                 * configured via ProxyTimeout in our socket. There is
                 * nothing we want to send or check until we get more data
                 * from the backend. */

                status = h2_proxy_session_read(session, 1, 0);
                if (status == APR_SUCCESS) {
                    have_read = 1;
                    dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
                }
                else {
                    dispatch_event(session, H2_PROXYS_EV_CONN_ERROR, status, NULL);
                    return status;
                }
            }
            else if (check_suspended(session) == APR_EAGAIN) {
                /* no stream has become resumed. Do a blocking read with
                 * ever increasing timeouts... */

                if (session->wait_timeout < 25) {
                    session->wait_timeout = 25;
                }
                else {
                    session->wait_timeout = H2MIN(apr_time_from_msec(100), 
                                                  2*session->wait_timeout);
                }
                
                status = h2_proxy_session_read(session, 1, session->wait_timeout);
                ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, session->c, 
                              APLOGNO(03365)
                              "h2_proxy_session(%s): WAIT read, timeout=%fms"
                              session->id, session->wait_timeout/1000.0);
                if (status == APR_SUCCESS) {
                    have_read = 1;
                    dispatch_event(session, H2_PROXYS_EV_DATA_READ, 0, NULL);
                }
                else if (APR_STATUS_IS_TIMEUP(status)
                    || APR_STATUS_IS_EAGAIN(status)) {
                    /* go back to checking all inputs again */
                    transit(session, "wait cycle", H2_PROXYS_ST_BUSY);
                }
            }
            break;
            
        case H2_PROXYS_ST_IDLE:
            break;

        case H2_PROXYS_ST_DONE: /* done, session terminated */
            return APR_EOF;
            
        default:
            ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, session->c,
                          APLOGNO(03346)"h2_proxy_session(%s): unknown state %d"
                          session->id, session->state);
            dispatch_event(session, H2_PROXYS_EV_PROTO_ERROR, 0, NULL);
            break;
    }


    if (have_read || have_written) {
        session->wait_timeout = 0;
    }
    
    if (!nghttp2_session_want_read(session->ngh2)
        && !nghttp2_session_want_write(session->ngh2)) {
        dispatch_event(session, H2_PROXYS_EV_NGH2_DONE, 0, NULL);
    }
    
    return APR_SUCCESS; /* needs to be called again */
}

typedef struct {
    h2_proxy_session *session;
    h2_proxy_request_done *done;
} cleanup_iter_ctx;

static int cancel_iter(void *udata, void *val)
{
    cleanup_iter_ctx *ctx = udata;
    h2_proxy_stream *stream = val;
    nghttp2_submit_rst_stream(ctx->session->ngh2, NGHTTP2_FLAG_NONE,
                              stream->id, 0);
    return 1;
}

void h2_proxy_session_cancel_all(h2_proxy_session *session)
{
    if (!h2_proxy_ihash_empty(session->streams)) {
        cleanup_iter_ctx ctx;
        ctx.session = session;
        ctx.done = session->done;
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03366)
                      "h2_proxy_session(%s): cancel %d streams",
                      session->id, (int)h2_proxy_ihash_count(session->streams));
        h2_proxy_ihash_iter(session->streams, cancel_iter, &ctx);
        session_shutdown(session, 0, NULL);
    }
}

static int done_iter(void *udata, void *val)
{
    cleanup_iter_ctx *ctx = udata;
    h2_proxy_stream *stream = val;
    int touched = (stream->data_sent || stream->data_received ||
                   stream->id <= ctx->session->last_stream_id);
    ctx->done(ctx->session, stream->r, APR_ECONNABORTED, touched, stream->error_code);
    return 1;
}

void h2_proxy_session_cleanup(h2_proxy_session *session, 
                              h2_proxy_request_done *done)
{
    if (!h2_proxy_ihash_empty(session->streams)) {
        cleanup_iter_ctx ctx;
        ctx.session = session;
        ctx.done = done;
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03519)
                      "h2_proxy_session(%s): terminated, %d streams unfinished",
                      session->id, (int)h2_proxy_ihash_count(session->streams));
        h2_proxy_ihash_iter(session->streams, done_iter, &ctx);
        h2_proxy_ihash_clear(session->streams);
    }
}

int h2_proxy_session_is_reusable(h2_proxy_session *session)
{
    return (session->state != H2_PROXYS_ST_DONE) &&
           h2_proxy_ihash_empty(session->streams);
}

static int ping_arrived_iter(void *udata, void *val)
{
    h2_proxy_stream *stream = val;
    if (stream->waiting_on_ping) {
        stream->waiting_on_ping = 0;
        stream_resume(stream);
    }
    return 1;
}

static void ping_arrived(h2_proxy_session *session)
{
    if (!h2_proxy_ihash_empty(session->streams)) {
        ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03470)
                      "h2_proxy_session(%s): ping arrived, unblocking streams",
                      session->id);
        h2_proxy_ihash_iter(session->streams, ping_arrived_iter, &session);
    }
}

typedef struct {
    h2_proxy_session *session;
    conn_rec *c;
    apr_off_t bytes;
    int updated;
} win_update_ctx;

98%


¤ Dauer der Verarbeitung: 0.25 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Entwurf

Ziele

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Ergonomie der
Schnittstellen

Diese beiden folgenden Angebotsgruppen bietet das Unternehmen

Angebot

Hier finden Sie eine Liste der Produkte des Unternehmens