// SPDX-License-Identifier: GPL-2.0 /* * linux/net/sunrpc/xprtsock.c * * Client-side transport implementation for sockets. * * TCP callback races fixes (C) 1998 Red Hat * TCP send fixes (C) 1998 Red Hat * TCP NFS related read + write fixes * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> * * Rewrite of large parts of the code in order to stabilize TCP stuff. * Fix behaviour when socket buffer is full. * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> * * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com> * * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005. * <gilles.quillard@bull.net>
*/
/* * We can register our own files under /proc/sys/sunrpc by * calling register_sysctl() again. The files in that * directory become the union of all files registered there. * * We simply need to make sure that we don't collide with * someone else's file names!
*/
/* * Wait duration for a reply from the RPC portmapper.
*/ #define XS_BIND_TO (60U * HZ)
/* * Delay if a UDP socket connect error occurs. This is most likely some * kind of resource problem on the local host.
*/ #define XS_UDP_REEST_TO (2U * HZ)
/* * The reestablish timeout allows clients to delay for a bit before attempting * to reconnect to a server that just dropped our connection. * * We implement an exponential backoff when trying to reestablish a TCP * transport connection with the server. Some servers like to drop a TCP * connection when they are overworked, so we start with a short timeout and * increase over time if the server is down or not responding.
*/ #define XS_TCP_INIT_REEST_TO (3U * HZ)
/* * TCP idle timeout; client drops the transport socket if it is idle * for this long. Note that we also timeout UDP sockets to prevent * holding port numbers when there is no RPC traffic.
*/ #define XS_IDLE_DISC_TO (5U * 60 * HZ)
/* NOTE(review): truncated fragment — the enclosing function header is not
 * visible in this extract. The loop frees every cached display string for
 * the transport except the PROTO and NETID entries, which are skipped
 * (presumably because they point at static strings — TODO confirm against
 * the full source).
 */
for (i = 0; i < RPC_DISPLAY_MAX; i++) switch (i) { case RPC_DISPLAY_PROTO: case RPC_DISPLAY_NETID: continue; default:
kfree(xprt->address_strings[i]);
}
}
/* NOTE(review): truncated fragment — function header and the code that
 * fills in content_type/level/description are missing from this extract.
 * This is the tail of a TLS control-message handler: it maps the received
 * TLS record type to a return value for the stream reader.
 */
switch (content_type) { case 0: break; case TLS_RECORD_TYPE_DATA: /* TLS sets EOR at the end of each application data * record, even though there might be more frames * waiting to be decrypted.
*/
/* Strip MSG_EOR so the caller keeps reading the record stream. */
*msg_flags &= ~MSG_EOR; break; case TLS_RECORD_TYPE_ALERT:
tls_alert_recv(sock->sk, msg, &level, &description);
/* Fatal alerts abort the connection; warnings just retry. */
ret = (level == TLS_ALERT_LEVEL_FATAL) ?
-EACCES : -EAGAIN; break; default: /* discard this record type */
ret = -EAGAIN;
} return ret;
}
/* NOTE(review): truncated fragment — function header and tail are missing.
 * This reads a backchannel (server-to-client) call off the stream: look up
 * the preallocated backchannel request by XID, copy the data in, and
 * complete it once the record is fully received.
 */
/* Is this transport associated with the backchannel? */ if (!xprt->bc_serv) return -ESHUTDOWN;
/* Look up and lock the request corresponding to the given XID */
req = xprt_lookup_bc_request(xprt, transport->recv.xid); if (!req) {
printk(KERN_WARNING "Callback slot table overflowed\n"); return -ESHUTDOWN;
} if (transport->recv.copied && !req->rq_private_buf.len) return -ESHUTDOWN;
ret = xs_read_stream_request(transport, msg, flags, req); if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_bc_request(req, transport->recv.copied); else
/* Record still incomplete: remember how much has been copied. */
req->rq_private_buf.len = transport->recv.copied;
/* NOTE(review): truncated fragment — this span fuses the start of a reply
 * reader (lookup/pin of the rqst under queue_lock) with the tail of a
 * poll/data-ready checker (the clear_bit/queue_work sequence from L42
 * belongs to a different function). Headers and intervening code are
 * missing from this extract.
 */
/* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, transport->recv.xid); if (!req || (transport->recv.copied && !req->rq_private_buf.len)) {
msg->msg_flags |= MSG_TRUNC; goto out;
}
/* Pin so the request cannot vanish once the lock is dropped. */
xprt_pin_rqst(req);
spin_unlock(&xprt->queue_lock);
ret = xs_read_stream_request(transport, msg, flags, req);
/* NOTE(review): from here down looks like a separate readable-check
 * helper: re-arm the DATA_READY bit and kick the receive worker if the
 * socket still has data — TODO confirm against the full source.
 */
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state)) return; if (!xs_poll_socket_readable(transport)) return; if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
queue_work(xprtiod_workqueue, &transport->recv_worker);
}
/** * xs_nospace - handle transmit was incomplete * @req: pointer to RPC request * @transport: pointer to struct sock_xprt *
*/ staticint xs_nospace(struct rpc_rqst *req, struct sock_xprt *transport)
{ struct rpc_xprt *xprt = &transport->xprt; struct sock *sk = transport->inet; int ret = -EAGAIN;
trace_rpc_socket_nospace(req, transport);
/* Protect against races with write_space */
spin_lock(&xprt->transport_lock);
/* Don't race with disconnect */ if (xprt_connected(xprt)) { /* wait for more buffer space */
set_bit(XPRT_SOCK_NOSPACE, &transport->sock_state);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
/* Ask the socket layer for a write_space callback. */
sk->sk_write_pending++;
xprt_wait_for_buffer_space(xprt);
} else
ret = -ENOTCONN;
/* NOTE(review): truncated — the expected spin_unlock()/return ret are
 * missing here, and the lines below (partial-transmit force-disconnect)
 * appear to belong to a different, stream-specific nospace helper.
 * TODO confirm against the full source.
 */
if (transport->xmit.offset != 0 &&
!test_bit(XPRT_CLOSE_WAIT, &xprt->state))
xprt_force_disconnect(xprt);
}
/* * Determine if the previous message in the stream was aborted before it * could complete transmission.
*/ staticbool
xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
{ return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
}
/* * Return the stream record marker field for a record of length < 2^31-1
*/ static rpc_fraghdr
xs_stream_record_marker(struct xdr_buf *xdr)
{ if (!xdr->len) return 0; return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len);
}
/** * xs_local_send_request - write an RPC request to an AF_LOCAL socket * @req: pointer to RPC request * * Return values: * 0: The request has been sent * EAGAIN: The socket was blocked, please call again later to * complete the request * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occurred, the request was not sent
*/ staticint xs_local_send_request(struct rpc_rqst *req)
{ struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf;
rpc_fraghdr rm = xs_stream_record_marker(xdr); unsignedint msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; struct msghdr msg = {
.msg_flags = XS_SENDMSG_FLAGS,
}; bool vm_wait; unsignedint sent; int status;
/* Close the stream if the previous transmission was incomplete */ if (xs_send_request_was_aborted(transport, req)) {
xprt_force_disconnect(xprt);
return -ENOTCONN;
}
/* NOTE(review): truncated — the actual sendmsg call that sets `sent`,
 * `status` and `vm_wait` is missing from this extract; those variables
 * appear uninitialized below. TODO restore from the full source.
 */
if (likely(sent > 0) || status == 0) {
/* Account the bytes sent against the current record offset. */
transport->xmit.offset += sent;
req->rq_bytes_sent = transport->xmit.offset; if (likely(req->rq_bytes_sent >= msglen)) {
/* Whole record sent: reset the offset for the next one. */
req->rq_xmit_bytes_sent += transport->xmit.offset;
transport->xmit.offset = 0; return 0;
}
status = -EAGAIN;
vm_wait = false;
}
switch (status) { case -EAGAIN:
status = xs_stream_nospace(req, vm_wait); break; default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
fallthrough; case -EPIPE:
/* Unknown errors and EPIPE force a reconnect. */
xprt_force_disconnect(xprt);
status = -ENOTCONN;
}
return status;
}
/** * xs_udp_send_request - write an RPC request to a UDP socket * @req: pointer to RPC request * * Return values: * 0: The request has been sent * EAGAIN: The socket was blocked, please call again later to * complete the request * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occurred, the request was not sent
*/ staticint xs_udp_send_request(struct rpc_rqst *req)
{ struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; struct msghdr msg = {
.msg_name = xs_addr(xprt),
.msg_namelen = xprt->addrlen,
.msg_flags = XS_SENDMSG_FLAGS,
}; unsignedint sent; int status;
/* NOTE(review): truncated — the sendmsg call that sets `sent` and
 * `status` is missing from this extract; `status` is tested below
 * before any visible assignment. TODO restore from the full source.
 */
/* firewall is blocking us, don't return -EAGAIN or we end up looping */ if (status == -EPERM) goto process_status;
if (status == -EAGAIN && sock_writeable(transport->inet))
status = -ENOBUFS;
if (sent > 0 || status == 0) {
/* UDP is datagram-based: a short send still counts as progress
 * but the request must be retransmitted in full later.
 */
req->rq_xmit_bytes_sent += sent; if (sent >= req->rq_slen) return 0; /* Still some bytes left; set up for a retry later. */
status = -EAGAIN;
}
process_status: switch (status) { case -ENOTSOCK:
status = -ENOTCONN; /* Should we call xs_close() here? */ break; case -EAGAIN:
status = xs_sock_nospace(req); break; case -ENETUNREACH: case -ENOBUFS: case -EPIPE: case -ECONNREFUSED: case -EPERM: /* When the server has died, an ICMP port unreachable message
* prompts ECONNREFUSED. */ break; default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
}
return status;
}
/** * xs_tcp_send_request - write an RPC request to a TCP socket * @req: pointer to RPC request * * Return values: * 0: The request has been sent * EAGAIN: The socket was blocked, please call again later to * complete the request * ENOTCONN: Caller needs to invoke connect logic then call again * other: Some other error occurred, the request was not sent * * XXX: In the case of soft timeouts, should we eventually give up * if sendmsg is not able to make progress?
*/ staticint xs_tcp_send_request(struct rpc_rqst *req)
{ struct rpc_xprt *xprt = req->rq_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf;
rpc_fraghdr rm = xs_stream_record_marker(xdr); unsignedint msglen = rm ? req->rq_slen + sizeof(rm) : req->rq_slen; struct msghdr msg = {
.msg_flags = XS_SENDMSG_FLAGS,
}; bool vm_wait; unsignedint sent; int status;
/* Close the stream if the previous transmission was incomplete */ if (xs_send_request_was_aborted(transport, req)) { if (transport->sock != NULL)
kernel_sock_shutdown(transport->sock, SHUT_RDWR); return -ENOTCONN;
} if (!transport->inet) return -ENOTCONN;
if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
xs_tcp_set_socket_timeouts(xprt, transport->sock);
xs_set_srcport(transport, transport->sock);
/* Continue transmitting the packet/record. We must be careful * to cope with writespace callbacks arriving _after_ we have
* called sendmsg(). */
req->rq_xtime = ktime_get();
tcp_sock_set_cork(transport->inet, true);
/* NOTE(review): truncated — the `do {` that opens the transmit loop and
 * the sendmsg call that sets `sent`/`status` are missing from this
 * extract (the matching `} while (status == 0);` survives below).
 * TODO restore from the full source.
 */
/* If we've sent the entire packet, immediately
* reset the count of bytes sent. */
transport->xmit.offset += sent;
req->rq_bytes_sent = transport->xmit.offset; if (likely(req->rq_bytes_sent >= msglen)) {
req->rq_xmit_bytes_sent += transport->xmit.offset;
transport->xmit.offset = 0; if (atomic_long_read(&xprt->xmit_queuelen) == 1)
/* Last request in the queue: flush the cork. */
tcp_sock_set_cork(transport->inet, false); return 0;
}
WARN_ON_ONCE(sent == 0 && status == 0);
if (sent > 0)
vm_wait = false;
} while (status == 0);
switch (status) { case -ENOTSOCK:
status = -ENOTCONN; /* Should we call xs_close() here? */ break; case -EAGAIN:
status = xs_stream_nospace(req, vm_wait); break; case -ECONNRESET: case -ECONNREFUSED: case -ENOTCONN: case -EADDRINUSE: case -ENOBUFS: case -EPIPE: break; default:
dprintk("RPC: sendmsg returned unrecognized error %d\n",
-status);
}
/** * xs_error_report - callback to handle TCP socket state errors * @sk: socket * * Note: we don't call sock_error() since there may be a rpc_task * using the socket, and so we don't want to clear sk->sk_err.
*/ staticvoid xs_error_report(struct sock *sk)
{ struct sock_xprt *transport; struct rpc_xprt *xprt;
/* NOTE(review): truncated — the assignments of `xprt` and `transport`
 * (via xprt_from_sock()/container_of()) are missing from this extract;
 * both are used below while apparently uninitialized. The function tail
 * is also cut off. TODO restore from the full source.
 */
if (sk == NULL) return; /* * Make sure we're calling this in a context from which it is safe * to call __fput_sync(). In practice that means rpciod and the * system workqueue.
*/ if (!(current->flags & PF_WQ_WORKER)) {
WARN_ON_ONCE(1);
set_bit(XPRT_CLOSE_WAIT, &xprt->state); return;
}
if (atomic_read(&transport->xprt.swapper))
sk_clear_memalloc(sk);
/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * Used when all requests are complete; ie, no DRC state remains on the
 * server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: xs_close xprt %p\n", xprt);

	/* Shut down any in-progress TLS handshake before tearing down. */
	if (xs->sock)
		tls_handshake_close(xs->sock);
	xs_reset_transport(xs);
	xprt->reestablish_timeout = 0;
}
/* Fault-injection helper: pretend the transport just disconnected. */
static void xs_inject_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: injecting transport disconnect on xprt=%p\n", xprt);
	xprt_disconnect_done(xprt);
}
/* NOTE(review): truncated fragment — this span fuses the interior of a UDP
 * datagram receive routine (XID lookup, RTT update, checksum copy) with the
 * tail of a data-ready callback (the reestablish_timeout/queue_work part
 * from L198 onwards). Headers and intervening code are missing from this
 * extract.
 */
