// SPDX-License-Identifier: GPL-2.0 /* * Shared application/kernel submission and completion ring pairs, for * supporting fast/efficient IO. * * A note on the read/write ordering memory barriers that are matched between * the application and kernel side. * * After the application reads the CQ ring tail, it must use an * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses * before writing the tail (using smp_load_acquire to read the tail will * do). It also needs a smp_mb() before updating CQ head (ordering the * entry load(s) with the head store), pairing with an implicit barrier * through a control-dependency in io_get_cqe (smp_store_release to * store head will do). Failure to do so could lead to reading invalid * CQ entries. * * Likewise, the application must use an appropriate smp_wmb() before * writing the SQ tail (ordering SQ entry stores with the tail store), * which pairs with smp_load_acquire in io_get_sqring (smp_store_release * to store the tail will do). And it needs a barrier ordering the SQ * head load before writing new SQ entries (smp_load_acquire to read * head will do). * * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after* * updating the SQ tail; a full memory barrier smp_mb() is needed * between. * * Also see the examples in the liburing library: * * git://git.kernel.dk/liburing * * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens * from data shared between the kernel and application. This is done both * for ordering purposes, but also to ensure that once a value is loaded from * data that the application could potentially modify, it remains stable. * * Copyright (C) 2018-2019 Jens Axboe * Copyright (c) 2018-2019 Christoph Hellwig
*/ #include <linux/kernel.h> #include <linux/init.h> #include <linux/errno.h> #include <linux/syscalls.h> #include <net/compat.h> #include <linux/refcount.h> #include <linux/uio.h> #include <linux/bits.h>
/* requests with any of those set should undergo io_disarm_next() */ #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
/* * No waiters. It's larger than any valid value of the tw counter * so that tests against ->cq_wait_nr would fail and skip wake_up().
*/ #define IO_CQ_WAKE_INIT (-1U) /* Forced wake up if there is a waiter regardless of ->cq_wait_nr */ #define IO_CQ_WAKE_FORCE (IO_CQ_WAKE_INIT >> 1)
/* * As io_match_task() but protected against racing with linked timeouts. * User must not hold timeout_lock.
*/ bool io_match_task_safe(struct io_kiocb *head, struct io_uring_task *tctx, bool cancel_all)
{ bool matched;
if (tctx && head->tctx != tctx) returnfalse; if (cancel_all) returntrue;
if (head->flags & REQ_F_LINK_TIMEOUT) { struct io_ring_ctx *ctx = head->ctx;
ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); if (!ctx) return NULL;
xa_init(&ctx->io_bl_xa);
/* * Use 5 bits less than the max cq entries, that should give us around * 32 entries per hash list if totally full and uniformly spread, but * don't keep too many buckets to not overconsume memory.
*/
hash_bits = ilog2(p->cq_entries) - 5;
hash_bits = clamp(hash_bits, 1, 8); if (io_alloc_hash_table(&ctx->cancel_table, hash_bits)) goto err; if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
0, GFP_KERNEL)) goto err;
staticvoid io_clean_op(struct io_kiocb *req)
{ if (unlikely(req->flags & REQ_F_BUFFER_SELECTED))
io_kbuf_drop_legacy(req);
if (req->flags & REQ_F_NEED_CLEANUP) { conststruct io_cold_def *def = &io_cold_defs[req->opcode];
if (def->cleanup)
def->cleanup(req);
} if (req->flags & REQ_F_INFLIGHT)
atomic_dec(&req->tctx->inflight_tracked); if (req->flags & REQ_F_CREDS)
put_cred(req->creds); if (req->flags & REQ_F_ASYNC_DATA) {
kfree(req->async_data);
req->async_data = NULL;
}
req->flags &= ~IO_REQ_CLEAN_FLAGS;
}
/*
 * Mark the request as inflight, so that file cancelation will find it.
 * Can be used if the file is an io_uring instance, or if the request itself
 * relies on ->mm being alive for the duration of the request.
 */
