/* * Allow one bg request per queue, ignoring global fc limits. * This prevents a single queue from consuming all resources and * eliminates the need for remote queue wake-ups when global * limits are met but this queue has no more waiting requests.
*/ while ((fc->active_background < fc->max_background ||
!queue->active_background) &&
(!list_empty(&queue->fuse_req_bg_queue))) { struct fuse_req *req;
/* Abort all list queued request on the given ring queue */ staticvoid fuse_uring_abort_end_queue_requests(struct fuse_ring_queue *queue)
{ struct fuse_req *req;
LIST_HEAD(req_list);
spin_lock(&fc->lock); if (fc->ring) { /* race, another thread created the ring in the meantime */
spin_unlock(&fc->lock);
res = fc->ring; goto out_err;
}
/* * The entry must not be freed immediately, due to access of direct * pointer access of entries through IO_URING_F_CANCEL - there is a risk * of race between daemon termination (which triggers IO_URING_F_CANCEL * and accesses entries without checking the list state first
*/
list_move(&ent->list, &queue->ent_released);
ent->state = FRRS_RELEASED;
spin_unlock(&queue->lock);
if (cmd)
io_uring_cmd_done(cmd, -ENOTCONN, 0, IO_URING_F_UNLOCKED);
/* * Some ring entries might be in the middle of IO operations, * i.e. in process to get handled by file_operations::uring_cmd * or on the way to userspace - we could handle that with conditions in * run time code, but easier/cleaner to have an async tear down handler * If there are still queue references left
*/ if (atomic_read(&ring->queue_refs) > 0) { if (time_after(jiffies,
ring->teardown_time + FUSE_URING_TEARDOWN_TIMEOUT))
fuse_uring_log_ent_state(ring);
/* * Handle IO_URING_F_CANCEL, typically should come on daemon termination. * * Releasing the last entry should trigger fuse_dev_release() if * the daemon was terminated
*/ staticvoid fuse_uring_cancel(struct io_uring_cmd *cmd, unsignedint issue_flags)
{ struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd); struct fuse_ring_queue *queue; bool need_cmd_done = false;
/* * direct access on ent - it must not be destructed as long as * IO_URING_F_CANCEL might come up
*/
queue = ent->queue;
spin_lock(&queue->lock); if (ent->state == FRRS_AVAILABLE) {
ent->state = FRRS_USERSPACE;
list_move_tail(&ent->list, &queue->ent_in_userspace);
need_cmd_done = true;
ent->cmd = NULL;
}
spin_unlock(&queue->lock);
if (need_cmd_done) { /* no queue lock to avoid lock order issues */
io_uring_cmd_done(cmd, -ENOTCONN, 0, issue_flags);
}
}
/* * Checks for errors and stores it into the request
*/ staticint fuse_uring_out_header_has_err(struct fuse_out_header *oh, struct fuse_req *req, struct fuse_conn *fc)
{ int err;
err = -EINVAL; if (oh->unique == 0) { /* Not supported through io-uring yet */
pr_warn_once("notify through fuse-io-uring not supported\n"); goto err;
}
if (oh->error <= -ERESTARTSYS || oh->error > 0) goto err;
/* * Is it an interrupt reply ID? * XXX: Not supported through fuse-io-uring yet, it should not even * find the request - should not happen.
*/
WARN_ON_ONCE(oh->unique & FUSE_INT_REQ_BIT);
if (num_args > 0) { /* * Expectation is that the first argument is the per op header. * Some op code have that as zero size.
*/ if (args->in_args[0].size > 0) {
err = copy_to_user(&ent->headers->op_in, in_args->value,
in_args->size); if (err) {
pr_info_ratelimited( "Copying the header failed.\n"); return -EFAULT;
}
}
in_args++;
num_args--;
}
/* * Write data to the ring buffer and send the request to userspace, * userspace will read it * This is comparable with classical read(/dev/fuse)
*/ staticint fuse_uring_send_next_to_ring(struct fuse_ring_ent *ent, struct fuse_req *req, unsignedint issue_flags)
{ struct fuse_ring_queue *queue = ent->queue; int err; struct io_uring_cmd *cmd;
err = fuse_uring_prepare_send(ent, req); if (err) return err;
/* Fetch the next fuse request if available */ staticstruct fuse_req *fuse_uring_ent_assign_req(struct fuse_ring_ent *ent)
__must_hold(&queue->lock)
{ struct fuse_req *req; struct fuse_ring_queue *queue = ent->queue; struct list_head *req_queue = &queue->fuse_req_queue;
lockdep_assert_held(&queue->lock);
/* get and assign the next entry while it is still holding the lock */
req = list_first_entry_or_null(req_queue, struct fuse_req, list); if (req)
fuse_uring_add_req_to_ring_ent(ent, req);
return req;
}
/* * Read data from the ring buffer, which user space has written to * This is comparible with handling of classical write(/dev/fuse). * Also make the ring request available again for new fuse requests.
*/ staticvoid fuse_uring_commit(struct fuse_ring_ent *ent, struct fuse_req *req, unsignedint issue_flags)
{ struct fuse_ring *ring = ent->queue->ring; struct fuse_conn *fc = ring->fc;
ssize_t err = 0;
/* * Get the next fuse req and send it
*/ staticvoid fuse_uring_next_fuse_req(struct fuse_ring_ent *ent, struct fuse_ring_queue *queue, unsignedint issue_flags)
{ int err; struct fuse_req *req;
queue = ring->queues[qid]; if (!queue) return err;
fpq = &queue->fpq;
if (!READ_ONCE(fc->connected) || READ_ONCE(queue->stopped)) return err;
spin_lock(&queue->lock); /* Find a request based on the unique ID of the fuse request * This should get revised, as it needs a hash calculation and list * search. And full struct fuse_pqueue is needed (memory overhead). * As well as the link from req to ring_ent.
*/
req = fuse_request_find(fpq, commit_id);
err = -ENOENT; if (!req) {
pr_info("qid=%d commit_id %llu not found\n", queue->qid,
commit_id);
spin_unlock(&queue->lock); return err;
}
list_del_init(&req->list);
ent = req->ring_entry;
req->ring_entry = NULL;
/* without the queue lock, as other locks are taken */
fuse_uring_prepare_cancel(cmd, issue_flags, ent);
fuse_uring_commit(ent, req, issue_flags);
/* * Fetching the next request is absolutely required as queued * fuse requests would otherwise not get processed - committing * and fetching is done in one step vs legacy fuse, which has separated * read (fetch request) and write (commit result).
*/
fuse_uring_next_fuse_req(ent, queue, issue_flags); return 0;
}
staticbool is_ring_ready(struct fuse_ring *ring, int current_qid)
{ int qid; struct fuse_ring_queue *queue; bool ready = true;
for (qid = 0; qid < ring->nr_queues && ready; qid++) { if (current_qid == qid) continue;
/* * sqe->addr is a ptr to an iovec array, iov[0] has the headers, iov[1] * the payload
*/ staticint fuse_uring_get_iovec_from_sqe(conststruct io_uring_sqe *sqe, struct iovec iov[FUSE_URING_IOV_SEGS])
{ struct iovec __user *uiov = u64_to_user_ptr(READ_ONCE(sqe->addr)); struct iov_iter iter;
ssize_t ret;
if (sqe->len != FUSE_URING_IOV_SEGS) return -EINVAL;
/* * Direction for buffer access will actually be READ and WRITE, * using write for the import should include READ access as well.
*/
ret = import_iovec(WRITE, uiov, FUSE_URING_IOV_SEGS,
FUSE_URING_IOV_SEGS, &iov, &iter); if (ret < 0) return ret;
err = fuse_uring_get_iovec_from_sqe(cmd->sqe, iov); if (err) {
pr_info_ratelimited("Failed to get iovec from sqe, err=%d\n",
err); return ERR_PTR(err);
}
err = -EINVAL; if (iov[0].iov_len < sizeof(struct fuse_uring_req_header)) {
pr_info_ratelimited("Invalid header len %zu\n", iov[0].iov_len); return ERR_PTR(err);
}
payload_size = iov[1].iov_len; if (payload_size < ring->max_payload_sz) {
pr_info_ratelimited("Invalid req payload len %zu\n",
payload_size); return ERR_PTR(err);
}
err = -ENOMEM;
ent = kzalloc(sizeof(*ent), GFP_KERNEL_ACCOUNT); if (!ent) return ERR_PTR(err);
/* * Register header and payload buffer with the kernel and puts the * entry as "ready to get fuse requests" on the queue
*/ staticint fuse_uring_register(struct io_uring_cmd *cmd, unsignedint issue_flags, struct fuse_conn *fc)
{ conststruct fuse_uring_cmd_req *cmd_req = io_uring_sqe_cmd(cmd->sqe); struct fuse_ring *ring = smp_load_acquire(&fc->ring); struct fuse_ring_queue *queue; struct fuse_ring_ent *ent; int err; unsignedint qid = READ_ONCE(cmd_req->qid);
err = -ENOMEM; if (!ring) {
ring = fuse_uring_create(fc); if (!ring) return err;
}
if (qid >= ring->nr_queues) {
pr_info_ratelimited("fuse: Invalid ring qid %u\n", qid); return -EINVAL;
}
queue = ring->queues[qid]; if (!queue) {
queue = fuse_uring_create_queue(ring, qid); if (!queue) return err;
}
/* * The created queue above does not need to be destructed in * case of entry errors below, will be done at ring destruction time.
*/
ent = fuse_uring_create_ring_ent(cmd, queue); if (IS_ERR(ent)) return PTR_ERR(ent);
fuse_uring_do_register(ent, cmd, issue_flags);
return 0;
}
/* * Entry function from io_uring to handle the given passthrough command * (op code IORING_OP_URING_CMD)
*/ int fuse_uring_cmd(struct io_uring_cmd *cmd, unsignedint issue_flags)
{ struct fuse_dev *fud; struct fuse_conn *fc;
u32 cmd_op = cmd->cmd_op; int err;
if ((unlikely(issue_flags & IO_URING_F_CANCEL))) {
fuse_uring_cancel(cmd, issue_flags); return 0;
}
/* This extra SQE size holds struct fuse_uring_cmd_req */ if (!(issue_flags & IO_URING_F_SQE128)) return -EINVAL;
fud = fuse_get_dev(cmd->file); if (!fud) {
pr_info_ratelimited("No fuse device found\n"); return -ENOTCONN;
}
fc = fud->fc;
/* Once a connection has io-uring enabled on it, it can't be disabled */ if (!enable_uring && !fc->io_uring) {
pr_info_ratelimited("fuse-io-uring is disabled\n"); return -EOPNOTSUPP;
}
if (fc->aborted) return -ECONNABORTED; if (!fc->connected) return -ENOTCONN;
/* * fuse_uring_register() needs the ring to be initialized, * we need to know the max payload size
*/ if (!fc->initialized) return -EAGAIN;
/* * This prepares and sends the ring request in fuse-uring task context. * User buffers are not mapped yet - the application does not have permission * to write to it - this has to be executed in ring task context.
*/ staticvoid fuse_uring_send_in_task(struct io_uring_cmd *cmd, unsignedint issue_flags)
{ struct fuse_ring_ent *ent = uring_cmd_to_ring_ent(cmd); struct fuse_ring_queue *queue = ent->queue; int err;
/* queue a fuse request and send it if a ring entry is available */ void fuse_uring_queue_fuse_req(struct fuse_iqueue *fiq, struct fuse_req *req)
{ struct fuse_conn *fc = req->fm->fc; struct fuse_ring *ring = fc->ring; struct fuse_ring_queue *queue; struct fuse_ring_ent *ent = NULL; int err;
err = -EINVAL;
queue = fuse_uring_task_to_queue(ring); if (!queue) goto err;
if (req->in.h.opcode != FUSE_NOTIFY_REPLY)
req->in.h.unique = fuse_get_unique(fiq);
spin_lock(&queue->lock);
err = -ENOTCONN; if (unlikely(queue->stopped)) goto err_unlock;
set_bit(FR_URING, &req->flags);
req->ring_queue = queue;
ent = list_first_entry_or_null(&queue->ent_avail_queue, struct fuse_ring_ent, list); if (ent)
fuse_uring_add_req_to_ring_ent(ent, req); else
list_add_tail(&req->list, &queue->fuse_req_queue);
spin_unlock(&queue->lock);
ent = list_first_entry_or_null(&queue->ent_avail_queue, struct fuse_ring_ent, list);
spin_lock(&fc->bg_lock);
fc->num_background++; if (fc->num_background == fc->max_background)
fc->blocked = 1;
fuse_uring_flush_bg(queue);
spin_unlock(&fc->bg_lock);
/* * Due to bg_queue flush limits there might be other bg requests * in the queue that need to be handled first. Or no further req * might be available.
*/
req = list_first_entry_or_null(&queue->fuse_req_queue, struct fuse_req,
list); if (ent && req) {
fuse_uring_add_req_to_ring_ent(ent, req);
spin_unlock(&queue->lock);
staticconststruct fuse_iqueue_ops fuse_io_uring_ops = { /* should be send over io-uring as enhancement */
.send_forget = fuse_dev_queue_forget,
/* * could be send over io-uring, but interrupts should be rare, * no need to make the code complex
*/
.send_interrupt = fuse_dev_queue_interrupt,
.send_req = fuse_uring_queue_fuse_req,
};
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.