/* Copy the XID from the skb... */
xp = skb_header_pointer(skb, 0, sizeof(_xid), &_xid); if (xp == NULL) return;
/* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->queue_lock);
rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock;
xprt_pin_rqst(rovr);
xprt_update_rtt(rovr->rq_task);
spin_unlock(&xprt->queue_lock);
task = rovr->rq_task;
/* Never copy more than the reply buffer can hold. */
if ((copied = rovr->rq_private_buf.buflen) > repsize)
copied = repsize;
/* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
spin_lock(&xprt->queue_lock);
__UDPX_INC_STATS(sk, UDP_MIB_INERRORS); goto out_unpin;
}
/* NOTE(review): from here down looks like a different callback's tail —
 * TODO confirm against the full source.
 */
if (test_bit(XPRT_SOCK_IGNORE_RECV, &transport->sock_state)) return;
/* Any data means we had a useful conversation, so * then we don't need to delay the next reconnect
*/ if (xprt->reestablish_timeout)
xprt->reestablish_timeout = 0; if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
queue_work(xprtiod_workqueue, &transport->recv_worker);
}
}
/*
 * Force a TCP close when the server is sending junk and/or has put us
 * in CLOSE_WAIT.  Thin wrapper kept for call-site readability.
 */
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
	xprt_force_disconnect(xprt);
}
/**
 * xs_local_state_change - callback to handle AF_LOCAL socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_local_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt = xprt_from_sock(sk);
	struct sock_xprt *transport;

	if (!xprt)
		return;
	transport = container_of(xprt, struct sock_xprt, xprt);

	/* Nothing to do unless the peer shut the socket down. */
	if (!(sk->sk_shutdown & SHUTDOWN_MASK))
		return;
	clear_bit(XPRT_CONNECTED, &xprt->state);
	/* Trigger the socket release */
	xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
}
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct sock_xprt *transport;
	struct rpc_xprt *xprt;

	xprt = xprt_from_sock(sk);
	if (!xprt)
		return;
	transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d sk_shutdown %d\n",
		sk->sk_state, xprt_connected(xprt),
		sock_flag(sk, SOCK_DEAD),
		sock_flag(sk, SOCK_ZAPPED),
		sk->sk_shutdown);
	trace_rpc_socket_state_change(xprt, sk->sk_socket);

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		/* Only the first transition into ESTABLISHED counts. */
		if (!xprt_test_and_set_connected(xprt)) {
			xprt->connect_cookie++;
			clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
			xprt_clear_connecting(xprt);

			xprt->stat.connect_count++;
			xprt->stat.connect_time += (long)jiffies -
						   xprt->stat.connect_start;
			xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
		}
		break;
	case TCP_FIN_WAIT1:
		/* The client initiated a shutdown of the socket */
		xprt->connect_cookie++;
		xprt->reestablish_timeout = 0;
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_atomic();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		smp_mb__after_atomic();
		break;
	case TCP_CLOSE_WAIT:
		/* The server initiated a shutdown of the socket */
		xprt->connect_cookie++;
		clear_bit(XPRT_CONNECTED, &xprt->state);
		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
		fallthrough;
	case TCP_CLOSING:
		/*
		 * If the server closed down the connection, make sure that
		 * we back off before reconnecting
		 */
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		break;
	case TCP_LAST_ACK:
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_atomic();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		smp_mb__after_atomic();
		break;
	case TCP_CLOSE:
		if (test_and_clear_bit(XPRT_SOCK_CONNECTING,
				       &transport->sock_state)) {
			xs_reset_srcport(transport);
			xprt_clear_connecting(xprt);
		}
		clear_bit(XPRT_CLOSING, &xprt->state);
		/* Trigger the socket release */
		xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
	}
}
/* NOTE(review): truncated fragment — the function header and the
 * declarations of `xprt`/`transport` are missing from this extract.
 * This is the body of the common write_space helper: clear SOCK_NOSPACE
 * and wake the transport's writers if they were waiting for space.
 */