inline void io_req_track_inflight(struct io_kiocb *req)
{
	/* only account a request once, flag guards the atomic counter */
	if (!(req->flags & REQ_F_INFLIGHT)) {
		req->flags |= REQ_F_INFLIGHT;
		atomic_inc(&req->tctx->inflight_tracked);
	}
}
staticstruct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
{ if (WARN_ON_ONCE(!req->link)) return NULL;
/* init ->work of the whole link before punting */
io_prep_async_link(req);
/* * Not expected to happen, but if we do have a bug where this _can_ * happen, catch it here and ensure the request is marked as * canceled. That will make io-wq go through the usual work cancel * procedure rather than attempt to run this request (or create a new * worker for it).
*/ if (WARN_ON_ONCE(!same_thread_group(tctx->task, current)))
atomic_or(IO_WQ_WORK_CANCEL, &req->work.flags);
/*
 * Commit pending CQEs and drop the CQ-side lock (if one was taken), then
 * wake waiters/eventfd as needed. With task_complete set, completion
 * posting is confined to the submitter task and no unlock/wake is needed
 * here.
 */
static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
{
	io_commit_cqring(ctx);
	if (!ctx->task_complete) {
		if (!ctx->lockless_cq)
			spin_unlock(&ctx->completion_lock);
		/* IOPOLL rings only need to wake up if it's also SQPOLL */
		if (!ctx->syscall_iopoll)
			io_cqring_wake(ctx);
	}
	io_commit_cqring_flush(ctx);
}
if (!dying) { if (!io_get_cqe_overflow(ctx, &cqe, true)) break;
memcpy(cqe, &ocqe->cqe, cqe_size);
}
list_del(&ocqe->list);
kfree(ocqe);
/* * For silly syzbot cases that deliberately overflow by huge * amounts, check if we need to resched and drop and * reacquire the locks if so. Nothing real would ever hit this. * Ideally we'd have a non-posting unlock for this, but hard * to care for a non-real case.
*/ if (need_resched()) {
ctx->cqe_sentinel = ctx->cqe_cached;
io_cq_unlock_post(ctx);
mutex_unlock(&ctx->uring_lock);
cond_resched();
mutex_lock(&ctx->uring_lock);
io_cq_lock(ctx);
}
}
if (list_empty(&ctx->cq_overflow_list)) {
clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
}
io_cq_unlock_post(ctx);
}
/*
 * Discard all overflowed CQEs during ring teardown/cancel. Only safe to
 * touch the overflow list if the rings are still mapped.
 */
