/* Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
*/
if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02068) "Failed to create listening socket."); return rv;
}
rv = apr_socket_opt_set(ctx->sock, APR_SO_REUSEADDR, 1); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02069) "Failed to set APR_SO_REUSEADDR to 1 on socket."); return rv;
}
rv = apr_socket_opt_set(ctx->sock, APR_SO_NONBLOCK, 1); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02070) "Failed to set APR_SO_NONBLOCK to 1 on socket."); return rv;
}
rv = apr_socket_bind(ctx->sock, ctx->mcast_addr); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02071) "Failed to bind on socket."); return rv;
}
if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02072) "Failed to join multicast group"); return rv;
}
rv = apr_mcast_loopback(ctx->sock, 1); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02073) "Failed to accept localhost mulitcast on socket."); return rv;
}
return APR_SUCCESS;
}
/* XXX: The same exists in mod_lbmethod_heartbeat.c where it is named argstr_to_table */
/*
 * Parse an '&'-separated list of "key=value" tokens into a table.
 *
 * input: string to parse; when NULL the function returns without
 *        touching parms.
 * parms: table that receives one apr_table_set() call per token.
 * p:     pool used to duplicate input; the duplicate is split in place,
 *        so input itself is never modified.
 *
 * A token that carries no '=' is stored with the value "1". Keys and
 * explicitly supplied values are passed through ap_unescape_url() before
 * they are stored.
 */
static void qs_to_table(const char *input, apr_table_t *parms,
                        apr_pool_t *p)
{
    char *key;
    char *value;
    char *query_string;
    char *strtok_state;

    if (input == NULL) {
        return;
    }

    query_string = apr_pstrdup(p, input);

    key = apr_strtok(query_string, "&", &strtok_state);
    while (key) {
        const char *stored;

        value = strchr(key, '=');
        if (value) {
            *value = '\0';      /* Split the string in two */
            value++;            /* Skip past the = */
        }

        ap_unescape_url(key);
        if (value) {
            /* value points into our writable copy, safe to decode in place */
            ap_unescape_url(value);
            stored = value;
        }
        else {
            /*
             * The default is a string literal: it must never be handed to
             * ap_unescape_url(), which decodes its argument in place.
             */
            stored = "1";
        }

        apr_table_set(parms, key, stored);
        /*
           ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03182)
           "Found query arg: %s = %s", key, value);
         */
        key = apr_strtok(NULL, "&", &strtok_state);
    }
}
if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02076) "Unable to read from file: %s", ctx->storage_path); return rv;
}
apr_brigade_flatten(tmpbb, buf, &bsize); if (bsize == 0) { break;
}
buf[bsize - 1] = 0;
t = strchr(buf, ' '); if (t) {
ip = apr_pstrmemdup(pool, buf, t - buf);
} else {
ip = NULL;
}
if (!ip || buf[0] == '#') { /* copy things we can't process */
apr_file_printf(fp, "%s\n", buf);
} elseif (strcmp(ip, s->ip) != 0 ) {
hm_server_t node;
apr_time_t seen; constchar *val;
/* Update seen time according to the last file modification */
apr_table_clear(hbt);
qs_to_table(apr_pstrdup(pool, t), hbt, pool); if ((val = apr_table_get(hbt, "busy"))) {
node.busy = atoi(val);
} else {
node.busy = 0;
}
if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02081) "Unable to open tmp file: %s", path); return rv;
}
now = apr_time_now(); for (hi = apr_hash_first(p, ctx->servers);
hi != NULL; hi = apr_hash_next(hi)) {
hm_server_t *s = NULL;
apr_time_t seen;
apr_hash_this(hi, NULL, NULL, (void **) &s);
seen = apr_time_sec(now - s->seen); if (seen > SEEN_TIMEOUT) { /* * Skip this entry from the heartbeat file -- when it comes back, * we will reuse the memory...
*/
} else {
apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
s->ip, s->ready, s->busy, (unsignedint) seen, s->port);
}
}
rv = apr_file_flush(fp); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02082) "Unable to flush file: %s", path); return rv;
}
rv = apr_file_close(fp); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02083) "Unable to close file: %s", path); return rv;
}
rv = apr_file_perms_set(path,
APR_FPROT_UREAD | APR_FPROT_GREAD |
APR_FPROT_WREAD); if (rv && rv != APR_INCOMPLETE && rv != APR_ENOTIMPL) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02084) "Unable to set file permissions on %s",
path); return rv;
}
rv = apr_file_rename(path, ctx->storage_path, p);
if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02085) "Unable to move file: %s -> %s", path,
ctx->storage_path); return rv;
}
return APR_SUCCESS;
}

/*
 * Store in a slotmem.
 *
 * Walks every entry of ctx->servers. An entry whose age in seconds
 * (current time minus its seen stamp) exceeds SEEN_TIMEOUT is handed to
 * hm_slotmem_remove_stat(); any other entry is handed to
 * hm_slotmem_update_stat(). The walk stops at the first call that does
 * not return APR_SUCCESS and that status is returned to the caller.
 *
 * ctx: heartmonitor context holding the servers hash.
 * p:   pool used for the hash iterator and passed to the slotmem helpers.
 */
