Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Linux/fs/ceph/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 89 kB image not shown  

Quelle  inode.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/uaccess.h>
#include <linux/kernel.h>
#include <linux/writeback.h>
#include <linux/vmalloc.h>
#include <linux/xattr.h>
#include <linux/posix_acl.h>
#include <linux/random.h>
#include <linux/sort.h>
#include <linux/iversion.h>
#include <linux/fscrypt.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "crypto.h"
#include <linux/ceph/decode.h>

/*
 * Ceph inode operations
 *
 * Implement basic inode helpers (get, alloc) and inode ops (getattr,
 * setattr, etc.), xattr helpers, and helpers for assimilating
 * metadata returned by the MDS into our cache.
 *
 * Also define helpers for doing asynchronous writeback, invalidation,
 * and truncation for the benefit of those who can't afford to block
 * (typically because they are in the message handler path).
 */


static const struct inode_operations ceph_symlink_iops;
static const struct inode_operations ceph_encrypted_symlink_iops;

static void ceph_inode_work(struct work_struct *work);

/*
 * find or create an inode, given the ceph ino number
 */

static int ceph_set_ino_cb(struct inode *inode, void *data)
{
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);

 ci->i_vino = *(struct ceph_vino *)data;
 inode->i_ino = ceph_vino_to_ino_t(ci->i_vino);
 inode_set_iversion_raw(inode, 0);
 percpu_counter_inc(&mdsc->metric.total_inodes);

 return 0;
}

/*
 * Check if the parent inode matches the vino from directory reply info
 */

static inline bool ceph_vino_matches_parent(struct inode *parent,
         struct ceph_vino vino)
{
 return ceph_ino(parent) == vino.ino && ceph_snap(parent) == vino.snap;
}

/*
 * Validate that the directory inode referenced by @req->r_parent matches the
 * inode number and snapshot id contained in the reply's directory record.  If
 * they do not match – which can theoretically happen if the parent dentry was
 * moved between the time the request was issued and the reply arrived – fall
 * back to looking up the correct inode in the inode cache.
 *
 * A reference is *always* returned.  Callers that receive a different inode
 * than the original @parent are responsible for dropping the extra reference
 * once the reply has been processed.
 */

static struct inode *ceph_get_reply_dir(struct super_block *sb,
     struct inode *parent,
     struct ceph_mds_reply_info_parsed *rinfo)
{
 struct ceph_vino vino;

 if (unlikely(!rinfo->diri.in))
  return parent; /* nothing to compare against */

 /* If we didn't have a cached parent inode to begin with, just bail out. */
 if (!parent)
  return NULL;

 vino.ino  = le64_to_cpu(rinfo->diri.in->ino);
 vino.snap = le64_to_cpu(rinfo->diri.in->snapid);

 if (likely(ceph_vino_matches_parent(parent, vino)))
  return parent; /* matches – use the original reference */

 /* Mismatch – this should be rare.  Emit a WARN and obtain the correct inode. */
 WARN_ONCE(1, "ceph: reply dir mismatch (parent valid %llx.%llx reply %llx.%llx)\n",
    ceph_ino(parent), ceph_snap(parent), vino.ino, vino.snap);

 return ceph_get_inode(sb, vino, NULL);
}

/**
 * ceph_new_inode - allocate a new inode in advance of an expected create
 * @dir: parent directory for new inode
 * @dentry: dentry that may eventually point to new inode
 * @mode: mode of new inode
 * @as_ctx: pointer to inherited security context
 *
 * Allocate a new inode in advance of an operation to create a new inode.
 * This allocates the inode and sets up the acl_sec_ctx with appropriate
 * info for the new inode.
 *
 * Returns a pointer to the new inode or an ERR_PTR.
 */

struct inode *ceph_new_inode(struct inode *dir, struct dentry *dentry,
        umode_t *mode, struct ceph_acl_sec_ctx *as_ctx)
{
 int err;
 struct inode *inode;

 inode = new_inode(dir->i_sb);
 if (!inode)
  return ERR_PTR(-ENOMEM);

 inode->i_blkbits = CEPH_FSCRYPT_BLOCK_SHIFT;

 if (!S_ISLNK(*mode)) {
  err = ceph_pre_init_acls(dir, mode, as_ctx);
  if (err < 0)
   goto out_err;
 }

 inode->i_state = 0;
 inode->i_mode = *mode;

 err = ceph_security_init_secctx(dentry, *mode, as_ctx);
 if (err < 0)
  goto out_err;

 /*
 * We'll skip setting fscrypt context for snapshots, leaving that for
 * the handle_reply().
 */

 if (ceph_snap(dir) != CEPH_SNAPDIR) {
  err = ceph_fscrypt_prepare_context(dir, inode, as_ctx);
  if (err)
   goto out_err;
 }

 return inode;
out_err:
 iput(inode);
 return ERR_PTR(err);
}

void ceph_as_ctx_to_req(struct ceph_mds_request *req,
   struct ceph_acl_sec_ctx *as_ctx)
{
 if (as_ctx->pagelist) {
  req->r_pagelist = as_ctx->pagelist;
  as_ctx->pagelist = NULL;
 }
 ceph_fscrypt_as_ctx_to_req(req, as_ctx);
}

/**
 * ceph_get_inode - find or create/hash a new inode
 * @sb: superblock to search and allocate in
 * @vino: vino to search for
 * @newino: optional new inode to insert if one isn't found (may be NULL)
 *
 * Search for or insert a new inode into the hash for the given vino, and
 * return a reference to it. If new is non-NULL, its reference is consumed.
 */

struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino,
        struct inode *newino)
{
 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(sb);
 struct ceph_client *cl = mdsc->fsc->client;
 struct inode *inode;

 if (ceph_vino_is_reserved(vino))
  return ERR_PTR(-EREMOTEIO);

 if (newino) {
  inode = inode_insert5(newino, (unsigned long)vino.ino,
          ceph_ino_compare, ceph_set_ino_cb, &vino);
  if (inode != newino)
   iput(newino);
 } else {
  inode = iget5_locked(sb, (unsigned long)vino.ino,
         ceph_ino_compare, ceph_set_ino_cb, &vino);
 }

 if (!inode) {
  doutc(cl, "no inode found for %llx.%llx\n", vino.ino, vino.snap);
  return ERR_PTR(-ENOMEM);
 }

 doutc(cl, "on %llx=%llx.%llx got %p new %d\n",
       ceph_present_inode(inode), ceph_vinop(inode), inode,
       !!(inode->i_state & I_NEW));
 return inode;
}

/*
 * get/construct snapdir inode for a given directory
 */

struct inode *ceph_get_snapdir(struct inode *parent)
{
 struct ceph_client *cl = ceph_inode_to_client(parent);
 struct ceph_vino vino = {
  .ino = ceph_ino(parent),
  .snap = CEPH_SNAPDIR,
 };
 struct inode *inode = ceph_get_inode(parent->i_sb, vino, NULL);
 struct ceph_inode_info *ci = ceph_inode(inode);
 int ret = -ENOTDIR;

 if (IS_ERR(inode))
  return inode;

 if (!S_ISDIR(parent->i_mode)) {
  pr_warn_once_client(cl, "bad snapdir parent type (mode=0%o)\n",
        parent->i_mode);
  goto err;
 }

 if (!(inode->i_state & I_NEW) && !S_ISDIR(inode->i_mode)) {
  pr_warn_once_client(cl, "bad snapdir inode type (mode=0%o)\n",
        inode->i_mode);
  goto err;
 }

 inode->i_mode = parent->i_mode;
 inode->i_uid = parent->i_uid;
 inode->i_gid = parent->i_gid;
 inode_set_mtime_to_ts(inode, inode_get_mtime(parent));
 inode_set_ctime_to_ts(inode, inode_get_ctime(parent));
 inode_set_atime_to_ts(inode, inode_get_atime(parent));
 ci->i_rbytes = 0;
 ci->i_btime = ceph_inode(parent)->i_btime;

#ifdef CONFIG_FS_ENCRYPTION
 /* if encrypted, just borrow fscrypt_auth from parent */
 if (IS_ENCRYPTED(parent)) {
  struct ceph_inode_info *pci = ceph_inode(parent);

  ci->fscrypt_auth = kmemdup(pci->fscrypt_auth,
        pci->fscrypt_auth_len,
        GFP_KERNEL);
  if (ci->fscrypt_auth) {
   inode->i_flags |= S_ENCRYPTED;
   ci->fscrypt_auth_len = pci->fscrypt_auth_len;
  } else {
   doutc(cl, "Failed to alloc snapdir fscrypt_auth\n");
   ret = -ENOMEM;
   goto err;
  }
 }
#endif
 if (inode->i_state & I_NEW) {
  inode->i_op = &ceph_snapdir_iops;
  inode->i_fop = &ceph_snapdir_fops;
  ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
  unlock_new_inode(inode);
 }

 return inode;
err:
 if ((inode->i_state & I_NEW))
  discard_new_inode(inode);
 else
  iput(inode);
 return ERR_PTR(ret);
}

const struct inode_operations ceph_file_iops = {
 .permission = ceph_permission,
 .setattr = ceph_setattr,
 .getattr = ceph_getattr,
 .listxattr = ceph_listxattr,
 .get_inode_acl = ceph_get_acl,
 .set_acl = ceph_set_acl,
};


/*
 * We use a 'frag tree' to keep track of the MDS's directory fragments
 * for a given inode (usually there is just a single fragment).  We
 * need to know when a child frag is delegated to a new MDS, or when
 * it is flagged as replicated, so we can direct our requests
 * accordingly.
 */


/*
 * find/create a frag in the tree
 */

