Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/third_party/rust/neqo-transport/src/connection/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 134 kB image not shown  

Quelle  mod.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// The class implementing a QUIC connection.

use std::{
    cell::RefCell,
    cmp::{max, min},
    fmt::{self, Debug},
    iter, mem,
    net::{IpAddr, SocketAddr},
    num::NonZeroUsize,
    ops::RangeInclusive,
    rc::{Rc, Weak},
    time::{Duration, Instant},
};

use neqo_common::{
    event::Provider as EventProvider, hex, hex_snip_middle, hrtime, qdebug, qerror, qinfo,
    qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role,
};
use neqo_crypto::{
    agent::CertificateInfo, Agent, AntiReplay, AuthenticationStatus, Cipher, Client, Group,
    HandshakeState, PrivateKey, PublicKey, ResumptionToken, SecretAgentInfo, SecretAgentPreInfo,
    Server, ZeroRttChecker,
};
use smallvec::SmallVec;

use crate::{
    addr_valid::{AddressValidation, NewTokenState},
    cid::{
        ConnectionId, ConnectionIdEntry, ConnectionIdGenerator, ConnectionIdManager,
        ConnectionIdRef, ConnectionIdStore, LOCAL_ACTIVE_CID_LIMIT,
    },
    crypto::{Crypto, CryptoDxState, CryptoSpace},
    ecn::EcnCount,
    events::{ConnectionEvent, ConnectionEvents, OutgoingDatagramOutcome},
    frame::{
        CloseError, Frame, FrameType, FRAME_TYPE_CONNECTION_CLOSE_APPLICATION,
        FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT,
    },
    packet::{DecryptedPacket, PacketBuilder, PacketNumber, PacketType, PublicPacket},
    path::{Path, PathRef, Paths},
    qlog,
    quic_datagrams::{DatagramTracking, QuicDatagrams},
    recovery::{LossRecovery, RecoveryToken, SendProfile, SentPacket},
    recv_stream::RecvStreamStats,
    rtt::{RttEstimate, GRANULARITY, INITIAL_RTT},
    send_stream::SendStream,
    stats::{Stats, StatsCell},
    stream_id::StreamType,
    streams::{SendOrder, Streams},
    tparams::{
        self, TransportParameter, TransportParameterId, TransportParameters,
        TransportParametersHandler,
    },
    tracking::{AckTracker, PacketNumberSpace, RecvdPackets},
    version::{Version, WireVersion},
    AppError, CloseReason, Error, Res, StreamId,
};

mod dump;
mod idle;
pub mod params;
mod saved;
mod state;
#[cfg(test)]
pub mod test_internal;

use dump::dump_packet;
use idle::IdleTimeout;
pub use params::ConnectionParameters;
use params::PreferredAddressConfig;
#[cfg(test)]
pub use params::ACK_RATIO_SCALE;
use saved::SavedDatagrams;
use state::StateSignaling;
pub use state::{ClosingFrame, State};

pub use crate::send_stream::{RetransmissionPriority, SendStreamStats, TransmissionPriority};

/// The number of Initial packets that the client will send in response
/// to receiving an undecryptable packet during the early part of the
/// handshake.  This is a hack, but a useful one.
///
/// The value is a count of packets, not a size in bytes.
const EXTRA_INITIALS: usize = 4;

/// What a connection is doing with 0-RTT, as reported by
/// `Connection::zero_rtt_state()`.
///
/// A newly constructed connection starts in `Init`.
// NOTE(review): the per-variant descriptions are inferred from the variant names
// alone; confirm them against the code that performs the transitions.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ZeroRttState {
    /// The initial value: nothing has happened with 0-RTT yet.
    Init,
    /// Presumably: 0-RTT data is being sent and its fate is not yet known.
    Sending,
    /// Presumably: this endpoint is a client and the server accepted 0-RTT.
    AcceptedClient,
    /// Presumably: this endpoint is a server and it accepted 0-RTT.
    AcceptedServer,
    /// Presumably: 0-RTT was attempted and rejected.
    Rejected,
}

#[derive(Clone, Debug, PartialEq, Eq)]
/// Type returned from `process()` and `process_output()`. Users are required to
/// call these repeatedly until `Callback` or `None` is returned.
///
/// Use `dgram()` / `as_dgram_ref()` to extract a datagram and `callback()` to
/// read the wait time.
pub enum Output {
    /// Connection requires no action.
    None,
    /// Connection requires the datagram be sent.
    Datagram(Datagram),
    /// Connection requires `process_input()` be called when the `Duration`
    /// elapses.  The value is how long to wait, not an absolute deadline.
    Callback(Duration),
}

impl Output {
    /// Consume this value and hand back the datagram it carries, if any.
    ///
    /// `None` and `Callback` both yield `Option::None`.
    #[must_use]
    pub fn dgram(self) -> Option<Datagram> {
        if let Self::Datagram(datagram) = self {
            Some(datagram)
        } else {
            None
        }
    }

    /// Borrow the carried datagram without consuming this value.
    ///
    /// Returns `Option::None` unless this is the `Datagram` variant.
    #[must_use]
    pub const fn as_dgram_ref(&self) -> Option<&Datagram> {
        if let Self::Datagram(datagram) = self {
            Some(datagram)
        } else {
            None
        }
    }

    /// How long the caller should wait before calling back.
    ///
    /// This is the carried duration for `Callback`, and zero for every other
    /// variant.
    #[must_use]
    pub const fn callback(&self) -> Duration {
        if let Self::Callback(delay) = self {
            *delay
        } else {
            Duration::new(0, 0)
        }
    }

    /// Keep this value if it is a `Datagram` or a `Callback`; otherwise (that
    /// is, for `None`) produce a replacement by calling `f`.
    ///
    /// `f` is only invoked when this value is `None`.
    #[must_use]
    pub fn or_else<F>(self, f: F) -> Self
    where
        F: FnOnce() -> Self,
    {
        if matches!(self, Self::None) {
            f()
        } else {
            self
        }
    }
}

/// Used by inner functions like `Connection::output`.
enum SendOption {
    /// Yes, please send this datagram.
    Yes(Datagram),
    /// Don't send.  The argument is `true` if sending was blocked on the pacer.
    No(bool),
}

/// The default is to send nothing, with the pacer flag cleared.
impl Default for SendOption {
    fn default() -> Self {
        Self::No(false)
    }
}

/// Used by `Connection::preprocess` to determine what to do
/// with a packet before attempting to remove protection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PreprocessResult {
    /// End processing and return successfully.
    End,
    /// Stop processing this datagram and move on to the next.
    Next,
    /// Continue and process this packet.
    Continue,
}

/// `AddressValidationInfo` holds information relevant to either
/// responding to address validation (`NewToken`, `Retry`) or generating
/// tokens for address validation (`Server`).
enum AddressValidationInfo {
    /// No address validation information is held.  A new connection starts here.
    None,
    /// We are a client and have information from `NEW_TOKEN`.
    NewToken(Vec<u8>),
    /// We are a client and have received a `Retry` packet.
    Retry {
        /// The token bytes; this is what `token()` returns.
        token: Vec<u8>,
        // NOTE(review): presumably the source connection ID carried by the Retry
        // packet; its use is outside this part of the file - confirm.
        retry_source_cid: ConnectionId,
    },
    /// We are a server and can generate tokens.
    ///
    /// Only a weak reference is held, so token generation yields nothing once the
    /// shared `AddressValidation` has been dropped.
    Server(Weak<RefCell<AddressValidation>>),
}

impl AddressValidationInfo {
    pub fn token(&self) -> &[u8] {
        match self {
            Self::NewToken(token) | Self::Retry { token, .. } => token,
            _ => &[],
        }
    }

    pub fn generate_new_token(&self, peer_address: SocketAddr, now: Instant) -> Option<Vec<u8>> {
        match self {
            Self::Server(ref w) => w.upgrade().and_then(|validation| {
                validation
                    .borrow()
                    .generate_new_token(peer_address, now)
                    .ok()
            }),
            Self::None => None,
            _ => unreachable!("called a server function on a client"),
        }
    }
}

