// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* * Copyright (c) 2016-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU * General Public License (GPL) Version 2, available from the file * COPYING in the main directory of this source tree, or the BSD-type * license below: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials provided * with the distribution. * * Neither the name of the Network Appliance, Inc. nor the names of * its contributors may be used to endorse or promote products * derived from this software without specific prior written * permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Author: Tom Tucker <tom@opengridcomputing.com>
*/
/* Operation * * The main entry point is svc_rdma_recvfrom. This is called from * svc_recv when the transport indicates there is incoming data to * be read. "Data Ready" is signaled when an RDMA Receive completes, * or when a set of RDMA Reads complete. * * An svc_rqst is passed in. This structure contains an array of * free pages (rq_pages) that will contain the incoming RPC message. * * Short messages are moved directly into svc_rqst::rq_arg, and * the RPC Call is ready to be processed by the Upper Layer. * svc_rdma_recvfrom returns the length of the RPC Call message, * completing the reception of the RPC Call. * * However, when an incoming message has Read chunks, * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's * data payload from the client. svc_rdma_recvfrom sets up the * RDMA Reads using pages in svc_rqst::rq_pages, which are * transferred to an svc_rdma_recv_ctxt for the duration of the * I/O. svc_rdma_recvfrom then returns zero, since the RPC message * is still not yet ready. * * When the Read chunk payloads have become available on the * server, "Data Ready" is raised again, and svc_recv calls * svc_rdma_recvfrom again. This second call may use a different * svc_rqst than the first one, thus any information that needs * to be preserved across these two calls is kept in an * svc_rdma_recv_ctxt. * * The second call to svc_rdma_recvfrom performs final assembly * of the RPC Call message, using the RDMA Read sink pages kept in * the svc_rdma_recv_ctxt. The xdr_buf is copied from the * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns * the length of the completed RPC Call message. * * Page Management * * Pages under I/O must be transferred from the first svc_rqst to an * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns. * * The first svc_rqst supplies pages for RDMA Reads. These are moved * from rqstp::rq_pages into ctxt::pages. 
The consumed elements of * the rq_pages array are set to NULL and refilled when the first * svc_rdma_recvfrom call returns. * * During the second svc_rdma_recvfrom call, RDMA Read sink pages * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst.
*/
/** * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt * @rdma: svcxprt_rdma being torn down *
*/ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
{ struct svc_rdma_recv_ctxt *ctxt; struct llist_node *node;
/** * svc_rdma_release_ctxt - Release transport-specific per-rqst resources * @xprt: the transport which owned the context * @vctxt: the context from rqstp->rq_xprt_ctxt or dr->xprt_ctxt * * Ensure that the recv_ctxt is released whether or not a Reply * was sent. For example, the client could close the connection, * or svc_process could drop an RPC, before the Reply is sent.
*/ void svc_rdma_release_ctxt(struct svc_xprt *xprt, void *vctxt)
{ struct svc_rdma_recv_ctxt *ctxt = vctxt; struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr); if (ret) goto err_free; returntrue;
err_free:
trace_svcrdma_rq_post_err(rdma, ret); while (bad_wr) {
ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
rc_recv_wr);
bad_wr = bad_wr->next;
svc_rdma_recv_ctxt_put(rdma, ctxt);
} /* Since we're destroying the xprt, no need to reset
* sc_pending_recvs. */ returnfalse;
}
/** * svc_rdma_post_recvs - Post initial set of Recv WRs * @rdma: fresh svcxprt_rdma * * Return values: * %true: Receive Queue initialization successful * %false: memory allocation or DMA error
*/ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
{ unsignedint total;
/* For each credit, allocate enough recv_ctxts for one * posted Receive and one RPC in process.
*/
total = (rdma->sc_max_requests * 2) + rdma->sc_recv_batch; while (total--) { struct svc_rdma_recv_ctxt *ctxt;
ctxt = svc_rdma_recv_ctxt_alloc(rdma); if (!ctxt) returnfalse;
llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
}
/* WARNING: Only wc->wr_cqe and wc->status are reliable */
ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
if (wc->status != IB_WC_SUCCESS) goto flushed;
trace_svcrdma_wc_recv(wc, &ctxt->rc_cid);
/* If receive posting fails, the connection is about to be * lost anyway. The server will not be able to send a reply * for this RPC, and the client will retransmit this RPC * anyway when it reconnects. * * Therefore we drop the Receive, even if status was SUCCESS * to reduce the likelihood of replayed requests once the * client reconnects.
*/ if (rdma->sc_pending_recvs < rdma->sc_max_requests) if (!svc_rdma_refresh_recvs(rdma, rdma->sc_recv_batch)) goto dropped;
/* All wc fields are now known to be valid */
ctxt->rc_byte_len = wc->byte_len;
spin_lock(&rdma->sc_rq_dto_lock);
list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q); /* Note the unlock pairs with the smp_rmb in svc_xprt_ready: */
set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
spin_unlock(&rdma->sc_rq_dto_lock); if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
svc_xprt_enqueue(&rdma->sc_xprt); return;
/**
 * xdr_count_read_segments - Count number of Read segments in Read list
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Before allocating anything, ensure the ingress Read list is safe
 * to use.
 *
 * The segment count is limited to how many segments can fit in the
 * transport header without overflowing the buffer. That's about 40
 * Read segments for a 1KB inline threshold.
 *
 * Segments carrying a non-zero position are tallied in
 * rc_read_pcl.cl_count; segments at position zero are tallied in
 * rc_call_pcl.cl_count. A non-zero position that is not a multiple
 * of four causes the whole list to be rejected.
 *
 * Return values:
 *   %true: Read list is valid. @rctxt's xdr_stream is updated to point
 *	    to the first byte past the Read list. rc_read_pcl and
 *	    rc_call_pcl cl_count fields are set to the number of
 *	    Read segments in the list.
 *  %false: Read list is corrupt. @rctxt's xdr_stream is left in an
 *	    unknown state.
 */
