// SPDX-License-Identifier: GPL-2.0-or-later /* RxRPC recvmsg() implementation * * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com)
*/
/* * Post a call for attention by the socket or kernel service. Further * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */ void rxrpc_notify_socket(struct rxrpc_call *call)
{ struct rxrpc_sock *rx; struct sock *sk;
_enter("%d", call->debug_id);
/* Already queued for attention / already released: nothing more to do. */
if (!list_empty(&call->recvmsg_link)) return; if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
rxrpc_see_call(call, rxrpc_call_see_notify_released); return;
}
/* The call's socket back-pointer is RCU-protected. */
rcu_read_lock();
rx = rcu_dereference(call->socket);
sk = &rx->sk; if (rx && sk->sk_state < RXRPC_CLOSE) { if (call->notify_rx) {
/* Kernel-service path: invoke the service's callback under notify_lock. */
spin_lock_irq(&call->notify_lock);
call->notify_rx(sk, call, call->user_call_ID);
spin_unlock_irq(&call->notify_lock);
} else {
/* Userspace path: queue the call on the socket's recvmsg queue,
 * taking a ref only if it wasn't already queued.
 */
spin_lock_irq(&rx->recvmsg_lock); if (list_empty(&call->recvmsg_link)) {
rxrpc_get_call(call, rxrpc_call_get_notify_socket);
list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
}
spin_unlock_irq(&rx->recvmsg_lock);
/* NOTE(review): this function appears truncated by text extraction --
 * the socket wakeup, rcu_read_unlock() and the closing braces are
 * missing from this chunk.  Restore from the upstream file before
 * editing further.
 */
/* NOTE(review): orphan fragment -- judging by the rxrpc_skb_put_rotate /
 * rxrpc_receive_rotate symbols this appears to be the tail of
 * rxrpc_rotate_rx_window(); the signature, local declarations and skb
 * dequeue were lost in extraction.  TODO confirm against the upstream file.
 */
sp = rxrpc_skb(skb);
tseq = sp->hdr.seq;
serial = sp->hdr.serial;
last = sp->hdr.flags & RXRPC_LAST_PACKET;
/* Barrier against rxrpc_input_data(). */ if (after(tseq, call->rx_consumed))
smp_store_release(&call->rx_consumed, tseq);
rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
serial, call->rx_consumed);
/* Rotating out the final packet means everything has now been read. */
if (last)
set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
/* Check to see if there's an ACK that needs sending. */
acked = atomic_add_return(call->rx_consumed - old_consumed,
&call->ackr_nr_consumed); if (acked > 8 &&
!test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
rxrpc_poke_call(call, rxrpc_call_poke_idle);
}
/*
 * Decrypt and verify a DATA packet.
 *
 * Return 0 immediately if the packet was already verified (RXRPC_RX_VERIFIED
 * set in the skb's private flags); otherwise defer to the call's security
 * class to decrypt and check it.
 */
static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	/* Don't verify the same packet twice. */
	if (sp->flags & RXRPC_RX_VERIFIED)
		return 0;
	return call->security->verify_packet(call, skb);
}
/* * Transcribe a call's user ID to a control message.
 */ staticint rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg, int flags)
{ if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) return 0;
if (flags & MSG_CMSG_COMPAT) { unsignedint id32 = call->user_call_ID;
/* NOTE(review): truncated by text extraction -- the put_cmsg() calls for
 * the compat and native cases and the closing braces are missing from this
 * chunk ("staticint"/"unsignedint" are also extraction artefacts: the
 * space was dropped when lines were joined).  Restore from the upstream
 * file before building.
 */
/* * Process OOB packets. Called with the socket locked.
*/ staticint rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg, unsignedint flags)
{ struct rxrpc_sock *rx = rxrpc_sk(sock->sk); struct sk_buff *skb; bool need_response = false; int ret;
skb = skb_peek(&rx->recvmsg_oobq); if (!skb) return -EAGAIN;
rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
&skb->skb_mstamp_ns); if (ret < 0) return ret;
switch ((enum rxrpc_oob_type)skb->mark) { case RXRPC_OOB_CHALLENGE:
need_response = true;
ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags); break; default:
WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
skb->mark);
ret = -EIO; break;
}
if (!(flags & MSG_PEEK))
skb_unlink(skb, &rx->recvmsg_oobq); if (need_response)
rxrpc_add_pending_oob(rx, skb); else
rxrpc_free_skb(skb, rxrpc_skb_put_oob); return ret;
}
/* * Deliver messages to a call. This keeps processing packets until the buffer * is filled and we find either more DATA (returns 0) or the end of the DATA * (returns 1). If more packets are required, it returns -EAGAIN and if the * call has failed it returns -EIO.
 */ staticint rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, struct msghdr *msg, struct iov_iter *iter,
