/*
 * Capability management
 *
 * The Ceph metadata servers control client access to inode metadata
 * and file data by issuing capabilities, granting clients permission
 * to read and/or write both inode fields and file data to OSDs
 * (storage nodes).  Each capability consists of a set of bits
 * indicating which operations are allowed.
 *
 * If the client holds a *_SHARED cap, the client has a coherent value
 * that can be safely read from the cached inode.
 *
 * In the case of a *_EXCL (exclusive) or FILE_WR capability, the
 * client is allowed to change inode attributes (e.g., file size,
 * mtime), note its dirty state in the ceph_cap, and asynchronously
 * flush that metadata change to the MDS.
 *
 * In the event of a conflicting operation (perhaps by another
 * client), the MDS will revoke the conflicting client capabilities.
 *
 * In order for a client to cache an inode, it must hold a capability
 * with at least one MDS.  When inodes are released, release
 * notifications are batched and periodically sent en masse to the MDS
 * cluster to release server state.
*/
/* * Called under mdsc->mutex.
*/ int ceph_reserve_caps(struct ceph_mds_client *mdsc, struct ceph_cap_reservation *ctx, int need)
{ struct ceph_client *cl = mdsc->fsc->client; int i, j; struct ceph_cap *cap; int have; int alloc = 0; int max_caps; int err = 0; bool trimmed = false; struct ceph_mds_session *s;
LIST_HEAD(newcaps);
doutc(cl, "ctx=%p need=%d\n", ctx, need);
/* first reserve any caps that are already allocated */
spin_lock(&mdsc->caps_list_lock); if (mdsc->caps_avail_count >= need)
have = need; else
have = mdsc->caps_avail_count;
mdsc->caps_avail_count -= have;
mdsc->caps_reserve_count += have;
BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
mdsc->caps_reserve_count +
mdsc->caps_avail_count);
spin_unlock(&mdsc->caps_list_lock);
for (i = have; i < need; ) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); if (cap) {
list_add(&cap->caps_item, &newcaps);
alloc++;
i++; continue;
}
if (!trimmed) { for (j = 0; j < mdsc->max_sessions; j++) {
s = __ceph_lookup_mds_session(mdsc, j); if (!s) continue;
mutex_unlock(&mdsc->mutex);
spin_lock(&mdsc->caps_list_lock); if (mdsc->caps_avail_count) { int more_have; if (mdsc->caps_avail_count >= need - i)
more_have = need - i; else
more_have = mdsc->caps_avail_count;
i += more_have;
have += more_have;
mdsc->caps_avail_count -= more_have;
mdsc->caps_reserve_count += more_have;
}
spin_unlock(&mdsc->caps_list_lock);
continue;
}
pr_warn_client(cl, "ctx=%p ENOMEM need=%d got=%d\n", ctx, need,
have + alloc);
err = -ENOMEM; break;
}
/* temporary, until we do something about cap import/export */ if (!ctx) {
cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); if (cap) {
spin_lock(&mdsc->caps_list_lock);
mdsc->caps_use_count++;
mdsc->caps_total_count++;
spin_unlock(&mdsc->caps_list_lock);
} else {
spin_lock(&mdsc->caps_list_lock); if (mdsc->caps_avail_count) {
BUG_ON(list_empty(&mdsc->caps_list));
mdsc->caps_avail_count--;
mdsc->caps_use_count++;
cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
list_del(&cap->caps_item);
void ceph_reservation_status(struct ceph_fs_client *fsc, int *total, int *avail, int *used, int *reserved, int *min)
{ struct ceph_mds_client *mdsc = fsc->mdsc;
spin_lock(&mdsc->caps_list_lock);
if (total)
*total = mdsc->caps_total_count; if (avail)
*avail = mdsc->caps_avail_count; if (used)
*used = mdsc->caps_use_count; if (reserved)
*reserved = mdsc->caps_reserve_count; if (min)
*min = mdsc->caps_min_count;
spin_unlock(&mdsc->caps_list_lock);
}
/*
 * Find the ceph_cap issued by MDS rank @mds, if any.
 *
 * The inode's caps live in the ci->i_caps rbtree keyed by cap->mds; this
 * is a plain binary search over that tree.
 *
 * Called with i_ceph_lock held.
 *
 * Returns the matching cap, or NULL when @ci holds no cap from @mds.
 */
struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct rb_node *node = ci->i_caps.rb_node;

	while (node) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		if (mds == cap->mds)
			return cap;

		/* smaller ranks are in the left subtree, larger in the right */
		node = (mds < cap->mds) ? node->rb_left : node->rb_right;
	}

	return NULL;
}
/* * (re)set cap hold timeouts, which control the delayed release * of unused caps back to the MDS. Should be called on cap use.
*/ staticvoid __cap_set_timeouts(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci)
{ struct inode *inode = &ci->netfs.inode; struct ceph_mount_options *opt = mdsc->fsc->mount_options;
/* * (Re)queue cap at the end of the delayed cap release list. * * If I_FLUSH is set, leave the inode at the front of the list. * * Caller holds i_ceph_lock * -> we take mdsc->cap_delay_lock
*/ staticvoid __cap_delay_requeue(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci)
{ struct inode *inode = &ci->netfs.inode;
doutc(mdsc->fsc->client, "%p %llx.%llx flags 0x%lx at %lu\n",
inode, ceph_vinop(inode), ci->i_ceph_flags,
ci->i_hold_caps_max); if (!mdsc->stopping) {
spin_lock(&mdsc->cap_delay_lock); if (!list_empty(&ci->i_cap_delay_list)) { if (ci->i_ceph_flags & CEPH_I_FLUSH) goto no_change;
list_del_init(&ci->i_cap_delay_list);
}
__cap_set_timeouts(mdsc, ci);
list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
spin_unlock(&mdsc->cap_delay_lock);
}
}
/* * Queue an inode for immediate writeback. Mark inode with I_FLUSH, * indicating we should send a cap message to flush dirty metadata * asap, and move to the front of the delayed cap list.
*/ staticvoid __cap_delay_requeue_front(struct ceph_mds_client *mdsc, struct ceph_inode_info *ci)
{ struct inode *inode = &ci->netfs.inode;
/* * Each time we receive FILE_CACHE anew, we increment * i_rdcache_gen.
*/ if (S_ISREG(ci->netfs.inode.i_mode) &&
(issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
(had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
ci->i_rdcache_gen++;
}
/* * If FILE_SHARED is newly issued, mark dir not complete. We don't * know what happened to this directory while we didn't have the cap. * If FILE_SHARED is being revoked, also mark dir not complete. It * stops on-going cached readdir.
*/ if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) { if (issued & CEPH_CAP_FILE_SHARED)
atomic_inc(&ci->i_shared_gen); if (S_ISDIR(ci->netfs.inode.i_mode)) {
doutc(cl, " marking %p NOT complete\n", inode);
__ceph_dir_clear_complete(ci);
}
}
/** * change_auth_cap_ses - move inode to appropriate lists when auth caps change * @ci: inode to be moved * @session: new auth caps session
*/ void change_auth_cap_ses(struct ceph_inode_info *ci, struct ceph_mds_session *session)
{
lockdep_assert_held(&ci->i_ceph_lock);
if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item)) return;
spin_lock(&session->s_mdsc->cap_dirty_lock); if (!list_empty(&ci->i_dirty_item))
list_move(&ci->i_dirty_item, &session->s_cap_dirty); if (!list_empty(&ci->i_flushing_item))
list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
spin_unlock(&session->s_mdsc->cap_dirty_lock);
}
/* * Add a capability under the given MDS session. * * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock * * @fmode is the open file mode, if we are opening a file, otherwise * it is < 0. (This is so we can atomically add the cap and add an * open file reference to it.)
*/ void ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, u64 cap_id, unsigned issued, unsigned wanted, unsigned seq, unsigned mseq, u64 realmino, int flags, struct ceph_cap **new_cap)
{ struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; int mds = session->s_mds; int actual_wanted;
u32 gen;
/* add to session cap list */
cap->session = session;
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps, &session->s_caps);
session->s_nr_caps++;
atomic64_inc(&mdsc->metric.total_caps);
spin_unlock(&session->s_cap_lock);
} else {
spin_lock(&session->s_cap_lock);
list_move_tail(&cap->session_caps, &session->s_caps);
spin_unlock(&session->s_cap_lock);
if (cap->cap_gen < gen)
cap->issued = cap->implemented = CEPH_CAP_PIN;
/*
 * The auth MDS of the inode changed. We received the cap export
 * message, but still haven't received the cap import message.
 * handle_cap_export() updated the new auth MDS' cap.
 *
 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
 * a message that was sent before the cap import message. So
 * don't remove caps.
*/ if (ceph_seq_cmp(seq, cap->seq) <= 0) {
WARN_ON(cap != ci->i_auth_cap);
WARN_ON(cap->cap_id != cap_id);
seq = cap->seq;
mseq = cap->mseq;
issued |= cap->issued;
flags |= CEPH_CAP_FLAG_AUTH;
}
}
/* * If we are issued caps we don't want, or the mds' wanted * value appears to be off, queue a check so we'll release * later and/or update the mds wanted value.
*/
actual_wanted = __ceph_caps_wanted(ci); if ((wanted & ~actual_wanted) ||
(issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
doutc(cl, "issued %s, mds wanted %s, actual %s, queueing\n",
ceph_cap_string(issued), ceph_cap_string(wanted),
ceph_cap_string(actual_wanted));
__cap_delay_requeue(mdsc, ci);
}
/*
 * Return 1 if @cap is still usable, 0 if it is stale.
 *
 * A cap is stale when its cap_gen is older than the session's current
 * s_cap_gen, or when jiffies has reached the session's s_cap_ttl.
 */
static int __cap_is_valid(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_client *cl = cap->session->s_mdsc->fsc->client;
	unsigned long ttl;
	u32 gen;

	gen = atomic_read(&cap->session->s_cap_gen);
	ttl = cap->session->s_cap_ttl;

	if (cap->cap_gen >= gen && !time_after_eq(jiffies, ttl))
		return 1;

	doutc(cl, "%p %llx.%llx cap %p issued %s but STALE (gen %u vs %u)\n",
	      inode, ceph_vinop(inode), cap,
	      ceph_cap_string(cap->issued), cap->cap_gen, gen);
	return 0;
}
/*
 * Return the union of cap bits currently issued to us on @ci.
 *
 * Starts from ci->i_snap_caps and ORs in ->issued of every cap that
 * passes __cap_is_valid().  If @implemented is non-NULL it receives the
 * OR of ->implemented over the same caps (0 when there are none).
 */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct ceph_cap *auth;
	struct rb_node *node;
	int issued = ci->i_snap_caps;

	if (implemented)
		*implemented = 0;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		if (!__cap_is_valid(cap))
			continue;

		doutc(cl, "%p %llx.%llx cap %p issued %s\n", inode,
		      ceph_vinop(inode), cap, ceph_cap_string(cap->issued));

		issued |= cap->issued;
		if (implemented)
			*implemented |= cap->implemented;
	}

	/*
	 * Exclude caps that were issued by a non-auth MDS but are being
	 * revoked by the auth MDS.  The non-auth MDS should be
	 * revoking/exporting these caps, but the message is delayed.
	 */
	auth = ci->i_auth_cap;
	if (auth)
		issued &= ~auth->implemented | auth->issued;

	return issued;
}
/*
 * Return the cap bits issued by every valid cap on @ci except @ocap,
 * combined with ci->i_snap_caps.
 */
