// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2014-2017 Oracle. All rights reserved.
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*
 * verbs.c
 *
 * Encapsulates the major functions managing:
 *  o adapters
 *  o endpoints
 *  o connections
 *  o buffer memory
*/
/* Wait for outstanding transport work to finish. ib_drain_qp
 * handles the drains in the wrong order for us, so open code
 * them here.
 */
static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rdma_cm_id *id = ep->re_id;

	/* Wait for rpcrdma_post_recvs() to leave its critical
	 * section.
	 */
	if (atomic_inc_return(&ep->re_receiving) > 1)
		wait_for_completion(&ep->re_done);

	/* Flush Receives, then wait for deferred Reply work
	 * to complete.
	 */
	ib_drain_rq(id->qp);

	/* Deferred Reply processing might have scheduled
	 * local invalidations.
	 */
	ib_drain_sq(id->qp);

	rpcrdma_ep_put(ep);
}
/* Ensure xprt_force_disconnect() is invoked exactly once when a
 * connection is closed or lost. (The important thing is it needs
 * to be invoked "at least" once).
 */
void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
{
	/* atomic_add_unless() increments re_force_disconnect only if
	 * it is not already 1, and returns true only for that first
	 * transition, so racing callers cannot both get here.
	 */
	if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
		xprt_force_disconnect(ep->re_xprt);
}
/**
 * rpcrdma_flush_disconnect - Disconnect on flushed completion
 * @r_xprt: transport to disconnect
 * @wc: work completion entry
 *
 * Must be called in process context.
 */
void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
{
	if (wc->status != IB_WC_SUCCESS)
		rpcrdma_force_disconnect(r_xprt->rx_ep);
}
static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct rpcrdma_sendctx *sc =
		container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
	struct rpcrdma_xprt *r_xprt = cq->cq_context;

	/* WARNING: Only wr_cqe and status are reliable at this point */
	trace_xprtrdma_wc_send(wc, &sc->sc_cid);
	rpcrdma_sendctx_put_locked(r_xprt, sc);
	rpcrdma_flush_disconnect(r_xprt, wc);
}
/* WARNING: Only wr_cqe and status are reliable at this point */
trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
	--r_xprt->rx_ep->re_receive_count;
	if (wc->status != IB_WC_SUCCESS)
		goto out_flushed;
/* status == SUCCESS means all fields in wc are trustworthy */
rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
rep->rr_wc_flags = wc->wc_flags;
rep->rr_inv_rkey = wc->ex.invalidate_rkey;
if (rsize < ep->re_inline_recv)
		ep->re_inline_recv = rsize;
	if (wsize < ep->re_inline_send)
ep->re_inline_send = wsize;
rpcrdma_set_max_header_sizes(ep);
}
/**
 * rpcrdma_cm_event_handler - Handle RDMA CM events
 * @id: rdma_cm_id on which an event has occurred
 * @event: details of the event
 *
 * Called with @id's mutex held. Returns 1 if caller should
 * destroy @id, otherwise 0.
 */
static int
rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_ep *ep = id->context;
might_sleep();
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ep->re_async_rc = 0;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ep->re_async_rc = -EPROTO;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ep->re_async_rc = -ENETUNREACH;
		complete(&ep->re_done);
		return 0;
	case RDMA_CM_EVENT_ADDR_CHANGE:
		ep->re_connect_status = -ENODEV;
		goto disconnected;
	case RDMA_CM_EVENT_ESTABLISHED:
		rpcrdma_ep_get(ep);
		ep->re_connect_status = 1;
		rpcrdma_update_cm_private(ep, &event->param.conn);
		trace_xprtrdma_inline_thresh(ep);
		wake_up_all(&ep->re_connect_wait);
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		ep->re_connect_status = -ENOTCONN;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_UNREACHABLE:
		ep->re_connect_status = -ENETUNREACH;
		goto wake_connect_worker;
	case RDMA_CM_EVENT_REJECTED:
		ep->re_connect_status = -ECONNREFUSED;
		if (event->status == IB_CM_REJ_STALE_CONN)
			ep->re_connect_status = -ENOTCONN;
wake_connect_worker:
		wake_up_all(&ep->re_connect_wait);
		return 0;
	case RDMA_CM_EVENT_DISCONNECTED:
ep->re_connect_status = -ECONNABORTED;
disconnected:
		rpcrdma_force_disconnect(ep);
		return rpcrdma_ep_put(ep);
	default:
		break;
	}

	return 0;
}
/* Returns:
 *	%0 if @ep still has a positive kref count, or
 *	%1 if @ep was destroyed successfully.
 */
static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
{
	return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
}
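
/* kref_put() returns 1 only when it has released the final reference
 * and called rpcrdma_ep_destroy(); otherwise it returns 0. Callers such
 * as rpcrdma_cm_event_handler() use that value to tell the RDMA CM core
 * whether @id may be destroyed.
 */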
/* Client offers RDMA Read but does not initiate */
ep->re_remote_cma.initiator_depth = 0;
ep->re_remote_cma.responder_resources =
min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
	/* Limit transport retries so client can detect server
	 * GID changes quickly. RPC layer handles re-establishing
	 * transport connection and retransmission.
*/
ep->re_remote_cma.retry_count = 6;
	/* RPC-over-RDMA handles its own flow control. In addition,
	 * make all RNR NAKs visible so we know that RPC-over-RDMA
	 * flow control is working correctly (no NAKs should be seen).
*/
ep->re_remote_cma.flow_control = 0;
ep->re_remote_cma.rnr_retry_count = 0;
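
	/* In CM terms: flow_control 0 clears the end-to-end flow control
	 * bit negotiated at connection time, and rnr_retry_count 0 means
	 * an RNR NAK is never retried by the HCA, so a peer with no
	 * Receive posted fails the connection rather than stalling it.
	 */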
/**
 * rpcrdma_xprt_connect - Connect an unconnected transport
 * @r_xprt: controlling transport instance
 *
 * Returns 0 on success or a negative errno.
 */
int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpcrdma_ep *ep;
	int rc;

	rc = rpcrdma_ep_create(r_xprt);
	if (rc)
		return rc;
ep = r_xprt->rx_ep;
/**
 * rpcrdma_xprt_disconnect - Disconnect underlying transport
 * @r_xprt: controlling transport instance
 *
 * Caller serializes. Either the transport send lock is held,
 * or we're being called to destroy the transport.
 *
 * On return, @r_xprt is completely divested of all hardware
 * resources and prepared for the next ->connect operation.
 */
void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_ep *ep = r_xprt->rx_ep;
	struct rdma_cm_id *id;
	int rc;

	if (!ep)
		return;

id = ep->re_id;
rc = rdma_disconnect(id);
trace_xprtrdma_disconnect(r_xprt, rc);
/* Fixed-size circular FIFO queue. This implementation is wait-free and
 * lock-free.
 *
 * Consumer is the code path that posts Sends. This path dequeues a
 * sendctx for use by a Send operation. Multiple consumer threads
 * are serialized by the RPC transport lock, which allows only one
 * ->send_request call at a time.
 *
 * Producer is the code path that handles Send completions. This path
 * enqueues a sendctx that has been completed. Multiple producer
 * threads are serialized by the ib_poll_cq() function.
*/
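
/* Concretely: rb_sc_head is advanced only by the consumer
 * (rpcrdma_sendctx_get_locked, under the transport send lock), and
 * rb_sc_tail only by the producer (rpcrdma_sendctx_put_locked, from
 * Send completion). Each index has a single writer, so the queue
 * itself needs no lock; only cross-CPU visibility of the indices is
 * handled, via READ_ONCE and smp_store_release below.
 */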
/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
 * queue activity, and rpcrdma_xprt_drain has flushed all remaining
 * Send requests.
 */
static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long i;

	if (!buf->rb_sc_ctxs)
		return;
	for (i = 0; i <= buf->rb_sc_last; i++)
kfree(buf->rb_sc_ctxs[i]);
kfree(buf->rb_sc_ctxs);
buf->rb_sc_ctxs = NULL;
}
	/* Maximum number of concurrent outstanding Send WRs. Capping
	 * the circular queue size stops Send Queue overflow by causing
	 * the ->send_request call to fail temporarily before too many
	 * Sends are posted.
*/
i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
	buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), XPRTRDMA_GFP_FLAGS);
	if (!buf->rb_sc_ctxs)
		return -ENOMEM;

	buf->rb_sc_last = i - 1;
	for (i = 0; i <= buf->rb_sc_last; i++) {
		sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
		if (!sc)
			return -ENOMEM;
/* The sendctx queue is not guaranteed to have a size that is a
 * power of two, thus the helpers in circ_buf.h cannot be used.
 * The other option is to use modulus (%), which can be expensive.
 */
static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
					  unsigned long item)
{
	return likely(item < buf->rb_sc_last) ? item + 1 : 0;
}
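
/* For example, with rb_sc_last == 3 the walk visits slots
 * 0, 1, 2, 3, 0, ... wrapping without a modulus operation.
 */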
/**
 * rpcrdma_sendctx_get_locked - Acquire a send context
 * @r_xprt: controlling transport instance
 *
 * Returns pointer to a free send completion context; or NULL if
 * the queue is empty.
 *
 * Usage: Called to acquire an SGE array before preparing a Send WR.
 *
 * The caller serializes calls to this function (per transport), and
 * provides an effective memory barrier that flushes the new value
 * of rb_sc_head.
 */
struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_sendctx *sc;
	unsigned long next_head;

	next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);

	if (next_head == READ_ONCE(buf->rb_sc_tail))
		goto out_emptyq;
/* ORDER: item must be accessed _before_ head is updated */
sc = buf->rb_sc_ctxs[next_head];
	/* Releasing the lock in the caller acts as a memory
	 * barrier that flushes rb_sc_head.
*/
buf->rb_sc_head = next_head;
return sc;
out_emptyq:
	/* The queue is "empty" if there have not been enough Send
	 * completions recently. This is a sign the Send Queue is
	 * backing up. Cause the caller to pause and try again.
*/
xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
	r_xprt->rx_stats.empty_sendctx_q++;
	return NULL;
}
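
/* Note that the slot at rb_sc_tail is never handed out: the get path
 * refuses to advance rb_sc_head onto rb_sc_tail, so a pipeline that is
 * completely full of posted Sends remains distinguishable from an
 * idle one.
 */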
/**
 * rpcrdma_sendctx_put_locked - Release a send context
 * @r_xprt: controlling transport instance
 * @sc: send context to release
 *
 * Usage: Called from Send completion to return a sendctx
 * to the queue.
 *
 * The caller serializes calls to this function (per transport).
 */
static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
				       struct rpcrdma_sendctx *sc)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	unsigned long next_tail;
	/* Unmap SGEs of previously completed but unsignaled
	 * Sends by walking up the queue until @sc is found.
*/
	next_tail = buf->rb_sc_tail;
	do {
next_tail = rpcrdma_sendctx_next(buf, next_tail);
/* ORDER: item must be accessed _before_ tail is updated */
rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
} while (buf->rb_sc_ctxs[next_tail] != sc);
/* Paired with READ_ONCE */
smp_store_release(&buf->rb_sc_tail, next_tail);
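
	/* The release above orders the rpcrdma_sendctx_unmap() calls
	 * before the new rb_sc_tail value becomes visible to
	 * rpcrdma_sendctx_get_locked(), which reads the index with
	 * READ_ONCE().
	 */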
	/* If there is no underlying connection, it's no use
	 * to wake the refresh worker.
	 */
	if (ep->re_connect_status != 1)
		return;
queue_work(system_highpri_wq, &buf->rb_refresh_worker);
}
/**
 * rpcrdma_req_create - Allocate an rpcrdma_req object
 * @r_xprt: controlling r_xprt
 * @size: initial size, in bytes, of send and receive buffers
 *
 * Returns an allocated and fully initialized rpcrdma_req or NULL.
 */
struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt,
				       size_t size)
{
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpcrdma_req *req;

	req = kzalloc(sizeof(*req), XPRTRDMA_GFP_FLAGS);
	if (req == NULL)
		goto out1;

	req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE);
	if (!req->rl_sendbuf)
		goto out2;

	req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE);
	if (!req->rl_recvbuf)
		goto out3;
/**
 * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
 * @r_xprt: controlling transport instance
 * @req: rpcrdma_req object to set up
 *
 * Returns zero on success, and a negative errno on failure.
 */
int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
	struct rpcrdma_regbuf *rb;
size_t maxhdrsize;
/* Compute maximum header buffer size in bytes */
maxhdrsize = rpcrdma_fixed_maxsz + 3 +
r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
maxhdrsize *= sizeof(__be32);
rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
				       DMA_TO_DEVICE);
	if (!rb)
		goto out;

	if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
		goto out_free;
/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
	int rc;
	/* The verbs consumer can't know the state of an MR on the
	 * req->rl_registered list unless a successful completion
	 * has occurred, so they cannot be re-used.
	 */
	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
		struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
/* ASSUMPTION: the rb_allreqs list is stable for the duration,
 * and thus can be walked without holding rb_lock. Eg. the
 * caller is holding the transport send lock to exclude
 * device removal or disconnection.
 */
static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req;
/* Calls to llist_del_first are required to be serialized */
	node = llist_del_first(&buf->rb_free_reps);
	if (!node)
		return NULL;
	return llist_entry(node, struct rpcrdma_rep, rr_node);
}
/**
 * rpcrdma_rep_put - Release rpcrdma_rep back to free list
 * @buf: buffer pool
 * @rep: rep to release
 */
void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
{
llist_add(&rep->rr_node, &buf->rb_free_reps);
}
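
/* llist_add() may be called concurrently from any context, so reps can
 * be returned here without a lock; only the llist_del_first() side
 * (rpcrdma_rep_get_locked() above) requires its callers to be
 * serialized.
 */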
/* Caller must ensure the QP is quiescent (RQ is drained) before
 * invoking this function, to guarantee rb_all_reps is not
 * changing.
 */
static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
/**
 * rpcrdma_buffer_create - Create initial set of req/rep objects
 * @r_xprt: transport instance to (re)initialize
 *
 * Returns zero on success, otherwise a negative errno.
 */
int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	int i, rc;
/**
 * rpcrdma_buffer_destroy - Release all hw resources
 * @buf: root control block for resources
 *
 * ORDERING: relies on a prior rpcrdma_xprt_drain :
 * - No more Send or Receive completions can occur
 * - All MRs, reps, and reqs are returned to their free lists
 */
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
rpcrdma_reps_destroy(buf);
	while (!list_empty(&buf->rb_send_bufs)) {
		struct rpcrdma_req *req;
/**
 * rpcrdma_reply_put - Put reply buffers back into pool
 * @buffers: buffer pool
 * @req: object to return
 */
void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
{
	if (req->rl_reply) {
rpcrdma_rep_put(buffers, req->rl_reply);
req->rl_reply = NULL;
}
}
/**
 * rpcrdma_buffer_get - Get a request buffer
 * @buffers: Buffer pool from which to obtain a buffer
 *
 * Returns a fresh rpcrdma_req, or NULL if none are available.
 */
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
	struct rpcrdma_req *req;
/* Returns a pointer to a rpcrdma_regbuf object, or NULL.
 *
 * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
 * receiving the payload of RDMA RECV operations. During Long Calls
 * or Replies they may be registered externally via frwr_map.
 */
static struct rpcrdma_regbuf *
rpcrdma_regbuf_alloc_node(size_t size, enum dma_data_direction direction,
			  int node)
{
	struct rpcrdma_regbuf *rb;
/**
 * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
 * @rb: regbuf to reallocate
 * @size: size of buffer to be allocated, in bytes
 * @flags: GFP flags
 *
 * Returns true if reallocation was successful. If false is
 * returned, @rb is left untouched.
 */
bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
{
	void *buf;

	buf = kmalloc(size, flags);
	if (!buf)
		return false;
	if (atomic_inc_return(&ep->re_receiving) > 1)
		goto out;
/* fast path: all needed reps can be found on the free list */
	wr = NULL;
	while (needed) {
		rep = rpcrdma_rep_get_locked(buf);
		if (!rep)
			rep = rpcrdma_rep_create(r_xprt);
		if (!rep)
			break;
		if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) {
			rpcrdma_rep_put(buf, rep);
			break;
}