if (!psock->tx_kcm) { /* Take off psocks_avail list */
list_del(&psock->psock_avail_list);
} elseif (wakeup_kcm) { /* In this case psock is being aborted while outside of * write_msgs and psock is reserved. Schedule tx_work * to handle the failure there. Need to commit tx_stopped * before queuing work.
*/
smp_mb();
queue_work(kcm_wq, &psock->tx_kcm->tx_work);
}
spin_unlock_bh(&mux->lock);
/* Report error on lower socket */
report_csk_error(csk, err);
}
/* KCM is ready to receive messages on its queue-- either the KCM is new or * has become unblocked after being blocked on full socket buffer. Queue any * pending ready messages on a psock. RX mux lock held.
*/ staticvoid kcm_rcv_ready(struct kcm_sock *kcm)
{ struct kcm_mux *mux = kcm->mux; struct kcm_psock *psock; struct sk_buff *skb;
if (unlikely(kcm->rx_wait || kcm->rx_psock || kcm->rx_disabled)) return;
while (unlikely((skb = __skb_dequeue(&mux->rx_hold_queue)))) { if (kcm_queue_rcv_skb(&kcm->sk, skb)) { /* Assuming buffer limit has been reached */
skb_queue_head(&mux->rx_hold_queue, skb);
WARN_ON(!sk_rmem_alloc_get(&kcm->sk)); return;
}
}
while (!list_empty(&mux->psocks_ready)) {
psock = list_first_entry(&mux->psocks_ready, struct kcm_psock,
psock_ready_list);
if (kcm_queue_rcv_skb(&kcm->sk, psock->ready_rx_msg)) { /* Assuming buffer limit has been reached */
WARN_ON(!sk_rmem_alloc_get(&kcm->sk)); return;
}
/* Consumed the ready message on the psock. Schedule rx_work to * get more messages.
*/
list_del(&psock->psock_ready_list);
psock->ready_rx_msg = NULL; /* Commit clearing of ready_rx_msg for queuing work */
smp_mb();
/* Buffer limit is okay now, add to ready list */
list_add_tail(&kcm->wait_rx_list,
&kcm->mux->kcm_rx_waiters); /* paired with lockless reads in kcm_rfree() */
WRITE_ONCE(kcm->rx_wait, true);
}
if (!sock_flag(sk, SOCK_DEAD))
sk->sk_data_ready(sk);
return 0;
}
/* Requeue received messages for a kcm socket to other kcm sockets. This is * called with a kcm socket is receive disabled. * RX mux lock held.
*/ staticvoid requeue_rx_msgs(struct kcm_mux *mux, struct sk_buff_head *head)
{ struct sk_buff *skb; struct kcm_sock *kcm;
while ((skb = skb_dequeue(head))) { /* Reset destructor to avoid calling kcm_rcv_ready */
skb->destructor = sock_rfree;
skb_orphan(skb);
try_again: if (list_empty(&mux->kcm_rx_waiters)) {
skb_queue_tail(&mux->rx_hold_queue, skb); continue;
}
if (kcm_queue_rcv_skb(&kcm->sk, skb)) { /* Should mean socket buffer full */
list_del(&kcm->wait_rx_list); /* paired with lockless reads in kcm_rfree() */
WRITE_ONCE(kcm->rx_wait, false);
/* Commit rx_wait to read in kcm_free */
smp_wmb();
psock->rx_kcm = NULL; /* paired with lockless reads in kcm_rfree() */
WRITE_ONCE(kcm->rx_psock, NULL);
/* Commit kcm->rx_psock before sk_rmem_alloc_get to sync with * kcm_rfree
*/
smp_mb();
if (unlikely(kcm->done)) {
spin_unlock_bh(&mux->rx_lock);
/* Need to run kcm_done in a task since we need to qcquire * callback locks which may already be held here.
*/
INIT_WORK(&kcm->done_work, kcm_done_work);
schedule_work(&kcm->done_work); return;
}
if (unlikely(kcm->rx_disabled)) {
requeue_rx_msgs(mux, &kcm->sk.sk_receive_queue);
} elseif (rcv_ready || unlikely(!sk_rmem_alloc_get(&kcm->sk))) { /* Check for degenerative race with rx_wait that all * data was dequeued (accounted for in kcm_rfree).
*/
kcm_rcv_ready(kcm);
}
spin_unlock_bh(&mux->rx_lock);
}
staticvoid psock_state_change(struct sock *sk)
{ /* TCP only does a EPOLLIN for a half close. Do a EPOLLHUP here * since application will normally not poll with EPOLLIN * on the TCP sockets.
*/
smp_rmb(); /* Must read tx_psock before tx_wait */
if (psock) {
WARN_ON(kcm->tx_wait); if (unlikely(psock->tx_stopped))
unreserve_psock(kcm); else return kcm->tx_psock;
}
spin_lock_bh(&mux->lock);
/* Check again under lock to see if psock was reserved for this * psock via psock_unreserve.
*/
psock = kcm->tx_psock; if (unlikely(psock)) {
WARN_ON(kcm->tx_wait);
spin_unlock_bh(&mux->lock); return kcm->tx_psock;
}
/* Write any messages ready on the kcm socket. Called with kcm sock lock * held. Return bytes actually sent or error.
*/ staticint kcm_write_msgs(struct kcm_sock *kcm)
{ unsignedint total_sent = 0; struct sock *sk = &kcm->sk; struct kcm_psock *psock; struct sk_buff *head; int ret = 0;
kcm->tx_wait_more = false;
psock = kcm->tx_psock; if (unlikely(psock && psock->tx_stopped)) { /* A reserved psock was aborted asynchronously. Unreserve * it and we'll retry the message.
*/
unreserve_psock(kcm);
kcm_report_tx_retry(kcm); if (skb_queue_empty(&sk->sk_write_queue)) return 0;
do {
ret = sock_sendmsg(psock->sk->sk_socket, &msg); if (ret <= 0) { if (ret == -EAGAIN) { /* Save state to try again when there's * write space on the socket
*/
txm->frag_skb = skb;
ret = 0; goto out;
}
/* Hard failure in sending message, abort this * psock since it has lost framing * synchronization and retry sending the * message from the beginning.
*/
kcm_abort_tx_psock(psock, ret ? -ret : EPIPE, true);
unreserve_psock(kcm);
psock = NULL;
txm->started_tx = false;
kcm_report_tx_retry(kcm);
ret = 0; goto retry;
}
/* Successfully sent the whole packet, account for it. */
sk->sk_wmem_queued -= txm->sent;
total_sent += txm->sent;
skb_dequeue(&sk->sk_write_queue);
kfree_skb(head);
KCM_STATS_INCR(psock->stats.tx_msgs);
}
out: if (!head) { /* Done with all queued messages. */
WARN_ON(!skb_queue_empty(&sk->sk_write_queue)); if (psock)
unreserve_psock(kcm);
}
/* Check if write space is available */
sk->sk_write_space(sk);
/* Per tcp_sendmsg this should be in poll */
sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
if (sk->sk_err) goto out_error;
if (kcm->seq_skb) { /* Previously opened message */
head = kcm->seq_skb;
skb = kcm_tx_msg(head)->last_skb; goto start;
}
/* Call the sk_stream functions to manage the sndbuf mem. */ if (!sk_stream_memory_free(sk)) {
kcm_push(kcm);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
err = sk_stream_wait_memory(sk, &timeo); if (err) goto out_error;
}
if (msg_data_left(msg)) { /* New message, alloc head skb */
head = alloc_skb(0, sk->sk_allocation); while (!head) {
kcm_push(kcm);
err = sk_stream_wait_memory(sk, &timeo); if (err) goto out_error;
head = alloc_skb(0, sk->sk_allocation);
}
skb = head;
/* Set ip_summed to CHECKSUM_UNNECESSARY to avoid calling * csum_and_copy_from_iter from skb_do_copy_data_nocache.
*/
skb->ip_summed = CHECKSUM_UNNECESSARY;
}
start: while (msg_data_left(msg)) { bool merge = true; int i = skb_shinfo(skb)->nr_frags; struct page_frag *pfrag = sk_page_frag(sk);
if (!sk_page_frag_refill(sk, pfrag)) goto wait_for_memory;
if (!skb_can_coalesce(skb, i, pfrag->page,
pfrag->offset)) { if (i == MAX_SKB_FRAGS) { struct sk_buff *tskb;
tskb = alloc_skb(0, sk->sk_allocation); if (!tskb) goto wait_for_memory;
if (eor) { bool not_busy = skb_queue_empty(&sk->sk_write_queue);
if (head) { /* Message complete, queue it on send buffer */
__skb_queue_tail(&sk->sk_write_queue, head);
kcm->seq_skb = NULL;
KCM_STATS_INCR(kcm->stats.tx_msgs);
}
if (msg->msg_flags & MSG_BATCH) {
kcm->tx_wait_more = true;
} elseif (kcm->tx_wait_more || not_busy) {
err = kcm_write_msgs(kcm); if (err < 0) { /* We got a hard error in write_msgs but have * already queued this message. Report an error * in the socket, but don't affect return value * from sendmsg
*/
pr_warn("KCM: Hard failure on kcm_write_msgs\n");
report_csk_error(&kcm->sk, -err);
}
}
} else { /* Message not complete, save state */
partial_message: if (head) {
kcm->seq_skb = head;
kcm_tx_msg(head)->last_skb = skb;
}
}
if (sock->type == SOCK_SEQPACKET) { /* Wrote some bytes before encountering an * error, return partial success.
*/ if (copied) goto partial_message; if (head != kcm->seq_skb)
kfree_skb(head);
} else {
kfree_skb(head);
kcm->seq_skb = NULL;
}
err = sk_stream_error(sk, msg->msg_flags, err);
/* make sure we wake any epoll edge trigger waiter */ if (unlikely(skb_queue_len(&sk->sk_write_queue) == 0 && err == -EAGAIN))
sk->sk_write_space(sk);
/* We have no way to return MSG_EOR. If all the bytes have been * read we still leave the message in the receive socket buffer. * A subsequent recvmsg needs to be done to return MSG_EOR and * finish reading the message.
*/
/* If a psock is reserved we'll do cleanup in unreserve */ if (!kcm->rx_psock) { if (kcm->rx_wait) {
list_del(&kcm->wait_rx_list); /* paired with lockless reads in kcm_rfree() */
WRITE_ONCE(kcm->rx_wait, false);
}
/* Set a KCM socket option. Only SOL_KCM/KCM_RECV_DISABLE is supported;
 * it toggles receive on the kcm socket under the sock lock.
 */
static int kcm_setsockopt(struct socket *sock, int level, int optname,
			  sockptr_t optval, unsigned int optlen)
{
	struct kcm_sock *kcm = kcm_sk(sock->sk);
	int val, valbool;
	int err = 0;

	if (level != SOL_KCM)
		return -ENOPROTOOPT;

	if (optlen < sizeof(int))
		return -EINVAL;

	if (copy_from_sockptr(&val, optval, sizeof(int)))
		return -EFAULT;

	valbool = val ? 1 : 0;

	switch (optname) {
	case KCM_RECV_DISABLE:
		lock_sock(&kcm->sk);
		if (valbool)
			kcm_recv_disable(kcm);
		else
			kcm_recv_enable(kcm);
		release_sock(&kcm->sk);
		break;
	default:
		err = -ENOPROTOOPT;
	}

	return err;
}
/* Get a KCM socket option. Only SOL_KCM/KCM_RECV_DISABLE is supported;
 * reports the current rx_disabled state as an int.
 */
static int kcm_getsockopt(struct socket *sock, int level, int optname,
			  char __user *optval, int __user *optlen)
{
	struct kcm_sock *kcm = kcm_sk(sock->sk);
	int val, len;

	if (level != SOL_KCM)
		return -ENOPROTOOPT;

	if (get_user(len, optlen))
		return -EFAULT;

	if (len < 0)
		return -EINVAL;

	/* Clamp to the size of the option value */
	len = min_t(unsigned int, len, sizeof(int));

	switch (optname) {
	case KCM_RECV_DISABLE:
		val = kcm->rx_disabled;
		break;
	default:
		return -ENOPROTOOPT;
	}

	if (put_user(len, optlen))
		return -EFAULT;
	if (copy_to_user(optval, &val, len))
		return -EFAULT;
	return 0;
}
staticvoid init_kcm_sock(struct kcm_sock *kcm, struct kcm_mux *mux)
{ struct kcm_sock *tkcm; struct list_head *head; int index = 0;
/* For SOCK_SEQPACKET sock type, datagram_poll checks the sk_state, so * we set sk_state, otherwise epoll_wait always returns right away with * EPOLLHUP
*/
kcm->sk.sk_state = TCP_ESTABLISHED;
/* Add to mux's kcm sockets list */
kcm->mux = mux;
spin_lock_bh(&mux->lock);
head = &mux->kcm_socks;
list_for_each_entry(tkcm, &mux->kcm_socks, kcm_sock_list) { if (tkcm->index != index) break;
head = &tkcm->kcm_sock_list;
index++;
}
/* Only allow TCP sockets to be attached for now */ if ((csk->sk_family != AF_INET && csk->sk_family != AF_INET6) ||
csk->sk_protocol != IPPROTO_TCP) {
err = -EOPNOTSUPP; goto out;
}
/* Check if sk_user_data is already by KCM or someone else. * Must be done under lock to prevent race conditions.
*/ if (csk->sk_user_data) {
write_unlock_bh(&csk->sk_callback_lock);
kmem_cache_free(kcm_psockp, psock);
err = -EALREADY; goto out;
}
/* Finished initialization, now add the psock to the MUX. */
spin_lock_bh(&mux->lock);
head = &mux->psocks;
list_for_each_entry(tpsock, &mux->psocks, psock_list) { if (tpsock->index != index) break;
head = &tpsock->psock_list;
index++;
}
/* Stop getting callbacks from TCP socket. After this there should * be no way to reserve a kcm for this psock.
*/
write_lock_bh(&csk->sk_callback_lock);
csk->sk_user_data = NULL;
csk->sk_data_ready = psock->save_data_ready;
csk->sk_write_space = psock->save_write_space;
csk->sk_state_change = psock->save_state_change;
strp_stop(&psock->strp);
if (WARN_ON(psock->rx_kcm)) {
write_unlock_bh(&csk->sk_callback_lock);
release_sock(csk); return;
}
spin_lock_bh(&mux->rx_lock);
/* Stop receiver activities. After this point psock should not be * able to get onto ready list either through callbacks or work.
*/ if (psock->ready_rx_msg) {
list_del(&psock->psock_ready_list);
kfree_skb(psock->ready_rx_msg);
psock->ready_rx_msg = NULL;
KCM_STATS_INCR(mux->stats.rx_ready_drops);
}
spin_unlock_bh(&mux->rx_lock);
write_unlock_bh(&csk->sk_callback_lock);
/* Call strp_done without sock lock */
release_sock(csk);
strp_done(&psock->strp);
lock_sock(csk);
if (psock->tx_kcm) { /* psock was reserved. Just mark it finished and we will clean * up in the kcm paths, we need kcm lock which can not be * acquired here.
*/
KCM_STATS_INCR(mux->stats.psock_unattach_rsvd);
spin_unlock_bh(&mux->lock);
/* We are unattaching a socket that is reserved. Abort the * socket since we may be out of sync in sending on it. We need * to do this without the mux lock.
*/
kcm_abort_tx_psock(psock, EPIPE, false);
spin_lock_bh(&mux->lock); if (!psock->tx_kcm) { /* psock now unreserved in window mux was unlocked */ goto no_reserved;
}
psock->done = 1;
/* Commit done before queuing work to process it */
smp_mb();
/* Queue tx work to make sure psock->done is handled */
queue_work(kcm_wq, &psock->tx_kcm->tx_work);
spin_unlock_bh(&mux->lock);
} else {
no_reserved: if (!psock->tx_stopped)
list_del(&psock->psock_avail_list);
list_del(&psock->psock_list);
mux->psocks_cnt--;
spin_unlock_bh(&mux->lock);
spin_lock_bh(&mux->rx_lock); if (kcm->rx_psock) { /* Cleanup in unreserve_rx_kcm */
WARN_ON(kcm->done);
kcm->rx_disabled = 1;
kcm->done = 1;
spin_unlock_bh(&mux->rx_lock); return;
}
if (kcm->rx_wait) {
list_del(&kcm->wait_rx_list); /* paired with lockless reads in kcm_rfree() */
WRITE_ONCE(kcm->rx_wait, false);
} /* Move any pending receive messages to other kcm sockets */
requeue_rx_msgs(mux, &sk->sk_receive_queue);
if (!socks_cnt) { /* We are done with the mux now. */
release_mux(mux);
}
WARN_ON(kcm->rx_wait);
sock_put(&kcm->sk);
}
/* Called by kcm_release to close a KCM socket. * If this is the last KCM socket on the MUX, destroy the MUX.
*/ staticint kcm_release(struct socket *sock)
{ struct sock *sk = sock->sk; struct kcm_sock *kcm; struct kcm_mux *mux; struct kcm_psock *psock;
/* Purge queue under lock to avoid race condition with tx_work trying * to act when queue is nonempty. If tx_work runs after this point * it will just return.
*/
__skb_queue_purge(&sk->sk_write_queue);
release_sock(sk);
spin_lock_bh(&mux->lock); if (kcm->tx_wait) { /* Take of tx_wait list, after this point there should be no way * that a psock will be assigned to this kcm.
*/
list_del(&kcm->wait_psock_list);
kcm->tx_wait = false;
}
spin_unlock_bh(&mux->lock);
/* Cancel work. After this point there should be no outside references * to the kcm socket.
*/
disable_work_sync(&kcm->tx_work);
lock_sock(sk);
psock = kcm->tx_psock; if (psock) { /* A psock was reserved, so we need to kill it since it * may already have some bytes queued from a message. We * need to do this after removing kcm from tx_wait list.
*/
kcm_abort_tx_psock(psock, EPIPE, false);
unreserve_psock(kcm);
}
release_sock(sk);
/* All KCM sockets should be closed at this point, which should mean * that all multiplexors and psocks have been destroyed.
*/
WARN_ON(!list_empty(&knet->mux_list));
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.