int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
{
	struct rb_node *node;
	int issued = ci->i_snap_caps;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		if (cap != ocap && __cap_is_valid(cap))
			issued |= cap->issued;
	}

	return issued;
}
/*
 * Move a cap to the end of its session's LRU (oldest caps at list head,
 * newest at list tail).
 *
 * The move is skipped while s->s_cap_iterator is set, i.e. while the
 * session's cap list is being iterated.  Takes s->s_cap_lock.
 */
static void __touch_cap(struct ceph_cap *cap)
{
	struct inode *inode = &cap->ci->netfs.inode;
	struct ceph_mds_session *s = cap->session;
	struct ceph_client *cl = s->s_mdsc->fsc->client;

	spin_lock(&s->s_cap_lock);
	if (s->s_cap_iterator) {
		doutc(cl, "%p %llx.%llx cap %p mds%d NOP, iterating over caps\n",
		      inode, ceph_vinop(inode), cap, s->s_mds);
	} else {
		doutc(cl, "%p %llx.%llx cap %p mds%d\n", inode,
		      ceph_vinop(inode), cap, s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
	}
	spin_unlock(&s->s_cap_lock);
}
/*
 * Check whether we hold every cap bit in @mask.
 *
 * Returns 1 as soon as a single valid cap, or the accumulated bits of the
 * valid caps seen so far (seeded with ci->i_snap_caps), cover @mask;
 * returns 0 otherwise.  When @touch is non-zero the cap(s) that
 * contributed are refreshed with __touch_cap(), which moves them to the
 * tail of their session's LRU.  (This is the preferred way for callers to
 * check for caps they want.)
 */
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int combined = ci->i_snap_caps;
	struct rb_node *node;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);
		struct rb_node *prev;

		if (!__cap_is_valid(cap))
			continue;

		/* does this one cap satisfy the whole mask? */
		if ((cap->issued & mask) == mask) {
			doutc(cl, "mask %p %llx.%llx cap %p issued %s (mask %s)\n",
			      inode, ceph_vinop(inode), cap,
			      ceph_cap_string(cap->issued),
			      ceph_cap_string(mask));
			if (touch)
				__touch_cap(cap);
			return 1;
		}

		/* does a combination of caps satisfy mask? */
		combined |= cap->issued;
		if ((combined & mask) != mask)
			continue;

		doutc(cl, "mask %p %llx.%llx combo issued %s (mask %s)\n",
		      inode, ceph_vinop(inode),
		      ceph_cap_string(cap->issued),
		      ceph_cap_string(mask));
		if (!touch)
			return 1;

		/* touch this cap plus every earlier valid cap overlapping mask */
		__touch_cap(cap);
		for (prev = rb_first(&ci->i_caps); prev != node;
		     prev = rb_next(prev)) {
			struct ceph_cap *earlier =
				rb_entry(prev, struct ceph_cap, ci_node);

			if (!__cap_is_valid(earlier))
				continue;
			if (earlier->issued & mask)
				__touch_cap(earlier);
		}
		return 1;
	}

	return 0;
}
/*
 * Same as __ceph_caps_issued_mask(), but also records the outcome in the
 * MDS client metrics: a cap hit when the mask is held, a cap miss
 * otherwise.  Returns the result of __ceph_caps_issued_mask().
 */
int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
				   int touch)
{
	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(ci->netfs.inode.i_sb);
	int held = __ceph_caps_issued_mask(ci, mask, touch);

	if (!held)
		ceph_update_cap_mis(&fsc->mdsc->metric);
	else
		ceph_update_cap_hit(&fsc->mdsc->metric);

	return held;
}
/*
 * Return 1 if any cap on @ci other than @ocap has bits from @mask that
 * are implemented but no longer issued (i.e. being revoked), else 0.
 */
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
			       struct ceph_cap *ocap, int mask)
{
	struct rb_node *node;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		if (cap == ocap)
			continue;
		if (cap->implemented & ~cap->issued & mask)
			return 1;
	}

	return 0;
}
/*
 * Translate the inode's active reference counts into the set of cap bits
 * that are currently in use.
 *
 * FILE_CACHE also counts as used when a regular file still has pages in
 * its page cache mapping, even with no i_rdcache_ref held.
 */
int __ceph_caps_used(struct ceph_inode_info *ci)
{
	int in_use = 0;

	if (ci->i_pin_ref)
		in_use |= CEPH_CAP_PIN;

	if (ci->i_rd_ref)
		in_use |= CEPH_CAP_FILE_RD;

	if (ci->i_rdcache_ref ||
	    (S_ISREG(ci->netfs.inode.i_mode) &&
	     ci->netfs.inode.i_data.nrpages))
		in_use |= CEPH_CAP_FILE_CACHE;

	if (ci->i_wr_ref)
		in_use |= CEPH_CAP_FILE_WR;

	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
		in_use |= CEPH_CAP_FILE_BUFFER;

	if (ci->i_fx_ref)
		in_use |= CEPH_CAP_FILE_EXCL;

	return in_use;
}
/*
 * Caps wanted by virtue of open file modes AND cap refs (buffered/cached
 * data).
 *
 * FILE_EXCL is added on top: for directories when any dir-op cap is in
 * the set, for everything else when FILE_BUFFER (dirty data) is.
 */
int __ceph_caps_wanted(struct ceph_inode_info *ci)
{
	int wanted = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
	int excl_trigger;

	if (S_ISDIR(ci->netfs.inode.i_mode))
		excl_trigger = CEPH_CAP_ANY_DIR_OPS;	/* holding dir-op caps */
	else
		excl_trigger = CEPH_CAP_FILE_BUFFER;	/* dirty data */

	if (wanted & excl_trigger)
		wanted |= CEPH_CAP_FILE_EXCL;

	return wanted;
}
/*
 * Return caps we have registered with the MDS(s) as 'wanted'.
 *
 * ORs together ->mds_wanted of the inode's caps.  For any cap that is not
 * the auth cap the CEPH_CAP_ANY_FILE_WR bits are masked out.  When @check
 * is true, caps failing __cap_is_valid() are ignored.
 */
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
	struct rb_node *node;
	int registered = 0;

	for (node = rb_first(&ci->i_caps); node; node = rb_next(node)) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);
		int bits;

		if (check && !__cap_is_valid(cap))
			continue;

		bits = cap->mds_wanted;
		if (cap != ci->i_auth_cap)
			bits &= ~CEPH_CAP_ANY_FILE_WR;

		registered |= bits;
	}

	return registered;
}
/*
 * Locked wrapper around __ceph_is_any_real_caps(): takes i_ceph_lock,
 * queries the inode and returns that helper's result.
 */
int ceph_is_any_caps(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int any;

	spin_lock(&ci->i_ceph_lock);
	any = __ceph_is_any_real_caps(ci);
	spin_unlock(&ci->i_ceph_lock);

	return any;
}
/* * Remove a cap. Take steps to deal with a racing iterate_session_caps. * * caller should hold i_ceph_lock. * caller will not hold session s_mutex if called from destroy_inode.
*/ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{ struct ceph_mds_session *session = cap->session; struct ceph_client *cl = session->s_mdsc->fsc->client; struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->netfs.inode; struct ceph_mds_client *mdsc; int removed = 0;
/* 'ci' being NULL means the remove have already occurred */ if (!ci) {
doutc(cl, "inode is NULL\n"); return;
}
lockdep_assert_held(&ci->i_ceph_lock);
doutc(cl, "%p from %p %llx.%llx\n", cap, inode, ceph_vinop(inode));
/* remove from inode's cap rbtree, and clear auth cap */
rb_erase(&cap->ci_node, &ci->i_caps); if (ci->i_auth_cap == cap)
ci->i_auth_cap = NULL;
/* remove from session list */
spin_lock(&session->s_cap_lock); if (session->s_cap_iterator == cap) { /* not yet, we are iterating over this very cap */
doutc(cl, "delaying %p removal from session %p\n", cap,
cap->session);
} else {
list_del_init(&cap->session_caps);
session->s_nr_caps--;
atomic64_dec(&mdsc->metric.total_caps);
cap->session = NULL;
removed = 1;
} /* protect backpointer with s_cap_lock: see iterate_session_caps */
cap->ci = NULL;
/* * s_cap_reconnect is protected by s_cap_lock. no one changes * s_cap_gen while session is in the reconnect state.
*/ if (queue_release &&
(!session->s_cap_reconnect ||
cap->cap_gen == atomic_read(&session->s_cap_gen))) {
cap->queue_release = 1; if (removed) {
__ceph_queue_cap_release(session, cap);
removed = 0;
}
} else {
cap->queue_release = 0;
}
cap->cap_ino = ci->i_vino.ino;
spin_unlock(&session->s_cap_lock);
if (removed)
ceph_put_cap(mdsc, cap);
if (!__ceph_is_any_real_caps(ci)) { /* when reconnect denied, we remove session caps forcibly, * i_wr_ref can be non-zero. If there are ongoing write, * keep i_snap_realm.
*/ if (ci->i_wr_ref == 0 && ci->i_snap_realm)
ceph_change_snap_realm(&ci->netfs.inode, NULL);
p = fc + 1; /* flock buffer size (version 2) */
ceph_encode_32(&p, 0); /* inline version (version 4) */
ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */
ceph_encode_32(&p, 0); /* * osd_epoch_barrier (version 5) * The epoch_barrier is protected osdc->lock, so READ_ONCE here in * case it was recently changed
*/
ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier)); /* oldest_flush_tid (version 6) */
ceph_encode_64(&p, arg->oldest_flush_tid);
/* * caller_uid/caller_gid (version 7) * * Currently, we don't properly track which caller dirtied the caps * last, and force a flush of them when there is a conflict. For now, * just set this to 0:0, to emulate how the MDS has worked up to now.
*/
ceph_encode_32(&p, 0);
ceph_encode_32(&p, 0);
/* dirstats (version 11) - these are r/o on the client */
ceph_encode_64(&p, 0);
ceph_encode_64(&p, 0);
#if IS_ENABLED(CONFIG_FS_ENCRYPTION) /* * fscrypt_auth and fscrypt_file (version 12) * * fscrypt_auth holds the crypto context (if any). fscrypt_file * tracks the real i_size as an __le64 field (and we use a rounded-up * i_size in the traditional size field).
*/
ceph_encode_32(&p, arg->fscrypt_auth_len);
ceph_encode_copy(&p, arg->fscrypt_auth, arg->fscrypt_auth_len);
ceph_encode_32(&p, sizeof(__le64));
ceph_encode_64(&p, arg->size); #else/* CONFIG_FS_ENCRYPTION */
ceph_encode_32(&p, 0);
ceph_encode_32(&p, 0); #endif/* CONFIG_FS_ENCRYPTION */
}
/*
 * Queue cap releases when an inode is dropped from our cache.
 *
 * Walks ci->i_caps and hands every cap to ceph_remove_cap() with
 * queue_release set.
 */
