/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
/* * parse a normal reply, which may contain a (dir+)dentry and/or a * target inode.
*/ staticint parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info_parsed *info,
u64 features)
{ int err;
if (info->head->is_dentry) {
err = parse_reply_info_in(p, end, &info->diri, features); if (err < 0) goto out_bad;
/*
 * Try to decrypt the dentry names and update them
 * in the ceph_mds_reply_dir_entry struct.
 */
fname.dir = inode;
fname.name = _name;
fname.name_len = _name_len;
fname.ctext = altname;
fname.ctext_len = altname_len; /* * The _name_len maybe larger than altname_len, such as * when the human readable name length is in range of * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE), * then the copy in ceph_fname_to_usr will corrupt the * data if there has no encryption key. * * Just set the no_copy flag and then if there has no * encryption key the oname.name will be assigned to * _name always.
*/
fname.no_copy = true; if (altname_len == 0) { /* * Set tname to _name, and this will be used * to do the base64_decode in-place. It's * safe because the decoded string should * always be shorter, which is 3/4 of origin * string.
*/
tname.name = _name;
/*
 * Set oname to _name too, and this will be
 * used to do the decryption in-place.
 */
oname.name = _name;
oname.len = _name_len;
} else { /* * This will do the decryption only in-place * from altname cryptext directly.
*/
oname.name = altname;
oname.len = altname_len;
}
rde->is_nokey = false;
err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey); if (err) {
pr_err_client(cl, "unable to decode %.*s, got %d\n",
_name_len, _name, err); goto out_bad;
}
rde->name = oname.name;
rde->name_len = oname.len;
/* inode */
err = parse_reply_info_in(p, end, &rde->inode, features); if (err < 0) goto out_bad; /* ceph_readdir_prepopulate() will update it */
rde->offset = 0;
i++;
num--;
}
done: /* Skip over any unrecognized fields */
*p = end; return 0;
/* * In async unlink case the kclient won't wait for the first reply * from MDS and just drop all the links and unhash the dentry and then * succeeds immediately. * * For any new create/link/rename,etc requests followed by using the * same file names we must wait for the first reply of the inflight * unlink request, or the MDS possibly will fail these following * requests with -EEXIST if the inflight async unlink request was * delayed for some reasons. * * And the worst case is that for the none async openc request it will * successfully open the file if the CDentry hasn't been unlinked yet, * but later the previous delayed async unlink request will remove the * CDentry. That means the just created file is possibly deleted later * by accident. * * We need to wait for the inflight async unlink requests to finish * when creating new files/directories by using the same file names.
*/ int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{ struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb); struct ceph_client *cl = fsc->client; struct dentry *pdentry = dentry->d_parent; struct dentry *udentry, *found = NULL; struct ceph_dentry_info *di; struct qstr dname;
u32 hash = dentry->d_name.hash; int err;
/* * sessions
*/ constchar *ceph_session_state_name(int s)
{ switch (s) { case CEPH_MDS_SESSION_NEW: return"new"; case CEPH_MDS_SESSION_OPENING: return"opening"; case CEPH_MDS_SESSION_OPEN: return"open"; case CEPH_MDS_SESSION_HUNG: return"hung"; case CEPH_MDS_SESSION_CLOSING: return"closing"; case CEPH_MDS_SESSION_CLOSED: return"closed"; case CEPH_MDS_SESSION_RESTARTING: return"restarting"; case CEPH_MDS_SESSION_RECONNECTING: return"reconnecting"; case CEPH_MDS_SESSION_REJECTED: return"rejected"; default: return"???";
}
}
/* * create+register a new session for given mds. * called under mdsc->mutex.
*/ staticstruct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, int mds)
{ struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_session *s;
if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) return ERR_PTR(-EIO);
if (mds >= mdsc->mdsmap->possible_max_rank) return ERR_PTR(-EINVAL);
s = kzalloc(sizeof(*s), GFP_NOFS); if (!s) return ERR_PTR(-ENOMEM);
if (mds >= mdsc->max_sessions) { int newmax = 1 << get_count_order(mds + 1); struct ceph_mds_session **sa;
doutc(cl, "realloc to %d\n", newmax);
sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); if (!sa) goto fail_realloc; if (mdsc->sessions) {
memcpy(sa, mdsc->sessions,
mdsc->max_sessions * sizeof(void *));
kfree(mdsc->sessions);
}
mdsc->sessions = sa;
mdsc->max_sessions = newmax;
}
/*
 * Drop the session reference held by a request, if any, and clear
 * req->r_session so the reference cannot be dropped twice.
 *
 * Should be the last request ref, or the caller must hold mdsc->mutex.
 */
static void put_request_session(struct ceph_mds_request *req)
{
	/* nothing to release */
	if (!req->r_session)
		return;

	ceph_put_mds_session(req->r_session);
	req->r_session = NULL;
}
/*
 * Drop the CEPH_CAP_PIN cap reference and the inode reference held on
 * @inode.  A NULL @inode is ignored.
 */
static void ceph_mdsc_unpin_and_put_inode(struct inode *inode)
{
	if (!inode)
		return;

	ceph_put_cap_refs(ceph_inode(inode), CEPH_CAP_PIN);
	iput(inode);
}

/*
 * kref release callback for a ceph_mds_request: releases every resource
 * the request still holds and returns the request to its slab cache.
 *
 * @kref: the r_kref member embedded in the request being released.
 */
void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req;

	req = container_of(kref, struct ceph_mds_request, r_kref);

	ceph_mdsc_release_dir_caps_async(req);
	destroy_reply_info(&req->r_reply_info);

	/* request and reply messages */
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	/* inodes that were pinned with CEPH_CAP_PIN */
	ceph_mdsc_unpin_and_put_inode(req->r_inode);
	ceph_mdsc_unpin_and_put_inode(req->r_parent);

	/* inodes that only carry a plain inode reference */
	iput(req->r_target_inode);
	iput(req->r_new_inode);

	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);

	/*
	 * r_old_dentry_dir is tracked (and unpinned) on its own rather
	 * than derived from r_old_dentry: r_old_dentry's d_parent may
	 * have changed between the dir mutex being dropped and this
	 * request being freed.
	 */
	ceph_mdsc_unpin_and_put_inode(req->r_old_dentry_dir);

	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_mnt_idmap)
		mnt_idmap_put(req->r_mnt_idmap);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	kfree(req->r_fscrypt_auth);
	kfree(req->r_altname);

	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);

	/* the request must no longer be queued on any wait list */
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}
/* * lookup session, bump ref if found. * * called under mdsc->mutex.
*/ staticstruct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{ struct ceph_mds_request *req;
req = lookup_request(&mdsc->request_tree, tid); if (req)
ceph_mdsc_get_request(req);
return req;
}
/*
 * Register an in-flight request, and assign a tid.  Link to the directory
 * we are modifying (if any).
 *
 * @mdsc: mds client; last_tid is incremented and request_tree is updated.
 * @req:  request being registered.
 * @dir:  directory being modified, if any.
 *
 * If req->r_num_caps is set and the cap reservation fails, the error is
 * stored in req->r_err and the function returns before the request is
 * inserted into the request tree.
 *
 * Called under mdsc->mutex.
 *
 * NOTE(review): @dir is never referenced in this body, and the tail
 * (dropping r_unsafe_dir, completing r_safe_completion, putting the
 * request reference that was just taken) reads like teardown rather than
 * registration.  Confirm against the full file that no statements were
 * lost here; the statement order below is kept exactly as found.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	struct ceph_client *cl = mdsc->fsc->client;
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err_client(cl, "%p failed to reserve caps: %d\n",
				      req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	doutc(cl, "%p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_cred = get_current_cred();
	/* fall back to the no-op idmap when the caller supplied none */
	if (!req->r_mnt_idmap)
		req->r_mnt_idmap = &nop_mnt_idmap;

	if (req->r_unsafe_dir) {
		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}