static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
	if (ctx->rings)
		__io_cqring_overflow_flush(ctx, true);
}
/* must be called somewhat shortly after putting a request */
static inline void io_put_task(struct io_kiocb *req)
{
	struct io_uring_task *tctx = req->tctx;

	if (likely(tctx->task == current)) {
		/* same task: just cache the ref, no atomics needed */
		tctx->cached_refs++;
	} else {
		percpu_counter_sub(&tctx->inflight, 1);
		/* a cancelling task may be waiting for inflight to drain */
		if (unlikely(atomic_read(&tctx->in_cancel)))
			wake_up(&tctx->wait);
		put_task_struct(tctx->task);
	}
}
/* * If we're in ring overflow flush mode, or in task cancel mode, * or cannot allocate an overflow entry, then we need to drop it * on the floor.
*/
WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq); returnfalse;
} if (list_empty(&ctx->cq_overflow_list)) {
set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
/* * writes to the cq entry need to come after reading head; the * control dependency is enough as we're using WRITE_ONCE to * fill the cq entry
*/ bool io_cqe_cache_refill(struct io_ring_ctx *ctx, bool overflow)
{ struct io_rings *rings = ctx->rings; unsignedint off = ctx->cached_cq_tail & (ctx->cq_entries - 1); unsignedint free, queued, len;
/* * Posting into the CQ when there are pending overflowed CQEs may break * ordering guarantees, which will affect links, F_MORE users and more. * Force overflow the completion.
*/ if (!overflow && (ctx->check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))) returnfalse;
/* userspace may cheat modifying the tail, be safe and do min */
queued = min(__io_cqring_events(ctx), ctx->cq_entries);
free = ctx->cq_entries - queued; /* we need a contiguous range, limit based on the current array offset */
len = min(free, ctx->cq_entries - off); if (!len) returnfalse;
if (ctx->flags & IORING_SETUP_CQE32) {
off <<= 1;
len <<= 1;
}
/* * Must be called from inline task_work so we know a flush will happen later, * and obviously with ctx->uring_lock held (tw always has that).
*/ void io_add_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
{
lockdep_assert_held(&ctx->uring_lock);
lockdep_assert(ctx->lockless_cq);
/* * A helper for multishot requests posting additional CQEs. * Should only be used from a task_work including IO_URING_F_MULTISHOT.
*/ bool io_req_post_cqe(struct io_kiocb *req, s32 res, u32 cflags)
{ struct io_ring_ctx *ctx = req->ctx; bool posted;
/* * If multishot has already posted deferred completions, ensure that * those are flushed first before posting this one. If not, CQEs * could get reordered.
*/ if (!wq_list_empty(&ctx->submit_state.compl_reqs))
__io_submit_flush_completions(ctx);
/* * A helper for multishot requests posting additional CQEs. * Should only be used from a task_work including IO_URING_F_MULTISHOT.
*/ bool io_req_post_cqe32(struct io_kiocb *req, struct io_uring_cqe cqe[2])
{ struct io_ring_ctx *ctx = req->ctx; bool posted;
/* * All execution paths but io-wq use the deferred completions by * passing IO_URING_F_COMPLETE_DEFER and thus should not end up here.
*/ if (WARN_ON_ONCE(!(issue_flags & IO_URING_F_IOWQ))) return;
/* * Handle special CQ sync cases via task_work. DEFER_TASKRUN requires * the submitter task context, IOPOLL protects with uring_lock.
*/ if (ctx->lockless_cq || (req->flags & REQ_F_REISSUE)) {
defer_complete:
req->io_task_work.func = io_req_task_complete;
io_req_task_work_add(req); return;
}
io_cq_lock(ctx); if (!(req->flags & REQ_F_CQE_SKIP))
completed = io_fill_cqe_req(ctx, req);
io_cq_unlock_post(ctx);
if (!completed) goto defer_complete;
/* * We don't free the request here because we know it's called from * io-wq only, which holds a reference, so it cannot be the last put.
*/
req_ref_put(req);
}
/* * A request might get retired back into the request caches even before opcode * handlers and io_issue_sqe() are done with it, e.g. inline completion path. * Because of that, io_alloc_req() should be called only under ->uring_lock * and with extra caution to not get a request that is still worked on.
*/
__cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
__must_hold(&ctx->uring_lock)
{
gfp_t gfp = GFP_KERNEL | __GFP_NOWARN | __GFP_ZERO; void *reqs[IO_REQ_ALLOC_BATCH]; int ret;
ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
/* * Bulk alloc is all-or-nothing. If we fail to get a batch, * retry single alloc to be on the safe side.
*/ if (unlikely(ret <= 0)) {
reqs[0] = kmem_cache_alloc(req_cachep, gfp); if (!reqs[0]) returnfalse;
ret = 1;
}
/* * If LINK is set, we have dependent requests in this chain. If we * didn't fail this request, queue the first one up, moving any other * dependencies to the next request. In case of failure, fail the rest * of the chain.
*/ if (unlikely(req->flags & IO_DISARM_MASK))
__io_req_find_next_prep(req);
nxt = req->link;
req->link = NULL; return nxt;
}
staticvoid ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
{ if (!ctx) return; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
/* * Run queued task_work, returning the number of entries processed in *count. * If more entries than max_entries are available, stop processing once this * is reached and return the rest of the list.
*/ struct llist_node *io_handle_tw_list(struct llist_node *node, unsignedint *count, unsignedint max_entries)
{ struct io_ring_ctx *ctx = NULL; struct io_tw_state ts = { };
/* See comment above IO_CQ_WAKE_INIT */
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
/* * We don't know how many requests are in the link and whether * they can even be queued lazily, fall back to non-lazy.
*/ if (req->flags & IO_REQ_LINK_FLAGS)
flags &= ~IOU_F_TWQ_LAZY_WAKE;
guard(rcu)();
head = READ_ONCE(ctx->work_llist.first); do {
nr_tw_prev = 0; if (head) { struct io_kiocb *first_req = container_of(head, struct io_kiocb,
io_task_work.node); /* * Might be executed at any moment, rely on * SLAB_TYPESAFE_BY_RCU to keep it alive.
*/
nr_tw_prev = READ_ONCE(first_req->nr_tw);
}
/* * Theoretically, it can overflow, but that's fine as one of * previous adds should've tried to wake the task.
*/
nr_tw = nr_tw_prev + 1; if (!(flags & IOU_F_TWQ_LAZY_WAKE))
nr_tw = IO_CQ_WAKE_FORCE;
/* * cmpxchg implies a full barrier, which pairs with the barrier * in set_current_state() on the io_cqring_wait() side. It's used * to ensure that either we see updated ->cq_wait_nr, or waiters * going to sleep will observe the work added to the list, which * is similar to the wait/wake task state sync.
*/
if (!head) { if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); if (ctx->has_evfd)
io_eventfd_signal(ctx, false);
}
nr_wait = atomic_read(&ctx->cq_wait_nr); /* not enough or no one is waiting */ if (nr_tw < nr_wait) return; /* the previous add has already woken it up */ if (nr_tw_prev >= nr_wait) return;
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}
staticbool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, int min_events)
{ if (!io_local_work_pending(ctx)) returnfalse; if (events < min_events) returntrue; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); returnfalse;
}
staticint __io_run_local_work_loop(struct llist_node **node,
io_tw_token_t tw, int events)
{ int ret = 0;
staticint __io_run_local_work(struct io_ring_ctx *ctx, io_tw_token_t tw, int min_events, int max_events)
{ struct llist_node *node; unsignedint loops = 0; int ret = 0;
if (WARN_ON_ONCE(ctx->submitter_task != current)) return -EEXIST; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
again:
min_events -= ret;
ret = __io_run_local_work_loop(&ctx->retry_llist.first, tw, max_events); if (ctx->retry_llist.first) goto retry_done;
/* * llists are in reverse order, flip it back the right way before * running the pending items.
*/
node = llist_reverse_order(llist_del_all(&ctx->work_llist));
ret += __io_run_local_work_loop(&node, tw, max_events - ret);
ctx->retry_llist.first = node;
loops++;
if (io_run_local_work_continue(ctx, ret, min_events)) goto again;
retry_done:
io_submit_flush_completions(ctx); if (io_run_local_work_continue(ctx, ret, min_events)) goto again;
/* * Requests marked with REQUEUE should not post a CQE, they * will go through the io-wq retry machinery and post one * later.
*/ if (!(req->flags & (REQ_F_CQE_SKIP | REQ_F_REISSUE)) &&
unlikely(!io_fill_cqe_req(ctx, req))) { if (ctx->lockless_cq)
io_cqe_overflow(ctx, &req->cqe, &req->big_cqe); else
io_cqe_overflow_locked(ctx, &req->cqe, &req->big_cqe);
}
}
__io_cq_unlock_post(ctx);
if (!wq_list_empty(&state->compl_reqs)) {
io_free_batch_list(ctx, state->compl_reqs.first);
INIT_WQ_LIST(&state->compl_reqs);
}
if (unlikely(ctx->drain_active))
io_queue_deferred(ctx);
ctx->submit_state.cq_flush = false;
}
static unsigned io_cqring_events(struct io_ring_ctx *ctx)
{
	/* See comment at the top of this file */
	smp_rmb();
	return __io_cqring_events(ctx);
}
/*
 * We can't just wait for polled events to come to us, we have to actively
 * find and complete them.
 */
static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
{
	if (!(ctx->flags & IORING_SETUP_IOPOLL))
		return;

	mutex_lock(&ctx->uring_lock);
	for (;;) {
		if (wq_list_empty(&ctx->iopoll_list))
			break;
		/* let it sleep and repeat later if can't complete a request */
		if (io_do_iopoll(ctx, true) == 0)
			break;
		/*
		 * Ensure we allow local-to-the-cpu processing to take place,
		 * in this case we need to ensure that we reap all events.
		 * Also let task_work, etc. to progress by releasing the mutex
		 */
		if (need_resched()) {
			mutex_unlock(&ctx->uring_lock);
			cond_resched();
			mutex_lock(&ctx->uring_lock);
		}
	}
	mutex_unlock(&ctx->uring_lock);

	/* DEFER_TASKRUN work can't run here; hand it to the submitter task */
	if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
		io_move_task_work_from_local(ctx);
}
check_cq = READ_ONCE(ctx->check_cq); if (unlikely(check_cq)) { if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
__io_cqring_overflow_flush(ctx, false); /* * Similarly do not spin if we have not informed the user of any * dropped CQE.
*/ if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) return -EBADR;
} /* * Don't enter poll loop if we already have events pending. * If we do, we can potentially be spinning for commands that * already triggered a CQE (eg in error).
*/ if (io_cqring_events(ctx)) return 0;
do { int ret = 0;
/* * If a submit got punted to a workqueue, we can have the * application entering polling for a command before it gets * issued. That app will hold the uring_lock for the duration * of the poll right here, so we need to take a breather every * now and then to ensure that the issue has a chance to add * the poll to the issued list. Otherwise we can spin here * forever, while the workqueue is stuck trying to acquire the * very same mutex.
*/ if (wq_list_empty(&ctx->iopoll_list) ||
io_task_work_pending(ctx)) {
u32 tail = ctx->cached_cq_tail;
(void) io_run_local_work_locked(ctx, min_events);
if (task_work_pending(current) ||
wq_list_empty(&ctx->iopoll_list)) {
mutex_unlock(&ctx->uring_lock);
io_run_task_work();
mutex_lock(&ctx->uring_lock);
} /* some requests don't go through iopoll_list */ if (tail != ctx->cached_cq_tail ||
wq_list_empty(&ctx->iopoll_list)) break;
}
ret = io_do_iopoll(ctx, !min_events); if (unlikely(ret < 0)) return ret;
if (task_sigpending(current)) return -EINTR; if (need_resched()) break;
nr_events += ret;
} while (nr_events < min_events);
/* * After the iocb has been issued, it's safe to be found on the poll list. * Adding the kiocb to the list AFTER submission ensures that we don't * find it from a io_do_iopoll() thread before the issuer is done * accessing the kiocb cookie.
*/ staticvoid io_iopoll_req_issued(struct io_kiocb *req, unsignedint issue_flags)
{ struct io_ring_ctx *ctx = req->ctx; constbool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
/* workqueue context doesn't hold uring_lock, grab it now */ if (unlikely(needs_lock))
mutex_lock(&ctx->uring_lock);
/* * Track whether we have multiple files in our lists. This will impact * how we do polling eventually, not spinning if we're on potentially * different devices.
*/ if (wq_list_empty(&ctx->iopoll_list)) {
ctx->poll_multi_queue = false;
} elseif (!ctx->poll_multi_queue) { struct io_kiocb *list_req;
/* * For fast devices, IO may have already completed. If it has, add * it to the front so we find it first.
*/ if (READ_ONCE(req->iopoll_completed))
wq_list_add_head(&req->comp_list, &ctx->iopoll_list); else
wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
if (unlikely(needs_lock)) { /* * If IORING_SETUP_SQPOLL is enabled, sqes are either handle * in sq thread task context or in io worker task context. If * current task context is sq thread, we don't need to check * whether should wake up sq thread.
*/ if ((ctx->flags & IORING_SETUP_SQPOLL) &&
wq_has_sleeper(&ctx->sq_data->wait))
wake_up(&ctx->sq_data->wait);
mutex_unlock(&ctx->uring_lock);
}
}
/*
 * Derive per-request flags from the target file: regular files get
 * REQ_F_ISREG, and files opened O_NONBLOCK or advertising FMODE_NOWAIT
 * get REQ_F_SUPPORT_NOWAIT.
 */
io_req_flags_t io_file_get_flags(struct file *file)
{
	io_req_flags_t flags = 0;

	if (S_ISREG(file_inode(file)->i_mode))
		flags |= REQ_F_ISREG;
	if ((file->f_flags & O_NONBLOCK) || (file->f_mode & FMODE_NOWAIT))
		flags |= REQ_F_SUPPORT_NOWAIT;

	return flags;
}
if (unlikely(!io_assign_file(req, def, issue_flags))) return -EBADF;
ret = __io_issue_sqe(req, issue_flags, def);
if (ret == IOU_COMPLETE) { if (issue_flags & IO_URING_F_COMPLETE_DEFER)
io_req_complete_defer(req); else
io_req_complete_post(req, issue_flags);
return 0;
}
if (ret == IOU_ISSUE_SKIP_COMPLETE) {
ret = 0;
/* If the op doesn't have a file, we're not polling for it */ if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
io_iopoll_req_issued(req, issue_flags);
} return ret;
}
int io_poll_issue(struct io_kiocb *req, io_tw_token_t tw)
{ constunsignedint issue_flags = IO_URING_F_NONBLOCK |
IO_URING_F_MULTISHOT |
IO_URING_F_COMPLETE_DEFER; int ret;
io_tw_lock(req->ctx, tw);
WARN_ON_ONCE(!req->file); if (WARN_ON_ONCE(req->ctx->flags & IORING_SETUP_IOPOLL)) return -EFAULT;
ret = __io_issue_sqe(req, issue_flags, &io_issue_defs[req->opcode]);
/* one will be dropped by io_wq_free_work() after returning to io-wq */ if (!(req->flags & REQ_F_REFCOUNT))
__io_req_set_refcount(req, 2); else
req_ref_get(req);
/* either cancelled or io-wq is dying, so don't touch tctx->iowq */ if (atomic_read(&work->flags) & IO_WQ_WORK_CANCEL) {
fail:
io_req_task_queue_fail(req, err); return;
} if (!io_assign_file(req, def, issue_flags)) {
err = -EBADF;
atomic_or(IO_WQ_WORK_CANCEL, &work->flags); goto fail;
}
/* * If DEFER_TASKRUN is set, it's only allowed to post CQEs from the * submitter task context. Final request completions are handed to the * right context, however this is not the case of auxiliary CQEs, * which is the main mean of operation for multishot requests. * Don't allow any multishot execution from io-wq. It's more restrictive * than necessary and also cleaner.
*/ if (req->flags & (REQ_F_MULTISHOT|REQ_F_APOLL_MULTISHOT)) {
err = -EBADFD; if (!io_file_can_poll(req)) goto fail; if (req->file->f_flags & O_NONBLOCK ||
req->file->f_mode & FMODE_NOWAIT) {
err = -ECANCELED; if (io_arm_poll_handler(req, issue_flags) != IO_APOLL_OK) goto fail; return;
} else {
req->flags &= ~(REQ_F_APOLL_MULTISHOT|REQ_F_MULTISHOT);
}
}
do {
ret = io_issue_sqe(req, issue_flags); if (ret != -EAGAIN) break;
/* * If REQ_F_NOWAIT is set, then don't wait or retry with * poll. -EAGAIN is final for that case.
*/ if (req->flags & REQ_F_NOWAIT) break;
/* * We can get EAGAIN for iopolled IO even though we're * forcing a sync submission from here, since we can't * wait for request slots on the block side.
*/ if (!needs_poll) { if (!(req->ctx->flags & IORING_SETUP_IOPOLL)) break; if (io_wq_worker_stopped()) break;
cond_resched(); continue;
}
if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK) return; /* aborted or ready, in either case retry blocking */
needs_poll = false;
issue_flags &= ~IO_URING_F_NONBLOCK;
} while (1);
/* avoid locking problems by failing it from a clean context */ if (ret)
io_req_task_queue_fail(req, ret);
}
/* * We async punt it if the file wasn't marked NOWAIT, or if the file * doesn't support non-blocking read/write attempts
*/ if (unlikely(ret))
io_queue_async(req, issue_flags, ret);
}
/*
 * Slow path for queuing a request: either fail it (and its link) outright,
 * or punt it to the drain/io-wq machinery.
 */