void __ceph_remove_caps(struct ceph_inode_info *ci)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
	struct rb_node *node, *next;

	/*
	 * Take i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
	 * may call __ceph_caps_issued_mask() on a freeing inode.
	 */
	spin_lock(&ci->i_ceph_lock);
	for (node = rb_first(&ci->i_caps); node; node = next) {
		struct ceph_cap *cap = rb_entry(node, struct ceph_cap, ci_node);

		/* fetch the successor first: the cap is removed below */
		next = rb_next(node);
		ceph_remove_cap(mdsc, cap, true);
	}
	spin_unlock(&ci->i_ceph_lock);
}
/* * Prepare to send a cap message to an MDS. Update the cap state, and populate * the arg struct with the parameters that will need to be sent. This should * be done under the i_ceph_lock to guard against changes to cap state. * * Make note of max_size reported/requested from mds, revoked caps * that have now been implemented.
*/ staticvoid __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap, int op, int flags, int used, int want, int retain, int flushing, u64 flush_tid, u64 oldest_flush_tid)
{ struct ceph_inode_info *ci = cap->ci; struct inode *inode = &ci->netfs.inode; struct ceph_client *cl = ceph_inode_to_client(inode); int held, revoking;
cap->issued &= retain; /* drop bits we don't want */ /* * Wake up any waiters on wanted -> needed transition. This is due to * the weird transition from buffered to sync IO... we need to flush * dirty pages _before_ allowing sync writes to avoid reordering.
*/
arg->wake = cap->implemented & ~cap->issued;
cap->implemented &= cap->issued | used;
cap->mds_wanted = want;
/* * When a snapshot is taken, clients accumulate dirty metadata on * inodes with capabilities in ceph_cap_snaps to describe the file * state at the time the snapshot was taken. This must be flushed * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * * Called under i_ceph_lock.
*/ staticvoid __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session *session)
__releases(ci->i_ceph_lock)
__acquires(ci->i_ceph_lock)
{ struct inode *inode = &ci->netfs.inode; struct ceph_mds_client *mdsc = session->s_mdsc; struct ceph_client *cl = mdsc->fsc->client; struct ceph_cap_snap *capsnap;
u64 oldest_flush_tid = 0;
u64 first_tid = 1, last_tid = 0;
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { /* * we need to wait for sync writes to complete and for dirty * pages to be written out.
*/ if (capsnap->dirty_pages || capsnap->writing) break;
/* should be removed by ceph_try_drop_cap_snap() */
BUG_ON(!capsnap->need_flush);
/* only flush each capsnap once */ if (capsnap->cap_flush.tid > 0) {
doutc(cl, "already flushed %p, skipping\n", capsnap); continue;
}
spin_lock(&mdsc->cap_dirty_lock);
capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
list_add_tail(&capsnap->cap_flush.g_list,
&mdsc->cap_flush_list); if (oldest_flush_tid == 0)
oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (list_empty(&ci->i_flushing_item)) {
list_add_tail(&ci->i_flushing_item,
&session->s_cap_flushing);
}
spin_unlock(&mdsc->cap_dirty_lock);
if (psession)
*psession = session; else
ceph_put_mds_session(session); /* we flushed them all; remove this inode from the queue */
spin_lock(&mdsc->snap_flush_lock); if (!list_empty(&ci->i_snap_flush_item))
need_put = true;
list_del_init(&ci->i_snap_flush_item);
spin_unlock(&mdsc->snap_flush_lock);
if (need_put)
iput(inode);
}
/* * Mark caps dirty. If inode is newly dirty, return the dirty flags. * Caller is then responsible for calling __mark_inode_dirty with the * returned flags value.
*/ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, struct ceph_cap_flush **pcf)
{ struct ceph_mds_client *mdsc =
ceph_sb_to_fs_client(ci->netfs.inode.i_sb)->mdsc; struct inode *inode = &ci->netfs.inode; struct ceph_client *cl = ceph_inode_to_client(inode); int was = ci->i_dirty_caps; int dirty = 0;
lockdep_assert_held(&ci->i_ceph_lock);
if (!ci->i_auth_cap) {
pr_warn_client(cl, "%p %llx.%llx mask %s, " "but no auth cap (session was closed?)\n",
inode, ceph_vinop(inode),
ceph_cap_string(mask)); return 0;
}
/* * Remove cap_flush from the mdsc's or inode's flushing cap list. * Return true if caller needs to wake up flush waiters.
*/ staticbool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc, struct ceph_cap_flush *cf)
{ struct ceph_cap_flush *prev; bool wake = cf->wake;
/* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. * * Called under i_ceph_lock. Returns the flush tid.
*/ static u64 __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session, bool wake,
u64 *oldest_flush_tid)
{ struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap_flush *cf = NULL; int flushing;
if (inode->i_data.nrpages == 0 &&
invalidating_gen == ci->i_rdcache_gen) { /* success. */
doutc(cl, "%p %llx.%llx success\n", inode,
ceph_vinop(inode)); /* save any racing async invalidate some trouble */
ci->i_rdcache_revoking = ci->i_rdcache_gen - 1; return 0;
}
doutc(cl, "%p %llx.%llx failed\n", inode, ceph_vinop(inode)); return -1;
}
bool __ceph_should_report_size(struct ceph_inode_info *ci)
{
loff_t size = i_size_read(&ci->netfs.inode); /* mds will adjust max size according to the reported size */ if (ci->i_flushing_caps & CEPH_CAP_FILE_WR) returnfalse; if (size >= ci->i_max_size) returntrue; /* half of previous max_size increment has been used */ if (ci->i_max_size > ci->i_reported_size &&
(size << 1) >= ci->i_max_size + ci->i_reported_size) returntrue; returnfalse;
}
/* * Swiss army knife function to examine currently used and wanted * versus held caps. Release, flush, ack revoked caps to mds as * appropriate. * * CHECK_CAPS_AUTHONLY - we should only check the auth cap * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without * further delay. * CHECK_CAPS_FLUSH_FORCE - we should flush any caps immediately, without * further delay.
*/ void ceph_check_caps(struct ceph_inode_info *ci, int flags)
{ struct inode *inode = &ci->netfs.inode; struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_cap *cap;
u64 flush_tid, oldest_flush_tid; int file_wanted, used, cap_used; int issued, implemented, want, retain, revoking, flushing = 0; int mds = -1; /* keep track of how far we've gone through i_caps list
to avoid an infinite loop on retry */ struct rb_node *p; bool queue_invalidate = false; bool tried_invalidate = false; bool queue_writeback = false; struct ceph_mds_session *session = NULL;
spin_lock(&ci->i_ceph_lock); if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
ci->i_ceph_flags |= CEPH_I_ASYNC_CHECK_CAPS;
/* Don't send messages until we get async create reply */
spin_unlock(&ci->i_ceph_lock); return;
}
if (ci->i_ceph_flags & CEPH_I_FLUSH)
flags |= CHECK_CAPS_FLUSH;
retry: /* Caps wanted by virtue of active open files. */
file_wanted = __ceph_caps_file_wanted(ci);
/* Caps which have active references against them */
used = __ceph_caps_used(ci);
/* * "issued" represents the current caps that the MDS wants us to have. * "implemented" is the set that we have been granted, and includes the * ones that have not yet been returned to the MDS (the "revoking" set, * usually because they have outstanding references).
*/
issued = __ceph_caps_issued(ci, &implemented);
revoking = implemented & ~issued;
want = file_wanted;
/* The ones we currently want to retain (may be adjusted below) */
retain = file_wanted | used | CEPH_CAP_PIN; if (!mdsc->stopping && inode->i_nlink > 0) { if (file_wanted) {
retain |= CEPH_CAP_ANY; /* be greedy */
} elseif (S_ISDIR(inode->i_mode) &&
(issued & CEPH_CAP_FILE_SHARED) &&
__ceph_dir_is_complete(ci)) { /* * If a directory is complete, we want to keep * the exclusive cap. So that MDS does not end up * revoking the shared cap on every create/unlink * operation.
*/ if (IS_RDONLY(inode)) {
want = CEPH_CAP_ANY_SHARED;
} else {
want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
}
retain |= want;
} else {
retain |= CEPH_CAP_ANY_SHARED; /* * keep RD only if we didn't have the file open RW, * because then the mds would revoke it anyway to * journal max_size=0.
*/ if (ci->i_max_size == 0)
retain |= CEPH_CAP_ANY_RD;
}
}
/*
 * If we no longer need to hold onto our old caps, and we may
 * have cached pages, but don't want them, then try to invalidate.
 * If we fail, it's because pages are locked.... try again later.
*/ if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
S_ISREG(inode->i_mode) &&
!(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */
inode->i_data.nrpages && /* have cached pages */
(revoking & (CEPH_CAP_FILE_CACHE|
CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
!tried_invalidate) {
doutc(cl, "trying to invalidate on %p %llx.%llx\n",
inode, ceph_vinop(inode)); if (try_nonblocking_invalidate(inode) < 0) {
doutc(cl, "queuing invalidate\n");
queue_invalidate = true;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
}
tried_invalidate = true; goto retry;
}
for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { int mflags = 0; struct cap_msg_args arg;
cap = rb_entry(p, struct ceph_cap, ci_node);
/* avoid looping forever */ if (mds >= cap->mds ||
((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap)) continue;
/* * If we have an auth cap, we don't need to consider any * overlapping caps as used.
*/
cap_used = used; if (ci->i_auth_cap && cap != ci->i_auth_cap)
cap_used &= ~ci->i_auth_cap->issued;
/* completed revocation? going down and there are no caps? */ if (revoking) { if ((revoking & cap_used) == 0) {
doutc(cl, "completed revocation of %s\n",
ceph_cap_string(cap->implemented & ~cap->issued)); goto ack;
}
/* * If the "i_wrbuffer_ref" was increased by mmap or generic * cache write just before the ceph_check_caps() is called, * the Fb capability revoking will fail this time. Then we * must wait for the BDI's delayed work to flush the dirty * pages and to release the "i_wrbuffer_ref", which will cost * at most 5 seconds. That means the MDS needs to wait at * most 5 seconds to finished the Fb capability's revocation. * * Let's queue a writeback for it.
*/ if (S_ISREG(inode->i_mode) && ci->i_wrbuffer_ref &&
(revoking & CEPH_CAP_FILE_BUFFER))
queue_writeback = true;
}
if (flags & CHECK_CAPS_FLUSH_FORCE) {
doutc(cl, "force to flush caps\n"); goto ack;
}
if (cap == ci->i_auth_cap &&
(cap->issued & CEPH_CAP_FILE_WR)) { /* request larger max_size from MDS? */ if (ci->i_wanted_max_size > ci->i_max_size &&
ci->i_wanted_max_size > ci->i_requested_max_size) {
doutc(cl, "requesting new max_size\n"); goto ack;
}
/* want more caps from mds? */ if (want & ~cap->mds_wanted) { if (want & ~(cap->mds_wanted | cap->issued)) goto ack; if (!__cap_is_valid(cap)) goto ack;
}
/* things we might delay */ if ((cap->issued & ~retain) == 0) continue; /* nope, all good */
/*
 * Return 1 if we've flushed caps through the given @flush_tid, 0 if not.
 *
 * Only the first entry of ci->i_cap_flush_list is examined: the flush is
 * still pending if that entry's tid is <= @flush_tid.  An empty list
 * means everything has been flushed.  Takes i_ceph_lock.
 */
static int caps_are_flushed(struct inode *inode, u64 flush_tid)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int flushed = 1;

	spin_lock(&ci->i_ceph_lock);
	if (!list_empty(&ci->i_cap_flush_list)) {
		struct ceph_cap_flush *first =
			list_first_entry(&ci->i_cap_flush_list,
					 struct ceph_cap_flush, i_list);

		if (first->tid <= flush_tid)
			flushed = 0;
	}
	spin_unlock(&ci->i_ceph_lock);

	return flushed;
}
/* * flush the mdlog and wait for any unsafe requests to complete.
 *
 * Returns 0, or -EIO when wait_for_completion_timeout() on req1 or req2
 * returns 0 (ret holds the negation of that return value).
 *
 * NOTE(review): as visible here, req1/req2 are initialised to NULL and
 * never reassigned, "sessions" and "max_sessions" are used without being
 * set, mdsc->mutex is unlocked without a visible lock, and the "out"
 * label has no visible goto.  Statements appear to be missing from this
 * view -- confirm against the complete file before changing this code.
*/ staticint flush_mdlog_and_wait_inode_unsafe_requests(struct inode *inode)
{ struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_request *req1 = NULL, *req2 = NULL; int ret, err = 0;
/* * Trigger to flush the journal logs in all the relevant MDSes * manually, or in the worst case we must wait at most 5 seconds * to wait the journal logs to be flushed by the MDSes periodically.
*/ if (req1 || req2) { struct ceph_mds_request *req; struct ceph_mds_session **sessions; struct ceph_mds_session *s; unsignedint max_sessions; int i;
/*
 * Collect at most one session reference per MDS rank: sessions[] is
 * indexed by s->s_mds and a slot is filled only while it is still empty.
 * The unsafe dir-op and inode-op lists are walked under i_unsafe_lock.
 */
spin_lock(&ci->i_unsafe_lock); if (req1) {
list_for_each_entry(req, &ci->i_unsafe_dirops,
r_unsafe_dir_item) {
s = req->r_session; if (!s) continue; if (!sessions[s->s_mds]) {
s = ceph_get_mds_session(s);
sessions[s->s_mds] = s;
}
}
} if (req2) {
list_for_each_entry(req, &ci->i_unsafe_iops,
r_unsafe_target_item) {
s = req->r_session; if (!s) continue; if (!sessions[s->s_mds]) {
s = ceph_get_mds_session(s);
sessions[s->s_mds] = s;
}
}
}
spin_unlock(&ci->i_unsafe_lock);
/* the auth MDS */
spin_lock(&ci->i_ceph_lock); if (ci->i_auth_cap) {
s = ci->i_auth_cap->session; if (!sessions[s->s_mds])
sessions[s->s_mds] = ceph_get_mds_session(s);
}
spin_unlock(&ci->i_ceph_lock);
mutex_unlock(&mdsc->mutex);
/* send flush mdlog request to MDSes */ for (i = 0; i < max_sessions; i++) {
s = sessions[i]; if (s) {
send_flush_mdlog(s);
/* drop the reference taken while collecting sessions above */
ceph_put_mds_session(s);
}
}
kfree(sessions);
}
doutc(cl, "%p %llx.%llx wait on tid %llu %llu\n", inode,
ceph_vinop(inode), req1 ? req1->r_tid : 0ULL,
req2 ? req2->r_tid : 0ULL); if (req1) {
ret = !wait_for_completion_timeout(&req1->r_safe_completion,
ceph_timeout_jiffies(req1->r_timeout)); if (ret)
err = -EIO;
} if (req2) {
ret = !wait_for_completion_timeout(&req2->r_safe_completion,
ceph_timeout_jiffies(req2->r_timeout)); if (ret)
err = -EIO;
}
/* common exit: release the request references held in req1/req2 */
out: if (req1)
ceph_mdsc_put_request(req1); if (req2)
ceph_mdsc_put_request(req2); return err;
}
/*
 * NOTE(review): only the opening of ceph_fsync() is visible here.  err,
 * dirty and flush_tid are read below without a visible assignment and the
 * function's closing brace is not in view; the rest of the body appears
 * to be elided.  Confirm against the complete file.
 */
int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{ struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_client *cl = ceph_inode_to_client(inode);
u64 flush_tid; int ret, err; int dirty;
/*
 * Only wait on non-file metadata writeback (the MDS can recover size
 * and mtime, so we don't need to wait for that).
*/ if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
/* interruptible sleep until caps_are_flushed() reports flush_tid done */
err = wait_event_interruptible(ci->i_cap_wq,
caps_are_flushed(inode, flush_tid));
}
/*
 * Flush any dirty caps back to the MDS.  If we aren't asked to wait,
 * queue the inode for flush but don't do so immediately, because we can
 * get by with fewer MDS messages if we wait for data writeback to
 * complete first.
 *
 * NOTE(review): after the local declarations the visible statements use
 * cap, session, mdsc, oldest_flush_tid and "continue", none of which are
 * declared in (or valid for) this function as shown, and the function is
 * never closed.  The text below appears to come from a different,
 * loop-based function; confirm against the complete file.
*/ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_client *cl = ceph_inode_to_client(inode);
u64 flush_tid; int err = 0; int dirty; int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);
spin_lock(&ci->i_ceph_lock);
/* only proceed when the inode's auth cap belongs to this session */
cap = ci->i_auth_cap; if (!(cap && cap->session == session)) {
pr_err_client(cl, "%p %llx.%llx auth cap %p not mds%d ???\n",
inode, ceph_vinop(inode), cap,
session->s_mds);
spin_unlock(&ci->i_ceph_lock); continue;
}
/*
 * If flushing caps were revoked, we re-send the cap flush in the
 * client reconnect stage.  This guarantees the MDS processes the cap
 * flush message before issuing the flushing caps to another client.
*/ if ((cap->issued & ci->i_flushing_caps) !=
ci->i_flushing_caps) { /*
 * encode_caps_cb() will also reset these sequence numbers; make
 * sure the sequence numbers in the cap flush message match the
 * later reconnect message.
 */
cap->seq = 0;
cap->issue_seq = 0;
cap->mseq = 0;
__kick_flushing_caps(mdsc, session, ci,
oldest_flush_tid);
} else {
/* all flushing caps are still issued: only flag the inode */
ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
}
/*
 * Take a reference on each capability named in @got, so that the caps we
 * are using are not released to the MDS prematurely.
 *
 * @ci:                inode whose per-cap reference counters are bumped
 * @got:               mask of CEPH_CAP_* bits to reference
 * @snap_rwsem_locked: must be true when this call takes the first FILE_WR
 *                     reference and no head snap context is cached yet
 *                     (BUG otherwise)
 *
 * The caller must hold ci->i_ceph_lock.
 */
void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
			bool snap_rwsem_locked)
{
	struct inode *inode = &ci->netfs.inode;
	struct ceph_client *cl = ceph_inode_to_client(inode);

	lockdep_assert_held(&ci->i_ceph_lock);

	if (got & CEPH_CAP_PIN) {
		ci->i_pin_ref++;
	}
	if (got & CEPH_CAP_FILE_RD) {
		ci->i_rd_ref++;
	}
	if (got & CEPH_CAP_FILE_CACHE) {
		ci->i_rdcache_ref++;
	}
	if (got & CEPH_CAP_FILE_EXCL) {
		ci->i_fx_ref++;
	}

	if (got & CEPH_CAP_FILE_WR) {
		bool first_writer = (ci->i_wr_ref == 0);

		/* the first writer pins the realm's cached snap context */
		if (first_writer && !ci->i_head_snapc) {
			BUG_ON(!snap_rwsem_locked);
			ci->i_head_snapc = ceph_get_snap_context(
					ci->i_snap_realm->cached_context);
		}
		ci->i_wr_ref++;
	}

	if (got & CEPH_CAP_FILE_BUFFER) {
		int old_wb = ci->i_wb_ref;

		/* the first buffer ref also holds a reference on the inode */
		if (old_wb == 0)
			ihold(inode);
		ci->i_wb_ref = old_wb + 1;
		doutc(cl, "%p %llx.%llx wb %d -> %d (?)\n", inode,
		      ceph_vinop(inode), old_wb, ci->i_wb_ref);
	}
}
/*
 * Try to grab cap references.  Specify those refs we @want, and the
 * minimal set we @need.  Also include the larger offset we are writing
 * to (when applicable), and check against max_size here as well.
 * Note that caller is responsible for ensuring max_size increases are
 * requested from the MDS.
 *
 * Returns 0 if caps could not be acquired (yet), 1 on success, or a
 * negative error code.  There are 3 special error codes:
 *  -EAGAIN:  need to sleep but non-blocking is specified
 *  -EFBIG:   ask caller to call check_max_size() and try again.
 *  -EUCLEAN: ask caller to call ceph_renew_caps() and try again.
 * The visible body also sets ret to -EROFS when the auth session is
 * read-only and to -ESTALE when the inode is shut down.
 *
 * NOTE(review): the "again:" label targeted by "goto again", the initial
 * acquisition of i_ceph_lock and the end of the function (return and
 * closing brace) are not in this view; the body appears to be partially
 * elided.  Confirm against the complete file.
*/ enum { /* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
/* NON_BLOCKING: fail with -EAGAIN instead of sleeping in down_read() */
NON_BLOCKING = (1 << 8),
CHECK_FILELOCK = (1 << 9),
};
staticint try_get_cap_refs(struct inode *inode, int need, int want,
loff_t endoff, int flags, int *got)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc; struct ceph_client *cl = ceph_inode_to_client(inode); int ret = 0; int have, implemented; bool snap_rwsem_locked = false;
/* finish pending truncate */ while (ci->i_truncate_pending) {
spin_unlock(&ci->i_ceph_lock); if (snap_rwsem_locked) {
up_read(&mdsc->snap_rwsem);
snap_rwsem_locked = false;
}
__ceph_do_pending_vmtruncate(inode);
spin_lock(&ci->i_ceph_lock);
}
have = __ceph_caps_issued(ci, &implemented);
/* writes beyond max_size cannot proceed until the MDS raises it */
if (have & need & CEPH_CAP_FILE_WR) { if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
doutc(cl, "%p %llx.%llx endoff %llu > maxsize %llu\n",
inode, ceph_vinop(inode), endoff, ci->i_max_size); if (endoff > ci->i_requested_max_size)
ret = ci->i_auth_cap ? -EFBIG : -EUCLEAN; goto out_unlock;
} /* * If a sync write is in progress, we must wait, so that we * can get a final snapshot value for size+mtime.
*/ if (__ceph_have_pending_cap_snap(ci)) {
doutc(cl, "%p %llx.%llx cap_snap_pending\n", inode,
ceph_vinop(inode)); goto out_unlock;
}
}
if ((have & need) == need) { /*
 * Look at (implemented & ~have & not) so that we keep waiting
 * on transition from wanted -> needed caps.  This is needed
 * for WRBUFFER|WR -> WR to avoid a new WR sync write from
 * going before a prior buffered writeback happens.
 *
 * For RDCACHE|RD -> RD, there is no need to wait and we can
 * just exclude the revoking caps and force a sync read.
*/ intnot = want & ~(have & need); int revoking = implemented & ~have; int exclude = revoking & not;
doutc(cl, "%p %llx.%llx have %s but not %s (revoking %s)\n",
inode, ceph_vinop(inode), ceph_cap_string(have),
ceph_cap_string(not), ceph_cap_string(revoking)); if (!exclude || !(exclude & CEPH_CAP_FILE_BUFFER)) { if (!snap_rwsem_locked &&
!ci->i_head_snapc &&
(need & CEPH_CAP_FILE_WR)) { if (!down_read_trylock(&mdsc->snap_rwsem)) { /* * we can not call down_read() when * task isn't in TASK_RUNNING state
*/ if (flags & NON_BLOCKING) {
ret = -EAGAIN; goto out_unlock;
}
/* drop the spinlock before sleeping on snap_rwsem, then retry */
spin_unlock(&ci->i_ceph_lock);
down_read(&mdsc->snap_rwsem);
snap_rwsem_locked = true; goto again;
}
snap_rwsem_locked = true;
} if ((have & want) == want)
*got = need | (want & ~exclude); else
*got = need;
ceph_take_cap_refs(ci, *got, true);
ret = 1;
}
} else { int session_readonly = false; int mds_wanted; if (ci->i_auth_cap &&
(need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) { struct ceph_mds_session *s = ci->i_auth_cap->session;
spin_lock(&s->s_cap_lock);
session_readonly = s->s_readonly;
spin_unlock(&s->s_cap_lock);
} if (session_readonly) {
doutc(cl, "%p %llx.%llx need %s but mds%d readonly\n",
inode, ceph_vinop(inode), ceph_cap_string(need),
ci->i_auth_cap->mds);
ret = -EROFS; goto out_unlock;
}
if (ceph_inode_is_shutdown(inode)) {
doutc(cl, "%p %llx.%llx inode is shutdown\n",
inode, ceph_vinop(inode));
ret = -ESTALE; goto out_unlock;
}
/* some needed caps are not wanted from any MDS: ask caller to renew */
mds_wanted = __ceph_caps_mds_wanted(ci, false); if (need & ~mds_wanted) {
doutc(cl, "%p %llx.%llx need %s > mds_wanted %s\n",
inode, ceph_vinop(inode), ceph_cap_string(need),
ceph_cap_string(mds_wanted));
ret = -EUCLEAN; goto out_unlock;
}
doutc(cl, "%p %llx.%llx have %s need %s\n", inode,
ceph_vinop(inode), ceph_cap_string(have),
ceph_cap_string(need));
}
out_unlock:
__ceph_touch_fmode(ci, mdsc, flags);
spin_unlock(&ci->i_ceph_lock); if (snap_rwsem_locked)
up_read(&mdsc->snap_rwsem);
/* ret == 0 is accounted as a cap miss, ret == 1 as a cap hit */
if (!ret)
ceph_update_cap_mis(&mdsc->metric); elseif (ret == 1)
ceph_update_cap_hit(&mdsc->metric);
/*
 * Check the offset we are writing up to against our current
 * max_size.  If necessary, tell the MDS we want to write to
 * a larger offset.
 *
 * @inode:  inode being written to
 * @endoff: end offset of the write
 *
 * Takes and releases ci->i_ceph_lock; ceph_check_caps() is called only
 * after the lock has been dropped.
 */
static void check_max_size(struct inode *inode, loff_t endoff)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_client *cl = ceph_inode_to_client(inode);
	int check = 0;

	/* do we need to explicitly request a larger max_size? */
	spin_lock(&ci->i_ceph_lock);
	if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
		doutc(cl, "write %p %llx.%llx at large endoff %llu, req max_size\n",
		      inode, ceph_vinop(inode), endoff);
		ci->i_wanted_max_size = endoff;
	}
	/* duplicate ceph_check_caps()'s logic */
	if (ci->i_auth_cap &&
	    (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
	    ci->i_wanted_max_size > ci->i_max_size &&
	    ci->i_wanted_max_size > ci->i_requested_max_size)
		check = 1;
	spin_unlock(&ci->i_ceph_lock);

	/* only involve the auth MDS when a larger max_size must be requested */
	if (check)
		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY);
}
/*
 * Translate a cap mask into the file-mode bits it implies:
 * CEPH_CAP_FILE_RD maps to CEPH_FILE_MODE_RD and CEPH_CAP_FILE_WR maps to
 * CEPH_FILE_MODE_WR.  All other cap bits are ignored.
 */