static apr_status_t hm_slotmem_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
{
    apr_hash_index_t *idx;
    apr_time_t now = apr_time_now();

    for (idx = apr_hash_first(p, ctx->servers); idx != NULL;
         idx = apr_hash_next(idx)) {
        hm_server_t *node = NULL;
        apr_time_t age;
        apr_status_t status;

        apr_hash_this(idx, NULL, NULL, (void **) &node);
        age = apr_time_sec(now - node->seen);

        /* stale entries are removed, live ones are refreshed */
        status = (age > SEEN_TIMEOUT) ? hm_slotmem_remove_stat(node, p)
                                      : hm_slotmem_update_stat(node, p);
        if (status != APR_SUCCESS) {
            return status;
        }
    }

    return APR_SUCCESS;
}

/*
 * Store/update the stats.
 *
 * Dispatches to the slotmem writer when slotmem is non-NULL and to the
 * file writer otherwise; the chosen writer's status is returned as is.
 */
static apr_status_t hm_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
{
    if (!slotmem) {
        return hm_file_update_stats(ctx, p);
    }
    return hm_slotmem_update_stats(ctx, p);
}
s = apr_hash_get(ctx->servers, ip, APR_HASH_KEY_STRING);
if (s == NULL) {
s = apr_palloc(ctx->p, sizeof(hm_server_t));
s->ip = apr_pstrdup(ctx->p, ip);
s->port = port;
s->ready = 0;
s->busy = 0;
s->seen = 0;
apr_hash_set(ctx->servers, s->ip, APR_HASH_KEY_STRING, s);
}
return s;
}
/* Process a message received from a backend node */ staticvoid hm_processmsg(hm_ctx_t *ctx, apr_pool_t *p,
apr_sockaddr_t *from, char *buf, apr_size_t len)
{
apr_table_t *tbl;
buf[len] = '\0';
tbl = apr_table_make(p, 10);
qs_to_table(buf, tbl, p);
if (apr_table_get(tbl, "v") != NULL &&
apr_table_get(tbl, "busy") != NULL &&
apr_table_get(tbl, "ready") != NULL) { char *ip; int port = 80;
hm_server_t *s; /* TODO: REMOVE ME BEFORE PRODUCTION (????) */
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02086) "%pI busy=%s ready=%s", from,
apr_table_get(tbl, "busy"), apr_table_get(tbl, "ready"));
apr_sockaddr_ip_get(&ip, from);
if (apr_table_get(tbl, "port") != NULL)
port = atoi(apr_table_get(tbl, "port"));
switch (state) { case AP_WATCHDOG_STATE_STARTING:
rv = hm_listen(ctx); if (rv) {
ctx->status = rv;
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02090) "Unable to listen for connections!");
} else {
ctx->keep_running = 1;
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02091) "%s listener started.",
HM_WATHCHDOG_NAME);
} break; case AP_WATCHDOG_STATE_RUNNING: /* store in the slotmem or in the file depending on configuration */
hm_update_stats(ctx, pool);
cur = now = apr_time_sec(apr_time_now());
while ((now - cur) < apr_time_sec(ctx->interval)) { int n;
apr_status_t rc;
apr_pool_t *p;
apr_pollfd_t pfd;
apr_interval_time_t timeout;
/* Create the slotmem */ if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_CONFIG) { /* this is the real thing */ if (maxworkers) {
storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shm",
AP_SLOTMEM_PROVIDER_VERSION); if (!storage) {
ap_log_error(APLOG_MARK, APLOG_EMERG, 0, s, APLOGNO(02284) "failed to lookup provider 'shm' for '%s', " "maybe you need to load mod_slotmem_shm?",
AP_SLOTMEM_PROVIDER_GROUP); return !OK;
}
storage->create(&slotmem, "mod_heartmonitor", sizeof(hm_slot_server_t), maxworkers, AP_SLOTMEM_TYPE_PREGRAB, p); if (!slotmem) {
ap_log_error(APLOG_MARK, APLOG_EMERG, 0, s, APLOGNO(02285) "slotmem_create for status failed"); return !OK;
}
}
}
if (!ctx->active) { return OK;
}
rv = hm_watchdog_get_instance(&ctx->watchdog,
HM_WATHCHDOG_NAME,
0, 1, p); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, APLOGNO(02094) "Failed to create watchdog instance (%s)",
HM_WATHCHDOG_NAME); return !OK;
} /* Register a callback with zero interval. */
rv = hm_watchdog_register_callback(ctx->watchdog,
0,
ctx,
hm_watchdog_callback); if (rv) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, APLOGNO(02095) "Failed to register watchdog callback (%s)",
HM_WATHCHDOG_NAME); return !OK;
}
ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, APLOGNO(02096) "wd callback %s", HM_WATHCHDOG_NAME); return OK;
}
maxworkers = atoi(data); if (maxworkers != 0 && maxworkers < 10) return"HeartbeatMaxServers: Should be 0 for file storage, " "or greater or equal than 10 for slotmem";
return NULL;
}
staticconst command_rec hm_cmds[] = {
AP_INIT_TAKE1("HeartbeatListen", cmd_hm_listen, NULL, RSRC_CONF, "Address to listen for heartbeat requests"),
AP_INIT_TAKE1("HeartbeatStorage", cmd_hm_storage, NULL, RSRC_CONF, "Path to store heartbeat data."),
AP_INIT_TAKE1("HeartbeatMaxServers", cmd_hm_maxworkers, NULL, RSRC_CONF, "Max number of servers when using slotmem (instead file) to store heartbeat data."),
{NULL}
};
/*
 * Die Informationen auf dieser Webseite wurden
 * nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
 * noch Qualität der bereitgestellten Informationen zugesichert.
 * Bemerkung:
 * Die farbliche Syntaxdarstellung ist noch experimentell.
 */