static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	rctxt->rc_call_pcl.cl_count = 0;
	rctxt->rc_read_pcl.cl_count = 0;

	/* @p always points at the most recently decoded list discriminator */
	while (xdr_item_is_present(p)) {
		u32 position, handle, length;
		u64 offset;

		/* A failed decode means the segment does not fit in the stream */
		p = xdr_inline_decode(&rctxt->rc_stream,
				      rpcrdma_readseg_maxsz * sizeof(*p));
		if (!p)
			return false;

		/* Only @position is examined here; the other fields are
		 * decoded but not used by this counting pass.
		 */
		xdr_decode_read_segment(p, &position, &handle,
					&length, &offset);
		if (position) {
			/* Reject positions that are not XDR (4-byte) aligned */
			if (position & 3)
				return false;
			++rctxt->rc_read_pcl.cl_count;
		} else {
			++rctxt->rc_call_pcl.cl_count;
		}

		/* Fetch the discriminator that follows this segment */
		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
		if (!p)
			return false;
	}
	return true;
}
/* Sanity check the Read list.
 *
 * Decodes the first list discriminator, counts the Read segments,
 * then allocates the call and read parsed chunk lists, in that
 * order. Any failing step rejects the whole list.
 *
 * Sanity checks:
 * - Read list does not overflow Receive buffer.
 * - Chunk size limited by largest NFS data payload.
 *
 * Return values:
 *       %true: Read list is valid. @rctxt's xdr_stream is updated
 *		to point to the first byte past the Read list.
 *      %false: Read list is corrupt. @rctxt's xdr_stream is left
 *		in an unknown state.
 */