static inline int get_used_fmode(int caps)
{
	int fmode = 0;

	if (caps & CEPH_CAP_FILE_RD)
		fmode |= CEPH_FILE_MODE_RD;
	if (caps & CEPH_CAP_FILE_WR)
		fmode |= CEPH_FILE_MODE_WR;
	return fmode;
}
/*
 * Single, non-waiting attempt to take cap references.
 *
 * @need may contain only CEPH_CAP_FILE_RD, and @want only the cache,
 * lazyio, shared, excl and dir-op caps; anything else is a BUG.
 *
 * Returns what try_get_cap_refs() returns, except that its three special
 * error codes (-EAGAIN, -EFBIG, -EUCLEAN) are reported as 0.  A failing
 * pool permission check is returned as-is.
 */
int ceph_try_get_caps(struct inode *inode, int need, int want,
		      bool nonblock, int *got)
{
	int ret, flags;

	BUG_ON(need & ~CEPH_CAP_FILE_RD);
	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
			CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
			CEPH_CAP_ANY_DIR_OPS));

	if (need) {
		ret = ceph_pool_perm_check(inode, need);
		if (ret < 0)
			return ret;
	}

	flags = get_used_fmode(need | want);
	if (nonblock)
		flags |= NON_BLOCKING;

	ret = try_get_cap_refs(inode, need, want, 0, flags, got);
	switch (ret) {
	case -EAGAIN:
	case -EFBIG:
	case -EUCLEAN:
		/* three special error codes */
		return 0;
	default:
		return ret;
	}
}
/*
 * Wait for caps, and take cap references.  If we can't get a WR cap
 * due to a small max_size, make sure we check_max_size (and possibly
 * ask the mds) so we don't get hung up indefinitely.
 *
 * NOTE(review): the middle of this function is not in view.  page,
 * uptodate and the loop/conditionals closed by the braces below are never
 * opened or declared in the visible text.  Confirm against the complete
 * file before relying on this excerpt.
*/ int __ceph_get_caps(struct inode *inode, struct ceph_file_info *fi, int need, int want, loff_t endoff, int *got)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode); int ret, _got, flags;
/* a failing pool permission check is returned to the caller as-is */
ret = ceph_pool_perm_check(inode, need); if (ret < 0) return ret;
put_page(page); if (uptodate) break;
} /*
 * Drop cap refs first, because a getattr while holding cap refs
 * can cause deadlock.
*/
ceph_put_cap_refs(ci, _got);
_got = 0;
/* * getattr request will bring inline data into * page cache
*/
ret = __ceph_do_getattr(inode, NULL,
CEPH_STAT_CAP_INLINE_DATA, true); if (ret < 0) return ret; continue;
} break;
}
/* hand the acquired cap mask back to the caller */
*got = _got; return 0;
}
/*
 * File-based front end to __ceph_get_caps(): resolves the inode and the
 * per-open ceph_file_info from @filp and forwards the remaining arguments
 * unchanged.  Returns whatever __ceph_get_caps() returns.
 */
