Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Linux/io_uring/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 108 kB image not shown  

Quelle  io_uring.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared application/kernel submission and completion ring pairs, for
 * supporting fast/efficient IO.
 *
 * A note on the read/write ordering memory barriers that are matched between
 * the application and kernel side.
 *
 * After the application reads the CQ ring tail, it must use an
 * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
 * before writing the tail (using smp_load_acquire to read the tail will
 * do). It also needs a smp_mb() before updating CQ head (ordering the
 * entry load(s) with the head store), pairing with an implicit barrier
 * through a control-dependency in io_get_cqe (smp_store_release to
 * store head will do). Failure to do so could lead to reading invalid
 * CQ entries.
 *
 * Likewise, the application must use an appropriate smp_wmb() before
 * writing the SQ tail (ordering SQ entry stores with the tail store),
 * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
 * to store the tail will do). And it needs a barrier ordering the SQ
 * head load before writing new SQ entries (smp_load_acquire to read
 * head will do).
 *
 * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
 * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
 * updating the SQ tail; a full memory barrier smp_mb() is needed
 * between.
 *
 * Also see the examples in the liburing library:
 *
 * git://git.kernel.dk/liburing
 *
 * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
 * from data shared between the kernel and application. This is done both
 * for ordering purposes, but also to ensure that once a value is loaded from
 * data that the application could potentially modify, it remains stable.
 *
 * Copyright (C) 2018-2019 Jens Axboe
 * Copyright (c) 2018-2019 Christoph Hellwig
 */

#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/syscalls.h>
#include <net/compat.h>
#include <linux/refcount.h>
#include <linux/uio.h>
#include <linux/bits.h>

#include <linux/sched/signal.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/mman.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/bvec.h>
#include <linux/net.h>
#include <net/sock.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
#include <linux/nospec.h>
#include <linux/fsnotify.h>
#include <linux/fadvise.h>
#include <linux/task_work.h>
#include <linux/io_uring.h>
#include <linux/io_uring/cmd.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/jump_label.h>
#include <asm/shmparam.h>

#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io-wq.h"

#include "io_uring.h"
#include "opdef.h"
#include "refs.h"
#include "tctx.h"
#include "register.h"
#include "sqpoll.h"
#include "fdinfo.h"
#include "kbuf.h"
#include "rsrc.h"
#include "cancel.h"
#include "net.h"
#include "notif.h"
#include "waitid.h"
#include "futex.h"
#include "napi.h"
#include "uring_cmd.h"
#include "msg_ring.h"
#include "memmap.h"
#include "zcrx.h"

#include "timeout.h"
#include "poll.h"
#include "rw.h"
#include "alloc_cache.h"
#include "eventfd.h"

#define SQE_COMMON_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_LINK | \
     IOSQE_IO_HARDLINK | IOSQE_ASYNC)

#define SQE_VALID_FLAGS (SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | \
   IOSQE_IO_DRAIN | IOSQE_CQE_SKIP_SUCCESS)

#define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)

#define IO_REQ_CLEAN_FLAGS (REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | \
    REQ_F_INFLIGHT | REQ_F_CREDS | REQ_F_ASYNC_DATA)

#define IO_REQ_CLEAN_SLOW_FLAGS (REQ_F_REFCOUNT | IO_REQ_LINK_FLAGS | \
     REQ_F_REISSUE | REQ_F_POLLED | \
     IO_REQ_CLEAN_FLAGS)

#define IO_TCTX_REFS_CACHE_NR (1U << 10)

#define IO_COMPL_BATCH   32
#define IO_REQ_ALLOC_BATCH  8
#define IO_LOCAL_TW_DEFAULT_MAX  20

struct io_defer_entry {
 struct list_head list;
 struct io_kiocb  *req;
};

/* requests with any of those set should undergo io_disarm_next() */
#define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)

/*
 * No waiters. It's larger than any valid value of the tw counter
 * so that tests against ->cq_wait_nr would fail and skip wake_up().
 */

#define IO_CQ_WAKE_INIT  (-1U)
/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
#define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)

static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
      struct io_uring_task *tctx,
      bool cancel_all,
      bool is_sqpoll_thread);

static void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags);
static void __io_req_caches_free(struct io_ring_ctx *ctx);

static __read_mostly DEFINE_STATIC_KEY_FALSE(io_key_has_sqarray);

struct kmem_cache *req_cachep;
static struct workqueue_struct *iou_wq __ro_after_init;

static int __read_mostly sysctl_io_uring_disabled;
static int __read_mostly sysctl_io_uring_group = -1;

#ifdef CONFIG_SYSCTL
static const struct ctl_table kernel_io_uring_disabled_table[] = {
 {
  .procname = "io_uring_disabled",
  .data  = &sysctl_io_uring_disabled,
  .maxlen  = sizeof(sysctl_io_uring_disabled),
  .mode  = 0644,
  .proc_handler = proc_dointvec_minmax,
  .extra1  = SYSCTL_ZERO,
  .extra2  = SYSCTL_TWO,
 },
 {
  .procname = "io_uring_group",
  .data  = &sysctl_io_uring_group,
  .maxlen  = sizeof(gid_t),
  .mode  = 0644,
  .proc_handler = proc_dointvec,
 },
};
#endif

static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
{
 return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
}

static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
{
 return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
}

static bool io_match_linked(struct io_kiocb *head)
{
 struct io_kiocb *req;

 io_for_each_link(req, head) {
  if (req->flags & REQ_F_INFLIGHT)
   return true;
 }
 return false;
}

/*
 * As io_match_task() but protected against racing with linked timeouts.
 * User must not hold timeout_lock.
 */

bool io_match_task_safe(struct io_kiocb *head, struct io_uring_task *tctx,
   bool cancel_all)
{
 bool matched;

 if (tctx && head->tctx != tctx)
  return false;
 if (cancel_all)
  return true;

 if (head->flags & REQ_F_LINK_TIMEOUT) {
  struct io_ring_ctx *ctx = head->ctx;

  /* protect against races with linked timeouts */
  raw_spin_lock_irq(&ctx->timeout_lock);
  matched = io_match_linked(head);
  raw_spin_unlock_irq(&ctx->timeout_lock);
 } else {
  matched = io_match_linked(head);
 }
 return matched;
}

static inline void req_fail_link_node(struct io_kiocb *req, int res)
{
 req_set_fail(req);
 io_req_set_res(req, res, 0);
}

static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
{
 wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
}

static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
{
 struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);

 complete(&ctx->ref_comp);
}

static __cold void io_fallback_req_func(struct work_struct *work)
{
 struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
      fallback_work.work);
 struct llist_node *node = llist_del_all(&ctx->fallback_llist);
 struct io_kiocb *req, *tmp;
 struct io_tw_state ts = {};

 percpu_ref_get(&ctx->refs);
 mutex_lock(&ctx->uring_lock);
 llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
  req->io_task_work.func(req, ts);
 io_submit_flush_completions(ctx);
 mutex_unlock(&ctx->uring_lock);
 percpu_ref_put(&ctx->refs);
}

static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
{
 unsigned int hash_buckets;
 int i;

 do {
  hash_buckets = 1U << bits;
  table->hbs = kvmalloc_array(hash_buckets, sizeof(table->hbs[0]),
      GFP_KERNEL_ACCOUNT);
  if (table->hbs)
   break;
  if (bits == 1)
   return -ENOMEM;
  bits--;
 } while (1);

 table->hash_bits = bits;
 for (i = 0; i < hash_buckets; i++)
  INIT_HLIST_HEAD(&table->hbs[i].list);
 return 0;
}

static void io_free_alloc_caches(struct io_ring_ctx *ctx)
{
 io_alloc_cache_free(&ctx->apoll_cache, kfree);
 io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
 io_alloc_cache_free(&ctx->rw_cache, io_rw_cache_free);
 io_alloc_cache_free(&ctx->cmd_cache, io_cmd_cache_free);
 io_futex_cache_free(ctx);
 io_rsrc_cache_free(ctx);
}