static struct ceph_inode_frag *__get_or_create_frag(struct ceph_inode_info *ci,
          u32 f)
{
 struct inode *inode = &ci->netfs.inode;
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct rb_node **p;
 struct rb_node *parent = NULL;
 struct ceph_inode_frag *frag;
 int c;

 p = &ci->i_fragtree.rb_node;
 while (*p) {
  parent = *p;
  frag = rb_entry(parent, struct ceph_inode_frag, node);
  c = ceph_frag_compare(f, frag->frag);
  if (c < 0)
   p = &(*p)->rb_left;
  else if (c > 0)
   p = &(*p)->rb_right;
  else
   return frag;
 }

 frag = kmalloc(sizeof(*frag), GFP_NOFS);
 if (!frag)
  return ERR_PTR(-ENOMEM);

 frag->frag = f;
 frag->split_by = 0;
 frag->mds = -1;
 frag->ndist = 0;

 rb_link_node(&frag->node, parent, p);
 rb_insert_color(&frag->node, &ci->i_fragtree);

 doutc(cl, "added %p %llx.%llx frag %x\n", inode, ceph_vinop(inode), f);
 return frag;
}

/*
 * find a specific frag @f
 */

struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
{
 struct rb_node *n = ci->i_fragtree.rb_node;

 while (n) {
  struct ceph_inode_frag *frag =
   rb_entry(n, struct ceph_inode_frag, node);
  int c = ceph_frag_compare(f, frag->frag);
  if (c < 0)
   n = n->rb_left;
  else if (c > 0)
   n = n->rb_right;
  else
   return frag;
 }
 return NULL;
}

/*
 * Choose frag containing the given value @v.  If @pfrag is
 * specified, copy the frag delegation info to the caller if
 * it is present.
 */

static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
         struct ceph_inode_frag *pfrag, int *found)
{
 struct ceph_client *cl = ceph_inode_to_client(&ci->netfs.inode);
 u32 t = ceph_frag_make(0, 0);
 struct ceph_inode_frag *frag;
 unsigned nway, i;
 u32 n;

 if (found)
  *found = 0;

 while (1) {
  WARN_ON(!ceph_frag_contains_value(t, v));
  frag = __ceph_find_frag(ci, t);
  if (!frag)
   break/* t is a leaf */
  if (frag->split_by == 0) {
   if (pfrag)
    memcpy(pfrag, frag, sizeof(*pfrag));
   if (found)
    *found = 1;
   break;
  }

  /* choose child */
  nway = 1 << frag->split_by;
  doutc(cl, "frag(%x) %x splits by %d (%d ways)\n", v, t,
        frag->split_by, nway);
  for (i = 0; i < nway; i++) {
   n = ceph_frag_make_child(t, frag->split_by, i);
   if (ceph_frag_contains_value(n, v)) {
    t = n;
    break;
   }
  }
  BUG_ON(i == nway);
 }
 doutc(cl, "frag(%x) = %x\n", v, t);

 return t;
}

u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
       struct ceph_inode_frag *pfrag, int *found)
{
 u32 ret;
 mutex_lock(&ci->i_fragtree_mutex);
 ret = __ceph_choose_frag(ci, v, pfrag, found);
 mutex_unlock(&ci->i_fragtree_mutex);
 return ret;
}

/*
 * Process dirfrag (delegation) info from the mds.  Include leaf
 * fragment in tree ONLY if ndist > 0.  Otherwise, only
 * branches/splits are included in i_fragtree)
 */

static int ceph_fill_dirfrag(struct inode *inode,
        struct ceph_mds_reply_dirfrag *dirinfo)
{
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_frag *frag;
 u32 id = le32_to_cpu(dirinfo->frag);
 int mds = le32_to_cpu(dirinfo->auth);
 int ndist = le32_to_cpu(dirinfo->ndist);
 int diri_auth = -1;
 int i;
 int err = 0;

 spin_lock(&ci->i_ceph_lock);
 if (ci->i_auth_cap)
  diri_auth = ci->i_auth_cap->mds;
 spin_unlock(&ci->i_ceph_lock);

 if (mds == -1) /* CDIR_AUTH_PARENT */
  mds = diri_auth;

 mutex_lock(&ci->i_fragtree_mutex);
 if (ndist == 0 && mds == diri_auth) {
  /* no delegation info needed. */
  frag = __ceph_find_frag(ci, id);
  if (!frag)
   goto out;
  if (frag->split_by == 0) {
   /* tree leaf, remove */
   doutc(cl, "removed %p %llx.%llx frag %x (no ref)\n",
         inode, ceph_vinop(inode), id);
   rb_erase(&frag->node, &ci->i_fragtree);
   kfree(frag);
  } else {
   /* tree branch, keep and clear */
   doutc(cl, "cleared %p %llx.%llx frag %x referral\n",
         inode, ceph_vinop(inode), id);
   frag->mds = -1;
   frag->ndist = 0;
  }
  goto out;
 }


 /* find/add this frag to store mds delegation info */
 frag = __get_or_create_frag(ci, id);
 if (IS_ERR(frag)) {
  /* this is not the end of the world; we can continue
   with bad/inaccurate delegation info */

  pr_err_client(cl, "ENOMEM on mds ref %p %llx.%llx fg %x\n",
         inode, ceph_vinop(inode),
         le32_to_cpu(dirinfo->frag));
  err = -ENOMEM;
  goto out;
 }

 frag->mds = mds;
 frag->ndist = min_t(u32, ndist, CEPH_MAX_DIRFRAG_REP);
 for (i = 0; i < frag->ndist; i++)
  frag->dist[i] = le32_to_cpu(dirinfo->dist[i]);
 doutc(cl, "%p %llx.%llx frag %x ndist=%d\n", inode,
       ceph_vinop(inode), frag->frag, frag->ndist);

out:
 mutex_unlock(&ci->i_fragtree_mutex);
 return err;
}

static int frag_tree_split_cmp(const void *l, const void *r)
{
 struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l;
 struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r;
 return ceph_frag_compare(le32_to_cpu(ls->frag),
     le32_to_cpu(rs->frag));
}

static bool is_frag_child(u32 f, struct ceph_inode_frag *frag)
{
 if (!frag)
  return f == ceph_frag_make(0, 0);
 if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by)
  return false;
 return ceph_frag_contains_value(frag->frag, ceph_frag_value(f));
}

static int ceph_fill_fragtree(struct inode *inode,
         struct ceph_frag_tree_head *fragtree,
         struct ceph_mds_reply_dirfrag *dirinfo)
{
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_inode_frag *frag, *prev_frag = NULL;
 struct rb_node *rb_node;
 unsigned i, split_by, nsplits;
 u32 id;
 bool update = false;

 mutex_lock(&ci->i_fragtree_mutex);
 nsplits = le32_to_cpu(fragtree->nsplits);
 if (nsplits != ci->i_fragtree_nsplits) {
  update = true;
 } else if (nsplits) {
  i = get_random_u32_below(nsplits);
  id = le32_to_cpu(fragtree->splits[i].frag);
  if (!__ceph_find_frag(ci, id))
   update = true;
 } else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
  rb_node = rb_first(&ci->i_fragtree);
  frag = rb_entry(rb_node, struct ceph_inode_frag, node);
  if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
   update = true;
 }
 if (!update && dirinfo) {
  id = le32_to_cpu(dirinfo->frag);
  if (id != __ceph_choose_frag(ci, id, NULL, NULL))
   update = true;
 }
 if (!update)
  goto out_unlock;

 if (nsplits > 1) {
  sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]),
       frag_tree_split_cmp, NULL);
 }

 doutc(cl, "%p %llx.%llx\n", inode, ceph_vinop(inode));
 rb_node = rb_first(&ci->i_fragtree);
 for (i = 0; i < nsplits; i++) {
  id = le32_to_cpu(fragtree->splits[i].frag);
  split_by = le32_to_cpu(fragtree->splits[i].by);
  if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) {
   pr_err_client(cl, "%p %llx.%llx invalid split %d/%u, "
          "frag %x split by %d\n", inode,
          ceph_vinop(inode), i, nsplits, id, split_by);
   continue;
  }
  frag = NULL;
  while (rb_node) {
   frag = rb_entry(rb_node, struct ceph_inode_frag, node);
   if (ceph_frag_compare(frag->frag, id) >= 0) {
    if (frag->frag != id)
     frag = NULL;
    else
     rb_node = rb_next(rb_node);
    break;
   }
   rb_node = rb_next(rb_node);
   /* delete stale split/leaf node */
   if (frag->split_by > 0 ||
       !is_frag_child(frag->frag, prev_frag)) {
    rb_erase(&frag->node, &ci->i_fragtree);
    if (frag->split_by > 0)
     ci->i_fragtree_nsplits--;
    kfree(frag);
   }
   frag = NULL;
  }
  if (!frag) {
   frag = __get_or_create_frag(ci, id);
   if (IS_ERR(frag))
    continue;
  }
  if (frag->split_by == 0)
   ci->i_fragtree_nsplits++;
  frag->split_by = split_by;
  doutc(cl, " frag %x split by %d\n", frag->frag, frag->split_by);
  prev_frag = frag;
 }
 while (rb_node) {
  frag = rb_entry(rb_node, struct ceph_inode_frag, node);
  rb_node = rb_next(rb_node);
  /* delete stale split/leaf node */
  if (frag->split_by > 0 ||
      !is_frag_child(frag->frag, prev_frag)) {
   rb_erase(&frag->node, &ci->i_fragtree);
   if (frag->split_by > 0)
    ci->i_fragtree_nsplits--;
   kfree(frag);
  }
 }
out_unlock:
 mutex_unlock(&ci->i_fragtree_mutex);
 return 0;
}

/*
 * initialize a newly allocated inode.
 */

