// SPDX-License-Identifier: GPL-2.0-or-later
/* Processing of received RxRPC packets
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */
#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
#include"ar-internal.h"
/* Override priority when generating ACKs for received DATA */ staticconst u8 rxrpc_ack_priority[RXRPC_ACK__INVALID] = {
[RXRPC_ACK_IDLE] = 1,
[RXRPC_ACK_DELAY] = 2,
[RXRPC_ACK_REQUESTED] = 3,
[RXRPC_ACK_DUPLICATE] = 4,
[RXRPC_ACK_EXCEEDS_WINDOW] = 5,
[RXRPC_ACK_NOSPACE] = 6,
[RXRPC_ACK_OUT_OF_SEQUENCE] = 7,
};
switch (call->cong_ca_state) { case RXRPC_CA_SLOW_START: if (call->acks_nr_snacks > 0) goto packet_loss_detected; if (call->cong_cumul_acks > 0)
call->cong_cwnd += 1; if (call->cong_cwnd >= call->cong_ssthresh) {
call->cong_ca_state = RXRPC_CA_CONGEST_AVOIDANCE;
call->cong_tstamp = call->acks_latest_ts;
} goto out;
case RXRPC_CA_CONGEST_AVOIDANCE: if (call->acks_nr_snacks > 0) goto packet_loss_detected;
/* We analyse the number of packets that get ACK'd per RTT * period and increase the window if we managed to fill it.
*/ if (call->rtt_count == 0) goto out; if (ktime_before(call->acks_latest_ts,
ktime_add_us(call->cong_tstamp,
call->srtt_us >> 3))) goto out_no_clear_ca;
summary->change = rxrpc_cong_rtt_window_end;
call->cong_tstamp = call->acks_latest_ts; if (call->cong_cumul_acks >= call->cong_cwnd)
call->cong_cwnd++; goto out;
case RXRPC_CA_PACKET_LOSS: if (call->acks_nr_snacks == 0) goto resume_normality;
if (summary->new_low_snack) {
summary->change = rxrpc_cong_new_low_nack;
call->cong_dup_acks = 1; if (call->cong_extra > 1)
call->cong_extra = 1; goto send_extra_data;
}
call->cong_dup_acks++; if (call->cong_dup_acks < 3) goto send_extra_data;
send_extra_data: /* Send some previously unsent DATA if we have some to advance the ACK * state.
*/ if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ||
call->acks_nr_sacks != call->tx_top - call->tx_bottom) {
call->cong_extra++;
wake_up(&call->waitq);
} goto out_no_clear_ca;
}
/* * Degrade the congestion window if we haven't transmitted a packet for >1RTT.
*/ void rxrpc_congestion_degrade(struct rxrpc_call *call)
{
ktime_t rtt, now, time_since;
if (call->cong_ca_state != RXRPC_CA_SLOW_START &&
call->cong_ca_state != RXRPC_CA_CONGEST_AVOIDANCE) return; if (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_REPLY) return;
rtt = ns_to_ktime(call->srtt_us * (NSEC_PER_USEC / 8));
now = ktime_get_real();
time_since = ktime_sub(now, call->tx_last_sent); if (ktime_before(time_since, rtt)) return;
/* We may have a left over fully-consumed buffer at the front that we * couldn't drop before (rotate_and_keep below).
*/ if (seq == call->tx_qbase + RXRPC_NR_TXQUEUE) {
call->tx_qbase += RXRPC_NR_TXQUEUE;
call->tx_queue = tq->next;
trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
kfree(tq);
tq = call->tx_queue;
}
if (trace)
trace_rxrpc_rack_update(call, summary);
if (rot_last) {
set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags); if (tq) {
trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
kfree(tq);
call->tx_queue = NULL;
}
}
_debug("%x,%x,%x,%d", to, call->tx_bottom, call->tx_top, rot_last);
wake_up(&call->waitq); return rot_last;
}
/* * End the transmission phase of a call. * * This occurs when we get an ACKALL packet, the first DATA packet of a reply, * or a final ACK packet.
*/ staticvoid rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun, enum rxrpc_abort_reason abort_why)
{
ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));
/* * Push a DATA packet onto the Rx queue.
*/ staticvoid rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
rxrpc_seq_t window, rxrpc_seq_t wtop, enum rxrpc_receive_trace why)
{ struct rxrpc_skb_priv *sp = rxrpc_skb(skb); bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
spin_lock_irq(&call->recvmsg_queue.lock);
__skb_queue_tail(&call->recvmsg_queue, skb);
rxrpc_input_update_ack_window(call, window, wtop);
trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq); if (last) /* Change the state inside the lock so that recvmsg syncs * correctly with it and using sendmsg() to send a reply * doesn't race.
*/
rxrpc_end_rx_phase(call, sp->hdr.serial);
spin_unlock_irq(&call->recvmsg_queue.lock);
}
/* * Process a DATA packet.
*/ staticvoid rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb, bool *_notify, rxrpc_serial_t *_ack_serial, int *_ack_reason)
{ struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct sk_buff *oos;
rxrpc_serial_t serial = sp->hdr.serial; unsignedint sack = call->ackr_sack_base;
rxrpc_seq_t window = call->ackr_window;
rxrpc_seq_t wtop = call->ackr_wtop;
rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
rxrpc_seq_t seq = sp->hdr.seq; bool last = sp->hdr.flags & RXRPC_LAST_PACKET; int ack_reason = -1;
rxrpc_inc_stat(call->rxnet, stat_rx_data); if (sp->hdr.flags & RXRPC_REQUEST_ACK)
rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack); if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);
if (before(seq, window)) {
ack_reason = RXRPC_ACK_DUPLICATE; goto send_ack;
} if (after(seq, wlimit)) {
ack_reason = RXRPC_ACK_EXCEEDS_WINDOW; goto send_ack;
}
/* Queue the packet. */ if (seq == window) { if (sp->hdr.flags & RXRPC_REQUEST_ACK)
ack_reason = RXRPC_ACK_REQUESTED; /* Send an immediate ACK if we fill in a hole */ elseif (!skb_queue_empty(&call->rx_oos_queue))
ack_reason = RXRPC_ACK_DELAY;
/* * Process a DATA packet, adding the packet to the Rx ring. The caller's * packet ref must be passed on or discarded.
*/ staticvoid rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
rxrpc_serial_t serial = sp->hdr.serial;
rxrpc_seq_t seq0 = sp->hdr.seq;
switch (__rxrpc_call_state(call)) { case RXRPC_CALL_CLIENT_SEND_REQUEST: case RXRPC_CALL_CLIENT_AWAIT_REPLY: /* Received data implicitly ACKs all of the request * packets we sent when we're acting as a client.
*/ if (!rxrpc_receiving_reply(call)) goto out_notify; break;
case RXRPC_CALL_SERVER_RECV_REQUEST: { unsignedlong timo = READ_ONCE(call->next_req_timo);
if (orig_serial == acked_serial) {
clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
smp_mb(); /* Read data before setting avail bit */
set_bit(i, &call->rtt_avail);
rxrpc_call_add_rtt(call, type, i, acked_serial, ack_serial,
sent_at, resp_time);
matched = true;
}
/* If a later serial is being acked, then mark this slot as * being available.
*/ if (after(acked_serial, orig_serial)) {
trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
orig_serial, acked_serial, 0, 0, 0);
clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
smp_wmb();
set_bit(i, &call->rtt_avail);
}
}
/* See how many previously logged ACKs/NAKs have flipped. */
flipped = (tq->segment_acked ^ extracted_acks) & old_reported; if (flipped) {
n_to_a = ~tq->segment_acked & flipped; /* Old NAK -> ACK */
a_to_n = tq->segment_acked & flipped; /* Old ACK -> NAK */
a = hweight_long(n_to_a);
n = hweight_long(a_to_n);
_debug("flip %16lx", flipped);
_debug("ntoa %16lx %d", n_to_a, a);
_debug("aton %16lx %d", a_to_n, n);
call->acks_nr_sacks += a - n;
call->acks_nr_snacks += n - a;
summary->nr_new_sacks += a;
summary->nr_new_snacks += n;
}
/* See how many new ACKs/NAKs have been acquired. */ new = nr_reported - tq->nr_reported_acks; if (new > 0) {
new_acks = extracted_acks & ~old_reported; if (new_acks) {
a = hweight_long(new_acks);
n = new - a;
_debug("new_a %16lx new=%d a=%d n=%d", new_acks, new, a, n);
call->acks_nr_sacks += a;
call->acks_nr_snacks += n;
summary->nr_new_sacks += a;
summary->nr_new_snacks += n;
} else {
call->acks_nr_snacks += new;
summary->nr_new_snacks += new;
}
}
/* * Process individual soft ACKs. * * Each ACK in the array corresponds to one packet and can be either an ACK or * a NAK. If we get find an explicitly NAK'd packet we resend immediately; * packets that lie beyond the end of the ACK list are scheduled for resend by * the timer on the basis that the peer might just not have processed them at * the time the ACK was sent.
*/ staticvoid rxrpc_input_soft_acks(struct rxrpc_call *call, struct rxrpc_ack_summary *summary, struct sk_buff *skb)
{ struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_txqueue *tq = call->tx_queue; unsignedlong extracted = ~0UL; unsignedint nr = 0;
rxrpc_seq_t seq = call->acks_hard_ack + 1;
rxrpc_seq_t lowest_nak = seq + sp->ack.nr_acks;
u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);
while (after(seq, tq->qbase + RXRPC_NR_TXQUEUE - 1))
tq = tq->next;
for (unsignedint i = 0; i < sp->ack.nr_acks; i++) { /* Decant ACKs until we hit a txqueue boundary. */
shiftr_adv_rotr(acks, extracted); if (i == 256) {
acks -= i;
i = 0;
}
seq++;
nr++; if ((seq & RXRPC_TXQ_MASK) != 0) continue;
/* We *can* have more nacks than we did - the peer is permitted to drop * packets it has soft-acked and re-request them. Further, it is * possible for the nack distribution to change whilst the number of * nacks stays the same or goes down.
*/ if (lowest_nak != call->acks_lowest_nak) {
call->acks_lowest_nak = lowest_nak;
summary->new_low_snack = true;
}
/* * Return true if the ACK is valid - ie. it doesn't appear to have regressed * with respect to the ack state conveyed by preceding ACKs.
*/ staticbool rxrpc_is_ack_valid(struct rxrpc_call *call,
rxrpc_seq_t hard_ack, rxrpc_seq_t prev_pkt)
{
rxrpc_seq_t base = READ_ONCE(call->acks_hard_ack);
if (after(hard_ack, base)) returntrue; /* The window advanced */
if (before(hard_ack, base)) returnfalse; /* firstPacket regressed */
if (after_eq(prev_pkt, call->acks_prev_seq)) returntrue; /* previousPacket hasn't regressed. */
/* Some rx implementations put a serial number in previousPacket. */ if (after(prev_pkt, base + call->tx_winsize)) returnfalse; returntrue;
}
/* * Process an ACK packet. * * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet * in the ACK array. Anything before that is hard-ACK'd and may be discarded. * * A hard-ACK means that a packet has been processed and may be discarded; a * soft-ACK means that the packet may be discarded and retransmission * requested. A phase is complete when all packets are hard-ACK'd.
*/ staticvoid rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{ struct rxrpc_ack_summary summary = { 0 }; struct rxrpc_acktrailer trailer; struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt; int nr_acks, offset, ioffset;
/* If we get an EXCEEDS_WINDOW ACK from the server, it probably * indicates that the client address changed due to NAT. The server * lost the call because it switched to a different peer.
*/ if (unlikely(summary.ack_reason == RXRPC_ACK_EXCEEDS_WINDOW) &&
hard_ack == 0 &&
prev_pkt == 0 &&
rxrpc_is_client_call(call)) {
rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
0, -ENETRESET); goto send_response;
}
/* If we get an OUT_OF_SEQUENCE ACK from the server, that can also * indicate a change of address. However, we can retransmit the call * if we still have it buffered to the beginning.
*/ if (unlikely(summary.ack_reason == RXRPC_ACK_OUT_OF_SEQUENCE) &&
hard_ack == 0 &&
prev_pkt == 0 &&
call->tx_bottom == 0 &&
rxrpc_is_client_call(call)) {
rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
0, -ENETRESET); goto send_response;
}
/* Discard any out-of-order or duplicate ACKs (outside lock). */ if (!rxrpc_is_ack_valid(call, hard_ack, prev_pkt)) {
trace_rxrpc_rx_discard_ack(call, summary.ack_serial, hard_ack, prev_pkt); goto send_response; /* Still respond if requested. */
}
if (summary.acked_serial) { switch (summary.ack_reason) { case RXRPC_ACK_PING_RESPONSE:
rxrpc_complete_rtt_probe(call, call->acks_latest_ts,
summary.acked_serial, summary.ack_serial,
rxrpc_rtt_rx_ping_response); break; default: if (after(summary.acked_serial, call->acks_highest_serial))
call->acks_highest_serial = summary.acked_serial;
summary.rtt_sample_avail = true; break;
}
}
/* Parse rwind and mtu sizes if provided. */ if (trailer.maxMTU)
rxrpc_input_ack_trailer(call, skb, &trailer);
if (hard_ack + 1 == 0) return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_zero);
/* Ignore ACKs unless we are or have just been transmitting. */ switch (__rxrpc_call_state(call)) { case RXRPC_CALL_CLIENT_SEND_REQUEST: case RXRPC_CALL_CLIENT_AWAIT_REPLY: case RXRPC_CALL_SERVER_SEND_REPLY: case RXRPC_CALL_SERVER_AWAIT_ACK: break; default: goto send_response;
}
if (before(hard_ack, call->tx_bottom) ||
after(hard_ack, call->tx_top)) return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_outside_window); if (nr_acks > call->tx_top - hard_ack) return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_sack_overflow);
if (after(hard_ack, call->tx_bottom)) { if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ack); goto send_response;
}
}
if (nr_acks > 0) { if (offset > (int)skb->len - nr_acks) return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack);
rxrpc_input_soft_acks(call, &summary, skb);
}
/* Drive the congestion management algorithm first and then RACK-TLP as * the latter depends on the state/change in state in the former.
*/
rxrpc_congestion_management(call, &summary);
rxrpc_rack_detect_loss_and_arm_timer(call, &summary);
rxrpc_tlp_process_ack(call, &summary); if (call->tlp_serial && after_eq(summary.acked_serial, call->tlp_serial))
call->tlp_serial = 0;
switch (sp->hdr.type) { case RXRPC_PACKET_TYPE_DATA: return rxrpc_input_data(call, skb);
case RXRPC_PACKET_TYPE_ACK: return rxrpc_input_ack(call, skb);
case RXRPC_PACKET_TYPE_BUSY: /* Just ignore BUSY packets from the server; the retry and * lifespan timers will take care of business. BUSY packets * from the client don't make sense.
*/ return;
case RXRPC_PACKET_TYPE_ABORT: return rxrpc_input_abort(call, skb);
case RXRPC_PACKET_TYPE_ACKALL: return rxrpc_input_ackall(call, skb);
default: break;
}
}
/* * Handle a new service call on a channel implicitly completing the preceding * call on that channel. This does not apply to client conns. * * TODO: If callNumber > call_id + 1, renegotiate security.
*/ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{ switch (__rxrpc_call_state(call)) { case RXRPC_CALL_SERVER_AWAIT_ACK:
rxrpc_call_completed(call);
fallthrough; case RXRPC_CALL_COMPLETE: break; default:
rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ESHUTDOWN,
rxrpc_eproto_improper_term);
trace_rxrpc_improper_term(call); break;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.