Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Linux/fs/ceph/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 138 kB image not shown  

Quelle  caps.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/writeback.h>
#include <linux/iversion.h>
#include <linux/filelock.h>
#include <linux/jiffies.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "crypto.h"
#include <linux/ceph/decode.h>
#include <linux/ceph/messenger.h>

/*
 * Capability management
 *
 * The Ceph metadata servers control client access to inode metadata
 * and file data by issuing capabilities, granting clients permission
 * to read and/or write both inode field and file data to OSDs
 * (storage nodes).  Each capability consists of a set of bits
 * indicating which operations are allowed.
 *
 * If the client holds a *_SHARED cap, the client has a coherent value
 * that can be safely read from the cached inode.
 *
 * In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the
 * client is allowed to change inode attributes (e.g., file size,
 * mtime), note its dirty state in the ceph_cap, and asynchronously
 * flush that metadata change to the MDS.
 *
 * In the event of a conflicting operation (perhaps by another
 * client), the MDS will revoke the conflicting client capabilities.
 *
 * In order for a client to cache an inode, it must hold a capability
 * with at least one MDS server.  When inodes are released, release
 * notifications are batched and periodically sent en masse to the MDS
 * cluster to release server state.
 */


static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
     struct ceph_mds_session *session,
     struct ceph_inode_info *ci,
     u64 oldest_flush_tid);

/*
 * Generate readable cap strings for debugging output.
 */

#define MAX_CAP_STR 20
static char cap_str[MAX_CAP_STR][40];
static DEFINE_SPINLOCK(cap_str_lock);
static int last_cap_str;

static char *gcap_string(char *s, int c)
{
 if (c & CEPH_CAP_GSHARED)
  *s++ = 's';
 if (c & CEPH_CAP_GEXCL)
  *s++ = 'x';
 if (c & CEPH_CAP_GCACHE)
  *s++ = 'c';
 if (c & CEPH_CAP_GRD)
  *s++ = 'r';
 if (c & CEPH_CAP_GWR)
  *s++ = 'w';
 if (c & CEPH_CAP_GBUFFER)
  *s++ = 'b';
 if (c & CEPH_CAP_GWREXTEND)
  *s++ = 'a';
 if (c & CEPH_CAP_GLAZYIO)
  *s++ = 'l';
 return s;
}

const char *ceph_cap_string(int caps)
{
 int i;
 char *s;
 int c;

 spin_lock(&cap_str_lock);
 i = last_cap_str++;
 if (last_cap_str == MAX_CAP_STR)
  last_cap_str = 0;
 spin_unlock(&cap_str_lock);

 s = cap_str[i];

 if (caps & CEPH_CAP_PIN)
  *s++ = 'p';

 c = (caps >> CEPH_CAP_SAUTH) & 3;
 if (c) {
  *s++ = 'A';
  s = gcap_string(s, c);
 }

 c = (caps >> CEPH_CAP_SLINK) & 3;
 if (c) {
  *s++ = 'L';
  s = gcap_string(s, c);
 }

 c = (caps >> CEPH_CAP_SXATTR) & 3;
 if (c) {
  *s++ = 'X';
  s = gcap_string(s, c);
 }

 c = caps >> CEPH_CAP_SFILE;
 if (c) {
  *s++ = 'F';
  s = gcap_string(s, c);
 }

 if (s == cap_str[i])
  *s++ = '-';
 *s = 0;
 return cap_str[i];
}

void ceph_caps_init(struct ceph_mds_client *mdsc)
{
 INIT_LIST_HEAD(&mdsc->caps_list);
 spin_lock_init(&mdsc->caps_list_lock);
}

void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
 struct ceph_cap *cap;

 spin_lock(&mdsc->caps_list_lock);
 while (!list_empty(&mdsc->caps_list)) {
  cap = list_first_entry(&mdsc->caps_list,
           struct ceph_cap, caps_item);
  list_del(&cap->caps_item);
  kmem_cache_free(ceph_cap_cachep, cap);
 }
 mdsc->caps_total_count = 0;
 mdsc->caps_avail_count = 0;
 mdsc->caps_use_count = 0;
 mdsc->caps_reserve_count = 0;
 mdsc->caps_min_count = 0;
 spin_unlock(&mdsc->caps_list_lock);
}

void ceph_adjust_caps_max_min(struct ceph_mds_client *mdsc,
         struct ceph_mount_options *fsopt)
{
 spin_lock(&mdsc->caps_list_lock);
 mdsc->caps_min_count = fsopt->max_readdir;
 if (mdsc->caps_min_count < 1024)
  mdsc->caps_min_count = 1024;
 mdsc->caps_use_max = fsopt->caps_max;
 if (mdsc->caps_use_max > 0 &&
     mdsc->caps_use_max < mdsc->caps_min_count)
  mdsc->caps_use_max = mdsc->caps_min_count;
 spin_unlock(&mdsc->caps_list_lock);
}

static void __ceph_unreserve_caps(struct ceph_mds_client *mdsc, int nr_caps)
{
 struct ceph_cap *cap;
 int i;

 if (nr_caps) {
  BUG_ON(mdsc->caps_reserve_count < nr_caps);
  mdsc->caps_reserve_count -= nr_caps;
  if (mdsc->caps_avail_count >=
      mdsc->caps_reserve_count + mdsc->caps_min_count) {
   mdsc->caps_total_count -= nr_caps;
   for (i = 0; i < nr_caps; i++) {
    cap = list_first_entry(&mdsc->caps_list,
     struct ceph_cap, caps_item);
    list_del(&cap->caps_item);
    kmem_cache_free(ceph_cap_cachep, cap);
   }
  } else {
   mdsc->caps_avail_count += nr_caps;
  }

  doutc(mdsc->fsc->client,
        "caps %d = %d used + %d resv + %d avail\n",
        mdsc->caps_total_count, mdsc->caps_use_count,
        mdsc->caps_reserve_count, mdsc->caps_avail_count);
  BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
       mdsc->caps_reserve_count +
       mdsc->caps_avail_count);
 }
}

/*
 * Called under mdsc->mutex.
 */

int ceph_reserve_caps(struct ceph_mds_client *mdsc,
        struct ceph_cap_reservation *ctx, int need)
{
 struct ceph_client *cl = mdsc->fsc->client;
 int i, j;
 struct ceph_cap *cap;
 int have;
 int alloc = 0;
 int max_caps;
 int err = 0;
 bool trimmed = false;
 struct ceph_mds_session *s;
 LIST_HEAD(newcaps);

 doutc(cl, "ctx=%p need=%d\n", ctx, need);

 /* first reserve any caps that are already allocated */
 spin_lock(&mdsc->caps_list_lock);
 if (mdsc->caps_avail_count >= need)
  have = need;
 else
  have = mdsc->caps_avail_count;
 mdsc->caps_avail_count -= have;
 mdsc->caps_reserve_count += have;
 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
      mdsc->caps_reserve_count +
      mdsc->caps_avail_count);
 spin_unlock(&mdsc->caps_list_lock);

 for (i = have; i < need; ) {
  cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
  if (cap) {
   list_add(&cap->caps_item, &newcaps);
   alloc++;
   i++;
   continue;
  }

  if (!trimmed) {
   for (j = 0; j < mdsc->max_sessions; j++) {
    s = __ceph_lookup_mds_session(mdsc, j);
    if (!s)
     continue;
    mutex_unlock(&mdsc->mutex);

    mutex_lock(&s->s_mutex);
    max_caps = s->s_nr_caps - (need - i);
    ceph_trim_caps(mdsc, s, max_caps);
    mutex_unlock(&s->s_mutex);

    ceph_put_mds_session(s);
    mutex_lock(&mdsc->mutex);
   }
   trimmed = true;

   spin_lock(&mdsc->caps_list_lock);
   if (mdsc->caps_avail_count) {
    int more_have;
    if (mdsc->caps_avail_count >= need - i)
     more_have = need - i;
    else
     more_have = mdsc->caps_avail_count;

    i += more_have;
    have += more_have;
    mdsc->caps_avail_count -= more_have;
    mdsc->caps_reserve_count += more_have;

   }
   spin_unlock(&mdsc->caps_list_lock);

   continue;
  }

  pr_warn_client(cl, "ctx=%p ENOMEM need=%d got=%d\n", ctx, need,
          have + alloc);
  err = -ENOMEM;
  break;
 }

 if (!err) {
  BUG_ON(have + alloc != need);
  ctx->count = need;
  ctx->used = 0;
 }

 spin_lock(&mdsc->caps_list_lock);
 mdsc->caps_total_count += alloc;
 mdsc->caps_reserve_count += alloc;
 list_splice(&newcaps, &mdsc->caps_list);

 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
      mdsc->caps_reserve_count +
      mdsc->caps_avail_count);

 if (err)
  __ceph_unreserve_caps(mdsc, have + alloc);

 spin_unlock(&mdsc->caps_list_lock);

 doutc(cl, "ctx=%p %d = %d used + %d resv + %d avail\n", ctx,
       mdsc->caps_total_count, mdsc->caps_use_count,
       mdsc->caps_reserve_count, mdsc->caps_avail_count);
 return err;
}

void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
    struct ceph_cap_reservation *ctx)
{
 struct ceph_client *cl = mdsc->fsc->client;
 bool reclaim = false;
 if (!ctx->count)
  return;