struct inode *ceph_alloc_inode(struct super_block *sb)
{
 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(sb);
 struct ceph_inode_info *ci;
 int i;

 ci = alloc_inode_sb(sb, ceph_inode_cachep, GFP_NOFS);
 if (!ci)
  return NULL;

 doutc(fsc->client, "%p\n", &ci->netfs.inode);

 /* Set parameters for the netfs library */
 netfs_inode_init(&ci->netfs, &ceph_netfs_ops, false);

 spin_lock_init(&ci->i_ceph_lock);

 ci->i_version = 0;
 ci->i_inline_version = 0;
 ci->i_time_warp_seq = 0;
 ci->i_ceph_flags = 0;
 atomic64_set(&ci->i_ordered_count, 1);
 atomic64_set(&ci->i_release_count, 1);
 atomic64_set(&ci->i_complete_seq[0], 0);
 atomic64_set(&ci->i_complete_seq[1], 0);
 ci->i_symlink = NULL;

 ci->i_max_bytes = 0;
 ci->i_max_files = 0;

 memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
 RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);

 ci->i_fragtree = RB_ROOT;
 mutex_init(&ci->i_fragtree_mutex);

 ci->i_xattrs.blob = NULL;
 ci->i_xattrs.prealloc_blob = NULL;
 ci->i_xattrs.dirty = false;
 ci->i_xattrs.index = RB_ROOT;
 ci->i_xattrs.count = 0;
 ci->i_xattrs.names_size = 0;
 ci->i_xattrs.vals_size = 0;
 ci->i_xattrs.version = 0;
 ci->i_xattrs.index_version = 0;

 ci->i_caps = RB_ROOT;
 ci->i_auth_cap = NULL;
 ci->i_dirty_caps = 0;
 ci->i_flushing_caps = 0;
 INIT_LIST_HEAD(&ci->i_dirty_item);
 INIT_LIST_HEAD(&ci->i_flushing_item);
 ci->i_prealloc_cap_flush = NULL;
 INIT_LIST_HEAD(&ci->i_cap_flush_list);
 init_waitqueue_head(&ci->i_cap_wq);
 ci->i_hold_caps_max = 0;
 INIT_LIST_HEAD(&ci->i_cap_delay_list);
 INIT_LIST_HEAD(&ci->i_cap_snaps);
 ci->i_head_snapc = NULL;
 ci->i_snap_caps = 0;

 ci->i_last_rd = ci->i_last_wr = jiffies - 3600 * HZ;
 for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
  ci->i_nr_by_mode[i] = 0;

 mutex_init(&ci->i_truncate_mutex);
 ci->i_truncate_seq = 0;
 ci->i_truncate_size = 0;
 ci->i_truncate_pending = 0;
 ci->i_truncate_pagecache_size = 0;

 ci->i_max_size = 0;
 ci->i_reported_size = 0;
 ci->i_wanted_max_size = 0;
 ci->i_requested_max_size = 0;

 ci->i_pin_ref = 0;
 ci->i_rd_ref = 0;
 ci->i_rdcache_ref = 0;
 ci->i_wr_ref = 0;
 ci->i_wb_ref = 0;
 ci->i_fx_ref = 0;
 ci->i_wrbuffer_ref = 0;
 ci->i_wrbuffer_ref_head = 0;
 atomic_set(&ci->i_filelock_ref, 0);
 atomic_set(&ci->i_shared_gen, 1);
 ci->i_rdcache_gen = 0;
 ci->i_rdcache_revoking = 0;

 INIT_LIST_HEAD(&ci->i_unsafe_dirops);
 INIT_LIST_HEAD(&ci->i_unsafe_iops);
 spin_lock_init(&ci->i_unsafe_lock);

 ci->i_snap_realm = NULL;
 INIT_LIST_HEAD(&ci->i_snap_realm_item);
 INIT_LIST_HEAD(&ci->i_snap_flush_item);

 INIT_WORK(&ci->i_work, ceph_inode_work);
 ci->i_work_mask = 0;
 memset(&ci->i_btime, '\0'sizeof(ci->i_btime));
#ifdef CONFIG_FS_ENCRYPTION
 ci->fscrypt_auth = NULL;
 ci->fscrypt_auth_len = 0;
#endif
 return &ci->netfs.inode;
}

void ceph_free_inode(struct inode *inode)
{
 struct ceph_inode_info *ci = ceph_inode(inode);

 kfree(ci->i_symlink);
#ifdef CONFIG_FS_ENCRYPTION
 kfree(ci->fscrypt_auth);
#endif
 fscrypt_free_inode(inode);
 kmem_cache_free(ceph_inode_cachep, ci);
}

void ceph_evict_inode(struct inode *inode)
{
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_frag *frag;
 struct rb_node *n;

 doutc(cl, "%p ino %llx.%llx\n", inode, ceph_vinop(inode));

 percpu_counter_dec(&mdsc->metric.total_inodes);

 netfs_wait_for_outstanding_io(inode);
 truncate_inode_pages_final(&inode->i_data);
 if (inode->i_state & I_PINNING_NETFS_WB)
  ceph_fscache_unuse_cookie(inode, true);
 clear_inode(inode);

 ceph_fscache_unregister_inode_cookie(ci);
 fscrypt_put_encryption_info(inode);

 __ceph_remove_caps(ci);

 if (__ceph_has_quota(ci, QUOTA_GET_ANY))
  ceph_adjust_quota_realms_count(inode, false);

 /*
 * we may still have a snap_realm reference if there are stray
 * caps in i_snap_caps.
 */

 if (ci->i_snap_realm) {
  if (ceph_snap(inode) == CEPH_NOSNAP) {
   doutc(cl, " dropping residual ref to snap realm %p\n",
         ci->i_snap_realm);
   ceph_change_snap_realm(inode, NULL);
  } else {
   ceph_put_snapid_map(mdsc, ci->i_snapid_map);
   ci->i_snap_realm = NULL;
  }
 }

 while ((n = rb_first(&ci->i_fragtree)) != NULL) {
  frag = rb_entry(n, struct ceph_inode_frag, node);
  rb_erase(n, &ci->i_fragtree);
  kfree(frag);
 }
 ci->i_fragtree_nsplits = 0;

 __ceph_destroy_xattrs(ci);
 if (ci->i_xattrs.blob)
  ceph_buffer_put(ci->i_xattrs.blob);
 if (ci->i_xattrs.prealloc_blob)
  ceph_buffer_put(ci->i_xattrs.prealloc_blob);

 ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
}

static inline blkcnt_t calc_inode_blocks(u64 size)
{
 return (size + (1<<9) - 1) >> 9;
}

/*
 * Helpers to fill in size, ctime, mtime, and atime.  We have to be
 * careful because either the client or MDS may have more up to date
 * info, depending on which capabilities are held, and whether
 * time_warp_seq or truncate_seq have increased.  (Ordinarily, mtime
 * and size are monotonically increasing, except when utimes() or
 * truncate() increments the corresponding _seq values.)
 */

int ceph_fill_file_size(struct inode *inode, int issued,
   u32 truncate_seq, u64 truncate_size, u64 size)
{
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 int queue_trunc = 0;
 loff_t isize = i_size_read(inode);

 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
     (truncate_seq == ci->i_truncate_seq && size > isize)) {
  doutc(cl, "size %lld -> %llu\n", isize, size);
  if (size > 0 && S_ISDIR(inode->i_mode)) {
   pr_err_client(cl, "non-zero size for directory\n");
   size = 0;
  }
  i_size_write(inode, size);
  inode->i_blocks = calc_inode_blocks(size);
  /*
 * If we're expanding, then we should be able to just update
 * the existing cookie.
 */

  if (size > isize)
   ceph_fscache_update(inode);
  ci->i_reported_size = size;
  if (truncate_seq != ci->i_truncate_seq) {
   doutc(cl, "truncate_seq %u -> %u\n",
         ci->i_truncate_seq, truncate_seq);
   ci->i_truncate_seq = truncate_seq;

   /* the MDS should have revoked these caps */
   WARN_ON_ONCE(issued & (CEPH_CAP_FILE_RD |
            CEPH_CAP_FILE_LAZYIO));
   /*
 * If we hold relevant caps, or in the case where we're
 * not the only client referencing this file and we
 * don't hold those caps, then we need to check whether
 * the file is either opened or mmaped
 */

   if ((issued & (CEPH_CAP_FILE_CACHE|
           CEPH_CAP_FILE_BUFFER)) ||
       mapping_mapped(inode->i_mapping) ||
       __ceph_is_file_opened(ci)) {
    ci->i_truncate_pending++;
    queue_trunc = 1;
   }
  }
 }

 /*
 * It's possible that the new sizes of the two consecutive
 * size truncations will be in the same fscrypt last block,
 * and we need to truncate the corresponding page caches
 * anyway.
 */

 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) >= 0) {
  doutc(cl, "truncate_size %lld -> %llu, encrypted %d\n",
        ci->i_truncate_size, truncate_size,
        !!IS_ENCRYPTED(inode));

  ci->i_truncate_size = truncate_size;

  if (IS_ENCRYPTED(inode)) {
   doutc(cl, "truncate_pagecache_size %lld -> %llu\n",
         ci->i_truncate_pagecache_size, size);
   ci->i_truncate_pagecache_size = size;
  } else {
   ci->i_truncate_pagecache_size = truncate_size;
  }
 }
 return queue_trunc;
}

void ceph_fill_file_time(struct inode *inode, int issued,
    u64 time_warp_seq, struct timespec64 *ctime,
    struct timespec64 *mtime, struct timespec64 *atime)
{
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct timespec64 ictime = inode_get_ctime(inode);
 int warn = 0;

