/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" are responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */
/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	/* monmap must name at least one monitor */
	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		/*
		 * Exclude the currently selected monitor (if it is still
		 * present in the monmap) from the random draw so a repick
		 * always lands on a different one.
		 */
		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = get_random_u32_below(max);
		if (o >= 0 && n >= o)
			n++;	/* skip over the excluded slot */

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}
/*
 * NOTE(review): this block is garbled -- "staticvoid" is a fused token and
 * the function body appears truncated: the legacy (msgr1) auth-hello path
 * that should follow the msgr2 branch is missing, the local 'ret' is
 * declared but never used in what survives, and the function never closes.
 * Restore from the upstream net/ceph/mon_client.c before building.
 */
/* * Open a session with a new monitor.
 */ staticvoid __open_session(struct ceph_mon_client *monc)
{ int ret;
pick_new_mon(monc);
/* Begin hunting; after the first successful connection, back off the
 * retry interval (capped) so repeated failures slow down. */
monc->hunting = true; if (monc->had_a_connection) {
monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF; if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
}
/* * Queue a keepalive to ensure that in case of an early fault * the messenger doesn't put us into STANDBY state and instead * retries. This also ensures that our timestamp is valid by * the time we finish hunting and delayed_work() checks it.
 */
ceph_con_keepalive(&monc->con); if (ceph_msgr2(monc->client)) {
monc->pending_auth = 1; return;
}
/*
 * NOTE(review): this region is corrupted -- it begins as __send_subscribe()
 * but jumps mid-way (at the sizeof(*h) check) into the body of what appears
 * to be the subscribe-ack handler: 'h' and 'seconds' are used without any
 * visible declaration, the encoding/send of the subscribe items is missing,
 * and __send_subscribe() never closes.  Restore both functions from the
 * upstream source before building.
 */
/* * Send subscribe request for one or more maps, according to * monc->subs.
 */ staticvoid __send_subscribe(struct ceph_mon_client *monc)
{ struct ceph_msg *msg = monc->m_subscribe; void *p = msg->front.iov_base; void *const end = p + msg->front_alloc_len; int num = 0; int i;
dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
BUG_ON(monc->cur_mon < 0);
if (!monc->sub_renew_sent)
monc->sub_renew_sent = jiffies | 1; /* never 0 */
msg->hdr.version = cpu_to_le16(2);
/* count wanted subscriptions first, so the count can be encoded ahead
 * of the items */
for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { if (monc->subs[i].want)
num++;
}
BUG_ON(num < 1); /* monmap sub is always there */
ceph_encode_32(&p, num); for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { char buf[32]; int len;
if (!monc->subs[i].want) continue;
/* mdsmap sub is qualified with the filesystem cluster id, if set */
len = sprintf(buf, "%s", ceph_sub_str[i]); if (i == CEPH_SUB_MDSMAP &&
monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
len += sprintf(buf + len, ".%d", monc->fs_cluster_id);
/* NOTE(review): text from the subscribe-ack handler begins here */
if (msg->front.iov_len < sizeof(*h)) goto bad;
seconds = le32_to_cpu(h->duration);
mutex_lock(&monc->mutex); if (monc->sub_renew_sent) { /* * This is only needed for legacy (infernalis or older) * MONs -- see delayed_work().
 */
monc->sub_renew_after = monc->sub_renew_sent +
(seconds >> 1) * HZ - 1;
dout("%s sent %lu duration %d renew after %lu\n", __func__,
monc->sub_renew_sent, seconds, monc->sub_renew_after);
monc->sub_renew_sent = 0;
} else {
dout("%s sent %lu renew after %lu, ignoring\n", __func__,
monc->sub_renew_sent, monc->sub_renew_after);
}
mutex_unlock(&monc->mutex); return;
bad:
pr_err("got corrupt subscribe-ack msg\n");
ceph_msg_dump(msg);
}
/*
 * NOTE(review): truncated -- only the local declarations survive; the code
 * that records the subscription item and returns whether it changed is
 * missing, and the function never closes.  "staticbool" is a fused token.
 */
/* * Register interest in a map * * @sub: one of CEPH_SUB_* * @epoch: X for "every map since X", or 0 for "just the latest"
 */ staticbool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
u32 epoch, bool continuous)
{
__le64 start = cpu_to_le64(epoch);
u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
/*
 * NOTE(review): truncated -- the tail that records the received epoch and
 * the function's closing brace are missing.  "staticvoid" is a fused token.
 */
/* * Keep track of which maps we have * * @sub: one of CEPH_SUB_*
 */ staticvoid __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
u32 epoch)
{
dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
/* A one-shot subscription is satisfied by any map; a continuous one
 * advances its start epoch past the one just received. */
if (monc->subs[sub].want) { if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
monc->subs[sub].want = false; else
monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
}
/*
 * NOTE(review): truncated -- the wait loop never closes and the final
 * mutex unlock / return are missing.  "unsignedlong" is a fused token.
 */
/* * Wait for an osdmap with a given epoch. * * @epoch: epoch to wait for * @timeout: in jiffies, 0 means "wait forever"
 */ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, unsignedlong timeout)
{ unsignedlong started = jiffies; long ret;
mutex_lock(&monc->mutex); while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
mutex_unlock(&monc->mutex);
/* check the deadline before each sleep so a 0 timeout waits forever */
if (timeout && time_after_eq(jiffies, started + timeout)) return -ETIMEDOUT;
ret = wait_event_interruptible_timeout(monc->client->auth_wq,
monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
ceph_timeout_jiffies(timeout)); if (ret < 0) return ret;
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);

	/* always track the monmap; fetch the osdmap once */
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);

	__open_session(monc);
	__schedule_delayed(monc);

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);
/*
 * NOTE(review): orphaned fragment -- the enclosing function's header is
 * missing.  From the dout() strings this appears to be the tail of
 * get_generic_reply(con, hdr, skip): look up the pending generic request
 * by tid and hand back a ref on its preallocated reply message, or set
 * *skip when the tid is unknown.
 */