size_t len, int flags, size_t *_offset)
{ struct rxrpc_skb_priv *sp; struct rxrpc_sock *rx = rxrpc_sk(sock->sk); struct sk_buff *skb;
rxrpc_seq_t seq = 0;
size_t remain; unsignedint rx_pkt_offset, rx_pkt_len; int copy, ret = -EAGAIN, ret2;
/* A failed call terminates reception with -EIO. */
if (rxrpc_call_has_failed(call)) {
seq = call->ackr_window - 1;
ret = -EIO; goto done;
}
/* All DATA already consumed: report end of data. */
if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
seq = call->ackr_window - 1;
ret = 1; goto done;
}
/* NOTE(review): rx_pkt_offset and rx_pkt_len are read below before any
 * visible assignment -- the lines restoring the saved partial-read state
 * appear to have been dropped by text extraction.  TODO confirm against
 * the upstream file.
 */
/* No one else can be removing stuff from the queue, so we shouldn't * need the Rx lock to walk it.
 */
skb = skb_peek(&call->recvmsg_queue); while (skb) {
rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
sp = rxrpc_skb(skb);
seq = sp->hdr.seq;
if (!(flags & MSG_PEEK))
trace_rxrpc_receive(call, rxrpc_receive_front,
sp->hdr.serial, seq);
/* We have to handle short, empty and used-up DATA packets. */
remain = len - *_offset;
copy = rx_pkt_len; if (copy > remain)
copy = remain; if (copy > 0) {
ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
copy); if (ret2 < 0) {
ret = ret2; goto out;
}
/* handle piecemeal consumption of data packets */
rx_pkt_offset += copy;
rx_pkt_len -= copy;
*_offset += copy;
}
/* The user's buffer filled up before the packet was fully consumed. */
if (rx_pkt_len > 0) {
trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
rx_pkt_offset, rx_pkt_len, 0);
ASSERTCMP(*_offset, ==, len);
ret = 0; break;
}
/* The whole packet has been transferred. */ if (sp->hdr.flags & RXRPC_LAST_PACKET)
ret = 1;
rx_pkt_offset = 0;
rx_pkt_len = 0;
skb = skb_peek_next(skb, &call->recvmsg_queue);
if (!(flags & MSG_PEEK))
rxrpc_rotate_rx_window(call);
/* NOTE(review): truncated by text extraction -- the loop close, the "out"
 * and "done" labels and the function's return path are missing from this
 * chunk.  Restore from the upstream file before building.
 */
/* * Receive a message from an RxRPC socket * - we need to be careful about two or more threads calling recvmsg * simultaneously
 */ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, int flags)
{ struct rxrpc_call *call; struct rxrpc_sock *rx = rxrpc_sk(sock->sk); struct list_head *l; unsignedint call_debug_id = 0;
size_t copied = 0; long timeo; int ret;
DEFINE_WAIT(wait);
trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
/* OOB/TRUNC recvmsg flags are not supported by this protocol. */
if (flags & (MSG_OOB | MSG_TRUNC)) return -EOPNOTSUPP;
/* NOTE(review): release_sock() below implies the socket lock is taken
 * earlier in this function, and "timeo" is tested before any visible
 * assignment -- lines (lock_sock(), the rcvtimeo setup, the "try_again"
 * label targeted below, ...) appear to have been dropped by text
 * extraction.  TODO confirm against the upstream file.
 */
/* Return immediately if a client socket has no outstanding calls */ if (RB_EMPTY_ROOT(&rx->calls) &&
list_empty(&rx->recvmsg_q) &&
skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
release_sock(&rx->sk); return -EAGAIN;
}
/* Nothing queued yet: bail if non-blocking, otherwise wait. */
if (list_empty(&rx->recvmsg_q)) {
ret = -EWOULDBLOCK; if (timeo == 0) {
call = NULL; goto error_no_call;
}
release_sock(&rx->sk);
/* Wait for something to happen */
prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
TASK_INTERRUPTIBLE);
ret = sock_error(&rx->sk); if (ret) goto wait_error;
/* NOTE(review): the body of the wait (the schedule/timeout, finish_wait()
 * and the loop back to try_again) is missing from this chunk.
 */
/* Deal with OOB messages before we consider getting normal data. */ if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
ret = rxrpc_recvmsg_oob(sock, msg, flags);
release_sock(&rx->sk); if (ret == -EAGAIN) goto try_again; goto error_no_call;
}
/* Find the next call and dequeue it if we're not just peeking. If we * do dequeue it, that comes with a ref that we will need to release. * We also want to weed out calls that got requeued whilst we were * shovelling data out.
 */