static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p;

	/* First discriminator of the Read list */
	p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
	if (!p)
		return false;

	/* Validate and count before anything is allocated */
	if (!xdr_count_read_segments(rctxt, p))
		return false;

	/* Both allocators are handed the same starting pointer @p */
	if (!pcl_alloc_call(rctxt, p))
		return false;
	return pcl_alloc_read(rctxt, p);
}
if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount)) returnfalse;
/* Before trusting the segcount value enough to use it in * a computation, perform a simple range check. This is an * arbitrary but sensible limit (ie, not architectural).
*/ if (unlikely(segcount > rctxt->rc_maxpages)) returnfalse;
p = xdr_inline_decode(&rctxt->rc_stream,
segcount * rpcrdma_segment_maxsz * sizeof(*p)); return p != NULL;
}
/**
 * xdr_count_write_chunks - Count number of Write chunks in Write list
 * @rctxt: Received header and decoding state
 * @p: start of an un-decoded Write list
 *
 * Before allocating anything, ensure the ingress Write list is
 * safe to use.
 *
 * Return values:
 *       %true: Write list is valid. @rctxt's xdr_stream is updated
 *		to point to the first byte past the Write list, and
 *		the number of Write chunks is in rc_write_pcl.cl_count.
 *      %false: Write list is corrupt. @rctxt's xdr_stream is left
 *		in an indeterminate state.
 */
static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
	rctxt->rc_write_pcl.cl_count = 0;

	/* @p always points at the most recently decoded list discriminator */
	while (xdr_item_is_present(p)) {
		/* Validates one chunk and advances the stream past it */
		if (!xdr_check_write_chunk(rctxt))
			return false;
		++rctxt->rc_write_pcl.cl_count;

		/* Fetch the discriminator that follows this chunk */
		p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
		if (!p)
			return false;
	}
	return true;
}
/* Sanity check the Write list. * * Implementation limits: * - This implementation currently supports only one Write chunk. * * Sanity checks: * - Write list does not overflow Receive buffer. * - Chunk size limited by largest NFS data payload. * * Return values: * %true: Write list is valid. @rctxt's xdr_stream is updated * to point to the first byte past the Write list. * %false: Write list is corrupt. @rctxt's xdr_stream is left * in an unknown state.
*/ staticbool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
{
__be32 *p;
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); if (!p) returnfalse; if (!xdr_count_write_chunks(rctxt, p)) returnfalse; if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p)) returnfalse;
/* Sanity check the Reply chunk. * * Sanity checks: * - Reply chunk does not overflow Receive buffer. * - Chunk size limited by largest NFS data payload. * * Return values: * %true: Reply chunk is valid. @rctxt's xdr_stream is updated * to point to the first byte past the Reply chunk. * %false: Reply chunk is corrupt. @rctxt's xdr_stream is left * in an unknown state.
*/ staticbool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
__be32 *p;
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p)); if (!p) returnfalse;
if (!xdr_item_is_present(p)) returntrue; if (!xdr_check_write_chunk(rctxt)) returnfalse;
/* RPC-over-RDMA Version One private extension: Remote Invalidation. * Responder's choice: requester signals it can handle Send With * Invalidate, and responder chooses one R_key to invalidate. * * If there is exactly one distinct R_key in the received transport * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
*/ staticvoid svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma, struct svc_rdma_recv_ctxt *ctxt)
{ struct svc_rdma_segment *segment; struct svc_rdma_chunk *chunk;
u32 inv_rkey;
/** * svc_rdma_xdr_decode_req - Decode the transport header * @rq_arg: xdr_buf containing ingress RPC/RDMA message * @rctxt: state of decoding * * On entry, xdr->head[0].iov_base points to first byte of the * RPC-over-RDMA transport header. * * On successful exit, head[0] points to first byte past the * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message. * * The length of the RPC-over-RDMA header is returned. * * Assumptions: * - The transport header is entirely contained in the head iovec.
*/ staticint svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg, struct svc_rdma_recv_ctxt *rctxt)
{
__be32 *p, *rdma_argp; unsignedint hdr_len;
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 *
 * @xprt: transport on which the message arrived
 * @rctxt: receive context holding the decoded transport header
 *
 * Returns %true only when the transport has a backchannel
 * (xpt_bc_xprt is set), the message type is rdma_msg, all four
 * parsed chunk lists are empty, and the direction word is not
 * RPC_CALL. Returns %false otherwise.
 */
static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt,
						struct svc_rdma_recv_ctxt *rctxt)
{
	__be32 *p = rctxt->rc_recv_buf;

	/* No backchannel on this transport: cannot be a reverse reply */
	if (!xprt->xpt_bc_xprt)
		return false;

	if (rctxt->rc_msgtype != rdma_msg)
		return false;

	/* Any populated chunk list rules out a backchannel message */
	if (!pcl_is_empty(&rctxt->rc_call_pcl))
		return false;
	if (!pcl_is_empty(&rctxt->rc_read_pcl))
		return false;
	if (!pcl_is_empty(&rctxt->rc_write_pcl))
		return false;
	if (!pcl_is_empty(&rctxt->rc_reply_pcl))
		return false;

	/* RPC call direction.
	 *
	 * NOTE(review): word offset 8 presumably follows from the
	 * RFC 8166 layout: four fixed transport header words plus three
	 * empty chunk-list discriminators, then the RPC XID, then the
	 * direction field. Confirm against the header size constants.
	 */
	if (*(p + 8) == cpu_to_be32(RPC_CALL))
		return false;

	return true;
}
/* Finish constructing the RPC Call message in rqstp::rq_arg.
 *
 * The incoming RPC/RDMA message is an RDMA_MSG type message
 * with a single Read chunk (only the upper layer data payload
 * was conveyed via RDMA Read).
 *
 * @rqstp: request whose rq_arg is being completed
 * @ctxt: receive context whose rc_read_pcl supplies the chunk
 *
 * On return, rq_arg's head ends at the chunk's position, its
 * page list covers the XDR-aligned chunk length starting at
 * rq_pages[0], and its tail holds what followed the chunk's
 * position in the original head.
 */
static void svc_rdma_read_complete_one(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *ctxt)
{
	struct svc_rdma_chunk *chunk = pcl_first_chunk(&ctxt->rc_read_pcl);
	struct xdr_buf *buf = &rqstp->rq_arg;
	unsigned int length;

	/* Split the Receive buffer between the head and tail
	 * buffers at Read chunk's position. XDR roundup of the
	 * chunk is not included in either the pagelist or in
	 * the tail.
	 *
	 * The tail must be set up first: its length is derived from
	 * the head length, which is overwritten by the last statement.
	 */
	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
	buf->head[0].iov_len = chunk->ch_position;

	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
	 *
	 * If the client already rounded up the chunk length, the
	 * length does not change. Otherwise, the length of the page
	 * list is increased to include XDR round-up.
	 *
	 * Currently these chunks always start at page offset 0,
	 * thus the rounded-up length never crosses a page boundary.
	 */
	buf->pages = &rqstp->rq_pages[0];
	length = xdr_align_size(chunk->ch_length);
	buf->page_len = length;
	buf->len += length;
	buf->buflen += length;
}
/* Finish constructing the RPC Call message in rqstp::rq_arg. * * The incoming RPC/RDMA message is an RDMA_MSG type message * with payload in multiple Read chunks and no PZRC.
*/ staticvoid svc_rdma_read_complete_multiple(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *ctxt)
{ struct xdr_buf *buf = &rqstp->rq_arg;
/* Finish constructing the RPC Call message in rqstp::rq_arg. * * The incoming RPC/RDMA message is an RDMA_NOMSG type message * (the RPC message body was conveyed via RDMA Read).
*/ staticvoid svc_rdma_read_complete_pzrc(struct svc_rqst *rqstp, struct svc_rdma_recv_ctxt *ctxt)
{ struct xdr_buf *buf = &rqstp->rq_arg;
/* Transfer the Read chunk pages into @rqstp.rq_pages, replacing * the rq_pages that were already allocated for this rqstp.
*/
release_pages(rqstp->rq_respages, ctxt->rc_page_count); for (i = 0; i < ctxt->rc_page_count; i++)
rqstp->rq_pages[i] = ctxt->rc_pages[i];
/* Update @rqstp's result send buffer to start after the * last page in the RDMA Read payload.
*/
rqstp->rq_respages = &rqstp->rq_pages[ctxt->rc_page_count];
rqstp->rq_next_page = rqstp->rq_respages + 1;
/* Prevent svc_rdma_recv_ctxt_put() from releasing the * pages in ctxt::rc_pages a second time.
*/
ctxt->rc_page_count = 0;
/* Finish constructing the RPC Call message. The exact * procedure for that depends on what kind of RPC/RDMA * chunks were provided by the client.
*/
rqstp->rq_arg = ctxt->rc_saved_arg; if (pcl_is_empty(&ctxt->rc_call_pcl)) { if (ctxt->rc_read_pcl.cl_count == 1)
svc_rdma_read_complete_one(rqstp, ctxt); else
svc_rdma_read_complete_multiple(rqstp, ctxt);
} else {
svc_rdma_read_complete_pzrc(rqstp, ctxt);
}
trace_svcrdma_read_finished(&ctxt->rc_cid);
}
/** * svc_rdma_recvfrom - Receive an RPC call * @rqstp: request structure into which to receive an RPC Call * * Returns: * The positive number of bytes in the RPC Call message, * %0 if there were no Calls ready to return, * %-EINVAL if the Read chunk data is too large, * %-ENOMEM if rdma_rw context pool was exhausted, * %-ENOTCONN if posting failed (connection is lost), * %-EIO if rdma_rw initialization failed (DMA mapping, etc). * * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only * when there are no remaining ctxt's to process. * * The next ctxt is removed from the "receive" lists. * * - If the ctxt completes a Receive, then construct the Call * message from the contents of the Receive buffer. * * - If there are no Read chunks in this message, then finish * assembling the Call message and return the number of bytes * in the message. * * - If there are Read chunks in this message, post Read WRs to * pull that payload. When the Read WRs complete, build the * full message and return the number of bytes in it.
*/ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{ struct svc_xprt *xprt = rqstp->rq_xprt; struct svcxprt_rdma *rdma_xprt =
container_of(xprt, struct svcxprt_rdma, sc_xprt); struct svc_rdma_recv_ctxt *ctxt; int ret;
/* Prevent svc_xprt_release() from releasing pages in rq_pages * when returning 0 or an error.
*/
rqstp->rq_respages = rqstp->rq_pages;
rqstp->rq_next_page = rqstp->rq_respages;
rqstp->rq_xprt_ctxt = NULL;
spin_lock(&rdma_xprt->sc_rq_dto_lock);
ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q); if (ctxt) {
list_del(&ctxt->rc_list);
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
svc_xprt_received(xprt);
svc_rdma_read_complete(rqstp, ctxt); goto complete;
}
ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q); if (ctxt)
list_del(&ctxt->rc_list); else /* No new incoming requests, terminate the loop */
clear_bit(XPT_DATA, &xprt->xpt_flags);
spin_unlock(&rdma_xprt->sc_rq_dto_lock);
/* Unblock the transport for the next receive */
svc_xprt_received(xprt); if (!ctxt) return 0;
out_readlist: /* This @rqstp is about to be recycled. Save the work * already done constructing the Call message in rq_arg * so it can be restored when the RDMA Reads have * completed.
*/
ctxt->rc_saved_arg = rqstp->rq_arg;
ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt); if (ret < 0) { if (ret == -EINVAL)
svc_rdma_send_error(rdma_xprt, ctxt, ret);
svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
svc_xprt_deferred_close(xprt); return ret;
} return 0;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.