struct io_poll_table {
	struct poll_table_struct pt;
	struct io_kiocb *req;
	int nr_entries;
	int error;
	bool owning;
	/* output value, set only if arm poll returns >0 */
	__poll_t result_mask;
};
/*
 * We usually have 1-2 refs taken, 128 is more than enough and we want to
 * maximise the margin between this amount and the moment when it overflows.
 */
#define IO_POLL_REF_BIAS	128
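
/*
 * Rough sketch of the ->poll_refs protocol (see IO_POLL_REF_MASK and the
 * flag bits defined above it): the low mask bits count ownership
 * references, while IO_POLL_RETRY_FLAG and IO_POLL_CANCEL_FLAG live above
 * them. E.g. a wakeup that bumps refs 0 -> 1 wins ownership and queues
 * task_work; a concurrent wakeup bumping 1 -> 2 loses, and the leftover
 * ref makes the owner's final atomic_sub_return() non-zero so it loops
 * and re-polls instead of missing the event.
 */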
#define IO_WQE_F_DOUBLE 1
static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode,
			int sync, void *key);
static bool io_poll_get_ownership_slowpath(struct io_kiocb *req)
{
	int v;

	/*
	 * poll_refs are already elevated and we don't have much hope for
	 * grabbing the ownership. Instead of incrementing set a retry flag
	 * to notify the loop that there might have been some change.
	 */
	v = atomic_fetch_or(IO_POLL_RETRY_FLAG, &req->poll_refs);
	if (v & IO_POLL_REF_MASK)
		return false;
	return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
}
/*
 * If the refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, the request is
 * free and we can bump the refs to acquire ownership. Modifying a request
 * while not owning it is disallowed; that prevents races when enqueueing
 * task_work and between arming poll and wakeups.
 */
static inline bool io_poll_get_ownership(struct io_kiocb *req)
{
	if (unlikely(atomic_read(&req->poll_refs) >= IO_POLL_REF_BIAS))
		return io_poll_get_ownership_slowpath(req);
	return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
}
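
/*
 * Ownership taken here is paired with the release in io_poll_check_events():
 * the owner drops all its refs with atomic_sub_return() and, if the result
 * still has IO_POLL_REF_MASK bits set, another wakeup raced in and the
 * owner must loop again rather than lose the event. A minimal sketch:
 *
 *	if (io_poll_get_ownership(req))		// refs 0 -> 1, we own it
 *		__io_poll_execute(req, mask);	// queue the task_work
 */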
static void io_poll_remove_entries(struct io_kiocb *req)
{
	/*
	 * Nothing to do if neither of those flags are set. Avoid dipping
	 * into the poll/apoll/double cachelines if we can.
	 */
	if (!(req->flags & (REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL)))
		return;

	/*
	 * While we hold the waitqueue lock and the waitqueue is nonempty,
	 * wake_up_pollfree() will wait for us. However, taking the waitqueue
	 * lock in the first place can race with the waitqueue being freed.
	 *
	 * We solve this as eventpoll does: by taking advantage of the fact that
	 * all users of wake_up_pollfree() will RCU-delay the actual free. If
	 * we enter rcu_read_lock() and see that the pointer to the queue is
	 * non-NULL, we can then lock it without the memory being freed out from
	 * under us.
	 *
	 * Keep holding rcu_read_lock() as long as we hold the queue lock, in
	 * case the caller deletes the entry from the queue, leaving it empty.
	 * In that case, only RCU prevents the queue memory from being freed.
	 */
	rcu_read_lock();
	if (req->flags & REQ_F_SINGLE_POLL)
		io_poll_remove_entry(io_poll_get_single(req));
	if (req->flags & REQ_F_DOUBLE_POLL)
		io_poll_remove_entry(io_poll_get_double(req));
	rcu_read_unlock();
}

static void __io_poll_execute(struct io_kiocb *req, int mask)
{
	unsigned flags = 0;

	io_req_set_res(req, mask, 0);
	req->io_task_work.func = io_poll_task_func;

	trace_io_uring_task_add(req, mask);

	if (!(req->flags & REQ_F_POLL_NO_LAZY))
		flags = IOU_F_TWQ_LAZY_WAKE;
	__io_req_task_work_add(req, flags);
}
static inline void io_poll_execute(struct io_kiocb *req, int res)
{
	if (io_poll_get_ownership(req))
		__io_poll_execute(req, res);
}
/*
 * All poll tw should go through this. Checks for poll events, manages
 * references, does rewait, etc.
 *
 * Returns a negative error on failure. IOU_POLL_NO_ACTION when no action
 * is required, which happens on a spurious wakeup or when a multishot CQE
 * has already been served. IOU_POLL_DONE when it's done with the request,
 * in which case the mask is stored in req->cqe.res.
 * IOU_POLL_REMOVE_POLL_USE_RES indicates that multishot poll should be
 * removed and that the result is stored in req->cqe.
 */
static int io_poll_check_events(struct io_kiocb *req, io_tw_token_t tw)
{
	int v;

	if (unlikely(io_should_terminate_tw(req->ctx)))
		return -ECANCELED;

	do {
		v = atomic_read(&req->poll_refs);

		if (unlikely(v != 1)) {
			/* tw should be the owner and so have some refs */
			if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
				return IOU_POLL_NO_ACTION;
			if (v & IO_POLL_CANCEL_FLAG)
				return -ECANCELED;
			/*
			 * cqe.res contains only events of the first wake up
			 * and all others are to be lost. Redo vfs_poll() to get
			 * up to date state.
			 */
			if ((v & IO_POLL_REF_MASK) != 1)
				req->cqe.res = 0;

			if (v & IO_POLL_RETRY_FLAG) {
				req->cqe.res = 0;
				/*
				 * We won't find new events that came in between
				 * vfs_poll and the ref put unless we clear the
				 * flag in advance.
				 */
				atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs);
				v &= ~IO_POLL_RETRY_FLAG;
			}
		}

		/* the mask was stashed in __io_poll_execute */
		if (!req->cqe.res) {
			struct poll_table_struct pt = { ._key = req->apoll_events };

			req->cqe.res = vfs_poll(req->file, &pt) & req->apoll_events;
			/*
			 * We got woken with a mask, but someone else got to
			 * it first. The above vfs_poll() doesn't add us back
			 * to the waitqueue, so if we get nothing back, we
			 * should be safe and attempt a reissue.
			 */
			if (unlikely(!req->cqe.res)) {
				/* Multishot armed need not reissue */
				if (!(req->apoll_events & EPOLLONESHOT))
					continue;
				return IOU_POLL_REISSUE;
			}
		}
		if (req->apoll_events & EPOLLONESHOT)
			return IOU_POLL_DONE;

		/* multishot, just fill a CQE and proceed */
		if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
			__poll_t mask = mangle_poll(req->cqe.res &
						    req->apoll_events);

			if (!io_req_post_cqe(req, mask, IORING_CQE_F_MORE)) {
				io_req_set_res(req, mask, 0);
				return IOU_POLL_REMOVE_POLL_USE_RES;
			}
		} else {
			int ret = io_poll_issue(req, tw);

			if (ret == IOU_COMPLETE)
				return IOU_POLL_REMOVE_POLL_USE_RES;
			else if (ret == IOU_REQUEUE)
				return IOU_POLL_REQUEUE;
			if (ret != IOU_RETRY && ret < 0)
				return ret;
		}

		/* force the next iteration to vfs_poll() */
		req->cqe.res = 0;

		/*
		 * Release all references, retry if someone tried to restart
		 * task_work while we were executing it.
		 */
		v &= IO_POLL_REF_MASK;
	} while (atomic_sub_return(v, &req->poll_refs) & IO_POLL_REF_MASK);

	io_napi_add(req);
	return IOU_POLL_NO_ACTION;
}
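
/*
 * Example of the reference dance above: task_work runs owning a single
 * ref (poll_refs == 1). A wakeup arrives mid-run and either bumps the
 * count or sets IO_POLL_RETRY_FLAG via the slowpath. Either way the
 * final atomic_sub_return() doesn't reach zero refs, so the loop redoes
 * vfs_poll() instead of dropping a potentially missed event.
 */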
void io_poll_task_func(struct io_kiocb *req, io_tw_token_t tw)
{
	int ret;

	ret = io_poll_check_events(req, tw);
	if (ret == IOU_POLL_NO_ACTION) {
		io_kbuf_recycle(req, 0);
		return;
	} else if (ret == IOU_POLL_REQUEUE) {
		io_kbuf_recycle(req, 0);
		__io_poll_execute(req, 0);
		return;
	}
	io_poll_remove_entries(req);
	/* task_work always has ->uring_lock held */
	hash_del(&req->hash_node);

	if (req->opcode == IORING_OP_POLL_ADD) {
		if (ret == IOU_POLL_DONE) {
			struct io_poll *poll;

			poll = io_kiocb_to_cmd(req, struct io_poll);
			req->cqe.res = mangle_poll(req->cqe.res & poll->events);
		} else if (ret == IOU_POLL_REISSUE) {
			io_req_task_submit(req, tw);
			return;
		} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
			req->cqe.res = ret;
			req_set_fail(req);
		}

		io_req_set_res(req, req->cqe.res, 0);
		io_req_task_complete(req, tw);
	} else {
		io_tw_lock(req->ctx, tw);

		if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
			io_req_task_complete(req, tw);
		else if (ret == IOU_POLL_DONE || ret == IOU_POLL_REISSUE)
			io_req_task_submit(req, tw);
		else
			io_req_defer_failed(req, ret);
	}
}
static __cold int io_pollfree_wake(struct io_kiocb *req, struct io_poll *poll)
{
	io_poll_mark_cancelled(req);
	/* we have to kick tw in case it's not already */
	io_poll_execute(req, 0);

	/*
	 * If the waitqueue is being freed early but someone already holds
	 * ownership over it, we have to tear down the request as best we
	 * can. That means immediately removing the request from its
	 * waitqueue and preventing all further accesses to the waitqueue
	 * via the request.
	 */
	list_del_init(&poll->wait.entry);

	/*
	 * Careful: this *must* be the last step, since as soon
	 * as req->head is NULL'ed out, the request can be
	 * completed and freed, since aio_poll_complete_work()
	 * will no longer need to take the waitqueue lock.
	 */
	smp_store_release(&poll->head, NULL);
	return 1;
}

static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
			void *key)
{
	struct io_kiocb *req = wqe_to_req(wait);
	struct io_poll *poll = container_of(wait, struct io_poll, wait);
	__poll_t mask = key_to_poll(key);

	if (unlikely(mask & POLLFREE))
		return io_pollfree_wake(req, poll);

	/* for instances that support it check for an event match first */
	if (mask && !(mask & (poll->events & ~IO_ASYNC_POLL_COMMON)))
		return 0;

	if (io_poll_get_ownership(req)) {
		/*
		 * If we trigger a multishot poll off our own wakeup path,
		 * disable multishot as there is a circular dependency between
		 * CQ posting and triggering the event.
		 */
		if (mask & EPOLL_URING_WAKE)
			poll->events |= EPOLLONESHOT;

		/* optional, saves extra locking for removal in tw handler */
		if (mask && poll->events & EPOLLONESHOT) {
			list_del_init(&poll->wait.entry);
			poll->head = NULL;
			if (wqe_is_double(wait))
				req->flags &= ~REQ_F_DOUBLE_POLL;
			else
				req->flags &= ~REQ_F_SINGLE_POLL;
		}
		__io_poll_execute(req, mask);
	}
	return 1;
}
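
/*
 * The EPOLL_URING_WAKE downgrade above breaks wakeup loops: e.g. a
 * multishot poll armed on an eventfd that is registered to the same
 * ring would otherwise post a CQE, which signals the eventfd, which
 * wakes the poll again, ad infinitum. Forcing EPOLLONESHOT for such
 * wakeups terminates the cycle after one CQE.
 */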
/* fails only when polling is already being completed by the first entry */
static bool io_poll_double_prepare(struct io_kiocb *req)
{
	struct wait_queue_head *head;
	struct io_poll *poll = io_poll_get_single(req);

	/* head is RCU protected, see io_poll_remove_entries() comments */
	rcu_read_lock();
	head = smp_load_acquire(&poll->head);
	/*
	 * poll arm might not hold ownership and so race for req->flags with
	 * io_poll_wake(). There is only one poll entry queued, serialise with
	 * it by taking its head lock. As we're still arming, the tw handler
	 * is not going to be run, so there are no races with it.
	 */
	if (head) {
		spin_lock_irq(&head->lock);
		req->flags |= REQ_F_DOUBLE_POLL;
		if (req->opcode == IORING_OP_POLL_ADD)
			req->flags |= REQ_F_ASYNC_DATA;
		spin_unlock_irq(&head->lock);
	}
	rcu_read_unlock();
	return !!head;
}
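
/*
 * Note on the locking above: io_poll_wake() is invoked with the
 * waitqueue lock held, and it may clear REQ_F_SINGLE_POLL/DOUBLE_POLL
 * in req->flags. Taking the same head->lock while setting
 * REQ_F_DOUBLE_POLL here means the two read-modify-write updates of
 * req->flags can't interleave and lose a bit.
 */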

static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
			    struct wait_queue_head *head,
			    struct io_poll **poll_ptr)
{
	struct io_kiocb *req = pt->req;
	unsigned long wqe_private = (unsigned long) req;

	/*
	 * The file being polled uses multiple waitqueues for poll handling
	 * (e.g. one for read, one for write). Set up a separate io_poll
	 * if this happens.
	 */
	if (unlikely(pt->nr_entries)) {
		struct io_poll *first = poll;

		/* double add on the same waitqueue head, ignore */
		if (first->head == head)
			return;
		/* already have a 2nd entry, fail a third attempt */
		if (*poll_ptr) {
			if ((*poll_ptr)->head == head)
				return;
			pt->error = -EINVAL;
			return;
		}

		poll = kmalloc(sizeof(*poll), GFP_ATOMIC);
		if (!poll) {
			pt->error = -ENOMEM;
			return;
		}

		/* mark as double wq entry */
		wqe_private |= IO_WQE_F_DOUBLE;
		io_init_poll_iocb(poll, first->events);
		if (!io_poll_double_prepare(req)) {
			/* the request is completing, just back off */
			kfree(poll);
			return;
		}
		*poll_ptr = poll;
	} else {
		/* fine to modify, there is no poll queued to race with us */
		req->flags |= REQ_F_SINGLE_POLL;
	}

	pt->nr_entries++;
	poll->head = head;
	poll->wait.private = (void *) wqe_private;

	if (poll->events & EPOLLEXCLUSIVE) {
		add_wait_queue_exclusive(head, &poll->wait);
	} else {
		add_wait_queue(head, &poll->wait);
	}
}

/*
 * Returns 0 when it's handed over for polling. The caller owns the request if
 * it returns non-zero, but otherwise should not touch it. Negative values
 * contain an error code. When the result is >0, the polling has completed
 * inline and ipt.result_mask is set to the mask.
 */
static int __io_arm_poll_handler(struct io_kiocb *req,
				 struct io_poll *poll,
				 struct io_poll_table *ipt, __poll_t mask,
				 unsigned issue_flags)
{
	INIT_HLIST_NODE(&req->hash_node);
	io_init_poll_iocb(poll, mask);
	poll->file = req->file;
	req->apoll_events = poll->events;

	ipt->pt._key = mask;
	ipt->req = req;
	ipt->error = 0;
	ipt->nr_entries = 0;
	/*
	 * Polling is either completed here or via task_work, so if we're in the
	 * task context we're naturally serialised with tw by merit of running
	 * the same task. When it's io-wq, take the ownership to prevent tw
	 * from running. However, when we're in the task context, skip taking
	 * it as an optimisation.
	 *
	 * Note: even though the request won't be completed/freed, without
	 * ownership we still can race with io_poll_wake().
	 * io_poll_can_finish_inline() tries to deal with that.
	 */
	ipt->owning = issue_flags & IO_URING_F_UNLOCKED;
	atomic_set(&req->poll_refs, (int)ipt->owning);
	/*
	 * Exclusive waits may only wake a limited amount of entries
	 * rather than all of them, this may interfere with lazy
	 * wake if someone does wait(events > 1). Ensure we don't do
	 * lazy wake for those, as we need to process each one as they
	 * come in.
	 */
	if (poll->events & EPOLLEXCLUSIVE)
		req->flags |= REQ_F_POLL_NO_LAZY;

	mask = vfs_poll(req->file, &ipt->pt) & poll->events;

	if (unlikely(ipt->error || !ipt->nr_entries)) {
		io_poll_remove_entries(req);

		if (!io_poll_can_finish_inline(req, ipt)) {
			io_poll_mark_cancelled(req);
			return 0;
		} else if (mask && (poll->events & EPOLLET)) {
			ipt->result_mask = mask;
			return 1;
		}
		return ipt->error ?: -EINVAL;
	}

	if (mask &&
	   ((poll->events & (EPOLLET|EPOLLONESHOT)) == (EPOLLET|EPOLLONESHOT))) {
		if (!io_poll_can_finish_inline(req, ipt)) {
			io_poll_add_hash(req, issue_flags);
			return 0;
		}
		io_poll_remove_entries(req);
		ipt->result_mask = mask;
		/* no one else has access to the req, forget about the ref */
		return 1;
	}

	io_poll_add_hash(req, issue_flags);

	if (mask && (poll->events & EPOLLET) &&
	    io_poll_can_finish_inline(req, ipt)) {
		__io_poll_execute(req, mask);
		return 0;
	}
	io_napi_add(req);
	if (ipt->owning) {
		/*
		 * Try to release ownership. If we see a change of state, e.g.
		 * poll was woken up, queue up a tw, it'll deal with it.
		 */
		if (atomic_cmpxchg(&req->poll_refs, 1, 0) != 1)
			__io_poll_execute(req, 0);
	}
	return 0;
}
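
/*
 * A rough caller-side sketch of the contract above (io_poll_add() and the
 * apoll path follow roughly this shape; names as used in this file):
 *
 *	ret = __io_arm_poll_handler(req, poll, &ipt, mask, issue_flags);
 *	if (ret > 0)
 *		io_req_set_res(req, ipt.result_mask, 0);  // completed inline
 *	else if (ret < 0)
 *		req_set_fail(req);			  // failed to arm
 *	// ret == 0: handed over, wakeups/task_work now drive completion
 */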
/*
 * We can't reliably detect loops where a poll trigger is repeatedly
 * followed by a failing issue. Rather than fail these immediately, allow a
 * certain amount of retries before we give up. Given that this condition
 * should _rarely_ trigger even once, we should be fine with a larger value.
 */
#define APOLL_MAX_RETRY		128

int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
{
	const struct io_issue_def *def = &io_issue_defs[req->opcode];
	__poll_t mask = POLLPRI | POLLERR | EPOLLET;

	if (!def->pollin && !def->pollout)
		return IO_APOLL_ABORTED;
	if (!io_file_can_poll(req))
		return IO_APOLL_ABORTED;
	if (!(req->flags & REQ_F_APOLL_MULTISHOT))
		mask |= EPOLLONESHOT;

	if (def->pollin) {
		mask |= EPOLLIN | EPOLLRDNORM;

		/* If reading from MSG_ERRQUEUE using recvmsg, ignore POLLIN */
		if (req->flags & REQ_F_CLEAR_POLLIN)
			mask &= ~EPOLLIN;
	} else {
		mask |= EPOLLOUT | EPOLLWRNORM;
	}
	if (def->poll_exclusive)
		mask |= EPOLLEXCLUSIVE;

	return io_arm_apoll(req, issue_flags, mask);
}

/*
 * Returns true if we found and killed one or more poll requests
 */
__cold bool io_poll_remove_all(struct io_ring_ctx *ctx, struct io_uring_task *tctx,
			       bool cancel_all)
{
	unsigned nr_buckets = 1U << ctx->cancel_table.hash_bits;
	struct hlist_node *tmp;
	struct io_kiocb *req;
	bool found = false;
	int i;

	lockdep_assert_held(&ctx->uring_lock);

	for (i = 0; i < nr_buckets; i++) {
		struct io_hash_bucket *hb = &ctx->cancel_table.hbs[i];

		hlist_for_each_entry_safe(req, tmp, &hb->list, hash_node) {
			if (io_match_task_safe(req, tctx, cancel_all)) {
				hlist_del_init(&req->hash_node);
				io_poll_cancel_req(req);
				found = true;
			}
		}
	}
	return found;
}