 doutc(cl, "ctx=%p count=%d\n", ctx, ctx->count);
 spin_lock(&mdsc->caps_list_lock);
 __ceph_unreserve_caps(mdsc, ctx->count);
 ctx->count = 0;

 if (mdsc->caps_use_max > 0 &&
     mdsc->caps_use_count > mdsc->caps_use_max)
  reclaim = true;
 spin_unlock(&mdsc->caps_list_lock);

 if (reclaim)
  ceph_reclaim_caps_nr(mdsc, ctx->used);
}

struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
         struct ceph_cap_reservation *ctx)
{
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_cap *cap = NULL;

 /* temporary, until we do something about cap import/export */
 if (!ctx) {
  cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
  if (cap) {
   spin_lock(&mdsc->caps_list_lock);
   mdsc->caps_use_count++;
   mdsc->caps_total_count++;
   spin_unlock(&mdsc->caps_list_lock);
  } else {
   spin_lock(&mdsc->caps_list_lock);
   if (mdsc->caps_avail_count) {
    BUG_ON(list_empty(&mdsc->caps_list));

    mdsc->caps_avail_count--;
    mdsc->caps_use_count++;
    cap = list_first_entry(&mdsc->caps_list,
      struct ceph_cap, caps_item);
    list_del(&cap->caps_item);

    BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
           mdsc->caps_reserve_count + mdsc->caps_avail_count);
   }
   spin_unlock(&mdsc->caps_list_lock);
  }

  return cap;
 }

 spin_lock(&mdsc->caps_list_lock);
 doutc(cl, "ctx=%p (%d) %d = %d used + %d resv + %d avail\n", ctx,
       ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
       mdsc->caps_reserve_count, mdsc->caps_avail_count);
 BUG_ON(!ctx->count);
 BUG_ON(ctx->count > mdsc->caps_reserve_count);
 BUG_ON(list_empty(&mdsc->caps_list));

 ctx->count--;
 ctx->used++;
 mdsc->caps_reserve_count--;
 mdsc->caps_use_count++;

 cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
 list_del(&cap->caps_item);

 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
        mdsc->caps_reserve_count + mdsc->caps_avail_count);
 spin_unlock(&mdsc->caps_list_lock);
 return cap;
}

void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
 struct ceph_client *cl = mdsc->fsc->client;

 spin_lock(&mdsc->caps_list_lock);
 doutc(cl, "%p %d = %d used + %d resv + %d avail\n", cap,
       mdsc->caps_total_count, mdsc->caps_use_count,
       mdsc->caps_reserve_count, mdsc->caps_avail_count);
 mdsc->caps_use_count--;
 /*
 * Keep some preallocated caps around (ceph_min_count), to
 * avoid lots of free/alloc churn.
 */

 if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
          mdsc->caps_min_count) {
  mdsc->caps_total_count--;
  kmem_cache_free(ceph_cap_cachep, cap);
 } else {
  mdsc->caps_avail_count++;
  list_add(&cap->caps_item, &mdsc->caps_list);
 }

 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
        mdsc->caps_reserve_count + mdsc->caps_avail_count);
 spin_unlock(&mdsc->caps_list_lock);
}

void ceph_reservation_status(struct ceph_fs_client *fsc,
        int *total, int *avail, int *used, int *reserved,
        int *min)
{
 struct ceph_mds_client *mdsc = fsc->mdsc;

 spin_lock(&mdsc->caps_list_lock);

 if (total)
  *total = mdsc->caps_total_count;
 if (avail)
  *avail = mdsc->caps_avail_count;
 if (used)
  *used = mdsc->caps_use_count;
 if (reserved)
  *reserved = mdsc->caps_reserve_count;
 if (min)
  *min = mdsc->caps_min_count;

 spin_unlock(&mdsc->caps_list_lock);
}

/*
 * Find ceph_cap for given mds, if any.
 *
 * Called with i_ceph_lock held.
 */

struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
 struct ceph_cap *cap;
 struct rb_node *n = ci->i_caps.rb_node;

 while (n) {
  cap = rb_entry(n, struct ceph_cap, ci_node);
  if (mds < cap->mds)
   n = n->rb_left;
  else if (mds > cap->mds)
   n = n->rb_right;
  else
   return cap;
 }
 return NULL;
}

struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
 struct ceph_cap *cap;

 spin_lock(&ci->i_ceph_lock);
 cap = __get_cap_for_mds(ci, mds);
 spin_unlock(&ci->i_ceph_lock);
 return cap;
}

/*
 * Called under i_ceph_lock.
 */

static void __insert_cap_node(struct ceph_inode_info *ci,
         struct ceph_cap *new)
{
 struct rb_node **p = &ci->i_caps.rb_node;
 struct rb_node *parent = NULL;
 struct ceph_cap *cap = NULL;

 while (*p) {
  parent = *p;
  cap = rb_entry(parent, struct ceph_cap, ci_node);
  if (new->mds < cap->mds)
   p = &(*p)->rb_left;
  else if (new->mds > cap->mds)
   p = &(*p)->rb_right;
  else
   BUG();
 }

 rb_link_node(&new->ci_node, parent, p);
 rb_insert_color(&new->ci_node, &ci->i_caps);
}

/*
 * (re)set cap hold timeouts, which control the delayed release
 * of unused caps back to the MDS.  Should be called on cap use.
 */

static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
          struct ceph_inode_info *ci)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_mount_options *opt = mdsc->fsc->mount_options;

 ci->i_hold_caps_max = round_jiffies(jiffies +
         opt->caps_wanted_delay_max * HZ);
 doutc(mdsc->fsc->client, "%p %llx.%llx %lu\n", inode,
       ceph_vinop(inode), ci->i_hold_caps_max - jiffies);
}

/*
 * (Re)queue cap at the end of the delayed cap release list.
 *
 * If I_FLUSH is set, leave the inode at the front of the list.
 *
 * Caller holds i_ceph_lock
 *    -> we take mdsc->cap_delay_lock
 */

static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
    struct ceph_inode_info *ci)
{
 struct inode *inode = &ci->netfs.inode;

 doutc(mdsc->fsc->client, "%p %llx.%llx flags 0x%lx at %lu\n",
       inode, ceph_vinop(inode), ci->i_ceph_flags,
       ci->i_hold_caps_max);
 if (!mdsc->stopping) {
  spin_lock(&mdsc->cap_delay_lock);
  if (!list_empty(&ci->i_cap_delay_list)) {
   if (ci->i_ceph_flags & CEPH_I_FLUSH)
    goto no_change;
   list_del_init(&ci->i_cap_delay_list);
  }
  __cap_set_timeouts(mdsc, ci);
  list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
  spin_unlock(&mdsc->cap_delay_lock);
 }
}

/*
 * Queue an inode for immediate writeback.  Mark inode with I_FLUSH,
 * indicating we should send a cap message to flush dirty metadata
 * asap, and move to the front of the delayed cap list.
 */

static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
          struct ceph_inode_info *ci)
{
 struct inode *inode = &ci->netfs.inode;

 doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
 spin_lock(&mdsc->cap_delay_lock);
 ci->i_ceph_flags |= CEPH_I_FLUSH;
 if (!list_empty(&ci->i_cap_delay_list))
  list_del_init(&ci->i_cap_delay_list);
 list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
 spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Cancel delayed work on cap.
 *
 * Caller must hold i_ceph_lock.
 */

static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
          struct ceph_inode_info *ci)
{
 struct inode *inode = &ci->netfs.inode;

 doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode, ceph_vinop(inode));
 if (list_empty(&ci->i_cap_delay_list))
  return;
 spin_lock(&mdsc->cap_delay_lock);
 list_del_init(&ci->i_cap_delay_list);
 spin_unlock(&mdsc->cap_delay_lock);
}

/* Common issue checks for add_cap, handle_cap_grant. */
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
         unsigned issued)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_client *cl = ceph_inode_to_client(inode);

 unsigned had = __ceph_caps_issued(ci, NULL);

 lockdep_assert_held(&ci->i_ceph_lock);

 /*
 * Each time we receive FILE_CACHE anew, we increment
 * i_rdcache_gen.
 */

 if (S_ISREG(ci->netfs.inode.i_mode) &&
     (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
     (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
  ci->i_rdcache_gen++;
 }

 /*
 * If FILE_SHARED is newly issued, mark dir not complete. We don't
 * know what happened to this directory while we didn't have the cap.
 * If FILE_SHARED is being revoked, also mark dir not complete. It
 * stops on-going cached readdir.
 */

 if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
  if (issued & CEPH_CAP_FILE_SHARED)
   atomic_inc(&ci->i_shared_gen);
  if (S_ISDIR(ci->netfs.inode.i_mode)) {
   doutc(cl, " marking %p NOT complete\n", inode);
   __ceph_dir_clear_complete(ci);
  }
 }

 /* Wipe saved layout if we're losing DIR_CREATE caps */
 if (S_ISDIR(ci->netfs.inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) &&
  !(issued & CEPH_CAP_DIR_CREATE)) {
      ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
      memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
 }
}

/**
 * change_auth_cap_ses - move inode to appropriate lists when auth caps change
 * @ci: inode to be moved
 * @session: new auth caps session
 */

void change_auth_cap_ses(struct ceph_inode_info *ci,
    struct ceph_mds_session *session)
{
 lockdep_assert_held(&ci->i_ceph_lock);