/* * Walk back up the dentry tree until we hit a dentry representing a * non-snapshot inode. We do this using the rcu_read_lock (which must be held * when calling this) to ensure that the objects won't disappear while we're * working with them. Once we hit a candidate dentry, we attempt to take a * reference to it, and return that as the result.
*/ staticstruct inode *get_nonsnap_parent(struct dentry *dentry)
{ struct inode *inode = NULL;
while (dentry && !IS_ROOT(dentry)) {
inode = d_inode_rcu(dentry); if (!inode || ceph_snap(inode) == CEPH_NOSNAP) break;
dentry = dentry->d_parent;
} if (inode)
inode = igrab(inode); return inode;
}
/* * Choose mds to send request to next. If there is a hint set in the * request (e.g., due to a prior forward hint from the mds), use that. * Otherwise, consult frag tree and/or caps to identify the * appropriate mds. If all else fails, choose randomly. * * Called under mdsc->mutex.
*/ staticint __choose_mds(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, bool *random)
{ struct inode *inode; struct ceph_inode_info *ci; struct ceph_cap *cap; int mode = req->r_direct_mode; int mds = -1;
u32 hash = req->r_direct_hash; bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); struct ceph_client *cl = mdsc->fsc->client;
if (random)
*random = false;
/* * is there a specific mds we should try? ignore hint if we have * no session and the mds is not up (active or recovering).
*/ if (req->r_resend_mds >= 0 &&
(__have_session(mdsc, req->r_resend_mds) ||
ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds); return req->r_resend_mds;
}
if (mode == USE_RANDOM_MDS) goto random;
inode = NULL; if (req->r_inode) { if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
inode = req->r_inode;
ihold(inode);
} else { /* req->r_dentry is non-null for LSSNAP request */
rcu_read_lock();
inode = get_nonsnap_parent(req->r_dentry);
rcu_read_unlock();
doutc(cl, "using snapdir's parent %p %llx.%llx\n",
inode, ceph_vinop(inode));
}
} elseif (req->r_dentry) { /* ignore race with rename; old or new d_parent is okay */ struct dentry *parent; struct inode *dir;
rcu_read_lock();
parent = READ_ONCE(req->r_dentry->d_parent);
dir = req->r_parent ? : d_inode_rcu(parent);
if (!dir || dir->i_sb != mdsc->fsc->sb) { /* not this fs or parent went negative */
inode = d_inode(req->r_dentry); if (inode)
ihold(inode);
} elseif (ceph_snap(dir) != CEPH_NOSNAP) { /* direct snapped/virtual snapdir requests
* based on parent dir inode */
inode = get_nonsnap_parent(parent);
doutc(cl, "using nonsnap parent %p %llx.%llx\n",
inode, ceph_vinop(inode));
} else { /* dentry target */
inode = d_inode(req->r_dentry); if (!inode || mode == USE_AUTH_MDS) { /* dir + name */
inode = igrab(dir);
hash = ceph_dentry_hash(dir, req->r_dentry);
is_hash = true;
} else {
ihold(inode);
}
}
rcu_read_unlock();
}
if (is_hash && S_ISDIR(inode->i_mode)) { struct ceph_inode_frag frag; int found;
ceph_choose_frag(ci, hash, &frag, &found); if (found) { if (mode == USE_ANY_MDS && frag.ndist > 0) {
u8 r;
/* choose a random replica */
get_random_bytes(&r, 1);
r %= frag.ndist;
mds = frag.dist[r];
doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
inode, ceph_vinop(inode), frag.frag,
mds, (int)r, frag.ndist); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE &&
!ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) goto out;
}
/* since this file/dir wasn't known to be * replicated, then we want to look for the
* authoritative mds. */ if (frag.mds >= 0) { /* choose auth mds */
mds = frag.mds;
doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
inode, ceph_vinop(inode), frag.frag, mds); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE) { if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
mds)) goto out;
}
}
mode = USE_AUTH_MDS;
}
}
spin_lock(&ci->i_ceph_lock);
cap = NULL; if (mode == USE_AUTH_MDS)
cap = ci->i_auth_cap; if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); if (!cap) {
spin_unlock(&ci->i_ceph_lock);
iput(inode); goto random;
}
mds = cap->session->s_mds;
doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
ceph_vinop(inode), mds,
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
out:
iput(inode); return mds;
/* Allocate the message */
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
GFP_NOFS, false); if (!msg) {
pr_err_client(cl, "ENOMEM creating session open msg\n"); return ERR_PTR(-ENOMEM);
}
p = msg->front.iov_base;
end = p + msg->front.iov_len;
h = p;
h->op = cpu_to_le32(op);
h->seq = cpu_to_le64(seq);
/* * Serialize client metadata into waiting buffer space, using * the format that userspace expects for map<string, string> * * ClientSession messages with metadata are v7
*/
msg->hdr.version = cpu_to_le16(7);
msg->hdr.compat_version = cpu_to_le16(1);
/* The write pointer, following the session_head structure */
p += sizeof(*h);
/* Number of entries in the map */
ceph_encode_32(&p, metadata_key_count);
/* Two length-prefixed strings for each entry in the map */ for (i = 0; metadata[i][0]; ++i) {
size_t const key_len = strlen(metadata[i][0]);
size_t const val_len = strlen(metadata[i][1]);
ceph_encode_32(&p, key_len);
memcpy(p, metadata[i][0], key_len);
p += key_len;
ceph_encode_32(&p, val_len);
memcpy(p, metadata[i][1], val_len);
p += val_len;
}
ret = encode_supported_features(&p, end); if (ret) {
pr_err_client(cl, "encode_supported_features failed!\n");
ceph_msg_put(msg); return ERR_PTR(ret);
}
ret = encode_metric_spec(&p, end); if (ret) {
pr_err_client(cl, "encode_metric_spec failed!\n");
ceph_msg_put(msg); return ERR_PTR(ret);
}
/* version == 5, flags */
ceph_encode_32(&p, 0);
/* version == 6, mds auth caps */
ceph_encode_32(&p, 0);
/* version == 7, oldest_client_tid */
ceph_encode_64(&p, mdsc->oldest_tid);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
return msg;
}
/* * send session open request. * * called under mdsc->mutex
*/ staticint __open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session)
{ struct ceph_msg *msg; int mstate; int mds = session->s_mds;
if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) return -EIO;
/* wait for mds to go active? */
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
ceph_mds_state_name(mstate));
session->s_state = CEPH_MDS_SESSION_OPENING;
session->s_renew_requested = jiffies;
/* * open sessions for any export targets for the given mds * * called under mdsc->mutex
*/ staticstruct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{ struct ceph_mds_session *session; int ret;
session = __ceph_lookup_mds_session(mdsc, target); if (!session) {
session = register_session(mdsc, target); if (IS_ERR(session)) return session;
} if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING) {
ret = __open_session(mdsc, session); if (ret) return ERR_PTR(ret);
}
doutc(cl, "mds%d\n", session->s_mds);
mutex_lock(&mdsc->mutex); while (!list_empty(&session->s_unsafe)) {
req = list_first_entry(&session->s_unsafe, struct ceph_mds_request, r_unsafe_item);
pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
req->r_tid); if (req->r_target_inode)
mapping_set_error(req->r_target_inode->i_mapping, -EIO); if (req->r_unsafe_dir)
mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
__unregister_request(mdsc, req);
} /* zero r_attempts, so kick_requests() will re-send requests */
p = rb_first(&mdsc->request_tree); while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p); if (req->r_session &&
req->r_session->s_mds == session->s_mds)
req->r_attempts = 0;
}
mutex_unlock(&mdsc->mutex);
}
/* * Helper to safely iterate over all caps associated with a session, with * special care taken to handle a racing __ceph_remove_cap(). * * Caller must hold session s_mutex.
*/ int ceph_iterate_session_caps(struct ceph_mds_session *session, int (*cb)(struct inode *, int mds, void *), void *arg)
{ struct ceph_client *cl = session->s_mdsc->fsc->client; struct list_head *p; struct ceph_cap *cap; struct inode *inode, *last_inode = NULL; struct ceph_cap *old_cap = NULL; int ret;
doutc(cl, "%p mds%d\n", session, session->s_mds);
spin_lock(&session->s_cap_lock);
p = session->s_caps.next; while (p != &session->s_caps) { int mds;
cap = list_entry(p, struct ceph_cap, session_caps);
inode = igrab(&cap->ci->netfs.inode); if (!inode) {
p = p->next; continue;
}
session->s_cap_iterator = cap;
mds = cap->mds;
spin_unlock(&session->s_cap_lock);
if (last_inode) {
iput(last_inode);
last_inode = NULL;
} if (old_cap) {
ceph_put_cap(session->s_mdsc, old_cap);
old_cap = NULL;
}
ret = cb(inode, mds, arg);
last_inode = inode;
spin_lock(&session->s_cap_lock);
p = p->next; if (!cap->ci) {
doutc(cl, "finishing cap %p removal\n", cap);
BUG_ON(cap->session != session);
cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
atomic64_dec(&session->s_mdsc->metric.total_caps); if (cap->queue_release)
__ceph_queue_cap_release(session, cap); else
old_cap = cap; /* put_cap it w/o locks held */
} if (ret < 0) goto out;
}
ret = 0;
out:
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);
iput(last_inode); if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap);
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); if (cap) {
doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->netfs.inode);
spin_lock(&session->s_cap_lock); if (session->s_nr_caps > 0) { struct inode *inode; struct ceph_cap *cap, *prev = NULL; struct ceph_vino vino; /* * iterate_session_caps() skips inodes that are being * deleted, we need to wait until deletions are complete. * __wait_on_freeing_inode() is designed for the job, * but it is not exported, so use lookup inode function * to access it.
*/ while (!list_empty(&session->s_caps)) {
cap = list_entry(session->s_caps.next, struct ceph_cap, session_caps); if (cap == prev) break;
prev = cap;
vino = cap->ci->i_vino;
spin_unlock(&session->s_cap_lock);
inode = ceph_find_inode(sb, vino);
iput(inode);
spin_lock(&session->s_cap_lock);
}
}
// drop cap expires and unlock s_cap_lock
detach_cap_releases(session, &dispose);
/* * wake up any threads waiting on this session's caps. if the cap is * old (didn't get renewed on the client reconnect), remove it now. * * caller must hold s_mutex.
*/ staticint wake_up_session_cb(struct inode *inode, int mds, void *arg)
{ struct ceph_inode_info *ci = ceph_inode(inode); unsignedlong ev = (unsignedlong)arg;
/* * Send periodic message to MDS renewing all currently held caps. The * ack will reset the expiration for all caps from this session. * * caller holds s_mutex
*/ staticint send_renew_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session)
{ struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *msg; int state;
/* do not try to renew caps until a recovering mds has reconnected
* with its clients. */
state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); if (state < CEPH_MDS_STATE_RECONNECT) {
doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
ceph_mds_state_name(state)); return 0;
}
/* * Note new cap ttl, and any transition from stale -> not stale (fresh?). * * Called under session->s_mutex
*/ staticvoid renewed_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int is_renew)
{ struct ceph_client *cl = mdsc->fsc->client; int was_stale; int wake = 0;
if (all_negative)
shrink_dcache_parent(dentry);
out: return all_negative;
}
/* * Trim old(er) caps. * * Because we can't cache an inode without one or more caps, we do * this indirectly: if a cap is unused, we prune its aliases, at which * point the inode will hopefully get dropped to. * * Yes, this is a bit sloppy. Our only real goal here is to respond to * memory pressure from the MDS, though, so it needn't be perfect.
*/ staticint trim_caps_cb(struct inode *inode, int mds, void *arg)
{ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_client *cl = mdsc->fsc->client; int *remaining = arg; struct ceph_inode_info *ci = ceph_inode(inode); int used, wanted, oissued, mine; struct ceph_cap *cap;
if (*remaining <= 0) return -1;
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); if (!cap) {
spin_unlock(&ci->i_ceph_lock); return 0;
}
mine = cap->issued | cap->implemented;
used = __ceph_caps_used(ci);
wanted = __ceph_caps_file_wanted(ci);
oissued = __ceph_caps_issued_other(ci, cap);
doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
ceph_cap_string(oissued), ceph_cap_string(used),
ceph_cap_string(wanted)); if (cap == ci->i_auth_cap) { if (ci->i_dirty_caps || ci->i_flushing_caps ||
!list_empty(&ci->i_cap_snaps)) goto out; if ((used | wanted) & CEPH_CAP_ANY_WR) goto out; /* Note: it's possible that i_filelock_ref becomes non-zero * after dropping auth caps. It doesn't hurt because reply
* of lock mds request will re-add auth caps. */ if (atomic_read(&ci->i_filelock_ref) > 0) goto out;
} /* The inode has cached pages, but it's no longer used.
* we can safely drop it */ if (S_ISREG(inode->i_mode) &&
wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
!(oissued & CEPH_CAP_FILE_CACHE)) {
used = 0;
oissued = 0;
} if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */
if (oissued) { /* we aren't the only cap.. just remove us */
ceph_remove_cap(mdsc, cap, true);
(*remaining)--;
} else { struct dentry *dentry; /* try dropping referring dentries */
spin_unlock(&ci->i_ceph_lock);
dentry = d_find_any_alias(inode); if (dentry && drop_negative_children(dentry)) { int count;
dput(dentry);
d_prune_aliases(inode);
count = atomic_read(&inode->i_count); if (count == 1)
(*remaining)--;
doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
inode, ceph_vinop(inode), cap, count);
} else {
dput(dentry);
} return 0;
}
out:
spin_unlock(&ci->i_ceph_lock); return 0;
}
/* * Trim session cap count down to some max number.
*/ int ceph_trim_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int max_caps)
{ struct ceph_client *cl = mdsc->fsc->client; int trim_caps = session->s_nr_caps - max_caps;
doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
session->s_nr_caps, max_caps, trim_caps); if (trim_caps > 0) { int remaining = trim_caps;
/** * ceph_mdsc_build_path - build a path string to a given dentry * @mdsc: mds client * @dentry: dentry to which path should be built * @path_info: output path, length, base ino+snap, and freepath ownership flag * @for_wire: is this path going to be sent to the MDS? * * Build a string that represents the path to the dentry. This is mostly called * for two different purposes: * * 1) we need to build a path string to send to the MDS (for_wire == true) * 2) we need a path string for local presentation (e.g. debugfs) * (for_wire == false) * * The path is built in reverse, starting with the dentry. Walk back up toward * the root, building the path until the first non-snapped inode is reached * (for_wire) or the root inode is reached (!for_wire). * * Encode hidden .snap dirs as a double /, i.e. * foo/.snap/bar -> foo//bar
*/ char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry, struct ceph_path_info *path_info, int for_wire)
{ struct ceph_client *cl = mdsc->fsc->client; struct dentry *cur; struct inode *inode; char *path; int pos; unsigned seq;
u64 base;
/* * Proactively copy name into buf, in case we need to * present it as-is.
*/
memcpy(buf, cur->d_name.name, cur->d_name.len);
len = cur->d_name.len;
spin_unlock(&cur->d_lock);
parent = dget_parent(cur);
ret = ceph_fscrypt_prepare_readdir(d_inode(parent)); if (ret < 0) {
dput(parent);
dput(cur); return ERR_PTR(ret);
}
if (fscrypt_has_encryption_key(d_inode(parent))) {
len = ceph_encode_encrypted_dname(d_inode(parent),
buf, len); if (len < 0) {
dput(parent);
dput(cur); return ERR_PTR(len);
}
}
pos -= len; if (pos < 0) {
dput(parent); break;
}
memcpy(path + pos, buf, len);
}
dput(cur);
cur = parent;
/* Are we at the root? */ if (IS_ROOT(cur)) break;
if (pos < 0) { /* * The path is longer than PATH_MAX and this function * cannot ever succeed. Creating paths that long is * possible with Ceph, but Linux cannot use them.
*/ return ERR_PTR(-ENAMETOOLONG);
}
/* Initialize the output structure */
memset(path_info, 0, sizeof(*path_info));
/* Set snap from dentry if available */ if (d_inode(dentry))
path_info->vino.snap = ceph_snap(d_inode(dentry)); else
path_info->vino.snap = CEPH_NOSNAP;
/* * When the parent directory's i_rwsem is *not* locked, req->r_parent may * have become stale (e.g. after a concurrent rename) between the time the * dentry was looked up and now. If we detect that the stored r_parent * does not match the inode number we just encoded for the request, switch * to the correct inode so that the MDS receives a valid parent reference.
*/ if (!parent_locked && req->r_parent && path_info1.vino.ino &&
ceph_ino(req->r_parent) != path_info1.vino.ino) { struct inode *old_parent = req->r_parent; struct inode *correct_dir = ceph_get_inode(mdsc->fsc->sb, path_info1.vino, NULL); if (!IS_ERR(correct_dir)) {
WARN_ONCE(1, "ceph: r_parent mismatch (had %llx wanted %llx) - updating\n",
ceph_ino(old_parent), path_info1.vino.ino); /* * Transfer CEPH_CAP_PIN from the old parent to the new one. * The pin was taken earlier in ceph_mdsc_submit_request().
*/
ceph_put_cap_refs(ceph_inode(old_parent), CEPH_CAP_PIN);
iput(old_parent);
req->r_parent = correct_dir;
ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
}
}
/* If r_old_dentry is set, then assume that its parent is locked */ if (req->r_old_dentry &&
!(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
old_dentry = req->r_old_dentry;
ret = set_request_path_attr(mdsc, NULL, old_dentry,
req->r_old_dentry_dir,
req->r_path2, req->r_ino2.ino,
&path_info2, true); if (ret < 0) {
msg = ERR_PTR(ret); goto out_free1;
}
/* * For old cephs without supporting the 32bit retry/fwd feature * it will copy the raw memories directly when decoding the * requests. While new cephs will decode the head depending the * version member, so we need to make sure it will be compatible * with them both.
*/ if (legacy)
len = sizeof(struct ceph_mds_request_head_legacy); elseif (request_head_version == 1)
len = offsetofend(struct ceph_mds_request_head, args); elseif (request_head_version == 2)
len = offsetofend(struct ceph_mds_request_head, ext_num_fwd); else
len = sizeof(struct ceph_mds_request_head);
/* filepaths */
len += 2 * (1 + sizeof(u32) + sizeof(u64));
len += path_info1.pathlen + path_info2.pathlen;
/* cap releases */
len += sizeof(struct ceph_mds_request_release) *
(!!req->r_inode_drop + !!req->r_dentry_drop +
!!req->r_old_inode_drop + !!req->r_old_dentry_drop);
if (req->r_dentry_drop)
len += path_info1.pathlen; if (req->r_old_dentry_drop)
len += path_info2.pathlen;
/* MClientRequest tail */
/* req->r_stamp */
len += sizeof(struct ceph_timespec);
/* gid list */
len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
/* alternate name */
len += sizeof(u32) + req->r_altname_len;
/* fscrypt_auth */
len += sizeof(u32); // fscrypt_auth if (req->r_fscrypt_auth)
len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
/* fscrypt_file */
len += sizeof(u32); if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
len += sizeof(__le64);
if ((req->r_mnt_idmap != &nop_mnt_idmap) &&
!test_bit(CEPHFS_FEATURE_HAS_OWNER_UIDGID, &session->s_features)) {
WARN_ON_ONCE(!IS_CEPH_MDS_OP_NEWINODE(req->r_op));
if (enable_unsafe_idmap) {
pr_warn_once_client(cl, "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" " is not supported by MDS. UID/GID-based restrictions may" " not work properly.\n");
caller_fsuid = from_vfsuid(req->r_mnt_idmap, &init_user_ns,
VFSUIDT_INIT(req->r_cred->fsuid));
caller_fsgid = from_vfsgid(req->r_mnt_idmap, &init_user_ns,
VFSGIDT_INIT(req->r_cred->fsgid));
} else {
pr_err_ratelimited_client(cl, "idmapped mount is used and CEPHFS_FEATURE_HAS_OWNER_UIDGID" " is not supported by MDS. Fail request with -EIO.\n");
ret = -EIO; goto out_err;
}
}
/* * The ceph_mds_request_head_legacy didn't contain a version field, and * one was added when we moved the message version from 3->4.
*/ if (legacy) {
msg->hdr.version = cpu_to_le16(3);
p = msg->front.iov_base + sizeof(*lhead);
} elseif (request_head_version == 1) { struct ceph_mds_request_head *nhead = msg->front.iov_base;
/*
 * Finish a request: record its end time, run the optional completion
 * callback, then wake everything waiting on req->r_completion.
 *
 * @mdsc: mds client, passed through to the callback.
 * @req:  request being completed.
 *
 * called under mdsc->mutex if error, under no mutex if
 * success.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	/* the callback runs before waiters are released */
	if (req->r_callback)
		req->r_callback(mdsc, req);

	complete_all(&req->r_completion);
}
/* * Avoid infinite retrying after overflow. The client will * increase the retry count and if the MDS is old version, * so we limit to retry at most 256 times.
*/ if (req->r_attempts) {
old_max_retry = sizeof_field(struct ceph_mds_request_head,
num_retry);
old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE); if ((old_version && req->r_attempts >= old_max_retry) ||
((uint32_t)req->r_attempts >= U32_MAX)) {
pr_warn_ratelimited_client(cl, "request tid %llu seq overflow\n",
req->r_tid); return -EMULTIHOP;
}
}
req->r_attempts++; if (req->r_inode) { struct ceph_cap *cap =
ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { void *p;
/* * Replay. Do not regenerate message (and rebuild * paths, etc.); just use the original message. * Rebuilding paths will break for renames because * d_move mangles the src name.
*/
msg = req->r_request;
lhead = find_legacy_request_head(msg->front.iov_base,
session->s_con.peer_features);
mds = __choose_mds(mdsc, req, &random); if (mds < 0 ||
ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
err = -EJUKEBOX; goto finish;
}
doutc(cl, "no mds or not active, waiting for map\n");
list_add(&req->r_wait, &mdsc->waiting_for_map); return;
}
/* get, open session */
session = __ceph_lookup_mds_session(mdsc, mds); if (!session) {
session = register_session(mdsc, mds); if (IS_ERR(session)) {
err = PTR_ERR(session); goto finish;
}
}
req->r_session = ceph_get_mds_session(session);
doutc(cl, "mds%d session %p state %s\n", mds, session,
ceph_session_state_name(session->s_state));
/* * The old ceph will crash the MDSs when see unknown OPs
*/ if (req->r_feature_needed > 0 &&
!test_bit(req->r_feature_needed, &session->s_features)) {
err = -EOPNOTSUPP; goto out_session;
}
if (session->s_state != CEPH_MDS_SESSION_OPEN &&
session->s_state != CEPH_MDS_SESSION_HUNG) { /* * We cannot queue async requests since the caps and delegated * inodes are bound to the session. Just return -EJUKEBOX and * let the caller retry a sync request in that case.
*/ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
err = -EJUKEBOX; goto out_session;
}
/* * If the session has been REJECTED, then return a hard error, * unless it's a CLEANRECOVER mount, in which case we'll queue * it to the mdsc queue.
*/ if (session->s_state == CEPH_MDS_SESSION_REJECTED) { if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
list_add(&req->r_wait, &mdsc->waiting_for_map); else
err = -EACCES; goto out_session;
}
if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING) {
err = __open_session(mdsc, session); if (err) goto out_session; /* retry the same mds later */ if (random)
req->r_resend_mds = mds;
}
list_add(&req->r_wait, &session->s_waiting); goto out_session;
}
/* send request */
req->r_resend_mds = -1; /* forget any previous mds hint */
if (req->r_request_started == 0) /* note request start time */
req->r_request_started = jiffies;
/*
 * For async create we will choose the auth MDS of the frag in the parent
 * directory to send the request to, and usually this works fine, but
 * if the MDS migrated the directory to another MDS before it could handle
 * the request, the request will be forwarded.
 *
 * And then the auth cap will be changed.
*/ if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) { struct ceph_dentry_info *di = ceph_dentry(req->r_dentry); struct ceph_inode_info *ci; struct ceph_cap *cap;
/*
 * The request may be handled very fast, and the new inode may not
 * have been linked to the dentry yet.  When forwarding the request
 * we need to wait for ceph_finish_async_create() to finish, which
 * in theory shouldn't be stuck for too long or fail.
*/ if (!d_inode(req->r_dentry)) {
err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
TASK_KILLABLE); if (err) {
mutex_lock(&req->r_fill_mutex);
set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
mutex_unlock(&req->r_fill_mutex); goto out_session;
}
}
ci = ceph_inode(d_inode(req->r_dentry));
spin_lock(&ci->i_ceph_lock);
cap = ci->i_auth_cap; if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
doutc(cl, "session changed for auth cap %d -> %d\n",
cap->session->s_mds, session->s_mds);
/* Remove the auth cap from old session */
spin_lock(&cap->session->s_cap_lock);
cap->session->s_nr_caps--;
list_del_init(&cap->session_caps);
spin_unlock(&cap->session->s_cap_lock);
/* Add the auth cap to the new session */
cap->mds = mds;
cap->session = session;
spin_lock(&session->s_cap_lock);
session->s_nr_caps++;
list_add_tail(&cap->session_caps, &session->s_caps);
spin_unlock(&session->s_cap_lock);
while (!list_empty(&tmp_list)) {
req = list_entry(tmp_list.next, struct ceph_mds_request, r_wait);
list_del_init(&req->r_wait);
doutc(cl, " wake request %p tid %llu\n", req,
req->r_tid);
__do_request(mdsc, req);
}
}
/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 *
 * @mdsc: mds client whose request tree is walked.
 * @mds:  rank of the MDS whose pending requests should be re-driven.
 *
 * Requests that already got an unsafe reply, and requests with a
 * non-zero attempt count, are left alone.  Every other request whose
 * session belongs to @mds is taken off its wait list and handed back
 * to __do_request().
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	doutc(cl, "kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		/* step to the next node before this request is processed */
		p = rb_next(p);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			doutc(cl, " kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req)
{ struct ceph_client *cl = mdsc->fsc->client; int err = 0;
/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ if (req->r_inode)
ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); if (req->r_parent) { struct ceph_inode_info *ci = ceph_inode(req->r_parent); int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
spin_lock(&ci->i_ceph_lock);
ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
__ceph_touch_fmode(ci, mdsc, fmode);
spin_unlock(&ci->i_ceph_lock);
} if (req->r_old_dentry_dir)
ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
if (req->r_inode) {
err = ceph_wait_on_async_create(req->r_inode); if (err) {
doutc(cl, "wait for async create returned: %d\n", err); return err;
}
}
if (!err && req->r_old_inode) {
err = ceph_wait_on_async_create(req->r_old_inode); if (err) {
doutc(cl, "wait for async create returned: %d\n", err); return err;
}
}
/* only abort if we didn't race with a real reply */ if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
err = le32_to_cpu(req->r_reply_info.head->result);
} elseif (err < 0) {
doutc(cl, "aborted request %lld with %d\n", req->r_tid, err);
/* * ensure we aren't running concurrently with * ceph_fill_trace or ceph_readdir_prepopulate, which * rely on locks (dir mutex) held by our caller.
*/
mutex_lock(&req->r_fill_mutex);
req->r_err = err;
set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
mutex_unlock(&req->r_fill_mutex);
/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
*/ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req)
{ struct ceph_client *cl = mdsc->fsc->client; int err;
ceph_dir_clear_complete(dir); if (old_dir)
ceph_dir_clear_complete(old_dir); if (req->r_dentry)
ceph_invalidate_dentry_lease(req->r_dentry); if (req->r_old_dentry)
ceph_invalidate_dentry_lease(req->r_old_dentry);
}
/* * Handle mds reply. * * We take the session mutex and parse and process the reply immediately. * This preserves the logical ordering of replies, capabilities, etc., sent * by the MDS as they are applied to our local cache.
*/ staticvoid handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{ struct ceph_mds_client *mdsc = session->s_mdsc; struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_request *req; struct ceph_mds_reply_head *head = msg->front.iov_base; struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ struct ceph_snap_realm *realm;
u64 tid; int err, result; int mds = session->s_mds; bool close_sessions = false;
/* get request, session */
tid = le64_to_cpu(msg->hdr.tid);
mutex_lock(&mdsc->mutex);
req = lookup_get_request(mdsc, tid); if (!req) {
doutc(cl, "on unknown tid %llu\n", tid);
mutex_unlock(&mdsc->mutex); return;
}
doutc(cl, "handle_reply %p\n", req);
/* correct session? */ if (req->r_session != session) {
pr_err_client(cl, "got %llu on session mds%d not mds%d\n",
tid, session->s_mds,
req->r_session ? req->r_session->s_mds : -1);
mutex_unlock(&mdsc->mutex); goto out;
}
/* dup? */ if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
(test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
pr_warn_client(cl, "got a dup %s reply on %llu from mds%d\n",
head->safe ? "safe" : "unsafe", tid, mds);
mutex_unlock(&mdsc->mutex); goto out;
} if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
pr_warn_client(cl, "got unsafe after safe on %llu from mds%d\n",
tid, mds);
mutex_unlock(&mdsc->mutex); goto out;
}
result = le32_to_cpu(head->result);
if (head->safe) {
set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
__unregister_request(mdsc, req);
/* last request during umount? */ if (mdsc->stopping && !__get_oldest_req(mdsc))
complete_all(&mdsc->safe_umount_waiters);
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { /* * We already handled the unsafe response, now do the * cleanup. No need to examine the response; the MDS * doesn't include any result info in the safe * response. And even if it did, there is nothing * useful we could do with a revised return value.
*/
doutc(cl, "got safe reply %llu, mds%d\n", tid, mds);
/* Must find target inode outside of mutexes to avoid deadlocks */
rinfo = &req->r_reply_info; if ((err >= 0) && rinfo->head->is_target) { struct inode *in = xchg(&req->r_new_inode, NULL); struct ceph_vino tvino = {
.ino = le64_to_cpu(rinfo->targeti.in->ino),
.snap = le64_to_cpu(rinfo->targeti.in->snapid)
};
/* * If we ended up opening an existing inode, discard * r_new_inode
*/ if (req->r_op == CEPH_MDS_OP_CREATE &&
!req->r_reply_info.has_create_ino) { /* This should never happen on an async create */
WARN_ON_ONCE(req->r_deleg_ino);
iput(in);
in = NULL;
}
in = ceph_get_inode(mdsc->fsc->sb, tvino, in); if (IS_ERR(in)) {
err = PTR_ERR(in);
mutex_lock(&session->s_mutex); goto out_err;
}
req->r_target_inode = in;
}
mutex_lock(&mdsc->mutex);
req = lookup_get_request(mdsc, tid); if (!req) {
mutex_unlock(&mdsc->mutex);
doutc(cl, "forward tid %llu to mds%d - req dne\n", tid, next_mds); return; /* dup reply? */
}
if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
doutc(cl, "forward tid %llu aborted, unregistering\n", tid);
__unregister_request(mdsc, req);
} elseif (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) { /* * Avoid infinite retrying after overflow. * * The MDS will increase the fwd count and in client side * if the num_fwd is less than the one saved in request * that means the MDS is an old version and overflowed of * 8 bits.
*/
mutex_lock(&req->r_fill_mutex);
req->r_err = -EMULTIHOP;
set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
mutex_unlock(&req->r_fill_mutex);
aborted = true;
pr_warn_ratelimited_client(cl, "forward tid %llu seq overflow\n",
tid);
} else { /* resend. forward race not possible; mds would drop */
doutc(cl, "forward tid %llu to mds%d (we resend)\n", tid, next_mds);
BUG_ON(req->r_err);
BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
req->r_attempts = 0;
req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds;
put_request_session(req);
__do_request(mdsc, req);
}
mutex_unlock(&mdsc->mutex);
/* kick calling process */ if (aborted)
complete_request(mdsc, req);
ceph_mdsc_put_request(req); return;
if (session->s_state == CEPH_MDS_SESSION_HUNG) {
session->s_state = CEPH_MDS_SESSION_OPEN;
pr_info_client(cl, "mds%d came back\n", session->s_mds);
}
switch (op) { case CEPH_SESSION_OPEN: if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
pr_info_client(cl, "mds%d reconnect success\n",
session->s_mds);
session->s_features = features; if (session->s_state == CEPH_MDS_SESSION_OPEN) {
pr_notice_client(cl, "mds%d is already opened\n",
session->s_mds);
} else {
session->s_state = CEPH_MDS_SESSION_OPEN;
renewed_caps(mdsc, session, 0); if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
&session->s_features))
metric_schedule_delayed(&mdsc->metric);
}
/*
 * The connection may have been broken and the session on the
 * client side reinitialized, so we need to update the seq
 * anyway.
*/ if (!session->s_seq && seq)
session->s_seq = seq;
wake = 1; if (mdsc->stopping)
__close_session(mdsc, session); break;
case CEPH_SESSION_RENEWCAPS: if (session->s_renew_seq == seq)
renewed_caps(mdsc, session, 1); break;
case CEPH_SESSION_CLOSE: if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
pr_info_client(cl, "mds%d reconnect denied\n",
session->s_mds);
session->s_state = CEPH_MDS_SESSION_CLOSED;
cleanup_session_requests(mdsc, session);
remove_session_caps(session);
wake = 2; /* for good measure */
wake_up_all(&mdsc->session_close_wq); break;
case CEPH_SESSION_STALE:
pr_info_client(cl, "mds%d caps went stale, renewing\n",
session->s_mds);
atomic_inc(&session->s_cap_gen);
session->s_cap_ttl = jiffies - 1;
send_renew_caps(mdsc, session); break;
case CEPH_SESSION_RECALL_STATE:
ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); break;
case CEPH_SESSION_FLUSHMSG: /* flush cap releases */
spin_lock(&session->s_cap_lock); if (session->s_num_cap_releases)
ceph_flush_session_cap_releases(mdsc, session);
spin_unlock(&session->s_cap_lock);
/* * also re-send old requests when MDS enters reconnect stage. So that MDS * can process completed request in clientreplay stage.
*/
p = rb_first(&mdsc->request_tree); while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p); if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) continue; if (req->r_attempts == 0) continue; /* only old requests */ if (!req->r_session) continue; if (req->r_session->s_mds != session->s_mds) continue;
/* * Encode information about a cap for a reconnect with the MDS.
*/ staticint reconnect_caps_cb(struct inode *inode, int mds, void *arg)
{ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_client *cl = ceph_inode_to_client(inode); union { struct ceph_mds_cap_reconnect v2; struct ceph_mds_cap_reconnect_v1 v1;
} rec; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_reconnect_state *recon_state = arg; struct ceph_pagelist *pagelist = recon_state->pagelist; struct dentry *dentry; struct ceph_cap *cap; struct ceph_path_info path_info = {0}; int err;
u64 snap_follows;
dentry = d_find_primary(inode); if (dentry) { /* set pathbase to parent dir when msg_version >= 2 */ char *path = ceph_mdsc_build_path(mdsc, dentry, &path_info,
recon_state->msg_version >= 2);
dput(dentry); if (IS_ERR(path)) {
err = PTR_ERR(path); goto out_err;
}
}
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); if (!cap) {
spin_unlock(&ci->i_ceph_lock);
err = 0; goto out_err;
}
doutc(cl, " adding %p ino %llx.%llx cap %p %lld %s\n", inode,
ceph_vinop(inode), cap, cap->cap_id,
ceph_cap_string(cap->issued));
cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */
cap->mseq = 0; /* and migrate_seq */
cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
/* These are lost when the session goes away */ if (S_ISDIR(inode->i_mode)) { if (cap->issued & CEPH_CAP_DIR_CREATE) {
ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
}
cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
}
if (recon_state->msg_version >= 4) {
err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); if (err < 0) goto fail;
}
/* * snaprealms. we provide mds with the ino, seq (version), and * parent for all of our realms. If the mds has any newer info, * it will tell us.
*/ for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { struct ceph_snap_realm *realm =
rb_entry(p, struct ceph_snap_realm, node); struct ceph_mds_snaprealm_reconnect sr_rec;
if (recon_state->msg_version >= 4) {
size_t need = sizeof(u8) * 2 + sizeof(u32) + sizeof(sr_rec);
if (pagelist->length + need > RECONNECT_MAX_SIZE) {
err = send_reconnect_partial(recon_state); if (err) goto fail;
pagelist = recon_state->pagelist;
}
err = ceph_pagelist_reserve(pagelist, need); if (err) goto fail;
err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); if (err) goto fail;
recon_state->nr_realms++;
}
fail: return err;
}
/* * If an MDS fails and recovers, clients need to reconnect in order to * reestablish shared state. This includes all caps issued through * this session _and_ the snap_realm hierarchy. Because it's not * clear which snap realms the mds cares about, we send everything we * know about.. that ensures we'll then get any new info the * recovering MDS might have. * * This is a relatively heavyweight operation, but it's rare.
*/ staticvoid send_mds_reconnect(struct ceph_mds_client *mdsc, struct ceph_mds_session *session)
{ struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *reply; int mds = session->s_mds; int err = -ENOMEM; struct ceph_reconnect_state recon_state = {
.session = session,
};
LIST_HEAD(dispose);
doutc(cl, "session %p state %s\n", session,
ceph_session_state_name(session->s_state));
atomic_inc(&session->s_cap_gen);
spin_lock(&session->s_cap_lock); /* don't know if session is readonly */
session->s_readonly = 0; /* * notify __ceph_remove_cap() that we are composing cap reconnect. * If a cap get released before being added to the cap reconnect, * __ceph_remove_cap() should skip queuing cap release.
*/
session->s_cap_reconnect = 1; /* drop old cap expires; we're about to reestablish that state */
detach_cap_releases(session, &dispose);
spin_unlock(&session->s_cap_lock);
dispose_cap_releases(mdsc, &dispose);
/* trim unused caps to reduce MDS's cache rejoin time */ if (mdsc->fsc->sb->s_root)
shrink_dcache_parent(mdsc->fsc->sb->s_root);
/* * compare old and new mdsmaps, kicking requests * and closing out old connections as necessary * * called under mdsc->mutex.
*/ staticvoid check_new_map(struct ceph_mds_client *mdsc, struct ceph_mdsmap *newmap, struct ceph_mdsmap *oldmap)
{ int i, j, err; int oldstate, newstate; struct ceph_mds_session *s; unsignedlong targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsignedlong))] = {0}; struct ceph_client *cl = mdsc->fsc->client;
doutc(cl, "new %u old %u\n", newmap->m_epoch, oldmap->m_epoch);
if (newmap->m_info) { for (i = 0; i < newmap->possible_max_rank; i++) { for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
set_bit(newmap->m_info[i].export_targets[j], targets);
}
}
for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { if (!mdsc->sessions[i]) continue;
s = mdsc->sessions[i];
oldstate = ceph_mdsmap_get_state(oldmap, i);
newstate = ceph_mdsmap_get_state(newmap, i);
if (i >= newmap->possible_max_rank) { /* force close session for stopped mds */
ceph_get_mds_session(s);
__unregister_session(mdsc, s);
__wake_requests(mdsc, &s->s_waiting);
mutex_unlock(&mdsc->mutex);
/* * kick request on any mds that has gone active.
*/ if (oldstate < CEPH_MDS_STATE_ACTIVE &&
newstate >= CEPH_MDS_STATE_ACTIVE) { if (oldstate != CEPH_MDS_STATE_CREATING &&
oldstate != CEPH_MDS_STATE_STARTING)
pr_info_client(cl, "mds%d recovery completed\n",
s->s_mds);
kick_requests(mdsc, i);
mutex_unlock(&mdsc->mutex);
mutex_lock(&s->s_mutex);
mutex_lock(&mdsc->mutex);
ceph_kick_flushing_caps(mdsc, s);
mutex_unlock(&s->s_mutex);
wake_up_session_caps(s, RECONNECT);
}
}
/* * Only open and reconnect sessions that don't exist yet.
*/ for (i = 0; i < newmap->possible_max_rank; i++) { /* * In case the import MDS is crashed just after * the EImportStart journal is flushed, so when * a standby MDS takes over it and is replaying * the EImportStart journal the new MDS daemon * will wait the client to reconnect it, but the * client may never register/open the session yet. * * Will try to reconnect that MDS daemon if the * rank number is in the export targets array and * is the up:reconnect state.
*/
newstate = ceph_mdsmap_get_state(newmap, i); if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT) continue;
/*
 * The session may have been registered and opened by some
 * requests which were choosing random MDSes during
 * the mdsc->mutex's unlock/lock gap below, in rare
 * cases. But the related MDS daemon will just queue
 * those requests and still be waiting for the client's
 * reconnection request in up:reconnect state.
*/
s = __ceph_lookup_mds_session(mdsc, i); if (likely(!s)) {
s = __open_export_target_session(mdsc, i); if (IS_ERR(s)) {
err = PTR_ERR(s);
pr_err_client(cl, "failed to open export target session, err %d\n",
err); continue;
}
}
doutc(cl, "send reconnect to export target mds.%d\n", i);
mutex_unlock(&mdsc->mutex);
send_mds_reconnect(mdsc, s);
ceph_put_mds_session(s);
mutex_lock(&mdsc->mutex);
}
for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
s = mdsc->sessions[i]; if (!s) continue; if (!ceph_mdsmap_is_laggy(newmap, i)) continue; if (s->s_state == CEPH_MDS_SESSION_OPEN ||
s->s_state == CEPH_MDS_SESSION_HUNG ||
s->s_state == CEPH_MDS_SESSION_CLOSING) {
doutc(cl, " connecting to export targets of laggy mds%d\n", i);
__open_export_target_sessions(mdsc, s);
}
}
}
/* * leases
*/
/* * caller must hold session s_mutex, dentry->d_lock
*/ void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{ struct ceph_dentry_info *di = ceph_dentry(dentry);
switch (s->s_state) { case CEPH_MDS_SESSION_OPEN: if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
s->s_state = CEPH_MDS_SESSION_HUNG;
pr_info_client(cl, "mds%d hung\n", s->s_mds);
} break; case CEPH_MDS_SESSION_CLOSING: case CEPH_MDS_SESSION_NEW: case CEPH_MDS_SESSION_RESTARTING: case CEPH_MDS_SESSION_CLOSED: case CEPH_MDS_SESSION_REJECTED: returnfalse;
}
returntrue;
}
/*
 * Advance the session sequence number.
 *
 * If the bump happens while the session is in the CLOSING state (i.e.
 * we are waiting on a REQUEST_CLOSE reply), the close request has to
 * be retransmitted; a failure to do so is only logged.
 *
 * The caller must hold s->s_mutex (asserted via lockdep).
 */
