/*
 * Maximum number of partitions an instance can have.
 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
 */
#define RNBD_PART_BITS		6
/* * If the size changed, we need to revalidate it
*/
rnbd_clt_info(dev, "Device size changed from %llu to %llu sectors\n",
get_capacity(dev->gd), new_nsectors);
set_capacity_and_notify(dev->gd, new_nsectors);
}
/*
 * Apply a new size (in sectors) to a mapped device.
 *
 * Returns 0 on success or -ENOENT when the device is not in the
 * MAPPED state (not opened yet, or already being torn down).
 */
int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, sector_t newsize)
{
	int ret = 0;

	mutex_lock(&dev->lock);
	if (dev->dev_state == DEV_STATE_MAPPED) {
		rnbd_clt_change_capacity(dev, newsize);
	} else {
		pr_err("Failed to set new size of the device, device is not opened\n");
		ret = -ENOENT;
	}
	mutex_unlock(&dev->lock);

	return ret;
}
/*
 * Kick the block layer to rerun a stopped HW queue.
 * WARNs and bails out if the queue's hctx has not been set up yet.
 */
static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
{
	if (WARN_ON(!q->hctx))
		return;

	/* We can come here from interrupt, thus async=true */
	blk_mq_run_hw_queue(q->hctx, true);
}
enum {
	/*
	 * Sentinel delay value: instead of rerunning the HW queue after a
	 * fixed delay, put it on the session requeue list when the session
	 * is busy (see the rnbd_clt_dev_add_to_requeue() call site).
	 */
	RNBD_DELAY_IFBUSY = -1,
};
/** * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun * @sess: Session to find a queue for * @cpu: Cpu to start the search from * * Description: * Each CPU has a list of HW queues, which needs to be rerun. If a list * is not empty - it is marked with a bit. This function finds first * set bit in a bitmap and returns corresponding CPU list.
*/ staticstruct rnbd_cpu_qlist *
rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
{ int bit;
/* Search from cpu to nr_cpu_ids */
bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu); if (bit < nr_cpu_ids) { return per_cpu_ptr(sess->cpu_queues, bit);
} elseif (cpu != 0) { /* Search from 0 to cpu */
bit = find_first_bit(sess->cpu_queues_bm, cpu); if (bit < cpu) return per_cpu_ptr(sess->cpu_queues, bit);
}
/** * rnbd_rerun_if_needed() - rerun next queue marked as stopped * @sess: Session to rerun a queue on * * Description: * Each CPU has it's own list of HW queues, which should be rerun. * Function finds such list with HW queues, takes a list lock, picks up * the first HW queue out of the list and requeues it. * * Return: * True if the queue was requeued, false otherwise. * * Context: * Does not matter.
*/ staticbool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
{ struct rnbd_queue *q = NULL; struct rnbd_cpu_qlist *cpu_q; unsignedlong flags; int *cpup;
/* * To keep fairness and not to let other queues starve we always * try to wake up someone else in round-robin manner. That of course * increases latency but queues always have a chance to be executed.
*/
cpup = get_cpu_ptr(sess->cpu_rr); for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) { if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags)) continue; if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm)) goto unlock;
q = list_first_entry_or_null(&cpu_q->requeue_list,
typeof(*q), requeue_list); if (WARN_ON(!q)) goto clear_bit;
list_del_init(&q->requeue_list);
clear_bit_unlock(0, &q->in_list);
if (list_empty(&cpu_q->requeue_list)) { /* Clear bit if nothing is left */
clear_bit:
clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
}
unlock:
spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
if (q) break;
}
/** * Saves the CPU that is going to be requeued on the per-cpu var. Just * incrementing it doesn't work because rnbd_get_cpu_qlist() will * always return the first CPU with something on the queue list when the * value stored on the var is greater than the last CPU with something * on the list.
*/ if (cpu_q)
*cpup = cpu_q->cpu;
put_cpu_ptr(sess->cpu_rr);
if (q)
rnbd_clt_dev_requeue(q);
return q;
}
/**
 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
 *				 session is idling (there are no requests
 *				 in-flight).
 * @sess:	Session to rerun the queues on
 *
 * Description:
 *     This function tries to rerun all stopped queues if there are no
 *     requests in-flight anymore.  This function tries to solve an obvious
 *     problem, when number of tags < than number of queues (hctx), which
 *     are stopped and put to sleep.  If last permit, which has been just put,
 *     does not wake up all left queues (hctxs), IO requests hang forever.
 *
 *     That can happen when all number of permits, say N, have been exhausted
 *     from one CPU, and we have many block devices per session, say M.
 *     Each block device has it's own queue (hctx) for each CPU, so eventually
 *     we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids.
 *     If number of permits N < M x nr_cpu_ids finally we will get an IO hang.
 *
 *     To avoid this hang last caller of rnbd_put_permit() (last caller is the
 *     one who observes sess->busy == 0) must wake up all remaining queues.
 *
 * Context:
 *     Does not matter.
 */