int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff,
		  int *got)
{
	return __ceph_get_caps(file_inode(filp), filp->private_data,
			       need, want, endoff, got);
}
/*
 * Take additional references on @caps for @ci.
 *
 * This is only safe when the caller already knows at least one reference
 * is held on each of the caps in question.  It acquires i_ceph_lock
 * itself and passes snap_rwsem_locked == false to ceph_take_cap_refs().
 */
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
	spin_lock(&ci->i_ceph_lock);
	ceph_take_cap_refs(ci, caps, false);
	spin_unlock(&ci->i_ceph_lock);
}
/*
 * Drop a cap_snap that is not associated with any snapshot.
 * We don't need to send a FLUSHSNAP message for it.
 *
 * NOTE(review): only the signature and local declarations are visible
 * here; the rest of the body (including the return value) is not in
 * view.
*/ staticint ceph_try_drop_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap)
{ struct inode *inode = &ci->netfs.inode; struct ceph_client *cl = ceph_inode_to_client(inode);
/* * Release cap refs. * * If we released the last ref on any given cap, call ceph_check_caps * to release (or schedule a release). * * If we are releasing a WR cap (from a sync write), finalize any affected * cap_snap, and wake up any waiters.
*/ staticvoid __ceph_put_cap_refs(struct ceph_inode_info *ci, int had, enum put_cap_refs_mode mode)
{ struct inode *inode = &ci->netfs.inode; struct ceph_client *cl = ceph_inode_to_client(inode); int last = 0, put = 0, flushsnaps = 0, wake = 0; bool check_flushsnaps = false;
spin_lock(&ci->i_ceph_lock); if (had & CEPH_CAP_PIN)
--ci->i_pin_ref; if (had & CEPH_CAP_FILE_RD) if (--ci->i_rd_ref == 0)
last++; if (had & CEPH_CAP_FILE_CACHE) if (--ci->i_rdcache_ref == 0)
last++; if (had & CEPH_CAP_FILE_EXCL) if (--ci->i_fx_ref == 0)
last++; if (had & CEPH_CAP_FILE_BUFFER) { if (--ci->i_wb_ref == 0) {
last++; /* put the ref held by ceph_take_cap_refs() */
put++;
check_flushsnaps = true;
}
doutc(cl, "%p %llx.%llx wb %d -> %d (?)\n", inode,
ceph_vinop(inode), ci->i_wb_ref+1, ci->i_wb_ref);
} if (had & CEPH_CAP_FILE_WR) { if (--ci->i_wr_ref == 0) { /* * The Fb caps will always be took and released * together with the Fw caps.
*/
WARN_ON_ONCE(ci->i_wb_ref);
last++;
check_flushsnaps = true; if (ci->i_wrbuffer_ref_head == 0 &&
ci->i_dirty_caps == 0 &&
ci->i_flushing_caps == 0) {
BUG_ON(!ci->i_head_snapc);
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
} /* see comment in __ceph_remove_cap() */ if (!__ceph_is_any_real_caps(ci) && ci->i_snap_realm)
ceph_change_snap_realm(inode, NULL);
}
} if (check_flushsnaps && __ceph_have_pending_cap_snap(ci)) { struct ceph_cap_snap *capsnap =
list_last_entry(&ci->i_cap_snaps, struct ceph_cap_snap,
ci_item);
capsnap->writing = 0; if (ceph_try_drop_cap_snap(ci, capsnap)) /* put the ref held by ceph_queue_cap_snap() */
put++; elseif (__ceph_finish_cap_snap(ci, capsnap))
flushsnaps = 1;
wake = 1;
}
spin_unlock(&ci->i_ceph_lock);
doutc(cl, "%p %llx.%llx had %s%s%s\n", inode, ceph_vinop(inode),
ceph_cap_string(had), last ? " last" : "", put ? " put" : "");
switch (mode) { case PUT_CAP_REFS_SYNC: if (last)
ceph_check_caps(ci, 0); elseif (flushsnaps)
ceph_flush_snaps(ci, NULL); break; case PUT_CAP_REFS_ASYNC: if (last)
ceph_queue_check_caps(inode); elseif (flushsnaps)
ceph_queue_flush_snaps(inode); break; default: break;
} if (wake)
wake_up_all(&ci->i_cap_wq); while (put-- > 0)
iput(inode);
}
/*
 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
 * context.  Adjust per-snap dirty page accounting as appropriate.
 * Once all dirty data for a cap_snap is flushed, flush snapped file
 * metadata back to the MDS.  If we dropped the last ref, call
 * ceph_check_caps.
 *
 * NOTE(review): the "unlock:" label targeted by "goto unlock", the
 * matching spin_unlock() and the code that would set capsnap,
 * flush_snaps and complete_capsnap are not in this view; part of the
 * body appears to be elided.  Confirm against the complete file.
*/ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, struct ceph_snap_context *snapc)
{ struct inode *inode = &ci->netfs.inode; struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_cap_snap *capsnap = NULL, *iter; int put = 0; bool last = false; bool flush_snaps = false; bool complete_capsnap = false;
spin_lock(&ci->i_ceph_lock);
/* dropping the last wrbuffer ref also schedules one iput() below */
ci->i_wrbuffer_ref -= nr; if (ci->i_wrbuffer_ref == 0) {
last = true;
put++;
}
if (!capsnap) { /* * The capsnap should already be removed when removing * auth cap in the case of a forced unmount.
*/
WARN_ON_ONCE(ci->i_auth_cap); goto unlock;
}
if (last) {
ceph_check_caps(ci, 0);
} elseif (flush_snaps) {
ceph_flush_snaps(ci, NULL);
} if (complete_capsnap)
wake_up_all(&ci->i_cap_wq); while (put-- > 0) {
iput(inode);
}
}
/*
 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
 *
 * @inode: inode whose dentry aliases are pruned and then invalidated
 */