static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
 struct io_ring_ctx *ctx;
 int hash_bits;
 bool ret;

 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
 if (!ctx)
  return NULL;

 xa_init(&ctx->io_bl_xa);

 /*
 * Use 5 bits less than the max cq entries, that should give us around
 * 32 entries per hash list if totally full and uniformly spread, but
 * don't keep too many buckets to not overconsume memory.
 */

 hash_bits = ilog2(p->cq_entries) - 5;
 hash_bits = clamp(hash_bits, 1, 8);
 if (io_alloc_hash_table(&ctx->cancel_table, hash_bits))
  goto err;
 if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
       0, GFP_KERNEL))
  goto err;

 ctx->flags = p->flags;
 ctx->hybrid_poll_time = LLONG_MAX;
 atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
 init_waitqueue_head(&ctx->sqo_sq_wait);
 INIT_LIST_HEAD(&ctx->sqd_list);
 INIT_LIST_HEAD(&ctx->cq_overflow_list);
 ret = io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX,
       sizeof(struct async_poll), 0);
 ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
       sizeof(struct io_async_msghdr),
       offsetof(struct io_async_msghdr, clear));
 ret |= io_alloc_cache_init(&ctx->rw_cache, IO_ALLOC_CACHE_MAX,
       sizeof(struct io_async_rw),
       offsetof(struct io_async_rw, clear));
 ret |= io_alloc_cache_init(&ctx->cmd_cache, IO_ALLOC_CACHE_MAX,
       sizeof(struct io_async_cmd),
       sizeof(struct io_async_cmd));
 ret |= io_futex_cache_init(ctx);
 ret |= io_rsrc_cache_init(ctx);
 if (ret)
  goto free_ref;
 init_completion(&ctx->ref_comp);
 xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
 mutex_init(&ctx->uring_lock);
 init_waitqueue_head(&ctx->cq_wait);
 init_waitqueue_head(&ctx->poll_wq);
 spin_lock_init(&ctx->completion_lock);
 raw_spin_lock_init(&ctx->timeout_lock);
 INIT_WQ_LIST(&ctx->iopoll_list);
 INIT_LIST_HEAD(&ctx->defer_list);
 INIT_LIST_HEAD(&ctx->timeout_list);
 INIT_LIST_HEAD(&ctx->ltimeout_list);
 init_llist_head(&ctx->work_llist);
 INIT_LIST_HEAD(&ctx->tctx_list);
 ctx->submit_state.free_list.next = NULL;
 INIT_HLIST_HEAD(&ctx->waitid_list);
 xa_init_flags(&ctx->zcrx_ctxs, XA_FLAGS_ALLOC);
#ifdef CONFIG_FUTEX
 INIT_HLIST_HEAD(&ctx->futex_list);
#endif
 INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
 INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
 INIT_HLIST_HEAD(&ctx->cancelable_uring_cmd);
 io_napi_init(ctx);
 mutex_init(&ctx->mmap_lock);

 return ctx;

free_ref:
 percpu_ref_exit(&ctx->refs);
err:
 io_free_alloc_caches(ctx);
 kvfree(ctx->cancel_table.hbs);
 xa_destroy(&ctx->io_bl_xa);
 kfree(ctx);
 return NULL;
}

static void io_clean_op(struct io_kiocb *req)
{
 if (unlikely(req->flags & REQ_F_BUFFER_SELECTED))
  io_kbuf_drop_legacy(req);

 if (req->flags & REQ_F_NEED_CLEANUP) {
  const struct io_cold_def *def = &io_cold_defs[req->opcode];

  if (def->cleanup)
   def->cleanup(req);
 }
 if (req->flags & REQ_F_INFLIGHT)
  atomic_dec(&req->tctx->inflight_tracked);
 if (req->flags & REQ_F_CREDS)
  put_cred(req->creds);
 if (req->flags & REQ_F_ASYNC_DATA) {
  kfree(req->async_data);
  req->async_data = NULL;
 }
 req->flags &= ~IO_REQ_CLEAN_FLAGS;
}

/*
 * Mark the request as inflight, so that file cancelation will find it.
 * Can be used if the file is an io_uring instance, or if the request itself
 * relies on ->mm being alive for the duration of the request.
 */

inline void io_req_track_inflight(struct io_kiocb *req)
{
 if (!(req->flags & REQ_F_INFLIGHT)) {
  req->flags |= REQ_F_INFLIGHT;
  atomic_inc(&req->tctx->inflight_tracked);
 }
}

static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
{
 if (WARN_ON_ONCE(!req->link))
  return NULL;

 req->flags &= ~REQ_F_ARM_LTIMEOUT;
 req->flags |= REQ_F_LINK_TIMEOUT;

 /* linked timeouts should have two refs once prep'ed */
 io_req_set_refcount(req);
 __io_req_set_refcount(req->link, 2);
 return req->link;
}

static void io_prep_async_work(struct io_kiocb *req)
{
 const struct io_issue_def *def = &io_issue_defs[req->opcode];
 struct io_ring_ctx *ctx = req->ctx;

 if (!(req->flags & REQ_F_CREDS)) {
  req->flags |= REQ_F_CREDS;
  req->creds = get_current_cred();
 }

 req->work.list.next = NULL;
 atomic_set(&req->work.flags, 0);
 if (req->flags & REQ_F_FORCE_ASYNC)
  atomic_or(IO_WQ_WORK_CONCURRENT, &req->work.flags);

 if (req->file && !(req->flags & REQ_F_FIXED_FILE))
  req->flags |= io_file_get_flags(req->file);

 if (req->file && (req->flags & REQ_F_ISREG)) {
  bool should_hash = def->hash_reg_file;

  /* don't serialize this request if the fs doesn't need it */
  if (should_hash && (req->file->f_flags & O_DIRECT) &&
      (req->file->f_op->fop_flags & FOP_DIO_PARALLEL_WRITE))
   should_hash = false;
  if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
   io_wq_hash_work(&req->work, file_inode(req->file));
 } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
  if (def->unbound_nonreg_file)
   atomic_or(IO_WQ_WORK_UNBOUND, &req->work.flags);
 }
}

static void io_prep_async_link(struct io_kiocb *req)
{
 struct io_kiocb *cur;

 if (req->flags & REQ_F_LINK_TIMEOUT) {
  struct io_ring_ctx *ctx = req->ctx;

  raw_spin_lock_irq(&ctx->timeout_lock);
  io_for_each_link(cur, req)
   io_prep_async_work(cur);
  raw_spin_unlock_irq(&ctx->timeout_lock);
 } else {
  io_for_each_link(cur, req)
   io_prep_async_work(cur);
 }
}

static void io_queue_iowq(struct io_kiocb *req)
{
 struct io_uring_task *tctx = req->tctx;

 BUG_ON(!tctx);

 if ((current->flags & PF_KTHREAD) || !tctx->io_wq) {
  io_req_task_queue_fail(req, -ECANCELED);
  return;
 }

 /* init ->work of the whole link before punting */
 io_prep_async_link(req);

 /*
 * Not expected to happen, but if we do have a bug where this _can_
 * happen, catch it here and ensure the request is marked as
 * canceled. That will make io-wq go through the usual work cancel
 * procedure rather than attempt to run this request (or create a new
 * worker for it).
 */

 if (WARN_ON_ONCE(!same_thread_group(tctx->task, current)))
  atomic_or(IO_WQ_WORK_CANCEL, &req->work.flags);

 trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
 io_wq_enqueue(tctx->io_wq, &req->work);
}

static void io_req_queue_iowq_tw(struct io_kiocb *req, io_tw_token_t tw)
{
 io_queue_iowq(req);
}

void io_req_queue_iowq(struct io_kiocb *req)
{
 req->io_task_work.func = io_req_queue_iowq_tw;
 io_req_task_work_add(req);
}

static unsigned io_linked_nr(struct io_kiocb *req)
{
 struct io_kiocb *tmp;
 unsigned nr = 0;

 io_for_each_link(tmp, req)
  nr++;
 return nr;
}

static __cold noinline void io_queue_deferred(struct io_ring_ctx *ctx)
{
 bool drain_seen = false, first = true;

 lockdep_assert_held(&ctx->uring_lock);
 __io_req_caches_free(ctx);

 while (!list_empty(&ctx->defer_list)) {
  struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
      struct io_defer_entry, list);

  drain_seen |= de->req->flags & REQ_F_IO_DRAIN;
  if ((drain_seen || first) && ctx->nr_req_allocated != ctx->nr_drained)
   return;

  list_del_init(&de->list);
  ctx->nr_drained -= io_linked_nr(de->req);
  io_req_task_queue(de->req);
  kfree(de);
  first = false;
 }
}

void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
 if (ctx->poll_activated)
  io_poll_wq_wake(ctx);
 if (ctx->off_timeout_used)
  io_flush_timeouts(ctx);
 if (ctx->has_evfd)
  io_eventfd_signal(ctx, true);
}

static inline void __io_cq_lock(struct io_ring_ctx *ctx)
{
 if (!ctx->lockless_cq)
  spin_lock(&ctx->completion_lock);
}

static inline void io_cq_lock(struct io_ring_ctx *ctx)
 __acquires(ctx->completion_lock)
{
 spin_lock(&ctx->completion_lock);
}

static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
{
 io_commit_cqring(ctx);
 if (!ctx->task_complete) {
  if (!ctx->lockless_cq)
   spin_unlock(&ctx->completion_lock);
  /* IOPOLL rings only need to wake up if it's also SQPOLL */
  if (!ctx->syscall_iopoll)
   io_cqring_wake(ctx);
 }
 io_commit_cqring_flush(ctx);
}

static void io_cq_unlock_post(struct io_ring_ctx *ctx)
 __releases(ctx->completion_lock)
{
 io_commit_cqring(ctx);
 spin_unlock(&ctx->completion_lock);
 io_cqring_wake(ctx);
 io_commit_cqring_flush(ctx);
}