 if (issued & (CEPH_CAP_FILE_EXCL|
        CEPH_CAP_FILE_WR|
        CEPH_CAP_FILE_BUFFER|
        CEPH_CAP_AUTH_EXCL|
        CEPH_CAP_XATTR_EXCL)) {
  if (ci->i_version == 0 ||
      timespec64_compare(ctime, &ictime) > 0) {
   doutc(cl, "ctime %lld.%09ld -> %lld.%09ld inc w/ cap\n",
        ictime.tv_sec, ictime.tv_nsec,
        ctime->tv_sec, ctime->tv_nsec);
   inode_set_ctime_to_ts(inode, *ctime);
  }
  if (ci->i_version == 0 ||
      ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) > 0) {
   /* the MDS did a utimes() */
   doutc(cl, "mtime %lld.%09ld -> %lld.%09ld tw %d -> %d\n",
        inode_get_mtime_sec(inode),
        inode_get_mtime_nsec(inode),
        mtime->tv_sec, mtime->tv_nsec,
        ci->i_time_warp_seq, (int)time_warp_seq);

   inode_set_mtime_to_ts(inode, *mtime);
   inode_set_atime_to_ts(inode, *atime);
   ci->i_time_warp_seq = time_warp_seq;
  } else if (time_warp_seq == ci->i_time_warp_seq) {
   struct timespec64 ts;

   /* nobody did utimes(); take the max */
   ts = inode_get_mtime(inode);
   if (timespec64_compare(mtime, &ts) > 0) {
    doutc(cl, "mtime %lld.%09ld -> %lld.%09ld inc\n",
         ts.tv_sec, ts.tv_nsec,
         mtime->tv_sec, mtime->tv_nsec);
    inode_set_mtime_to_ts(inode, *mtime);
   }
   ts = inode_get_atime(inode);
   if (timespec64_compare(atime, &ts) > 0) {
    doutc(cl, "atime %lld.%09ld -> %lld.%09ld inc\n",
         ts.tv_sec, ts.tv_nsec,
         atime->tv_sec, atime->tv_nsec);
    inode_set_atime_to_ts(inode, *atime);
   }
  } else if (issued & CEPH_CAP_FILE_EXCL) {
   /* we did a utimes(); ignore mds values */
  } else {
   warn = 1;
  }
 } else {
  /* we have no write|excl caps; whatever the MDS says is true */
  if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
   inode_set_ctime_to_ts(inode, *ctime);
   inode_set_mtime_to_ts(inode, *mtime);
   inode_set_atime_to_ts(inode, *atime);
   ci->i_time_warp_seq = time_warp_seq;
  } else {
   warn = 1;
  }
 }
 if (warn) /* time_warp_seq shouldn't go backwards */
  doutc(cl, "%p mds time_warp_seq %llu < %u\n", inode,
        time_warp_seq, ci->i_time_warp_seq);
}

#if IS_ENABLED(CONFIG_FS_ENCRYPTION)
static int decode_encrypted_symlink(struct ceph_mds_client *mdsc,
        const char *encsym,
        int enclen, u8 **decsym)
{
 struct ceph_client *cl = mdsc->fsc->client;
 int declen;
 u8 *sym;

 sym = kmalloc(enclen + 1, GFP_NOFS);
 if (!sym)
  return -ENOMEM;

 declen = ceph_base64_decode(encsym, enclen, sym);
 if (declen < 0) {
  pr_err_client(cl,
   "can't decode symlink (%d). Content: %.*s\n",
   declen, enclen, encsym);
  kfree(sym);
  return -EIO;
 }
 sym[declen + 1] = '\0';
 *decsym = sym;
 return declen;
}
#else
static int decode_encrypted_symlink(struct ceph_mds_client *mdsc,
        const char *encsym,
        int symlen, u8 **decsym)
{
 return -EOPNOTSUPP;
}
#endif

/*
 * Populate an inode based on info from mds.  May be called on new or
 * existing inodes.
 */

int ceph_fill_inode(struct inode *inode, struct page *locked_page,
      struct ceph_mds_reply_info_in *iinfo,
      struct ceph_mds_reply_dirfrag *dirinfo,
      struct ceph_mds_session *session, int cap_fmode,
      struct ceph_cap_reservation *caps_reservation)
{
 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_mds_reply_inode *info = iinfo->in;
 struct ceph_inode_info *ci = ceph_inode(inode);
 int issued, new_issued, info_caps;
 struct timespec64 mtime, atime, ctime;
 struct ceph_buffer *xattr_blob = NULL;
 struct ceph_buffer *old_blob = NULL;
 struct ceph_string *pool_ns = NULL;
 struct ceph_cap *new_cap = NULL;
 int err = 0;
 bool wake = false;
 bool queue_trunc = false;
 bool new_version = false;
 bool fill_inline = false;
 umode_t mode = le32_to_cpu(info->mode);
 dev_t rdev = le32_to_cpu(info->rdev);

 lockdep_assert_held(&mdsc->snap_rwsem);

 doutc(cl, "%p ino %llx.%llx v %llu had %llu\n", inode, ceph_vinop(inode),
       le64_to_cpu(info->version), ci->i_version);

 /* Once I_NEW is cleared, we can't change type or dev numbers */
 if (inode->i_state & I_NEW) {
  inode->i_mode = mode;
 } else {
  if (inode_wrong_type(inode, mode)) {
   pr_warn_once_client(cl,
    "inode type changed! (ino %llx.%llx is 0%o, mds says 0%o)\n",
    ceph_vinop(inode), inode->i_mode, mode);
   return -ESTALE;
  }

  if ((S_ISCHR(mode) || S_ISBLK(mode)) && inode->i_rdev != rdev) {
   pr_warn_once_client(cl,
    "dev inode rdev changed! (ino %llx.%llx is %u:%u, mds says %u:%u)\n",
    ceph_vinop(inode), MAJOR(inode->i_rdev),
    MINOR(inode->i_rdev), MAJOR(rdev),
    MINOR(rdev));
   return -ESTALE;
  }
 }

 info_caps = le32_to_cpu(info->cap.caps);

 /* prealloc new cap struct */
 if (info_caps && ceph_snap(inode) == CEPH_NOSNAP) {
  new_cap = ceph_get_cap(mdsc, caps_reservation);
  if (!new_cap)
   return -ENOMEM;
 }

 /*
 * prealloc xattr data, if it looks like we'll need it.  only
 * if len > 4 (meaning there are actually xattrs; the first 4
 * bytes are the xattr count).
 */

 if (iinfo->xattr_len > 4) {
  xattr_blob = ceph_buffer_new(iinfo->xattr_len, GFP_NOFS);
  if (!xattr_blob)
   pr_err_client(cl, "ENOMEM xattr blob %d bytes\n",
          iinfo->xattr_len);
 }

 if (iinfo->pool_ns_len > 0)
  pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
           iinfo->pool_ns_len);

 if (ceph_snap(inode) != CEPH_NOSNAP && !ci->i_snapid_map)
  ci->i_snapid_map = ceph_get_snapid_map(mdsc, ceph_snap(inode));

 spin_lock(&ci->i_ceph_lock);

 /*
 * provided version will be odd if inode value is projected,
 * even if stable.  skip the update if we have newer stable
 * info (ours>=theirs, e.g. due to racing mds replies), unless
 * we are getting projected (unstable) info (in which case the
 * version is odd, and we want ours>theirs).
 *   us   them
 *   2    2     skip
 *   3    2     skip
 *   3    3     update
 */

 if (ci->i_version == 0 ||
     ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
      le64_to_cpu(info->version) > (ci->i_version & ~1)))
  new_version = true;

 /* Update change_attribute */
 inode_set_max_iversion_raw(inode, iinfo->change_attr);

 __ceph_caps_issued(ci, &issued);
 issued |= __ceph_caps_dirty(ci);
 new_issued = ~issued & info_caps;

 __ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files);

#ifdef CONFIG_FS_ENCRYPTION
 if (iinfo->fscrypt_auth_len &&
     ((inode->i_state & I_NEW) || (ci->fscrypt_auth_len == 0))) {
  kfree(ci->fscrypt_auth);
  ci->fscrypt_auth_len = iinfo->fscrypt_auth_len;
  ci->fscrypt_auth = iinfo->fscrypt_auth;
  iinfo->fscrypt_auth = NULL;
  iinfo->fscrypt_auth_len = 0;
  inode_set_flags(inode, S_ENCRYPTED, S_ENCRYPTED);
 }
