Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  event.c   Sprache: C

 
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


/**
 * This MPM tries to fix the 'keep alive problem' in HTTP.
 *
 * After a client completes the first request, the client can keep the
 * connection open to send more requests with the same socket.  This can save
 * significant overhead in creating TCP connections.  However, the major
 * disadvantage is that Apache traditionally keeps an entire child
 * process/thread waiting for data from the client.  To solve this problem,
 * this MPM has a dedicated thread for handling both the Listening sockets,
 * and all sockets that are in a Keep Alive status.
 *
 * The MPM assumes the underlying apr_pollset implementation is somewhat
 * threadsafe.  This currently is only compatible with KQueue and EPoll.  This
 * enables the MPM to avoid extra high level locking or having to wake up the
 * listener thread when a keep-alive socket needs to be sent to it.
 *
 * This MPM does not perform well on older platforms that do not have very good
 * threading, like Linux with a 2.4 kernel, but this does not matter, since we
 * require EPoll or KQueue.
 *
 * For FreeBSD, use 5.3.  It is possible to run this MPM on FreeBSD 5.2.1, if
 * you use libkse (see `man libmap.conf`).
 *
 * For NetBSD, use at least 2.0.
 *
 * For Linux, you should use a 2.6 kernel, and make sure your glibc has epoll
 * support compiled in.
 *
 */


#include "apr.h"
#include "apr_portable.h"
#include "apr_strings.h"
#include "apr_file_io.h"
#include "apr_thread_proc.h"
#include "apr_signal.h"
#include "apr_thread_mutex.h"
#include "apr_poll.h"
#include "apr_ring.h"
#include "apr_queue.h"
#include "apr_atomic.h"
#define APR_WANT_STRFUNC
#include "apr_want.h"
#include "apr_version.h"

#include <stdlib.h>

#if APR_HAVE_UNISTD_H
#include <unistd.h>
#endif
#if APR_HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#if APR_HAVE_SYS_WAIT_H
#include <sys/wait.h>
#endif
#ifdef HAVE_SYS_PROCESSOR_H
#include <sys/processor.h>      /* for bindprocessor() */
#endif

#if !APR_HAS_THREADS
#error The Event MPM requires APR threads, but they are unavailable.
#endif

#include "ap_config.h"
#include "httpd.h"
#include "http_main.h"
#include "http_log.h"
#include "http_config.h"        /* for read_config */
#include "http_core.h"          /* for get_remote_host */
#include "http_connection.h"
#include "http_protocol.h"
#include "ap_mpm.h"
#include "mpm_common.h"
#include "ap_listen.h"
#include "scoreboard.h"
#include "mpm_fdqueue.h"
#include "mpm_default.h"
#include "http_vhost.h"
#include "unixd.h"
#include "apr_skiplist.h"

#include <signal.h>
#include <limits.h>             /* for INT_MAX */


/* Limit on the total --- clients will be locked out if more servers than
 * this are needed.  It is intended solely to keep the server from crashing
 * when things get out of hand.
 *
 * We keep a hard maximum number of servers, for two reasons --- first off,
 * in case something goes seriously wrong, we want to stop the fork bomb
 * short of actually crashing the machine we're running on by filling some
 * kernel table.  Secondly, it keeps the size of the scoreboard file small
 * enough that we can read the whole thing without worrying too much about
 * the overhead.
 */

#ifndef DEFAULT_SERVER_LIMIT
#define DEFAULT_SERVER_LIMIT 16
#endif

/* Admin can't tune ServerLimit beyond MAX_SERVER_LIMIT.  We want
 * some sort of compile-time limit to help catch typos.
 */

#ifndef MAX_SERVER_LIMIT
#define MAX_SERVER_LIMIT 20000
#endif

/* Limit on the threads per process.  Clients will be locked out if more than
 * this are needed.
 *
 * We keep this for one reason it keeps the size of the scoreboard file small
 * enough that we can read the whole thing without worrying too much about
 * the overhead.
 */

#ifndef DEFAULT_THREAD_LIMIT
#define DEFAULT_THREAD_LIMIT 64
#endif

/* Admin can't tune ThreadLimit beyond MAX_THREAD_LIMIT.  We want
 * some sort of compile-time limit to help catch typos.
 */

#ifndef MAX_THREAD_LIMIT
#define MAX_THREAD_LIMIT 100000
#endif

#define MPM_CHILD_PID(i) (ap_scoreboard_image->parent[i].pid)

#if !APR_VERSION_AT_LEAST(1,4,0)
#define apr_time_from_msec(x) (x * 1000)
#endif

#define CONN_STATE_IS_LINGERING_CLOSE(s) ((s) >= CONN_STATE_LINGER && \
                                          (s) <= CONN_STATE_LINGER_SHORT)
#ifndef MAX_SECS_TO_LINGER
#define MAX_SECS_TO_LINGER 30
#endif
#define SECONDS_TO_LINGER  2

/*
 * Actual definitions of config globals
 */


#ifndef DEFAULT_WORKER_FACTOR
#define DEFAULT_WORKER_FACTOR 2
#endif
#define WORKER_FACTOR_SCALE   16  /* scale factor to allow fractional values */
static unsigned int worker_factor = DEFAULT_WORKER_FACTOR * WORKER_FACTOR_SCALE;
    /* AsyncRequestWorkerFactor * 16 */

static int threads_per_child = 0;           /* ThreadsPerChild */
static int ap_daemons_to_start = 0;         /* StartServers */
static int min_spare_threads = 0;           /* MinSpareThreads */
static int max_spare_threads = 0;           /* MaxSpareThreads */
static int active_daemons_limit = 0;        /* MaxRequestWorkers / ThreadsPerChild */
static int max_workers = 0;                 /* MaxRequestWorkers */
static int server_limit = 0;                /* ServerLimit */
static int thread_limit = 0;                /* ThreadLimit */
static int had_healthy_child = 0;
static volatile int dying = 0;
static volatile int workers_may_exit = 0;
static volatile int start_thread_may_exit = 0;
static volatile int listener_may_exit = 0;
static int listener_is_wakeable = 0;        /* Pollset supports APR_POLLSET_WAKEABLE */
static int num_listensocks = 0;
static apr_int32_t conns_this_child;        /* MaxConnectionsPerChild, only access
                                               in listener thread */

static apr_uint32_t connection_count = 0;   /* Number of open connections */
static apr_uint32_t lingering_count = 0;    /* Number of connections in lingering close */
static apr_uint32_t suspended_count = 0;    /* Number of suspended connections */
static apr_uint32_t clogged_count = 0;      /* Number of threads processing ssl conns */
static apr_uint32_t threads_shutdown = 0;   /* Number of threads that have shutdown
                                               early during graceful termination */

static int resource_shortage = 0;
static fd_queue_t *worker_queue;
static fd_queue_info_t *worker_queue_info;

static apr_thread_mutex_t *timeout_mutex;

module AP_MODULE_DECLARE_DATA mpm_event_module;

/* forward declare */
struct event_srv_cfg_s;
typedef struct event_srv_cfg_s event_srv_cfg;

static apr_pollfd_t *listener_pollfd;

/*
 * The pollset for sockets that are in any of the timeout queues. Currently
 * we use the timeout_mutex to make sure that connections are added/removed
 * atomically to/from both event_pollset and a timeout queue. Otherwise
 * some confusion can happen under high load if timeout queues and pollset
 * get out of sync.
 * XXX: It should be possible to make the lock unnecessary in many or even all
 * XXX: cases.
 */

static apr_pollset_t *event_pollset;

typedef struct event_conn_state_t event_conn_state_t;

/*
 * The chain of connections to be shutdown by a worker thread (deferred),
 * linked list updated atomically.
 */

static event_conn_state_t *volatile defer_linger_chain;