if (!sk->sk_socket) return;
clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
if (unlikely(!(xprt = xprt_from_sock(sk)))) return;
transport = container_of(xprt, struct sock_xprt, xprt); if (!test_and_clear_bit(XPRT_SOCK_NOSPACE, &transport->sock_state)) return;
xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
/* Balance the sk_write_pending++ done when we started waiting. */
sk->sk_write_pending--;
}
/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	/* from net/core/sock.c:sock_def_write_space */
	if (!sock_writeable(sk))
		return;
	xs_write_space(sk);
}
/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	/* from net/core/stream.c:sk_stream_write_space */
	if (!sk_stream_is_writeable(sk))
		return;
	xs_write_space(sk);
}
/**
 * xs_udp_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
 *
 * Set socket send and receive buffer size limits.  A zero request
 * clears the stored limit; non-zero requests get 1KiB of headroom
 * added before being applied to the socket.
 */
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = sndsize ? sndsize + 1024 : 0;
	transport->rcvsize = rcvsize ? rcvsize + 1024 : 0;

	xs_udp_do_set_buffer_size(xprt);
}
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @xprt: controlling transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{
	/* Shrink the congestion window under the transport lock. */
	spin_lock(&xprt->transport_lock);
	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
	spin_unlock(&xprt->transport_lock);
}
/*
 * Pick a random port inside the configured reserved-port range
 * [xprt_min_resvport, xprt_max_resvport], or -EADDRINUSE if the
 * range is empty (max < min).
 */