static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool dying)
{
 size_t cqe_size = sizeof(struct io_uring_cqe);

 lockdep_assert_held(&ctx->uring_lock);

 /* don't abort if we're dying, entries must get freed */
 if (!dying && __io_cqring_events(ctx) == ctx->cq_entries)
  return;

 if (ctx->flags & IORING_SETUP_CQE32)
  cqe_size <<= 1;

 io_cq_lock(ctx);
 while (!list_empty(&ctx->cq_overflow_list)) {
  struct io_uring_cqe *cqe;
  struct io_overflow_cqe *ocqe;

  ocqe = list_first_entry(&ctx->cq_overflow_list,
     struct io_overflow_cqe, list);

  if (!dying) {
   if (!io_get_cqe_overflow(ctx, &cqe, true))
    break;
   memcpy(cqe, &ocqe->cqe, cqe_size);
  }
  list_del(&ocqe->list);
  kfree(ocqe);

  /*
 * For silly syzbot cases that deliberately overflow by huge
 * amounts, check if we need to resched and drop and
 * reacquire the locks if so. Nothing real would ever hit this.
 * Ideally we'd have a non-posting unlock for this, but hard
 * to care for a non-real case.
 */

  if (need_resched()) {
   ctx->cqe_sentinel = ctx->cqe_cached;
   io_cq_unlock_post(ctx);
   mutex_unlock(&ctx->uring_lock);
   cond_resched();
   mutex_lock(&ctx->uring_lock);
   io_cq_lock(ctx);
  }
 }

 if (list_empty(&ctx->cq_overflow_list)) {
  clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
  atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
 }
 io_cq_unlock_post(ctx);
}

static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
 if (ctx->rings)
  __io_cqring_overflow_flush(ctx, true);
}

static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
{
 mutex_lock(&ctx->uring_lock);
 __io_cqring_overflow_flush(ctx, false);
 mutex_unlock(&ctx->uring_lock);
}

/* must to be called somewhat shortly after putting a request */
static inline void io_put_task(struct io_kiocb *req)
{
 struct io_uring_task *tctx = req->tctx;

 if (likely(tctx->task == current)) {
  tctx->cached_refs++;
 } else {
  percpu_counter_sub(&tctx->inflight, 1);
  if (unlikely(atomic_read(&tctx->in_cancel)))
   wake_up(&tctx->wait);
  put_task_struct(tctx->task);
 }
}

void io_task_refs_refill(struct io_uring_task *tctx)
{
 unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;

 percpu_counter_add(&tctx->inflight, refill);
 refcount_add(refill, ¤t->usage);
 tctx->cached_refs += refill;
}

static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
{
 struct io_uring_task *tctx = task->io_uring;
 unsigned int refs = tctx->cached_refs;

 if (refs) {
  tctx->cached_refs = 0;
  percpu_counter_sub(&tctx->inflight, refs);
  put_task_struct_many(task, refs);
 }
}

static __cold bool io_cqring_add_overflow(struct io_ring_ctx *ctx,
       struct io_overflow_cqe *ocqe)
{
 lockdep_assert_held(&ctx->completion_lock);

 if (!ocqe) {
  struct io_rings *r = ctx->rings;

  /*
 * If we're in ring overflow flush mode, or in task cancel mode,
 * or cannot allocate an overflow entry, then we need to drop it
 * on the floor.
 */

  WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
  set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
  return false;
 }
 if (list_empty(&ctx->cq_overflow_list)) {
  set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
  atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);

 }
 list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
 return true;
}

static struct io_overflow_cqe *io_alloc_ocqe(struct io_ring_ctx *ctx,
          struct io_cqe *cqe,
          struct io_big_cqe *big_cqe, gfp_t gfp)
{
 struct io_overflow_cqe *ocqe;
 size_t ocq_size = sizeof(struct io_overflow_cqe);
 bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);

 if (is_cqe32)
  ocq_size += sizeof(struct io_uring_cqe);

 ocqe = kzalloc(ocq_size, gfp | __GFP_ACCOUNT);
 trace_io_uring_cqe_overflow(ctx, cqe->user_data, cqe->res, cqe->flags, ocqe);
 if (ocqe) {
  ocqe->cqe.user_data = cqe->user_data;
  ocqe->cqe.res = cqe->res;
  ocqe->cqe.flags = cqe->flags;
  if (is_cqe32 && big_cqe) {
   ocqe->cqe.big_cqe[0] = big_cqe->extra1;
   ocqe->cqe.big_cqe[1] = big_cqe->extra2;
  }
 }
 if (big_cqe)
  big_cqe->extra1 = big_cqe->extra2 = 0;
 return ocqe;
}

/*
 * writes to the cq entry need to come after reading head; the
 * control dependency is enough as we're using WRITE_ONCE to
 * fill the cq entry
 */

bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow)
{
 struct io_rings *rings = ctx->rings;
 unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
 unsigned int free, queued, len;

 /*
 * Posting into the CQ when there are pending overflowed CQEs may break
 * ordering guarantees, which will affect links, F_MORE users and more.
 * Force overflow the completion.
 */

 if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)))
  return false;

 /* userspace may cheat modifying the tail, be safe and do min */
 queued = min(__io_cqring_events(ctx), ctx->cq_entries);
 free = ctx->cq_entries - queued;
 /* we need a contiguous range, limit based on the current array offset */
 len = min(free, ctx->cq_entries - off);
 if (!len)
  return false;

 if (ctx->flags & IORING_SETUP_CQE32) {
  off <<= 1;
  len <<= 1;
 }

 ctx->cqe_cached = &rings->cqes[off];
 ctx->cqe_sentinel = ctx->cqe_cached + len;
 return true;
}

static bool io_fill_cqe_aux32(struct io_ring_ctx *ctx,
         struct io_uring_cqe src_cqe[2])
{
 struct io_uring_cqe *cqe;

 if (WARN_ON_ONCE(!(ctx->flags & IORING_SETUP_CQE32)))
  return false;
 if (unlikely(!io_get_cqe(ctx, &cqe)))
  return false;

 memcpy(cqe, src_cqe, 2 * sizeof(*cqe));
 trace_io_uring_complete(ctx, NULL, cqe);
 return true;
}

static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
         u32 cflags)
{
 struct io_uring_cqe *cqe;

 if (likely(io_get_cqe(ctx, &cqe))) {
  WRITE_ONCE(cqe->user_data, user_data);
  WRITE_ONCE(cqe->res, res);
  WRITE_ONCE(cqe->flags, cflags);

  if (ctx->flags & IORING_SETUP_CQE32) {
   WRITE_ONCE(cqe->big_cqe[0], 0);
   WRITE_ONCE(cqe->big_cqe[1], 0);
  }

  trace_io_uring_complete(ctx, NULL, cqe);
  return true;
 }
 return false;
}

static inline struct io_cqe io_init_cqe(u64 user_data, s32 res, u32 cflags)
{
 return (struct io_cqe) { .user_data = user_data, .res = res, .flags = cflags };
}

static __cold void io_cqe_overflow(struct io_ring_ctx *ctx, struct io_cqe *cqe,
              struct io_big_cqe *big_cqe)
{
 struct io_overflow_cqe *ocqe;

 ocqe = io_alloc_ocqe(ctx, cqe, big_cqe, GFP_KERNEL);
 spin_lock(&ctx->completion_lock);
 io_cqring_add_overflow(ctx, ocqe);
 spin_unlock(&ctx->completion_lock);
}

static __cold bool io_cqe_overflow_locked(struct io_ring_ctx *ctx,
       struct io_cqe *cqe,
       struct io_big_cqe *big_cqe)
{
 struct io_overflow_cqe *ocqe;

 ocqe = io_alloc_ocqe(ctx, cqe, big_cqe, GFP_ATOMIC);
 return io_cqring_add_overflow(ctx, ocqe);
}

bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
{
 bool filled;

 io_cq_lock(ctx);
 filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
 if (unlikely(!filled)) {
  struct io_cqe cqe = io_init_cqe(user_data, res, cflags);

  filled = io_cqe_overflow_locked(ctx, &cqe, NULL);
 }
 io_cq_unlock_post(ctx);
 return filled;
}

/*
 * Must be called from inline task_work so we now a flush will happen later,
 * and obviously with ctx->uring_lock held (tw always has that).
 */

void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
{
 lockdep_assert_held(&ctx->uring_lock);
 lockdep_assert(ctx->lockless_cq);

 if (!io_fill_cqe_aux(ctx, user_data, res, cflags)) {
  struct io_cqe cqe = io_init_cqe(user_data, res, cflags);

  io_cqe_overflow(ctx, &cqe, NULL);
 }
 ctx->submit_state.cq_flush = true;
}

/*
 * A helper for multishot requests posting additional CQEs.
 * Should only be used from a task_work including IO_URING_F_MULTISHOT.
 */

bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags)
{
 struct io_ring_ctx *ctx = req->ctx;
 bool posted;

 /*
 * If multishot has already posted deferred completions, ensure that
 * those are flushed first before posting this one. If not, CQEs
 * could get reordered.
 */

 if (!wq_list_empty(&ctx->submit_state.compl_reqs))
  __io_submit_flush_completions(ctx);

 lockdep_assert(!io_wq_current_is_worker());
 lockdep_assert_held(&ctx->uring_lock);

 if (!ctx->lockless_cq) {
  spin_lock(&ctx->completion_lock);
  posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags);
  spin_unlock(&ctx->completion_lock);
 } else {
  posted = io_fill_cqe_aux(ctx, req->cqe.user_data, res, cflags);
 }

 ctx->submit_state.cq_flush = true;
 return posted;
}

/*
 * A helper for multishot requests posting additional CQEs.
 * Should only be used from a task_work including IO_URING_F_MULTISHOT.
 */

bool io_req_post_cqe32(struct io_kiocb *req, struct io_uring_cqe cqe[2])
{
 struct io_ring_ctx *ctx = req->ctx;
 bool posted;

 lockdep_assert(!io_wq_current_is_worker());
 lockdep_assert_held(&ctx->uring_lock);

 cqe[0].user_data = req->cqe.user_data;
 if (!ctx->lockless_cq) {
  spin_lock(&ctx->completion_lock);
  posted = io_fill_cqe_aux32(ctx, cqe);
  spin_unlock(&ctx->completion_lock);
 } else {
  posted = io_fill_cqe_aux32(ctx, cqe);
 }

 ctx->submit_state.cq_flush = true;
 return posted;
}

static void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
 struct io_ring_ctx *ctx = req->ctx;
 bool completed = true;

 /*
 * All execution paths but io-wq use the deferred completions by
 * passing IO_URING_F_COMPLETE_DEFER and thus should not end up here.
 */

 if (WARN_ON_ONCE(!(issue_flags & IO_URING_F_IOWQ)))
  return;

 /*
 * Handle special CQ sync cases via task_work. DEFER_TASKRUN requires
 * the submitter task context, IOPOLL protects with uring_lock.
 */

 if (ctx->lockless_cq || (req->flags & REQ_F_REISSUE)) {
defer_complete:
  req->io_task_work.func = io_req_task_complete;
  io_req_task_work_add(req);
  return;
 }

 io_cq_lock(ctx);
 if (!(req->flags & REQ_F_CQE_SKIP))
  completed = io_fill_cqe_req(ctx, req);
 io_cq_unlock_post(ctx);

 if (!completed)
  goto defer_complete;

 /*
 * We don't free the request here because we know it's called from
 * io-wq only, which holds a reference, so it cannot be the last put.
 */

 req_ref_put(req);
}

void io_req_defer_failed(struct io_kiocb *req, s32 res)
 __must_hold(&ctx->uring_lock)
{
 const struct io_cold_def *def = &io_cold_defs[req->opcode];

 lockdep_assert_held(&req->ctx->uring_lock);

 req_set_fail(req);
 io_req_set_res(req, res, io_put_kbuf(req, res, IO_URING_F_UNLOCKED));
 if (def->fail)
  def->fail(req);
 io_req_complete_defer(req);
}

/*
 * A request might get retired back into the request caches even before opcode
 * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
 * Because of that, io_alloc_req() should be called only under ->uring_lock
 * and with extra caution to not get a request that is still worked on.
 */

__cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
 __must_hold(&ctx->uring_lock)
{
 gfp_t gfp = GFP_KERNEL | __GFP_NOWARN | __GFP_ZERO;
 void *reqs[IO_REQ_ALLOC_BATCH];
 int ret;

 ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);

 /*
 * Bulk alloc is all-or-nothing. If we fail to get a batch,
 * retry single alloc to be on the safe side.
 */

 if (unlikely(ret <= 0)) {
  reqs[0] = kmem_cache_alloc(req_cachep, gfp);
  if (!reqs[0])
   return false;
  ret = 1;
 }

 percpu_ref_get_many(&ctx->refs, ret);
 ctx->nr_req_allocated += ret;

 while (ret--) {
  struct io_kiocb *req = reqs[ret];

  io_req_add_to_cache(req, ctx);
 }
 return true;
}

__cold void io_free_req(struct io_kiocb *req)
{
 /* refs were already put, restore them for io_req_task_complete() */
 req->flags &= ~REQ_F_REFCOUNT;
 /* we only want to free it, don't post CQEs */
 req->flags |= REQ_F_CQE_SKIP;
 req->io_task_work.func = io_req_task_complete;
 io_req_task_work_add(req);
}

static void __io_req_find_next_prep(struct io_kiocb *req)
{
 struct io_ring_ctx *ctx = req->ctx;

 spin_lock(&ctx->completion_lock);
 io_disarm_next(req);
 spin_unlock(&ctx->completion_lock);
}

static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
{
 struct io_kiocb *nxt;

 /*
 * If LINK is set, we have dependent requests in this chain. If we
 * didn't fail this request, queue the first one up, moving any other
 * dependencies to the next request. In case of failure, fail the rest
 * of the chain.
 */

 if (unlikely(req->flags & IO_DISARM_MASK))
  __io_req_find_next_prep(req);
 nxt = req->link;
 req->link = NULL;
 return nxt;
}

static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{
 if (!ctx)
  return;
 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
  atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

 io_submit_flush_completions(ctx);
 mutex_unlock(&ctx->uring_lock);
 percpu_ref_put(&ctx->refs);
}

/*
 * Run queued task_work, returning the number of entries processed in *count.
 * If more entries than max_entries are available, stop processing once this
 * is reached and return the rest of the list.
 */

struct llist_node *io_handle_tw_list(struct llist_node *node,
         unsigned int *count,
         unsigned int max_entries)
{
 struct io_ring_ctx *ctx = NULL;
 struct io_tw_state ts = { };

 do {
  struct llist_node *next = node->next;
  struct io_kiocb *req = container_of(node, struct io_kiocb,
          io_task_work.node);

  if (req->ctx != ctx) {
   ctx_flush_and_put(ctx, ts);
   ctx = req->ctx;
   mutex_lock(&ctx->uring_lock);
   percpu_ref_get(&ctx->refs);
  }
  INDIRECT_CALL_2(req->io_task_work.func,
    io_poll_task_func, io_req_rw_complete,
    req, ts);
  node = next;
  (*count)++;
  if (unlikely(need_resched())) {
   ctx_flush_and_put(ctx, ts);
   ctx = NULL;
   cond_resched();
  }
 } while (node && *count < max_entries);

 ctx_flush_and_put(ctx, ts);
 return node;
}

static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
 struct io_ring_ctx *last_ctx = NULL;
 struct io_kiocb *req;

 while (node) {
  req = container_of(node, struct io_kiocb, io_task_work.node);
  node = node->next;
  if (last_ctx != req->ctx) {
   if (last_ctx) {
    if (sync)
     flush_delayed_work(&last_ctx->fallback_work);
    percpu_ref_put(&last_ctx->refs);
   }
   last_ctx = req->ctx;
   percpu_ref_get(&last_ctx->refs);
  }
  if (llist_add(&req->io_task_work.node, &last_ctx->fallback_llist))
   schedule_delayed_work(&last_ctx->fallback_work, 1);
 }

 if (last_ctx) {
  if (sync)
   flush_delayed_work(&last_ctx->fallback_work);
  percpu_ref_put(&last_ctx->refs);
 }
}

static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
{
 struct llist_node *node = llist_del_all(&tctx->task_list);

 __io_fallback_tw(node, sync);
}

struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
          unsigned int max_entries,
          unsigned int *count)
{
 struct llist_node *node;

 if (unlikely(current->flags & PF_EXITING)) {
  io_fallback_tw(tctx, true);
  return NULL;
 }

 node = llist_del_all(&tctx->task_list);
 if (node) {
  node = llist_reverse_order(node);
  node = io_handle_tw_list(node, count, max_entries);
 }

 /* relaxed read is enough as only the task itself sets ->in_cancel */
 if (unlikely(atomic_read(&tctx->in_cancel)))
  io_uring_drop_tctx_refs(current);

 trace_io_uring_task_work_run(tctx, *count);
 return node;
}

void tctx_task_work(struct callback_head *cb)
{
 struct io_uring_task *tctx;
 struct llist_node *ret;
 unsigned int count = 0;

 tctx = container_of(cb, struct io_uring_task, task_work);
 ret = tctx_task_work_run(tctx, UINT_MAX, &count);
 /* can't happen */
 WARN_ON_ONCE(ret);
}