struct event_conn_state_t {
    /** APR_RING of expiration timeouts */
    APR_RING_ENTRY(event_conn_state_t) timeout_list;
    /** the time when the entry was queued */
    apr_time_t queue_timestamp;
    /** connection record this struct refers to */
    conn_rec *c;
    /** request record (if any) this struct refers to */
    request_rec *r;
    /** server config this struct refers to */
    event_srv_cfg *sc;
    /** scoreboard handle for the conn_rec */
    ap_sb_handle_t *sbh;
    /** is the current conn_rec suspended?  (disassociated with
     * a particular MPM thread; for suspend_/resume_connection
     * hooks)
     */

    int suspended;
    /** memory pool to allocate from */
    apr_pool_t *p;
    /** bucket allocator */
    apr_bucket_alloc_t *bucket_alloc;
    /** poll file descriptor information */
    apr_pollfd_t pfd;
    /** public parts of the connection state */
    conn_state_t pub;
    /** chaining in defer_linger_chain */
    struct event_conn_state_t *chain;
    unsigned int 
        /** Is lingering close from defer_lingering_close()? */
        deferred_linger :1,
        /** Has ap_start_lingering_close() been called? */
        linger_started  :1;
};

APR_RING_HEAD(timeout_head_t, event_conn_state_t);

struct timeout_queue {
    struct timeout_head_t head;
    apr_interval_time_t timeout;
    apr_uint32_t count;         /* for this queue */
    apr_uint32_t *total;        /* for all chained/related queues */
    struct timeout_queue *next; /* chaining */
};
/*
 * Several timeout queues that use different timeouts, so that we always can
 * simply append to the end.
 *   waitio_q           uses vhost's TimeOut
 *   write_completion_q uses vhost's TimeOut
 *   keepalive_q        uses vhost's KeepAliveTimeOut
 *   linger_q           uses MAX_SECS_TO_LINGER
 *   short_linger_q     uses SECONDS_TO_LINGER
 */

static struct timeout_queue *waitio_q,
                            *write_completion_q,
                            *keepalive_q,
                            *linger_q,
                            *short_linger_q;
static volatile apr_time_t  queues_next_expiry;

/* Prevent extra poll/wakeup calls for timeouts close in the future (queues
 * have the granularity of a second anyway).
 * XXX: Wouldn't 0.5s (instead of 0.1s) be "enough"?
 */

#define TIMEOUT_FUDGE_FACTOR apr_time_from_msec(100)

/*
 * Macros for accessing struct timeout_queue.
 * For TO_QUEUE_APPEND and TO_QUEUE_REMOVE, timeout_mutex must be held.
 */

static void TO_QUEUE_APPEND(struct timeout_queue *q, event_conn_state_t *el)
{
    apr_time_t elem_expiry;
    apr_time_t next_expiry;

    APR_RING_INSERT_TAIL(&q->head, el, event_conn_state_t, timeout_list);
    ++*q->total;
    ++q->count;

    /* Cheaply update the global queues_next_expiry with the one of the
     * first entry of this queue (oldest) if it expires before.
     */

    el = APR_RING_FIRST(&q->head);
    elem_expiry = el->queue_timestamp + q->timeout;
    next_expiry = queues_next_expiry;
    if (!next_expiry || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
        queues_next_expiry = elem_expiry;
        /* Unblock the poll()ing listener for it to update its timeout. */
        if (listener_is_wakeable) {
            apr_pollset_wakeup(event_pollset);
        }
    }
}

static void TO_QUEUE_REMOVE(struct timeout_queue *q, event_conn_state_t *el)
{
    APR_RING_REMOVE(el, timeout_list);
    APR_RING_ELEM_INIT(el, timeout_list);
    --*q->total;
    --q->count;
}

static struct timeout_queue *TO_QUEUE_MAKE(apr_pool_t *p, apr_time_t t,
                                           struct timeout_queue *ref)
{
    struct timeout_queue *q;
                                           
    q = apr_pcalloc(p, sizeof *q);
    APR_RING_INIT(&q->head, event_conn_state_t, timeout_list);
    q->total = (ref) ? ref->total : apr_pcalloc(p, sizeof *q->total);
    q->timeout = t;

    return q;
}

#define TO_QUEUE_ELEM_INIT(el) \
    APR_RING_ELEM_INIT((el), timeout_list)

/* The structure used to pass unique initialization info to each thread */
typedef struct
{
    int pslot;  /* process slot */
    int tslot;  /* worker slot of the thread */
} proc_info;

/* Structure used to pass information to the thread responsible for
 * creating the rest of the threads.
 */

typedef struct
{
    apr_thread_t **threads;
    apr_thread_t *listener;
    int child_num_arg;
    apr_threadattr_t *threadattr;
} thread_starter;

typedef enum
{
    PT_CSD,
    PT_ACCEPT
} poll_type_e;

typedef struct
{
    poll_type_e type;
    void *baton;
} listener_poll_type;

/* data retained by event across load/unload of the module
 * allocated on first call to pre-config hook; located on
 * subsequent calls to pre-config hook
 */

typedef struct event_retained_data {
    ap_unixd_mpm_retained_data *mpm;

    int first_server_limit;
    int first_thread_limit;
    int sick_child_detected;
    int maxclients_reported;
    int near_maxclients_reported;
    /*
     * The max child slot ever assigned, preserved across restarts.  Necessary
     * to deal with MaxRequestWorkers changes across AP_SIG_GRACEFUL restarts.
     * We use this value to optimize routines that have to scan the entire
     * scoreboard.
     */

    int max_daemon_used;

    /*
     * All running workers, active and shutting down, including those that
     * may be left from before a graceful restart.
     * Not kept up-to-date when shutdown is pending.
     */

    int total_daemons;
    /*
     * Workers that still active, i.e. are not shutting down gracefully.
     */

    int active_daemons;
    /*
     * idle_spawn_rate is the number of children that will be spawned on the
     * next maintenance cycle if there aren't enough idle servers.  It is
     * maintained per listeners bucket, doubled up to MAX_SPAWN_RATE, and
     * reset only when a cycle goes by without the need to spawn.
     */

    int *idle_spawn_rate;
#ifndef MAX_SPAWN_RATE
#define MAX_SPAWN_RATE        (32)
#endif
    int hold_off_on_exponential_spawning;
} event_retained_data;
static event_retained_data *retained;
 
typedef struct event_child_bucket {
    ap_pod_t *pod;
    ap_listen_rec *listeners;
} event_child_bucket;
static event_child_bucket *all_buckets, /* All listeners buckets */
                          *my_bucket;   /* Current child bucket */

struct event_srv_cfg_s {
    struct timeout_queue *io_q,
                         *wc_q,
                         *ka_q;
};

#define ID_FROM_CHILD_THREAD(c, t)    ((c * thread_limit) + t)

/* The event MPM respects a couple of runtime flags that can aid
 * in debugging. Setting the -DNO_DETACH flag will prevent the root process
 * from detaching from its controlling terminal. Additionally, setting
 * the -DONE_PROCESS flag (which implies -DNO_DETACH) will get you the
 * child_main loop running in the process which originally started up.
 * This gives you a pretty nice debugging environment.  (You'll get a SIGHUP
 * early in standalone_main; just continue through.  This is the server
 * trying to kill off any child processes which it might have lying
 * around --- Apache doesn't keep track of their pids, it just sends
 * SIGHUP to the process group, ignoring it in the root process.
 * Continue through and you'll be fine.).
 */


static int one_process = 0;

#ifdef DEBUG_SIGSTOP
int raise_sigstop_flags;
#endif

static apr_pool_t *pconf;       /* Pool for config stuff */
static apr_pool_t *pchild;      /* Pool for httpd child stuff */
static apr_pool_t *pruntime;    /* Pool for MPM threads stuff */

static pid_t ap_my_pid;         /* Linux getpid() doesn't work except in main
                                   thread. Use this instead */

static pid_t parent_pid;
static apr_os_thread_t *listener_os_thread;

static int ap_child_slot;       /* Current child process slot in scoreboard */

/* The LISTENER_SIGNAL signal will be sent from the main thread to the
 * listener thread to wake it up for graceful termination (what a child
 * process from an old generation does when the admin does "apachectl
 * graceful").  This signal will be blocked in all threads of a child
 * process except for the listener thread.
 */

