/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/* Return non-zero iff the bucket is an MMAP bucket.
 * When APR is built without mmap support, no bucket can be one. */
static int bucket_is_mmap(apr_bucket *b)
{
#if APR_HAS_MMAP
    return APR_BUCKET_IS_MMAP(b);
#else
    /* if it is not defined as enabled, it should always be no */
    return 0;
#endif
}
/* Heap memory a bucket contributes to the beam's buffer accounting.
 * FILE and MMAP buckets have no real memory footprint while unread,
 * so they count as 0; everything else counts its (determinate) length. */
static apr_off_t bucket_mem_used(apr_bucket *b)
{
    if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) {
        return 0;
    }
    else {
        /* should all have determinate length */
        return (apr_off_t)b->length;
    }
}
/* Report bytes received since the last report to the consumed-io callback.
 * @param beam    the beam to report on
 * @param locked  non-zero when beam->lock is held by the caller; the lock
 *                is dropped around the callback invocation (the callback may
 *                call back into the beam) and re-acquired afterwards
 * @return 1 when the callback was invoked, 0 otherwise
 */
static int report_consumption(h2_bucket_beam *beam, int locked)
{
    int rv = 0;
    apr_off_t len = beam->recv_bytes - beam->recv_bytes_reported;
    h2_beam_io_callback *cb = beam->cons_io_cb;

    if (len > 0) {
        if (cb) {
            void *ctx = beam->cons_ctx;

            if (locked) apr_thread_mutex_unlock(beam->lock);
            cb(ctx, beam, len);
            if (locked) apr_thread_mutex_lock(beam->lock);
            rv = 1;
        }
        beam->recv_bytes_reported += len;
    }
    return rv;
}
/* Sum of the memory held by buckets waiting to be sent.
 * Buckets of indeterminate length and FILE/MMAP buckets (no real memory
 * footprint while unread) are not counted. */
static apr_size_t calc_buffered(h2_bucket_beam *beam)
{
    apr_size_t len = 0;
    apr_bucket *b;

    for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
         b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
         b = APR_BUCKET_NEXT(b)) {
        if (b->length == ((apr_size_t)-1)) {
            /* do not count */
        }
        else if (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b)) {
            /* if unread, has no real mem footprint. */
        }
        else {
            len += b->length;
        }
    }
    return len;
}
/* Delete all sender buckets in the consumed list; needs to be called
 * from the sender thread only. EOR buckets are not destroyed here but
 * moved to the beam's EOR list, as their destruction has side effects
 * that must happen later (see purge_eor_buckets()). */
static void purge_consumed_buckets(h2_bucket_beam *beam)
{
    apr_bucket *b;

    while (!H2_BLIST_EMPTY(&beam->buckets_consumed)) {
        b = H2_BLIST_FIRST(&beam->buckets_consumed);
        if (AP_BUCKET_IS_EOR(b)) {
            APR_BUCKET_REMOVE(b);
            H2_BLIST_INSERT_TAIL(&beam->buckets_eor, b);
        }
        else {
            apr_bucket_delete(b);
        }
    }
}
/* Delete all EOR buckets set aside by purge_consumed_buckets(); needs
 * to be called from the sender thread only. */