static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
 struct io_ring_ctx *ctx = req->ctx;
 unsigned nr_wait, nr_tw, nr_tw_prev;
 struct llist_node *head;

 /* See comment above IO_CQ_WAKE_INIT */
 BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);

 /*
 * We don't know how many reuqests is there in the link and whether
 * they can even be queued lazily, fall back to non-lazy.
 */

 if (req->flags & IO_REQ_LINK_FLAGS)
  flags &= ~IOU_F_TWQ_LAZY_WAKE;

 guard(rcu)();

 head = READ_ONCE(ctx->work_llist.first);
 do {
  nr_tw_prev = 0;
  if (head) {
   struct io_kiocb *first_req = container_of(head,
       struct io_kiocb,
       io_task_work.node);
   /*
 * Might be executed at any moment, rely on
 * SLAB_TYPESAFE_BY_RCU to keep it alive.
 */

   nr_tw_prev = READ_ONCE(first_req->nr_tw);
  }

  /*
 * Theoretically, it can overflow, but that's fine as one of
 * previous adds should've tried to wake the task.
 */

  nr_tw = nr_tw_prev + 1;
  if (!(flags & IOU_F_TWQ_LAZY_WAKE))
   nr_tw = IO_CQ_WAKE_FORCE;

  req->nr_tw = nr_tw;
  req->io_task_work.node.next = head;
 } while (!try_cmpxchg(&ctx->work_llist.first, &head,
         &req->io_task_work.node));

 /*
 * cmpxchg implies a full barrier, which pairs with the barrier
 * in set_current_state() on the io_cqring_wait() side. It's used
 * to ensure that either we see updated ->cq_wait_nr, or waiters
 * going to sleep will observe the work added to the list, which
 * is similar to the wait/wawke task state sync.
 */


 if (!head) {
  if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
   atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
  if (ctx->has_evfd)
   io_eventfd_signal(ctx, false);
 }

 nr_wait = atomic_read(&ctx->cq_wait_nr);
 /* not enough or no one is waiting */
 if (nr_tw < nr_wait)
  return;
 /* the previous add has already woken it up */
 if (nr_tw_prev >= nr_wait)
  return;
 wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}

static void io_req_normal_work_add(struct io_kiocb *req)
{
 struct io_uring_task *tctx = req->tctx;
 struct io_ring_ctx *ctx = req->ctx;

 /* task_work already pending, we're done */
 if (!llist_add(&req->io_task_work.node, &tctx->task_list))
  return;

 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
  atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);

 /* SQPOLL doesn't need the task_work added, it'll run it itself */
 if (ctx->flags & IORING_SETUP_SQPOLL) {
  __set_notify_signal(tctx->task);
  return;
 }

 if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
  return;

 io_fallback_tw(tctx, false);
}

void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
{
 if (req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)
  io_req_local_work_add(req, flags);
 else
  io_req_normal_work_add(req);
}

void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
{
 if (WARN_ON_ONCE(!(req->ctx->flags & IORING_SETUP_DEFER_TASKRUN)))
  return;
 __io_req_task_work_add(req, flags);
}

static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
{
 struct llist_node *node = llist_del_all(&ctx->work_llist);

 __io_fallback_tw(node, false);
 node = llist_del_all(&ctx->retry_llist);
 __io_fallback_tw(node, false);
}

static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events,
           int min_events)
{
 if (!io_local_work_pending(ctx))
  return false;
 if (events < min_events)
  return true;
 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
  atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 return false;
}

static int __io_run_local_work_loop(struct llist_node **node,
        io_tw_token_t tw,
        int events)
{
 int ret = 0;

 while (*node) {
  struct llist_node *next = (*node)->next;
  struct io_kiocb *req = container_of(*node, struct io_kiocb,
          io_task_work.node);
  INDIRECT_CALL_2(req->io_task_work.func,
    io_poll_task_func, io_req_rw_complete,
    req, tw);
  *node = next;
  if (++ret >= events)
   break;
 }

 return ret;
}

static int __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw,
          int min_events, int max_events)
{
 struct llist_node *node;
 unsigned int loops = 0;
 int ret = 0;

 if (WARN_ON_ONCE(ctx->submitter_task != current))
  return -EEXIST;
 if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
  atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
 min_events -= ret;
 ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events);
 if (ctx->retry_llist.first)
  goto retry_done;

 /*
 * llists are in reverse order, flip it back the right way before
 * running the pending items.
 */

 node = llist_reverse_order(llist_del_all(&ctx->work_llist));
 ret += __io_run_local_work_loop(&node, tw, max_events - ret);
 ctx->retry_llist.first = node;
 loops++;

 if (io_run_local_work_continue(ctx, ret, min_events))
  goto again;
retry_done:
 io_submit_flush_completions(ctx);
 if (io_run_local_work_continue(ctx, ret, min_events))
  goto again;

 trace_io_uring_local_work_run(ctx, ret, loops);
 return ret;
}

static inline int io_run_local_work_locked(struct io_ring_ctx *ctx,
        int min_events)
{
 struct io_tw_state ts = {};

 if (!io_local_work_pending(ctx))
  return 0;
 return __io_run_local_work(ctx, ts, min_events,
     max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
}

static int io_run_local_work(struct io_ring_ctx *ctx, int min_events,
        int max_events)
{
 struct io_tw_state ts = {};
 int ret;

 mutex_lock(&ctx->uring_lock);
 ret = __io_run_local_work(ctx, ts, min_events, max_events);
 mutex_unlock(&ctx->uring_lock);
 return ret;
}

static void io_req_task_cancel(struct io_kiocb *req, io_tw_token_t tw)
{
 io_tw_lock(req->ctx, tw);
 io_req_defer_failed(req, req->cqe.res);
}

void io_req_task_submit(struct io_kiocb *req, io_tw_token_t tw)
{
 struct io_ring_ctx *ctx = req->ctx;

 io_tw_lock(ctx, tw);
 if (unlikely(io_should_terminate_tw(ctx)))
  io_req_defer_failed(req, -EFAULT);
 else if (req->flags & REQ_F_FORCE_ASYNC)
  io_queue_iowq(req);
 else
  io_queue_sqe(req, 0);
}

void io_req_task_queue_fail(struct io_kiocb *req, int ret)
{
 io_req_set_res(req, ret, 0);
 req->io_task_work.func = io_req_task_cancel;
 io_req_task_work_add(req);
}

void io_req_task_queue(struct io_kiocb *req)
{
 req->io_task_work.func = io_req_task_submit;
 io_req_task_work_add(req);
}

void io_queue_next(struct io_kiocb *req)
{
 struct io_kiocb *nxt = io_req_find_next(req);

 if (nxt)
  io_req_task_queue(nxt);
}

static inline void io_req_put_rsrc_nodes(struct io_kiocb *req)
{
 if (req->file_node) {
  io_put_rsrc_node(req->ctx, req->file_node);
  req->file_node = NULL;
 }
 if (req->flags & REQ_F_BUF_NODE)
  io_put_rsrc_node(req->ctx, req->buf_node);
}

static void io_free_batch_list(struct io_ring_ctx *ctx,
          struct io_wq_work_node *node)
 __must_hold(&ctx->uring_lock)
{
 do {
  struct io_kiocb *req = container_of(node, struct io_kiocb,
          comp_list);

  if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
   if (req->flags & REQ_F_REISSUE) {
    node = req->comp_list.next;
    req->flags &= ~REQ_F_REISSUE;
    io_queue_iowq(req);
    continue;
   }
   if (req->flags & REQ_F_REFCOUNT) {
    node = req->comp_list.next;
    if (!req_ref_put_and_test(req))
     continue;
   }
   if ((req->flags & REQ_F_POLLED) && req->apoll) {
    struct async_poll *apoll = req->apoll;

    if (apoll->double_poll)
     kfree(apoll->double_poll);
    io_cache_free(&ctx->apoll_cache, apoll);
    req->flags &= ~REQ_F_POLLED;
   }
   if (req->flags & IO_REQ_LINK_FLAGS)
    io_queue_next(req);
   if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
    io_clean_op(req);
  }
  io_put_file(req);
  io_req_put_rsrc_nodes(req);
  io_put_task(req);

  node = req->comp_list.next;
  io_req_add_to_cache(req, ctx);
 } while (node);
}

void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 __must_hold(&ctx->uring_lock)
{
 struct io_submit_state *state = &ctx->submit_state;
 struct io_wq_work_node *node;

 __io_cq_lock(ctx);
 __wq_list_for_each(node, &state->compl_reqs) {
  struct io_kiocb *req = container_of(node, struct io_kiocb,
         comp_list);

  /*
 * Requests marked with REQUEUE should not post a CQE, they
 * will go through the io-wq retry machinery and post one
 * later.
 */

  if (!(req->flags & (REQ_F_CQE_SKIP | REQ_F_REISSUE)) &&
      unlikely(!io_fill_cqe_req(ctx, req))) {
   if (ctx->lockless_cq)
    io_cqe_overflow(ctx, &req->cqe, &req->big_cqe);
   else
    io_cqe_overflow_locked(ctx, &req->cqe, &req->big_cqe);
  }
 }
 __io_cq_unlock_post(ctx);

 if (!wq_list_empty(&state->compl_reqs)) {
  io_free_batch_list(ctx, state->compl_reqs.first);
  INIT_WQ_LIST(&state->compl_reqs);
 }

 if (unlikely(ctx->drain_active))
  io_queue_deferred(ctx);

 ctx->submit_state.cq_flush = false;
}