#define LISTENER_SIGNAL     SIGHUP

/* An array of socket descriptors in use by each thread used to
 * perform a non-graceful (forced) shutdown of the server.
 */

static apr_socket_t **worker_sockets;

static volatile apr_uint32_t listensocks_disabled;

static void disable_listensocks(void)
{
    int i;
    if (apr_atomic_cas32(&listensocks_disabled, 1, 0) != 0) {
        return;
    }
    if (event_pollset) {
        for (i = 0; i < num_listensocks; i++) {
            apr_pollset_remove(event_pollset, &listener_pollfd[i]);
        }
    }
    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 1;
}

static void enable_listensocks(void)
{
    int i;
    if (listener_may_exit
            || apr_atomic_cas32(&listensocks_disabled, 0, 1) != 1) {
        return;
    }
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf, APLOGNO(00457)
                 "Accepting new connections again: "
                 "%u active conns (%u lingering/%u clogged/%u suspended), "
                 "%u idle workers",
                 apr_atomic_read32(&connection_count),
                 apr_atomic_read32(&lingering_count),
                 apr_atomic_read32(&clogged_count),
                 apr_atomic_read32(&suspended_count),
                 ap_queue_info_num_idlers(worker_queue_info));
    for (i = 0; i < num_listensocks; i++)
        apr_pollset_add(event_pollset, &listener_pollfd[i]);
    /*
     * XXX: This is not yet optimal. If many workers suddenly become available,
     * XXX: the parent may kill some processes off too soon.
     */

    ap_scoreboard_image->parent[ap_child_slot].not_accepting = 0;
}

static APR_INLINE apr_uint32_t listeners_disabled(void)
{
    return apr_atomic_read32(&listensocks_disabled);
}

static APR_INLINE int connections_above_limit(int *busy)
{
    apr_uint32_t i_count = ap_queue_info_num_idlers(worker_queue_info);
    if (i_count > 0) {
        apr_uint32_t c_count = apr_atomic_read32(&connection_count);
        apr_uint32_t l_count = apr_atomic_read32(&lingering_count);
        if (c_count <= l_count
                /* Off by 'listeners_disabled()' to avoid flip flop */
                || c_count - l_count < (apr_uint32_t)threads_per_child +
                                       (i_count - listeners_disabled()) *
                                       (worker_factor / WORKER_FACTOR_SCALE)) {
            return 0;
        }
    }
    else if (busy) {
        *busy = 1;
    }
    return 1;
}

static APR_INLINE int should_enable_listensocks(void)
{
    return !dying && listeners_disabled() && !connections_above_limit(NULL);
}

static void close_socket_nonblocking_(apr_socket_t *csd,
                                      const char *from, int line)
{
    apr_status_t rv;
    apr_os_sock_t fd = -1;

    /* close_worker_sockets() may have closed it already */
    rv = apr_os_sock_get(&fd, csd);
    ap_log_error(APLOG_MARK, APLOG_TRACE8, 0, ap_server_conf,
                "closing socket %i/%pp from %s:%i", (int)fd, csd, from, line);
    if (rv == APR_SUCCESS && fd == -1) {
        return;
    }

    apr_socket_timeout_set(csd, 0);
    rv = apr_socket_close(csd);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(00468)
                     "error closing socket");
        AP_DEBUG_ASSERT(0);
    }
}
#define close_socket_nonblocking(csd) \
    close_socket_nonblocking_(csd, __FUNCTION__, __LINE__)

static void close_worker_sockets(void)
{
    int i;
    for (i = 0; i < threads_per_child; i++) {
        apr_socket_t *csd = worker_sockets[i];
        if (csd) {
            worker_sockets[i] = NULL;
            close_socket_nonblocking(csd);
        }
    }
}

static void wakeup_listener(void)
{
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                 "wake up listener%s", listener_may_exit ? " again" : "");

    listener_may_exit = 1;
    disable_listensocks();

    /* Unblock the listener if it's poll()ing */
    if (event_pollset && listener_is_wakeable) {
        apr_pollset_wakeup(event_pollset);
    }

    /* unblock the listener if it's waiting for a worker */
    if (worker_queue_info) {
        ap_queue_info_term(worker_queue_info);
    }

    if (!listener_os_thread) {
        /* XXX there is an obscure path that this doesn't handle perfectly:
         *     right after listener thread is created but before
         *     listener_os_thread is set, the first worker thread hits an
         *     error and starts graceful termination
         */

        return;
    }
    /*
     * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
     * platforms and wake up the listener thread since it is the only thread
     * with SIGHUP unblocked, but that doesn't work on Linux
     */

#ifdef HAVE_PTHREAD_KILL
    pthread_kill(*listener_os_thread, LISTENER_SIGNAL);
#else
    kill(ap_my_pid, LISTENER_SIGNAL);
#endif
}

#define ST_INIT              0
#define ST_GRACEFUL          1
#define ST_UNGRACEFUL        2

static int terminate_mode = ST_INIT;

static void signal_threads(int mode)
{
    if (terminate_mode >= mode) {
        return;
    }
    terminate_mode = mode;
    retained->mpm->mpm_state = AP_MPMQ_STOPPING;

    /* in case we weren't called from the listener thread, wake up the
     * listener thread
     */

    wakeup_listener();

    /* for ungraceful termination, let the workers exit now;
     * for graceful termination, the listener thread will notify the
     * workers to exit once it has stopped accepting new connections
     */

    if (mode == ST_UNGRACEFUL) {
        workers_may_exit = 1;
        ap_queue_interrupt_all(worker_queue);
        close_worker_sockets(); /* forcefully kill all current connections */
    }

    ap_run_child_stopping(pchild, mode == ST_GRACEFUL);
}

static int event_query(int query_code, int *result, apr_status_t *rv)
{
    *rv = APR_SUCCESS;
    switch (query_code) {
    case AP_MPMQ_MAX_DAEMON_USED:
        *result = retained->max_daemon_used;
        break;
    case AP_MPMQ_IS_THREADED:
        *result = AP_MPMQ_STATIC;
        break;
    case AP_MPMQ_IS_FORKED:
        *result = AP_MPMQ_DYNAMIC;
        break;
    case AP_MPMQ_IS_ASYNC:
        *result = 1;
        break;
    case AP_MPMQ_HARD_LIMIT_DAEMONS:
        *result = server_limit;
        break;
    case AP_MPMQ_HARD_LIMIT_THREADS:
        *result = thread_limit;
        break;
    case AP_MPMQ_MAX_THREADS:
        *result = threads_per_child;
        break;
    case AP_MPMQ_MIN_SPARE_DAEMONS:
        *result = 0;
        break;
    case AP_MPMQ_MIN_SPARE_THREADS:
        *result = min_spare_threads;
        break;
    case AP_MPMQ_MAX_SPARE_DAEMONS:
        *result = 0;
        break;
    case AP_MPMQ_MAX_SPARE_THREADS:
        *result = max_spare_threads;
        break;
    case AP_MPMQ_MAX_REQUESTS_DAEMON:
        *result = ap_max_requests_per_child;
        break;
    case AP_MPMQ_MAX_DAEMONS:
        *result = active_daemons_limit;
        break;
    case AP_MPMQ_MPM_STATE:
        *result = retained->mpm->mpm_state;
        break;
    case AP_MPMQ_GENERATION:
        *result = retained->mpm->my_generation;
        break;
    case AP_MPMQ_CAN_WAITIO:
        *result = 1;
        break;
    default:
        *rv = APR_ENOTIMPL;
        break;
    }
    return OK;
}