/// A QUIC Connection
///
/// First, create a new connection using `new_client()` or `new_server()`.
///
/// For the life of the connection, handle activity in the following manner:
/// 1. Perform operations using the `stream_*()` methods.
/// 1. Call `process_input()` when a datagram is received or the timer expires. Obtain information
///    on connection state changes by checking `events()`.
/// 1. Having completed handling current activity, repeatedly call `process_output()` for packets to
///    send, until it returns `Output::Callback` or `Output::None`.
///
/// After the connection is closed (either by calling `close()` or by the
/// remote) continue processing until `state()` returns `Closed`.
pub struct Connection {
    /// Whether this endpoint is the client or the server.
    role: Role,
    /// The QUIC version in use.
    version: Version,
    /// The current state of the connection; starts as `State::Init`.
    state: State,
    /// Local and remote transport parameters.  The same handler is shared with
    /// `Crypto` and `Streams`.
    tps: Rc<RefCell<TransportParametersHandler>>,
    /// What we are doing with 0-RTT.
    zero_rtt_state: ZeroRttState,
    /// All of the network paths that we are aware of.
    paths: Paths,
    /// This object will generate connection IDs for the connection.
    cid_manager: ConnectionIdManager,
    /// Address validation state: tokens held by a client, or the means to
    /// generate tokens on a server.
    address_validation: AddressValidationInfo,
    /// The connection IDs that were provided by the peer.
    connection_ids: ConnectionIdStore<[u8; 16]>,

    /// The source connection ID that this endpoint uses for the handshake.
    /// Since we need to communicate this to our peer in tparams, setting this
    /// value is part of constructing the struct.
    local_initial_source_cid: ConnectionId,
    /// The source connection ID from the first packet from the other end.
    /// This is checked against the peer's transport parameters.
    remote_initial_source_cid: Option<ConnectionId>,
    /// The destination connection ID from the first packet from the client.
    /// This is checked by the client against the server's transport parameters.
    original_destination_cid: Option<ConnectionId>,

    /// We sometimes save a datagram against the possibility that keys will later
    /// become available.  This avoids reporting packets as dropped during the handshake
    /// when they are either just reordered or we haven't been able to install keys yet.
    /// In particular, this occurs when asynchronous certificate validation happens.
    saved_datagrams: SavedDatagrams,
    /// Some packets were received, but not tracked.
    received_untracked: bool,

    /// This is responsible for the `QuicDatagrams`' handling:
    /// <https://datatracker.ietf.org/doc/html/draft-ietf-quic-datagram>
    quic_datagrams: QuicDatagrams,

    /// Cryptographic state, including the TLS agent (`crypto.tls`).
    pub(crate) crypto: Crypto,
    pub(crate) acks: AckTracker,
    /// Idle timeout handling, initialized from the connection parameters.
    idle_timeout: IdleTimeout,
    streams: Streams,
    state_signaling: StateSignaling,
    loss_recovery: LossRecovery,
    /// Events for the application; the same event queue is also handed to
    /// `Streams` and `QuicDatagrams` at construction.
    events: ConnectionEvents,
    /// State for `NEW_TOKEN`: tokens to send (server) or tokens received (client).
    new_token: NewTokenState,
    /// Statistics; `stats()` returns a snapshot of these.
    stats: StatsCell,
    /// The qlog for this connection; disabled until `set_qlog()` is called.
    qlog: NeqoQlog,
    /// A session ticket was received without `NEW_TOKEN`,
    /// this is when that turns into an event without `NEW_TOKEN`.
    release_resumption_token_timer: Option<Instant>,
    /// The parameters this connection was configured with.
    conn_params: ConnectionParameters,
    /// Handle obtained from `hrtime` with `LOOSE_TIMER_RESOLUTION` at construction.
    hrtime: hrtime::Handle,

    /// For testing purposes it is sometimes necessary to inject frames that wouldn't
    /// otherwise be sent, just to see how a connection handles them.  Inserting them
    /// into packets proper mean that the frames follow the entire processing path.
    #[cfg(test)]
    pub test_frame_writer: Option<Box<dyn test_internal::FrameWriter>>,
}

impl Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "{:?} Connection: {:?} {:?}",
            self.role,
            self.state,
            self.paths.primary()
        )
    }
}

impl Connection {
    /// A long default for timer resolution, so that we don't tax the
    /// system too hard when we don't need to.
    ///
    /// This is the resolution requested from `hrtime` when a connection is
    /// constructed.
    const LOOSE_TIMER_RESOLUTION: Duration = Duration::from_millis(50);

    /// Create a new QUIC connection with Client role.
    /// # Errors
    /// When NSS fails and an agent cannot be created.
    pub fn new_client(
        server_name: impl Into<String>,
        protocols: &[impl AsRef<str>],
        cid_generator: Rc<RefCell<dyn ConnectionIdGenerator>>,
        local_addr: SocketAddr,
        remote_addr: SocketAddr,
        conn_params: ConnectionParameters,
        now: Instant,
    ) -> Res<Self> {
        let dcid = ConnectionId::generate_initial();
        let mut c = Self::new(
            Role::Client,
            Agent::from(Client::new(server_name.into(), conn_params.is_greasing())?),
            cid_generator,
            protocols,
            conn_params,
        )?;
        c.crypto.states.init(
            c.conn_params.get_versions().compatible(),
            Role::Client,
            &dcid,
        )?;
        c.original_destination_cid = Some(dcid);
        let path = Path::temporary(
            local_addr,
            remote_addr,
            c.conn_params.get_cc_algorithm(),
            c.conn_params.pacing_enabled(),
            NeqoQlog::default(),
            now,
        );
        c.setup_handshake_path(&Rc::new(RefCell::new(path)), now);
        Ok(c)
    }

    /// Create a new QUIC connection with Server role.
    /// # Errors
    /// When NSS fails and an agent cannot be created.
    pub fn new_server(
        certs: &[impl AsRef<str>],
        protocols: &[impl AsRef<str>],
        cid_generator: Rc<RefCell<dyn ConnectionIdGenerator>>,
        conn_params: ConnectionParameters,
    ) -> Res<Self> {
        Self::new(
            Role::Server,
            Agent::from(Server::new(certs)?),
            cid_generator,
            protocols,
            conn_params,
        )
    }

    /// Shared constructor behind `new_client()` and `new_server()`.
    ///
    /// Generates the local initial source connection ID, builds the local
    /// transport parameters (advertising that connection ID), creates the
    /// crypto state, and assembles a connection in `State::Init` with no paths
    /// and a disabled qlog.
    ///
    /// # Errors
    /// `Error::ConnectionIdsExhausted` if `cid_generator` cannot produce a
    /// connection ID; otherwise any error from creating the transport
    /// parameters or the crypto state.
    fn new<P: AsRef<str>>(
        role: Role,
        agent: Agent,
        cid_generator: Rc<RefCell<dyn ConnectionIdGenerator>>,
        protocols: &[P],
        conn_params: ConnectionParameters,
    ) -> Res<Self> {
        // Setup the local connection ID.
        let local_initial_source_cid = cid_generator
            .borrow_mut()
            .generate_cid()
            .ok_or(Error::ConnectionIdsExhausted)?;
        let mut cid_manager =
            ConnectionIdManager::new(cid_generator, local_initial_source_cid.clone());
        let mut tps = conn_params.create_transport_parameter(role, &mut cid_manager)?;
        // Advertise our initial source connection ID to the peer.
        tps.local.set_bytes(
            tparams::INITIAL_SOURCE_CONNECTION_ID,
            local_initial_source_cid.to_vec(),
        );

        // The transport parameters handler is shared between the connection,
        // the crypto state and the streams.
        let tphandler = Rc::new(RefCell::new(tps));
        let crypto = Crypto::new(
            conn_params.get_versions().initial(),
            agent,
            protocols.iter().map(P::as_ref).map(String::from).collect(),
            Rc::clone(&tphandler),
        )?;

        let stats = StatsCell::default();
        let events = ConnectionEvents::default();
        let quic_datagrams = QuicDatagrams::new(
            conn_params.get_datagram_size(),
            conn_params.get_outgoing_datagram_queue(),
            conn_params.get_incoming_datagram_queue(),
            events.clone(),
        );

        let c = Self {
            role,
            version: conn_params.get_versions().initial(),
            state: State::Init,
            paths: Paths::default(),
            cid_manager,
            tps: tphandler.clone(),
            zero_rtt_state: ZeroRttState::Init,
            address_validation: AddressValidationInfo::None,
            local_initial_source_cid,
            remote_initial_source_cid: None,
            original_destination_cid: None,
            saved_datagrams: SavedDatagrams::default(),
            received_untracked: false,
            crypto,
            acks: AckTracker::default(),
            idle_timeout: IdleTimeout::new(conn_params.get_idle_timeout()),
            streams: Streams::new(tphandler, role, events.clone()),
            connection_ids: ConnectionIdStore::default(),
            state_signaling: StateSignaling::Idle,
            loss_recovery: LossRecovery::new(stats.clone(), conn_params.get_fast_pto()),
            events,
            new_token: NewTokenState::new(role),
            stats,
            qlog: NeqoQlog::disabled(),
            release_resumption_token_timer: None,
            conn_params,
            hrtime: hrtime::Time::get(Self::LOOSE_TIMER_RESOLUTION),
            quic_datagrams,
            #[cfg(test)]
            test_frame_writer: None,
        };
        // Initialize the statistics with this connection's `Display` rendering.
        c.stats.borrow_mut().init(format!("{c}"));
        Ok(c)
    }