#endif

 if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
     (issued & CEPH_CAP_AUTH_EXCL) == 0) {
  inode->i_mode = mode;
  inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
  inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
  doutc(cl, "%p %llx.%llx mode 0%o uid.gid %d.%d\n", inode,
        ceph_vinop(inode), inode->i_mode,
        from_kuid(&init_user_ns, inode->i_uid),
        from_kgid(&init_user_ns, inode->i_gid));
  ceph_decode_timespec64(&ci->i_btime, &iinfo->btime);
  ceph_decode_timespec64(&ci->i_snap_btime, &iinfo->snap_btime);
 }

 /* directories have fl_stripe_unit set to zero */
 if (IS_ENCRYPTED(inode))
  inode->i_blkbits = CEPH_FSCRYPT_BLOCK_SHIFT;
 else if (le32_to_cpu(info->layout.fl_stripe_unit))
  inode->i_blkbits =
   fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 else
  inode->i_blkbits = CEPH_BLOCK_SHIFT;

 if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
     (issued & CEPH_CAP_LINK_EXCL) == 0)
  set_nlink(inode, le32_to_cpu(info->nlink));

 if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
  /* be careful with mtime, atime, size */
  ceph_decode_timespec64(&atime, &info->atime);
  ceph_decode_timespec64(&mtime, &info->mtime);
  ceph_decode_timespec64(&ctime, &info->ctime);
  ceph_fill_file_time(inode, issued,
    le32_to_cpu(info->time_warp_seq),
    &ctime, &mtime, &atime);
 }

 if (new_version || (info_caps & CEPH_CAP_FILE_SHARED)) {
  ci->i_files = le64_to_cpu(info->files);
  ci->i_subdirs = le64_to_cpu(info->subdirs);
 }

 if (new_version ||
     (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
  u64 size = le64_to_cpu(info->size);
  s64 old_pool = ci->i_layout.pool_id;
  struct ceph_string *old_ns;

  ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
  old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
     lockdep_is_held(&ci->i_ceph_lock));
  rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);

  if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
   ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;

  pool_ns = old_ns;

  if (IS_ENCRYPTED(inode) && size &&
      iinfo->fscrypt_file_len == sizeof(__le64)) {
   u64 fsize = __le64_to_cpu(*(__le64 *)iinfo->fscrypt_file);

   if (size == round_up(fsize, CEPH_FSCRYPT_BLOCK_SIZE)) {
    size = fsize;
   } else {
    pr_warn_client(cl,
     "fscrypt size mismatch: size=%llu fscrypt_file=%llu, discarding fscrypt_file size.\n",
     info->size, size);
   }
  }

  queue_trunc = ceph_fill_file_size(inode, issued,
     le32_to_cpu(info->truncate_seq),
     le64_to_cpu(info->truncate_size),
     size);
  /* only update max_size on auth cap */
  if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
      ci->i_max_size != le64_to_cpu(info->max_size)) {
   doutc(cl, "max_size %lld -> %llu\n",
       ci->i_max_size, le64_to_cpu(info->max_size));
   ci->i_max_size = le64_to_cpu(info->max_size);
  }
 }

 /* layout and rstat are not tracked by capability, update them if
 * the inode info is from auth mds */

 if (new_version || (info->cap.flags & CEPH_CAP_FLAG_AUTH)) {
  if (S_ISDIR(inode->i_mode)) {
   ci->i_dir_layout = iinfo->dir_layout;
   ci->i_rbytes = le64_to_cpu(info->rbytes);
   ci->i_rfiles = le64_to_cpu(info->rfiles);
   ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
   ci->i_dir_pin = iinfo->dir_pin;
   ci->i_rsnaps = iinfo->rsnaps;
   ceph_decode_timespec64(&ci->i_rctime, &info->rctime);
  }
 }

 /* xattrs */
 /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
 if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
     le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
  if (ci->i_xattrs.blob)
   old_blob = ci->i_xattrs.blob;
  ci->i_xattrs.blob = xattr_blob;
  if (xattr_blob)
   memcpy(ci->i_xattrs.blob->vec.iov_base,
          iinfo->xattr_data, iinfo->xattr_len);
  ci->i_xattrs.version = le64_to_cpu(info->xattr_version);
  ceph_forget_all_cached_acls(inode);
  ceph_security_invalidate_secctx(inode);
  xattr_blob = NULL;
 }

 /* finally update i_version */
 if (le64_to_cpu(info->version) > ci->i_version)
  ci->i_version = le64_to_cpu(info->version);

 inode->i_mapping->a_ops = &ceph_aops;

 switch (inode->i_mode & S_IFMT) {
 case S_IFIFO:
 case S_IFBLK:
 case S_IFCHR:
 case S_IFSOCK:
  inode->i_blkbits = PAGE_SHIFT;
  init_special_inode(inode, inode->i_mode, rdev);
  inode->i_op = &ceph_file_iops;
  break;
 case S_IFREG:
  inode->i_op = &ceph_file_iops;
  inode->i_fop = &ceph_file_fops;
  break;
 case S_IFLNK:
  if (!ci->i_symlink) {
   u32 symlen = iinfo->symlink_len;
   char *sym;

   spin_unlock(&ci->i_ceph_lock);

   if (IS_ENCRYPTED(inode)) {
    if (symlen != i_size_read(inode))
     pr_err_client(cl,
      "%p %llx.%llx BAD symlink size %lld\n",
      inode, ceph_vinop(inode),
      i_size_read(inode));

    err = decode_encrypted_symlink(mdsc, iinfo->symlink,
              symlen, (u8 **)&sym);
    if (err < 0) {
     pr_err_client(cl,
      "decoding encrypted symlink failed: %d\n",
      err);
     goto out;
    }
    symlen = err;
    i_size_write(inode, symlen);
    inode->i_blocks = calc_inode_blocks(symlen);
   } else {
    if (symlen != i_size_read(inode)) {
     pr_err_client(cl,
      "%p %llx.%llx BAD symlink size %lld\n",
      inode, ceph_vinop(inode),
      i_size_read(inode));
     i_size_write(inode, symlen);
     inode->i_blocks = calc_inode_blocks(symlen);
    }

    err = -ENOMEM;
    sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS);
    if (!sym)
     goto out;
   }

   spin_lock(&ci->i_ceph_lock);
   if (!ci->i_symlink)
    ci->i_symlink = sym;
   else
    kfree(sym); /* lost a race */
  }

  if (IS_ENCRYPTED(inode)) {
   /*
 * Encrypted symlinks need to be decrypted before we can
 * cache their targets in i_link. Don't touch it here.
 */

   inode->i_op = &ceph_encrypted_symlink_iops;
  } else {
   inode->i_link = ci->i_symlink;
   inode->i_op = &ceph_symlink_iops;
  }
  break;
 case S_IFDIR:
  inode->i_op = &ceph_dir_iops;
  inode->i_fop = &ceph_dir_fops;
  break;
 default:
  pr_err_client(cl, "%p %llx.%llx BAD mode 0%o\n", inode,
         ceph_vinop(inode), inode->i_mode);
 }

 /* were we issued a capability? */
 if (info_caps) {
  if (ceph_snap(inode) == CEPH_NOSNAP) {
   ceph_add_cap(inode, session,
         le64_to_cpu(info->cap.cap_id),
         info_caps,
         le32_to_cpu(info->cap.wanted),
         le32_to_cpu(info->cap.seq),
         le32_to_cpu(info->cap.mseq),
         le64_to_cpu(info->cap.realm),
         info->cap.flags, &new_cap);

   /* set dir completion flag? */
   if (S_ISDIR(inode->i_mode) &&
       ci->i_files == 0 && ci->i_subdirs == 0 &&
       (info_caps & CEPH_CAP_FILE_SHARED) &&
       (issued & CEPH_CAP_FILE_EXCL) == 0 &&
       !__ceph_dir_is_complete(ci)) {
    doutc(cl, " marking %p complete (empty)\n",
          inode);
    i_size_write(inode, 0);
    __ceph_dir_set_complete(ci,
     atomic64_read(&ci->i_release_count),
     atomic64_read(&ci->i_ordered_count));
   }

   wake = true;
  } else {
   doutc(cl, " %p got snap_caps %s\n", inode,
         ceph_cap_string(info_caps));
   ci->i_snap_caps |= info_caps;
  }
 }

 if (iinfo->inline_version > 0 &&
     iinfo->inline_version >= ci->i_inline_version) {
  int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
  ci->i_inline_version = iinfo->inline_version;
  if (ceph_has_inline_data(ci) &&
      (locked_page || (info_caps & cache_caps)))
   fill_inline = true;
 }

 if (cap_fmode >= 0) {
  if (!info_caps)
   pr_warn_client(cl, "mds issued no caps on %llx.%llx\n",
           ceph_vinop(inode));
  __ceph_touch_fmode(ci, mdsc, cap_fmode);
 }

 spin_unlock(&ci->i_ceph_lock);

 ceph_fscache_register_inode_cookie(inode);

 if (fill_inline)
  ceph_fill_inline_data(inode, locked_page,
          iinfo->inline_data, iinfo->inline_len);

 if (wake)
  wake_up_all(&ci->i_cap_wq);

 /* queue truncate if we saw i_size decrease */
 if (queue_trunc)
  ceph_queue_vmtruncate(inode);

 /* populate frag tree */
 if (S_ISDIR(inode->i_mode))
  ceph_fill_fragtree(inode, &info->fragtree, dirinfo);

 /* update delegation info? */
 if (dirinfo)
  ceph_fill_dirfrag(inode, dirinfo);

 err = 0;
out:
 if (new_cap)
  ceph_put_cap(mdsc, new_cap);
 ceph_buffer_put(old_blob);
 ceph_buffer_put(xattr_blob);
 ceph_put_string(pool_ns);
 return err;
}

/*
 * caller should hold session s_mutex and dentry->d_lock.
 */

static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
      struct ceph_mds_reply_lease *lease,
      struct ceph_mds_session *session,
      unsigned long from_time,
      struct ceph_mds_session **old_lease_session)
{
 struct ceph_client *cl = ceph_inode_to_client(dir);
 struct ceph_dentry_info *di = ceph_dentry(dentry);
 unsigned mask = le16_to_cpu(lease->mask);
 long unsigned duration = le32_to_cpu(lease->duration_ms);
 long unsigned ttl = from_time + (duration * HZ) / 1000;
 long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000;

 doutc(cl, "%p duration %lu ms ttl %lu\n", dentry, duration, ttl);

 /* only track leases on regular dentries */
 if (ceph_snap(dir) != CEPH_NOSNAP)
  return;

 if (mask & CEPH_LEASE_PRIMARY_LINK)
  di->flags |= CEPH_DENTRY_PRIMARY_LINK;
 else
  di->flags &= ~CEPH_DENTRY_PRIMARY_LINK;

 di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
 if (!(mask & CEPH_LEASE_VALID)) {
  __ceph_dentry_dir_lease_touch(di);
  return;
 }

 if (di->lease_gen == atomic_read(&session->s_cap_gen) &&
     time_before(ttl, di->time))
  return;  /* we already have a newer lease. */

