/* * Capability management * * The Ceph metadata servers control client access to inode metadata * and file data by issuing capabilities, granting clients permission * to read and/or write both inode field and file data to OSDs * (storage nodes). Each capability consists of a set of bits * indicating which operations are allowed. * * If the client holds a *_SHARED cap, the client has a coherent value * that can be safely read from the cached inode. * * In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the * client is allowed to change inode attributes (e.g., file size, * mtime), note its dirty state in the ceph_cap, and asynchronously * flush that metadata change to the MDS. * * In the event of a conflicting operation (perhaps by another * client), the MDS will revoke the conflicting client capabilities. * * In order for a client to cache an inode, it must hold a capability * with at least one MDS server. When inodes are released, release * notifications are batched and periodically sent en masse to the MDS * cluster to release server state.
*/
/* * Called under mdsc->mutex.
*/ int ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need)
{ struct ceph_client *cl = mdsc->fsc->client; int i, j; struct ceph_cap *cap; int have; int alloc = 0; int max_caps; int err = 0; bool trimmed = false; struct ceph_mds_session *s;
LIST_HEAD(newcaps);
doutc(cl, "ctx=%p need=%d\n", ctx, need);
/* first reserve any caps that are already allocated */
spin_lock(&mdsc->caps_list_lock); if (mdsc->caps_avail_count >= need)
have = need; else
have = mdsc->caps_avail_count;
mdsc->caps_avail_count -= have;
mdsc->caps_reserve_count += have;
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count +
mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
for (i = have; i < need; ) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); if (cap) {
list_add(&cap->caps_item, &newcaps);
alloc++;
i++; continue;
}
if (!trimmed) { for (j = 0; j < mdsc->max_sessions; j++) {
s = __ceph_lookup_mds_session(mdsc, j); if (!s) continue;
mutex_unlock(&mdsc->mutex);
spin_lock(&mdsc->caps_list_lock); if (mdsc->caps_avail_count) { int more_have; if (mdsc->caps_avail_count >= need - i)
more_have = need - i; else
more_have = mdsc->caps_avail_count;
i += more_have;
have += more_have;
mdsc->caps_avail_count -= more_have;
mdsc->caps_reserve_count += more_have;
}
spin_unlock(&mdsc->caps_list_lock);
continue;
}
pr_warn_client(cl, "ctx=%p ENOMEM need=%d got=%d\n", ctx, need,
have + alloc);
err = -ENOMEM; break;
}
/* temporary, until we do something about cap import/export */ if (!ctx) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); if (cap) {
spin_lock(&mdsc->caps_list_lock);
mdsc->caps_use_count++;
mdsc->caps_total_count++;
spin_unlock(&mdsc->caps_list_lock);
} else {
spin_lock(&mdsc->caps_list_lock); if (mdsc->caps_avail_count) {
BUG_ON(list_empty(&mdsc->caps_list));
mdsc->caps_avail_count--;
mdsc->caps_use_count++;
cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
list_del(&cap->caps_item);
void ceph_reservation_status(struct ceph_fs_client *fsc, int *total, int *avail, int *used, int *reserved, int *min)
{ struct ceph_mds_client *mdsc = fsc->mdsc;
spin_lock(&mdsc->caps_list_lock);
if (total)
*total = mdsc->caps_total_count; if (avail)
*avail = mdsc->caps_avail_count; if (used)
*used = mdsc->caps_use_count; if (reserved)
*reserved = mdsc->caps_reserve_count; if (min)
*min = mdsc->caps_min_count;
spin_unlock(&mdsc->caps_list_lock);
}
/*
 * Find ceph_cap for given mds, if any.
 *
 * Called with i_ceph_lock held.
 */
struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;
	struct rb_node *n = ci->i_caps.rb_node;

	/* standard rbtree lookup, keyed on cap->mds */
	while (n) {
		cap = rb_entry(n, struct ceph_cap, ci_node);
		if (mds < cap->mds)
			n = n->rb_left;
		else if (mds > cap->mds)	/* fixed fused "elseif" token */
			n = n->rb_right;
		else
			return cap;
	}
	return NULL;
}
/* * (re)set cap hold timeouts, which control the delayed release * of unused caps back to the MDS. Should be called on cap use.
*/ staticvoid __cap_set_timeouts(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci)
{ struct inode *inode = &ci->netfs.inode; struct ceph_mount_options *opt = mdsc->fsc->mount_options;
/*
 * (Re)queue cap at the end of the delayed cap release list.
 *
 * If I_FLUSH is set, leave the inode at the front of the list.
 *
 * Caller holds i_ceph_lock
 *    -> we take mdsc->cap_delay_lock
 */
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
				struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;

	doutc(mdsc->fsc->client, "%p %llx.%llx flags 0x%lx at %lu\n",
	      inode, ceph_vinop(inode), ci->i_ceph_flags,
	      ci->i_hold_caps_max);
	if (!mdsc->stopping) {
		spin_lock(&mdsc->cap_delay_lock);
		if (!list_empty(&ci->i_cap_delay_list)) {
			/* already queued for flush: keep it where it is */
			if (ci->i_ceph_flags & CEPH_I_FLUSH)
				goto no_change;
			list_del_init(&ci->i_cap_delay_list);
		}
		__cap_set_timeouts(mdsc, ci);
		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
		spin_unlock(&mdsc->cap_delay_lock);
	}
}
/* * Queue an inode for immediate writeback. Mark inode with I_FLUSH, * indicating we should send a cap message to flush dirty metadata * asap, and move to the front of the delayed cap list.
*/ staticvoid __cap_delay_requeue_front(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci)
{ struct inode *inode = &ci->netfs.inode;
/* * Each time we receive FILE_CACHE anew, we increment * i_rdcache_gen.
*/ if (S_ISREG(ci->netfs.inode.i_mode) &&
(issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
(had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
ci->i_rdcache_gen++;
}
/* * If FILE_SHARED is newly issued, mark dir not complete. We don't * know what happened to this directory while we didn't have the cap. * If FILE_SHARED is being revoked, also mark dir not complete. It * stops on-going cached readdir.
*/ if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) { if (issued & CEPH_CAP_FILE_SHARED)
atomic_inc(&ci->i_shared_gen); if (S_ISDIR(ci->netfs.inode.i_mode)) {
doutc(cl, " marking %p NOT complete\n", inode);
__ceph_dir_clear_complete(ci);
}
}
/** * change_auth_cap_ses - move inode to appropriate lists when auth caps change * @ci: inode to be moved * @session: new auth caps session
*/ void change_auth_cap_ses(struct ceph_inode_info *ci, struct ceph_mds_session *session)
{
lockdep_assert_held(&ci->i_ceph_lock);
if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item)) return;
spin_lock(&session->s_mdsc->cap_dirty_lock); if (!list_empty(&ci->i_dirty_item))
list_move(&ci->i_dirty_item, &session->s_cap_dirty); if (!list_empty(&ci->i_flushing_item))
list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
spin_unlock(&session->s_mdsc->cap_dirty_lock);
}
/* * Add a capability under the given MDS session. * * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock * * @fmode is the open file mode, if we are opening a file, otherwise * it is < 0. (This is so we can atomically add the cap and add an * open file reference to it.)
*/ void ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, u64 cap_id, unsigned issued, unsigned wanted, unsigned seq, unsigned mseq, u64 realmino, int flags, struct ceph_cap **new_cap)
{ struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; int mds = session->s_mds; int actual_wanted;
u32 gen;
/* add to session cap list */
cap->session = session;
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps, &session->s_caps);
session->s_nr_caps++;
atomic64_inc(&mdsc->metric.total_caps);
spin_unlock(&session->s_cap_lock);
} else {
spin_lock(&session->s_cap_lock);
list_move_tail(&cap->session_caps, &session->s_caps);
spin_unlock(&session->s_cap_lock);
if (cap->cap_gen < gen)
cap->issued = cap->implemented = CEPH_CAP_PIN;
/* * auth mds of the inode changed. we received the cap export * message, but still haven't received the cap import message. * handle_cap_export() updated the new auth MDS' cap. * * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing * a message that was send before the cap import message. So * don't remove caps.
*/ if (ceph_seq_cmp(seq, cap->seq) <= 0) {
WARN_ON(cap != ci->i_auth_cap);
WARN_ON(cap->cap_id != cap_id);
seq = cap->seq;
mseq = cap->mseq;
issued |= cap->issued;
flags |= CEPH_CAP_FLAG_AUTH;
}
}
/* * If we are issued caps we don't want, or the mds' wanted * value appears to be off, queue a check so we'll release * later and/or update the mds wanted value.
*/
actual_wanted = __ceph_caps_wanted(ci); if ((wanted & ~actual_wanted) ||
(issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
doutc(cl, "issued %s, mds wanted %s, actual %s, queueing\n",
ceph_cap_string(issued), ceph_cap_string(wanted),
ceph_cap_string(actual_wanted));
__cap_delay_requeue(mdsc, ci);
}
/*
 * Return true if cap has not timed out and belongs to the current
 * generation of the MDS session (i.e. has not gone 'stale' due to
 * us losing touch with the mds).
 */
static int __cap_is_valid(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_client *cl = cap->session->s_mdsc->fsc->client;
	unsigned long ttl;
	u32 gen;

	gen = atomic_read(&cap->session->s_cap_gen);
	ttl = cap->session->s_cap_ttl;

	/* stale if the session generation moved on or the TTL expired */
	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
		doutc(cl, "%p %llx.%llx cap %p issued %s but STALE (gen %u vs %u)\n",
		      inode, ceph_vinop(inode), cap,
		      ceph_cap_string(cap->issued), cap->cap_gen, gen);
		return 0;
	}

	return 1;
}
/*
 * Return set of valid cap bits issued to us.  Note that caps time
 * out, and may be invalidated in bulk if the client session times out
 * and session->s_cap_gen is bumped.
 */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *cap;
	struct rb_node *node;
	int issued = ci->i_snap_caps;

	if (implemented)
		*implemented = 0;

	/* OR together the bits of every still-valid cap */
	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		cap = rb_entry(node, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		doutc(cl, "%p %llx.%llx cap %p issued %s\n", inode,
		      ceph_vinop(inode), cap, ceph_cap_string(cap->issued));
		issued |= cap->issued;
		if (implemented)
			*implemented |= cap->implemented;
	}

	/*
	 * exclude caps issued by non-auth MDS, but are been revoking
	 * by the auth MDS. The non-auth MDS should be revoking/exporting
	 * these caps, but the message is delayed.
	 */
	if (ci->i_auth_cap) {
		cap = ci->i_auth_cap;
		issued &= ~cap->implemented | cap->issued;
	}

	return issued;
}
/*
 * Get cap bits issued by caps other than @ocap
 */