mutex_lock(&monc->mutex);
req = lookup_generic_request(&monc->generic_request_tree, tid); if (!req) {
dout("get_generic_reply %lld dne\n", tid);
*skip = 1;
m = NULL;
} else {
dout("get_generic_reply %lld got %p\n", tid, req->reply);
*skip = 0;
m = ceph_msg_get(req->reply); /* * we don't need to track the connection reading into * this reply because we only have one open connection * at a time, ever.
 */
}
mutex_unlock(&monc->mutex); return m;
}
/*
 * NOTE(review): truncated -- the cleanup that releases the generic request
 * and the final return / closing brace are missing.  "constchar" is a
 * fused token.
 */
/* * Send MMonGetVersion and wait for the reply. * * @what: one of "mdsmap", "osdmap" or "monmap"
 */ int ceph_monc_get_version(struct ceph_mon_client *monc, constchar *what,
u64 *newest)
{ struct ceph_mon_generic_request *req; int ret;
req = __ceph_monc_get_version(monc, what, NULL, 0); if (IS_ERR(req)) return PTR_ERR(req);
/* on success, hand the newest version back through *newest */
ret = wait_generic_request(req); if (!ret)
*newest = req->u.newest;
/*
 * Add @client_addr to the cluster's OSD blocklist so OSDs reject any
 * further requests from it.  Falls back to the legacy "osd blacklist"
 * command for monitors that do not recognize "osd blocklist", then waits
 * for the osdmap that carries the new entry.
 *
 * NOTE(review): lines are collapsed by extraction; the exact whitespace
 * inside the backslash-continued command strings should be confirmed
 * against the upstream source before reformatting.
 */
int ceph_monc_blocklist_add(struct ceph_mon_client *monc, struct ceph_entity_addr *client_addr)
{ int ret;
ret = do_mon_command(monc, "{ \"prefix\": \"osd blocklist\", \
\"blocklistop\": \"add\", \
\"addr\": \"%pISpc/%u\" }",
&client_addr->in_addr,
le32_to_cpu(client_addr->nonce)); if (ret == -EINVAL) { /* * The monitor returns EINVAL on an unrecognized command. * Try the legacy command -- it is exactly the same except * for the name.
 */
ret = do_mon_command(monc, "{ \"prefix\": \"osd blacklist\", \
\"blacklistop\": \"add\", \
\"addr\": \"%pISpc/%u\" }",
&client_addr->in_addr,
le32_to_cpu(client_addr->nonce));
} if (ret) return ret;
/* * Make sure we have the osdmap that includes the blocklist * entry. This is needed to ensure that the OSDs pick up the * new blocklist before processing any future requests from * this client.
 */ return ceph_wait_for_latest_osdmap(monc->client, 0);
}
EXPORT_SYMBOL(ceph_monc_blocklist_add);
/*
 * NOTE(review): corrupted -- delayed_work() keeps only its container_of()
 * prologue; the rest of its body is replaced by a ceph_msgr_flush() call
 * whose comment belongs to the mon_client teardown path.  Neither function
 * is complete here; restore both from the upstream source.
 */
/* * Delayed work. If we haven't mounted yet, retry. Otherwise, * renew/retry subscription as needed (in case it is timing out, or we * got an ENOMEM). And keep the monitor connection alive.
 */ staticvoid delayed_work(struct work_struct *work)
{ struct ceph_mon_client *monc =
container_of(work, struct ceph_mon_client, delayed_work.work);
/* * flush msgr queue before we destroy ourselves to ensure that: * - any work that references our embedded con is finished. * - any osd_client or other work that may reference an authorizer * finishes before we shut down the auth subsystem.
 */
ceph_msgr_flush();
/*
 * Build and send an auth request if our auth state needs validating and
 * no request is already outstanding.  Caller must hold monc->mutex.
 */
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;	/* a request is already in flight */

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret; /* either an error, or no need to authenticate */

	__send_prepared_auth_request(monc, ret);
	return 0;
}
/*
 * Locked wrapper around __validate_auth(): (re)validate our auth state
 * with the monitor under the client mutex.
 */
int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);

	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);