 if (list_empty(&ci->i_dirty_item) && list_empty(&ci->i_flushing_item))
  return;

 spin_lock(&session->s_mdsc->cap_dirty_lock);
 if (!list_empty(&ci->i_dirty_item))
  list_move(&ci->i_dirty_item, &session->s_cap_dirty);
 if (!list_empty(&ci->i_flushing_item))
  list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
 spin_unlock(&session->s_mdsc->cap_dirty_lock);
}

/*
 * Add a capability under the given MDS session.
 *
 * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
 *
 * @fmode is the open file mode, if we are opening a file, otherwise
 * it is < 0.  (This is so we can atomically add the cap and add an
 * open file reference to it.)
 */

void ceph_add_cap(struct inode *inode,
    struct ceph_mds_session *session, u64 cap_id,
    unsigned issued, unsigned wanted,
    unsigned seq, unsigned mseq, u64 realmino, int flags,
    struct ceph_cap **new_cap)
{
 struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_cap *cap;
 int mds = session->s_mds;
 int actual_wanted;
 u32 gen;

 lockdep_assert_held(&ci->i_ceph_lock);

 doutc(cl, "%p %llx.%llx mds%d cap %llx %s seq %d\n", inode,
       ceph_vinop(inode), session->s_mds, cap_id,
       ceph_cap_string(issued), seq);

 gen = atomic_read(&session->s_cap_gen);

 cap = __get_cap_for_mds(ci, mds);
 if (!cap) {
  cap = *new_cap;
  *new_cap = NULL;

  cap->issued = 0;
  cap->implemented = 0;
  cap->mds = mds;
  cap->mds_wanted = 0;
  cap->mseq = 0;

  cap->ci = ci;
  __insert_cap_node(ci, cap);

  /* add to session cap list */
  cap->session = session;
  spin_lock(&session->s_cap_lock);
  list_add_tail(&cap->session_caps, &session->s_caps);
  session->s_nr_caps++;
  atomic64_inc(&mdsc->metric.total_caps);
  spin_unlock(&session->s_cap_lock);
 } else {
  spin_lock(&session->s_cap_lock);
  list_move_tail(&cap->session_caps, &session->s_caps);
  spin_unlock(&session->s_cap_lock);

  if (cap->cap_gen < gen)
   cap->issued = cap->implemented = CEPH_CAP_PIN;

  /*
 * auth mds of the inode changed. we received the cap export
 * message, but still haven't received the cap import message.
 * handle_cap_export() updated the new auth MDS' cap.
 *
 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
 * a message that was send before the cap import message. So
 * don't remove caps.
 */

  if (ceph_seq_cmp(seq, cap->seq) <= 0) {
   WARN_ON(cap != ci->i_auth_cap);
   WARN_ON(cap->cap_id != cap_id);
   seq = cap->seq;
   mseq = cap->mseq;
   issued |= cap->issued;
   flags |= CEPH_CAP_FLAG_AUTH;
  }
 }

 if (!ci->i_snap_realm ||
     ((flags & CEPH_CAP_FLAG_AUTH) &&
      realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
  /*
 * add this inode to the appropriate snap realm
 */

  struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
              realmino);
  if (realm)
   ceph_change_snap_realm(inode, realm);
  else
   WARN(1, "%s: couldn't find snap realm 0x%llx (ino 0x%llx oldrealm 0x%llx)\n",
        __func__, realmino, ci->i_vino.ino,
        ci->i_snap_realm ? ci->i_snap_realm->ino : 0);
 }

 __check_cap_issue(ci, cap, issued);

 /*
 * If we are issued caps we don't want, or the mds' wanted
 * value appears to be off, queue a check so we'll release
 * later and/or update the mds wanted value.
 */

 actual_wanted = __ceph_caps_wanted(ci);
 if ((wanted & ~actual_wanted) ||
     (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
  doutc(cl, "issued %s, mds wanted %s, actual %s, queueing\n",
        ceph_cap_string(issued), ceph_cap_string(wanted),
        ceph_cap_string(actual_wanted));
  __cap_delay_requeue(mdsc, ci);
 }

 if (flags & CEPH_CAP_FLAG_AUTH) {
  if (!ci->i_auth_cap ||
      ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
   if (ci->i_auth_cap &&
       ci->i_auth_cap->session != cap->session)
    change_auth_cap_ses(ci, cap->session);
   ci->i_auth_cap = cap;
   cap->mds_wanted = wanted;
  }
 } else {
  WARN_ON(ci->i_auth_cap == cap);
 }

 doutc(cl, "inode %p %llx.%llx cap %p %s now %s seq %d mds%d\n",
       inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
       ceph_cap_string(issued|cap->issued), seq, mds);
 cap->cap_id = cap_id;
 cap->issued = issued;
 cap->implemented |= issued;
 if (ceph_seq_cmp(mseq, cap->mseq) > 0)
  cap->mds_wanted = wanted;
 else
  cap->mds_wanted |= wanted;
 cap->seq = seq;
 cap->issue_seq = seq;
 cap->mseq = mseq;
 cap->cap_gen = gen;
 wake_up_all(&ci->i_cap_wq);
}

/*
 * Return true if cap has not timed out and belongs to the current
 * generation of the MDS session (i.e. has not gone 'stale' due to
 * us losing touch with the mds).
 */

static int __cap_is_valid(struct ceph_cap *cap)
{
 struct inode *inode = &cap->ci->netfs.inode;
 struct ceph_client *cl = cap->session->s_mdsc->fsc->client;
 unsigned long ttl;
 u32 gen;

 gen = atomic_read(&cap->session->s_cap_gen);
 ttl = cap->session->s_cap_ttl;

 if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
  doutc(cl, "%p %llx.%llx cap %p issued %s but STALE (gen %u vs %u)\n",
        inode, ceph_vinop(inode), cap,
        ceph_cap_string(cap->issued), cap->cap_gen, gen);
  return 0;
 }

 return 1;
}

/*
 * Return set of valid cap bits issued to us.  Note that caps time
 * out, and may be invalidated in bulk if the client session times out
 * and session->s_cap_gen is bumped.
 */

int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 int have = ci->i_snap_caps;
 struct ceph_cap *cap;
 struct rb_node *p;

 if (implemented)
  *implemented = 0;
 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
  cap = rb_entry(p, struct ceph_cap, ci_node);
  if (!__cap_is_valid(cap))
   continue;
  doutc(cl, "%p %llx.%llx cap %p issued %s\n", inode,
        ceph_vinop(inode), cap, ceph_cap_string(cap->issued));
  have |= cap->issued;
  if (implemented)
   *implemented |= cap->implemented;
 }
 /*
 * exclude caps issued by non-auth MDS, but are been revoking
 * by the auth MDS. The non-auth MDS should be revoking/exporting
 * these caps, but the message is delayed.
 */

 if (ci->i_auth_cap) {
  cap = ci->i_auth_cap;
  have &= ~cap->implemented | cap->issued;
 }
 return have;
}

/*
 * Get cap bits issued by caps other than @ocap
 */

int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
{
 int have = ci->i_snap_caps;
 struct ceph_cap *cap;
 struct rb_node *p;

 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
  cap = rb_entry(p, struct ceph_cap, ci_node);
  if (cap == ocap)
   continue;
  if (!__cap_is_valid(cap))
   continue;
  have |= cap->issued;
 }
 return have;
}

/*
 * Move a cap to the end of the LRU (oldest caps at list head, newest
 * at list tail).
 */

static void __touch_cap(struct ceph_cap *cap)
{
 struct inode *inode = &cap->ci->netfs.inode;
 struct ceph_mds_session *s = cap->session;
 struct ceph_client *cl = s->s_mdsc->fsc->client;

 spin_lock(&s->s_cap_lock);
 if (!s->s_cap_iterator) {
  doutc(cl, "%p %llx.%llx cap %p mds%d\n", inode,
        ceph_vinop(inode), cap, s->s_mds);
  list_move_tail(&cap->session_caps, &s->s_caps);
 } else {
  doutc(cl, "%p %llx.%llx cap %p mds%d NOP, iterating over caps\n",
        inode, ceph_vinop(inode), cap, s->s_mds);
 }
 spin_unlock(&s->s_cap_lock);
}

/*
 * Check if we hold the given mask.  If so, move the cap(s) to the
 * front of their respective LRUs.  (This is the preferred way for
 * callers to check for caps they want.)
 */

int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_cap *cap;
 struct rb_node *p;
 int have = ci->i_snap_caps;

 if ((have & mask) == mask) {
  doutc(cl, "mask %p %llx.%llx snap issued %s (mask %s)\n",
        inode, ceph_vinop(inode), ceph_cap_string(have),
        ceph_cap_string(mask));
  return 1;
 }

 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
  cap = rb_entry(p, struct ceph_cap, ci_node);
  if (!__cap_is_valid(cap))
   continue;
  if ((cap->issued & mask) == mask) {
   doutc(cl, "mask %p %llx.%llx cap %p issued %s (mask %s)\n",
         inode, ceph_vinop(inode), cap,
         ceph_cap_string(cap->issued),
         ceph_cap_string(mask));
   if (touch)
    __touch_cap(cap);
   return 1;
  }

  /* does a combination of caps satisfy mask? */
  have |= cap->issued;
  if ((have & mask) == mask) {
   doutc(cl, "mask %p %llx.%llx combo issued %s (mask %s)\n",
         inode, ceph_vinop(inode),
         ceph_cap_string(cap->issued),
         ceph_cap_string(mask));
   if (touch) {
    struct rb_node *q;

    /* touch this + preceding caps */
    __touch_cap(cap);
    for (q = rb_first(&ci->i_caps); q != p;
         q = rb_next(q)) {
     cap = rb_entry(q, struct ceph_cap,
             ci_node);
     if (!__cap_is_valid(cap))
      continue;
     if (cap->issued & mask)
      __touch_cap(cap);
    }
   }
   return 1;
  }
 }

 return 0;
}