int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
{
	struct ceph_cap *cap;
	struct rb_node *node;
	int bits = ci->i_snap_caps;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		cap = rb_entry(node, struct ceph_cap, ci_node);
		/* skip the excluded cap, and anything stale */
		if (cap == ocap || !__cap_is_valid(cap))
			continue;
		bits |= cap->issued;
	}
	return bits;
}
/*
 * Move a cap to the end of the LRU (oldest caps at list head, newest
 * at list tail).
 */
static void __touch_cap(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_mds_session *s = cap->session;
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	spin_lock(&s->s_cap_lock);
	if (!s->s_cap_iterator) {
		doutc(cl, "%p %llx.%llx cap %p mds%d\n", inode,
		      ceph_vinop(inode), cap, s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
	} else {
		/* someone is walking s_caps; don't disturb the list */
		doutc(cl, "%p %llx.%llx cap %p mds%d NOP, iterating over caps\n",
		      inode, ceph_vinop(inode), cap, s->s_mds);
	}
	spin_unlock(&s->s_cap_lock);
}
/*
 * Check if we hold the given mask.  If so, move the cap(s) to the
 * front of their respective LRUs.  (This is the preferred way for
 * callers to check for caps they want.)
 */
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *cap;
	struct rb_node *node;
	int accum = ci->i_snap_caps;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		cap = rb_entry(node, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;

		/* does this single cap satisfy the whole mask? */
		if ((cap->issued & mask) == mask) {
			doutc(cl, "mask %p %llx.%llx cap %p issued %s (mask %s)\n",
			      inode, ceph_vinop(inode), cap,
			      ceph_cap_string(cap->issued),
			      ceph_cap_string(mask));
			if (touch)
				__touch_cap(cap);
			return 1;
		}

		/* does a combination of caps satisfy mask? */
		accum |= cap->issued;
		if ((accum & mask) != mask)
			continue;

		doutc(cl, "mask %p %llx.%llx combo issued %s (mask %s)\n",
		      inode, ceph_vinop(inode),
		      ceph_cap_string(cap->issued),
		      ceph_cap_string(mask));
		if (touch) {
			struct rb_node *q;

			/* touch this + preceding caps */
			__touch_cap(cap);
			for (q = rb_first(&ci->i_caps); q != node;
			     q = rb_next(q)) {
				cap = rb_entry(q, struct ceph_cap, ci_node);
				if (!__cap_is_valid(cap))
					continue;
				if (cap->issued & mask)
					__touch_cap(cap);
			}
		}
		return 1;
	}

	return 0;
}
/*
 * Same as __ceph_caps_issued_mask(), but also account the outcome in
 * the client's cap hit/miss metrics.
 */