/*
 * NOTE(review): truncated -- the tail that fills in the output lengths and
 * the authorizer (and the function's closing brace) is missing.
 * "staticint" is a fused token.
 */
staticint mon_get_auth_request(struct ceph_connection *con, void *buf, int *buf_len, void **authorizer, int *authorizer_len)
{ struct ceph_mon_client *monc = con->private; int ret;
mutex_lock(&monc->mutex);
ret = ceph_auth_get_request(monc->auth, buf, *buf_len);
mutex_unlock(&monc->mutex); if (ret < 0) return ret;
/* * Allocate memory for incoming message
*/ staticstruct ceph_msg *mon_alloc_msg(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip)
{ struct ceph_mon_client *monc = con->private; int type = le16_to_cpu(hdr->type); int front_len = le32_to_cpu(hdr->front_len); struct ceph_msg *m = NULL;
*skip = 0;
switch (type) { case CEPH_MSG_MON_SUBSCRIBE_ACK:
m = ceph_msg_get(monc->m_subscribe_ack); break; case CEPH_MSG_STATFS_REPLY: case CEPH_MSG_MON_COMMAND_ACK: return get_generic_reply(con, hdr, skip); case CEPH_MSG_AUTH_REPLY:
m = ceph_msg_get(monc->m_auth_reply); break; case CEPH_MSG_MON_GET_VERSION_REPLY: if (le64_to_cpu(hdr->tid) != 0) return get_generic_reply(con, hdr, skip);
/* * Older OSDs don't set reply tid even if the original * request had a non-zero tid. Work around this weirdness * by allocating a new message.
*/
fallthrough; case CEPH_MSG_MON_MAP: case CEPH_MSG_MDS_MAP: case CEPH_MSG_OSD_MAP: case CEPH_MSG_FS_MAP_USER:
m = ceph_msg_new(type, front_len, GFP_NOFS, false); if (!m) return NULL; /* ENOMEM--return skip == 0 */ break;
}
if (!m) {
pr_info("alloc_msg unknown type %d\n", type);
*skip = 1;
} elseif (front_len > m->front_alloc_len) {
pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
front_len, m->front_alloc_len,
(unsignedint)con->peer_name.type,
le64_to_cpu(con->peer_name.num));
ceph_msg_put(m);
m = ceph_msg_new(type, front_len, GFP_NOFS, false);
}
return m;
}
/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			/* a hunt is already in progress; nothing to do */
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}
/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *mon_get_con(struct ceph_connection *con)
{
	return con;
}
/*
 * NOTE(review): the following trailing text is German website boilerplate
 * that was pasted into the source during extraction and is not part of the
 * code; it should be removed.  Translated: "The information on this
 * website was carefully compiled to the best of our knowledge.  However,
 * neither completeness, nor correctness, nor quality of the provided
 * information is guaranteed.  Remark: the colored syntax highlighting and
 * the measurement are still experimental."
 */