static unsigned io_cqring_events(struct io_ring_ctx *ctx)
{
 /* See comment at the top of this file */
 smp_rmb();
 return __io_cqring_events(ctx);
}

/*
 * We can't just wait for polled events to come to us, we have to actively
 * find and complete them.
 */

static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
{
 if (!(ctx->flags & IORING_SETUP_IOPOLL))
  return;

 mutex_lock(&ctx->uring_lock);
 while (!wq_list_empty(&ctx->iopoll_list)) {
  /* let it sleep and repeat later if can't complete a request */
  if (io_do_iopoll(ctx, true) == 0)
   break;
  /*
 * Ensure we allow local-to-the-cpu processing to take place,
 * in this case we need to ensure that we reap all events.
 * Also let task_work, etc. to progress by releasing the mutex
 */

  if (need_resched()) {
   mutex_unlock(&ctx->uring_lock);
   cond_resched();
   mutex_lock(&ctx->uring_lock);
  }
 }
 mutex_unlock(&ctx->uring_lock);

 if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
  io_move_task_work_from_local(ctx);
}

static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned int min_events)
{
 unsigned int nr_events = 0;
 unsigned long check_cq;

 min_events = min(min_events, ctx->cq_entries);

 lockdep_assert_held(&ctx->uring_lock);

 if (!io_allowed_run_tw(ctx))
  return -EEXIST;

 check_cq = READ_ONCE(ctx->check_cq);
 if (unlikely(check_cq)) {
  if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
   __io_cqring_overflow_flush(ctx, false);
  /*
 * Similarly do not spin if we have not informed the user of any
 * dropped CQE.
 */

  if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
   return -EBADR;
 }
 /*
 * Don't enter poll loop if we already have events pending.
 * If we do, we can potentially be spinning for commands that
 * already triggered a CQE (eg in error).
 */

 if (io_cqring_events(ctx))
  return 0;

 do {
  int ret = 0;

  /*
 * If a submit got punted to a workqueue, we can have the
 * application entering polling for a command before it gets
 * issued. That app will hold the uring_lock for the duration
 * of the poll right here, so we need to take a breather every
 * now and then to ensure that the issue has a chance to add
 * the poll to the issued list. Otherwise we can spin here
 * forever, while the workqueue is stuck trying to acquire the
 * very same mutex.
 */

  if (wq_list_empty(&ctx->iopoll_list) ||
      io_task_work_pending(ctx)) {
   u32 tail = ctx->cached_cq_tail;

   (void) io_run_local_work_locked(ctx, min_events);

   if (task_work_pending(current) ||
       wq_list_empty(&ctx->iopoll_list)) {
    mutex_unlock(&ctx->uring_lock);
    io_run_task_work();
    mutex_lock(&ctx->uring_lock);
   }
   /* some requests don't go through iopoll_list */
   if (tail != ctx->cached_cq_tail ||
       wq_list_empty(&ctx->iopoll_list))
    break;
  }
  ret = io_do_iopoll(ctx, !min_events);
  if (unlikely(ret < 0))
   return ret;

  if (task_sigpending(current))
   return -EINTR;
  if (need_resched())
   break;

  nr_events += ret;
 } while (nr_events < min_events);

 return 0;
}

void io_req_task_complete(struct io_kiocb *req, io_tw_token_t tw)
{
 io_req_complete_defer(req);
}

/*
 * After the iocb has been issued, it's safe to be found on the poll list.
 * Adding the kiocb to the list AFTER submission ensures that we don't
 * find it from a io_do_iopoll() thread before the issuer is done
 * accessing the kiocb cookie.
 */

static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
{
 struct io_ring_ctx *ctx = req->ctx;
 const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;

 /* workqueue context doesn't hold uring_lock, grab it now */
 if (unlikely(needs_lock))
  mutex_lock(&ctx->uring_lock);

 /*
 * Track whether we have multiple files in our lists. This will impact
 * how we do polling eventually, not spinning if we're on potentially
 * different devices.
 */

 if (wq_list_empty(&ctx->iopoll_list)) {
  ctx->poll_multi_queue = false;
 } else if (!ctx->poll_multi_queue) {
  struct io_kiocb *list_req;

  list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
     comp_list);
  if (list_req->file != req->file)
   ctx->poll_multi_queue = true;
 }

 /*
 * For fast devices, IO may have already completed. If it has, add
 * it to the front so we find it first.
 */

 if (READ_ONCE(req->iopoll_completed))
  wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
 else
  wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);

 if (unlikely(needs_lock)) {
  /*
 * If IORING_SETUP_SQPOLL is enabled, sqes are either handle
 * in sq thread task context or in io worker task context. If
 * current task context is sq thread, we don't need to check
 * whether should wake up sq thread.
 */

  if ((ctx->flags & IORING_SETUP_SQPOLL) &&
      wq_has_sleeper(&ctx->sq_data->wait))
   wake_up(&ctx->sq_data->wait);

  mutex_unlock(&ctx->uring_lock);
 }
}

io_req_flags_t io_file_get_flags(struct file *file)
{
 io_req_flags_t res = 0;

 BUILD_BUG_ON(REQ_F_ISREG_BIT != REQ_F_SUPPORT_NOWAIT_BIT + 1);

 if (S_ISREG(file_inode(file)->i_mode))
  res |= REQ_F_ISREG;
 if ((file->f_flags & O_NONBLOCK) || (file->f_mode & FMODE_NOWAIT))
  res |= REQ_F_SUPPORT_NOWAIT;
 return res;
}

static __cold void io_drain_req(struct io_kiocb *req)
 __must_hold(&ctx->uring_lock)
{
 struct io_ring_ctx *ctx = req->ctx;
 bool drain = req->flags & IOSQE_IO_DRAIN;
 struct io_defer_entry *de;

 de = kmalloc(sizeof(*de), GFP_KERNEL_ACCOUNT);
 if (!de) {
  io_req_defer_failed(req, -ENOMEM);
  return;
 }

 io_prep_async_link(req);
 trace_io_uring_defer(req);
 de->req = req;

 ctx->nr_drained += io_linked_nr(req);
 list_add_tail(&de->list, &ctx->defer_list);
 io_queue_deferred(ctx);
 if (!drain && list_empty(&ctx->defer_list))
  ctx->drain_active = false;
}

static bool io_assign_file(struct io_kiocb *req, const struct io_issue_def *def,
      unsigned int issue_flags)
{
 if (req->file || !def->needs_file)
  return true;

 if (req->flags & REQ_F_FIXED_FILE)
  req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
 else
  req->file = io_file_get_normal(req, req->cqe.fd);

 return !!req->file;
}

#define REQ_ISSUE_SLOW_FLAGS (REQ_F_CREDS | REQ_F_ARM_LTIMEOUT)

static inline int __io_issue_sqe(struct io_kiocb *req,
     unsigned int issue_flags,
     const struct io_issue_def *def)
{
 const struct cred *creds = NULL;
 struct io_kiocb *link = NULL;
 int ret;

 if (unlikely(req->flags & REQ_ISSUE_SLOW_FLAGS)) {
  if ((req->flags & REQ_F_CREDS) && req->creds != current_cred())
   creds = override_creds(req->creds);
  if (req->flags & REQ_F_ARM_LTIMEOUT)
   link = __io_prep_linked_timeout(req);
 }

 if (!def->audit_skip)
  audit_uring_entry(req->opcode);

 ret = def->issue(req, issue_flags);

 if (!def->audit_skip)
  audit_uring_exit(!ret, ret);

 if (unlikely(creds || link)) {
  if (creds)
   revert_creds(creds);
  if (link)
   io_queue_linked_timeout(link);
 }

 return ret;
}

static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
{
 const struct io_issue_def *def = &io_issue_defs[req->opcode];
 int ret;

 if (unlikely(!io_assign_file(req, def, issue_flags)))
  return -EBADF;

 ret = __io_issue_sqe(req, issue_flags, def);

 if (ret == IOU_COMPLETE) {
  if (issue_flags & IO_URING_F_COMPLETE_DEFER)
   io_req_complete_defer(req);
  else
   io_req_complete_post(req, issue_flags);

  return 0;
 }

 if (ret == IOU_ISSUE_SKIP_COMPLETE) {
  ret = 0;

  /* If the op doesn't have a file, we're not polling for it */
  if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
   io_iopoll_req_issued(req, issue_flags);
 }
 return ret;
}

int io_poll_issue(struct io_kiocb *req, io_tw_token_t tw)
{
 const unsigned int issue_flags = IO_URING_F_NONBLOCK |
      IO_URING_F_MULTISHOT |
      IO_URING_F_COMPLETE_DEFER;
 int ret;

 io_tw_lock(req->ctx, tw);

 WARN_ON_ONCE(!req->file);
 if (WARN_ON_ONCE(req->ctx->flags & IORING_SETUP_IOPOLL))
  return -EFAULT;

 ret = __io_issue_sqe(req, issue_flags, &io_issue_defs[req->opcode]);

 WARN_ON_ONCE(ret == IOU_ISSUE_SKIP_COMPLETE);
 return ret;
}

struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
{
 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
 struct io_kiocb *nxt = NULL;

 if (req_ref_put_and_test_atomic(req)) {
  if (req->flags & IO_REQ_LINK_FLAGS)
   nxt = io_req_find_next(req);
  io_free_req(req);
 }
 return nxt ? &nxt->work : NULL;
}

void io_wq_submit_work(struct io_wq_work *work)
{
 struct io_kiocb *req = container_of(work, struct io_kiocb, work);
 const struct io_issue_def *def = &io_issue_defs[req->opcode];
 unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
 bool needs_poll = false;
 int ret = 0, err = -ECANCELED;

 /* one will be dropped by io_wq_free_work() after returning to io-wq */
 if (!(req->flags & REQ_F_REFCOUNT))
  __io_req_set_refcount(req, 2);
 else
  req_ref_get(req);

 /* either cancelled or io-wq is dying, so don't touch tctx->iowq */
 if (atomic_read(&work->flags) & IO_WQ_WORK_CANCEL) {
fail:
  io_req_task_queue_fail(req, err);
  return;
 }
 if (!io_assign_file(req, def, issue_flags)) {
  err = -EBADF;
  atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
  goto fail;
 }

 /*
 * If DEFER_TASKRUN is set, it's only allowed to post CQEs from the
 * submitter task context. Final request completions are handed to the
 * right context, however this is not the case of auxiliary CQEs,
 * which is the main mean of operation for multishot requests.
 * Don't allow any multishot execution from io-wq. It's more restrictive
 * than necessary and also cleaner.
 */

 if (req->flags & (REQ_F_MULTISHOT|REQ_F_APOLL_MULTISHOT)) {
  err = -EBADFD;
  if (!io_file_can_poll(req))
   goto fail;
  if (req->file->f_flags & O_NONBLOCK ||
      req->file->f_mode & FMODE_NOWAIT) {
   err = -ECANCELED;
   if (io_arm_poll_handler(req, issue_flags) != IO_APOLL_OK)
    goto fail;
   return;
  } else {
   req->flags &= ~(REQ_F_APOLL_MULTISHOT|REQ_F_MULTISHOT);
  }
 }

 if (req->flags & REQ_F_FORCE_ASYNC) {
  bool opcode_poll = def->pollin || def->pollout;

  if (opcode_poll && io_file_can_poll(req)) {
   needs_poll = true;
   issue_flags |= IO_URING_F_NONBLOCK;
  }
 }

 do {
  ret = io_issue_sqe(req, issue_flags);
  if (ret != -EAGAIN)
   break;

  /*
 * If REQ_F_NOWAIT is set, then don't wait or retry with
 * poll. -EAGAIN is final for that case.
 */

  if (req->flags & REQ_F_NOWAIT)
   break;

  /*
 * We can get EAGAIN for iopolled IO even though we're
 * forcing a sync submission from here, since we can't
 * wait for request slots on the block side.
 */

  if (!needs_poll) {
   if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
    break;
   if (io_wq_worker_stopped())
    break;
   cond_resched();
   continue;
  }

  if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
   return;
  /* aborted or ready, in either case retry blocking */
  needs_poll = false;
  issue_flags &= ~IO_URING_F_NONBLOCK;
 } while (1);

 /* avoid locking problems by failing it from a clean context */
 if (ret)
  io_req_task_queue_fail(req, ret);
}

inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
          unsigned int issue_flags)
{
 struct io_ring_ctx *ctx = req->ctx;
 struct io_rsrc_node *node;
 struct file *file = NULL;

 io_ring_submit_lock(ctx, issue_flags);
 node = io_rsrc_node_lookup(&ctx->file_table.data, fd);
 if (node) {
  node->refs++;
  req->file_node = node;
  req->flags |= io_slot_flags(node);
  file = io_slot_file(node);
 }
 io_ring_submit_unlock(ctx, issue_flags);
 return file;
}

struct file *io_file_get_normal(struct io_kiocb *req, int fd)
{
 struct file *file = fget(fd);

 trace_io_uring_file_get(req, fd);

 /* we don't allow fixed io_uring files */
 if (file && io_is_uring_fops(file))
  io_req_track_inflight(req);
 return file;
}

static int io_req_sqe_copy(struct io_kiocb *req, unsigned int issue_flags)
{
 const struct io_cold_def *def = &io_cold_defs[req->opcode];

 if (req->flags & REQ_F_SQE_COPIED)
  return 0;
 req->flags |= REQ_F_SQE_COPIED;
 if (!def->sqe_copy)
  return 0;
 if (WARN_ON_ONCE(!(issue_flags & IO_URING_F_INLINE)))
  return -EFAULT;
 def->sqe_copy(req);
 return 0;
}

static void io_queue_async(struct io_kiocb *req, unsigned int issue_flags, int ret)
 __must_hold(&req->ctx->uring_lock)
{
 if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
fail:
  io_req_defer_failed(req, ret);
  return;
 }

 ret = io_req_sqe_copy(req, issue_flags);
 if (unlikely(ret))
  goto fail;

 switch (io_arm_poll_handler(req, 0)) {
 case IO_APOLL_READY:
  io_kbuf_recycle(req, 0);
  io_req_task_queue(req);
  break;
 case IO_APOLL_ABORTED:
  io_kbuf_recycle(req, 0);
  io_queue_iowq(req);
  break;
 case IO_APOLL_OK:
  break;
 }
}

static inline void io_queue_sqe(struct io_kiocb *req, unsigned int extra_flags)
 __must_hold(&req->ctx->uring_lock)
{
 unsigned int issue_flags = IO_URING_F_NONBLOCK |
       IO_URING_F_COMPLETE_DEFER | extra_flags;
 int ret;

 ret = io_issue_sqe(req, issue_flags);

 /*
 * We async punt it if the file wasn't marked NOWAIT, or if the file
 * doesn't support non-blocking read/write attempts
 */

 if (unlikely(ret))
  io_queue_async(req, issue_flags, ret);
}

static void io_queue_sqe_fallback(struct io_kiocb *req)
 __must_hold(&req->ctx->uring_lock)
{
 if (unlikely(req->flags & REQ_F_FAIL)) {
  /*
 * We don't submit, fail them all, for that replace hardlinks
 * with normal links. Extra REQ_F_LINK is tolerated.
 */

  req->flags &= ~REQ_F_HARDLINK;
  req->flags |= REQ_F_LINK;
  io_req_defer_failed(req, req->cqe.res);
 } else {
  /* can't fail with IO_URING_F_INLINE */
  io_req_sqe_copy(req, IO_URING_F_INLINE);
  if (unlikely(req->ctx->drain_active))
   io_drain_req(req);
  else
   io_queue_iowq(req);
 }
}

/*
 * Check SQE restrictions (opcode and flags).
 *
 * Returns 'true' if SQE is allowed, 'false' otherwise.
 */

static inline bool io_check_restriction(struct io_ring_ctx *ctx,
     struct io_kiocb *req,
     unsigned int sqe_flags)
{
 if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
  return false;

 if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
     ctx->restrictions.sqe_flags_required)
  return false;

 if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
     ctx->restrictions.sqe_flags_required))
  return false;

 return true;
}

static void io_init_drain(struct io_ring_ctx *ctx)
{
 struct io_kiocb *head = ctx->submit_state.link.head;

 ctx->drain_active = true;
 if (head) {
  /*
 * If we need to drain a request in the middle of a link, drain
 * the head request and the next request/link after the current
 * link. Considering sequential execution of links,
 * REQ_F_IO_DRAIN will be maintained for every request of our
 * link.
 */

  head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
  ctx->drain_next = true;
 }
}

static __cold int io_init_fail_req(struct io_kiocb *req, int err)
{
 /* ensure per-opcode data is cleared if we fail before prep */
 memset(&req->cmd.data, 0, sizeof(req->cmd.data));
 return err;
}