int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
				   int touch)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(ci->netfs.inode.i_sb);
	int hit = __ceph_caps_issued_mask(ci, mask, touch);

	if (hit)
		ceph_update_cap_hit(&fsc->mdsc->metric);
	else
		ceph_update_cap_mis(&fsc->mdsc->metric);
	return hit;
}
/*
 * Return true if mask caps are currently being revoked by an MDS.
 */
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
			       struct ceph_cap *ocap, int mask)
{
	struct rb_node *node;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		if (cap == ocap)
			continue;
		/* implemented-but-not-issued bits are mid-revocation */
		if (cap->implemented & ~cap->issued & mask)
			return 1;
	}
	return 0;
}
int __ceph_caps_used(struct ceph_inode_info *ci)
{ int used = 0; if (ci->i_pin_ref)
used |= CEPH_CAP_PIN; if (ci->i_rd_ref)
used |= CEPH_CAP_FILE_RD; if (ci->i_rdcache_ref ||
(S_ISREG(ci->netfs.inode.i_mode) &&
ci->netfs.inode.i_data.nrpages))
used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref)
used |= CEPH_CAP_FILE_WR; if (ci->i_wb_ref || ci->i_wrbuffer_ref)
used |= CEPH_CAP_FILE_BUFFER; if (ci->i_fx_ref)
used |= CEPH_CAP_FILE_EXCL; return used;
}
/*
 * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
 */