static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
{
	bool requeued;

	do {
		requeued = rnbd_rerun_if_needed(sess);
	} while (atomic_read(&sess->busy) == 0 && requeued);
}
permit = rtrs_clt_get_permit(sess->rtrs, con_type, wait); if (permit) /* We have a subtle rare case here, when all permits can be * consumed before busy counter increased. This is safe, * because loser will get NULL as a permit, observe 0 busy * counter and immediately restart the queue himself.
*/
atomic_inc(&sess->busy);
return permit;
}
/*
 * Return a permit to rtrs and drop the session busy counter.  The last
 * caller to observe sess->busy == 0 wakes up all remaining stopped
 * queues (see rnbd_rerun_all_if_idle() for the rationale).
 */
static void rnbd_put_permit(struct rnbd_clt_session *sess,
			    struct rtrs_permit *permit)
{
	rtrs_clt_put_permit(sess->rtrs, permit);
	atomic_dec(&sess->busy);
	/* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
	 * and then check queue bits.
	 */
	smp_mb__after_atomic();
	rnbd_rerun_all_if_idle(sess);
}
iu->permit = permit; /* * 1st reference is dropped after finishing sending a "user" message, * 2nd reference is dropped after confirmation with the response is * returned. * 1st and 2nd can happen in any order, so the rnbd_iu should be * released (rtrs_permit returned to rtrs) only after both * are finished.
*/
atomic_set(&iu->refcount, 2);
init_waitqueue_head(&iu->comp.wait);
iu->comp.errno = INT_MAX;
/* INIT state is only triggered from rnbd_clt_map_device */ if (dev->dev_state == DEV_STATE_INIT)
from_map = true;
if (errno) {
rnbd_clt_err(dev, "Opening failed, server responded: %d\n",
errno);
} else {
errno = process_msg_open_rsp(dev, rsp); if (errno) {
u32 device_id = le32_to_cpu(rsp->device_id); /* * If server thinks its fine, but we fail to process * then be nice and send a close to server.
*/
send_msg_close(dev, device_id, RTRS_PERMIT_NOWAIT);
}
} /* We free rsp in rnbd_clt_map_device for map scenario */ if (!from_map)
kfree(rsp);
wake_up_iu_comp(iu, errno);
rnbd_put_iu(dev->sess, iu);
rnbd_clt_put_dev(dev);
}
if (!rnbd_clt_get_sess(sess)) { /* * That can happen only in one case, when RTRS has restablished * the connection and link_ev() is called, but session is almost * dead, last reference on session is put and caller is waiting * for RTRS to close everything.
*/
err = -ENODEV; goto put_iu;
}
err = send_usr_msg(sess->rtrs, READ, iu,
&vec, sizeof(*rsp), iu->sgt.sgl, 1,
msg_sess_info_conf, &errno, wait); if (err) {
rnbd_clt_put_sess(sess);
put_iu:
rnbd_put_iu(sess, iu);
kfree(rsp);
} else {
err = errno;
}
rnbd_put_iu(sess, iu); return err;
}
/* * Careful here: we are called from RTRS link event directly, * thus we can't send any RTRS request and wait for response * or RTRS will not be able to complete request with failure * if something goes wrong (failing of outstanding requests * happens exactly from the context where we are blocking now). * * So to avoid deadlocks each usr message sent from here must * be asynchronous.
*/
mutex_lock(&dev->lock);
skip = (dev->dev_state == DEV_STATE_INIT);
mutex_unlock(&dev->lock); if (skip) /* * When device is establishing connection for the first * time - do not remap, it will be closed soon.
*/ continue;
/* * That is simple percpu variable which stores cpu indices, which are * incremented on each access. We need that for the sake of fairness * to wake up queues in a round-robin manner.
*/
sess->cpu_rr = alloc_percpu(int); if (!sess->cpu_rr) {
err = -ENOMEM; goto err;
}
for_each_possible_cpu(cpu)
* per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE); if (IS_ERR_OR_NULL(sess->rtrs)) {
finish_wait(&sess->rtrs_waitq, &wait); return;
}
mutex_unlock(&sess_lock); /* loop in caller, see __find_and_get_sess(). * You can't leave mutex locked and call schedule(), you will catch a * deadlock with a caller of free_sess(), which has just put the last * reference and is about to take the sess_lock in order to delete * the session from the list.
*/
schedule();
mutex_lock(&sess_lock);
}
again:
list_for_each_entry_safe(sess, sn, &sess_list, list) { if (strcmp(sessname, sess->sessname)) continue;
if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs)) /* * No RTRS connection, session is dying.
*/ continue;
if (rnbd_clt_get_sess(sess)) { /* * Alive session is found, wait for RTRS connection.
*/
mutex_unlock(&sess_lock);
err = wait_for_rtrs_connection(sess); if (err)
rnbd_clt_put_sess(sess);
mutex_lock(&sess_lock);
if (err) /* Session is dying, repeat the loop */ goto again;
return sess;
} /* * Ref is 0, session is dying, wait for RTRS disconnect * in order to avoid session names clashes.
*/
wait_for_rtrs_disconnection(sess); /* * RTRS is disconnected and soon session will be freed, * so repeat a loop.
*/ goto again;
}
return NULL;
}
/* caller is responsible for initializing 'first' to false */ staticstruct
rnbd_clt_session *find_or_create_sess(constchar *sessname, bool *first)
{ struct rnbd_clt_session *sess = NULL;
/* The amount of data that belongs to an I/O and the amount of data that * should be read or written to the disk (bi_size) can differ. * * E.g. When WRITE_SAME is used, only a small amount of data is * transferred that is then written repeatedly over a lot of sectors. * * Get the size of data to be transferred via RTRS by summing up the size * of the scather-gather list entries.
*/ static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
{ struct scatterlist *sg;
size_t tsize = 0; int i;
/* * We only support discards/WRITE_ZEROES with single segment for now. * See queue limits.
*/ if ((req_op(rq) != REQ_OP_DISCARD) && (req_op(rq) != REQ_OP_WRITE_ZEROES))
sg_cnt = blk_rq_map_sg(rq, iu->sgt.sgl);
/**
 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
 * @dev:	Device to be checked
 * @q:		Queue to be added to the requeue list if required
 *
 * Description:
 *     If session is busy, that means someone will requeue us when resources
 *     are freed.  If session is not doing anything - device is not added to
 *     the list and @false is returned.
 */
static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
					struct rnbd_queue *q)
{
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_cpu_qlist *cpu_q;
	unsigned long flags;
	bool added = true;
	bool need_set;

	/*
	 * Pick up this CPU's requeue list and take its lock before touching
	 * cpu_q below.  These two lines are mandatory: without them cpu_q is
	 * dereferenced uninitialized and the unlock at the end is unpaired.
	 */
	cpu_q = get_cpu_ptr(sess->cpu_queues);
	spin_lock_irqsave(&cpu_q->requeue_lock, flags);

	if (!test_and_set_bit_lock(0, &q->in_list)) {
		if (WARN_ON(!list_empty(&q->requeue_list)))
			goto unlock;

		need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
		if (need_set) {
			set_bit(cpu_q->cpu, sess->cpu_queues_bm);
			/* Paired with rnbd_put_permit().  Set a bit first
			 * and then observe the busy counter.
			 */
			smp_mb__before_atomic();
		}
		if (atomic_read(&sess->busy)) {
			list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
		} else {
			/* Very unlikely, but possible: busy counter was
			 * observed as zero.  Drop all bits and return
			 * false to restart the queue by ourselves.
			 */
			if (need_set)
				clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
			clear_bit_unlock(0, &q->in_list);
			added = false;
		}
	}
unlock:
	spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
	put_cpu_ptr(sess->cpu_queues);

	return added;
}
if (delay != RNBD_DELAY_IFBUSY)
blk_mq_delay_run_hw_queue(hctx, delay); elseif (!rnbd_clt_dev_add_to_requeue(dev, q)) /* * If session is not busy we have to restart * the queue ourselves.
*/
blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
}
sess = find_or_create_sess(sessname, &first); if (sess == ERR_PTR(-ENOMEM)) { return ERR_PTR(-ENOMEM);
} elseif ((nr_poll_queues && !first) || (!nr_poll_queues && sess->nr_poll_queues)) { /* * A device MUST have its own session to use the polling-mode. * It must fail to map new device with the same session.
*/
err = -EINVAL; goto put_sess;
}
if (!first) return sess;
if (!path_cnt) {
pr_err("Session %s not found, and path parameter not given", sessname);
err = -ENXIO; goto put_sess;
}
rtrs_ops = (struct rtrs_clt_ops) {
.priv = sess,
.link_ev = rnbd_clt_link_ev,
}; /* * Nothing was found, establish rtrs connection and proceed further.
*/
sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
paths, path_cnt, port_nr,
0, /* Do not use pdu of rtrs */
RECONNECT_DELAY,
MAX_RECONNECTS, nr_poll_queues); if (IS_ERR(sess->rtrs)) {
err = PTR_ERR(sess->rtrs); goto wake_up_and_put;
}
err = rtrs_clt_query(sess->rtrs, &attrs); if (err) goto close_rtrs;
dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE); if (!dev) return ERR_PTR(-ENOMEM);
/* * nr_cpu_ids: the number of softirq queues * nr_poll_queues: the number of polling queues
*/
dev->hw_queues = kcalloc(nr_cpu_ids + nr_poll_queues, sizeof(*dev->hw_queues),
GFP_KERNEL); if (!dev->hw_queues) {
ret = -ENOMEM; goto out_alloc;
}
ret = ida_alloc_max(&index_ida, (1 << (MINORBITS - RNBD_PART_BITS)) - 1,
GFP_KERNEL); if (ret < 0) {
pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
pathname, sess->sessname, ret); goto out_queues;
}
dev->pathname = kstrdup(pathname, GFP_KERNEL); if (!dev->pathname) {
ret = -ENOMEM; goto out_queues;
}
/* Firstly forbid access through sysfs interface */
rnbd_clt_destroy_sysfs_files();
/* * Here at this point there is no any concurrent access to sessions * list and devices list: * 1. New session or device can't be created - session sysfs files * are removed. * 2. Device or session can't be removed - module reference is taken * into account in unmap device sysfs callback. * 3. No IO requests inflight - each file open of block_dev increases * module reference in get_disk(). * * But still there can be user requests inflights, which are sent by * asynchronous send_msg_*() functions, thus before unmapping devices * RTRS session must be explicitly closed.
*/
list_for_each_entry_safe(sess, sn, &sess_list, list) { if (!rnbd_clt_get_sess(sess)) continue;
close_rtrs(sess);
list_for_each_entry_safe(dev, tn, &sess->devs_list, list) { /* * Here unmap happens in parallel for only one reason: * del_gendisk() takes around half a second, so * on huge amount of devices the whole module unload * procedure takes minutes.
*/
INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
queue_work(rnbd_clt_wq, &dev->unmap_on_rmmod_work);
}
rnbd_clt_put_sess(sess);
} /* Wait for all scheduled unmap works */
flush_workqueue(rnbd_clt_wq);
WARN_ON(!list_empty(&sess_list));
}
staticint __init rnbd_client_init(void)
{ int err = 0;
/*
 * NOTE(review): the following trailing text is not C code - it is a German
 * website disclaimer that leaked into the file during extraction and should
 * be removed.  Translation: "The information on this web page was compiled
 * carefully to the best of our knowledge.  However, neither completeness,
 * nor correctness, nor quality of the provided information is guaranteed.
 * Note: the colored syntax highlighting and the measurement are still
 * experimental."
 */