void inc_session_sequence(struct ceph_mds_session *s)
{
	struct ceph_client *cl = s->s_mdsc->fsc->client;
	int rc;

	lockdep_assert_held(&s->s_mutex);

	s->s_seq++;

	/* nothing more to do unless a close is in flight */
	if (s->s_state != CEPH_MDS_SESSION_CLOSING)
		return;

	doutc(cl, "resending session close request for mds%d\n", s->s_mds);
	rc = request_close_session(s);
	if (rc < 0)
		pr_err_client(cl, "unable to close session to mds%d: %d\n",
			      s->s_mds, rc);
}
/* * delayed work -- periodically trim expired leases, renew caps with mds. If * the @delay parameter is set to 0 or if it's more than 5 secs, the default * workqueue delay value of 5 secs will be used.
*/ staticvoid schedule_delayed(struct ceph_mds_client *mdsc, unsignedlong delay)
{ unsignedlong max_delay = HZ * 5;
/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 *
 * @mdsc: MDS client whose outstanding requests are drained.
 *
 * If at least one request is still registered, sleep on
 * mdsc->safe_umount_waiters for at most the mount_timeout option.  The
 * result of the wait is intentionally not inspected: whatever is still
 * registered afterwards is unlinked from its wait list and
 * unregistered.
 *
 * mdsc->mutex is taken and released internally; it is dropped for the
 * duration of the sleep.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_client *cl = mdsc->fsc->client;
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		/* don't hold the mutex while sleeping */
		mutex_unlock(&mdsc->mutex);

		doutc(cl, "waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			doutc(cl, "timed out on tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	doutc(cl, "done\n");
}
if (!root_squash_perms) {
doutc(cl, "root_squash is enabled and user(%d %d) isn't allowed to write",
caller_uid, caller_gid);
} if (rw_perms_s) {
doutc(cl, "mds auth caps readable/writeable %d/%d while request r/w %d/%d",
rw_perms_s->readable, rw_perms_s->writeable,
!!(mask & MAY_READ), !!(mask & MAY_WRITE));
}
doutc(cl, "access denied\n"); return -EACCES;
}
/* * called before mount is ro, and before dentries are torn down. * (hmm, does this still race with new lookups?)
*/ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
doutc(mdsc->fsc->client, "begin\n");
mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
mutex_lock(&mdsc->mutex);
ceph_mdsc_put_request(req); if (!nextreq) break; /* next dne before, so we're done! */ if (RB_EMPTY_NODE(&nextreq->r_node)) { /* next request was removed from tree */
ceph_mdsc_put_request(nextreq); goto restart;
}
ceph_mdsc_put_request(nextreq); /* won't go away */
}
req = nextreq;
}
mutex_unlock(&mdsc->mutex);
ceph_put_mds_session(last_session);
doutc(cl, "done\n");
}
/* * true if all sessions are closed, or we force unmount
*/ staticbool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{ if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) returntrue; return atomic_read(&mdsc->num_sessions) <= skipped;
}
/* * called after sb is ro or when metadata corrupted.
*/ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{ struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_session *session; int i; int skipped = 0;
doutc(cl, "begin\n");
/* close sessions */
mutex_lock(&mdsc->mutex); for (i = 0; i < mdsc->max_sessions; i++) {
session = __ceph_lookup_mds_session(mdsc, i); if (!session) continue;
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex); if (__close_session(mdsc, session) <= 0)
skipped++;
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
mutex_lock(&mdsc->mutex);
}
mutex_unlock(&mdsc->mutex);
doutc(cl, "waiting for sessions to close\n");
wait_event_timeout(mdsc->session_close_wq,
done_closing_sessions(mdsc, skipped),
ceph_timeout_jiffies(opts->mount_timeout));
/* tear down remaining sessions */
mutex_lock(&mdsc->mutex); for (i = 0; i < mdsc->max_sessions; i++) { if (mdsc->sessions[i]) {
session = ceph_get_mds_session(mdsc->sessions[i]);
__unregister_session(mdsc, session);
mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
remove_session_caps(session);
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
mutex_lock(&mdsc->mutex);
}
}
WARN_ON(!list_empty(&mdsc->cap_delay_list));
mutex_unlock(&mdsc->mutex);
staticvoid ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
doutc(mdsc->fsc->client, "stop\n"); /* * Make sure the delayed work stopped before releasing * the resources. * * Because the cancel_delayed_work_sync() will only * guarantee that the work finishes executing. But the * delayed work will re-arm itself again after that.
*/
flush_delayed_work(&mdsc->delayed_work);
if (mdsc->mdsmap)
ceph_mdsmap_destroy(mdsc->mdsmap);
kfree(mdsc->sessions);
ceph_caps_finalize(mdsc);
if (mdsc->s_cap_auths) { int i;
for (i = 0; i < mdsc->s_cap_auths_num; i++) {
kfree(mdsc->s_cap_auths[i].match.gids);
kfree(mdsc->s_cap_auths[i].match.path);
kfree(mdsc->s_cap_auths[i].match.fs_name);
}
kfree(mdsc->s_cap_auths);
}
/* * if the client is unresponsive for long enough, the mds will kill * the session entirely.
*/ staticvoid mds_peer_reset(struct ceph_connection *con)
{ struct ceph_mds_session *s = con->private; struct ceph_mds_client *mdsc = s->s_mdsc;
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.0.129Bemerkung:
(vorverarbeitet am 2026-04-26)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.