int __ceph_caps_wanted(struct ceph_inode_info *ci)
{
	int want = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
	/*
	 * dirs want EXCL while holding caps of dir ops; regular files
	 * want EXCL while holding dirty (buffered) data
	 */
	int excl_trigger = S_ISDIR(ci->netfs.inode.i_mode) ?
			   CEPH_CAP_ANY_DIR_OPS : CEPH_CAP_FILE_BUFFER;

	if (want & excl_trigger)
		want |= CEPH_CAP_FILE_EXCL;
	return want;
}
/*
 * Return caps we have registered with the MDS(s) as 'wanted'.
 */
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
	struct rb_node *node;
	int wanted = 0;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		if (check && !__cap_is_valid(cap))
			continue;
		if (cap == ci->i_auth_cap)
			wanted |= cap->mds_wanted;
		else
			/* for non-auth caps, mask out the file-write wants */
			wanted |= cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR;
	}
	return wanted;
}
/*
 * Return nonzero if this inode holds any real (non-snap) caps,
 * sampled under i_ceph_lock.
 */
int ceph_is_any_caps(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int any;

	spin_lock(&ci->i_ceph_lock);
	any = __ceph_is_any_real_caps(ci);
	spin_unlock(&ci->i_ceph_lock);

	return any;
}
/* * Remove a cap. Take steps to deal with a racing iterate_session_caps. * * caller should hold i_ceph_lock. * caller will not hold session s_mutex if called from destroy_inode.
*/ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{ struct ceph_mds_session *session = cap->session; struct ceph_client *cl = session->s_mdsc->fsc->client; struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->netfs.inode; struct ceph_mds_client *mdsc; int removed = 0;
/* 'ci' being NULL means the remove have already occurred */ if (!ci) {
doutc(cl, "inode is NULL\n"); return;
}
lockdep_assert_held(&ci->i_ceph_lock);
doutc(cl, "%p from %p %llx.%llx\n", cap, inode, ceph_vinop(inode));
/* remove from inode's cap rbtree, and clear auth cap */
rb_erase(&cap->ci_node, &ci->i_caps); if (ci->i_auth_cap == cap)
ci->i_auth_cap = NULL;
/* remove from session list */
spin_lock(&session->s_cap_lock); if (session->s_cap_iterator == cap) { /* not yet, we are iterating over this very cap */
doutc(cl, "delaying %p removal from session %p\n", cap,
cap->session);
} else {
list_del_init(&cap->session_caps);
session->s_nr_caps--;
atomic64_dec(&mdsc->metric.total_caps);
cap->session = NULL;
removed = 1;
} /* protect backpointer with s_cap_lock: see iterate_session_caps */
cap->ci = NULL;
/* * s_cap_reconnect is protected by s_cap_lock. no one changes * s_cap_gen while session is in the reconnect state.
*/ if (queue_release &&
(!session->s_cap_reconnect ||
cap->cap_gen == atomic_read(&session->s_cap_gen))) {
cap->queue_release = 1; if (removed) {
__ceph_queue_cap_release(session, cap);
removed = 0;
}
} else {
cap->queue_release = 0;
}
cap->cap_ino = ci->i_vino.ino;
spin_unlock(&session->s_cap_lock);
if (removed)
ceph_put_cap(mdsc, cap);
if (!__ceph_is_any_real_caps(ci)) { /* when reconnect denied, we remove session caps forcibly, * i_wr_ref can be non-zero. If there are ongoing write, * keep i_snap_realm.
*/ if (ci->i_wr_ref == 0 && ci->i_snap_realm)
ceph_change_snap_realm(&ci->netfs.inode, NULL);
p = fc + 1; /* flock buffer size (version 2) */
ceph_encode_32(&p, 0); /* inline version (version 4) */
ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */
ceph_encode_32(&p, 0); /* * osd_epoch_barrier (version 5) * The epoch_barrier is protected osdc->lock, so READ_ONCE here in * case it was recently changed
*/
ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier)); /* oldest_flush_tid (version 6) */
ceph_encode_64(&p, arg->oldest_flush_tid);
/* * caller_uid/caller_gid (version 7) * * Currently, we don't properly track which caller dirtied the caps * last, and force a flush of them when there is a conflict. For now, * just set this to 0:0, to emulate how the MDS has worked up to now.
*/
ceph_encode_32(&p, 0);
ceph_encode_32(&p, 0);
/* dirstats (version 11) - these are r/o on the client */
ceph_encode_64(&p, 0);
ceph_encode_64(&p, 0);
#if IS_ENABLED(CONFIG_FS_ENCRYPTION) /* * fscrypt_auth and fscrypt_file (version 12) * * fscrypt_auth holds the crypto context (if any). fscrypt_file * tracks the real i_size as an __le64 field (and we use a rounded-up * i_size in the traditional size field).
*/
ceph_encode_32(&p, arg->fscrypt_auth_len);
ceph_encode_copy(&p, arg->fscrypt_auth, arg->fscrypt_auth_len);
ceph_encode_32(&p, sizeof(__le64));
ceph_encode_64(&p, arg->size); #else/* CONFIG_FS_ENCRYPTION */
ceph_encode_32(&p, 0);
ceph_encode_32(&p, 0); #endif/* CONFIG_FS_ENCRYPTION */
}
/*
 * Queue cap releases when an inode is dropped from our cache.
 */