    /// Enable 0-RTT on a server connection.
    ///
    /// This forwards to the crypto layer, handing it this connection's transport
    /// parameters together with `anti_replay` and `zero_rtt_checker`.
    ///
    /// # Errors
    /// When the operation fails.
    pub fn server_enable_0rtt(
        &mut self,
        anti_replay: &AntiReplay,
        zero_rtt_checker: impl ZeroRttChecker + 'static,
    ) -> Res<()> {
        self.crypto
            .server_enable_0rtt(self.tps.clone(), anti_replay, zero_rtt_checker)
    }

    /// Enable ECH (Encrypted Client Hello) on a server connection.
    ///
    /// All arguments are forwarded unchanged to the crypto layer.
    ///
    /// # Errors
    /// When the operation fails.
    pub fn server_enable_ech(
        &mut self,
        config: u8,
        public_name: &str,
        sk: &PrivateKey,
        pk: &PublicKey,
    ) -> Res<()> {
        self.crypto.server_enable_ech(config, public_name, sk, pk)
    }

    /// Get the active ECH configuration, which is empty if ECH is disabled.
    ///
    /// The returned bytes are borrowed from the crypto layer.
    #[must_use]
    pub fn ech_config(&self) -> &[u8] {
        self.crypto.ech_config()
    }

    /// Enable ECH (Encrypted Client Hello) on a client connection using the
    /// given encoded ECH configuration list.
    ///
    /// # Errors
    /// When the operation fails.
    pub fn client_enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> {
        self.crypto.client_enable_ech(ech_config_list)
    }

    /// Set or clear the qlog for this connection.
    ///
    /// Clones of `qlog` are also given to loss recovery and to the paths.
    pub fn set_qlog(&mut self, qlog: NeqoQlog) {
        self.loss_recovery.set_qlog(qlog.clone());
        self.paths.set_qlog(qlog.clone());
        self.qlog = qlog;
    }

    /// Get mutable access to the qlog for this connection.
    ///
    /// A qlog object is always present; it is a disabled one until `set_qlog()`
    /// installs another.
    pub fn qlog_mut(&mut self) -> &mut NeqoQlog {
        &mut self.qlog
    }

    /// Get the original destination connection id for this connection. This
    /// will always be present for `Role::Client` but not if `Role::Server` is in
    /// `State::Init`.
    ///
    /// A client sets this value when it is created by `new_client()`.
    #[must_use]
    pub const fn odcid(&self) -> Option<&ConnectionId> {
        self.original_destination_cid.as_ref()
    }

    /// Set a local transport parameter, possibly overriding a default value.
    /// This only sets transport parameters without dealing with other aspects of
    /// setting the value.
    ///
    /// # Errors
    /// When the transport parameter is invalid.
    /// # Panics
    /// This panics if the transport parameter is known to this crate.
    pub fn set_local_tparam(&self, tp: TransportParameterId, value: TransportParameter) -> Res<()> {
        #[cfg(not(test))]
        {
            assert!(!tparams::INTERNAL_TRANSPORT_PARAMETERS.contains(&tp));
        }
        if *self.state() == State::Init {
            self.tps.borrow_mut().local.set(tp, value);
            Ok(())
        } else {
            qerror!("Current state: {:?}", self.state());
            qerror!("Cannot set local tparam when not in an initial connection state.");
            Err(Error::ConnectionState)
        }
    }

    /// Record the connection IDs involved in a Retry on a server connection.
    ///
    /// `odcid` is their original choice for our CID, which we get from the Retry token.
    /// `remote_cid` is the value from the Source Connection ID field of an incoming packet: what
    /// the peer wants us to use now. `retry_cid` is what we asked them to use when we sent the
    /// Retry.
    ///
    /// `odcid` and `retry_cid` are written into the local transport parameters;
    /// `remote_cid` is stored as the remote initial source connection ID.
    /// Calling this on a client trips a debug assertion.
    pub(crate) fn set_retry_cids(
        &mut self,
        odcid: &ConnectionId,
        remote_cid: ConnectionId,
        retry_cid: &ConnectionId,
    ) {
        debug_assert_eq!(self.role, Role::Server);
        qtrace!(
            [self],
            "Retry CIDs: odcid={} remote={} retry={}",
            odcid,
            remote_cid,
            retry_cid
        );
        // We advertise "our" choices in transport parameters.
        let local_tps = &mut self.tps.borrow_mut().local;
        local_tps.set_bytes(tparams::ORIGINAL_DESTINATION_CONNECTION_ID, odcid.to_vec());
        local_tps.set_bytes(tparams::RETRY_SOURCE_CONNECTION_ID, retry_cid.to_vec());

        // ...and save their choices for later validation.
        self.remote_initial_source_cid = Some(remote_cid);
    }

    /// Whether a Retry was sent for this connection.
    ///
    /// This is inferred from the local transport parameters: the retry source
    /// connection ID is only set there by `set_retry_cids()`.
    fn retry_sent(&self) -> bool {
        self.tps
            .borrow()
            .local
            .get_bytes(tparams::RETRY_SOURCE_CONNECTION_ID)
            .is_some()
    }

    /// Set ALPN preferences. Strings that appear earlier in the list are given
    /// higher preference.
    ///
    /// Unlike `set_ciphers()` and `set_groups()`, this method does not itself
    /// check the connection state before calling into the TLS agent.
    /// # Errors
    /// When the operation fails, which is usually due to bad inputs or bad connection state.
    pub fn set_alpn(&mut self, protocols: &[impl AsRef<str>]) -> Res<()> {
        self.crypto.tls.set_alpn(protocols)?;
        Ok(())
    }

    /// Enable a set of ciphers.
    /// # Errors
    /// When the operation fails, which is usually due to bad inputs or bad connection state.
    pub fn set_ciphers(&mut self, ciphers: &[Cipher]) -> Res<()> {
        if self.state != State::Init {
            qerror!([self], "Cannot enable ciphers in state {:?}", self.state);
            return Err(Error::ConnectionState);
        }
        self.crypto.tls.set_ciphers(ciphers)?;
        Ok(())
    }

    /// Enable a set of key exchange groups.
    /// # Errors
    /// When the operation fails, which is usually due to bad inputs or bad connection state.
    pub fn set_groups(&mut self, groups: &[Group]) -> Res<()> {
        if self.state != State::Init {
            qerror!([self], "Cannot enable groups in state {:?}", self.state);
            return Err(Error::ConnectionState);
        }
        self.crypto.tls.set_groups(groups)?;
        Ok(())
    }

    /// Set the number of additional key shares to send in the client hello.
    /// # Errors
    /// When the operation fails, which is usually due to bad inputs or bad connection state.
    pub fn send_additional_key_shares(&mut self, count: usize) -> Res<()> {
        if self.state != State::Init {
            qerror!([self], "Cannot enable groups in state {:?}", self.state);
            return Err(Error::ConnectionState);
        }
        self.crypto.tls.send_additional_key_shares(count)?;
        Ok(())
    }

    fn make_resumption_token(&mut self) -> ResumptionToken {
        debug_assert_eq!(self.role, Role::Client);
        debug_assert!(self.crypto.has_resumption_token());
        // Values less than GRANULARITY are ignored when using the token, so use 0 where needed.
        let rtt = self.paths.primary().map_or_else(
            // If we don't have a path, we don't have an RTT.
            || Duration::from_millis(0),
            |p| {
                let rtt = p.borrow().rtt().estimate();
                if p.borrow().rtt().is_guesstimate() {
                    // When we have no actual RTT sample, do not encode a guestimated RTT larger
                    // than the default initial RTT. (The guess can be very large under lossy
                    // conditions.)
                    if rtt < INITIAL_RTT {
                        rtt
                    } else {
                        Duration::from_millis(0)
                    }
                } else {
                    rtt
                }
            },
        );

        self.crypto
            .create_resumption_token(
                self.new_token.take_token(),
                self.tps
                    .borrow()
                    .remote
                    .as_ref()
                    .expect("should have transport parameters"),
                self.version,
                u64::try_from(rtt.as_millis()).unwrap_or(0),
            )
            .unwrap()
    }

