/* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/
/* We create a pool with its own allocator to be used for * processing a request. This is the only way to have the processing * independent of its parent pool in the sense that it can work in * another thread.
*/
rv = apr_allocator_create(&allocator); if (rv == APR_SUCCESS) {
apr_allocator_max_free_set(allocator, ap_max_mem_free);
rv = apr_pool_create_ex(&ptrans, m->pool, NULL, allocator);
} if (rv != APR_SUCCESS) { /* maybe the log goes through, maybe not. */
ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, m->c1,
APLOGNO(10004) "h2_mplx: create transit pool");
ap_abort_on_oom(); return NULL; /* should never be reached. */
}
/** * A h2_mplx needs to be thread-safe *and* if will be called by * the h2_session thread *and* the h2_worker threads. Therefore: * - calls are protected by a mutex lock, m->lock * - the pool needs its own allocator, since apr_allocator_t are * not re-entrant. The separate allocator works without a * separate lock since we already protect h2_mplx itself. * Since HTTP/2 connections can be expected to live longer than * their HTTP/1 cousins, the separate allocator seems to work better * than protecting a shared h2_session one with an own lock.
*/
h2_mplx *h2_mplx_c1_create(int child_num, apr_uint32_t id, h2_stream *stream0,
server_rec *s, apr_pool_t *parent,
h2_workers *workers)
{
h2_conn_ctx_t *conn_ctx;
apr_status_t status = APR_SUCCESS;
apr_allocator_t *allocator;
apr_thread_mutex_t *mutex = NULL;
h2_mplx *m = NULL;
/* We create a pool with its own allocator to be used for * processing secondary connections. This is the only way to have the * processing independent of its parent pool in the sense that it * can work in another thread. Also, the new allocator needs its own * mutex to synchronize sub-pools.
*/
status = apr_allocator_create(&allocator); if (status != APR_SUCCESS) {
allocator = NULL; goto failure;
}
apr_allocator_max_free_set(allocator, ap_max_mem_free);
apr_pool_create_ex(&m->pool, parent, NULL, allocator); if (!m->pool) goto failure;
/* take over event monitoring */
h2_stream_set_monitor(stream, NULL); /* Reset, should transit to CLOSED state */
h2_stream_rst(stream, H2_ERR_NO_ERROR); /* All connection data has been sent, simulate cleanup */
h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
m_stream_cleanup(m, stream); return 0;
}
staticvoid c1_purge_streams(h2_mplx *m);
void h2_mplx_c1_destroy(h2_mplx *m)
{
apr_status_t status; unsignedint i, wait_secs = 60; int old_aborted;
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_MPLX_MSG(m, "start release")); /* How to shut down a h2 connection:
* 0. abort and tell the workers that no more work will come from us */
m->shutdown = m->aborted = 1;
H2_MPLX_ENTER_ALWAYS(m);
/* While really terminating any c2 connections, treat the master * connection as aborted. It's not as if we could send any more data
* at this point. */
old_aborted = m->c1->aborted;
m->c1->aborted = 1;
/* How to shut down a h2 connection:
* 1. cancel all streams still active */
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c1,
H2_MPLX_MSG(m, "release, %u/%u/%d streams (total/hold/purge), %d streams"),
h2_ihash_count(m->streams),
h2_ihash_count(m->shold),
m->spurge->nelts, m->processing_count); while (!h2_ihash_iter(m->streams, m_stream_cancel_iter, m)) { /* until empty */
}
/* 2. no more streams should be scheduled or in the active set */
ap_assert(h2_ihash_empty(m->streams));
ap_assert(h2_iq_empty(m->q));
/* 3. while workers are busy on this connection, meaning they * are processing streams from this connection, wait on them finishing * in order to wake us and let us check again.
* Eventually, this has to succeed. */ if (!m->join_wait) {
apr_thread_cond_create(&m->join_wait, m->pool);
}
for (i = 0; h2_ihash_count(m->shold) > 0; ++i) {
status = apr_thread_cond_timedwait(m->join_wait, m->lock, apr_time_from_sec(wait_secs));
if (APR_STATUS_IS_TIMEUP(status)) { /* This can happen if we have very long running requests
* that do not time out on IO. */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1, APLOGNO(03198)
H2_MPLX_MSG(m, "waited %u sec for %u streams"),
i*wait_secs, h2_ihash_count(m->shold));
h2_ihash_iter(m->shold, m_report_stream_iter, m);
}
}
/* 4. With all workers done, all streams should be in spurge */
ap_assert(m->processing_count == 0); if (!h2_ihash_empty(m->shold)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c1, APLOGNO(03516)
H2_MPLX_MSG(m, "unexpected %u streams in hold"),
h2_ihash_count(m->shold));
h2_ihash_iter(m->shold, m_unexpected_stream_iter, m);
}
if (m->aborted) {
rv = APR_ECONNABORTED; goto cleanup;
} /* Purge (destroy) streams outside of pollset processing. * Streams that are registered in the pollset, will be removed * when they are destroyed, but the pollset works on copies * of these registrations. So, if we destroy streams while * processing pollset events, we might access freed memory.
*/ if (m->spurge->nelts) {
c1_purge_streams(m);
}
rv = mplx_pollset_poll(m, timeout, on_stream_input, on_stream_output, on_ctx);
if (conn_ctx && conn_ctx->stream_id) {
add_stream_poll_event(conn_ctx->mplx, conn_ctx->stream_id,
conn_ctx->mplx->streams_input_read);
}
}
staticvoid c2_beam_input_read_eagain(void *ctx, h2_bucket_beam *beam)
{
conn_rec *c = ctx;
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); /* installed in the input bucket beams when we use pipes. * Drain the pipe just before the beam returns APR_EAGAIN. * A clean state for allowing polling on the pipe to rest
* when the beam is empty */ if (conn_ctx && conn_ctx->pipe_in[H2_PIPE_OUT]) {
h2_util_drain_pipe(conn_ctx->pipe_in[H2_PIPE_OUT]);
}
}
AP_DEBUG_ASSERT(apr_atomic_read32(&conn_ctx->done) == 0);
apr_atomic_set32(&conn_ctx->done, 1);
conn_ctx->done_at = apr_time_now();
++c2->keepalives; /* From here on, the final handling of c2 is done by c1 processing.
* Which means we can give it c1's scoreboard handle for updates. */
c2->sbh = m->c1->sbh; #if AP_MODULE_MAGIC_AT_LEAST(20211221, 29)
ap_set_time_process_request(c2->sbh,conn_ctx->started_at,conn_ctx->done_at); #endif
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2, "h2_mplx(%s-%d): request done, %f ms elapsed",
conn_ctx->id, conn_ctx->stream_id,
(conn_ctx->done_at - conn_ctx->started_at) / 1000.0);
stream = h2_ihash_get(m->streams, conn_ctx->stream_id); if (stream) { /* stream not done yet. trigger a potential polling on the output
* since nothing more will happening here. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
H2_STRM_MSG(stream, "c2_done, stream open"));
c2_beam_output_write_notify(c2, NULL);
} elseif ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) { /* stream is done, was just waiting for this. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
H2_STRM_MSG(stream, "c2_done, in hold"));
c1c2_stream_joined(m, stream);
} else { int i;
for (i = 0; i < m->spurge->nelts; ++i) { if (stream == APR_ARRAY_IDX(m->spurge, i, h2_stream*)) {
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2,
H2_STRM_LOG(APLOGNO(03517), stream, "already in spurge"));
ap_assert("stream should not be in spurge" == NULL); return;
}
}
ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, c2, APLOGNO(03518) "h2_mplx(%s-%d): c2_done, stream not found",
conn_ctx->id, conn_ctx->stream_id);
ap_assert("stream should still be available" == NULL);
}
}
--m->processing_count;
s_c2_done(m, c2, conn_ctx); if (m->join_wait) apr_thread_cond_signal(m->join_wait);
H2_MPLX_LEAVE(m);
}
staticvoid workers_shutdown(void *baton, int graceful)
{
h2_mplx *m = baton;
apr_thread_mutex_lock(m->poll_lock); /* time to wakeup and assess what to do */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_MPLX_MSG(m, "workers shutdown, waking pollset"));
m->shutdown = 1; if (!graceful) {
m->aborted = 1;
}
apr_pollset_wakeup(m->pollset);
apr_thread_mutex_unlock(m->poll_lock);
}
/******************************************************************************* * h2_mplx DoS protection
******************************************************************************/
staticint reset_is_acceptable(h2_stream *stream)
{ /* client may terminate a stream via H2 RST_STREAM message at any time. * This is annyoing when we have committed resources (e.g. worker threads) * to it, so our mood (e.g. willingness to commit resources on this * connection in the future) goes down. * * This is a DoS protection. We do not want to make it too easy for * a client to eat up server resources. * * However: there are cases where a RST_STREAM is the only way to end * a request. This includes websockets and server-side-event streams (SSEs). * The responses to such requests continue forever otherwise. *
*/ if (!stream_is_running(stream)) return 1; if (!(stream->id & 0x01)) return 1; /* stream initiated by us. acceptable. */ if (!stream->response) return 0; /* no response headers produced yet. bad. */ if (!stream->out_data_frames) return 0; /* no response body data sent yet. bad. */ return 1; /* otherwise, be forgiving */
}
apr_status_t h2_mplx_c1_client_rst(h2_mplx *m, int stream_id, h2_stream *stream)
{
apr_status_t status = APR_SUCCESS; int registered;
H2_MPLX_ENTER_ALWAYS(m);
registered = (h2_ihash_get(m->streams, stream_id) != NULL); if (!stream) { /* a RST might arrive so late, we have already forgotten
* about it. Seems ok. */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
H2_MPLX_MSG(m, "RST on unknown stream %d"), stream_id);
AP_DEBUG_ASSERT(!registered);
} elseif (!registered) { /* a RST on a stream that mplx has not been told about, but
* which the session knows. Very early and annoying. */
ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c1,
H2_STRM_MSG(stream, "very early RST, drop"));
h2_stream_set_monitor(stream, NULL);
h2_stream_rst(stream, H2_ERR_STREAM_CLOSED);
h2_stream_dispatch(stream, H2_SEV_EOS_SENT);
m_stream_cleanup(m, stream);
m_be_annoyed(m);
} elseif (!reset_is_acceptable(stream)) {
m_be_annoyed(m);
}
H2_MPLX_LEAVE(m); return status;
}
/* Make sure we are not called recursively. */
ap_assert(!m->polling);
m->polling = 1; do {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_MPLX_MSG(m, "enter polling timeout=%d"),
(int)apr_time_sec(timeout));
for (i = 0; i < nresults; i++) {
pfd = &results[i];
conn_ctx = pfd->client_data;
AP_DEBUG_ASSERT(conn_ctx); if (conn_ctx->stream_id == 0) { if (on_stream_input) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = m->stream0;
} break;
}
}
if (on_stream_input && m->streams_ev_in->nelts) {
H2_MPLX_LEAVE(m); for (i = 0; i < m->streams_ev_in->nelts; ++i) {
on_stream_input(on_ctx, APR_ARRAY_IDX(m->streams_ev_in, i, h2_stream*));
}
H2_MPLX_ENTER_ALWAYS(m);
} if (on_stream_output && m->streams_ev_out->nelts) {
H2_MPLX_LEAVE(m); for (i = 0; i < m->streams_ev_out->nelts; ++i) {
on_stream_output(on_ctx, APR_ARRAY_IDX(m->streams_ev_out, i, h2_stream*));
}
H2_MPLX_ENTER_ALWAYS(m);
} break;
} while(1);
cleanup:
m->polling = 0; return rv;
}
¤ Dauer der Verarbeitung: 0.28 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.