int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
       int touch)
{
 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(ci->netfs.inode.i_sb);
 int r;

 r = __ceph_caps_issued_mask(ci, mask, touch);
 if (r)
  ceph_update_cap_hit(&fsc->mdsc->metric);
 else
  ceph_update_cap_mis(&fsc->mdsc->metric);
 return r;
}

/*
 * Return true if mask caps are currently being revoked by an MDS.
 */

int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
          struct ceph_cap *ocap, int mask)
{
 struct ceph_cap *cap;
 struct rb_node *p;

 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
  cap = rb_entry(p, struct ceph_cap, ci_node);
  if (cap != ocap &&
      (cap->implemented & ~cap->issued & mask))
   return 1;
 }
 return 0;
}

int __ceph_caps_used(struct ceph_inode_info *ci)
{
 int used = 0;
 if (ci->i_pin_ref)
  used |= CEPH_CAP_PIN;
 if (ci->i_rd_ref)
  used |= CEPH_CAP_FILE_RD;
 if (ci->i_rdcache_ref ||
     (S_ISREG(ci->netfs.inode.i_mode) &&
      ci->netfs.inode.i_data.nrpages))
  used |= CEPH_CAP_FILE_CACHE;
 if (ci->i_wr_ref)
  used |= CEPH_CAP_FILE_WR;
 if (ci->i_wb_ref || ci->i_wrbuffer_ref)
  used |= CEPH_CAP_FILE_BUFFER;
 if (ci->i_fx_ref)
  used |= CEPH_CAP_FILE_EXCL;
 return used;
}

#define FMODE_WAIT_BIAS 1000

/*
 * wanted, by virtue of open file modes
 */

int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
 const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN);
 const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD);
 const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR);
 const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY);
 struct ceph_mount_options *opt =
  ceph_inode_to_fs_client(&ci->netfs.inode)->mount_options;
 unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ;
 unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ;

 if (S_ISDIR(ci->netfs.inode.i_mode)) {
  int want = 0;

  /* use used_cutoff here, to keep dir's wanted caps longer */
  if (ci->i_nr_by_mode[RD_SHIFT] > 0 ||
      time_after(ci->i_last_rd, used_cutoff))
   want |= CEPH_CAP_ANY_SHARED;

  if (ci->i_nr_by_mode[WR_SHIFT] > 0 ||
      time_after(ci->i_last_wr, used_cutoff)) {
   want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
   if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
    want |= CEPH_CAP_ANY_DIR_OPS;
  }

  if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0)
   want |= CEPH_CAP_PIN;

  return want;
 } else {
  int bits = 0;

  if (ci->i_nr_by_mode[RD_SHIFT] > 0) {
   if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS ||
       time_after(ci->i_last_rd, used_cutoff))
    bits |= 1 << RD_SHIFT;
  } else if (time_after(ci->i_last_rd, idle_cutoff)) {
   bits |= 1 << RD_SHIFT;
  }

  if (ci->i_nr_by_mode[WR_SHIFT] > 0) {
   if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS ||
       time_after(ci->i_last_wr, used_cutoff))
    bits |= 1 << WR_SHIFT;
  } else if (time_after(ci->i_last_wr, idle_cutoff)) {
   bits |= 1 << WR_SHIFT;
  }

  /* check lazyio only when read/write is wanted */
  if ((bits & (CEPH_FILE_MODE_RDWR << 1)) &&
      ci->i_nr_by_mode[LAZY_SHIFT] > 0)
   bits |= 1 << LAZY_SHIFT;

  return bits ? ceph_caps_for_mode(bits >> 1) : 0;
 }
}

/*
 * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
 */

int __ceph_caps_wanted(struct ceph_inode_info *ci)
{
 int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
 if (S_ISDIR(ci->netfs.inode.i_mode)) {
  /* we want EXCL if holding caps of dir ops */
  if (w & CEPH_CAP_ANY_DIR_OPS)
   w |= CEPH_CAP_FILE_EXCL;
 } else {
  /* we want EXCL if dirty data */
  if (w & CEPH_CAP_FILE_BUFFER)
   w |= CEPH_CAP_FILE_EXCL;
 }
 return w;
}

/*
 * Return caps we have registered with the MDS(s) as 'wanted'.
 */

int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
 struct ceph_cap *cap;
 struct rb_node *p;
 int mds_wanted = 0;

 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
  cap = rb_entry(p, struct ceph_cap, ci_node);
  if (check && !__cap_is_valid(cap))
   continue;
  if (cap == ci->i_auth_cap)
   mds_wanted |= cap->mds_wanted;
  else
   mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
 }
 return mds_wanted;
}

int ceph_is_any_caps(struct inode *inode)
{
 struct ceph_inode_info *ci = ceph_inode(inode);
 int ret;

 spin_lock(&ci->i_ceph_lock);
 ret = __ceph_is_any_real_caps(ci);
 spin_unlock(&ci->i_ceph_lock);

 return ret;
}

/*
 * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
 *
 * caller should hold i_ceph_lock.
 * caller will not hold session s_mutex if called from destroy_inode.
 */

void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{
 struct ceph_mds_session *session = cap->session;
 struct ceph_client *cl = session->s_mdsc->fsc->client;
 struct ceph_inode_info *ci = cap->ci;
 struct inode *inode = &ci->netfs.inode;
 struct ceph_mds_client *mdsc;
 int removed = 0;

 /* 'ci' being NULL means the remove have already occurred */
 if (!ci) {
  doutc(cl, "inode is NULL\n");
  return;
 }

 lockdep_assert_held(&ci->i_ceph_lock);

 doutc(cl, "%p from %p %llx.%llx\n", cap, inode, ceph_vinop(inode));

 mdsc = ceph_inode_to_fs_client(&ci->netfs.inode)->mdsc;

 /* remove from inode's cap rbtree, and clear auth cap */
 rb_erase(&cap->ci_node, &ci->i_caps);
 if (ci->i_auth_cap == cap)
  ci->i_auth_cap = NULL;

 /* remove from session list */
 spin_lock(&session->s_cap_lock);
 if (session->s_cap_iterator == cap) {
  /* not yet, we are iterating over this very cap */
  doutc(cl, "delaying %p removal from session %p\n", cap,
        cap->session);
 } else {
  list_del_init(&cap->session_caps);
  session->s_nr_caps--;
  atomic64_dec(&mdsc->metric.total_caps);
  cap->session = NULL;
  removed = 1;
 }
 /* protect backpointer with s_cap_lock: see iterate_session_caps */
 cap->ci = NULL;

 /*
 * s_cap_reconnect is protected by s_cap_lock. no one changes
 * s_cap_gen while session is in the reconnect state.
 */

 if (queue_release &&
     (!session->s_cap_reconnect ||
      cap->cap_gen == atomic_read(&session->s_cap_gen))) {
  cap->queue_release = 1;
  if (removed) {
   __ceph_queue_cap_release(session, cap);
   removed = 0;
  }
 } else {
  cap->queue_release = 0;
 }
 cap->cap_ino = ci->i_vino.ino;

 spin_unlock(&session->s_cap_lock);

 if (removed)
  ceph_put_cap(mdsc, cap);

 if (!__ceph_is_any_real_caps(ci)) {
  /* when reconnect denied, we remove session caps forcibly,
 * i_wr_ref can be non-zero. If there are ongoing write,
 * keep i_snap_realm.
 */

  if (ci->i_wr_ref == 0 && ci->i_snap_realm)
   ceph_change_snap_realm(&ci->netfs.inode, NULL);

  __cap_delay_cancel(mdsc, ci);
 }
}

void ceph_remove_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
       bool queue_release)
{
 struct ceph_inode_info *ci = cap->ci;
 struct ceph_fs_client *fsc;

 /* 'ci' being NULL means the remove have already occurred */
 if (!ci) {
  doutc(mdsc->fsc->client, "inode is NULL\n");
  return;
 }

 lockdep_assert_held(&ci->i_ceph_lock);

 fsc = ceph_inode_to_fs_client(&ci->netfs.inode);
 WARN_ON_ONCE(ci->i_auth_cap == cap &&
       !list_empty(&ci->i_dirty_item) &&
       !fsc->blocklisted &&
       !ceph_inode_is_shutdown(&ci->netfs.inode));

 __ceph_remove_cap(cap, queue_release);
}

struct cap_msg_args {
 struct ceph_mds_session *session;
 u64   ino, cid, follows;
 u64   flush_tid, oldest_flush_tid, size, max_size;
 u64   xattr_version;
 u64   change_attr;
 struct ceph_buffer *xattr_buf;
 struct ceph_buffer *old_xattr_buf;
 struct timespec64 atime, mtime, ctime, btime;
 int   op, caps, wanted, dirty;
 u32   seq, issue_seq, mseq, time_warp_seq;
 u32   flags;
 kuid_t   uid;
 kgid_t   gid;
 umode_t   mode;
 bool   inline_data;
 bool   wake;
 bool   encrypted;
 u32   fscrypt_auth_len;
 u8   fscrypt_auth[sizeof(struct ceph_fscrypt_auth)]; // for context
};

