// SPDX-License-Identifier: GPL-2.0 /* * Shared Memory Communications over RDMA (SMC-R) and RoCE * * Manage RMBE * copy new RMBE data into user space * * Copyright IBM Corp. 2016 * * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
*/
/* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted * @smc smc socket * @timeo pointer to max seconds to wait, pointer to value 0 for no timeout * @peeked number of bytes already peeked * @fcrit add'l criterion to evaluate as function pointer * Returns: * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown. * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
*/ int smc_rx_wait(struct smc_sock *smc, long *timeo, size_t peeked, int (*fcrit)(struct smc_connection *conn, size_t baseline))
{
DEFINE_WAIT_FUNC(wait, woken_wake_function); struct smc_connection *conn = &smc->conn; struct smc_cdc_conn_state_flags *cflags =
&conn->local_tx_ctrl.conn_state_flags; struct sock *sk = &smc->sk; int rc;
if (smc_rx_data_available(conn, peeked)) returntrue; elseif (conn->urg_state == SMC_URG_VALID) /* we received a single urgent Byte - skip */
smc_rx_update_cons(smc, 0); returnfalse;
}
/* smc_rx_recvmsg - receive data from RMBE * @msg: copy data to receive buffer * @pipe: copy data to pipe if set - indicates splice() call * * rcvbuf consumer: main API called by socket layer. * Called under sk lock.
*/ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg, struct pipe_inode_info *pipe, size_t len, int flags)
{
size_t copylen, read_done = 0, read_remaining = len, peeked_bytes = 0;
size_t chunk_len, chunk_off, chunk_len_sum; struct smc_connection *conn = &smc->conn; int (*func)(struct smc_connection *conn, size_t baseline); union smc_host_cursor cons; int readable, chunk; char *rcvbuf_base; struct sock *sk; int splbytes; long timeo; int target; /* Read at least these many bytes */ int rc;
if (unlikely(flags & MSG_ERRQUEUE)) return -EINVAL; /* future work for sk.sk_family == AF_SMC */
readable = atomic_read(&conn->bytes_to_rcv); if (readable >= conn->rmb_desc->len)
SMC_STAT_RMB_RX_FULL(smc, !conn->lnk);
if (len < readable)
SMC_STAT_RMB_RX_SIZE_SMALL(smc, !conn->lnk); /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;
do { /* while (read_remaining) */ if (read_done >= target || (pipe && read_done)) break;
if (conn->killed) break;
if (smc_rx_recvmsg_data_available(smc, peeked_bytes)) goto copy;
if (sk->sk_shutdown & RCV_SHUTDOWN) { /* smc_cdc_msg_recv_action() could have run after * above smc_rx_recvmsg_data_available()
*/ if (smc_rx_recvmsg_data_available(smc, peeked_bytes)) goto copy; break;
}
if (read_done) { if (sk->sk_err ||
sk->sk_state == SMC_CLOSED ||
!timeo ||
signal_pending(current)) break;
} else { if (sk->sk_err) {
read_done = sock_error(sk); break;
} if (sk->sk_state == SMC_CLOSED) { if (!sock_flag(sk, SOCK_DONE)) { /* This occurs when user tries to read * from never connected socket.
*/
read_done = -ENOTCONN; break;
} break;
} if (!timeo) return -EAGAIN; if (signal_pending(current)) {
read_done = sock_intr_errno(timeo); break;
}
}
if (!smc_rx_data_available(conn, peeked_bytes)) {
smc_rx_wait(smc, &timeo, peeked_bytes, smc_rx_data_available); continue;
}
copy: /* initialize variables for 1st iteration of subsequent loop */ /* could be just 1 byte, even after waiting on data above */
readable = smc_rx_data_available(conn, peeked_bytes);
splbytes = atomic_read(&conn->splice_pending); if (!readable || (msg && splbytes)) { if (splbytes)
func = smc_rx_data_available_and_no_splice_pend; else
func = smc_rx_data_available;
smc_rx_wait(smc, &timeo, peeked_bytes, func); continue;
}
smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn); if ((flags & MSG_PEEK) && peeked_bytes)
smc_curs_add(conn->rmb_desc->len, &cons, peeked_bytes); /* subsequent splice() calls pick up where previous left */ if (splbytes)
smc_curs_add(conn->rmb_desc->len, &cons, splbytes); if (conn->urg_state == SMC_URG_VALID &&
sock_flag(&smc->sk, SOCK_URGINLINE) &&
readable > 1)
readable--; /* always stop at urgent Byte */ /* not more than what user space asked for */
copylen = min_t(size_t, read_remaining, readable); /* determine chunks where to read from rcvbuf */ /* either unwrapped case, or 1st chunk of wrapped case */
chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
cons.count);
chunk_len_sum = chunk_len;
chunk_off = cons.count;
smc_rmb_sync_sg_for_cpu(conn); for (chunk = 0; chunk < 2; chunk++) { if (!(flags & MSG_TRUNC)) { if (msg) {
rc = memcpy_to_msg(msg, rcvbuf_base +
chunk_off,
chunk_len);
} else {
rc = smc_rx_splice(pipe, rcvbuf_base +
chunk_off, chunk_len,
smc);
} if (rc < 0) { if (!read_done)
read_done = -EFAULT; goto out;
}
}
read_remaining -= chunk_len;
read_done += chunk_len; if (flags & MSG_PEEK)
peeked_bytes += chunk_len;
if (chunk_len_sum == copylen) break; /* either on 1st or 2nd iteration */ /* prepare next (== 2nd) iteration */
chunk_len = copylen - chunk_len; /* remainder */
chunk_len_sum += chunk_len;
chunk_off = 0; /* modulo offset in recv ring buffer */
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.