 if (di->lease_session && di->lease_session != session) {
  *old_lease_session = di->lease_session;
  di->lease_session = NULL;
 }

 if (!di->lease_session)
  di->lease_session = ceph_get_mds_session(session);
 di->lease_gen = atomic_read(&session->s_cap_gen);
 di->lease_seq = le32_to_cpu(lease->seq);
 di->lease_renew_after = half_ttl;
 di->lease_renew_from = 0;
 di->time = ttl;

 __ceph_dentry_lease_touch(di);
}

static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry,
     struct ceph_mds_reply_lease *lease,
     struct ceph_mds_session *session,
     unsigned long from_time)
{
 struct ceph_mds_session *old_lease_session = NULL;
 spin_lock(&dentry->d_lock);
 __update_dentry_lease(dir, dentry, lease, session, from_time,
         &old_lease_session);
 spin_unlock(&dentry->d_lock);
 ceph_put_mds_session(old_lease_session);
}

/*
 * update dentry lease without having parent inode locked
 */

static void update_dentry_lease_careful(struct dentry *dentry,
     struct ceph_mds_reply_lease *lease,
     struct ceph_mds_session *session,
     unsigned long from_time,
     char *dname, u32 dname_len,
     struct ceph_vino *pdvino,
     struct ceph_vino *ptvino)

{
 struct inode *dir;
 struct ceph_mds_session *old_lease_session = NULL;

 spin_lock(&dentry->d_lock);
 /* make sure dentry's name matches target */
 if (dentry->d_name.len != dname_len ||
     memcmp(dentry->d_name.name, dname, dname_len))
  goto out_unlock;

 dir = d_inode(dentry->d_parent);
 /* make sure parent matches dvino */
 if (!ceph_ino_compare(dir, pdvino))
  goto out_unlock;

 /* make sure dentry's inode matches target. NULL ptvino means that
 * we expect a negative dentry */

 if (ptvino) {
  if (d_really_is_negative(dentry))
   goto out_unlock;
  if (!ceph_ino_compare(d_inode(dentry), ptvino))
   goto out_unlock;
 } else {
  if (d_really_is_positive(dentry))
   goto out_unlock;
 }

 __update_dentry_lease(dir, dentry, lease, session,
         from_time, &old_lease_session);
out_unlock:
 spin_unlock(&dentry->d_lock);
 ceph_put_mds_session(old_lease_session);
}

/*
 * splice a dentry to an inode.
 * caller must hold directory i_rwsem for this to be safe.
 */

static int splice_dentry(struct dentry **pdn, struct inode *in)
{
 struct ceph_client *cl = ceph_inode_to_client(in);
 struct dentry *dn = *pdn;
 struct dentry *realdn;

 BUG_ON(d_inode(dn));

 if (S_ISDIR(in->i_mode)) {
  /* If inode is directory, d_splice_alias() below will remove
 * 'realdn' from its origin parent. We need to ensure that
 * origin parent's readdir cache will not reference 'realdn'
 */

  realdn = d_find_any_alias(in);
  if (realdn) {
   struct ceph_dentry_info *di = ceph_dentry(realdn);
   spin_lock(&realdn->d_lock);

   realdn->d_op->d_prune(realdn);

   di->time = jiffies;
   di->lease_shared_gen = 0;
   di->offset = 0;

   spin_unlock(&realdn->d_lock);
   dput(realdn);
  }
 }

 /* dn must be unhashed */
 if (!d_unhashed(dn))
  d_drop(dn);
 realdn = d_splice_alias(in, dn);
 if (IS_ERR(realdn)) {
  pr_err_client(cl, "error %ld %p inode %p ino %llx.%llx\n",
         PTR_ERR(realdn), dn, in, ceph_vinop(in));
  return PTR_ERR(realdn);
 }

 if (realdn) {
  doutc(cl, "dn %p (%d) spliced with %p (%d) inode %p ino %llx.%llx\n",
        dn, d_count(dn), realdn, d_count(realdn),
        d_inode(realdn), ceph_vinop(d_inode(realdn)));
  dput(dn);
  *pdn = realdn;
 } else {
  BUG_ON(!ceph_dentry(dn));
  doutc(cl, "dn %p attached to %p ino %llx.%llx\n", dn,
        d_inode(dn), ceph_vinop(d_inode(dn)));
 }
 return 0;
}

/*
 * Incorporate results into the local cache.  This is either just
 * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
 * after a lookup).
 *
 * A reply may contain
 *         a directory inode along with a dentry.
 *  and/or a target inode
 *
 * Called with snap_rwsem (read).
 */

int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
{
 struct ceph_mds_session *session = req->r_session;
 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
 struct inode *in = NULL;
 struct ceph_vino tvino, dvino;
 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(sb);
 struct ceph_client *cl = fsc->client;
 struct inode *parent_dir = NULL;
 int err = 0;

 doutc(cl, "%p is_dentry %d is_target %d\n", req,
       rinfo->head->is_dentry, rinfo->head->is_target);

 if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
  doutc(cl, "reply is empty!\n");
  if (rinfo->head->result == 0 && req->r_parent)
   ceph_invalidate_dir_request(req);
  return 0;
 }

 if (rinfo->head->is_dentry) {
  /*
 * r_parent may be stale, in cases when R_PARENT_LOCKED is not set,
 * so we need to get the correct inode
 */

  parent_dir = ceph_get_reply_dir(sb, req->r_parent, rinfo);
  if (unlikely(IS_ERR(parent_dir))) {
   err = PTR_ERR(parent_dir);
   goto done;
  }
  if (parent_dir) {
   err = ceph_fill_inode(parent_dir, NULL, &rinfo->diri,
           rinfo->dirfrag, session, -1,
           &req->r_caps_reservation);
   if (err < 0)
    goto done;
  } else {
   WARN_ON_ONCE(1);
  }

  if (parent_dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME &&
      test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
      !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
   bool is_nokey = false;
   struct qstr dname;
   struct dentry *dn, *parent;
   struct fscrypt_str oname = FSTR_INIT(NULL, 0);
   struct ceph_fname fname = { .dir = parent_dir,
          .name = rinfo->dname,
          .ctext = rinfo->altname,
          .name_len = rinfo->dname_len,
          .ctext_len = rinfo->altname_len };

   BUG_ON(!rinfo->head->is_target);
   BUG_ON(req->r_dentry);

   parent = d_find_any_alias(parent_dir);
   BUG_ON(!parent);

   err = ceph_fname_alloc_buffer(parent_dir, &oname);
   if (err < 0) {
    dput(parent);
    goto done;
   }

   err = ceph_fname_to_usr(&fname, NULL, &oname, &is_nokey);
   if (err < 0) {
    dput(parent);
    ceph_fname_free_buffer(parent_dir, &oname);
    goto done;
   }
   dname.name = oname.name;
   dname.len = oname.len;
   dname.hash = full_name_hash(parent, dname.name, dname.len);
   tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
   tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
retry_lookup:
   dn = d_lookup(parent, &dname);
   doutc(cl, "d_lookup on parent=%p name=%.*s got %p\n",
         parent, dname.len, dname.name, dn);

   if (!dn) {
    dn = d_alloc(parent, &dname);
    doutc(cl, "d_alloc %p '%.*s' = %p\n", parent,
          dname.len, dname.name, dn);
    if (!dn) {
     dput(parent);
     ceph_fname_free_buffer(parent_dir, &oname);
     err = -ENOMEM;
     goto done;
    }
    if (is_nokey) {
     spin_lock(&dn->d_lock);
     dn->d_flags |= DCACHE_NOKEY_NAME;
     spin_unlock(&dn->d_lock);
    }
    err = 0;
   } else if (d_really_is_positive(dn) &&
       (ceph_ino(d_inode(dn)) != tvino.ino ||
        ceph_snap(d_inode(dn)) != tvino.snap)) {
    doutc(cl, " dn %p points to wrong inode %p\n",
          dn, d_inode(dn));
    ceph_dir_clear_ordered(parent_dir);
    d_delete(dn);
    dput(dn);
    goto retry_lookup;
   }
   ceph_fname_free_buffer(parent_dir, &oname);

   req->r_dentry = dn;
   dput(parent);
  }
 }

 if (rinfo->head->is_target) {
  /* Should be filled in by handle_reply */
  BUG_ON(!req->r_target_inode);

  in = req->r_target_inode;
  err = ceph_fill_inode(in, req->r_locked_page, &rinfo->targeti,
    NULL, session,
    (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
     !test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) &&
     rinfo->head->result == 0) ?  req->r_fmode : -1,
    &req->r_caps_reservation);
  if (err < 0) {
   pr_err_client(cl, "badness %p %llx.%llx\n", in,
          ceph_vinop(in));
   req->r_target_inode = NULL;
   if (in->i_state & I_NEW)
    discard_new_inode(in);
   else
    iput(in);
   goto done;
  }
  if (in->i_state & I_NEW)
   unlock_new_inode(in);
 }

 /*
 * ignore null lease/binding on snapdir ENOENT, or else we
 * will have trouble splicing in the virtual snapdir later
 */

 if (rinfo->head->is_dentry &&
            !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
     test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
     (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
            fsc->mount_options->snapdir_name,
            req->r_dentry->d_name.len))) {
  /*
 * lookup link rename   : null -> possibly existing inode
 * mknod symlink mkdir  : null -> new inode
 * unlink               : linked -> null
 */

  struct inode *dir = req->r_parent;
  struct dentry *dn = req->r_dentry;
  bool have_dir_cap, have_lease;

  BUG_ON(!dn);
  BUG_ON(!dir);
  BUG_ON(d_inode(dn->d_parent) != dir);

  dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
  dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);

  BUG_ON(ceph_ino(dir) != dvino.ino);
  BUG_ON(ceph_snap(dir) != dvino.snap);

  /* do we have a lease on the whole dir? */
  have_dir_cap =
   (le32_to_cpu(rinfo->diri.in->cap.caps) &
    CEPH_CAP_FILE_SHARED);

  /* do we have a dn lease? */
  have_lease = have_dir_cap ||
   le32_to_cpu(rinfo->dlease->duration_ms);
  if (!have_lease)
   doutc(cl, "no dentry lease or dir cap\n");

  /* rename? */
  if (req->r_old_dentry && req->r_op == CEPH_MDS_OP_RENAME) {
   struct inode *olddir = req->r_old_dentry_dir;
   BUG_ON(!olddir);

   doutc(cl, " src %p '%pd' dst %p '%pd'\n",
         req->r_old_dentry, req->r_old_dentry, dn, dn);
   doutc(cl, "doing d_move %p -> %p\n", req->r_old_dentry, dn);

   /* d_move screws up sibling dentries' offsets */
   ceph_dir_clear_ordered(dir);
   ceph_dir_clear_ordered(olddir);

   d_move(req->r_old_dentry, dn);
   doutc(cl, " src %p '%pd' dst %p '%pd'\n",
         req->r_old_dentry, req->r_old_dentry, dn, dn);

   /* ensure target dentry is invalidated, despite
   rehashing bug in vfs_rename_dir */

   ceph_invalidate_dentry_lease(dn);

   doutc(cl, "dn %p gets new offset %lld\n",
         req->r_old_dentry,
         ceph_dentry(req->r_old_dentry)->offset);

   /* swap r_dentry and r_old_dentry in case that
 * splice_dentry() gets called later. This is safe
 * because no other place will use them */

   req->r_dentry = req->r_old_dentry;
   req->r_old_dentry = dn;
   dn = req->r_dentry;
  }

  /* null dentry? */
  if (!rinfo->head->is_target) {
   doutc(cl, "null dentry\n");
   if (d_really_is_positive(dn)) {
    doutc(cl, "d_delete %p\n", dn);
    ceph_dir_clear_ordered(dir);
    d_delete(dn);
   } else if (have_lease) {
    if (d_unhashed(dn))
     d_add(dn, NULL);
   }

   if (!d_unhashed(dn) && have_lease)
    update_dentry_lease(dir, dn,
          rinfo->dlease, session,
          req->r_request_started);
   goto done;
  }

  /* attach proper inode */
  if (d_really_is_negative(dn)) {
   ceph_dir_clear_ordered(dir);
   ihold(in);
   err = splice_dentry(&req->r_dentry, in);
   if (err < 0)
    goto done;
   dn = req->r_dentry;  /* may have spliced */
  } else if (d_really_is_positive(dn) && d_inode(dn) != in) {
   doutc(cl, " %p links to %p %llx.%llx, not %llx.%llx\n",
         dn, d_inode(dn), ceph_vinop(d_inode(dn)),
         ceph_vinop(in));
   d_invalidate(dn);
   have_lease = false;
  }

  if (have_lease) {
   update_dentry_lease(dir, dn,
         rinfo->dlease, session,
         req->r_request_started);
  }
  doutc(cl, " final dn %p\n", dn);
 } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
      req->r_op == CEPH_MDS_OP_MKSNAP) &&
            test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
     !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
  struct inode *dir = req->r_parent;

  /* fill out a snapdir LOOKUPSNAP dentry */
  BUG_ON(!dir);
  BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
  BUG_ON(!req->r_dentry);
  doutc(cl, " linking snapped dir %p to dn %p\n", in,
        req->r_dentry);
  ceph_dir_clear_ordered(dir);
  ihold(in);
  err = splice_dentry(&req->r_dentry, in);
  if (err < 0)
   goto done;
 } else if (rinfo->head->is_dentry && req->r_dentry) {
  /* parent inode is not locked, be careful */
  struct ceph_vino *ptvino = NULL;
  dvino.ino = le64_to_cpu(rinfo->diri.in->ino);
  dvino.snap = le64_to_cpu(rinfo->diri.in->snapid);
  if (rinfo->head->is_target) {
   tvino.ino = le64_to_cpu(rinfo->targeti.in->ino);
   tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid);
   ptvino = &tvino;
  }
  update_dentry_lease_careful(req->r_dentry, rinfo->dlease,
         session, req->r_request_started,
         rinfo->dname, rinfo->dname_len,
         &dvino, ptvino);
 }
