// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */
/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
*/ struct svc_rdma_rw_ctxt { struct llist_node rw_node; struct list_head rw_list; struct rdma_rw_ctx rw_ctx; unsignedint rw_nents; unsignedint rw_first_sgl_nents; struct sg_table rw_sg_table; struct scatterlist rw_first_sgl[];
};
/** * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O * @rdma: controlling transport instance * @ctxt: R/W context to prepare * @offset: RDMA offset * @handle: RDMA tag/handle * @direction: I/O direction * * Returns on success, the number of WQEs that will be needed * on the workqueue, or a negative errno.
*/ staticint svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma, struct svc_rdma_rw_ctxt *ctxt,
u64 offset, u32 handle, enum dma_data_direction direction)
{ int ret;
/* NOTE(review): corrupted extraction — the body below is NOT
 * svc_rdma_rw_ctx_init. The bare `return; case ...:`/`break; default:`
 * fragments and the references to `wc`, `cc`, and a receive `ctxt`
 * indicate this is the tail of an RDMA Read completion handler (its
 * flush/error switch arms followed by connection teardown), spliced
 * onto the wrong kernel-doc header. This span does not compile as-is;
 * restore both functions from the upstream file before building.
 */
spin_lock(&rdma->sc_rq_dto_lock);
list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q); /* the unlock pairs with the smp_rmb in svc_xprt_ready */
set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
spin_unlock(&rdma->sc_rq_dto_lock);
svc_xprt_enqueue(&rdma->sc_xprt); return; case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_read_flush(wc, &cc->cc_cid); break; default:
trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
}
/* The RDMA Read has flushed, so the incoming RPC message * cannot be constructed and must be dropped. Signal the * loss to the client by closing the connection.
*/
svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
svc_rdma_recv_ctxt_put(rdma, ctxt);
svc_xprt_deferred_close(&rdma->sc_xprt);
}
/* * Assumptions: * - If ib_post_send() succeeds, only one completion is expected, * even if one or more WRs are flushed. This is true when posting * an rdma_rw_ctx or when posting a single signaled WR.
*/ staticint svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma, struct svc_rdma_chunk_ctxt *cc)
{ struct ib_send_wr *first_wr; conststruct ib_send_wr *bad_wr; struct list_head *tmp; struct ib_cqe *cqe; int ret;
might_sleep();
/* Guard: refuse a chain that could never fit in the Send Queue */
if (cc->cc_sqecount > rdma->sc_sq_depth) return -EINVAL;
/* NOTE(review): function truncated by extraction — the WR-chain
 * assembly over cc's context list, the ib_post_send() call, and the
 * error/unwind paths are missing, and the function body is never
 * closed. Restore from the upstream file before building.
 */
/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
*/ staticvoid svc_rdma_vec_to_sg(struct svc_rdma_write_info *info, unsignedint len, struct svc_rdma_rw_ctxt *ctxt)
{ struct scatterlist *sg = ctxt->rw_sg_table.sgl;
/* NOTE(review): function truncated by extraction — only the first
 * local declaration survives; the sg_set_buf()/rw_nents population
 * and the closing brace are missing. Restore from the upstream file.
 */
/** * svc_rdma_iov_write - Construct RDMA Writes from an iov * @info: pointer to write arguments * @iov: kvec to write * * Returns: * On success, returns zero * %-E2BIG if the client-provided Write chunk is too small * %-ENOMEM if a resource has been exhausted * %-EIO if an rdma-rw error occurred
*/ staticint svc_rdma_iov_write(struct svc_rdma_write_info *info, conststruct kvec *iov)
{
info->wi_base = iov->iov_base; return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
iov->iov_len);
}
/** * svc_rdma_pages_write - Construct RDMA Writes from pages * @info: pointer to write arguments * @xdr: xdr_buf with pages to write * @offset: offset into the content of @xdr * @length: number of bytes to write * * Returns: * On success, returns zero * %-E2BIG if the client-provided Write chunk is too small * %-ENOMEM if a resource has been exhausted * %-EIO if an rdma-rw error occurred
*/ staticint svc_rdma_pages_write(struct svc_rdma_write_info *info, conststruct xdr_buf *xdr, unsignedint offset, unsignedlong length)
{
info->wi_xdr = xdr;
info->wi_next_off = offset - xdr->head[0].iov_len; return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
length);
}
/** * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf * @xdr: xdr_buf to write * @data: pointer to write arguments * * Returns: * On success, returns zero * %-E2BIG if the client-provided Write chunk is too small * %-ENOMEM if a resource has been exhausted * %-EIO if an rdma-rw error occurred
*/ staticint svc_rdma_xb_write(conststruct xdr_buf *xdr, void *data)
{ struct svc_rdma_write_info *info = data; int ret;
if (xdr->head[0].iov_len) {
ret = svc_rdma_iov_write(info, &xdr->head[0]); if (ret < 0) return ret;
}
if (xdr->page_len) {
ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
xdr->page_len); if (ret < 0) return ret;
}
if (xdr->tail[0].iov_len) {
ret = svc_rdma_iov_write(info, &xdr->tail[0]); if (ret < 0) return ret;
}
/** * svc_rdma_send_write_list - Send all chunks on the Write list * @rdma: controlling RDMA transport * @rctxt: Write list provisioned by the client * @xdr: xdr_buf containing an RPC Reply message * * Returns zero on success, or a negative errno if one or more * Write chunks could not be sent.
*/ int svc_rdma_send_write_list(struct svcxprt_rdma *rdma, conststruct svc_rdma_recv_ctxt *rctxt, conststruct xdr_buf *xdr)
{ struct svc_rdma_chunk *chunk; int ret;
pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) { if (!chunk->ch_payload_length) break;
ret = svc_rdma_send_write_chunk(rdma, chunk, xdr); if (ret < 0) return ret;
} return 0;
}
/** * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk * @rdma: controlling RDMA transport * @write_pcl: Write chunk list provided by client * @reply_pcl: Reply chunk provided by client * @sctxt: Send WR resources * @xdr: xdr_buf containing an RPC Reply * * Returns a non-negative number of bytes the chunk consumed, or * %-E2BIG if the payload was larger than the Reply chunk, * %-EINVAL if client provided too many segments, * %-ENOMEM if rdma_rw context pool was exhausted, * %-ENOTCONN if posting failed (connection is lost), * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
*/ int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma, conststruct svc_rdma_pcl *write_pcl, conststruct svc_rdma_pcl *reply_pcl, struct svc_rdma_send_ctxt *sctxt, conststruct xdr_buf *xdr)
{ struct svc_rdma_write_info *info = &sctxt->sc_reply_info; struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; struct ib_send_wr *first_wr; struct list_head *pos; struct ib_cqe *cqe; int ret;
/* NOTE(review): function truncated by extraction — only the local
 * declarations survive; the Write-info initialization, per-buffer
 * WR construction, and closing brace are missing. Restore from the
 * upstream file before building.
 */
/** * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk * @rqstp: RPC transaction context * @head: context for ongoing I/O * @chunk: Read chunk to pull * * Return values: * %0: the Read WR chain was constructed successfully * %-EINVAL: there were not enough resources to finish * %-ENOMEM: allocating a local resources failed * %-EIO: a DMA mapping error occurred
*/ staticint svc_rdma_build_read_chunk(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head, conststruct svc_rdma_chunk *chunk)
{ conststruct svc_rdma_segment *segment; int ret;
ret = -EINVAL;
pcl_for_each_segment(segment, chunk) {
ret = svc_rdma_build_read_segment(rqstp, head, segment); if (ret < 0) break;
head->rc_readbytes += segment->rs_length;
} return ret;
}
/** * svc_rdma_copy_inline_range - Copy part of the inline content into pages * @rqstp: RPC transaction context * @head: context for ongoing I/O * @offset: offset into the Receive buffer of region to copy * @remaining: length of region to copy * * Take a page at a time from rqstp->rq_pages and copy the inline * content from the Receive buffer into that page. Update * head->rc_curpage and head->rc_pageoff so that the next RDMA Read * result will land contiguously with the copied content. * * Return values: * %0: Inline content was successfully copied * %-EINVAL: offset or length was incorrect
*/ staticint svc_rdma_copy_inline_range(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head, unsignedint offset, unsignedint remaining)
{ unsignedchar *dst, *src = head->rc_recv_buf; unsignedint page_no, numpages;
/* NOTE(review): function truncated by extraction — only the local
 * declarations survive; the page-at-a-time copy loop described in the
 * kernel-doc above and the closing brace are missing. Restore from the
 * upstream file before building.
 */
/** * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks * @rqstp: RPC transaction context * @head: context for ongoing I/O * * The chunk data lands in the page list of rqstp->rq_arg.pages. * * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec. * Therefore, XDR round-up of the Read chunk and trailing * inline content must both be added at the end of the pagelist. * * Return values: * %0: RDMA Read WQEs were successfully built * %-EINVAL: client provided too many chunks or segments, * %-ENOMEM: rdma_rw context pool was exhausted, * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/ staticint svc_rdma_read_data_item(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head)
{ return svc_rdma_build_read_chunk(rqstp, head,
pcl_first_chunk(&head->rc_read_pcl));
}
/** * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk * @rqstp: RPC transaction context * @head: context for ongoing I/O * @chunk: parsed Call chunk to pull * @offset: offset of region to pull * @length: length of region to pull * * Return values: * %0: RDMA Read WQEs were successfully built * %-EINVAL: there were not enough resources to finish * %-ENOMEM: rdma_rw context pool was exhausted, * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/ staticint svc_rdma_read_chunk_range(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head, conststruct svc_rdma_chunk *chunk, unsignedint offset, unsignedint length)
{ conststruct svc_rdma_segment *segment; int ret;
ret = -EINVAL;
pcl_for_each_segment(segment, chunk) { struct svc_rdma_segment dummy;
/* Skip whole segments that lie entirely before @offset */
if (offset > segment->rs_length) {
offset -= segment->rs_length;  continue;
}
/* NOTE(review): function truncated by extraction — the code that
 * builds a partial segment (`dummy`), advances @length, exits the
 * loop, returns, and closes the function body is missing. Restore
 * from the upstream file before building.
 */
/** * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message * @rqstp: RPC transaction context * @head: context for ongoing I/O * * The start of the data lands in the first page just after the * Transport header, and the rest lands in rqstp->rq_arg.pages. * * Assumptions: * - A PZRC is never sent in an RDMA_MSG message, though it's * allowed by spec. * * Return values: * %0: RDMA Read WQEs were successfully built * %-EINVAL: client provided too many chunks or segments, * %-ENOMEM: rdma_rw context pool was exhausted, * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/ static noinline int svc_rdma_read_special(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head)
{ return svc_rdma_read_call_chunk(rqstp, head);
}
/* Pages under I/O have been copied to head->rc_pages. Ensure that * svc_xprt_release() does not put them when svc_rdma_recvfrom() * returns. This has to be done after all Read WRs are constructed * to properly handle a page that happens to be part of I/O on behalf * of two different RDMA segments. * * Note: if the subsequent post_send fails, these pages have already * been moved to head->rc_pages and thus will be cleaned up by * svc_rdma_recv_ctxt_put().
*/ staticvoid svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head)
{ unsignedint i;
for (i = 0; i < head->rc_page_count; i++) {
head->rc_pages[i] = rqstp->rq_pages[i];
rqstp->rq_pages[i] = NULL;
}
}
/** * svc_rdma_process_read_list - Pull list of Read chunks from the client * @rdma: controlling RDMA transport * @rqstp: set of pages to use as Read sink buffers * @head: pages under I/O collect here * * The RPC/RDMA protocol assumes that the upper layer's XDR decoders * pull each Read chunk as they decode an incoming RPC message. * * On Linux, however, the server needs to have a fully-constructed RPC * message in rqstp->rq_arg when there is a positive return code from * ->xpo_recvfrom. So the Read list is safety-checked immediately when * it is received, then here the whole Read list is pulled all at once. * The ingress RPC message is fully reconstructed once all associated * RDMA Reads have completed. * * Return values: * %1: all needed RDMA Reads were posted successfully, * %-EINVAL: client provided too many chunks or segments, * %-ENOMEM: rdma_rw context pool was exhausted, * %-ENOTCONN: posting failed (connection is lost), * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *head)
{ struct svc_rdma_chunk_ctxt *cc = &head->rc_cc; int ret;
if (pcl_is_empty(&head->rc_call_pcl)) { if (head->rc_read_pcl.cl_count == 1)
ret = svc_rdma_read_data_item(rqstp, head); else
ret = svc_rdma_read_multiple_chunks(rqstp, head);
} else
ret = svc_rdma_read_special(rqstp, head);
svc_rdma_clear_rqst_pages(rqstp, head); if (ret < 0) return ret;
trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
ret = svc_rdma_post_chunk_ctxt(rdma, cc); return ret < 0 ? ret : 1;
}
/*
 * NOTE(review): trailing web-page boilerplate (a German disclaimer) was
 * found appended to this file by whatever tool extracted it; it is not
 * part of the source. English translation, preserved for reference:
 * "The information on this web page was compiled carefully to the best
 * of our knowledge. However, neither completeness, nor correctness, nor
 * quality of the provided information is guaranteed. Note: the colored
 * syntax display and the measurement are still experimental."
 */