static void event_note_child_stopped(int slot, pid_t pid, ap_generation_t gen)
{
    if (slot != -1) { /* child had a scoreboard slot? */
        process_score *ps = &ap_scoreboard_image->parent[slot];
        int i;

        pid = ps->pid;
        gen = ps->generation;
        for (i = 0; i < threads_per_child; i++) {
            ap_update_child_status_from_indexes(slot, i, SERVER_DEAD, NULL);
        }
        ap_run_child_status(ap_server_conf, pid, gen, slot, MPM_CHILD_EXITED);
        if (ps->quiescing != 2) { /* vs perform_idle_server_maintenance() */
            retained->active_daemons--;
        }
        retained->total_daemons--;
        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                     "Child %d stopped: pid %d, gen %d, "
                     "active %d/%d, total %d/%d/%d, quiescing %d",
                     slot, (int)pid, (int)gen,
                     retained->active_daemons, active_daemons_limit,
                     retained->total_daemons, retained->max_daemon_used,
                     server_limit, ps->quiescing);
        ps->not_accepting = 0;
        ps->quiescing = 0;
        ps->pid = 0;
    }
    else {
        ap_run_child_status(ap_server_conf, pid, gen, -1, MPM_CHILD_EXITED);
    }
}

static void event_note_child_started(int slot, pid_t pid)
{
    ap_generation_t gen = retained->mpm->my_generation;

    retained->total_daemons++;
    retained->active_daemons++;
    ap_scoreboard_image->parent[slot].pid = pid;
    ap_scoreboard_image->parent[slot].generation = gen;
    ap_run_child_status(ap_server_conf, pid, gen, slot, MPM_CHILD_STARTED);
    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ap_server_conf,
                 "Child %d started: pid %d, gen %d, "
                 "active %d/%d, total %d/%d/%d",
                 slot, (int)pid, (int)gen,
                 retained->active_daemons, active_daemons_limit,
                 retained->total_daemons, retained->max_daemon_used,
                 server_limit);
}

static const char *event_get_name(void)
{
    return "event";
}

/* a clean exit from a child with proper cleanup */
static void clean_child_exit(int code) __attribute__ ((noreturn));
static void clean_child_exit(int code)
{
    retained->mpm->mpm_state = AP_MPMQ_STOPPING;
    if (terminate_mode == ST_INIT) {
        ap_run_child_stopping(pchild, 0);
    }

    if (pchild) {
        apr_pool_destroy(pchild);
    }

    if (one_process) {
        event_note_child_stopped(/* slot */ 0, 0, 0);
    }

    exit(code);
}

static void just_die(int sig)
{
    clean_child_exit(0);
}

/*****************************************************************
 * Connection structures and accounting...
 */


static int child_fatal;

static apr_status_t decrement_connection_count(void *cs_)
{
    int is_last_connection;
    event_conn_state_t *cs = cs_;
    ap_log_cerror(APLOG_MARK, APLOG_TRACE8, 0, cs->c,
                  "cleanup connection from state %i", (int)cs->pub.state);
    switch (cs->pub.state) {
        case CONN_STATE_LINGER:
        case CONN_STATE_LINGER_NORMAL:
        case CONN_STATE_LINGER_SHORT:
            apr_atomic_dec32(&lingering_count);
            break;
        case CONN_STATE_SUSPENDED:
            apr_atomic_dec32(&suspended_count);
            break;
        default:
            break;
    }
    /* Unblock the listener if it's waiting for connection_count = 0,
     * or if the listening sockets were disabled due to limits and can
     * now accept new connections.
     */

    is_last_connection = !apr_atomic_dec32(&connection_count);
    if (listener_is_wakeable
            && ((is_last_connection && listener_may_exit)
                || should_enable_listensocks())) {
        apr_pollset_wakeup(event_pollset);
    }
    if (dying) {
        /* Help worker_thread_should_exit_early() */
        ap_queue_interrupt_one(worker_queue);
    }
    return APR_SUCCESS;
}

static void notify_suspend(event_conn_state_t *cs)
{
    ap_run_suspend_connection(cs->c, cs->r);
    cs->c->sbh = NULL;
    cs->suspended = 1;
}

static void notify_resume(event_conn_state_t *cs, int cleanup)
{
    cs->suspended = 0;
    cs->c->sbh = cleanup ? NULL : cs->sbh;
    ap_run_resume_connection(cs->c, cs->r);
}

/*
 * Defer flush and close of the connection by adding it to defer_linger_chain,
 * for a worker to grab it and do the job (should that be blocking).
 * Pre-condition: nonblocking, can be called from anywhere provided cs is not
 *                in any timeout queue or in the pollset.
 */

static int defer_lingering_close(event_conn_state_t *cs)
{
    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
                  "deferring close from state %i", (int)cs->pub.state);

    /* The connection is not shutdown() yet strictly speaking, but it's not
     * in any queue nor handled by a worker either (will be very soon), so
     * to account for it somewhere we bump lingering_count now (and set
     * deferred_linger for process_lingering_close() to know).
     */

    cs->pub.state = CONN_STATE_LINGER;
    apr_atomic_inc32(&lingering_count);
    cs->deferred_linger = 1;
    for (;;) {
        event_conn_state_t *chain = cs->chain = defer_linger_chain;
        if (apr_atomic_casptr((void *)&defer_linger_chain, cs,
                              chain) != chain) {
            /* Race lost, try again */
            continue;
        }
        return 1;
    }
}

/* Close the connection and release its resources (ptrans), either because an
 * unrecoverable error occured (queues or pollset add/remove) or more usually
 * if lingering close timed out.
 * Pre-condition: nonblocking, can be called from anywhere provided cs is not
 *                in any timeout queue or in the pollset.
 */

static void close_connection(event_conn_state_t *cs)
{
    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
                  "closing connection from state %i", (int)cs->pub.state);

    close_socket_nonblocking(cs->pfd.desc.s);
    ap_queue_info_push_pool(worker_queue_info, cs->p);
}

/* Shutdown the connection in case of timeout, error or resources shortage.
 * This starts short lingering close if not already there, or directly closes
 * the connection otherwise.
 * Pre-condition: nonblocking, can be called from anywhere provided cs is not
 *                in any timeout queue or in the pollset.
 */

static int shutdown_connection(event_conn_state_t *cs)
{
    if (!CONN_STATE_IS_LINGERING_CLOSE(cs->pub.state)) {
        apr_table_setn(cs->c->notes, "short-lingering-close""1");
        defer_lingering_close(cs);
    }
    else {
        close_connection(cs);
    }
    return 1;
}

/*
 * This runs before any non-MPM cleanup code on the connection;
 * if the connection is currently suspended as far as modules
 * know, provide notification of resumption.
 */

static apr_status_t ptrans_pre_cleanup(void *dummy)
{
    event_conn_state_t *cs = dummy;

    if (cs->suspended) {
        notify_resume(cs, 1);
    }
    return APR_SUCCESS;
}

/*
 * event_pre_read_request() and event_request_cleanup() track the
 * current r for a given connection.
 */

static apr_status_t event_request_cleanup(void *dummy)
{
    conn_rec *c = dummy;
    event_conn_state_t *cs = ap_get_module_config(c->conn_config,
                                                  &mpm_event_module);

    cs->r = NULL;
    return APR_SUCCESS;
}

static void event_pre_read_request(request_rec *r, conn_rec *c)
{
    event_conn_state_t *cs = ap_get_module_config(c->conn_config,
                                                  &mpm_event_module);

    cs->r = r;
    cs->sc = ap_get_module_config(ap_server_conf->module_config,
                                  &mpm_event_module);
    apr_pool_cleanup_register(r->pool, c, event_request_cleanup,
                              apr_pool_cleanup_null);
}

/*
 * event_post_read_request() tracks the current server config for a
 * given request.
 */

static int event_post_read_request(request_rec *r)
{
    conn_rec *c = r->connection;
    event_conn_state_t *cs = ap_get_module_config(c->conn_config,
                                                  &mpm_event_module);

    /* To preserve legacy behaviour (consistent with other MPMs), use
     * the keepalive timeout from the base server (first on this IP:port)
     * when none is explicitly configured on this server.
     */

    if (r->server->keep_alive_timeout_set) {
        cs->sc = ap_get_module_config(r->server->module_config,
                                      &mpm_event_module);
    }
    else {
        cs->sc = ap_get_module_config(c->base_server->module_config,
                                      &mpm_event_module);
    }
    return OK;
}