/* Marshal up the cap msg to the MDS */
static void encode_cap_msg(struct ceph_msg *msg, struct cap_msg_args *arg)
{
 struct ceph_mds_caps *fc;
 void *p;
 struct ceph_mds_client *mdsc = arg->session->s_mdsc;
 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;

 doutc(mdsc->fsc->client,
       "%s %llx %llx caps %s wanted %s dirty %s seq %u/%u"
       " tid %llu/%llu mseq %u follows %lld size %llu/%llu"
       " xattr_ver %llu xattr_len %d\n",
       ceph_cap_op_name(arg->op), arg->cid, arg->ino,
       ceph_cap_string(arg->caps), ceph_cap_string(arg->wanted),
       ceph_cap_string(arg->dirty), arg->seq, arg->issue_seq,
       arg->flush_tid, arg->oldest_flush_tid, arg->mseq, arg->follows,
       arg->size, arg->max_size, arg->xattr_version,
       arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);

 msg->hdr.version = cpu_to_le16(12);
 msg->hdr.tid = cpu_to_le64(arg->flush_tid);

 fc = msg->front.iov_base;
 memset(fc, 0, sizeof(*fc));

 fc->cap_id = cpu_to_le64(arg->cid);
 fc->op = cpu_to_le32(arg->op);
 fc->seq = cpu_to_le32(arg->seq);
 fc->issue_seq = cpu_to_le32(arg->issue_seq);
 fc->migrate_seq = cpu_to_le32(arg->mseq);
 fc->caps = cpu_to_le32(arg->caps);
 fc->wanted = cpu_to_le32(arg->wanted);
 fc->dirty = cpu_to_le32(arg->dirty);
 fc->ino = cpu_to_le64(arg->ino);
 fc->snap_follows = cpu_to_le64(arg->follows);

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
 if (arg->encrypted)
  fc->size = cpu_to_le64(round_up(arg->size,
      CEPH_FSCRYPT_BLOCK_SIZE));
 else
#endif
  fc->size = cpu_to_le64(arg->size);
 fc->max_size = cpu_to_le64(arg->max_size);
 ceph_encode_timespec64(&fc->mtime, &arg->mtime);
 ceph_encode_timespec64(&fc->atime, &arg->atime);
 ceph_encode_timespec64(&fc->ctime, &arg->ctime);
 fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);

 fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
 fc->mode = cpu_to_le32(arg->mode);

 fc->xattr_version = cpu_to_le64(arg->xattr_version);
 if (arg->xattr_buf) {
  msg->middle = ceph_buffer_get(arg->xattr_buf);
  fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
  msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
 }

 p = fc + 1;
 /* flock buffer size (version 2) */
 ceph_encode_32(&p, 0);
 /* inline version (version 4) */
 ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
 /* inline data size */
 ceph_encode_32(&p, 0);
 /*
 * osd_epoch_barrier (version 5)
 * The epoch_barrier is protected osdc->lock, so READ_ONCE here in
 * case it was recently changed
 */

 ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
 /* oldest_flush_tid (version 6) */
 ceph_encode_64(&p, arg->oldest_flush_tid);

 /*
 * caller_uid/caller_gid (version 7)
 *
 * Currently, we don't properly track which caller dirtied the caps
 * last, and force a flush of them when there is a conflict. For now,
 * just set this to 0:0, to emulate how the MDS has worked up to now.
 */

 ceph_encode_32(&p, 0);
 ceph_encode_32(&p, 0);

 /* pool namespace (version 8) (mds always ignores this) */
 ceph_encode_32(&p, 0);

 /* btime and change_attr (version 9) */
 ceph_encode_timespec64(p, &arg->btime);
 p += sizeof(struct ceph_timespec);
 ceph_encode_64(&p, arg->change_attr);

 /* Advisory flags (version 10) */
 ceph_encode_32(&p, arg->flags);

 /* dirstats (version 11) - these are r/o on the client */
 ceph_encode_64(&p, 0);
 ceph_encode_64(&p, 0);

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
 /*
 * fscrypt_auth and fscrypt_file (version 12)
 *
 * fscrypt_auth holds the crypto context (if any). fscrypt_file
 * tracks the real i_size as an __le64 field (and we use a rounded-up
 * i_size in the traditional size field).
 */

 ceph_encode_32(&p, arg->fscrypt_auth_len);
 ceph_encode_copy(&p, arg->fscrypt_auth, arg->fscrypt_auth_len);
 ceph_encode_32(&p, sizeof(__le64));
 ceph_encode_64(&p, arg->size);
#else /* CONFIG_FS_ENCRYPTION */
 ceph_encode_32(&p, 0);
 ceph_encode_32(&p, 0);
#endif /* CONFIG_FS_ENCRYPTION */
}

/*
 * Queue cap releases when an inode is dropped from our cache.
 */

void __ceph_remove_caps(struct ceph_inode_info *ci)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
 struct rb_node *p;

 /* lock i_ceph_lock, because ceph_d_revalidate(..., LOOKUP_RCU)
 * may call __ceph_caps_issued_mask() on a freeing inode. */

 spin_lock(&ci->i_ceph_lock);
 p = rb_first(&ci->i_caps);
 while (p) {
  struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
  p = rb_next(p);
  ceph_remove_cap(mdsc, cap, true);
 }
 spin_unlock(&ci->i_ceph_lock);
}

/*
 * Prepare to send a cap message to an MDS. Update the cap state, and populate
 * the arg struct with the parameters that will need to be sent. This should
 * be done under the i_ceph_lock to guard against changes to cap state.
 *
 * Make note of max_size reported/requested from mds, revoked caps
 * that have now been implemented.
 */

static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
         int op, int flags, int used, int want, int retain,
         int flushing, u64 flush_tid, u64 oldest_flush_tid)
{
 struct ceph_inode_info *ci = cap->ci;
 struct inode *inode = &ci->netfs.inode;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 int held, revoking;

 lockdep_assert_held(&ci->i_ceph_lock);

 held = cap->issued | cap->implemented;
 revoking = cap->implemented & ~cap->issued;
 retain &= ~revoking;

 doutc(cl, "%p %llx.%llx cap %p session %p %s -> %s (revoking %s)\n",
       inode, ceph_vinop(inode), cap, cap->session,
       ceph_cap_string(held), ceph_cap_string(held & retain),
       ceph_cap_string(revoking));
 BUG_ON((retain & CEPH_CAP_PIN) == 0);

 ci->i_ceph_flags &= ~CEPH_I_FLUSH;

 cap->issued &= retain;  /* drop bits we don't want */
 /*
 * Wake up any waiters on wanted -> needed transition. This is due to
 * the weird transition from buffered to sync IO... we need to flush
 * dirty pages _before_ allowing sync writes to avoid reordering.
 */

 arg->wake = cap->implemented & ~cap->issued;
 cap->implemented &= cap->issued | used;
 cap->mds_wanted = want;

 arg->session = cap->session;
 arg->ino = ceph_vino(inode).ino;
 arg->cid = cap->cap_id;
 arg->follows = flushing ? ci->i_head_snapc->seq : 0;
 arg->flush_tid = flush_tid;
 arg->oldest_flush_tid = oldest_flush_tid;
 arg->size = i_size_read(inode);
 ci->i_reported_size = arg->size;
 arg->max_size = ci->i_wanted_max_size;
 if (cap == ci->i_auth_cap) {
  if (want & CEPH_CAP_ANY_FILE_WR)
   ci->i_requested_max_size = arg->max_size;
  else
   ci->i_requested_max_size = 0;
 }

 if (flushing & CEPH_CAP_XATTR_EXCL) {
  arg->old_xattr_buf = __ceph_build_xattrs_blob(ci);
  arg->xattr_version = ci->i_xattrs.version;
  arg->xattr_buf = ceph_buffer_get(ci->i_xattrs.blob);
 } else {
  arg->xattr_buf = NULL;
  arg->old_xattr_buf = NULL;
 }

 arg->mtime = inode_get_mtime(inode);
 arg->atime = inode_get_atime(inode);
 arg->ctime = inode_get_ctime(inode);
 arg->btime = ci->i_btime;
 arg->change_attr = inode_peek_iversion_raw(inode);

 arg->op = op;
 arg->caps = cap->implemented;
 arg->wanted = want;
 arg->dirty = flushing;

 arg->seq = cap->seq;
 arg->issue_seq = cap->issue_seq;
 arg->mseq = cap->mseq;
 arg->time_warp_seq = ci->i_time_warp_seq;

 arg->uid = inode->i_uid;
 arg->gid = inode->i_gid;
 arg->mode = inode->i_mode;

 arg->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
 if (!(flags & CEPH_CLIENT_CAPS_PENDING_CAPSNAP) &&
     !list_empty(&ci->i_cap_snaps)) {
  struct ceph_cap_snap *capsnap;
  list_for_each_entry_reverse(capsnap, &ci->i_cap_snaps, ci_item) {
   if (capsnap->cap_flush.tid)
    break;
   if (capsnap->need_flush) {
    flags |= CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
    break;
   }
  }
 }
 arg->flags = flags;
 arg->encrypted = IS_ENCRYPTED(inode);
#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
 if (ci->fscrypt_auth_len &&
     WARN_ON_ONCE(ci->fscrypt_auth_len > sizeof(struct ceph_fscrypt_auth))) {
  /* Don't set this if it's too big */
  arg->fscrypt_auth_len = 0;
 } else {
  arg->fscrypt_auth_len = ci->fscrypt_auth_len;
  memcpy(arg->fscrypt_auth, ci->fscrypt_auth,
         min_t(size_t, ci->fscrypt_auth_len,
        sizeof(arg->fscrypt_auth)));
 }
#endif /* CONFIG_FS_ENCRYPTION */
}

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
#define CAP_MSG_FIXED_FIELDS (sizeof(struct ceph_mds_caps) + \
        4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 4 + 4 + 8)