done:
 /* Drop extra ref from ceph_get_reply_dir() if it returned a new inode */
 if (unlikely(!IS_ERR_OR_NULL(parent_dir) && parent_dir != req->r_parent))
  iput(parent_dir);
 doutc(cl, "done err=%d\n", err);
 return err;
}

/*
 * Prepopulate our cache with readdir results, leases, etc.
 */

static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
        struct ceph_mds_session *session)
{
 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
 struct ceph_client *cl = session->s_mdsc->fsc->client;
 int i, err = 0;

 for (i = 0; i < rinfo->dir_nr; i++) {
  struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
  struct ceph_vino vino;
  struct inode *in;
  int rc;

  vino.ino = le64_to_cpu(rde->inode.in->ino);
  vino.snap = le64_to_cpu(rde->inode.in->snapid);

  in = ceph_get_inode(req->r_dentry->d_sb, vino, NULL);
  if (IS_ERR(in)) {
   err = PTR_ERR(in);
   doutc(cl, "badness got %d\n", err);
   continue;
  }
  rc = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
         -1, &req->r_caps_reservation);
  if (rc < 0) {
   pr_err_client(cl, "inode badness on %p got %d\n", in,
          rc);
   err = rc;
   if (in->i_state & I_NEW) {
    ihold(in);
    discard_new_inode(in);
   }
  } else if (in->i_state & I_NEW) {
   unlock_new_inode(in);
  }

  iput(in);
 }

 return err;
}

void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
{
 if (ctl->folio) {
  folio_release_kmap(ctl->folio, ctl->dentries);
  ctl->folio = NULL;
 }
}

static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
         struct ceph_readdir_cache_control *ctl,
         struct ceph_mds_request *req)
{
 struct ceph_client *cl = ceph_inode_to_client(dir);
 struct ceph_inode_info *ci = ceph_inode(dir);
 unsigned nsize = PAGE_SIZE / sizeof(struct dentry*);
 unsigned idx = ctl->index % nsize;
 pgoff_t pgoff = ctl->index / nsize;

 if (!ctl->folio || pgoff != ctl->folio->index) {
  ceph_readdir_cache_release(ctl);
  fgf_t fgf = FGP_LOCK;

  if (idx == 0)
   fgf |= FGP_ACCESSED | FGP_CREAT;

  ctl->folio = __filemap_get_folio(&dir->i_data, pgoff,
    fgf, mapping_gfp_mask(&dir->i_data));
  if (IS_ERR(ctl->folio)) {
   int err = PTR_ERR(ctl->folio);

   ctl->folio = NULL;
   ctl->index = -1;
   return idx == 0 ? err : 0;
  }
  /* reading/filling the cache are serialized by
 * i_rwsem, no need to use folio lock */

  folio_unlock(ctl->folio);
  ctl->dentries = kmap_local_folio(ctl->folio, 0);
  if (idx == 0)
   memset(ctl->dentries, 0, PAGE_SIZE);
 }

 if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
     req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) {
  doutc(cl, "dn %p idx %d\n", dn, ctl->index);
  ctl->dentries[idx] = dn;
  ctl->index++;
 } else {
  doutc(cl, "disable readdir cache\n");
  ctl->index = -1;
 }
 return 0;
}