static int xs_get_random_port(void)
{
	unsigned short lo = xprt_min_resvport;
	unsigned short hi = xprt_max_resvport;
	unsigned short span, pick;

	if (hi < lo)
		return -EADDRINUSE;
	span = hi - lo + 1;
	pick = get_random_u32_below(span);
	return pick + lo;
}
/* NOTE(review): truncated fragment — the function header and the
 * declarations of `sock`/`buf`/`port` are missing from this extract.
 * This reads back the socket's bound local address and extracts the
 * port number for IPv4/IPv6.
 */
if (kernel_getsockname(sock, (struct sockaddr *)&buf) < 0) goto out; switch (buf.ss_family) { case AF_INET6:
port = ntohs(((struct sockaddr_in6 *)&buf)->sin6_port); break; case AF_INET:
port = ntohs(((struct sockaddr_in *)&buf)->sin_port);
}
out: return port;
}
/** * xs_set_port - reset the port number in the remote endpoint address * @xprt: generic transport * @port: new port number *
*/ staticvoid xs_set_port(struct rpc_xprt *xprt, unsignedshort port)
{
dprintk("RPC: setting port for xprt %p to %u\n", xprt, port);
/* NOTE(review): truncated — the rest of xs_set_port()'s body is missing
 * from this extract; the lines below (recv_mutex/getsockname/snprintf)
 * belong to a different helper that formats the socket's source address
 * into a buffer. TODO restore from the full source.
 */
mutex_lock(&sock->recv_mutex); if (sock->sock) {
ret = kernel_getsockname(sock->sock, &saddr.sa); if (ret >= 0)
ret = snprintf(buf, buflen, "%pISc", &saddr.sa);
}
mutex_unlock(&sock->recv_mutex); return ret;
}
/* Step the source port downwards through the reserved range, wrapping to
 * xprt_max_resvport when it falls below the minimum; returns 0 when no
 * reserved port is required.
 */