/* Forward declare */
static void process_lingering_close(event_conn_state_t *cs);

static void update_reqevents_from_sense(event_conn_state_t *cs,
                                        int default_sense)
{
    int sense = default_sense;

    if (cs->pub.sense != CONN_SENSE_DEFAULT) {
        sense = cs->pub.sense;

        /* Reset to default for the next round */
        cs->pub.sense = CONN_SENSE_DEFAULT;
    }

    if (sense == CONN_SENSE_WANT_READ) {
        cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP;
    }
    else {
        cs->pfd.reqevents = APR_POLLOUT;
    }
    /* POLLERR is usually returned event only, but some pollset
     * backends may require it in reqevents to do the right thing,
     * so it shouldn't hurt (ignored otherwise).
     */

    cs->pfd.reqevents |= APR_POLLERR;
}

/*
 * process one connection in the worker
 */

static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * sock,
                          event_conn_state_t * cs, int my_child_num,
                          int my_thread_num)
{
    conn_rec *c;
    long conn_id = ID_FROM_CHILD_THREAD(my_child_num, my_thread_num);
    int clogging = 0;
    apr_status_t rv;
    int rc = OK;

    if (cs == NULL) {           /* This is a new connection */
        listener_poll_type *pt = apr_pcalloc(p, sizeof(*pt));
        cs = apr_pcalloc(p, sizeof(event_conn_state_t));
        cs->bucket_alloc = apr_bucket_alloc_create(p);
        ap_create_sb_handle(&cs->sbh, p, my_child_num, my_thread_num);
        c = ap_run_create_connection(p, ap_server_conf, sock,
                                     conn_id, cs->sbh, cs->bucket_alloc);
        if (!c) {
            ap_queue_info_push_pool(worker_queue_info, p);
            return;
        }
        apr_atomic_inc32(&connection_count);
        apr_pool_cleanup_register(c->pool, cs, decrement_connection_count,
                                  apr_pool_cleanup_null);
        ap_set_module_config(c->conn_config, &mpm_event_module, cs);
        c->current_thread = thd;
        c->cs = &cs->pub;
        cs->c = c;
        cs->p = p;
        cs->sc = ap_get_module_config(ap_server_conf->module_config,
                                      &mpm_event_module);
        cs->pfd.desc_type = APR_POLL_SOCKET;
        cs->pfd.desc.s = sock;
        pt->type = PT_CSD;
        pt->baton = cs;
        cs->pfd.client_data = pt;
        apr_pool_pre_cleanup_register(p, cs, ptrans_pre_cleanup);
        TO_QUEUE_ELEM_INIT(cs);

        ap_update_vhost_given_ip(c);

        rc = ap_pre_connection(c, sock);
        if (rc != OK && rc != DONE) {
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(00469)
                          "process_socket: connection aborted");
            close_connection(cs);
            return;
        }

        /**
         * XXX If the platform does not have a usable way of bundling
         * accept() with a socket readability check, like Win32,
         * and there are measurable delays before the
         * socket is readable due to the first data packet arriving,
         * it might be better to create the cs on the listener thread
         * with the state set to CONN_STATE_KEEPALIVE
         *
         * FreeBSD users will want to enable the HTTP accept filter
         * module in their kernel for the highest performance
         * When the accept filter is active, sockets are kept in the
         * kernel until a HTTP request is received.
         */

        cs->pub.state = CONN_STATE_PROCESSING;
        cs->pub.sense = CONN_SENSE_DEFAULT;
    }
    else {
        c = cs->c;
        ap_update_sb_handle(cs->sbh, my_child_num, my_thread_num);
        notify_resume(cs, 0);
        c->current_thread = thd;
        /* Subsequent request on a conn, and thread number is part of ID */
        c->id = conn_id;
    }

    if (CONN_STATE_IS_LINGERING_CLOSE(cs->pub.state)) {
        goto lingering_close;
    }

    if (cs->pub.state == CONN_STATE_PROCESSING
        /* If we have an input filter which 'clogs' the input stream,
         * like mod_ssl used to, lets just do the normal read from input
         * filters, like the Worker MPM does. Filters that need to write
         * where they would otherwise read, or read where they would
         * otherwise write, should set the sense appropriately.
         */

         || c->clogging_input_filters) {
 process_connection:
        cs->pub.state = CONN_STATE_PROCESSING;

        clogging = c->clogging_input_filters;
        if (clogging) {
            apr_atomic_inc32(&clogged_count);
        }
        rc = ap_run_process_connection(c);
        if (clogging) {
            apr_atomic_dec32(&clogged_count);
        }
        /*
         * The process_connection hooks should set the appropriate connection
         * state upon return, for event MPM to either:
         * - CONN_STATE_LINGER: do lingering close;
         * - CONN_STATE_WRITE_COMPLETION: flush pending outputs using Timeout
         *   and wait for next incoming data using KeepAliveTimeout, then come
         *   back to process_connection() hooks;
         * - CONN_STATE_SUSPENDED: suspend the connection such that it now
         *   interacts with the MPM through suspend/resume_connection() hooks,
         *   and/or registered poll callbacks (PT_USER), and/or registered
         *   timed callbacks triggered by timer events;
         * - CONN_STATE_ASYNC_WAITIO: wait for read/write-ability of the underlying
         *   socket using Timeout and come back to process_connection() hooks when
         *   ready;
         * - CONN_STATE_KEEPALIVE: now handled by CONN_STATE_WRITE_COMPLETION
         *   to flush before waiting for next data (that might depend on it).
         * If a process_connection hook returns an error or no hook sets the state
         * to one of the above expected value, forcibly close the connection w/
         * CONN_STATE_LINGER.  This covers the cases where no process_connection
         * hook executes (DECLINED), or one returns OK w/o touching the state (i.e.
         * CONN_STATE_PROCESSING remains after the call) which can happen with
         * third-party modules not updated to work specifically with event MPM
         * while this was expected to do lingering close unconditionally with
         * worker or prefork MPMs for instance.
         */

        switch (rc) {
        case DONE:
            rc = OK; /* same as OK, fall through */
        case OK:
            if (cs->pub.state == CONN_STATE_PROCESSING) {
                cs->pub.state = CONN_STATE_LINGER;
            }
            else if (cs->pub.state == CONN_STATE_KEEPALIVE) {
                cs->pub.state = CONN_STATE_WRITE_COMPLETION;
            }
            break;
        }
        if (rc != OK || (cs->pub.state != CONN_STATE_LINGER
                         && cs->pub.state != CONN_STATE_ASYNC_WAITIO
                         && cs->pub.state != CONN_STATE_WRITE_COMPLETION
                         && cs->pub.state != CONN_STATE_SUSPENDED)) {
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(10111)
                          "process_socket: connection processing returned %i "
                          "(%sstate %i): closing",
                          rc, rc ? "" : "unexpected ", (int)cs->pub.state);
            cs->pub.state = CONN_STATE_LINGER;
        }
        else if (c->aborted) {
            cs->pub.state = CONN_STATE_LINGER;
        }
        if (cs->pub.state == CONN_STATE_LINGER) {
            goto lingering_close;
        }
    }

    if (cs->pub.state == CONN_STATE_ASYNC_WAITIO) {
        /* Set a read/write timeout for this connection, and let the
         * event thread poll for read/writeability.
         */

        cs->queue_timestamp = apr_time_now();
        notify_suspend(cs);

        ap_update_child_status(cs->sbh, SERVER_BUSY_READ, NULL);

        /* Modules might set c->cs->sense to CONN_SENSE_WANT_WRITE,
         * the default is CONN_SENSE_WANT_READ still.
         */

        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
        apr_thread_mutex_lock(timeout_mutex);
        TO_QUEUE_APPEND(cs->sc->io_q, cs);
        rv = apr_pollset_add(event_pollset, &cs->pfd);
        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
            AP_DEBUG_ASSERT(0);
            TO_QUEUE_REMOVE(cs->sc->io_q, cs);
            apr_thread_mutex_unlock(timeout_mutex);
            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(10503)
                         "process_socket: apr_pollset_add failure in "
                         "CONN_STATE_ASYNC_WAITIO");
            close_connection(cs);
            signal_threads(ST_GRACEFUL);
        }
        else {
            apr_thread_mutex_unlock(timeout_mutex);
        }
        return;
    }

    if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) {
        ap_filter_t *output_filter = c->output_filters;
        apr_status_t rv;

        /* Flush all pending outputs before going to CONN_STATE_KEEPALIVE or
         * straight to CONN_STATE_PROCESSING if inputs are pending already.
         */

        
        ap_update_child_status(cs->sbh, SERVER_BUSY_WRITE, NULL);
        while (output_filter->next != NULL) {
            output_filter = output_filter->next;
        }
        rv = output_filter->frec->filter_func.out_func(output_filter, NULL);
        if (rv != APR_SUCCESS) {
            ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470)
                          "network write failure in core output filter");
            cs->pub.state = CONN_STATE_LINGER;
            goto lingering_close;
        }
        if (c->data_in_output_filters || cs->pub.sense == CONN_SENSE_WANT_READ) {
            /* Still in WRITE_COMPLETION_STATE:
             * Set a read/write timeout for this connection, and let the
             * event thread poll for read/writeability.
             */

            cs->queue_timestamp = apr_time_now();
            notify_suspend(cs);

            /* Add work to pollset. */
            update_reqevents_from_sense(cs, CONN_SENSE_WANT_WRITE);
            apr_thread_mutex_lock(timeout_mutex);
            TO_QUEUE_APPEND(cs->sc->wc_q, cs);
            rv = apr_pollset_add(event_pollset, &cs->pfd);
            if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
                AP_DEBUG_ASSERT(0);
                TO_QUEUE_REMOVE(cs->sc->wc_q, cs);
                apr_thread_mutex_unlock(timeout_mutex);
                ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465)
                             "process_socket: apr_pollset_add failure in "
                             "CONN_STATE_WRITE_COMPLETION");
                close_connection(cs);
                signal_threads(ST_GRACEFUL);
            }
            else {
                apr_thread_mutex_unlock(timeout_mutex);
            }
            return;
        }
        if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted) {
            cs->pub.state = CONN_STATE_LINGER;
            goto lingering_close;
        }
        if (c->data_in_input_filters) {
            goto process_connection;
        }
        if (listener_may_exit) {
            cs->pub.state = CONN_STATE_LINGER;
            goto lingering_close;
        }

        /* Fall through */
        cs->pub.state = CONN_STATE_KEEPALIVE;
    }

    if (cs->pub.state == CONN_STATE_KEEPALIVE) {
        ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL);

        /* It greatly simplifies the logic to use a single timeout value per q
         * because the new element can just be added to the end of the list and
         * it will stay sorted in expiration time sequence.  If brand new
         * sockets are sent to the event thread for a readability check, this
         * will be a slight behavior change - they use the non-keepalive
         * timeout today.  With a normal client, the socket will be readable in
         * a few milliseconds anyway.
         */

        cs->queue_timestamp = apr_time_now();
        notify_suspend(cs);

        /* Add work to pollset. */
        cs->pub.sense = CONN_SENSE_DEFAULT;
        update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
        apr_thread_mutex_lock(timeout_mutex);
        TO_QUEUE_APPEND(cs->sc->ka_q, cs);
        rv = apr_pollset_add(event_pollset, &cs->pfd);
        if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
            AP_DEBUG_ASSERT(0);
            TO_QUEUE_REMOVE(cs->sc->ka_q, cs);
            apr_thread_mutex_unlock(timeout_mutex);
            ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03093)
                         "process_socket: apr_pollset_add failure for "
                         "keep alive");
            close_connection(cs);
            signal_threads(ST_GRACEFUL);
        }
        else {
            apr_thread_mutex_unlock(timeout_mutex);
        }
        return;
    }

    if (cs->pub.state == CONN_STATE_SUSPENDED) {
        apr_atomic_inc32(&suspended_count);
        notify_suspend(cs);
        return;
    }

 lingering_close:
    /* CONN_STATE_LINGER[_*] fall through process_lingering_close() */
    process_lingering_close(cs);
}