static void purge_eor_buckets(h2_bucket_beam *beam)
{
    apr_bucket *b;

    while (!H2_BLIST_EMPTY(&beam->buckets_eor)) {
        b = H2_BLIST_FIRST(&beam->buckets_eor);
        apr_bucket_delete(b);
    }
}
(void)block; if (beam->aborted) {
rv = APR_ECONNABORTED; goto cleanup;
}
ap_assert(beam->pool);
b = APR_BRIGADE_FIRST(bb); if (APR_BUCKET_IS_METADATA(b)) {
APR_BUCKET_REMOVE(b);
apr_bucket_setaside(b, beam->pool);
H2_BLIST_INSERT_TAIL(&beam->buckets_to_send, b); goto cleanup;
} /* non meta bucket */
/* in case of indeterminate length, we need to read the bucket,
* so that it transforms itself into something stable. */ if (b->length == ((apr_size_t)-1)) {
rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); if (rv != APR_SUCCESS) goto cleanup;
}
if (APR_BUCKET_IS_FILE(b)) { /* For file buckets the problem is their internal readpool that * is used on the first read to allocate buffer/mmap. * Since setting aside a file bucket will de-register the * file cleanup function from the previous pool, we need to * call that only from the sender thread. * * Currently, we do not handle file bucket with refcount > 1 as * the beam is then not in complete control of the file's lifetime. * Which results in the bug that a file get closed by the receiver * while the sender or the beam still have buckets using it. * * Additionally, we allow callbacks to prevent beaming file * handles across. The use case for this is to limit the number * of open file handles and rather use a less efficient beam
* transport. */
apr_bucket_file *bf = b->data;
can_beam = !beam->copy_files && (bf->refcount.refcount == 1);
} elseif (bucket_is_mmap(b)) {
can_beam = !beam->copy_files;
}
if (!*pspace_left) {
rv = APR_EAGAIN; goto cleanup;
}
/* bucket is accepted and added to beam->buckets_to_send */ if (APR_BUCKET_IS_HEAP(b)) { /* For heap buckets, a read from a receiver thread is fine. The * data will be there and live until the bucket itself is
* destroyed. */
rv = apr_bucket_setaside(b, beam->pool); if (rv != APR_SUCCESS) goto cleanup;
} elseif (can_beam && (APR_BUCKET_IS_FILE(b) || bucket_is_mmap(b))) {
rv = apr_bucket_setaside(b, beam->pool); if (rv != APR_SUCCESS) goto cleanup;
} else { /* we know of no special shortcut to transfer the bucket to
* another pool without copying. So we make it a heap bucket. */
apr_bucket *b2;
rv = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); if (rv != APR_SUCCESS) goto cleanup; /* this allocates and copies data */
b2 = apr_bucket_heap_create(data, len, NULL, bb->bucket_alloc);
apr_bucket_delete(b);
b = b2;
APR_BRIGADE_INSERT_HEAD(bb, b);
}
/* Called from the sender thread to add buckets to the beam */
apr_thread_mutex_lock(beam->lock);
ap_assert(beam->from == from);
ap_assert(sender_bb);
H2_BEAM_LOG(beam, from, APLOG_TRACE2, rv, "start send", sender_bb);
purge_consumed_buckets(beam);
*pwritten = 0;
was_empty = buffer_is_empty(beam);
space_left = calc_space_left(beam); while (!APR_BRIGADE_EMPTY(sender_bb) && APR_SUCCESS == rv) {
rv = append_bucket(beam, sender_bb, block, &space_left, pwritten); if (beam->aborted) { goto cleanup;
} elseif (APR_EAGAIN == rv) { /* bucket was not added, as beam buffer has no space left. * Trigger event callbacks, so receiver can know there is something
* to receive before we do a conditional wait. */
purge_consumed_buckets(beam); if (beam->send_cb) {
beam->send_cb(beam->send_ctx, beam);
} if (was_empty && beam->was_empty_cb) {
beam->was_empty_cb(beam->was_empty_ctx, beam);
}
rv = wait_not_full(beam, from, block, &space_left); if (APR_SUCCESS != rv) { break;
}
was_empty = buffer_is_empty(beam);
}
}
cleanup: if (beam->send_cb && !buffer_is_empty(beam)) {
beam->send_cb(beam->send_ctx, beam);
} if (was_empty && beam->was_empty_cb && !buffer_is_empty(beam)) {
beam->was_empty_cb(beam->was_empty_ctx, beam);
}
apr_thread_cond_broadcast(beam->change);
/* transfer from our sender brigade, transforming sender buckets to
* receiver ones until we have enough */ while (remain >= 0 && !H2_BLIST_EMPTY(&beam->buckets_to_send)) {
if (APR_BUCKET_IS_METADATA(bsender)) { /* we need a real copy into the receivers bucket_alloc */ if (APR_BUCKET_IS_EOS(bsender)) { /* this closes the beam */
beam->closed = 1;
brecv = apr_bucket_eos_create(bb->bucket_alloc);
} elseif (APR_BUCKET_IS_FLUSH(bsender)) {
brecv = apr_bucket_flush_create(bb->bucket_alloc);
} #if AP_HAS_RESPONSE_BUCKETS elseif (AP_BUCKET_IS_RESPONSE(bsender)) {
brecv = ap_bucket_response_clone(bsender, bb->p, bb->bucket_alloc);
} elseif (AP_BUCKET_IS_REQUEST(bsender)) {
brecv = ap_bucket_request_clone(bsender, bb->p, bb->bucket_alloc);
} elseif (AP_BUCKET_IS_HEADERS(bsender)) {
brecv = ap_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
} #else elseif (H2_BUCKET_IS_HEADERS(bsender)) {
brecv = h2_bucket_headers_clone(bsender, bb->p, bb->bucket_alloc);
} #endif/* AP_HAS_RESPONSE_BUCKETS */ elseif (AP_BUCKET_IS_ERROR(bsender)) {
ap_bucket_error *eb = bsender->data;
brecv = ap_bucket_error_create(eb->status, eb->data,
bb->p, bb->bucket_alloc);
}
} elseif (bsender->length == 0) { /* nop */
} #if APR_HAS_MMAP elseif (APR_BUCKET_IS_MMAP(bsender)) {
apr_bucket_mmap *bmmap = bsender->data;
apr_mmap_t *mmap;
rv = apr_mmap_dup(&mmap, bmmap->mmap, bb->p); if (rv != APR_SUCCESS) goto leave;
brecv = apr_bucket_mmap_create(mmap, bsender->start, bsender->length, bb->bucket_alloc);
} #endif elseif (APR_BUCKET_IS_FILE(bsender)) { /* This is setaside into the target brigade pool so that * any read operation messes with that pool and not
* the sender one. */
apr_bucket_file *f = (apr_bucket_file *)bsender->data;
apr_file_t *fd = f->fd; int setaside = (f->readpool != bb->p);
if (setaside) {
rv = apr_file_setaside(&fd, fd, bb->p); if (rv != APR_SUCCESS) goto leave;
}
ng = apr_brigade_insert_file(bb, fd, bsender->start, (apr_off_t)bsender->length,
bb->p); #if APR_HAS_MMAP /* disable mmap handling as this leads to segfaults when * the underlying file is changed while memory pointer has
* been handed out. See also PR 59348 */
apr_bucket_file_enable_mmap(ng, 0); #endif
remain -= bsender->length;
++transferred;
} else { constchar *data;
apr_size_t dlen; /* we did that when the bucket was added, so this should * give us the same data as before without changing the bucket
* or anything (pool) connected to it. */
rv = apr_bucket_read(bsender, &data, &dlen, APR_BLOCK_READ); if (rv != APR_SUCCESS) goto leave;
rv = apr_brigade_write(bb, NULL, NULL, data, dlen); if (rv != APR_SUCCESS) goto leave;
remain -= dlen;
++transferred;
}
if (brecv) { /* we have a proxy that we can give the receiver */
APR_BRIGADE_INSERT_TAIL(bb, brecv);
remain -= brecv->length;
++transferred;
}
APR_BUCKET_REMOVE(bsender);
H2_BLIST_INSERT_TAIL(&beam->buckets_consumed, bsender);
beam->recv_bytes += bsender->length;
++consumed_buckets;
}
if (beam->recv_cb && consumed_buckets > 0) {
beam->recv_cb(beam->recv_ctx, beam);
}
for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
b = APR_BUCKET_NEXT(b)) { /* should all have determinate length */
l += b->length;
} return l;
}
/* Number of bytes buffered in the beam, taken under the beam lock. */
apr_off_t h2_beam_get_buffered(h2_bucket_beam *beam)
{
    apr_off_t l = 0;

    apr_thread_mutex_lock(beam->lock);
    l = get_buffered_data_len(beam);
    apr_thread_mutex_unlock(beam->lock);
    return l;
}
/* Memory footprint of all buckets waiting to be sent, taken under the
 * beam lock. FILE/MMAP buckets count as 0 (see bucket_mem_used()). */
apr_off_t h2_beam_get_mem_used(h2_bucket_beam *beam)
{
    apr_bucket *b;
    apr_off_t l = 0;

    apr_thread_mutex_lock(beam->lock);
    for (b = H2_BLIST_FIRST(&beam->buckets_to_send);
         b != H2_BLIST_SENTINEL(&beam->buckets_to_send);
         b = APR_BUCKET_NEXT(b)) {
        l += bucket_mem_used(b);
    }
    apr_thread_mutex_unlock(beam->lock);
    return l;
}
int h2_beam_empty(h2_bucket_beam *beam)
{ int empty = 1;
/* NOTE(review): the German text that followed here was extraneous web-page
 * residue (a site disclaimer: "The information on this web page has been
 * carefully compiled to the best of our knowledge. However, neither
 * completeness, correctness, nor quality of the provided information is
 * guaranteed. Note: the syntax highlighting is still experimental.") —
 * it is not part of this source file and has been neutralized into this
 * comment so the translation unit remains well-formed. */