static void io_queue_sqe_fallback(struct io_kiocb *req)
	__must_hold(&req->ctx->uring_lock)
{
	if (unlikely(req->flags & REQ_F_FAIL)) {
		/*
		 * We don't submit, fail them all, for that replace hardlinks
		 * with normal links. Extra REQ_F_LINK is tolerated.
		 */
		req->flags &= ~REQ_F_HARDLINK;
		req->flags |= REQ_F_LINK;
		io_req_defer_failed(req, req->cqe.res);
	} else {
		/* can't fail with IO_URING_F_INLINE */
		io_req_sqe_copy(req, IO_URING_F_INLINE);
		if (unlikely(req->ctx->drain_active))
			io_drain_req(req);
		else
			io_queue_iowq(req);
	}
}
/* * Check SQE restrictions (opcode and flags). * * Returns 'true' if SQE is allowed, 'false' otherwise.
*/ staticinlinebool io_check_restriction(struct io_ring_ctx *ctx, struct io_kiocb *req, unsignedint sqe_flags)
{ if (!test_bit(req->opcode, ctx->restrictions.sqe_op)) returnfalse;
if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
ctx->restrictions.sqe_flags_required) returnfalse;
if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
ctx->restrictions.sqe_flags_required)) returnfalse;
ctx->drain_active = true; if (head) { /* * If we need to drain a request in the middle of a link, drain * the head request and the next request/link after the current * link. Considering sequential execution of links, * REQ_F_IO_DRAIN will be maintained for every request of our * link.
*/
head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
ctx->drain_next = true;
}
}
/*
 * Common failure path for request init: wipe per-opcode data so no stale
 * state is observed later, then pass the error code through unchanged.
 */