/* conns_this_child has gone to zero or below.  See if the admin coded
   "MaxConnectionsPerChild 0", and keep going in that case.  Doing it this way
   simplifies the hot path in worker_thread */

static void check_infinite_requests(void)
{
    if (ap_max_requests_per_child) {
        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                     "Stopping process due to MaxConnectionsPerChild");
        signal_threads(ST_GRACEFUL);
    }
    /* keep going */
    conns_this_child = APR_INT32_MAX;
}

static int close_listeners(int *closed)
{
    ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
                 "clos%s listeners (connection_count=%u)",
                 *closed ? "ed" : "ing", apr_atomic_read32(&connection_count));
    if (!*closed) {
        int i;

        ap_close_listeners_ex(my_bucket->listeners);
        *closed = 1; /* once */

        dying = 1;
        ap_scoreboard_image->parent[ap_child_slot].quiescing = 1;
        for (i = 0; i < threads_per_child; ++i) {
            ap_update_child_status_from_indexes(ap_child_slot, i,
                                                SERVER_GRACEFUL, NULL);
        }
        /* wake up the main thread */
        kill(ap_my_pid, SIGTERM);

        ap_queue_info_free_idle_pools(worker_queue_info);
        ap_queue_interrupt_all(worker_queue);

        return 1;
    }
    return 0;
}

static void unblock_signal(int sig)
{
    sigset_t sig_mask;

    sigemptyset(&sig_mask);
    sigaddset(&sig_mask, sig);
#if defined(SIGPROCMASK_SETS_THREAD_MASK)
    sigprocmask(SIG_UNBLOCK, &sig_mask, NULL);
#else
    pthread_sigmask(SIG_UNBLOCK, &sig_mask, NULL);
#endif
}

static void dummy_signal_handler(int sig)
{
    /* XXX If specifying SIG_IGN is guaranteed to unblock a syscall,
     *     then we don't need this goofy function.
     */

}


static apr_status_t push_timer2worker(timer_event_t* te)
{
    return ap_queue_push_timer(worker_queue, te);
}

/*
 * Pre-condition: cs is neither in event_pollset nor a timeout queue
 * this function may only be called by the listener
 */

static apr_status_t push2worker(event_conn_state_t *cs, apr_socket_t *csd,
                                apr_pool_t *ptrans)
{
    apr_status_t rc;

    if (cs) {
        csd = cs->pfd.desc.s;
        ptrans = cs->p;
    }
    rc = ap_queue_push_socket(worker_queue, csd, cs, ptrans);
    if (rc != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, APLOGNO(00471)
                     "push2worker: ap_queue_push_socket failed");
        /* trash the connection; we couldn't queue the connected
         * socket to a worker
         */

        if (cs) {
            shutdown_connection(cs);
        }
        else {
            if (csd) {
                close_socket_nonblocking(csd);
            }
            if (ptrans) {
                ap_queue_info_push_pool(worker_queue_info, ptrans);
            }
        }
        signal_threads(ST_GRACEFUL);
    }

    return rc;
}

/* get_worker:
 *     If *have_idle_worker_p == 0, reserve a worker thread, and set
 *     *have_idle_worker_p = 1.
 *     If *have_idle_worker_p is already 1, will do nothing.
 *     If blocking == 1, block if all workers are currently busy.
 *     If no worker was available immediately, will set *all_busy to 1.
 *     XXX: If there are no workers, we should not block immediately but
 *     XXX: close all keep-alive connections first.
 */