static inline int cap_msg_size(struct cap_msg_args *arg)
{
 return CAP_MSG_FIXED_FIELDS + arg->fscrypt_auth_len;
}
#else
#define CAP_MSG_FIXED_FIELDS (sizeof(struct ceph_mds_caps) + \
        4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 4 + 4)

static inline int cap_msg_size(struct cap_msg_args *arg)
{
 return CAP_MSG_FIXED_FIELDS;
}
#endif /* CONFIG_FS_ENCRYPTION */

/*
 * Send a cap msg on the given inode.
 *
 * Caller should hold snap_rwsem (read), s_mutex.
 */

static void __send_cap(struct cap_msg_args *arg, struct ceph_inode_info *ci)
{
 struct ceph_msg *msg;
 struct inode *inode = &ci->netfs.inode;
 struct ceph_client *cl = ceph_inode_to_client(inode);

 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(arg), GFP_NOFS,
      false);
 if (!msg) {
  pr_err_client(cl,
         "error allocating cap msg: ino (%llx.%llx)"
         " flushing %s tid %llu, requeuing cap.\n",
         ceph_vinop(inode), ceph_cap_string(arg->dirty),
         arg->flush_tid);
  spin_lock(&ci->i_ceph_lock);
  __cap_delay_requeue(arg->session->s_mdsc, ci);
  spin_unlock(&ci->i_ceph_lock);
  return;
 }

 encode_cap_msg(msg, arg);
 ceph_con_send(&arg->session->s_con, msg);
 ceph_buffer_put(arg->old_xattr_buf);
 ceph_buffer_put(arg->xattr_buf);
 if (arg->wake)
  wake_up_all(&ci->i_cap_wq);
}

static inline int __send_flush_snap(struct inode *inode,
        struct ceph_mds_session *session,
        struct ceph_cap_snap *capsnap,
        u32 mseq, u64 oldest_flush_tid)
{
 struct cap_msg_args arg;
 struct ceph_msg  *msg;

 arg.session = session;
 arg.ino = ceph_vino(inode).ino;
 arg.cid = 0;
 arg.follows = capsnap->follows;
 arg.flush_tid = capsnap->cap_flush.tid;
 arg.oldest_flush_tid = oldest_flush_tid;

 arg.size = capsnap->size;
 arg.max_size = 0;
 arg.xattr_version = capsnap->xattr_version;
 arg.xattr_buf = capsnap->xattr_blob;
 arg.old_xattr_buf = NULL;

 arg.atime = capsnap->atime;
 arg.mtime = capsnap->mtime;
 arg.ctime = capsnap->ctime;
 arg.btime = capsnap->btime;
 arg.change_attr = capsnap->change_attr;

 arg.op = CEPH_CAP_OP_FLUSHSNAP;
 arg.caps = capsnap->issued;
 arg.wanted = 0;
 arg.dirty = capsnap->dirty;

 arg.seq = 0;
 arg.issue_seq = 0;
 arg.mseq = mseq;
 arg.time_warp_seq = capsnap->time_warp_seq;

 arg.uid = capsnap->uid;
 arg.gid = capsnap->gid;
 arg.mode = capsnap->mode;

 arg.inline_data = capsnap->inline_data;
 arg.flags = 0;
 arg.wake = false;
 arg.encrypted = IS_ENCRYPTED(inode);

 /* No fscrypt_auth changes from a capsnap.*/
 arg.fscrypt_auth_len = 0;

 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, cap_msg_size(&arg),
      GFP_NOFS, false);
 if (!msg)
  return -ENOMEM;

 encode_cap_msg(msg, &arg);
 ceph_con_send(&arg.session->s_con, msg);
 return 0;
}

/*
 * When a snapshot is taken, clients accumulate dirty metadata on
 * inodes with capabilities in ceph_cap_snaps to describe the file
 * state at the time the snapshot was taken.  This must be flushed
 * asynchronously back to the MDS once sync writes complete and dirty
 * data is written out.
 *
 * Called under i_ceph_lock.
 */

static void __ceph_flush_snaps(struct ceph_inode_info *ci,
          struct ceph_mds_session *session)
  __releases(ci->i_ceph_lock)
  __acquires(ci->i_ceph_lock)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_mds_client *mdsc = session->s_mdsc;
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_cap_snap *capsnap;
 u64 oldest_flush_tid = 0;
 u64 first_tid = 1, last_tid = 0;

 doutc(cl, "%p %llx.%llx session %p\n", inode, ceph_vinop(inode),
       session);

 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
  /*
 * we need to wait for sync writes to complete and for dirty
 * pages to be written out.
 */

  if (capsnap->dirty_pages || capsnap->writing)
   break;

  /* should be removed by ceph_try_drop_cap_snap() */
  BUG_ON(!capsnap->need_flush);

  /* only flush each capsnap once */
  if (capsnap->cap_flush.tid > 0) {
   doutc(cl, "already flushed %p, skipping\n", capsnap);
   continue;
  }

  spin_lock(&mdsc->cap_dirty_lock);
  capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
  list_add_tail(&capsnap->cap_flush.g_list,
         &mdsc->cap_flush_list);
  if (oldest_flush_tid == 0)
   oldest_flush_tid = __get_oldest_flush_tid(mdsc);
  if (list_empty(&ci->i_flushing_item)) {
   list_add_tail(&ci->i_flushing_item,
          &session->s_cap_flushing);
  }
  spin_unlock(&mdsc->cap_dirty_lock);

  list_add_tail(&capsnap->cap_flush.i_list,
         &ci->i_cap_flush_list);

  if (first_tid == 1)
   first_tid = capsnap->cap_flush.tid;
  last_tid = capsnap->cap_flush.tid;
 }

 ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;

 while (first_tid <= last_tid) {
  struct ceph_cap *cap = ci->i_auth_cap;
  struct ceph_cap_flush *cf = NULL, *iter;
  int ret;

  if (!(cap && cap->session == session)) {
   doutc(cl, "%p %llx.%llx auth cap %p not mds%d, stop\n",
         inode, ceph_vinop(inode), cap, session->s_mds);
   break;
  }

  ret = -ENOENT;
  list_for_each_entry(iter, &ci->i_cap_flush_list, i_list) {
   if (iter->tid >= first_tid) {
    cf = iter;
    ret = 0;
    break;
   }
  }
  if (ret < 0)
   break;

  first_tid = cf->tid + 1;

  capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
  refcount_inc(&capsnap->nref);
  spin_unlock(&ci->i_ceph_lock);

  doutc(cl, "%p %llx.%llx capsnap %p tid %llu %s\n", inode,
        ceph_vinop(inode), capsnap, cf->tid,
        ceph_cap_string(capsnap->dirty));

  ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
     oldest_flush_tid);
  if (ret < 0) {
   pr_err_client(cl, "error sending cap flushsnap, "
          "ino (%llx.%llx) tid %llu follows %llu\n",
          ceph_vinop(inode), cf->tid,
          capsnap->follows);
  }

  ceph_put_cap_snap(capsnap);
  spin_lock(&ci->i_ceph_lock);
 }
}

void ceph_flush_snaps(struct ceph_inode_info *ci,
        struct ceph_mds_session **psession)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_mds_client *mdsc = ceph_inode_to_fs_client(inode)->mdsc;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_mds_session *session = NULL;
 bool need_put = false;
 int mds;

 doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
 if (psession)
  session = *psession;
retry:
 spin_lock(&ci->i_ceph_lock);
 if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
  doutc(cl, " no capsnap needs flush, doing nothing\n");
  goto out;
 }
 if (!ci->i_auth_cap) {
  doutc(cl, " no auth cap (migrating?), doing nothing\n");
  goto out;
 }

 mds = ci->i_auth_cap->session->s_mds;
 if (session && session->s_mds != mds) {
  doutc(cl, " oops, wrong session %p mutex\n", session);
  ceph_put_mds_session(session);
  session = NULL;
 }
 if (!session) {
  spin_unlock(&ci->i_ceph_lock);
  mutex_lock(&mdsc->mutex);
  session = __ceph_lookup_mds_session(mdsc, mds);
  mutex_unlock(&mdsc->mutex);
  goto retry;
 }

 // make sure flushsnap messages are sent in proper order.
 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
  __kick_flushing_caps(mdsc, session, ci, 0);

 __ceph_flush_snaps(ci, session);