int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        struct ceph_mds_session *session)
{
 struct dentry *parent = req->r_dentry;
 struct inode *inode = d_inode(parent);
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
 struct ceph_client *cl = session->s_mdsc->fsc->client;
 struct qstr dname;
 struct dentry *dn;
 struct inode *in;
 int err = 0, skipped = 0, ret, i;
 u32 frag = le32_to_cpu(req->r_args.readdir.frag);
 u32 last_hash = 0;
 u32 fpos_offset;
 struct ceph_readdir_cache_control cache_ctl = {};

 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
  return readdir_prepopulate_inodes_only(req, session);

 if (rinfo->hash_order) {
  if (req->r_path2) {
   last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
        req->r_path2,
        strlen(req->r_path2));
   last_hash = ceph_frag_value(last_hash);
  } else if (rinfo->offset_hash) {
   /* mds understands offset_hash */
   WARN_ON_ONCE(req->r_readdir_offset != 2);
   last_hash = le32_to_cpu(req->r_args.readdir.offset_hash);
  }
 }

 if (rinfo->dir_dir &&
     le32_to_cpu(rinfo->dir_dir->frag) != frag) {
  doutc(cl, "got new frag %x -> %x\n", frag,
       le32_to_cpu(rinfo->dir_dir->frag));
  frag = le32_to_cpu(rinfo->dir_dir->frag);
  if (!rinfo->hash_order)
   req->r_readdir_offset = 2;
 }

 if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
  doutc(cl, "%d items under SNAPDIR dn %p\n",
        rinfo->dir_nr, parent);
 } else {
  doutc(cl, "%d items under dn %p\n", rinfo->dir_nr, parent);
  if (rinfo->dir_dir)
   ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);

  if (ceph_frag_is_leftmost(frag) &&
      req->r_readdir_offset == 2 &&
      !(rinfo->hash_order && last_hash)) {
   /* note dir version at start of readdir so we can
 * tell if any dentries get dropped */

   req->r_dir_release_cnt =
    atomic64_read(&ci->i_release_count);
   req->r_dir_ordered_cnt =
    atomic64_read(&ci->i_ordered_count);
   req->r_readdir_cache_idx = 0;
  }
 }

 cache_ctl.index = req->r_readdir_cache_idx;
 fpos_offset = req->r_readdir_offset;

 /* FIXME: release caps/leases if error occurs */
 for (i = 0; i < rinfo->dir_nr; i++) {
  struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
  struct ceph_vino tvino;

  dname.name = rde->name;
  dname.len = rde->name_len;
  dname.hash = full_name_hash(parent, dname.name, dname.len);

  tvino.ino = le64_to_cpu(rde->inode.in->ino);
  tvino.snap = le64_to_cpu(rde->inode.in->snapid);

  if (rinfo->hash_order) {
   u32 hash = ceph_frag_value(rde->raw_hash);
   if (hash != last_hash)
    fpos_offset = 2;
   last_hash = hash;
   rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
  } else {
   rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
  }

retry_lookup:
  dn = d_lookup(parent, &dname);
  doutc(cl, "d_lookup on parent=%p name=%.*s got %p\n",
        parent, dname.len, dname.name, dn);

  if (!dn) {
   dn = d_alloc(parent, &dname);
   doutc(cl, "d_alloc %p '%.*s' = %p\n", parent,
         dname.len, dname.name, dn);
   if (!dn) {
    doutc(cl, "d_alloc badness\n");
    err = -ENOMEM;
    goto out;
   }
   if (rde->is_nokey) {
    spin_lock(&dn->d_lock);
    dn->d_flags |= DCACHE_NOKEY_NAME;
    spin_unlock(&dn->d_lock);
   }
  } else if (d_really_is_positive(dn) &&
      (ceph_ino(d_inode(dn)) != tvino.ino ||
       ceph_snap(d_inode(dn)) != tvino.snap)) {
   struct ceph_dentry_info *di = ceph_dentry(dn);
   doutc(cl, " dn %p points to wrong inode %p\n",
         dn, d_inode(dn));

   spin_lock(&dn->d_lock);
   if (di->offset > 0 &&
       di->lease_shared_gen ==
       atomic_read(&ci->i_shared_gen)) {
    __ceph_dir_clear_ordered(ci);
    di->offset = 0;
   }
   spin_unlock(&dn->d_lock);

   d_delete(dn);
   dput(dn);
   goto retry_lookup;
  }

  /* inode */
  if (d_really_is_positive(dn)) {
   in = d_inode(dn);
  } else {
   in = ceph_get_inode(parent->d_sb, tvino, NULL);
   if (IS_ERR(in)) {
    doutc(cl, "new_inode badness\n");
    d_drop(dn);
    dput(dn);
    err = PTR_ERR(in);
    goto out;
   }
  }

  ret = ceph_fill_inode(in, NULL, &rde->inode, NULL, session,
          -1, &req->r_caps_reservation);
  if (ret < 0) {
   pr_err_client(cl, "badness on %p %llx.%llx\n", in,
          ceph_vinop(in));
   if (d_really_is_negative(dn)) {
    if (in->i_state & I_NEW) {
     ihold(in);
     discard_new_inode(in);
    }
    iput(in);
   }
   d_drop(dn);
   err = ret;
   goto next_item;
  }
  if (in->i_state & I_NEW)
   unlock_new_inode(in);

  if (d_really_is_negative(dn)) {
   if (ceph_security_xattr_deadlock(in)) {
    doutc(cl, " skip splicing dn %p to inode %p"
          " (security xattr deadlock)\n", dn, in);
    iput(in);
    skipped++;
    goto next_item;
   }

   err = splice_dentry(&dn, in);
   if (err < 0)
    goto next_item;
  }

  ceph_dentry(dn)->offset = rde->offset;

  update_dentry_lease(d_inode(parent), dn,
        rde->lease, req->r_session,
        req->r_request_started);

  if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
   ret = fill_readdir_cache(d_inode(parent), dn,
       &cache_ctl, req);
   if (ret < 0)
    err = ret;
  }
next_item:
  dput(dn);
 }
out:
 if (err == 0 && skipped == 0) {
  set_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags);
  req->r_readdir_cache_idx = cache_ctl.index;
 }
 ceph_readdir_cache_release(&cache_ctl);
 doutc(cl, "done\n");
 return err;
}

bool ceph_inode_set_size(struct inode *inode, loff_t size)
{
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 bool ret;

 spin_lock(&ci->i_ceph_lock);
 doutc(cl, "set_size %p %llu -> %llu\n", inode, i_size_read(inode), size);
 i_size_write(inode, size);
 ceph_fscache_update(inode);
 inode->i_blocks = calc_inode_blocks(size);

 ret = __ceph_should_report_size(ci);

 spin_unlock(&ci->i_ceph_lock);

 return ret;
}

void ceph_queue_inode_work(struct inode *inode, int work_bit)
{
 struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
 struct ceph_client *cl = fsc->client;
 struct ceph_inode_info *ci = ceph_inode(inode);
 set_bit(work_bit, &ci->i_work_mask);

 ihold(inode);
 if (queue_work(fsc->inode_wq, &ci->i_work)) {
  doutc(cl, "%p %llx.%llx mask=%lx\n", inode,
        ceph_vinop(inode), ci->i_work_mask);
 } else {
  doutc(cl, "%p %llx.%llx already queued, mask=%lx\n",
        inode, ceph_vinop(inode), ci->i_work_mask);
  iput(inode);
 }
}

static void ceph_do_invalidate_pages(struct inode *inode)
{
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 u32 orig_gen;
 int check = 0;

 ceph_fscache_invalidate(inode, false);

 mutex_lock(&ci->i_truncate_mutex);

 if (ceph_inode_is_shutdown(inode)) {
  pr_warn_ratelimited_client(cl,
   "%p %llx.%llx is shut down\n", inode,
   ceph_vinop(inode));
  mapping_set_error(inode->i_mapping, -EIO);
  truncate_pagecache(inode, 0);
  mutex_unlock(&ci->i_truncate_mutex);
  goto out;
 }

 spin_lock(&ci->i_ceph_lock);
 doutc(cl, "%p %llx.%llx gen %d revoking %d\n", inode,
       ceph_vinop(inode), ci->i_rdcache_gen, ci->i_rdcache_revoking);
 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
  if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
   check = 1;
  spin_unlock(&ci->i_ceph_lock);
  mutex_unlock(&ci->i_truncate_mutex);
  goto out;
 }
 orig_gen = ci->i_rdcache_gen;
 spin_unlock(&ci->i_ceph_lock);

 if (invalidate_inode_pages2(inode->i_mapping) < 0) {
  pr_err_client(cl, "invalidate_inode_pages2 %llx.%llx failed\n",
         ceph_vinop(inode));
 }

 spin_lock(&ci->i_ceph_lock);
 if (orig_gen == ci->i_rdcache_gen &&
     orig_gen == ci->i_rdcache_revoking) {
  doutc(cl, "%p %llx.%llx gen %d successful\n", inode,
        ceph_vinop(inode), ci->i_rdcache_gen);
  ci->i_rdcache_revoking--;
  check = 1;
 } else {
  doutc(cl, "%p %llx.%llx gen %d raced, now %d revoking %d\n",
        inode, ceph_vinop(inode), orig_gen, ci->i_rdcache_gen,
        ci->i_rdcache_revoking);
  if (__ceph_caps_revoking_other(ci, NULL, CEPH_CAP_FILE_CACHE))
   check = 1;
 }
 spin_unlock(&ci->i_ceph_lock);
 mutex_unlock(&ci->i_truncate_mutex);
out:
 if (check)
  ceph_check_caps(ci, 0);
}

/*
 * Make sure any pending truncation is applied before doing anything
 * that may depend on it.
 */

void __ceph_do_pending_vmtruncate(struct inode *inode)
{
 struct ceph_client *cl = ceph_inode_to_client(inode);
 struct ceph_inode_info *ci = ceph_inode(inode);
 u64 to;
 int wrbuffer_refs, finish = 0;

 mutex_lock(&ci->i_truncate_mutex);
retry:
 spin_lock(&ci->i_ceph_lock);
 if (ci->i_truncate_pending == 0) {
  doutc(cl, "%p %llx.%llx none pending\n", inode,
        ceph_vinop(inode));
  spin_unlock(&ci->i_ceph_lock);
  mutex_unlock(&ci->i_truncate_mutex);
  return;
 }

 /*
 * make sure any dirty snapped pages are flushed before we
 * possibly truncate them.. so write AND block!
 */

 if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
  spin_unlock(&ci->i_ceph_lock);
  doutc(cl, "%p %llx.%llx flushing snaps first\n", inode,
        ceph_vinop(inode));
  filemap_write_and_wait_range(&inode->i_data, 0,
          inode->i_sb->s_maxbytes);
  goto retry;
 }

 /* there should be no reader or writer */
 WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref);

 to = ci->i_truncate_pagecache_size;
 wrbuffer_refs = ci->i_wrbuffer_ref;
 doutc(cl, "%p %llx.%llx (%d) to %lld\n", inode, ceph_vinop(inode),
       ci->i_truncate_pending, to);
 spin_unlock(&ci->i_ceph_lock);

 ceph_fscache_resize(inode, to);
 truncate_pagecache(inode, to);

 spin_lock(&ci->i_ceph_lock);
 if (to == ci->i_truncate_pagecache_size) {
  ci->i_truncate_pending = 0;
  finish = 1;
 }
 spin_unlock(&ci->i_ceph_lock);
 if (!finish)
  goto retry;

 mutex_unlock(&ci->i_truncate_mutex);

 if (wrbuffer_refs == 0)
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=96 H=91 G=93

¤ Dauer der Verarbeitung: 0.54 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.