static void get_worker(int *have_idle_worker_p, int blocking, int *all_busy)
{
    apr_status_t rc;

    if (*have_idle_worker_p) {
        /* already reserved a worker thread - must have hit a
         * transient error on a previous pass
         */

        return;
    }

    if (blocking)
        rc = ap_queue_info_wait_for_idler(worker_queue_info, all_busy);
    else
        rc = ap_queue_info_try_get_idler(worker_queue_info);

    if (rc == APR_SUCCESS || APR_STATUS_IS_EOF(rc)) {
        *have_idle_worker_p = 1;
    }
    else if (!blocking && rc == APR_EAGAIN) {
        *all_busy = 1;
    }
    else {
        ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, APLOGNO(00472)
                     "ap_queue_info_wait_for_idler failed. "
                     "Attempting to shutdown process gracefully");
        signal_threads(ST_GRACEFUL);
    }
}

/* Structures to reuse */
static timer_event_t timer_free_ring;

static apr_skiplist *timer_skiplist;
static volatile apr_time_t timers_next_expiry;

/* Same goal as for TIMEOUT_FUDGE_FACTOR (avoid extra poll calls), but applied
 * to timers. Since their timeouts are custom (user defined), we can't be too
 * approximative here (hence using 0.01s).
 */

#define EVENT_FUDGE_FACTOR apr_time_from_msec(10)

/* The following compare function is used by apr_skiplist_insert() to keep the
 * elements (timers) sorted and provide O(log n) complexity (this is also true
 * for apr_skiplist_{find,remove}(), but those are not used in MPM event where
 * inserted timers are not searched nor removed, but with apr_skiplist_pop()
 * which does use any compare function).  It is meant to return 0 when a == b,
 * <0 when a < b, and >0 when a > b.  However apr_skiplist_insert() will not
 * add duplicates (i.e. a == b), and apr_skiplist_add() is only available in
 * APR 1.6, yet multiple timers could possibly be created in the same micro-
 * second (duplicates with regard to apr_time_t); therefore we implement the
 * compare function to return +1 instead of 0 when compared timers are equal,
 * thus duplicates are still added after each other (in order of insertion).
 */

static int timer_comp(void *a, void *b)
{
    apr_time_t t1 = (apr_time_t) ((timer_event_t *)a)->when;
    apr_time_t t2 = (apr_time_t) ((timer_event_t *)b)->when;
    AP_DEBUG_ASSERT(t1);
    AP_DEBUG_ASSERT(t2);
    return ((t1 < t2) ? -1 : 1);
}

static apr_thread_mutex_t *g_timer_skiplist_mtx;

static apr_status_t event_register_timed_callback(apr_time_t t,
                                                  ap_mpm_callback_fn_t *cbfn,
                                                  void *baton)
{
    timer_event_t *te;
    /* oh yeah, and make locking smarter/fine grained. */
    apr_thread_mutex_lock(g_timer_skiplist_mtx);

    if (!APR_RING_EMPTY(&timer_free_ring.link, timer_event_t, link)) {
        te = APR_RING_FIRST(&timer_free_ring.link);
        APR_RING_REMOVE(te, link);
    }
    else {
        te = apr_skiplist_alloc(timer_skiplist, sizeof(timer_event_t));
        APR_RING_ELEM_INIT(te, link);
    }

    te->cbfunc = cbfn;
    te->baton = baton;
    /* XXXXX: optimize */
    te->when = t + apr_time_now();

    { 
        apr_time_t next_expiry;

        /* Okay, add sorted by when.. */
        apr_skiplist_insert(timer_skiplist, te);

        /* Cheaply update the global timers_next_expiry with this event's
         * if it expires before.
         */

        next_expiry = timers_next_expiry;
        if (!next_expiry || next_expiry > te->when + EVENT_FUDGE_FACTOR) {
            timers_next_expiry = te->when;
            /* Unblock the poll()ing listener for it to update its timeout. */
            if (listener_is_wakeable) {
                apr_pollset_wakeup(event_pollset);
            }
        }
    }

    apr_thread_mutex_unlock(g_timer_skiplist_mtx);

    return APR_SUCCESS;
}


/*
 * Flush data and close our side of the connection, then drain incoming data.
 * If the latter would block put the connection in one of the linger timeout
 * queues to be called back when ready, and repeat until it's closed by peer.
 * Only to be called in the worker thread, and since it's in immediate call
 * stack, we can afford a comfortable buffer size to consume data quickly.
 * Pre-condition: cs is not in any timeout queue and not in the pollset,
 *                timeout_mutex is not locked
 */

#define LINGERING_BUF_SIZE (32 * 1024)
static void process_lingering_close(event_conn_state_t *cs)
{
    apr_socket_t *csd = ap_get_conn_socket(cs->c);
    char dummybuf[LINGERING_BUF_SIZE];
    apr_size_t nbytes;
    apr_status_t rv;
    struct timeout_queue *q;

    ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c,
                  "lingering close from state %i", (int)cs->pub.state);
    AP_DEBUG_ASSERT(CONN_STATE_IS_LINGERING_CLOSE(cs->pub.state));

    if (!cs->linger_started) {
        cs->pub.state = CONN_STATE_LINGER;
        cs->linger_started = 1;

        /* defer_lingering_close() may have bumped lingering_count already */
        if (!cs->deferred_linger) {
            apr_atomic_inc32(&lingering_count);
        }

        apr_socket_timeout_set(csd, apr_time_from_sec(SECONDS_TO_LINGER));
        if (ap_start_lingering_close(cs->c)) {
            notify_suspend(cs);
            close_connection(cs);
            return;
        }
        
        /* All nonblocking from now, no need for APR_INCOMPLETE_READ either */
        apr_socket_timeout_set(csd, 0);
        apr_socket_opt_set(csd, APR_INCOMPLETE_READ, 0);

        /*
         * If some module requested a shortened waiting period, only wait for
         * 2s (SECONDS_TO_LINGER). This is useful for mitigating certain
         * DoS attacks.
         */

        if (apr_table_get(cs->c->notes, "short-lingering-close")) {
            cs->pub.state = CONN_STATE_LINGER_SHORT;
        }
        else {
            cs->pub.state = CONN_STATE_LINGER_NORMAL;
        }
        cs->pub.sense = CONN_SENSE_DEFAULT;
        notify_suspend(cs);

        /* One timestamp/duration for the whole lingering close time.
         * XXX: This makes the (short_)linger_q not sorted/ordered by expiring
         * timeouts whenever multiple schedules are necessary (EAGAIN below),
         * but we probabaly don't care since these connections do not count
         * for connections_above_limit() and all of them will be killed when
         * busy or gracefully stopping anyway.
         */

        cs->queue_timestamp = apr_time_now();
    }

    do {
        nbytes = sizeof(dummybuf);
        rv = apr_socket_recv(csd, dummybuf, &nbytes);
    } while (rv == APR_SUCCESS);

    if (!APR_STATUS_IS_EAGAIN(rv)) {
        close_connection(cs);
        return;
    }

    /* (Re)queue the connection to come back when readable */
    update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ);
    q = (cs->pub.state == CONN_STATE_LINGER_SHORT) ? short_linger_q : linger_q;
    apr_thread_mutex_lock(timeout_mutex);
    TO_QUEUE_APPEND(q, cs);
    rv = apr_pollset_add(event_pollset, &cs->pfd);
    if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) {
        AP_DEBUG_ASSERT(0);
        TO_QUEUE_REMOVE(q, cs);
        apr_thread_mutex_unlock(timeout_mutex);
        ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03092)
                     "process_lingering_close: apr_pollset_add failure");
        close_connection(cs);
        signal_threads(ST_GRACEFUL);
        return;
    }
    apr_thread_mutex_unlock(timeout_mutex);
}

/* call 'func' for all elements of 'q' above 'expiry'.
 * Pre-condition: timeout_mutex must already be locked
 * Post-condition: timeout_mutex will be locked again
 */