void __ceph_remove_caps(struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct rb_node *node;

	/* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
	 * may call __ceph_caps_issued_mask() on a freeing inode. */
	spin_lock(&ci->i_ceph_lock);
	node = rb_first(&ci->i_caps);
	while (node) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		/* advance before the cap is unlinked from the tree */
		node = rb_next(node);
		ceph_remove_cap(mdsc, cap, true);
	}
	spin_unlock(&ci->i_ceph_lock);
}
/* * Prepare to send a cap message to an MDS. Update the cap state, and populate * the arg struct with the parameters that will need to be sent. This should * be done under the i_ceph_lock to guard against changes to cap state. * * Make note of max_size reported/requested from mds, revoked caps * that have now been implemented.
*/ staticvoid __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap, int op, int flags, int used, int want, int retain, int flushing, u64 flush_tid, u64 oldest_flush_tid)
{ struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->netfs.inode; struct ceph_client *cl = ceph_inode_to_client(inode); int held, revoking;
cap->issued &= retain; /* drop bits we don't want */ /* * Wake up any waiters on wanted -> needed transition. This is due to * the weird transition from buffered to sync IO... we need to flush * dirty pages _before_ allowing sync writes to avoid reordering.
*/
arg->wake = cap->implemented & ~cap->issued;
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
/* * When a snapshot is taken, clients accumulate dirty metadata on * inodes with capabilities in ceph_cap_snaps to describe the file * state at the time the snapshot was taken. This must be flushed * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * * Called under i_ceph_lock.
*/ staticvoid __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session *session)
__releases(ci->i_ceph_lock)
__acquires(ci->i_ceph_lock)
{ struct inode *inode = &ci->netfs.inode; struct ceph_mds_client *mdsc = session->s_mdsc; struct ceph_client *cl = mdsc->fsc->client; struct ceph_cap_snap *capsnap;
u64 oldest_flush_tid = 0;
u64 first_tid = 1, last_tid = 0;
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { /* * we need to wait for sync writes to complete and for dirty * pages to be written out.
*/ if (capsnap->dirty_pages || capsnap->writing) break;
/* should be removed by ceph_try_drop_cap_snap() */
BUG_ON(!capsnap->need_flush);
/* only flush each capsnap once */ if (capsnap->cap_flush.tid > 0) {
doutc(cl, "already flushed %p, skipping\n", capsnap); continue;
}
spin_lock(&mdsc->cap_dirty_lock);
capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
list_add_tail(&capsnap->cap_flush.g_list,
&mdsc->cap_flush_list); if (oldest_flush_tid == 0)
oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (list_empty(&ci->i_flushing_item)) {
list_add_tail(&ci->i_flushing_item,
&session->s_cap_flushing);
}
spin_unlock(&mdsc->cap_dirty_lock);
if (psession)
*psession = session; else
ceph_put_mds_session(session); /* we flushed them all; remove this inode from the queue */
spin_lock(&mdsc->snap_flush_lock); if (!list_empty(&ci->i_snap_flush_item))
need_put = true;
list_del_init(&ci->i_snap_flush_item);
spin_unlock(&mdsc->snap_flush_lock);
if (need_put)
iput(inode);
}
/* * Mark caps dirty. If inode is newly dirty, return the dirty flags. * Caller is then responsible for calling __mark_inode_dirty with the * returned flags value.
*/ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, struct ceph_cap_flush **pcf)
{ struct ceph_mds_client *mdsc =
ceph_sb_to_fs_client(ci->netfs.inode.i_sb)->mdsc; struct inode *inode = &ci->netfs.inode; struct ceph_client *cl = ceph_inode_to_client(inode); int was = ci->i_dirty_caps; int dirty = 0;
lockdep_assert_held(&ci->i_ceph_lock);
if (!ci->i_auth_cap) {
pr_warn_client(cl, "%p %llx.%llx mask %s, " "but no auth cap (session was closed?)\n",
inode, ceph_vinop(inode),
ceph_cap_string(mask)); return 0;
}
/* * Remove cap_flush from the mdsc's or inode's flushing cap list. * Return true if caller needs to wake up flush waiters.
*/ staticbool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc, struct ceph_cap_flush *cf)
{ struct ceph_cap_flush *prev; bool wake = cf->wake;
/* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. * * Called under i_ceph_lock. Returns the flush tid.
*/ static u64 __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session, bool wake,
u64 *oldest_flush_tid)
{ struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap_flush *cf = NULL; int flushing;
if (inode->i_data.nrpages == 0 &&
invalidating_gen == ci->i_rdcache_gen) { /* success. */
doutc(cl, "%p %llx.%llx success\n", inode,
ceph_vinop(inode)); /* save any racing async invalidate some trouble */
ci->i_rdcache_revoking = ci->i_rdcache_gen - 1; return 0;
}
doutc(cl, "%p %llx.%llx failed\n", inode, ceph_vinop(inode)); return -1;
}
bool __ceph_should_report_size(struct ceph_inode_info *ci)
{
loff_t size = i_size_read(&ci->netfs.inode); /* mds will adjust max size according to the reported size */ if (ci->i_flushing_caps & CEPH_CAP_FILE_WR) returnfalse; if (size >= ci->i_max_size) returntrue; /* half of previous max_size increment has been used */ if (ci->i_max_size > ci->i_reported_size &&
(size << 1) >= ci->i_max_size + ci->i_reported_size) returntrue; returnfalse;
}
/* * Swiss army knife function to examine currently used and wanted * versus held caps. Release, flush, ack revoked caps to mds as * appropriate. * * CHECK_CAPS_AUTHONLY - we should only check the auth cap * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without * further delay. * CHECK_CAPS_FLUSH_FORCE - we should flush any caps immediately, without * further delay.
*/ void ceph_check_caps(struct ceph_inode_info *ci, int flags)
{ struct inode *inode = &ci->netfs.inode; struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_cap *cap;
u64 flush_tid, oldest_flush_tid; int file_wanted, used, cap_used; int issued, implemented, want, retain, revoking, flushing = 0; int mds = -1; /* keep track of how far we've gone through i_caps list
to avoid an infinite loop on retry */ struct rb_node *p; bool queue_invalidate = false; bool tried_invalidate = false; bool queue_writeback = false; struct ceph_mds_session *session = NULL;
spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
ci->i_ceph_flags |= CEPH_I_ASYNC_CHECK_CAPS;
/* Don't send messages until we get async create reply */
spin_unlock(&ci->i_ceph_lock); return;
}
if (ci->i_ceph_flags & CEPH_I_FLUSH)
flags |= CHECK_CAPS_FLUSH;
retry: /* Caps wanted by virtue of active open files. */
file_wanted = __ceph_caps_file_wanted(ci);
/* Caps which have active references against them */
used = __ceph_caps_used(ci);
/* * "issued" represents the current caps that the MDS wants us to have. * "implemented" is the set that we have been granted, and includes the * ones that have not yet been returned to the MDS (the "revoking" set, * usually because they have outstanding references).
*/
issued = __ceph_caps_issued(ci, &implemented);
revoking = implemented & ~issued;
want = file_wanted;
/* The ones we currently want to retain (may be adjusted below) */
retain = file_wanted | used | CEPH_CAP_PIN; if (!mdsc->stopping && inode->i_nlink > 0) { if (file_wanted) {
retain |= CEPH_CAP_ANY; /* be greedy */
} elseif (S_ISDIR(inode->i_mode) &&
(issued & CEPH_CAP_FILE_SHARED) &&
__ceph_dir_is_complete(ci)) { /* * If a directory is complete, we want to keep * the exclusive cap. So that MDS does not end up * revoking the shared cap on every create/unlink * operation.
*/ if (IS_RDONLY(inode)) {
want = CEPH_CAP_ANY_SHARED;
} else {
want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
}
retain |= want;
} else {
retain |= CEPH_CAP_ANY_SHARED; /* * keep RD only if we didn't have the file open RW, * because then the mds would revoke it anyway to * journal max_size=0.
*/ if (ci->i_max_size == 0)
retain |= CEPH_CAP_ANY_RD;
}
}
/* * If we no longer need to hold onto old our caps, and we may * have cached pages, but don't want them, then try to invalidate. * If we fail, it's because pages are locked.... try again later.
*/ if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
S_ISREG(inode->i_mode) &&
!(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
inode->i_data.nrpages && /* have cached pages */
(revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
!tried_invalidate) {
doutc(cl, "trying to invalidate on %p %llx.%llx\n",
inode, ceph_vinop(inode)); if (try_nonblocking_invalidate(inode) < 0) {
doutc(cl, "queuing invalidate\n");
queue_invalidate = true;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
}
tried_invalidate = true; goto retry;
}
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { int mflags = 0; struct cap_msg_args arg;
cap = rb_entry(p, struct ceph_cap, ci_node);
/* avoid looping forever */ if (mds >= cap->mds ||
((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap)) continue;
/* * If we have an auth cap, we don't need to consider any * overlapping caps as used.
*/
cap_used = used; if (ci->i_auth_cap && cap != ci->i_auth_cap)
cap_used &= ~ci->i_auth_cap->issued;
/* completed revocation? going down and there are no caps? */ if (revoking) { if ((revoking & cap_used) == 0) {
doutc(cl, "completed revocation of %s\n",
ceph_cap_string(cap->implemented & ~cap->issued)); goto ack;
}
/* * If the "i_wrbuffer_ref" was increased by mmap or generic * cache write just before the ceph_check_caps() is called, * the Fb capability revoking will fail this time. Then we * must wait for the BDI's delayed work to flush the dirty * pages and to release the "i_wrbuffer_ref", which will cost * at most 5 seconds. That means the MDS needs to wait at * most 5 seconds to finished the Fb capability's revocation. * * Let's queue a writeback for it.
*/ if (S_ISREG(inode->i_mode) && ci->i_wrbuffer_ref &&
(revoking & CEPH_CAP_FILE_BUFFER))
queue_writeback = true;
}
if (flags & CHECK_CAPS_FLUSH_FORCE) {
doutc(cl, "force to flush caps\n"); goto ack;
}
if (cap == ci->i_auth_cap &&
(cap->issued & CEPH_CAP_FILE_WR)) { /* request larger max_size from MDS? */ if (ci->i_wanted_max_size > ci->i_max_size &&
ci->i_wanted_max_size > ci->i_requested_max_size) {
doutc(cl, "requesting new max_size\n"); goto ack;
}
/* want more caps from mds? */ if (want & ~cap->mds_wanted) { if (want & ~(cap->mds_wanted | cap->issued)) goto ack; if (!__cap_is_valid(cap)) goto ack;
}
/* things we might delay */ if ((cap->issued & ~retain) == 0) continue; /* nope, all good */
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.