out:
 spin_unlock(&ci->i_ceph_lock);

 if (psession)
  *psession = session;
 else
  ceph_put_mds_session(session);
 /* we flushed them all; remove this inode from the queue */
 spin_lock(&mdsc->snap_flush_lock);
 if (!list_empty(&ci->i_snap_flush_item))
  need_put = true;
 list_del_init(&ci->i_snap_flush_item);
 spin_unlock(&mdsc->snap_flush_lock);

 if (need_put)
  iput(inode);
}

/*
 * Mark caps dirty.  If inode is newly dirty, return the dirty flags.
 * Caller is then responsible for calling __mark_inode_dirty with the
 * returned flags value.
 */

int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
      struct ceph_cap_flush **pcf)
{
 struct ceph_mds_client *mdsc =
  ceph_sb_to_fs_client(ci->netfs.inode.i_sb)->mdsc;
 struct inode *inode = &ci->netfs.inode;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 int was = ci->i_dirty_caps;
 int dirty = 0;

 lockdep_assert_held(&ci->i_ceph_lock);

 if (!ci->i_auth_cap) {
  pr_warn_client(cl, "%p %llx.%llx mask %s, "
          "but no auth cap (session was closed?)\n",
    inode, ceph_vinop(inode),
    ceph_cap_string(mask));
  return 0;
 }

 doutc(cl, "%p %llx.%llx %s dirty %s -> %s\n", inode,
       ceph_vinop(inode), ceph_cap_string(mask),
       ceph_cap_string(was), ceph_cap_string(was | mask));
 ci->i_dirty_caps |= mask;
 if (was == 0) {
  struct ceph_mds_session *session = ci->i_auth_cap->session;

  WARN_ON_ONCE(ci->i_prealloc_cap_flush);
  swap(ci->i_prealloc_cap_flush, *pcf);

  if (!ci->i_head_snapc) {
   WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
   ci->i_head_snapc = ceph_get_snap_context(
    ci->i_snap_realm->cached_context);
  }
  doutc(cl, "%p %llx.%llx now dirty snapc %p auth cap %p\n",
        inode, ceph_vinop(inode), ci->i_head_snapc,
        ci->i_auth_cap);
  BUG_ON(!list_empty(&ci->i_dirty_item));
  spin_lock(&mdsc->cap_dirty_lock);
  list_add(&ci->i_dirty_item, &session->s_cap_dirty);
  spin_unlock(&mdsc->cap_dirty_lock);
  if (ci->i_flushing_caps == 0) {
   ihold(inode);
   dirty |= I_DIRTY_SYNC;
  }
 } else {
  WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
 }
 BUG_ON(list_empty(&ci->i_dirty_item));
 if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
     (mask & CEPH_CAP_FILE_BUFFER))
  dirty |= I_DIRTY_DATASYNC;
 __cap_delay_requeue(mdsc, ci);
 return dirty;
}

struct ceph_cap_flush *ceph_alloc_cap_flush(void)
{
 struct ceph_cap_flush *cf;

 cf = kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
 if (!cf)
  return NULL;

 cf->is_capsnap = false;
 return cf;
}

void ceph_free_cap_flush(struct ceph_cap_flush *cf)
{
 if (cf)
  kmem_cache_free(ceph_cap_flush_cachep, cf);
}

static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
{
 if (!list_empty(&mdsc->cap_flush_list)) {
  struct ceph_cap_flush *cf =
   list_first_entry(&mdsc->cap_flush_list,
      struct ceph_cap_flush, g_list);
  return cf->tid;
 }
 return 0;
}

/*
 * Remove cap_flush from the mdsc's or inode's flushing cap list.
 * Return true if caller needs to wake up flush waiters.
 */

static bool __detach_cap_flush_from_mdsc(struct ceph_mds_client *mdsc,
      struct ceph_cap_flush *cf)
{
 struct ceph_cap_flush *prev;
 bool wake = cf->wake;

 if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
  prev = list_prev_entry(cf, g_list);
  prev->wake = true;
  wake = false;
 }
 list_del_init(&cf->g_list);
 return wake;
}

static bool __detach_cap_flush_from_ci(struct ceph_inode_info *ci,
           struct ceph_cap_flush *cf)
{
 struct ceph_cap_flush *prev;
 bool wake = cf->wake;

 if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
  prev = list_prev_entry(cf, i_list);
  prev->wake = true;
  wake = false;
 }
 list_del_init(&cf->i_list);
 return wake;
}

/*
 * Add dirty inode to the flushing list.  Assigned a seq number so we
 * can wait for caps to flush without starving.
 *
 * Called under i_ceph_lock. Returns the flush tid.
 */

static u64 __mark_caps_flushing(struct inode *inode,
    struct ceph_mds_session *session, bool wake,
    u64 *oldest_flush_tid)
{
 struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_cap_flush *cf = NULL;
 int flushing;

 lockdep_assert_held(&ci->i_ceph_lock);
 BUG_ON(ci->i_dirty_caps == 0);
 BUG_ON(list_empty(&ci->i_dirty_item));
 BUG_ON(!ci->i_prealloc_cap_flush);

 flushing = ci->i_dirty_caps;
 doutc(cl, "flushing %s, flushing_caps %s -> %s\n",
       ceph_cap_string(flushing),
       ceph_cap_string(ci->i_flushing_caps),
       ceph_cap_string(ci->i_flushing_caps | flushing));
 ci->i_flushing_caps |= flushing;
 ci->i_dirty_caps = 0;
 doutc(cl, "%p %llx.%llx now !dirty\n", inode, ceph_vinop(inode));

 swap(cf, ci->i_prealloc_cap_flush);
 cf->caps = flushing;
 cf->wake = wake;

 spin_lock(&mdsc->cap_dirty_lock);
 list_del_init(&ci->i_dirty_item);

 cf->tid = ++mdsc->last_cap_flush_tid;
 list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
 *oldest_flush_tid = __get_oldest_flush_tid(mdsc);

 if (list_empty(&ci->i_flushing_item)) {
  list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
  mdsc->num_cap_flushing++;
 }
 spin_unlock(&mdsc->cap_dirty_lock);

 list_add_tail(&cf->i_list, &ci->i_cap_flush_list);

 return cf->tid;
}

/*
 * try to invalidate mapping pages without blocking.
 */

static int try_nonblocking_invalidate(struct inode *inode)
 __releases(ci->i_ceph_lock)
 __acquires(ci->i_ceph_lock)
{
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 u32 invalidating_gen = ci->i_rdcache_gen;

 spin_unlock(&ci->i_ceph_lock);
 ceph_fscache_invalidate(inode, false);
 invalidate_mapping_pages(&inode->i_data, 0, -1);
 spin_lock(&ci->i_ceph_lock);

 if (inode->i_data.nrpages == 0 &&
     invalidating_gen == ci->i_rdcache_gen) {
  /* success. */
  doutc(cl, "%p %llx.%llx success\n", inode,
        ceph_vinop(inode));
  /* save any racing async invalidate some trouble */
  ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
  return 0;
 }
 doutc(cl, "%p %llx.%llx failed\n", inode, ceph_vinop(inode));
 return -1;
}

bool __ceph_should_report_size(struct ceph_inode_info *ci)
{
 loff_t size = i_size_read(&ci->netfs.inode);
 /* mds will adjust max size according to the reported size */
 if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
  return false;
 if (size >= ci->i_max_size)
  return true;
 /* half of previous max_size increment has been used */
 if (ci->i_max_size > ci->i_reported_size &&
     (size << 1) >= ci->i_max_size + ci->i_reported_size)
  return true;
 return false;
}

/*
 * Swiss army knife function to examine currently used and wanted
 * versus held caps.  Release, flush, ack revoked caps to mds as
 * appropriate.
 *
 *  CHECK_CAPS_AUTHONLY - we should only check the auth cap
 *  CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
 *    further delay.
 *  CHECK_CAPS_FLUSH_FORCE - we should flush any caps immediately, without
 *    further delay.
 */

void ceph_check_caps(struct ceph_inode_info *ci, int flags)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_cap *cap;
 u64 flush_tid, oldest_flush_tid;
 int file_wanted, used, cap_used;
 int issued, implemented, want, retain, revoking, flushing = 0;
 int mds = -1;   /* keep track of how far we've gone through i_caps list
   to avoid an infinite loop on retry */

 struct rb_node *p;
 bool queue_invalidate = false;
 bool tried_invalidate = false;
 bool queue_writeback = false;
 struct ceph_mds_session *session = NULL;

 spin_lock(&ci->i_ceph_lock);
 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
  ci->i_ceph_flags |= CEPH_I_ASYNC_CHECK_CAPS;

  /* Don't send messages until we get async create reply */
  spin_unlock(&ci->i_ceph_lock);
  return;
 }

 if (ci->i_ceph_flags & CEPH_I_FLUSH)
  flags |= CHECK_CAPS_FLUSH;