static __cold int io_init_fail_req(struct io_kiocb *req, int err)
{
	/* ensure per-opcode data is cleared if we fail before prep */
	memset(&req->cmd.data, 0, sizeof(req->cmd.data));

	return err;
}
def = &io_issue_defs[opcode]; if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) { /* enforce forwards compatibility on users */ if (sqe_flags & ~SQE_VALID_FLAGS) return io_init_fail_req(req, -EINVAL); if (sqe_flags & IOSQE_BUFFER_SELECT) { if (!def->buffer_select) return io_init_fail_req(req, -EOPNOTSUPP);
req->buf_index = READ_ONCE(sqe->buf_group);
} if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
ctx->drain_disabled = true; if (sqe_flags & IOSQE_IO_DRAIN) { if (ctx->drain_disabled) return io_init_fail_req(req, -EOPNOTSUPP);
io_init_drain(ctx);
}
} if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) { if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags)) return io_init_fail_req(req, -EACCES); /* knock it to the slow queue path, will be drained there */ if (ctx->drain_active)
req->flags |= REQ_F_FORCE_ASYNC; /* if there is no link, we're at "next" request and need to drain */ if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
ctx->drain_next = false;
ctx->drain_active = true;
req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
}
}
if (!def->ioprio && sqe->ioprio) return io_init_fail_req(req, -EINVAL); if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL)) return io_init_fail_req(req, -EINVAL);
if (def->needs_file) { struct io_submit_state *state = &ctx->submit_state;
req->cqe.fd = READ_ONCE(sqe->fd);
/* * Plug now if we have more than 2 IO left after this, and the * target is potentially a read/write to block based storage.
*/ if (state->need_plug && def->plug) {
state->plug_started = true;
state->need_plug = false;
blk_start_plug_nr_ios(&state->plug, state->submit_nr);
}
}
personality = READ_ONCE(sqe->personality); if (personality) { int ret;
req->creds = xa_load(&ctx->personalities, personality); if (!req->creds) return io_init_fail_req(req, -EINVAL);
get_cred(req->creds);
ret = security_uring_override_creds(req->creds); if (ret) {
put_cred(req->creds); return io_init_fail_req(req, ret);
}
req->flags |= REQ_F_CREDS;
}
/* * Avoid breaking links in the middle as it renders links with SQPOLL * unusable. Instead of failing eagerly, continue assembling the link if * applicable and mark the head with REQ_F_FAIL. The link flushing code * should find the flag and handle the rest.
*/
req_fail_link_node(req, ret); if (head && !(head->flags & REQ_F_FAIL))
req_fail_link_node(head, -ECANCELED);
ret = io_init_req(ctx, req, sqe); if (unlikely(ret)) return io_submit_fail_init(sqe, req, ret);
trace_io_uring_submit_req(req);
/* * If we already have a head request, queue this one for async * submittal once the head completes. If we don't have a head but * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be * submitted sync once the chain is complete. If none of those * conditions are true (normal request), then just queue it.
*/ if (unlikely(link->head)) {
trace_io_uring_link(req, link->last);
io_req_sqe_copy(req, IO_URING_F_INLINE);
link->last->link = req;
link->last = req;
if (req->flags & IO_REQ_LINK_FLAGS) return 0; /* last request of the link, flush it */
req = link->head;
link->head = NULL; if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL)) goto fallback;
/*
 * Batched submission is done, ensure local IO is flushed out.
 */