static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
         const struct io_uring_sqe *sqe)
 __must_hold(&ctx->uring_lock)
{
 const struct io_issue_def *def;
 unsigned int sqe_flags;
 int personality;
 u8 opcode;

 req->ctx = ctx;
 req->opcode = opcode = READ_ONCE(sqe->opcode);
 /* same numerical values with corresponding REQ_F_*, safe to copy */
 sqe_flags = READ_ONCE(sqe->flags);
 req->flags = (__force io_req_flags_t) sqe_flags;
 req->cqe.user_data = READ_ONCE(sqe->user_data);
 req->file = NULL;
 req->tctx = current->io_uring;
 req->cancel_seq_set = false;
 req->async_data = NULL;

 if (unlikely(opcode >= IORING_OP_LAST)) {
  req->opcode = 0;
  return io_init_fail_req(req, -EINVAL);
 }
 opcode = array_index_nospec(opcode, IORING_OP_LAST);

 def = &io_issue_defs[opcode];
 if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
  /* enforce forwards compatibility on users */
  if (sqe_flags & ~SQE_VALID_FLAGS)
   return io_init_fail_req(req, -EINVAL);
  if (sqe_flags & IOSQE_BUFFER_SELECT) {
   if (!def->buffer_select)
    return io_init_fail_req(req, -EOPNOTSUPP);
   req->buf_index = READ_ONCE(sqe->buf_group);
  }
  if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
   ctx->drain_disabled = true;
  if (sqe_flags & IOSQE_IO_DRAIN) {
   if (ctx->drain_disabled)
    return io_init_fail_req(req, -EOPNOTSUPP);
   io_init_drain(ctx);
  }
 }
 if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
  if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
   return io_init_fail_req(req, -EACCES);
  /* knock it to the slow queue path, will be drained there */
  if (ctx->drain_active)
   req->flags |= REQ_F_FORCE_ASYNC;
  /* if there is no link, we're at "next" request and need to drain */
  if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
   ctx->drain_next = false;
   ctx->drain_active = true;
   req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
  }
 }

 if (!def->ioprio && sqe->ioprio)
  return io_init_fail_req(req, -EINVAL);
 if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
  return io_init_fail_req(req, -EINVAL);

 if (def->needs_file) {
  struct io_submit_state *state = &ctx->submit_state;

  req->cqe.fd = READ_ONCE(sqe->fd);

  /*
 * Plug now if we have more than 2 IO left after this, and the
 * target is potentially a read/write to block based storage.
 */

  if (state->need_plug && def->plug) {
   state->plug_started = true;
   state->need_plug = false;
   blk_start_plug_nr_ios(&state->plug, state->submit_nr);
  }
 }

 personality = READ_ONCE(sqe->personality);
 if (personality) {
  int ret;

  req->creds = xa_load(&ctx->personalities, personality);
  if (!req->creds)
   return io_init_fail_req(req, -EINVAL);
  get_cred(req->creds);
  ret = security_uring_override_creds(req->creds);
  if (ret) {
   put_cred(req->creds);
   return io_init_fail_req(req, ret);
  }
  req->flags |= REQ_F_CREDS;
 }

 return def->prep(req, sqe);
}

static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
          struct io_kiocb *req, int ret)
{
 struct io_ring_ctx *ctx = req->ctx;
 struct io_submit_link *link = &ctx->submit_state.link;
 struct io_kiocb *head = link->head;

 trace_io_uring_req_failed(sqe, req, ret);

 /*
 * Avoid breaking links in the middle as it renders links with SQPOLL
 * unusable. Instead of failing eagerly, continue assembling the link if
 * applicable and mark the head with REQ_F_FAIL. The link flushing code
 * should find the flag and handle the rest.
 */

 req_fail_link_node(req, ret);
 if (head && !(head->flags & REQ_F_FAIL))
  req_fail_link_node(head, -ECANCELED);

 if (!(req->flags & IO_REQ_LINK_FLAGS)) {
  if (head) {
   link->last->link = req;
   link->head = NULL;
   req = head;
  }
  io_queue_sqe_fallback(req);
  return ret;
 }

 if (head)
  link->last->link = req;
 else
  link->head = req;
 link->last = req;
 return 0;
}

static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
    const struct io_uring_sqe *sqe)
 __must_hold(&ctx->uring_lock)
{
 struct io_submit_link *link = &ctx->submit_state.link;
 int ret;

 ret = io_init_req(ctx, req, sqe);
 if (unlikely(ret))
  return io_submit_fail_init(sqe, req, ret);

 trace_io_uring_submit_req(req);

 /*
 * If we already have a head request, queue this one for async
 * submittal once the head completes. If we don't have a head but
 * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
 * submitted sync once the chain is complete. If none of those
 * conditions are true (normal request), then just queue it.
 */

 if (unlikely(link->head)) {
  trace_io_uring_link(req, link->last);
  io_req_sqe_copy(req, IO_URING_F_INLINE);
  link->last->link = req;
  link->last = req;

  if (req->flags & IO_REQ_LINK_FLAGS)
   return 0;
  /* last request of the link, flush it */
  req = link->head;
  link->head = NULL;
  if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
   goto fallback;

 } else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
       REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
  if (req->flags & IO_REQ_LINK_FLAGS) {
   link->head = req;
   link->last = req;
  } else {
fallback:
   io_queue_sqe_fallback(req);
  }
  return 0;
 }

 io_queue_sqe(req, IO_URING_F_INLINE);
 return 0;
}

/*
 * Batched submission is done, ensure local IO is flushed out.
 */

static void io_submit_state_end(struct io_ring_ctx *ctx)
{
 struct io_submit_state *state = &ctx->submit_state;

 if (unlikely(state->link.head))
  io_queue_sqe_fallback(state->link.head);
 /* flush only after queuing links as they can generate completions */
 io_submit_flush_completions(ctx);
 if (state->plug_started)
  blk_finish_plug(&state->plug);
}

/*
 * Start submission side cache.
 */

static void io_submit_state_start(struct io_submit_state *state,
      unsigned int max_ios)
{
 state->plug_started = false;
 state->need_plug = max_ios > 2;
 state->submit_nr = max_ios;
 /* set only head, no need to init link_last in advance */
 state->link.head = NULL;
}

static void io_commit_sqring(struct io_ring_ctx *ctx)
{
 struct io_rings *rings = ctx->rings;

 /*
 * Ensure any loads from the SQEs are done at this point,
 * since once we write the new head, the application could
 * write new data to them.
 */

 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
}

/*
 * Fetch an sqe, if one is available. Note this returns a pointer to memory
 * that is mapped by userspace. This means that care needs to be taken to
 * ensure that reads are stable, as we cannot rely on userspace always
 * being a good citizen. If members of the sqe are validated and then later
 * used, it's important that those reads are done through READ_ONCE() to
 * prevent a re-load down the line.
 */

static bool io_get_sqe(struct io_ring_ctx *ctx, const struct io_uring_sqe **sqe)
{
 unsigned mask = ctx->sq_entries - 1;
 unsigned head = ctx->cached_sq_head++ & mask;

 if (static_branch_unlikely(&io_key_has_sqarray) &&
     (!(ctx->flags & IORING_SETUP_NO_SQARRAY))) {
  head = READ_ONCE(ctx->sq_array[head]);
  if (unlikely(head >= ctx->sq_entries)) {
   WRITE_ONCE(ctx->rings->sq_dropped,
       READ_ONCE(ctx->rings->sq_dropped) + 1);
   return false;
  }
  head = array_index_nospec(head, ctx->sq_entries);
 }

 /*
 * The cached sq head (or cq tail) serves two purposes:
 *
 * 1) allows us to batch the cost of updating the user visible
 *    head updates.
 * 2) allows the kernel side to track the head on its own, even
 *    though the application is the one updating it.
 */


 /* double index for 128-byte SQEs, twice as long */
 if (ctx->flags & IORING_SETUP_SQE128)
  head <<= 1;
 *sqe = &ctx->sq_sqes[head];
 return true;
}

int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
 __must_hold(&ctx->uring_lock)
{
 unsigned int entries = io_sqring_entries(ctx);
 unsigned int left;
 int ret;

 if (unlikely(!entries))
  return 0;
 /* make sure SQ entry isn't read before tail */
 ret = left = min(nr, entries);
 io_get_task_refs(left);
 io_submit_state_start(&ctx->submit_state, left);

 do {
  const struct io_uring_sqe *sqe;
  struct io_kiocb *req;

  if (unlikely(!io_alloc_req(ctx, &req)))
   break;
  if (unlikely(!io_get_sqe(ctx, &sqe))) {
   io_req_add_to_cache(req, ctx);
   break;
  }

  /*
 * Continue submitting even for sqe failure if the
 * ring was setup with IORING_SETUP_SUBMIT_ALL
 */

  if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
      !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
   left--;
   break;
  }
 } while (--left);

 if (unlikely(left)) {
  ret -= left;
  /* try again if it submitted nothing and can't allocate a req */
  if (!ret && io_req_cache_empty(ctx))
   ret = -EAGAIN;
  current->io_uring->cached_refs += left;
 }

 io_submit_state_end(ctx);
  /* Commit SQ ring head once we've consumed and submitted all SQEs */
 io_commit_sqring(ctx);
 return ret;
}

static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
       int wake_flags, void *key)
{
 struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);

 /*
 * Cannot safely flush overflowed CQEs from here, ensure we wake up
 * the task, and the next invocation will do it.
 */

 if (io_should_wake(iowq) || io_has_work(iowq->ctx))
  return autoremove_wake_function(curr, mode, wake_flags, key);
 return -1;
}

int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
 if (io_local_work_pending(ctx)) {
  __set_current_state(TASK_RUNNING);
  if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
   return 0;
 }
 if (io_run_task_work() > 0)
  return 0;
 if (task_sigpending(current))
  return -EINTR;
 return 0;
}

static bool current_pending_io(void)
{
 struct io_uring_task *tctx = current->io_uring;

 if (!tctx)
  return false;
 return percpu_counter_read_positive(&tctx->inflight);
}

--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=96 H=95 G=95

¤ Dauer der Verarbeitung: 0.45 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.