staticunsignedshort xs_next_srcport(struct sock_xprt *transport, unsignedshort port)
{ if (transport->srcport != 0)
transport->srcport = 0; if (!transport->xprt.resvport) return 0; if (port <= xprt_min_resvport || port > xprt_max_resvport) return xprt_max_resvport; return --port;
} staticint xs_bind(struct sock_xprt *transport, struct socket *sock)
{ struct sockaddr_storage myaddr; int err, nloop = 0; int port = xs_get_srcport(transport); unsignedshort last;
/* * If we are asking for any ephemeral port (i.e. port == 0 && * transport->xprt.resvport == 0), don't bind. Let the local * port selection happen implicitly when the socket is used * (for example at connect time). * * This ensures that we can continue to establish TCP * connections even when all local ephemeral ports are already * a part of some TCP connection. This makes no difference * for UDP sockets, but also doesn't harm them. * * If we're asking for any reserved port (i.e. port == 0 && * transport->xprt.resvport == 1) xs_get_srcport above will * ensure that port is non-zero and we will bind as needed.
*/ if (port <= 0) return port;
memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen); do {
rpc_set_port((struct sockaddr *)&myaddr, port);
err = kernel_bind(sock, (struct sockaddr *)&myaddr,
transport->xprt.addrlen); if (err == 0) { if (transport->xprt.reuseport)
transport->srcport = port; break;
}
/* Remember where we were so we can detect wrap-around. */
last = port;
port = xs_next_srcport(transport, port); if (port > last)
nloop++;
/* NOTE(review): truncated — xs_bind()'s tail after this loop is
 * missing from this extract. TODO restore from the full source.
 */
} while (err == -EADDRINUSE && nloop != 2);
/* NOTE(review): truncated fragment — the function header and earlier body
 * are missing from this extract. This is the tail of the AF_LOCAL connect
 * worker: async tasks are refused (the connect must run in the caller's
 * filesystem namespace), then the socket is set up synchronously.
 */