static void process_timeout_queue(struct timeout_queue *q, apr_time_t expiry,
                                  int (*func)(event_conn_state_t *))
{
    apr_uint32_t total = 0, count;
    event_conn_state_t *first, *cs, *last;
    struct event_conn_state_t trash;
    struct timeout_queue *qp;
    apr_status_t rv;

    if (!*q->total) {
        return;
    }

    APR_RING_INIT(&trash.timeout_list, event_conn_state_t, timeout_list);
    for (qp = q; qp; qp = qp->next) {
        count = 0;
        cs = first = last = APR_RING_FIRST(&qp->head);
        while (cs != APR_RING_SENTINEL(&qp->head, event_conn_state_t,
                                       timeout_list)) {
            /* Trash the entry if:
             * - no expiry was given (zero means all), or
             * - it expired (according to the queue timeout), or
             * - the system clock skewed in the past: no entry should be
             *   registered above the given expiry (~now) + the queue
             *   timeout, we won't keep any here (eg. for centuries).
             *
             * Otherwise stop, no following entry will match thanks to the
             * single timeout per queue (entries are added to the end!).
             * This allows maintenance in O(1).
             */

            if (expiry && cs->queue_timestamp + qp->timeout > expiry
                       && cs->queue_timestamp < expiry + qp->timeout) {
                /* Since this is the next expiring entry of this queue, update
                 * the global queues_next_expiry if it's later than this one.
                 */

                apr_time_t elem_expiry = cs->queue_timestamp + qp->timeout;
                apr_time_t next_expiry = queues_next_expiry;
                if (!next_expiry
                        || next_expiry > elem_expiry + TIMEOUT_FUDGE_FACTOR) {
                    queues_next_expiry = elem_expiry;
                }
                break;
            }

            last = cs;
            rv = apr_pollset_remove(event_pollset, &cs->pfd);
            if (rv != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rv)) {
                AP_DEBUG_ASSERT(0);
                ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, cs->c, APLOGNO(00473)
                              "apr_pollset_remove failed");
            }
            cs = APR_RING_NEXT(cs, timeout_list);
            count++;
        }
        if (!count)
            continue;

        APR_RING_UNSPLICE(first, last, timeout_list);
        APR_RING_SPLICE_TAIL(&trash.timeout_list, first, last, event_conn_state_t,
                             timeout_list);
        AP_DEBUG_ASSERT(*q->total >= count && qp->count >= count);
        *q->total -= count;
        qp->count -= count;
        total += count;
    }
    if (!total)
        return;

    apr_thread_mutex_unlock(timeout_mutex);
    first = APR_RING_FIRST(&trash.timeout_list);
    do {
        cs = APR_RING_NEXT(first, timeout_list);
        TO_QUEUE_ELEM_INIT(first);
        func(first);
        first = cs;
    } while (--total);
    apr_thread_mutex_lock(timeout_mutex);
}

static void process_keepalive_queue(apr_time_t expiry)
{
    /* If all workers are busy, we kill older keep-alive connections so
     * that they may connect to another process.
     */

    if (!expiry && *keepalive_q->total) {
        ap_log_error(APLOG_MARK, APLOG_TRACE1, 0, ap_server_conf,
                     "All workers are busy or dying, will shutdown %u "
                     "keep-alive connections", *keepalive_q->total);
    }
    process_timeout_queue(keepalive_q, expiry, shutdown_connection);
}

static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy)
{
    apr_status_t rc;
    proc_info *ti = dummy;
    int process_slot = ti->pslot;
    struct process_score *ps = ap_get_scoreboard_process(process_slot);
    int closed = 0;
    int have_idle_worker = 0;
    apr_time_t last_log;

    last_log = apr_time_now();
    free(ti);

    /* Unblock the signal used to wake this thread up, and set a handler for
     * it.
     */

    apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
    unblock_signal(LISTENER_SIGNAL);

    for (;;) {
        timer_event_t *te;
        const apr_pollfd_t *out_pfd;
        apr_int32_t num = 0;
        apr_interval_time_t timeout;
        apr_time_t now, expiry = -1;
        int workers_were_busy = 0;

        if (conns_this_child <= 0)
            check_infinite_requests();

        if (listener_may_exit) {
            int first_close = close_listeners(&closed);

            if (terminate_mode == ST_UNGRACEFUL
                || apr_atomic_read32(&connection_count) == 0)
                break;

            /* Don't wait in poll() for the first close (i.e. dying now), we
             * want to maintain the queues and schedule defer_linger_chain ASAP
             * to kill kept-alive connection and shutdown the workers and child
             * faster.
             */

            if (first_close) {
                goto do_maintenance; /* with expiry == -1 */
            }
        }

        now = apr_time_now();
        if (APLOGtrace6(ap_server_conf)) {
            /* trace log status every second */
            if (now - last_log > apr_time_from_sec(1)) {
                ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf,
                             "connections: %u (waitio:%u write-completion:%u"
                             "keep-alive:%u lingering:%u suspended:%u clogged:%u), "
                             "workers: %u/%u shutdown",
                             apr_atomic_read32(&connection_count),
                             apr_atomic_read32(waitio_q->total),
                             apr_atomic_read32(write_completion_q->total),
                             apr_atomic_read32(keepalive_q->total),
                             apr_atomic_read32(&lingering_count),
                             apr_atomic_read32(&suspended_count),
                             apr_atomic_read32(&clogged_count),
                             apr_atomic_read32(&threads_shutdown),
                             threads_per_child);
                last_log = now;
            }
        }

        /* Start with an infinite poll() timeout and update it according to
         * the next expiring timer or queue entry. If there are none, either
         * the listener is wakeable and it can poll() indefinitely until a wake
         * up occurs, otherwise periodic checks (maintenance, shutdown, ...)
         * must be performed.
         */

        now = apr_time_now();
        timeout = -1;

        /* Push expired timers to a worker, the first remaining one determines
         * the maximum time to poll() below, if any.
         */

        expiry = timers_next_expiry;
        if (expiry && expiry < now) {
            apr_thread_mutex_lock(g_timer_skiplist_mtx);
            while ((te = apr_skiplist_peek(timer_skiplist))) {
                if (te->when > now) {
                    timers_next_expiry = te->when;
                    timeout = te->when - now;
                    break;
                }
                apr_skiplist_pop(timer_skiplist, NULL);
                push_timer2worker(te);
            }
            if (!te) {
                timers_next_expiry = 0;
            }
            apr_thread_mutex_unlock(g_timer_skiplist_mtx);
        }

        /* Same for queues, use their next expiry, if any. */
        expiry = queues_next_expiry;
        if (expiry
                && (timeout < 0
                    || expiry <= now
                    || timeout > expiry - now)) {
            timeout = expiry > now ? expiry - now : 0;
        }

        /* When non-wakeable, don't wait more than 100 ms, in any case. */
#define NON_WAKEABLE_POLL_TIMEOUT apr_time_from_msec(100)
        if (!listener_is_wakeable
                && (timeout < 0
                    || timeout > NON_WAKEABLE_POLL_TIMEOUT)) {
            timeout = NON_WAKEABLE_POLL_TIMEOUT;
        }
        else if (timeout > 0) {
            /* apr_pollset_poll() might round down the timeout to milliseconds,
             * let's forcibly round up here to never return before the timeout.
             */

            timeout = apr_time_from_msec(
                apr_time_as_msec(timeout + apr_time_from_msec(1) - 1)
            );
        }

        ap_log_error(APLOG_MARK, APLOG_TRACE7, 0, ap_server_conf,
                     "polling with timeout=%" APR_TIME_T_FMT
                     " queues_timeout=%" APR_TIME_T_FMT
                     " timers_timeout=%" APR_TIME_T_FMT,
                     timeout, queues_next_expiry - now,
                     timers_next_expiry - now);

        rc = apr_pollset_poll(event_pollset, timeout, &num, &out_pfd);
        if (rc != APR_SUCCESS) {
            if (!APR_STATUS_IS_EINTR(rc) && !APR_STATUS_IS_TIMEUP(rc)) {
                ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf,
                             "apr_pollset_poll failed. Attempting to "
                             "shutdown process gracefully");
                signal_threads(ST_GRACEFUL);
            }
            num = 0;
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=94 H=93 G=93

¤ Dauer der Verarbeitung: 0.26 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge