// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2020, Oracle and/or its affiliates.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 *      Redistribution and use in source and binary forms, with or without
 *      modification, are permitted provided that the following conditions
 *      are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 *      THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *      "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *      LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *      A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *      OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *      SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *      LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *      DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *      THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *      (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *      OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
*/
/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);

	/* Minimal Reply chunk size */
	size += sizeof(__be32);	/* segment count */
	size += rpcrdma_segment_maxsz * sizeof(__be32);
	size += sizeof(__be32);	/* list discriminator */

	return size;
}
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;
/* Fixed header fields and list discriminators */
size = RPCRDMA_HDRLEN_MIN;
/* Maximum Write list size */
size += sizeof(__be32); /* segment count */
size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
size += sizeof(__be32); /* list discriminator */
return size;
}
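
/* Illustrative arithmetic (assumed constants, not defined in this file):
 * RPCRDMA_HDRLEN_MIN covers the four fixed header words (xid, vers,
 * credits, proc) plus three empty chunk-list discriminators, or 28 bytes,
 * and rpcrdma_segment_maxsz is 4 XDR words per HLOO segment. With
 * maxsegs = 8, the Reply header estimate above would work out to:
 *
 *	28 + 4 + (8 * 4 * 4) + 4 = 164 bytes
 *
 * rpcrdma_set_max_header_sizes() below uses these worst-case estimates
 * to derive the max_inline payload sizes for the endpoint.
 */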
/**
 * rpcrdma_set_max_header_sizes - Initialize inline payload sizes
 * @ep: endpoint to initialize
 *
 * The max_inline fields contain the maximum size of an RPC message
 * so the marshaling code doesn't have to repeat this calculation
 * for every RPC.
 */
void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep)
{
	unsigned int maxsegs = ep->re_max_rdma_segs;
/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * a Read chunk for this operation.
 *
 * A Read chunk is also required if sending the RPC call inline would
 * exceed this device's max_sge limit.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct xdr_buf *xdr = &rqst->rq_snd_buf;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	unsigned int count, remaining, offset;

	if (xdr->len > ep->re_max_inline_send)
		return false;
/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	return rqst->rq_rcv_buf.buflen <= r_xprt->rx_ep->re_max_inline_recv;
}
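
/* For illustration (threshold value assumed, not taken from this file):
 * with re_max_inline_recv negotiated at 4096 bytes, a small GETATTR
 * reply buffer fits inline, whereas a 128 KB READ reply buffer exceeds
 * the threshold, so the client must offer a Write chunk (or a Reply
 * chunk for large non-payload results) in the matching Call.
 */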
/* The client is required to provide a Reply chunk if the maximum
 * size of the non-payload part of the RPC Reply is larger than
 * the inline threshold.
 */
static bool
rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
			  const struct rpc_rqst *rqst)
{
	const struct xdr_buf *buf = &rqst->rq_rcv_buf;
/* ACL likes to be lazy in allocating pages. For TCP, these
 * pages can be allocated during receive processing. Not true
 * for RDMA, which must always provision receive buffers
 * up front.
 */
static noinline int
rpcrdma_alloc_sparse_pages(struct xdr_buf *buf)
{
	struct page **ppages;
	int len;
len = buf->page_len;
	ppages = buf->pages + (buf->page_base >> PAGE_SHIFT);
	while (len > 0) {
		if (!*ppages)
			*ppages = alloc_page(GFP_NOWAIT | __GFP_NOWARN);
		if (!*ppages)
			return -ENOBUFS;
ppages++;
len -= PAGE_SIZE;
}
return 0;
}
/* Convert @vec to a single SGL element.
 *
 * Returns pointer to next available SGE, and bumps the total number
 * of SGEs consumed.
 */
static struct rpcrdma_mr_seg *
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
		     unsigned int *n)
{
seg->mr_page = virt_to_page(vec->iov_base);
seg->mr_offset = offset_in_page(vec->iov_base);
seg->mr_len = vec->iov_len;
++seg;
	++(*n);
	return seg;
}
/* Convert @xdrbuf into SGEs no larger than a page each. As they
 * are registered, these SGEs are then coalesced into RDMA segments
 * when the selected memreg mode supports it.
 *
 * Returns positive number of SGEs consumed, or a negative errno.
*/
/* Register and XDR encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single @pos value is currently supported.
 */
static int rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct rpc_rqst *rqst,
				    enum rpcrdma_chunktype rtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	unsigned int pos;
	int nsegs;

	if (rtype == rpcrdma_noch_pullup || rtype == rpcrdma_noch_mapped)
		goto done;

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
pos = 0;
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
				     rtype, seg);
	if (nsegs < 0)
		return nsegs;
do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, false, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_read_segment(xdr, mr, pos) < 0)
			return -EMSGSIZE;

		nsegs -= mr->mr_nents;
	} while (nsegs);

done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}
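
/* Illustrative wire layout (example only): a Read list built from one
 * chunk of two registered segments at payload position P, followed by
 * the terminating discriminator encoded at the "done" label, appears
 * on the wire as:
 *
 *	1 - P - H1 L1 O1 - 1 - P - H2 L2 O2 - 0
 *
 * Each segment repeats the item discriminator and position, matching
 * the encoding key documented above rpcrdma_encode_read_list().
 */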
/* Register and XDR encode the Write list. Supports encoding a list
 * containing one array of plain segments that belong to a single
 * write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 *
 * Only a single Write chunk is currently supported.
 */
static int rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req,
				     struct rpc_rqst *rqst,
				     enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
__be32 *segcount;
	if (wtype != rpcrdma_writech)
		goto done;
seg = req->rl_segments;
nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */
	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);
/* Update count of segments in this Write chunk */
*segcount = cpu_to_be32(nchunks);
done:
	if (xdr_stream_encode_item_absent(xdr) < 0)
		return -EMSGSIZE;
	return 0;
}
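
/* Illustrative wire layout (example only): a Write list carrying one
 * chunk of two registered segments is encoded as:
 *
 *	1 - 2 - H1 L1 O1 - H2 L2 O2 - 0
 *
 * The leading 1 marks the chunk as present, the 2 is the segment count
 * filled in through @segcount, and the trailing 0 encoded at "done"
 * terminates the Write list.
 */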
/* Register and XDR encode the Reply chunk. Supports encoding an array
 * of plain segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns zero on success, or a negative errno if a failure occurred.
 * @xdr is advanced to the next position in the stream.
 */
static int rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
				      struct rpcrdma_req *req,
				      struct rpc_rqst *rqst,
				      enum rpcrdma_chunktype wtype)
{
	struct xdr_stream *xdr = &req->rl_stream;
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mr *mr;
	int nsegs, nchunks;
__be32 *segcount;
	if (wtype != rpcrdma_replych) {
		if (xdr_stream_encode_item_absent(xdr) < 0)
			return -EMSGSIZE;
		return 0;
}
seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return nsegs;

	if (xdr_stream_encode_item_present(xdr) < 0)
		return -EMSGSIZE;
	segcount = xdr_reserve_space(xdr, sizeof(*segcount));
	if (unlikely(!segcount))
		return -EMSGSIZE;
	/* Actual value encoded below */
	nchunks = 0;
	do {
		seg = rpcrdma_mr_prepare(r_xprt, req, seg, nsegs, true, &mr);
		if (IS_ERR(seg))
			return PTR_ERR(seg);

		if (encode_rdma_segment(xdr, mr) < 0)
			return -EMSGSIZE;

		nchunks++;
		nsegs -= mr->mr_nents;
	} while (nsegs);

	/* Update count of segments in this Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return 0;
}
	/* The first two SGEs contain the transport header and
	 * the inline buffer. These are always left mapped so
	 * they can be cheaply re-used.
	 */
	for (sge = &sc->sc_sges[2]; sc->sc_unmap_count;
++sge, --sc->sc_unmap_count)
ib_dma_unmap_page(rdmab_device(rb), sge->addr, sge->length,
DMA_TO_DEVICE);
/* The head iovec is straightforward, as it is usually already
 * DMA-mapped. Sync the content that has changed.
 */
static bool rpcrdma_prepare_head_iov(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;

	if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
		return false;
/* If there is a page list present, DMA map and prepare an
 * SGE for each page to be sent.
 */
static bool rpcrdma_prepare_pagelist(struct rpcrdma_req *req,
				     struct xdr_buf *xdr)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	unsigned int page_base, len, remaining;
	struct page **ppages;
	struct ib_sge *sge;
/* The tail iovec may include an XDR pad for the page list,
 * as well as additional content, and may not reside in the
 * same page as the head iovec.
 */
static bool rpcrdma_prepare_tail_iov(struct rpcrdma_req *req,
				     struct xdr_buf *xdr,
				     unsigned int page_base, unsigned int len)
{
	struct rpcrdma_sendctx *sc = req->rl_sendctx;
	struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
	struct page *page = virt_to_page(xdr->tail[0].iov_base);
/* Copy the tail to the end of the head buffer.
 */
static void rpcrdma_pullup_tail_iov(struct rpcrdma_xprt *r_xprt,
				    struct rpcrdma_req *req,
				    struct xdr_buf *xdr)
{
	unsigned char *dst;
/* Copy the contents of @xdr into @rl_sendbuf and DMA sync it.
 * When the head, pagelist, and tail are small, a pull-up copy
 * is considerably less costly than DMA mapping the components
 * of @xdr.
 *
 * Assumptions:
 *  - the caller has already verified that the total length
 *    of the RPC Call body will fit into @rl_sendbuf.
 */
static bool rpcrdma_prepare_noch_pullup(struct rpcrdma_xprt *r_xprt,
					struct rpcrdma_req *req,
					struct xdr_buf *xdr)
{
	if (unlikely(xdr->tail[0].iov_len))
rpcrdma_pullup_tail_iov(r_xprt, req, xdr);
if (unlikely(xdr->page_len))
rpcrdma_pullup_pagelist(r_xprt, req, xdr);
	/* The whole RPC message resides in the head iovec now */
	return rpcrdma_prepare_head_iov(r_xprt, req, xdr->len);
}
	if (!rpcrdma_prepare_head_iov(r_xprt, req, xdr->head[0].iov_len))
		return false;

	if (xdr->page_len)
		if (!rpcrdma_prepare_pagelist(req, xdr))
			return false;

	if (tail->iov_len)
		if (!rpcrdma_prepare_tail_iov(req, xdr,
offset_in_page(tail->iov_base),
					      tail->iov_len))
			return false;
if (req->rl_sendctx->sc_unmap_count)
		kref_get(&req->rl_kref);
	return true;
}
	/* If there is a Read chunk, the page list is being handled
	 * via explicit RDMA, and thus is skipped here.
*/
	/* Do not include the tail if it is only an XDR pad */
	if (xdr->tail[0].iov_len > 3) {
		unsigned int page_base, len;
		/* If the content in the page list is an odd length,
		 * xdr_write_pages() adds a pad at the beginning of
		 * the tail iovec. Force the tail's non-pad content to
		 * land at the next XDR position in the Send message.
*/
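		/* For example, if the page list carried 1021 bytes of data,
		 * xdr_write_pages() would add a 3-byte pad at the front of
		 * the tail; len & 3 recovers that pad size, so page_base is
		 * advanced past it and len is trimmed to the XDR-aligned
		 * remainder below.
		 */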
page_base = offset_in_page(xdr->tail[0].iov_base);
len = xdr->tail[0].iov_len;
page_base += len & 3;
		len -= len & 3;
		if (!rpcrdma_prepare_tail_iov(req, xdr, page_base, len))
			return false;
kref_get(&req->rl_kref);
}
	return true;
}
/**
 * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
 * @r_xprt: controlling transport
 * @req: context of RPC Call being marshalled
 * @hdrlen: size of transport header, in bytes
 * @xdr: xdr_buf containing RPC Call
 * @rtype: chunk type being encoded
 *
 * Returns 0 on success; otherwise a negative errno is returned.
 */
inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
				     struct rpcrdma_req *req, u32 hdrlen,
				     struct xdr_buf *xdr,
				     enum rpcrdma_chunktype rtype)
{
	int ret;

	ret = -EIO;
	switch (rtype) {
	case rpcrdma_noch_pullup:
		if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_noch_mapped:
		if (!rpcrdma_prepare_noch_mapped(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_readch:
		if (!rpcrdma_prepare_readch(r_xprt, req, xdr))
			goto out_unmap;
		break;
	case rpcrdma_areadch:
		break;
	default:
		goto out_unmap;
}
/**
 * rpcrdma_marshal_req - Marshal and send one RPC request
 * @r_xprt: controlling transport
 * @rqst: RPC request to be marshaled
 *
 * For the RPC in "rqst", this function:
 *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
 *  - Registers Read, Write, and Reply chunks
 *  - Constructs the transport header
 *  - Posts a Send WR to send the transport header and request
 *
 * Returns:
 *	%0 if the RPC was sent successfully,
 *	%-ENOTCONN if the connection was lost,
 *	%-EAGAIN if the caller should call again with the same arguments,
 *	%-ENOBUFS if the caller should call again after a delay,
 *	%-EMSGSIZE if the transport header is too small,
 *	%-EIO if a permanent problem occurred while marshaling.
 */
int
rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct xdr_stream *xdr = &req->rl_stream;
	enum rpcrdma_chunktype rtype, wtype;
	struct xdr_buf *buf = &rqst->rq_snd_buf;
	bool ddp_allowed;
	__be32 *p;
	int ret;
if (unlikely(rqst->rq_rcv_buf.flags & XDRBUF_SPARSE_PAGES)) {
		ret = rpcrdma_alloc_sparse_pages(&rqst->rq_rcv_buf);
		if (ret)
			return ret;
}
/* Fixed header fields */
ret = -EMSGSIZE;
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		goto out_err;
*p++ = rqst->rq_xid;
*p++ = rpcrdma_version;
*p++ = r_xprt->rx_buf.rb_max_requests;
	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
*/
ddp_allowed = !test_bit(RPCAUTH_AUTH_DATATOUCH,
&rqst->rq_cred->cr_auth->au_flags);
	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
rpcrdma_nonpayload_inline(r_xprt, rqst))
		wtype = rpcrdma_writech;
	else
wtype = rpcrdma_replych;
	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
*p++ = rdma_msg;
rtype = buf->len < rdmab_length(req->rl_sendbuf) ?
rpcrdma_noch_pullup : rpcrdma_noch_mapped;
	} else if (ddp_allowed && buf->flags & XDRBUF_WRITE) {
*p++ = rdma_msg;
rtype = rpcrdma_readch;
} else {
r_xprt->rx_stats.nomsg_call_count++;
*p++ = rdma_nomsg;
rtype = rpcrdma_areadch;
}
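
	/* For illustration (operations named here are assumptions, not
	 * drawn from this file): a large NFS WRITE flags its argument
	 * buffer with XDRBUF_WRITE, so the payload moves via a Read chunk
	 * while the header goes inline as rdma_msg; a large SYMLINK Call
	 * carries no such payload flag, so the whole message is sent as
	 * rdma_nomsg in a Position Zero Read chunk.
	 */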
	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
*/
	ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;
	ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
	if (ret)
		goto out_err;
ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len,
					buf, rtype);
	if (ret)
		goto out_err;
/**
 * rpcrdma_reset_cwnd - Reset the xprt's congestion window
 * @r_xprt: controlling transport instance
 *
 * Prepare @r_xprt for the next connection by reinitializing
 * its credit grant to one (see RFC 8166, Section 3.3.3).
 */
void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;
	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
*/
rqst->rq_rcv_buf.head[0].iov_base = srcp;
rqst->rq_private_buf.head[0].iov_base = srcp;
	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
*/
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
curlen = copy_len;
srcp += curlen;
copy_len -= curlen;
ppages = rqst->rq_rcv_buf.pages +
(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
curlen = pagelist_len;
		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
srcp -= pad;
}
	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
rqst->rq_rcv_buf.tail[0].iov_base = srcp;
rqst->rq_private_buf.tail[0].iov_base = srcp;
}
if (fixup_copy_count)
		trace_xprtrdma_fixup(rqst, fixup_copy_count);
	return fixup_copy_count;
}
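
/* For illustration (sizes assumed): an inline reply of 720 bytes received
 * into an rq_rcv_buf with a 512-byte head and a page list has its head
 * and tail iovecs simply redirected into the receive buffer, while only
 * the bytes destined for the page list are memcopied and counted in
 * fixup_copy_count.
 */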
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct xdr_stream *xdr = &rep->rr_stream;
__be32 *p;
	if (rep->rr_proc != rdma_msg)
		return false;
/* Peek at stream contents without advancing. */
p = xdr_inline_decode(xdr, 0);
	/* Chunk lists */
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;
	if (xdr_item_is_present(p++))
		return false;

	/* RPC header */
	if (*p++ != rep->rr_xid)
		return false;
	if (*p != cpu_to_be32(RPC_CALL))
		return false;

	/* No bc service. */
	if (xprt->bc_serv == NULL)
		return false;
	/* Now that we are sure this is a backchannel call,
	 * advance to the RPC header.
*/
	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
	if (unlikely(!p))
		return true;
	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
*length = 0;
	segcount = be32_to_cpup(p);
	while (segcount--) {
		if (decode_rdma_segment(xdr, &seglength))
			return -EIO;
*length += seglength;
}
return 0;
}
/* In RPC-over-RDMA Version One replies, a Read list is never
 * expected. This decoder is a stub that returns an error if
 * a Read list is present.
 */
static int decode_read_list(struct xdr_stream *xdr)
{
__be32 *p;
	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;
	if (unlikely(xdr_item_is_present(p)))
		return -EIO;
	return 0;
}
/* Supports only one Write chunk in the Write list
 */
static int decode_write_list(struct xdr_stream *xdr, u32 *length)
{
	u32 chunklen;
	bool first;
__be32 *p;
*length = 0;
	first = true;
	do {
		p = xdr_inline_decode(xdr, sizeof(*p));
		if (unlikely(!p))
			return -EIO;
		if (xdr_item_is_absent(p))
			break;
		if (!first)
			return -EIO;

		if (decode_write_chunk(xdr, &chunklen))
			return -EIO;
*length += chunklen;
first = false;
	} while (true);
	return 0;
}
	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_MSG sanity checks */
	if (unlikely(replychunk))
		return -EIO;
/* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
base = (char *)xdr_inline_decode(xdr, 0);
rpclen = xdr_stream_remaining(xdr);
r_xprt->rx_stats.fixup_copy_count +=
rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
	/* Decode the chunk lists */
	if (decode_read_list(xdr))
		return -EIO;
	if (decode_write_list(xdr, &writelist))
		return -EIO;
	if (decode_reply_chunk(xdr, &replychunk))
		return -EIO;

	/* RDMA_NOMSG sanity checks */
	if (unlikely(writelist))
		return -EIO;
	if (unlikely(!replychunk))
		return -EIO;
/* Reply chunk buffer already is the reply vector */
r_xprt->rx_stats.total_rdma_reply += replychunk; return replychunk;
}
	p = xdr_inline_decode(xdr, sizeof(*p));
	if (unlikely(!p))
		return -EIO;

	switch (*p) {
	case err_vers:
		p = xdr_inline_decode(xdr, 2 * sizeof(*p));
		if (!p)
			break;
		trace_xprtrdma_err_vers(rqst, p, p + 1);
		break;
	case err_chunk:
		trace_xprtrdma_err_chunk(rqst);
		break;
	default:
trace_xprtrdma_err_unrecognized(rqst, p);
}
return -EIO;
}
/**
 * rpcrdma_unpin_rqst - Release rqst without completing it
 * @rep: RPC/RDMA Receive context
 *
 * This is done when a connection is lost so that a Reply
 * can be dropped and its matching Call can be subsequently
 * retransmitted on a new connection.
 */
void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
/**
 * rpcrdma_complete_rqst - Pass completed rqst back to RPC
 * @rep: RPC/RDMA Receive context
 *
 * Reconstruct the RPC reply and complete the transaction
 * while @rqst is still pinned to ensure the rep, rqst, and
 * rq_task pointers remain stable.
 */
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst = rep->rr_rqst;
	int status;
	switch (rep->rr_proc) {
	case rdma_msg:
		status = rpcrdma_decode_msg(r_xprt, rep, rqst);
		break;
	case rdma_nomsg:
		status = rpcrdma_decode_nomsg(r_xprt, rep);
		break;
	case rdma_error:
		status = rpcrdma_decode_error(r_xprt, rep, rqst);
		break;
	default:
status = -EIO;
	}
	if (status < 0)
		goto out_badheader;
/**
 * rpcrdma_reply_handler - Process received RPC/RDMA messages
 * @rep: Incoming rpcrdma_rep object to process
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
u32 credits;
__be32 *p;
	/* Any data means we had a useful conversation, so
	 * then we don't need to delay the next reconnect.
	 */
	if (xprt->reestablish_timeout)
xprt->reestablish_timeout = 0;
	if (rep->rr_vers != rpcrdma_version)
		goto out_badversion;

	if (rpcrdma_is_bcall(r_xprt, rep))
		return;
	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
*/
spin_lock(&xprt->queue_lock);
	rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
	if (!rqst)
		goto out_norqst;
xprt_pin_rqst(rqst);
spin_unlock(&xprt->queue_lock);