if (RPC_IS_ASYNC(task)) { /* * We want the AF_LOCAL connect to be resolved in the * filesystem namespace of the process making the rpc * call. Thus we connect synchronously. * * If we want to support asynchronous AF_LOCAL calls, * we'll need to figure out how to pass a namespace to * connect.
*/
rpc_task_set_rpc_status(task, -ENOTCONN); goto out_wake;
}
ret = xs_local_setup_socket(transport); if (ret && !RPC_IS_SOFTCONN(task))
/* Hard connect failed: back off before the next attempt. */
msleep_interruptible(15000); return;
force_disconnect:
xprt_force_disconnect(xprt);
out_wake:
xprt_clear_connecting(xprt);
xprt_wake_pending_tasks(xprt, -ENOTCONN);
}
#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Note that this should be called with XPRT_LOCKED held, or recv_mutex
 * held, or when we otherwise know that we have exclusive access to the
 * socket, to guard against races with xs_reset_transport.
 */
static void xs_set_memalloc(struct rpc_xprt *xprt)
{
	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);

	/* No socket yet: the reconnect path will tag it for us later. */
	if (!xs->inet)
		return;
	if (atomic_read(&xprt->swapper))
		sk_set_memalloc(xs->inet);
}
/** * xs_enable_swap - Tag this transport as being used for swap. * @xprt: transport to tag * * Take a reference to this transport on behalf of the rpc_clnt, and * optionally mark it for swapping if it wasn't already.
*/ staticint
xs_enable_swap(struct rpc_xprt *xprt)
{ struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
/* NOTE(review): truncated — xs_enable_swap()'s body (swapper refcount
 * and memalloc tagging) is missing from this extract. TODO restore from
 * the full source.
 */
/** * xs_disable_swap - Untag this transport as being used for swap. * @xprt: transport to tag * * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the * swapper refcount goes to 0, untag the socket as a memalloc socket.
*/ staticvoid
xs_disable_swap(struct rpc_xprt *xprt)
{ struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
/**
 * xs_tcp_shutdown - gracefully shut down a TCP socket
 * @xprt: transport
 *
 * Initiates a graceful shutdown of the TCP socket by calling the
 * equivalent of shutdown(SHUT_RDWR);
 */
static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	int skst = transport->inet ? transport->inet->sk_state : TCP_CLOSE;

	if (!sock)
		return;
	/* Without port reuse a full close is the only safe option. */
	if (!xprt->reuseport) {
		xs_close(xprt);
		return;
	}
	switch (skst) {
	case TCP_FIN_WAIT1:
	case TCP_FIN_WAIT2:
	case TCP_LAST_ACK:
		/* Shutdown already in progress — nothing to do. */
		break;
	case TCP_ESTABLISHED:
	case TCP_CLOSE_WAIT:
		kernel_sock_shutdown(sock, SHUT_RDWR);
		trace_rpc_socket_shutdown(xprt, sock);
		break;
	default:
		xs_reset_transport(transport);
	}
}
/* NOTE(review): truncated fragment — the function header (presumably
 * xs_tcp_finish_connecting) and parts of the socket-callback setup are
 * missing from this extract. This wires up a fresh socket and kicks off
 * the non-blocking connect.
 */
if (!transport->inet) { struct sock *sk = sock->sk;
/* Avoid temporary address, they are bad for long-lived * connections such as NFS mounts. * RFC4941, section 3.6 suggests that: * Individual applications, which have specific * knowledge about the normal duration of connections, * MAY override this as appropriate.
*/ if (xs_addr(xprt)->sa_family == PF_INET6) {
ip6_sock_set_addr_preferences(sk,
IPV6_PREFER_SRC_PUBLIC);
}
/* Reset to new socket */
transport->sock = sock;
transport->inet = sk;
release_sock(sk);
}
if (!xprt_bound(xprt)) return -ENOTCONN;
xs_set_memalloc(xprt);
xs_stream_start_connect(transport);
/* Tell the socket layer to start connecting... */
set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
}
/** * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint * @work: queued work item * * Invoked by a work queue tasklet.
*/ staticvoid xs_tcp_setup_socket(struct work_struct *work)
{ struct sock_xprt *transport =
container_of(work, struct sock_xprt, connect_worker.work); struct socket *sock = transport->sock; struct rpc_xprt *xprt = &transport->xprt; int status; unsignedint pflags = current->flags;
/* Swap-backed transports must allocate from the memalloc reserves. */
if (atomic_read(&xprt->swapper))
current->flags |= PF_MEMALLOC;
if (xprt_connected(xprt)) goto out; if (test_and_clear_bit(XPRT_SOCK_CONNECT_SENT,
&transport->sock_state) ||
!sock) {
xs_reset_transport(transport);
sock = xs_create_sock(xprt, transport, xs_addr(xprt)->sa_family,
SOCK_STREAM, IPPROTO_TCP, true); if (IS_ERR(sock)) {
xprt_wake_pending_tasks(xprt, PTR_ERR(sock)); goto out;
}
}
dprintk("RPC: worker connecting xprt %p via %s to " "%s (port %s)\n", xprt,
xprt->address_strings[RPC_DISPLAY_PROTO],
xprt->address_strings[RPC_DISPLAY_ADDR],
xprt->address_strings[RPC_DISPLAY_PORT]);
status = xs_tcp_finish_connecting(xprt, sock);
trace_rpc_socket_connect(xprt, sock, status);
dprintk("RPC: %p connect status %d connected %d sock state %d\n",
xprt, -status, xprt_connected(xprt),
sock->sk->sk_state); switch (status) { case 0: case -EINPROGRESS: /* SYN_SENT! */
set_bit(XPRT_SOCK_CONNECT_SENT, &transport->sock_state); if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
fallthrough; case -EALREADY: goto out_unlock; case -EADDRNOTAVAIL: /* Source port number is unavailable. Try a new one! */
transport->srcport = 0;
status = -EAGAIN; break; case -EPERM: /* Happens, for instance, if a BPF program is preventing * the connect. Remap the error so upper layers can better * deal with it.
*/
status = -ECONNREFUSED;
fallthrough; case -EINVAL: /* Happens, for instance, if the user specified a link * local IPv6 address without a scope-id.
*/ case -ECONNREFUSED: case -ECONNRESET: case -ENETDOWN: case -ENETUNREACH: case -EHOSTUNREACH: case -EADDRINUSE: case -ENOBUFS: case -ENOTCONN: break; default:
printk("%s: connect returned unhandled error %d\n",
__func__, status);
status = -EAGAIN;
}
/* NOTE(review): truncated — the function's tail (task wakeup, the
 * out/out_unlock labels and the PF_MEMALLOC restore) is missing from
 * this extract; the file is cut off at this point.
 * TODO restore from the full source.
 */
/* xs_tcp_force_close() wakes tasks with a fixed error code. * We need to wake them first to ensure the correct error code.
*/
--> --------------------
--> maximum size reached
--> --------------------
Messung V0.5
¤ Diese beiden folgenden Angebotsgruppen bietet das Unternehmen0.20Angebot
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.