static void invalidate_aliases(struct inode *inode)
{
	struct ceph_client *cl = ceph_inode_to_client(inode);
	struct dentry *dn, *prev = NULL;

	doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
	d_prune_aliases(inode);
	/*
	 * For non-directory inode, d_find_alias() only returns
	 * hashed dentry. After calling d_invalidate(), the
	 * dentry becomes unhashed.
	 *
	 * For directory inode, d_find_alias() can return
	 * unhashed dentry. But directory inode should have
	 * one alias at most.
	 */
	while ((dn = d_find_alias(inode))) {
		/*
		 * Getting back the dentry we invalidated on the previous
		 * pass means no further progress is possible: drop the
		 * reference just obtained and stop.
		 */
		if (dn == prev) {
			dput(dn);
			break;
		}
		d_invalidate(dn);
		/* keep only the most recent dentry referenced in 'prev' */
		if (prev)
			dput(prev);
		prev = dn;
	}
	if (prev)
		dput(prev);
}
/*
 * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
 * actually be a revocation if it specifies a smaller cap set.)
 *
 * caller holds s_mutex and i_ceph_lock, we drop both.
 *
 * NOTE(review): this view of the function is incomplete.  The code that
 * would set fill_inline, queue_trunc and deleted_inode, the release of
 * i_ceph_lock/snap_rwsem, the use of check_caps/flags and the closing
 * brace are not visible.  Confirm against the complete file.
*/ staticvoid handle_cap_grant(struct inode *inode, struct ceph_mds_session *session, struct ceph_cap *cap, struct ceph_mds_caps *grant, struct ceph_buffer *xattr_buf, struct cap_extra_info *extra_info)
__releases(ci->i_ceph_lock)
__releases(session->s_mdsc->snap_rwsem)
{ struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); int seq = le32_to_cpu(grant->seq); int newcaps = le32_to_cpu(grant->caps); int used, wanted, dirty;
u64 size = le64_to_cpu(grant->size);
u64 max_size = le64_to_cpu(grant->max_size); unsignedchar check_caps = 0; bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen); bool wake = false; bool writeback = false; bool queue_trunc = false; bool queue_invalidate = false; bool deleted_inode = false; bool fill_inline = false; bool revoke_wait = false; int flags = 0;
/* * If there is at least one crypto block then we'll trust * fscrypt_file_size. If the real length of the file is 0, then * ignore it (it has probably been truncated down to 0 by the MDS).
*/ if (IS_ENCRYPTED(inode) && size)
size = extra_info->fscrypt_file_size;
/* * If CACHE is being revoked, and we have no dirty buffers, * try to invalidate (once). (If there are dirty buffers, we * will invalidate _after_ writeback.)
*/ if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */
((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
!(ci->i_wrbuffer_ref || ci->i_wb_ref)) { if (try_nonblocking_invalidate(inode)) { /* there were locked pages.. invalidate later
in a separate thread. */ if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
queue_invalidate = true;
ci->i_rdcache_revoking = ci->i_rdcache_gen;
}
}
}
/* a stale cap is treated as if only PIN had been issued */
if (was_stale)
cap->issued = cap->implemented = CEPH_CAP_PIN;
/* * auth mds of the inode changed. we received the cap export message, * but still haven't received the cap import message. handle_cap_export * updated the new auth MDS' cap. * * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message * that was sent before the cap import message. So don't remove caps.
*/ if (ceph_seq_cmp(seq, cap->seq) <= 0) {
WARN_ON(cap != ci->i_auth_cap);
WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
seq = cap->seq;
newcaps |= cap->issued;
}
/* side effects now are allowed */
cap->cap_gen = atomic_read(&session->s_cap_gen);
cap->seq = seq;
/* accept newer xattrs from the MDS unless XATTR_EXCL is in extra_info->issued */
if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
grant->xattr_len) { int len = le32_to_cpu(grant->xattr_len);
u64 version = le64_to_cpu(grant->xattr_version);
if (version > ci->i_xattrs.version) {
doutc(cl, " got new xattrs v%llu on %p %llx.%llx len %d\n",
version, inode, ceph_vinop(inode), len); if (ci->i_xattrs.blob)
ceph_buffer_put(ci->i_xattrs.blob);
ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
ci->i_xattrs.version = version;
ceph_forget_all_cached_acls(inode);
ceph_security_invalidate_secctx(inode);
}
}
/* only the auth cap, with some file-write cap granted, updates max_size */
if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) { if (max_size != ci->i_max_size) {
doutc(cl, "max_size %lld -> %llu\n", ci->i_max_size,
max_size);
ci->i_max_size = max_size; if (max_size >= ci->i_wanted_max_size) {
ci->i_wanted_max_size = 0; /* reset */
ci->i_requested_max_size = 0;
}
wake = true;
}
}
/* check cap bits */
wanted = __ceph_caps_wanted(ci);
used = __ceph_caps_used(ci);
dirty = __ceph_caps_dirty(ci);
doutc(cl, " my wanted = %s, used = %s, dirty %s\n",
ceph_cap_string(wanted), ceph_cap_string(used),
ceph_cap_string(dirty));
if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
(wanted & ~(cap->mds_wanted | newcaps))) { /* * If mds is importing cap, prior cap messages that update * 'wanted' may get dropped by mds (migrate seq mismatch). * * We don't send cap message to update 'wanted' if what we * want are already issued. If mds revokes caps, cap message * that releases caps also tells mds what we want. But if * caps got revoked by mds forcedly (session stale). We may * haven't told mds what we want.
*/
check_caps = 1;
}
/* revocation, grant, or no-op? */ if (cap->issued & ~newcaps) { int revoking = cap->issued & ~newcaps;
doutc(cl, "revocation: %s -> %s (revoking %s)\n",
ceph_cap_string(cap->issued), ceph_cap_string(newcaps),
ceph_cap_string(revoking)); if (S_ISREG(inode->i_mode) &&
(revoking & used & CEPH_CAP_FILE_BUFFER)) {
writeback = true; /* initiate writeback; will delay ack */
revoke_wait = true;
} elseif (queue_invalidate &&
revoking == CEPH_CAP_FILE_CACHE &&
(newcaps & CEPH_CAP_FILE_LAZYIO) == 0) {
revoke_wait = true; /* do nothing yet, invalidation will be queued */
} elseif (cap == ci->i_auth_cap) {
check_caps = 1; /* check auth cap only */
} else {
check_caps = 2; /* check all caps */
} /* If there is new caps, try to wake up the waiters */ if (~cap->issued & newcaps)
wake = true;
cap->issued = newcaps;
cap->implemented |= newcaps;
} elseif (cap->issued == newcaps) {
doutc(cl, "caps unchanged: %s -> %s\n",
ceph_cap_string(cap->issued),
ceph_cap_string(newcaps));
} else {
doutc(cl, "grant: %s -> %s\n", ceph_cap_string(cap->issued),
ceph_cap_string(newcaps)); /* non-auth MDS is revoking the newly grant caps ? */ if (cap == ci->i_auth_cap &&
__ceph_caps_revoking_other(ci, cap, newcaps))
check_caps = 2;
cap->issued = newcaps;
cap->implemented |= newcaps; /* add bits only, to avoid stepping
* on a pending revocation */
wake = true;
}
BUG_ON(cap->issued & ~cap->implemented);
/* don't let check_caps skip sending a response to MDS for revoke msgs */ if (!revoke_wait && le32_to_cpu(grant->op) == CEPH_CAP_OP_REVOKE) {
cap->mds_wanted = 0;
flags |= CHECK_CAPS_FLUSH_FORCE; if (cap == ci->i_auth_cap)
check_caps = 1; /* check auth cap only */ else
check_caps = 2; /* check all caps */
}
if (fill_inline)
ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
extra_info->inline_len);
if (queue_trunc)
ceph_queue_vmtruncate(inode);
if (writeback) /* * queue inode for writeback: we can't actually call * filemap_write_and_wait, etc. from message handler * context.
*/
ceph_queue_writeback(inode); if (queue_invalidate)
ceph_queue_invalidate(inode); if (deleted_inode)
invalidate_aliases(inode); if (wake)
wake_up_all(&ci->i_cap_wq);
/*
 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
 * MDS has been safely committed.
 *
 * NOTE(review): this view is incomplete.  The trailing "if (capsnap)"
 * block uses a capsnap variable that is not declared in this function,
 * and to_remove, seq, dirty, drop and wake_mdsc are never consumed or
 * updated in the visible text.  Part of the body (and possibly the start
 * of another function) appears to be elided; confirm against the
 * complete file.
*/ staticvoid handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_caps *m, struct ceph_mds_session *session, struct ceph_cap *cap)
__releases(ci->i_ceph_lock)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc; struct ceph_client *cl = mdsc->fsc->client; struct ceph_cap_flush *cf, *tmp_cf;
LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; bool drop = false; bool wake_ci = false; bool wake_mdsc = false;
/* walk the inode's pending cap flushes to work out what was cleaned */
list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { /* Is this the one that was flushed? */ if (cf->tid == flush_tid)
cleaned = cf->caps;
/* Is this a capsnap? */ if (cf->is_capsnap) continue;
if (cf->tid <= flush_tid) { /* * An earlier or current tid. The FLUSH_ACK should * represent a superset of this flush's caps.
*/
wake_ci |= __detach_cap_flush_from_ci(ci, cf);
list_add_tail(&cf->i_list, &to_remove);
} else { /* * This is a later one. Any caps in it are still dirty * so don't count them as cleaned.
*/
cleaned &= ~cf->caps; if (!cleaned) break;
}
}
if (capsnap) {
ceph_put_snap_context(capsnap->context);
ceph_put_cap_snap(capsnap); if (wake_ci)
wake_up_all(&ci->i_cap_wq); if (wake_mdsc)
wake_up_all(&mdsc->cap_flushing_wq);
iput(inode);
}
}
/* * If there is at least one crypto block then we'll trust * fscrypt_file_size. If the real length of the file is 0, then * ignore it (it has probably been truncated down to 0 by the MDS).
*/ if (IS_ENCRYPTED(inode) && size)
size = extra_info->fscrypt_file_size;
/*
 * Handle EXPORT from MDS.  Cap is being migrated _from_ this mds to a
 * different one.  If we are the most recent migration we've seen (as
 * indicated by mseq), make note of the migrating cap bits for the
 * duration (until we see the corresponding IMPORT).
 *
 * caller holds s_mutex
 *
 * NOTE(review): only the signature and local declarations of
 * handle_cap_export() are visible.  The statements that follow use
 * vino, op, msg, h, peer, extra_info, do_cap_release, queue_trunc,
 * close_sessions, seq, mseq and the labels done/done_unlocked, none of
 * which are declared here, and they call handle_cap_export() itself.
 * They appear to belong to a separate cap-message dispatch function
 * whose header is not in view.  Confirm against the complete file.
*/ staticvoid handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, struct ceph_mds_cap_peer *ph, struct ceph_mds_session *session)
{ struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc; struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_session *tsession = NULL; struct ceph_cap *cap, *tcap, *new_cap = NULL; struct ceph_inode_info *ci = ceph_inode(inode);
u64 t_cap_id;
u32 t_issue_seq, t_mseq; int target, issued; int mds = session->s_mds;
/* unknown inode: for IMPORT/REVOKE/GRANT, queue a cap release instead */
if (!inode) {
doutc(cl, " i don't have ino %llx\n", vino.ino);
switch (op) { case CEPH_CAP_OP_IMPORT: case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT:
do_cap_release = true; break; default: break;
} goto flush_cap_releases;
}
ci = ceph_inode(inode);
/* these will work even if we don't have a cap yet */ switch (op) { case CEPH_CAP_OP_FLUSHSNAP_ACK:
handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
h, session); goto done;
case CEPH_CAP_OP_EXPORT:
handle_cap_export(inode, h, peer, session); goto done_unlocked;
/* the rest require a cap */
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds); if (!cap) {
doutc(cl, " no cap on %p ino %llx.%llx from mds%d\n",
inode, ceph_ino(inode), ceph_snap(inode),
session->s_mds);
spin_unlock(&ci->i_ceph_lock); switch (op) { case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT:
do_cap_release = true; break; default: break;
} goto flush_cap_releases;
}
/* note that each of these drops i_ceph_lock for us */ switch (op) { case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &extra_info.issued);
extra_info.issued |= __ceph_caps_dirty(ci);
handle_cap_grant(inode, session, cap,
h, msg->middle, &extra_info); goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK:
handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
h, session, cap); break;
case CEPH_CAP_OP_TRUNC:
queue_trunc = handle_cap_trunc(inode, h, session,
&extra_info);
spin_unlock(&ci->i_ceph_lock); if (queue_trunc)
ceph_queue_vmtruncate(inode); break;
default:
spin_unlock(&ci->i_ceph_lock);
pr_err_client(cl, "unknown cap op %d %s\n", op,
ceph_cap_op_name(op));
}
/* Defer closing the sessions after s_mutex lock being released */ if (close_sessions)
ceph_mdsc_close_sessions(mdsc);
kfree(extra_info.fscrypt_auth); return;
flush_cap_releases: /*
 * Send any cap release message to try to move things along for
 * the mds (who clearly thinks we still have this cap).
*/ if (do_cap_release) {
/* build a cap that carries only what the release message needs */
cap = ceph_get_cap(mdsc, NULL);
cap->cap_ino = vino.ino;
cap->queue_release = 1;
cap->cap_id = le64_to_cpu(h->cap_id);
cap->mseq = mseq;
cap->seq = seq;
cap->issue_seq = seq;
spin_lock(&session->s_cap_lock);
__ceph_queue_cap_release(session, cap);
spin_unlock(&session->s_cap_lock);
}
ceph_flush_session_cap_releases(mdsc, session); goto done;
/*
 * Delayed work handler to process end of delayed cap release LRU list.
 *
 * If new caps are added to the list while processing it, these won't get
 * processed in this run.  In this case, the ci->i_hold_caps_max will be
 * returned so that the work can be scheduled accordingly.
 *
 * NOTE(review): this view is incomplete.  The per-inode work done after
 * an entry is removed from the list, the return statement and the
 * closing brace are not visible.  Confirm against the complete file.
*/ unsignedlong ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{ struct ceph_client *cl = mdsc->fsc->client; struct inode *inode; struct ceph_inode_info *ci; struct ceph_mount_options *opt = mdsc->fsc->mount_options; unsignedlong delay_max = opt->caps_wanted_delay_max * HZ; unsignedlong loop_start = jiffies; unsignedlong delay = 0;
doutc(cl, "begin\n");
spin_lock(&mdsc->cap_delay_lock); while (!list_empty(&mdsc->cap_delay_list)) {
ci = list_first_entry(&mdsc->cap_delay_list, struct ceph_inode_info,
i_cap_delay_list); if (time_before(loop_start, ci->i_hold_caps_max - delay_max)) {
doutc(cl, "caps added recently. Exiting loop");
delay = ci->i_hold_caps_max; break;
} if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
time_before(jiffies, ci->i_hold_caps_max)) break;
/* entry is due (or flagged CEPH_I_FLUSH): take it off the delay list */
list_del_init(&ci->i_cap_delay_list);
/* * Make sure too many dirty caps or general * slowness doesn't block mdsc delayed work, * preventing send_renew_caps() from running.
*/ if (time_after_eq(jiffies, loop_start + 5 * HZ)) break;
}
spin_unlock(&mdsc->cap_delay_lock);
doutc(cl, "done\n");
/*
 * Record that the inode was just used in the given file mode(s).
 *
 * @ci:    inode being touched
 * @mdsc:  MDS client whose cap delay list the inode may be queued on
 * @fmode: CEPH_FILE_MODE_* bits; RD refreshes i_last_rd, WR refreshes
 *         i_last_wr
 *
 * If @fmode is non-zero, the inode holds any real caps and it is not
 * already on the cap delay list, it is requeued so that a periodic cap
 * check runs for it.
 */
void __ceph_touch_fmode(struct ceph_inode_info *ci,
			struct ceph_mds_client *mdsc, int fmode)
{
	unsigned long now = jiffies;

	if (fmode & CEPH_FILE_MODE_RD)
		ci->i_last_rd = now;
	if (fmode & CEPH_FILE_MODE_WR)
		ci->i_last_wr = now;
	/* queue periodic check */
	if (fmode &&
	    __ceph_is_any_real_caps(ci) &&
	    list_empty(&ci->i_cap_delay_list))
		__cap_delay_requeue(mdsc, ci);
}
/*
 * Take @count open-file references on @ci for file mode @fmode.
 *
 * The per-mode counters selected by ((fmode << 1) | 1) are each raised by
 * @count; bit 0 (the PIN ref) is therefore always selected.  When @count
 * is exactly 1 the opened_files metric is incremented, and when no
 * non-PIN mode counter was in use beforehand the opened_inodes metric is
 * incremented as well.
 */
void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
	const int mode_mask = (fmode << 1) | 1;
	bool first_open = true;
	int idx;

	if (count == 1)
		atomic64_inc(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);

	/*
	 * Look at the refs as they were before this call: any non-zero
	 * counter other than the PIN ref (slot 0) means somebody else
	 * already has the inode open.
	 */
	for (idx = 1; idx < CEPH_FILE_MODE_BITS; idx++) {
		if (ci->i_nr_by_mode[idx]) {
			first_open = false;
			break;
		}
	}

	/* now account for the new references */
	for (idx = 0; idx < CEPH_FILE_MODE_BITS; idx++) {
		if (mode_mask & (1 << idx))
			ci->i_nr_by_mode[idx] += count;
	}

	if (first_open)
		percpu_counter_inc(&mdsc->metric.opened_inodes);

	spin_unlock(&ci->i_ceph_lock);
}
/*
 * Drop @count open-file references on @ci for file mode @fmode.
 *
 * The per-mode counters selected by ((fmode << 1) | 1) are each lowered
 * by @count; dropping more references than are held trips BUG_ON().
 * When @count is exactly 1 the opened_files metric is decremented, and
 * when no non-PIN mode counter remains in use afterwards the
 * opened_inodes metric is decremented as well.
 */
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(ci->netfs.inode.i_sb);
	const int mode_mask = (fmode << 1) | 1;
	bool still_open = false;
	int idx;

	if (count == 1)
		atomic64_dec(&mdsc->metric.opened_files);

	spin_lock(&ci->i_ceph_lock);

	/* release the references first ... */
	for (idx = 0; idx < CEPH_FILE_MODE_BITS; idx++) {
		if (!(mode_mask & (1 << idx)))
			continue;
		BUG_ON(ci->i_nr_by_mode[idx] < count);
		ci->i_nr_by_mode[idx] -= count;
	}

	/*
	 * ... then see whether anything besides the PIN ref (slot 0) is
	 * still held; if so the inode remains open for somebody else.
	 */
	for (idx = 1; idx < CEPH_FILE_MODE_BITS; idx++) {
		if (ci->i_nr_by_mode[idx]) {
			still_open = true;
			break;
		}
	}

	if (!still_open)
		percpu_counter_dec(&mdsc->metric.opened_inodes);

	spin_unlock(&ci->i_ceph_lock);
}
/*
 * Compute the cap mask to drop for an inode that is about to be unlinked.
 *
 * The LINK caps (shared and exclusive) are always included.  If the link
 * count is currently 1, i.e. it looks like it will hit 0, every cap that
 * is neither wanted nor PIN is added to the mask, and if the inode has
 * dirty caps the cap-unlink work is queued.
 *
 * Returns the mask of caps to drop.
 */