    /// Whether the connection is in exactly `State::Confirmed`.
    fn confirmed(&self) -> bool {
        self.state == State::Confirmed
    }

    /// Get the simplest PTO calculation for all those cases where we need
    /// a value of this approximate order.  Don't use this for loss recovery,
    /// only use it where a more precise value is not important.
    fn pto(&self) -> Duration {
        self.paths.primary().map_or_else(
            || RttEstimate::default().pto(self.confirmed()),
            |p| p.borrow().rtt().pto(self.confirmed()),
        )
    }

    fn create_resumption_token(&mut self, now: Instant) {
        if self.role == Role::Server || self.state < State::Connected {
            return;
        }

        qtrace!(
            [self],
            "Maybe create resumption token: {} {}",
            self.crypto.has_resumption_token(),
            self.new_token.has_token()
        );

        while self.crypto.has_resumption_token() && self.new_token.has_token() {
            let token = self.make_resumption_token();
            self.events.client_resumption_token(token);
        }

        // If we have a resumption ticket check or set a timer.
        if self.crypto.has_resumption_token() {
            let arm = if let Some(expiration_time) = self.release_resumption_token_timer {
                if expiration_time <= now {
                    let token = self.make_resumption_token();
                    self.events.client_resumption_token(token);
                    self.release_resumption_token_timer = None;

                    // This means that we release one session ticket every 3 PTOs
                    // if no NEW_TOKEN frame is received.
                    self.crypto.has_resumption_token()
                } else {
                    false
                }
            } else {
                true
            };

            if arm {
                self.release_resumption_token_timer = Some(now + 3 * self.pto());
            }
        }
    }

    /// The correct way to obtain a resumption token is to wait for the
    /// `ConnectionEvent::ResumptionToken` event. To emit the event we are waiting for a
    /// resumption token and a `NEW_TOKEN` frame to arrive. Some servers don't send `NEW_TOKEN`
    /// frames and in this case, we wait for 3xPTO before emitting an event. This is especially a
    /// problem for short-lived connections, where the connection is closed before any events are
    /// released. This function retrieves the token, without waiting for a `NEW_TOKEN` frame to
    /// arrive.
    ///
    /// # Panics
    ///
    /// If this is called on a server.
    pub fn take_resumption_token(&mut self, now: Instant) -> Option<ResumptionToken> {
        assert_eq!(self.role, Role::Client);

        if self.crypto.has_resumption_token() {
            let token = self.make_resumption_token();
            if self.crypto.has_resumption_token() {
                self.release_resumption_token_timer = Some(now + 3 * self.pto());
            }
            Some(token)
        } else {
            None
        }
    }

    /// Enable resumption, using a token previously provided.
    /// This can only be called once and only on the client.
    /// After calling the function, it should be possible to attempt 0-RTT
    /// if the token supports that.
    ///
    /// The token is decoded as, in order: a 4-byte QUIC version, a varint RTT in
    /// milliseconds, a length-prefixed block of transport parameters, a
    /// length-prefixed Initial (address validation) token, and the TLS token in
    /// whatever bytes remain.
    ///
    /// Note that a failure of the TLS agent to accept the TLS token, and a
    /// failure to start the client handshake, are both absorbed into the
    /// connection state; this function returns `Ok(())` in those cases.
    /// # Errors
    /// When the operation fails, which is usually due to bad inputs or bad connection state.
    /// Specifically: `Error::ConnectionState` when not in `State::Init` or when
    /// called on a server, `Error::InvalidResumptionToken` when the token cannot
    /// be decoded, `Error::DisabledVersion` when the token's version is not
    /// enabled, and `Error::InternalError` when there is no primary path.
    pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> {
        if self.state != State::Init {
            qerror!([self], "set token in state {:?}", self.state);
            return Err(Error::ConnectionState);
        }
        if self.role == Role::Server {
            return Err(Error::ConnectionState);
        }

        qinfo!(
            [self],
            "resumption token {}",
            hex_snip_middle(token.as_ref())
        );
        let mut dec = Decoder::from(token.as_ref());

        // The version comes first, and it has to be one that is enabled.
        let version = Version::try_from(u32::try_from(
            dec.decode_uint(4).ok_or(Error::InvalidResumptionToken)?,
        )?)?;
        qtrace!([self], "  version {:?}", version);
        if !self.conn_params.get_versions().all().contains(&version) {
            return Err(Error::DisabledVersion);
        }

        let rtt = Duration::from_millis(dec.decode_varint().ok_or(Error::InvalidResumptionToken)?);
        qtrace!([self], "  RTT {:?}", rtt);

        let tp_slice = dec.decode_vvec().ok_or(Error::InvalidResumptionToken)?;
        qtrace!([self], "  transport parameters {}", hex(tp_slice));
        let mut dec_tp = Decoder::from(tp_slice);
        let tp =
            TransportParameters::decode(&mut dec_tp).map_err(|_| Error::InvalidResumptionToken)?;

        let init_token = dec.decode_vvec().ok_or(Error::InvalidResumptionToken)?;
        qtrace!([self], "  Initial token {}", hex(init_token));

        // Everything that is left belongs to TLS.
        let tok = dec.decode_remainder();
        qtrace!([self], "  TLS token {}", hex(tok));

        match self.crypto.tls {
            Agent::Client(ref mut c) => {
                let res = c.enable_resumption(tok);
                if let Err(e) = res {
                    // The error is folded into the connection state rather than returned.
                    self.absorb_error::<Error>(now, Err(Error::from(e)));
                    return Ok(());
                }
            }
            Agent::Server(_) => return Err(Error::WrongRole),
        }

        // Only now that the token is accepted is connection state updated.
        self.version = version;
        self.conn_params.get_versions_mut().set_initial(version);
        self.tps.borrow_mut().set_version(version);
        self.tps.borrow_mut().remote_0rtt = Some(tp);
        if !init_token.is_empty() {
            self.address_validation = AddressValidationInfo::NewToken(init_token.to_vec());
        }
        self.paths
            .primary()
            .ok_or(Error::InternalError)?
            .borrow_mut()
            .rtt_mut()
            .set_initial(rtt);
        self.set_initial_limits();
        // Start up TLS, which has the effect of setting up all the necessary
        // state for 0-RTT.  This only stages the CRYPTO frames.
        let res = self.client_start(now);
        self.absorb_error(now, res);
        Ok(())
    }

    /// Give a server connection the address validation state it needs to
    /// generate `NEW_TOKEN` tokens.
    ///
    /// Only a weak reference to `validation` is kept.
    ///
    /// # Panics
    /// If this is called on a client.
    pub(crate) fn set_validation(&mut self, validation: &Rc<RefCell<AddressValidation>>) {
        qtrace!([self], "Enabling NEW_TOKEN");
        assert_eq!(self.role, Role::Server);
        self.address_validation = AddressValidationInfo::Server(Rc::downgrade(validation));
    }

    /// Send a TLS session ticket AND a `NEW_TOKEN` frame (if possible).
    /// # Errors
    /// When the operation fails, which is usually due to bad inputs or bad connection state.
    pub fn send_ticket(&mut self, now: Instant, extra: &[u8]) -> Res<()> {
        if self.role == Role::Client {
            return Err(Error::WrongRole);
        }

        let tps = &self.tps;
        if let Agent::Server(ref mut s) = self.crypto.tls {
            let mut enc = Encoder::default();
            enc.encode_vvec_with(|enc_inner| {
                tps.borrow().local.encode(enc_inner);
            });
            enc.encode(extra);
            let records = s.send_ticket(now, enc.as_ref())?;
            qdebug!([self], "send session ticket {}", hex(&enc));
            self.crypto.buffer_records(records)?;
        } else {
            unreachable!();
        }

        // If we are able, also send a NEW_TOKEN frame.
        // This should be recording all remote addresses that are valid,
        // but there are just 0 or 1 in the current implementation.
        if let Some(path) = self.paths.primary() {
            if let Some(token) = self
                .address_validation
                .generate_new_token(path.borrow().remote_address(), now)
            {
                self.new_token.send_new_token(token);
            }
            Ok(())
        } else {
            Err(Error::NotConnected)
        }
    }