static void io_submit_state_end(struct io_ring_ctx *ctx)
{
	struct io_submit_state *state = &ctx->submit_state;

	/* an unterminated link at the end of the batch goes the slow path */
	if (unlikely(state->link.head))
		io_queue_sqe_fallback(state->link.head);
	/* flush only after queuing links as they can generate completions */
	io_submit_flush_completions(ctx);
	if (state->plug_started)
		blk_finish_plug(&state->plug);
}
/* * Start submission side cache.
*/ staticvoid io_submit_state_start(struct io_submit_state *state, unsignedint max_ios)
{
state->plug_started = false;
state->need_plug = max_ios > 2;
state->submit_nr = max_ios; /* set only head, no need to init link_last in advance */
state->link.head = NULL;
}
/* * Ensure any loads from the SQEs are done at this point, * since once we write the new head, the application could * write new data to them.
*/
smp_store_release(&rings->sq.head, ctx->cached_sq_head);
}
/* * Fetch an sqe, if one is available. Note this returns a pointer to memory * that is mapped by userspace. This means that care needs to be taken to * ensure that reads are stable, as we cannot rely on userspace always * being a good citizen. If members of the sqe are validated and then later * used, it's important that those reads are done through READ_ONCE() to * prevent a re-load down the line.
*/ staticbool io_get_sqe(struct io_ring_ctx *ctx, conststruct io_uring_sqe **sqe)
{ unsigned mask = ctx->sq_entries - 1; unsigned head = ctx->cached_sq_head++ & mask;
if (static_branch_unlikely(&io_key_has_sqarray) &&
(!(ctx->flags & IORING_SETUP_NO_SQARRAY))) {
head = READ_ONCE(ctx->sq_array[head]); if (unlikely(head >= ctx->sq_entries)) {
WRITE_ONCE(ctx->rings->sq_dropped,
READ_ONCE(ctx->rings->sq_dropped) + 1); returnfalse;
}
head = array_index_nospec(head, ctx->sq_entries);
}
/* * The cached sq head (or cq tail) serves two purposes: * * 1) allows us to batch the cost of updating the user visible * head updates. * 2) allows the kernel side to track the head on its own, even * though the application is the one updating it.
*/
/* double index for 128-byte SQEs, twice as long */ if (ctx->flags & IORING_SETUP_SQE128)
head <<= 1;
*sqe = &ctx->sq_sqes[head]; returntrue;
}
int io_submit_sqes(struct io_ring_ctx *ctx, unsignedint nr)
__must_hold(&ctx->uring_lock)
{ unsignedint entries = io_sqring_entries(ctx); unsignedint left; int ret;
if (unlikely(!entries)) return 0; /* make sure SQ entry isn't read before tail */
ret = left = min(nr, entries);
io_get_task_refs(left);
io_submit_state_start(&ctx->submit_state, left);
do { conststruct io_uring_sqe *sqe; struct io_kiocb *req;
if (unlikely(!io_alloc_req(ctx, &req))) break; if (unlikely(!io_get_sqe(ctx, &sqe))) {
io_req_add_to_cache(req, ctx); break;
}
/* * Continue submitting even for sqe failure if the * ring was setup with IORING_SETUP_SUBMIT_ALL
*/ if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
!(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
left--; break;
}
} while (--left);
if (unlikely(left)) {
ret -= left; /* try again if it submitted nothing and can't allocate a req */ if (!ret && io_req_cache_empty(ctx))
ret = -EAGAIN;
current->io_uring->cached_refs += left;
}
io_submit_state_end(ctx); /* Commit SQ ring head once we've consumed and submitted all SQEs */
io_commit_sqring(ctx); return ret;
}
/* * Cannot safely flush overflowed CQEs from here, ensure we wake up * the task, and the next invocation will do it.
*/ if (io_should_wake(iowq) || io_has_work(iowq->ctx)) return autoremove_wake_function(curr, mode, wake_flags, key); return -1;
}
/*
 * Run pending ring-local work first, then generic task_work. Returns 0 if
 * any work ran (or none was pending), -EINTR if a signal is pending.
 */
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
	if (io_local_work_pending(ctx)) {
		__set_current_state(TASK_RUNNING);
		if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
			return 0;
	}
	if (io_run_task_work() > 0)
		return 0;

	return task_sigpending(current) ? -EINTR : 0;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.