int ceph_drop_caps_for_unlink(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int mask = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;

	spin_lock(&ci->i_ceph_lock);

	if (inode->i_nlink != 1)
		goto unlock;

	mask |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);

	/*
	 * Fire the work immediately, because the MDS may be
	 * waiting for the caps release.
	 */
	if (__ceph_caps_dirty(ci))
		ceph_queue_cap_unlink_work(ceph_inode_to_fs_client(inode)->mdsc);

unlock:
	spin_unlock(&ci->i_ceph_lock);
	return mask;
}
/* * Helpers for embedding cap and dentry lease releases into mds * requests. * * @force is used by dentry_release (below) to force inclusion of a * record for the directory inode, even when there aren't any caps to * drop.
*/ int ceph_encode_inode_release(void **p, struct inode *inode, int mds, int drop, int unless, int force)
{ struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_client *cl = ceph_inode_to_client(inode); struct ceph_cap *cap; struct ceph_mds_request_release *rel = *p; int used, dirty; int ret = 0;
spin_lock(&ci->i_ceph_lock);
used = __ceph_caps_used(ci);
dirty = __ceph_caps_dirty(ci);
/* only drop unused, clean caps */
drop &= ~(used | dirty);
cap = __get_cap_for_mds(ci, mds); if (cap && __cap_is_valid(cap)) {
unless &= cap->issued; if (unless) { if (unless & CEPH_CAP_AUTH_EXCL)
drop &= ~CEPH_CAP_AUTH_SHARED; if (unless & CEPH_CAP_LINK_EXCL)
drop &= ~CEPH_CAP_LINK_SHARED; if (unless & CEPH_CAP_XATTR_EXCL)
drop &= ~CEPH_CAP_XATTR_SHARED; if (unless & CEPH_CAP_FILE_EXCL)
drop &= ~CEPH_CAP_FILE_SHARED;
}
if (force || (cap->issued & drop)) { if (cap->issued & drop) { int wanted = __ceph_caps_wanted(ci);
doutc(cl, "%p %llx.%llx cap %p %s -> %s, " "wanted %s -> %s\n", inode,
ceph_vinop(inode), cap,
ceph_cap_string(cap->issued),
ceph_cap_string(cap->issued & ~drop),
ceph_cap_string(cap->mds_wanted),
ceph_cap_string(wanted));
/** * ceph_encode_dentry_release - encode a dentry release into an outgoing request * @p: outgoing request buffer * @dentry: dentry to release * @dir: dir to release it from * @mds: mds that we're speaking to * @drop: caps being dropped * @unless: unless we have these caps * * Encode a dentry release into an outgoing request buffer. Returns 1 if the * thing was released, or a negative error code otherwise.
*/ int ceph_encode_dentry_release(void **p, struct dentry *dentry, struct inode *dir, int mds, int drop, int unless)
{ struct ceph_mds_request_release *rel = *p; struct ceph_dentry_info *di = ceph_dentry(dentry); struct ceph_client *cl; int force = 0; int ret;
/* This shouldn't happen */
BUG_ON(!dir);
/* * force a record for the directory caps if we have a dentry lease. * this is racy (can't take i_ceph_lock and d_lock together), but it * doesn't have to be perfect; the mds will revoke anything we don't * release.
*/
spin_lock(&dentry->d_lock); if (di->lease_session && di->lease_session->s_mds == mds)
force = 1;
spin_unlock(&dentry->d_lock);
ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
if (ceph_inode_is_shutdown(inode)) { if (inode->i_data.nrpages > 0)
*invalidate = true; if (ci->i_wrbuffer_ref > 0)
mapping_set_error(&inode->i_data, -EIO);
}
spin_lock(&mdsc->cap_dirty_lock);
/* trash all of the cap flushes for this inode */ while (!list_empty(&ci->i_cap_flush_list)) {
cf = list_first_entry(&ci->i_cap_flush_list, struct ceph_cap_flush, i_list);
list_del_init(&cf->g_list);
list_del_init(&cf->i_list); if (!cf->is_capsnap)
ceph_free_cap_flush(cf);
}
if (!list_empty(&ci->i_dirty_item)) {
pr_warn_ratelimited_client(cl, " dropping dirty %s state for %p %llx.%llx\n",
ceph_cap_string(ci->i_dirty_caps),
inode, ceph_vinop(inode));
ci->i_dirty_caps = 0;
list_del_init(&ci->i_dirty_item);
dirty_dropped = true;
} if (!list_empty(&ci->i_flushing_item)) {
pr_warn_ratelimited_client(cl, " dropping dirty+flushing %s state for %p %llx.%llx\n",
ceph_cap_string(ci->i_flushing_caps),
inode, ceph_vinop(inode));
ci->i_flushing_caps = 0;
list_del_init(&ci->i_flushing_item);
mdsc->num_cap_flushing--;
dirty_dropped = true;
}
spin_unlock(&mdsc->cap_dirty_lock);
if (dirty_dropped) {
mapping_set_error(inode->i_mapping, -EIO);
if (atomic_read(&ci->i_filelock_ref) > 0) { /* make further file lock syscall return -EIO */
ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
pr_warn_ratelimited_client(cl, " dropping file locks for %p %llx.%llx\n",
inode, ceph_vinop(inode));
}
if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
cf = ci->i_prealloc_cap_flush;
ci->i_prealloc_cap_flush = NULL; if (!cf->is_capsnap)
ceph_free_cap_flush(cf);
}
if (!list_empty(&ci->i_cap_snaps))
iputs = remove_capsnaps(mdsc, inode);
} if (dirty_dropped)
++iputs; return iputs;
}
Messung V0.5 in Prozent
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert. 0.86 Bemerkung:
(vorverarbeitet am 2026-04-25)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.