    /// Get information about the TLS session from the TLS agent, if it has any.
    #[must_use]
    pub fn tls_info(&self) -> Option<&SecretAgentInfo> {
        self.crypto.tls.info()
    }

    /// Get the preliminary TLS information from the TLS agent.
    ///
    /// # Errors
    /// When there is no information to obtain.
    pub fn tls_preinfo(&self) -> Res<SecretAgentPreInfo> {
        Ok(self.crypto.tls.preinfo()?)
    }

    /// Get the peer's certificate chain and other info.
    ///
    /// Returns `None` when the TLS agent has no certificate information.
    #[must_use]
    pub fn peer_certificate(&self) -> Option<CertificateInfo> {
        self.crypto.tls.peer_certificate()
    }

    /// Called by the application when the peer cert has been verified.
    ///
    /// The status is passed to TLS, the handshake is then driven forward in the
    /// Handshake packet number space, and any saved datagrams are processed.
    /// A handshake error is absorbed into the connection state rather than
    /// returned.
    ///
    /// This panics if there is no active peer.  It's OK to call this
    /// when authentication isn't needed, that will likely only cause
    /// the connection to fail.  However, if no packets have been
    /// exchanged, it's not OK.
    pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) {
        qdebug!([self], "Authenticated {:?}", status);
        self.crypto.tls.authenticated(status);
        let res = self.handshake(now, self.version, PacketNumberSpace::Handshake, None);
        self.absorb_error(now, res);
        self.process_saved(now);
    }

    /// Get the role of the connection: client or server.
    #[must_use]
    pub const fn role(&self) -> Role {
        self.role
    }

    /// Get a reference to the current state of the connection.
    #[must_use]
    pub const fn state(&self) -> &State {
        &self.state
    }

    /// The QUIC version in use.
    ///
    /// This starts as the configured initial version; `enable_resumption()`
    /// replaces it with the version recorded in the resumption token.
    #[must_use]
    pub const fn version(&self) -> Version {
        self.version
    }

    /// Get the 0-RTT state of the connection (returned by value).
    #[must_use]
    pub const fn zero_rtt_state(&self) -> ZeroRttState {
        self.zero_rtt_state
    }

    /// Get a snapshot of collected statistics.
    #[must_use]
    pub fn stats(&self) -> Stats {
        let mut v = self.stats.borrow().clone();
        if let Some(p) = self.paths.primary() {
            let p = p.borrow();
            v.rtt = p.rtt().estimate();
            v.rttvar = p.rtt().rttvar();
        }
        v
    }

    // This function wraps a call to another function and sets the connection state
    // properly if that call fails.
    fn capture_error<T>(
        &mut self,
        path: Option<PathRef>,
        now: Instant,
        frame_type: FrameType,
        res: Res<T>,
    ) -> Res<T> {
        if let Err(v) = &res {
            #[cfg(debug_assertions)]
            let msg = format!("{v:?}");
            #[cfg(not(debug_assertions))]
            let msg = "";
            let error = CloseReason::Transport(v.clone());
            match &self.state {
                State::Closing { error: err, .. }
                | State::Draining { error: err, .. }
                | State::Closed(err) => {
                    qwarn!([self], "Closing again after error {:?}", err);
                }
                State::Init => {
                    // We have not even sent anything just close the connection without sending any
                    // error. This may happen when client_start fails.
                    self.set_state(State::Closed(error), now);
                }
                State::WaitInitial => {
                    // We don't have any state yet, so don't bother with
                    // the closing state, just send one CONNECTION_CLOSE.
                    if let Some(path) = path.or_else(|| self.paths.primary()) {
                        self.state_signaling
                            .close(path, error.clone(), frame_type, msg);
                    }
                    self.set_state(State::Closed(error), now);
                }
                _ => {
                    if let Some(path) = path.or_else(|| self.paths.primary()) {
                        self.state_signaling
                            .close(path, error.clone(), frame_type, msg);
                        if matches!(v, Error::KeysExhausted) {
                            self.set_state(State::Closed(error), now);
                        } else {
                            self.set_state(
                                State::Closing {
                                    error,
                                    timeout: self.get_closing_period_time(now),
                                },
                                now,
                            );
                        }
                    } else {
                        self.set_state(State::Closed(error), now);
                    }
                }
            }
        }
        res
    }

    /// For use with `process_input()`. Errors there can be ignored, but this
    /// needs to ensure that the state is updated.
    ///
    /// Forwards `res` to `capture_error()` with no explicit path and a frame
    /// type of 0, then turns the result into an `Option`, discarding any
    /// error value.
    fn absorb_error<T>(&mut self, now: Instant, res: Res<T>) -> Option<T> {
        self.capture_error(None, now, 0, res).ok()
    }

    fn process_timer(&mut self, now: Instant) {
        match &self.state {
            // Only the client runs timers while waiting for Initial packets.
            State::WaitInitial => debug_assert_eq!(self.role, Role::Client),
            // If Closing or Draining, check if it is time to move to Closed.
            State::Closing { error, timeout } | State::Draining { error, timeout } => {
                if *timeout <= now {
                    let st = State::Closed(error.clone());
                    self.set_state(st, now);
                    qinfo!("Closing timer expired");
                    return;
                }
            }
            State::Closed(_) => {
                qdebug!("Timer fired while closed");
                return;
            }
            _ => (),
        }

        let pto = self.pto();
        if self.idle_timeout.expired(now, pto) {
            qinfo!([self], "idle timeout expired");
            self.set_state(
                State::Closed(CloseReason::Transport(Error::IdleTimeout)),
                now,
            );
            return;
        }

        if self.state.closing() {
            qtrace!([self], "Closing, not processing other timers");
            return;
        }

        self.streams.cleanup_closed_streams();

        let res = self.crypto.states.check_key_update(now);
        self.absorb_error(now, res);

        if let Some(path) = self.paths.primary() {
            let lost = self.loss_recovery.timeout(&path, now);
            self.handle_lost_packets(&lost);
            qlog::packets_lost(&self.qlog, &lost, now);
        }

        if self.release_resumption_token_timer.is_some() {
            self.create_resumption_token(now);
        }

        if !self
            .paths
            .process_timeout(now, pto, &mut self.stats.borrow_mut())
        {
            qinfo!([self], "last available path failed");
            self.absorb_error::<Error>(now, Err(Error::NoAvailablePath));
        }
    }

    /// Whether the given [`ConnectionIdRef`] is a valid local [`ConnectionId`].
    ///
    /// The answer comes from this connection's connection ID manager.
    #[must_use]
    pub fn is_valid_local_cid(&self, cid: ConnectionIdRef) -> bool {
        self.cid_manager.is_valid(cid)
    }

    /// Process a single new input datagram on the connection.
    ///
    /// This is `process_multiple_input()` applied to a one-element iterator.
    pub fn process_input(&mut self, d: Datagram<impl AsRef<[u8]>>, now: Instant) {
        self.process_multiple_input(iter::once(d), now);
    }

    /// Process new input datagrams on the connection.
    pub fn process_multiple_input(
        &mut self,
        dgrams: impl IntoIterator<Item = Datagram<impl AsRef<[u8]>>>,
        now: Instant,
    ) {
        let mut dgrams = dgrams.into_iter().peekable();
        if dgrams.peek().is_none() {
            return;
        }

        for d in dgrams {
            self.input(d, now, now);
        }
        self.process_saved(now);
        self.streams.cleanup_closed_streams();
    }

    /// Get the time that we next need to be called back, relative to `now`.
    ///
    /// All timers that currently apply are collected and the earliest one is
    /// chosen.  `paced` controls whether the pacing timer of the primary path
    /// is one of the candidates.  The chosen delay is also reported to
    /// `self.hrtime`.
    fn next_delay(&mut self, now: Instant, paced: bool) -> Duration {
        qtrace!([self], "Get callback delay {:?}", now);

        // Only one timer matters when closing...
        if let State::Closing { timeout, .. } | State::Draining { timeout, .. } = self.state {
            self.hrtime.update(Self::LOOSE_TIMER_RESOLUTION);
            return timeout.duration_since(now);
        }

        // Candidate expiry times; the earliest one is picked at the end.
        let mut delays = SmallVec::<[_; 7]>::new();
        if let Some(ack_time) = self.acks.ack_time(now) {
            qtrace!([self], "Delayed ACK timer {:?}", ack_time);
            delays.push(ack_time);
        }

        // The timers in this block all need the primary path.
        if let Some(p) = self.paths.primary() {
            let path = p.borrow();
            let rtt = path.rtt();
            let pto = rtt.pto(self.confirmed());

            // The idle timer is always a candidate when there is a primary path.
            let idle_time = self.idle_timeout.expiry(now, pto);
            qtrace!([self], "Idle timer {:?}", idle_time);
            delays.push(idle_time);

            if self.streams.need_keep_alive() {
                if let Some(keep_alive_time) = self.idle_timeout.next_keep_alive(now, pto) {
                    qtrace!([self], "Keep alive timer {:?}", keep_alive_time);
                    delays.push(keep_alive_time);
                }
            }

            if let Some(lr_time) = self.loss_recovery.next_timeout(&path) {
                qtrace!([self], "Loss recovery timer {:?}", lr_time);
                delays.push(lr_time);
            }

            if paced {
                if let Some(pace_time) = path.sender().next_paced(rtt.estimate()) {
                    qtrace!([self], "Pacing timer {:?}", pace_time);
                    delays.push(pace_time);
                }
            }

            if let Some(path_time) = self.paths.next_timeout(pto) {
                qtrace!([self], "Path probe timer {:?}", path_time);
                delays.push(path_time);
            }
        }

        if let Some(key_update_time) = self.crypto.states.update_time() {
            qtrace!([self], "Key update timer {:?}", key_update_time);
            delays.push(key_update_time);
        }

        // `release_resumption_token_timer` is not considered here, because
        // it is not important enough to force the application to set a
        // timeout for it.  It is expected that other activities will
        // drive it.

        // NOTE(review): this `unwrap` panics if `delays` is empty, which can
        // only happen when there is no primary path and neither the ACK nor
        // the key update timer is set; callers presumably rule that out.
        let earliest = delays.into_iter().min().unwrap();
        // TODO(agrover, mt) - need to analyze and fix #47
        // rather than just clamping to zero here.
        debug_assert!(earliest > now);
        let delay = earliest.saturating_duration_since(now);
        qdebug!([self], "delay duration {:?}", delay);
        // A quarter of the delay is passed to `hrtime`; presumably this is
        // the timer resolution that is asked for (confirm against `hrtime`).
        self.hrtime.update(delay / 4);
        delay
    }

    /// Get output packets, as a result of receiving packets, or actions taken
    /// by the application.
    /// Returns datagrams to send, and how long to wait before calling again
    /// even if no incoming packets.
    #[must_use = "Output of the process_output function must be handled"]
    pub fn process_output(&mut self, now: Instant) -> Output {
        qtrace!([self], "process_output {:?} {:?}", self.state, now);

        match (&self.state, self.role) {
            (State::Init, Role::Client) => {
                let res = self.client_start(now);
                self.absorb_error(now, res);
            }
            (State::Init | State::WaitInitial, Role::Server) => {
                return Output::None;
            }
            _ => {
                self.process_timer(now);
            }
        }

        match self.output(now) {
            SendOption::Yes(dgram) => Output::Datagram(dgram),
            SendOption::No(paced) => match self.state {
                State::Init | State::Closed(_) => Output::None,
                State::Closing { timeout, .. } | State::Draining { timeout, .. } => {
                    Output::Callback(timeout.duration_since(now))
                }
                _ => Output::Callback(self.next_delay(now, paced)),
            },
        }
    }

    /// A test-only output function that uses the provided writer to
    /// pack something extra into the output.
    #[cfg(test)]
    pub fn test_write_frames<W>(&mut self, writer: W, now: Instant) -> Output
    where
        W: test_internal::FrameWriter + 'static,
    {
        self.test_frame_writer = Some(Box::new(writer));
        let res = self.process_output(now);
        self.test_frame_writer = None;
        res
    }

    /// Process input and generate output.
    ///
    /// When `dgram` is present it is fed to the connection as received at
    /// `now`, followed by any saved datagrams, before `process_output()` is
    /// called.  The value returned is the result of `process_output()`.
    #[must_use = "Output of the process function must be handled"]
    pub fn process(&mut self, dgram: Option<Datagram<impl AsRef<[u8]>>>, now: Instant) -> Output {
        if let Some(d) = dgram {
            self.input(d, now, now);
            self.process_saved(now);
        }
        // The binding is only needed by the fuzzing-corpus code below, which
        // is compiled out of most builds; hence the lint exception.
        #[allow(clippy::let_and_return)]
        let output = self.process_output(now);
        // Record outgoing datagrams in the fuzzing corpus, except when a test
        // frame writer is installed.
        #[cfg(all(feature = "build-fuzzing-corpus", test))]
        if self.test_frame_writer.is_none() {
            if let Some(d) = output.clone().dgram() {
                neqo_common::write_item_to_fuzzing_corpus("packet", &d);
            }
        }
        output
    }

    fn handle_retry(&mut self, packet: &PublicPacket, now: Instant) -> Res<()> {
        qinfo!([self], "received Retry");
        if matches!(self.address_validation, AddressValidationInfo::Retry { .. }) {
            self.stats.borrow_mut().pkt_dropped("Extra Retry");
            return Ok(());
        }
        if packet.token().is_empty() {
            self.stats.borrow_mut().pkt_dropped("Retry without a token");
            return Ok(());
        }
        if !packet.is_valid_retry(
            self.original_destination_cid
                .as_ref()
                .ok_or(Error::InvalidRetry)?,
        ) {
            self.stats
                .borrow_mut()
                .pkt_dropped("Retry with bad integrity tag");
            return Ok(());
        }
        // At this point, we should only have the connection ID that we generated.
        // Update to the one that the server prefers.
        let Some(path) = self.paths.primary() else {
            self.stats
                .borrow_mut()
                .pkt_dropped("Retry without an existing path");
            return Ok(());
        };

        path.borrow_mut().set_remote_cid(packet.scid());

        let retry_scid = ConnectionId::from(packet.scid());
        qinfo!(
            [self],
            "Valid Retry received, token={} scid={}",
            hex(packet.token()),
            retry_scid
        );

        let lost_packets = self.loss_recovery.retry(&path, now);
        self.handle_lost_packets(&lost_packets);

        self.crypto.states.init(
            self.conn_params.get_versions().compatible(),
            self.role,
            &retry_scid,
        )?;
        self.address_validation = AddressValidationInfo::Retry {
            token: packet.token().to_vec(),
            retry_source_cid: retry_scid,
        };
        Ok(())
    }

    fn discard_keys(&mut self, space: PacketNumberSpace, now: Instant) {
        if self.crypto.discard(space) {
            qdebug!([self], "Drop packet number space {}", space);
            if let Some(path) = self.paths.primary() {
                self.loss_recovery.discard(&path, space, now);
            }
            self.acks.drop_space(space);
        }
    }

    fn is_stateless_reset(&self, path: &PathRef, d: &Datagram<impl AsRef<[u8]>>) -> bool {
        // If the datagram is too small, don't try.
        // If the connection is connected, then the reset token will be invalid.
        if d.len() < 16 || !self.state.connected() {
            return false;
        }
        <&[u8; 16]>::try_from(&d.as_ref()[d.len() - 16..])
            .is_ok_and(|token| path.borrow().is_stateless_reset(token))
    }

    fn check_stateless_reset(
        &mut self,
        path: &PathRef,
        d: &Datagram<impl AsRef<[u8]>>,
        first: bool,
        now: Instant,
    ) -> Res<()> {
        if first && self.is_stateless_reset(path, d) {
            // Failing to process a packet in a datagram might
            // indicate that there is a stateless reset present.
            qdebug!([self], "Stateless reset: {}", hex(&d[d.len() - 16..]));
            self.state_signaling.reset();
            self.set_state(
                State::Draining {
                    error: CloseReason::Transport(Error::StatelessReset),
                    timeout: self.get_closing_period_time(now),
                },
                now,
            );
            Err(Error::StatelessReset)
        } else {
            Ok(())
        }
    }

    /// Process any saved datagrams that might be available for processing.
    ///
    /// `available()` is asked again after every batch, so saved datagrams
    /// that only become available while earlier ones are being processed are
    /// handled within the same call.
    fn process_saved(&mut self, now: Instant) {
        while let Some(cspace) = self.saved_datagrams.available() {
            qdebug!([self], "process saved for space {:?}", cspace);
            // Header protection keys for the space are expected to exist by
            // the time its datagrams are reported as available.
            debug_assert!(self.crypto.states.rx_hp(self.version, cspace).is_some());
            for saved in self.saved_datagrams.take_saved() {
                qtrace!([self], "input saved @{:?}: {:?}", saved.t, saved.d);
                // `saved.t` is passed as the time of receipt, `now` as the
                // current time.
                self.input(saved.d, saved.t, now);
            }
        }
    }

    /// In case a datagram arrives that we can only partially process, save any
    /// part that we don't have keys for.
    #[allow(clippy::needless_pass_by_value)] // To consume an owned datagram below.
    fn save_datagram(
        &mut self,
        cspace: CryptoSpace,
        d: Datagram<impl AsRef<[u8]>>,
        remaining: usize,
        now: Instant,
    ) {
        let d = Datagram::new(
            d.source(),
            d.destination(),
            d.tos(),
            d[d.len() - remaining..].to_vec(),
        );
        self.saved_datagrams.save(cspace, d, now);
        self.stats.borrow_mut().saved_datagrams += 1;
    }

    /// Perform version negotiation.
    fn version_negotiation(&mut self, supported: &[WireVersion], now: Instant) -> Res<()> {
        debug_assert_eq!(self.role, Role::Client);

        if let Some(version) = self.conn_params.get_versions().preferred(supported) {
            assert_ne!(self.version, version);

            qinfo!([self], "Version negotiation: trying {:?}", version);
            let path = self.paths.primary().ok_or(Error::NoAvailablePath)?;
            let local_addr = path.borrow().local_address();
            let remote_addr = path.borrow().remote_address();
            let conn_params = self
                .conn_params
                .clone()
                .versions(version, self.conn_params.get_versions().all().to_vec());
            let mut c = Self::new_client(
                self.crypto.server_name().ok_or(Error::VersionNegotiation)?,
                self.crypto.protocols(),
                self.cid_manager.generator(),
                local_addr,
                remote_addr,
                conn_params,
                now,
            )?;
            c.conn_params
                .get_versions_mut()
                .set_initial(self.conn_params.get_versions().initial());
            mem::swap(self, &mut c);
            qlog::client_version_information_negotiated(
                &self.qlog,
                self.conn_params.get_versions().all(),
                supported,
                version,
                now,
            );
            Ok(())
        } else {
            qinfo!([self], "Version negotiation: failed with {:?}", supported);
            // This error goes straight to closed.
            self.set_state(
                State::Closed(CloseReason::Transport(Error::VersionNegotiation)),
                now,
            );
            Err(Error::VersionNegotiation)
        }
    }

    /// Perform any processing that we might have to do on packets prior to
    /// attempting to remove protection.
    ///
    /// `dcid` is the destination connection ID of the previous packet in the
    /// same datagram, or `None` for the first packet.
    ///
    /// The result tells the caller how to proceed: `Continue` means the
    /// packet is to be decrypted and processed, while `Next` and `End` mean
    /// that it is not.
    ///
    /// # Errors
    /// When handling of an Initial, Version Negotiation or Retry packet fails.
    #[allow(clippy::too_many_lines)] // Yeah, it's a work in progress.
    fn preprocess_packet(
        &mut self,
        packet: &PublicPacket,
        path: &PathRef,
        dcid: Option<&ConnectionId>,
        now: Instant,
    ) -> Res<PreprocessResult> {
        // All packets coalesced into one datagram have to share a DCID.
        if dcid.is_some_and(|d| d != &packet.dcid()) {
            self.stats
                .borrow_mut()
                .pkt_dropped("Coalesced packet has different DCID");
            return Ok(PreprocessResult::Next);
        }

        if (packet.packet_type() == PacketType::Initial
            || packet.packet_type() == PacketType::Handshake)
            && self.role == Role::Client
            && !path.borrow().is_primary()
        {
            // If we have received a packet from a different address than we have sent to
            // we should ignore the packet. In such a case a path will be a newly created
            // temporary path, not the primary path.
            return Ok(PreprocessResult::Next);
        }

        // Handling that depends on the packet type, the state and the role.
        match (packet.packet_type(), &self.state, &self.role) {
            (PacketType::Initial, State::Init, Role::Server) => {
                let version = packet.version().ok_or(Error::ProtocolViolation)?;
                if !packet.is_valid_initial()
                    || !self.conn_params.get_versions().all().contains(&version)
                {
                    self.stats.borrow_mut().pkt_dropped("Invalid Initial");
                    return Ok(PreprocessResult::Next);
                }
                qinfo!(
                    [self],
                    "Received valid Initial packet with scid {:?} dcid {:?}",
                    packet.scid(),
                    packet.dcid()
                );
                // Record the client's selected CID so that it can be accepted until
                // the client starts using a real connection ID.
                let dcid = ConnectionId::from(packet.dcid());
                self.crypto.states.init_server(version, &dcid)?;
                self.original_destination_cid = Some(dcid);
                self.set_state(State::WaitInitial, now);

                // We need to make sure that we set this transport parameter.
                // This has to happen prior to processing the packet so that
                // the TLS handshake has all it needs.
                if !self.retry_sent() {
                    self.tps.borrow_mut().local.set_bytes(
                        tparams::ORIGINAL_DESTINATION_CONNECTION_ID,
                        packet.dcid().to_vec(),
                    );
                }
            }
            (PacketType::VersionNegotiation, State::WaitInitial, Role::Client) => {
                if let Ok(versions) = packet.supported_versions() {
                    if versions.is_empty()
                        || versions.contains(&self.version().wire_version())
                        || versions.contains(&0)
                        || &packet.scid() != self.odcid().ok_or(Error::InternalError)?
                        || matches!(self.address_validation, AddressValidationInfo::Retry { .. })
                    {
                        // Ignore VersionNegotiation packets that contain the current version.
                        // Or don't have the right connection ID.
                        // Or are received after a Retry.
                        self.stats.borrow_mut().pkt_dropped("Invalid VN");
                    } else {
                        self.version_negotiation(&versions, now)?;
                    }
                } else {
                    self.stats.borrow_mut().pkt_dropped("VN with no versions");
                };
                // Nothing else is processed after a Version Negotiation packet.
                return Ok(PreprocessResult::End);
            }
            (PacketType::Retry, State::WaitInitial, Role::Client) => {
                self.handle_retry(packet, now)?;
                return Ok(PreprocessResult::Next);
            }
            (PacketType::Handshake | PacketType::Short, State::WaitInitial, Role::Client) => {
                // This packet can't be processed now, but it could be a sign
                // that Initial packets were lost.
                // Resend Initial CRYPTO frames immediately a few times just
                // in case.  As we don't have an RTT estimate yet, this helps
                // when there is a short RTT and losses. Also mark all 0-RTT
                // data as lost.
                if dcid.is_none()
                    && self.cid_manager.is_valid(packet.dcid())
                    && self.stats.borrow().saved_datagrams <= EXTRA_INITIALS
                {
                    self.crypto.resend_unacked(PacketNumberSpace::Initial);
                    self.resend_0rtt(now);
                }
            }
            // In any other state or role, these packet types are dropped.
            (PacketType::VersionNegotiation | PacketType::Retry | PacketType::OtherVersion, ..) => {
                self.stats
                    .borrow_mut()
                    .pkt_dropped(format!("{:?}", packet.packet_type()));
                return Ok(PreprocessResult::Next);
            }
            _ => {}
        }

        // Whether the packet goes on to be processed depends on the state,
        // which the match above might have changed.
        let res = match self.state {
            State::Init => {
                self.stats
                    .borrow_mut()
                    .pkt_dropped("Received while in Init state");
                PreprocessResult::Next
            }
            State::WaitInitial => PreprocessResult::Continue,
            State::WaitVersion | State::Handshaking | State::Connected | State::Confirmed => {
                if self.cid_manager.is_valid(packet.dcid()) {
                    if self.role == Role::Server && packet.packet_type() == PacketType::Handshake {
                        // Server has received a Handshake packet -> discard Initial keys and states
                        self.discard_keys(PacketNumberSpace::Initial, now);
                    }
                    PreprocessResult::Continue
                } else {
                    self.stats
                        .borrow_mut()
                        .pkt_dropped(format!("Invalid DCID {:?}", packet.dcid()));
                    PreprocessResult::Next
                }
            }
            State::Closing { .. } => {
                // Don't bother processing the packet. Instead ask to get a
                // new close frame.
                self.state_signaling.send_close();
                PreprocessResult::Next
            }
            State::Draining { .. } | State::Closed(..) => {
                // Do nothing.
                self.stats
                    .borrow_mut()
                    .pkt_dropped(format!("State {:?}", self.state));
                PreprocessResult::Next
            }
        };
        Ok(res)
    }

    /// After a Initial, Handshake, `ZeroRtt`, or Short packet is successfully processed.
    fn postprocess_packet(
        &mut self,
        path: &PathRef,
        d: &Datagram<impl AsRef<[u8]>>,
        packet: &PublicPacket,
        migrate: bool,
        now: Instant,
    ) {
        let space = PacketNumberSpace::from(packet.packet_type());
        if let Some(space) = self.acks.get_mut(space) {
            let space_ecn_marks = space.ecn_marks();
            *space_ecn_marks += d.tos().into();
            self.stats.borrow_mut().ecn_rx = *space_ecn_marks;
        } else {
            qtrace!("Not tracking ECN for dropped packet number space");
        }

        if self.state == State::WaitInitial {
            self.start_handshake(path, packet, now);
        }

        if self.state.connected() {
            self.handle_migration(path, d, migrate, now);
        } else if self.role != Role::Client
            && (packet.packet_type() == PacketType::Handshake
                || (packet.dcid().len() >= 8 && packet.dcid() == self.local_initial_source_cid))
        {
            // We only allow one path during setup, so apply handshake
            // path validation to this path.
            path.borrow_mut().set_valid(now);
        }
    }

    /// Take a datagram as input.  This reports an error if the packet was bad.
    /// This takes two times: when the datagram was received, and the current time.
    fn input(&mut self, d: Datagram<impl AsRef<[u8]>>, received: Instant, now: Instant) {
        // First determine the path.
        let path = self.paths.find_path(
            d.destination(),
            d.source(),
            self.conn_params.get_cc_algorithm(),
            self.conn_params.pacing_enabled(),
            now,
        );
        path.borrow_mut().add_received(d.len());
        let res = self.input_path(&path, d, received);
        self.capture_error(Some(path), now, 0, res).ok();
    }

    fn input_path(
        &mut self,
        path: &PathRef,
        d: Datagram<impl AsRef<[u8]>>,
        now: Instant,
    ) -> Res<()> {
        let mut slc = d.as_ref();
        let mut dcid = None;

        qtrace!([self], "{} input {}", path.borrow(), hex(&d));
        let pto = path.borrow().rtt().pto(self.confirmed());

        // Handle each packet in the datagram.
        while !slc.is_empty() {
            self.stats.borrow_mut().packets_rx += 1;
            let (packet, remainder) =
                match PublicPacket::decode(slc, self.cid_manager.decoder().as_ref()) {
                    Ok((packet, remainder)) => (packet, remainder),
                    Err(e) => {
                        qinfo!([self], "Garbage packet: {}", e);
                        qtrace!([self], "Garbage packet contents: {}", hex(slc));
                        self.stats.borrow_mut().pkt_dropped("Garbage packet");
                        break;
                    }
                };
            match self.preprocess_packet(&packet, path, dcid.as_ref(), now)? {
                PreprocessResult::Continue => (),
                PreprocessResult::Next => break,
                PreprocessResult::End => return Ok(()),
            }

            qtrace!([self], "Received unverified packet {:?}", packet);

            match packet.decrypt(&mut self.crypto.states, now + pto) {
                Ok(payload) => {
                    // OK, we have a valid packet.
                    self.idle_timeout.on_packet_received(now);
                    dump_packet(
                        self,
                        path,
                        "-> RX",
                        payload.packet_type(),
                        payload.pn(),
                        &payload[..],
                        d.tos(),
                        d.len(),
                    );

                    #[cfg(feature = "build-fuzzing-corpus")]
                    if packet.packet_type() == PacketType::Initial {
                        let target = if self.role == Role::Client {
                            "server_initial"
                        } else {
                            "client_initial"
                        };
                        neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]);
                    }

                    qlog::packet_received(&self.qlog, &packet, &payload, now);
                    let space = PacketNumberSpace::from(payload.packet_type());
                    if let Some(space) = self.acks.get_mut(space) {
                        if space.is_duplicate(payload.pn()) {
                            qdebug!("Duplicate packet {}-{}", space, payload.pn());
                            self.stats.borrow_mut().dups_rx += 1;
                        } else {
                            match self.process_packet(path, &payload, now) {
                                Ok(migrate) => {
                                    self.postprocess_packet(path, &d, &packet, migrate, now);
                                }
                                Err(e) => {
                                    self.ensure_error_path(path, &packet, now);
                                    return Err(e);
                                }
                            }
                        }
                    } else {
                        qdebug!(
                            [self],
                            "Received packet {} for untracked space {}",
                            space,
                            payload.pn()
                        );
                        return Err(Error::ProtocolViolation);
                    }
                }
                Err(e) => {
                    match e {
                        Error::KeysPending(cspace) => {
                            // This packet can't be decrypted because we don't have the keys yet.
                            // Don't check this packet for a stateless reset, just return.
                            let remaining = slc.len();
                            self.save_datagram(cspace, d, remaining, now);
                            return Ok(());
                        }
                        Error::KeysExhausted => {
                            // Exhausting read keys is fatal.
                            return Err(e);
                        }
                        Error::KeysDiscarded(cspace) => {
                            // This was a valid-appearing Initial packet: maybe probe with
                            // a Handshake packet to keep the handshake moving.
                            self.received_untracked |=
                                self.role == Role::Client && cspace == CryptoSpace::Initial;
                        }
                        _ => (),
                    }
                    // Decryption failure, or not having keys is not fatal.
                    // If the state isn't available, or we can't decrypt the packet, drop
                    // the rest of the datagram on the floor, but don't generate an error.
                    self.check_stateless_reset(path, &d, dcid.is_none(), now)?;
                    self.stats.borrow_mut().pkt_dropped("Decryption failure");
                    qlog::packet_dropped(&self.qlog, &packet, now);
                }
            }
            slc = remainder;
            dcid = Some(ConnectionId::from(packet.dcid()));
        }
        self.check_stateless_reset(path, &d, dcid.is_none(), now)?;
        Ok(())
    }

    /// Process a packet.  Returns true if the packet might initiate migration.
    fn process_packet(
        &mut self,
        path: &PathRef,
        packet: &DecryptedPacket,
        now: Instant,
    ) -> Res<bool> {
        (!packet.is_empty())
            .then_some(())
            .ok_or(Error::ProtocolViolation)?;

        // TODO(ekr@rtfm.com): Have the server blow away the initial
        // crypto state if this fails? Otherwise, we will get a panic
        // on the assert for doesn't exist.
        // OK, we have a valid packet.

        // Get the next packet number we'll send, for ACK verification.
        // TODO: Once PR #2118 lands, this can move to `input_frame`. For now, it needs to be here,
        // because we can drop packet number spaces as we parse through the packet, and if an ACK
        // frame follows a CRYPTO frame that makes us drop a space, we need to know this
        // packet number to verify the ACK against.
        let next_pn = self
            .crypto
            .states
            .select_tx(self.version, PacketNumberSpace::from(packet.packet_type()))
            .map_or(0, |(_, tx)| tx.next_pn());

        let mut ack_eliciting = false;
        let mut probing = true;
        let mut d = Decoder::from(&packet[..]);
        while d.remaining() > 0 {
            #[cfg(feature = "build-fuzzing-corpus")]
            let pos = d.offset();
            let f = Frame::decode(&mut d)?;
            #[cfg(feature = "build-fuzzing-corpus")]
            neqo_common::write_item_to_fuzzing_corpus("frame", &packet[pos..d.offset()]);
            ack_eliciting |= f.ack_eliciting();
            probing &= f.path_probing();
            let t = f.get_type();
            if let Err(e) = self.input_frame(
                path,
                packet.version(),
                packet.packet_type(),
                f,
                next_pn,
                now,
            ) {
                self.capture_error(Some(Rc::clone(path)), now, t, Err(e))?;
            }
--> --------------------

--> maximum size reached

--> --------------------

[ Dauer der Verarbeitung: 0.27 Sekunden  (vorverarbeitet)  ]