spin_lock_irq(&rx->recvmsg_lock);
l = rx->recvmsg_q.next;
call = list_entry(l, struct rxrpc_call, recvmsg_link);
/* NOTE(review): no matching spin_unlock_irq() for the lock above is
 * visible -- the dequeue/unlock lines were presumably lost in extraction.
 */
/* We're going to drop the socket lock, so we need to lock the call * against interference by sendmsg.
 */ if (!mutex_trylock(&call->user_mutex)) {
ret = -EWOULDBLOCK; if (flags & MSG_DONTWAIT) goto error_requeue_call;
ret = -ERESTARTSYS; if (mutex_lock_interruptible(&call->user_mutex) < 0) goto error_requeue_call;
}
release_sock(&rx->sk);
/* Weed out a call that has already been released. */
if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
rxrpc_see_call(call, rxrpc_call_see_already_released);
mutex_unlock(&call->user_mutex); if (!(flags & MSG_PEEK))
rxrpc_put_call(call, rxrpc_call_put_recvmsg); goto try_again;
}
ret = rxrpc_recvmsg_user_id(call, msg, flags); if (ret < 0) goto error_unlock_call;
if (msg->msg_name && call->peer) {
size_t len = sizeof(call->dest_srx);
/* NOTE(review): truncated by text extraction -- the rest of this function
 * (address copy to msg_name, the data delivery, termination handling and
 * the error labels referenced above) is missing from this chunk.
 */
/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to send data through
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Note that *@_abort should also be initialised to %0.
 *
 * Note that we may return %-EAGAIN to drain empty packets at the end
 * of the data, even if we've already copied over the requested data.
 *
 * Return: %0 if got what was asked for and there's more available, %1
 * if we got what was asked for and we're at the end of the data and
 * %-EAGAIN if we need more data.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	/* Serialise against sendmsg on this call.  Without this, the
	 * mutex_unlock() on the "out" path below would be unbalanced.
	 */
	mutex_lock(&call->user_mutex);

	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
	*_len -= offset;
	if (ret == -EIO)
		goto call_failed;
	if (ret < 0)
		goto out;

	/* We can only reach here with a partially full buffer if we have
	 * reached the end of the data.  We must otherwise have a full buffer
	 * or have been given -EAGAIN.
	 */
	if (ret == 1) {
		if (iov_iter_count(iter) > 0)
			goto short_data;
		if (!want_more)
			goto read_phase_complete;
		ret = 0;
		goto out;
	}

	if (!want_more)
		goto excess_data;
	goto out;

read_phase_complete:
	ret = 1;
out:
	/* Report the (possibly upgraded) service ID if asked. */
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EBADMSG);
	ret = -EBADMSG;
	goto out;
excess_data:
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EMSGSIZE);
	ret = -EMSGSIZE;
	goto out;

call_failed:
	/* The call failed: hand back the abort code and error; a successful
	 * completion with residual buffer space still counts as a reset.
	 */
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
/*
 * NOTE(review): the following is German web-page boilerplate that was
 * accidentally appended to the source by an extraction tool; it is not part
 * of this file and, as bare text, would not compile.  In English: "The
 * information on this web page has been carefully compiled to the best of
 * our knowledge.  However, neither completeness, nor correctness, nor
 * quality of the information provided is guaranteed.  Note: the coloured
 * syntax rendering and the measurement are still experimental."
 */