retry:
 /* Caps wanted by virtue of active open files. */
 file_wanted = __ceph_caps_file_wanted(ci);

 /* Caps which have active references against them */
 used = __ceph_caps_used(ci);

 /*
 * "issued" represents the current caps that the MDS wants us to have.
 * "implemented" is the set that we have been granted, and includes the
 * ones that have not yet been returned to the MDS (the "revoking" set,
 * usually because they have outstanding references).
 */

 issued = __ceph_caps_issued(ci, &implemented);
 revoking = implemented & ~issued;

 want = file_wanted;

 /* The ones we currently want to retain (may be adjusted below) */
 retain = file_wanted | used | CEPH_CAP_PIN;
 if (!mdsc->stopping && inode->i_nlink > 0) {
  if (file_wanted) {
   retain |= CEPH_CAP_ANY;       /* be greedy */
  } else if (S_ISDIR(inode->i_mode) &&
      (issued & CEPH_CAP_FILE_SHARED) &&
      __ceph_dir_is_complete(ci)) {
   /*
 * If a directory is complete, we want to keep
 * the exclusive cap. So that MDS does not end up
 * revoking the shared cap on every create/unlink
 * operation.
 */

   if (IS_RDONLY(inode)) {
    want = CEPH_CAP_ANY_SHARED;
   } else {
    want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
   }
   retain |= want;
  } else {

   retain |= CEPH_CAP_ANY_SHARED;
   /*
 * keep RD only if we didn't have the file open RW,
 * because then the mds would revoke it anyway to
 * journal max_size=0.
 */

   if (ci->i_max_size == 0)
    retain |= CEPH_CAP_ANY_RD;
  }
 }

 doutc(cl, "%p %llx.%llx file_want %s used %s dirty %s "
       "flushing %s issued %s revoking %s retain %s %s%s%s%s\n",
      inode, ceph_vinop(inode), ceph_cap_string(file_wanted),
      ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
      ceph_cap_string(ci->i_flushing_caps),
      ceph_cap_string(issued), ceph_cap_string(revoking),
      ceph_cap_string(retain),
      (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
      (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "",
      (flags & CHECK_CAPS_NOINVAL) ? " NOINVAL" : "",
      (flags & CHECK_CAPS_FLUSH_FORCE) ? " FLUSH_FORCE" : "");

 /*
 * If we no longer need to hold onto old our caps, and we may
 * have cached pages, but don't want them, then try to invalidate.
 * If we fail, it's because pages are locked.... try again later.
 */

 if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
     S_ISREG(inode->i_mode) &&
     !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
     inode->i_data.nrpages &&  /* have cached pages */
     (revoking & (CEPH_CAP_FILE_CACHE|
    CEPH_CAP_FILE_LAZYIO)) && /*  or revoking cache */
     !tried_invalidate) {
  doutc(cl, "trying to invalidate on %p %llx.%llx\n",
        inode, ceph_vinop(inode));
  if (try_nonblocking_invalidate(inode) < 0) {
   doutc(cl, "queuing invalidate\n");
   queue_invalidate = true;
   ci->i_rdcache_revoking = ci->i_rdcache_gen;
  }
  tried_invalidate = true;
  goto retry;
 }

 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
  int mflags = 0;
  struct cap_msg_args arg;

  cap = rb_entry(p, struct ceph_cap, ci_node);

  /* avoid looping forever */
  if (mds >= cap->mds ||
      ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
   continue;

  /*
 * If we have an auth cap, we don't need to consider any
 * overlapping caps as used.
 */

  cap_used = used;
  if (ci->i_auth_cap && cap != ci->i_auth_cap)
   cap_used &= ~ci->i_auth_cap->issued;

  revoking = cap->implemented & ~cap->issued;
  doutc(cl, " mds%d cap %p used %s issued %s implemented %s revoking %s\n",
        cap->mds, cap, ceph_cap_string(cap_used),
        ceph_cap_string(cap->issued),
        ceph_cap_string(cap->implemented),
        ceph_cap_string(revoking));

  /* completed revocation? going down and there are no caps? */
  if (revoking) {
   if ((revoking & cap_used) == 0) {
    doutc(cl, "completed revocation of %s\n",
          ceph_cap_string(cap->implemented & ~cap->issued));
    goto ack;
   }

   /*
 * If the "i_wrbuffer_ref" was increased by mmap or generic
 * cache write just before the ceph_check_caps() is called,
 * the Fb capability revoking will fail this time. Then we
 * must wait for the BDI's delayed work to flush the dirty
 * pages and to release the "i_wrbuffer_ref", which will cost
 * at most 5 seconds. That means the MDS needs to wait at
 * most 5 seconds to finished the Fb capability's revocation.
 *
 * Let's queue a writeback for it.
 */

   if (S_ISREG(inode->i_mode) && ci->i_wrbuffer_ref &&
       (revoking & CEPH_CAP_FILE_BUFFER))
    queue_writeback = true;
  }

  if (flags & CHECK_CAPS_FLUSH_FORCE) {
   doutc(cl, "force to flush caps\n");
   goto ack;
  }

  if (cap == ci->i_auth_cap &&
      (cap->issued & CEPH_CAP_FILE_WR)) {
   /* request larger max_size from MDS? */
   if (ci->i_wanted_max_size > ci->i_max_size &&
       ci->i_wanted_max_size > ci->i_requested_max_size) {
    doutc(cl, "requesting new max_size\n");
    goto ack;
   }

   /* approaching file_max? */
   if (__ceph_should_report_size(ci)) {
    doutc(cl, "i_size approaching max_size\n");
    goto ack;
   }
  }
  /* flush anything dirty? */
  if (cap == ci->i_auth_cap) {
   if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
    doutc(cl, "flushing dirty caps\n");
    goto ack;
   }
   if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
    doutc(cl, "flushing snap caps\n");
    goto ack;
   }
  }

  /* want more caps from mds? */
  if (want & ~cap->mds_wanted) {
   if (want & ~(cap->mds_wanted | cap->issued))
    goto ack;
   if (!__cap_is_valid(cap))
    goto ack;
  }

  /* things we might delay */
  if ((cap->issued & ~retain) == 0)
   continue;     /* nope, all good */

ack:
  ceph_put_mds_session(session);
  session = ceph_get_mds_session(cap->session);

  /* kick flushing and flush snaps before sending normal
 * cap message */

  if (cap == ci->i_auth_cap &&
      (ci->i_ceph_flags &
       (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
   if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
    __kick_flushing_caps(mdsc, session, ci, 0);
   if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
    __ceph_flush_snaps(ci, session);

   goto retry;
  }

  if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
   flushing = ci->i_dirty_caps;
   flush_tid = __mark_caps_flushing(inode, session, false,
        &oldest_flush_tid);
   if (flags & CHECK_CAPS_FLUSH &&
       list_empty(&session->s_cap_dirty))
    mflags |= CEPH_CLIENT_CAPS_SYNC;
  } else {
   flushing = 0;
   flush_tid = 0;
   spin_lock(&mdsc->cap_dirty_lock);
   oldest_flush_tid = __get_oldest_flush_tid(mdsc);
   spin_unlock(&mdsc->cap_dirty_lock);
  }

  mds = cap->mds;  /* remember mds, so we don't repeat */

  __prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
      want, retain, flushing, flush_tid, oldest_flush_tid);

  spin_unlock(&ci->i_ceph_lock);
  __send_cap(&arg, ci);
  spin_lock(&ci->i_ceph_lock);

  goto retry; /* retake i_ceph_lock and restart our cap scan. */
 }

 /* periodically re-calculate caps wanted by open files */
 if (__ceph_is_any_real_caps(ci) &&
     list_empty(&ci->i_cap_delay_list) &&
     (file_wanted & ~CEPH_CAP_PIN) &&
     !(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
  __cap_delay_requeue(mdsc, ci);
 }

 spin_unlock(&ci->i_ceph_lock);

 ceph_put_mds_session(session);
 if (queue_writeback)
  ceph_queue_writeback(inode);
 if (queue_invalidate)
  ceph_queue_invalidate(inode);
}

/*
 * Try to flush dirty caps back to the auth mds.
 */

static int try_flush_caps(struct inode *inode, u64 *ptid)
{
 struct ceph_mds_client *mdsc = ceph_sb_to_fs_client(inode->i_sb)->mdsc;
 struct ceph_inode_info *ci = ceph_inode(inode);
 int flushing = 0;
 u64 flush_tid = 0, oldest_flush_tid = 0;

 spin_lock(&ci->i_ceph_lock);
retry_locked:
 if (ci->i_dirty_caps && ci->i_auth_cap) {
  struct ceph_cap *cap = ci->i_auth_cap;
  struct cap_msg_args arg;
  struct ceph_mds_session *session = cap->session;

  if (session->s_state < CEPH_MDS_SESSION_OPEN) {
   spin_unlock(&ci->i_ceph_lock);
   goto out;
  }

  if (ci->i_ceph_flags &
      (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS)) {
   if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH)
    __kick_flushing_caps(mdsc, session, ci, 0);
   if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
    __ceph_flush_snaps(ci, session);
   goto retry_locked;
  }

  flushing = ci->i_dirty_caps;
  flush_tid = __mark_caps_flushing(inode, session, true,
       &oldest_flush_tid);

  __prep_cap(&arg, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
      __ceph_caps_used(ci), __ceph_caps_wanted(ci),
      (cap->issued | cap->implemented),
      flushing, flush_tid, oldest_flush_tid);
  spin_unlock(&ci->i_ceph_lock);

  __send_cap(&arg, ci);
 } else {
  if (!list_empty(&ci->i_cap_flush_list)) {
   struct ceph_cap_flush *cf =
    list_last_entry(&ci->i_cap_flush_list,
      struct ceph_cap_flush, i_list);
   cf->wake = true;
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=96 H=91 G=93

¤ Dauer der Verarbeitung: 0.21 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.