|
|
|
|
Quelle mod.rs
Sprache: unbekannt
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
// The type implementing a QUIC connection.
use std::{
cell::RefCell,
cmp::{max, min},
fmt::{self, Debug},
iter, mem,
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
ops::RangeInclusive,
rc::{Rc, Weak},
time::{Duration, Instant},
};
use neqo_common::{
event::Provider as EventProvider, hex, hex_snip_middle, hrtime, qdebug, qerror, qinfo,
qlog::NeqoQlog, qtrace, qwarn, Datagram, Decoder, Encoder, Role,
};
use neqo_crypto::{
agent::CertificateInfo, Agent, AntiReplay, AuthenticationStatus, Cipher, Client, Group ,
HandshakeState, PrivateKey, PublicKey, ResumptionToken, SecretAgentInfo, SecretAgentPreInfo,
Server, ZeroRttChecker,
};
use smallvec::SmallVec;
use crate::{
addr_valid::{AddressValidation, NewTokenState},
cid::{
ConnectionId, ConnectionIdEntry, ConnectionIdGenerator, ConnectionIdManager,
ConnectionIdRef, ConnectionIdStore, LOCAL_ACTIVE_CID_LIMIT,
},
crypto::{Crypto, CryptoDxState, CryptoSpace},
ecn::EcnCount,
events::{ConnectionEvent, ConnectionEvents, OutgoingDatagramOutcome},
frame::{
CloseError, Frame, FrameType, FRAME_TYPE_CONNECTION_CLOSE_APPLICATION,
FRAME_TYPE_CONNECTION_CLOSE_TRANSPORT,
},
packet::{DecryptedPacket, PacketBuilder, PacketNumber, PacketType, PublicPacket},
path::{Path, PathRef, Paths},
qlog,
quic_datagrams::{DatagramTracking, QuicDatagrams},
recovery::{LossRecovery, RecoveryToken, SendProfile, SentPacket},
recv_stream::RecvStreamStats,
rtt::{RttEstimate, GRANULARITY, INITIAL_RTT},
send_stream::SendStream,
stats::{Stats, StatsCell},
stream_id::StreamType,
streams::{SendOrder, Streams},
tparams::{
self, TransportParameter, TransportParameterId, TransportParameters,
TransportParametersHandler,
},
tracking::{AckTracker, PacketNumberSpace, RecvdPackets},
version::{Version, WireVersion},
AppError, CloseReason, Error, Res, StreamId,
};
mod dump;
mod idle;
pub mod params;
mod saved;
mod state;
#[cfg(test)]
pub mod test_internal;
use dump::dump_packet;
use idle::IdleTimeout;
pub use params::ConnectionParameters;
use params::PreferredAddressConfig;
#[cfg(test)]
pub use params::ACK_RATIO_SCALE;
use saved::SavedDatagrams;
use state::StateSignaling;
pub use state::{ClosingFrame, State};
pub use crate::send_stream::{RetransmissionPriority, SendStreamStats, TransmissionPriority};
/// The number of extra Initial packets that the client will send in response
/// to receiving an undecryptable packet during the early part of the
/// handshake. This is a hack, but a useful one.
const EXTRA_INITIALS: usize = 4;
/// The state of 0-RTT for a connection, as reported by `Connection::zero_rtt_state()`.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ZeroRttState {
/// The state that a newly created connection starts in.
Init,
// NOTE(review): the transitions into the states below are not in this part of the
// file; the per-variant notes are inferred from the names and need confirming.
// Presumably: 0-RTT is being attempted.
Sending,
// Presumably: this is a client and its 0-RTT was accepted.
AcceptedClient,
// Presumably: this is a server and it accepted 0-RTT.
AcceptedServer,
// Presumably: 0-RTT was rejected.
Rejected,
}
#[derive(Clone, Debug, PartialEq, Eq)]
/// Type returned from `process()` and `process_output()`. Users are required to
/// call these repeatedly until `Callback` or `None` is returned.
pub enum Output {
/// Connection requires no action.
None,
/// Connection requires the datagram be sent.
Datagram(Datagram),
/// Connection has nothing to send right now and requires a call back when
/// the `Duration` elapses. `process_output()` runs the connection's timers,
/// so calling it (or `process()`) at that point is sufficient.
Callback(Duration),
}
impl Output {
    /// Consume the output, yielding the datagram it carries (if it carries one).
    #[must_use]
    pub fn dgram(self) -> Option<Datagram> {
        if let Self::Datagram(datagram) = self {
            Some(datagram)
        } else {
            None
        }
    }

    /// Borrow the datagram carried by this output (if it carries one).
    #[must_use]
    pub const fn as_dgram_ref(&self) -> Option<&Datagram> {
        if let Self::Datagram(datagram) = self {
            Some(datagram)
        } else {
            None
        }
    }

    /// The delay before the caller needs to call back. This is zero for
    /// anything other than `Output::Callback`.
    #[must_use]
    pub const fn callback(&self) -> Duration {
        if let Self::Callback(delay) = self {
            *delay
        } else {
            Duration::ZERO
        }
    }

    /// Keep this output unless it is `Output::None`, in which case the result
    /// of `f` is used instead. `f` is only invoked for `Output::None`.
    #[must_use]
    pub fn or_else<F>(self, f: F) -> Self
    where
        F: FnOnce() -> Self,
    {
        if matches!(self, Self::None) {
            f()
        } else {
            self
        }
    }
}
/// Used by inner functions like `Connection::output`.
enum SendOption {
/// Yes, please send this datagram.
Yes(Datagram),
/// Don't send. The argument is `true` if sending was blocked on the pacer.
No(bool),
}
impl Default for SendOption {
/// The default is to send nothing, with the "blocked on the pacer" flag unset.
fn default() -> Self {
Self::No(false)
}
}
/// Used by `Connection::preprocess` to determine what to do
/// with a packet before attempting to remove protection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PreprocessResult {
/// End processing and return successfully.
End,
/// Stop processing this datagram and move on to the next.
Next,
/// Continue and process this packet.
Continue,
}
/// `AddressValidationInfo` holds information relevant to either
/// responding to address validation (`NewToken`, `Retry`) or generating
/// tokens for address validation (`Server`).
enum AddressValidationInfo {
/// No address validation information is held.
None,
/// We are a client and have information from `NEW_TOKEN`.
NewToken(Vec<u8>),
/// We are a client and have received a `Retry` packet.
Retry {
/// The token to report from `token()`.
token: Vec<u8>,
// NOTE(review): presumably the source connection ID of the Retry packet; the
// code that fills this in is not in this part of the file.
retry_source_cid: ConnectionId,
},
/// We are a server and can generate tokens. Only a weak reference to the
/// `AddressValidation` is held.
Server(Weak<RefCell<AddressValidation>>),
}
impl AddressValidationInfo {
    /// The token held by the client-side variants (`NewToken` and `Retry`).
    /// Every other variant yields an empty slice.
    pub fn token(&self) -> &[u8] {
        match self {
            Self::NewToken(token) | Self::Retry { token, .. } => token,
            Self::None | Self::Server(_) => &[],
        }
    }

    /// Generate a token for `peer_address` that can be sent in `NEW_TOKEN`.
    ///
    /// Returns `None` when no validation state is configured, when the
    /// `AddressValidation` that is referenced has been dropped, or when
    /// generating the token fails.
    ///
    /// # Panics
    /// When called on one of the client-side variants.
    pub fn generate_new_token(&self, peer_address: SocketAddr, now: Instant) -> Option<Vec<u8>> {
        match self {
            Self::None => None,
            Self::Server(weak) => {
                // The validation state might have been dropped already.
                let validation = weak.upgrade()?;
                let guard = validation.borrow();
                guard.generate_new_token(peer_address, now).ok()
            }
            Self::NewToken(_) | Self::Retry { .. } => {
                unreachable!("called a server function on a client")
            }
        }
    }
}
/// A QUIC Connection
///
/// First, create a new connection using `new_client()` or `new_server()`.
///
/// For the life of the connection, handle activity in the following manner:
/// 1. Perform operations using the `stream_*()` methods.
/// 1. Call `process_input()` when a datagram is received or the timer expires. Obtain information
/// on connection state changes by checking `events()`.
/// 1. Having completed handling current activity, repeatedly call `process_output()` for packets to
/// send, until it returns `Output::Callback` or `Output::None`.
///
/// After the connection is closed (either by calling `close()` or by the
/// remote) continue processing until `state()` returns `Closed`.
pub struct Connection {
/// Whether this endpoint is the client or the server.
role: Role,
/// The QUIC version in use.
version: Version,
/// The state of the connection.
state: State,
/// The transport parameters handler. The same handler is shared with
/// `crypto` and `streams`.
tps: Rc<RefCell<TransportParametersHandler>>,
/// What we are doing with 0-RTT.
zero_rtt_state: ZeroRttState,
/// All of the network paths that we are aware of.
paths: Paths,
/// This object will generate connection IDs for the connection.
cid_manager: ConnectionIdManager,
/// Address validation information; see `AddressValidationInfo`.
address_validation: AddressValidationInfo,
/// The connection IDs that were provided by the peer.
connection_ids: ConnectionIdStore<[u8; 16]>,
/// The source connection ID that this endpoint uses for the handshake.
/// Since we need to communicate this to our peer in tparams, setting this
/// value is part of constructing the struct.
local_initial_source_cid: ConnectionId,
/// The source connection ID from the first packet from the other end.
/// This is checked against the peer's transport parameters.
remote_initial_source_cid: Option<ConnectionId>,
/// The destination connection ID from the first packet from the client.
/// This is checked by the client against the server's transport parameters.
original_destination_cid: Option<ConnectionId>,
/// We sometimes save a datagram against the possibility that keys will later
/// become available. This avoids reporting packets as dropped during the handshake
/// when they are either just reordered or we haven't been able to install keys yet.
/// In particular, this occurs when asynchronous certificate validation happens.
saved_datagrams: SavedDatagrams,
/// Some packets were received, but not tracked.
received_untracked: bool,
/// This is responsible for the `QuicDatagrams`' handling:
/// <https://datatracker.ietf.org/doc/html/draft-ietf-quic-datagram>
quic_datagrams: QuicDatagrams,
/// Cryptographic state, which includes the TLS agent (`tls`) and the
/// packet protection states (`states`).
pub(crate) crypto: Crypto,
/// Acknowledgment tracking. This also supplies the delayed ACK timer.
pub(crate) acks: AckTracker,
/// Tracks the idle timeout and the keep-alive timer.
idle_timeout: IdleTimeout,
/// The streams of this connection.
streams: Streams,
/// Used to signal state changes, such as closing, to the peer.
// NOTE(review): only the `close()` call is visible in this part of the file.
state_signaling: StateSignaling,
/// Loss recovery state.
loss_recovery: LossRecovery,
/// Events for the application. The same events object is shared with
/// `quic_datagrams` and `streams`.
events: ConnectionEvents,
/// State for handling `NEW_TOKEN`.
new_token: NewTokenState,
/// Statistics for the connection, shared with `loss_recovery`.
stats: StatsCell,
/// The qlog for this connection; see `set_qlog()`.
qlog: NeqoQlog,
/// A session ticket was received without `NEW_TOKEN`,
/// this is when that turns into an event without `NEW_TOKEN`.
release_resumption_token_timer: Option<Instant>,
/// The parameters that this connection was created with.
conn_params: ConnectionParameters,
/// A handle for the timer resolution that was requested; `next_delay()`
/// updates this based on the next timer.
hrtime: hrtime::Handle,
/// For testing purposes it is sometimes necessary to inject frames that wouldn't
/// otherwise be sent, just to see how a connection handles them. Inserting them
/// into packets proper mean that the frames follow the entire processing path.
#[cfg(test)]
pub test_frame_writer: Option<Box<dyn test_internal::FrameWriter>>,
}
impl Debug for Connection {
    /// Format the role, the state, and the primary path (if there is one).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let primary_path = self.paths.primary();
        write!(
            f,
            "{:?} Connection: {:?} {:?}",
            self.role, self.state, primary_path
        )
    }
}
impl Connection {
/// A long default for timer resolution (50ms), so that we don't tax the
/// system too hard when we don't need to.
const LOOSE_TIMER_RESOLUTION: Duration = Duration::from_millis(50);
/// Create a new QUIC connection with Client role.
///
/// This generates the initial destination connection ID, sets up the
/// Initial crypto state for it, and installs a temporary path between
/// `local_addr` and `remote_addr` as the handshake path.
///
/// # Errors
/// When NSS fails and an agent cannot be created, or when setting up the
/// connection or its crypto state fails.
pub fn new_client(
    server_name: impl Into<String>,
    protocols: &[impl AsRef<str>],
    cid_generator: Rc<RefCell<dyn ConnectionIdGenerator>>,
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    conn_params: ConnectionParameters,
    now: Instant,
) -> Res<Self> {
    let initial_dcid = ConnectionId::generate_initial();
    let agent = Agent::from(Client::new(server_name.into(), conn_params.is_greasing())?);
    let mut conn = Self::new(Role::Client, agent, cid_generator, protocols, conn_params)?;
    conn.crypto.states.init(
        conn.conn_params.get_versions().compatible(),
        Role::Client,
        &initial_dcid,
    )?;
    conn.original_destination_cid = Some(initial_dcid);
    let handshake_path = Rc::new(RefCell::new(Path::temporary(
        local_addr,
        remote_addr,
        conn.conn_params.get_cc_algorithm(),
        conn.conn_params.pacing_enabled(),
        NeqoQlog::default(),
        now,
    )));
    conn.setup_handshake_path(&handshake_path, now);
    Ok(conn)
}
/// Create a new QUIC connection with Server role.
///
/// `certs` is passed to the TLS server agent. No path is set up here.
///
/// # Errors
/// When NSS fails and an agent cannot be created, or when setting up the
/// connection fails.
pub fn new_server(
    certs: &[impl AsRef<str>],
    protocols: &[impl AsRef<str>],
    cid_generator: Rc<RefCell<dyn ConnectionIdGenerator>>,
    conn_params: ConnectionParameters,
) -> Res<Self> {
    let agent = Agent::from(Server::new(certs)?);
    Self::new(Role::Server, agent, cid_generator, protocols, conn_params)
}
/// The constructor that is shared by `new_client()` and `new_server()`.
///
/// This generates the local initial source connection ID, creates the
/// transport parameters and crypto state, and assembles a connection in
/// `State::Init` with a default `Paths`.
///
/// # Errors
/// `Error::ConnectionIdsExhausted` when `cid_generator` does not produce a
/// connection ID; otherwise, any error from creating the transport
/// parameters or the crypto state.
fn new<P: AsRef<str>>(
role: Role,
agent: Agent,
cid_generator: Rc<RefCell<dyn ConnectionIdGenerator>>,
protocols: &[P],
conn_params: ConnectionParameters,
) -> Res<Self> {
// Setup the local connection ID.
let local_initial_source_cid = cid_generator
.borrow_mut()
.generate_cid()
.ok_or(Error::ConnectionIdsExhausted)?;
let mut cid_manager =
ConnectionIdManager::new(cid_generator, local_initial_source_cid.clone());
let mut tps = conn_params.create_transport_parameter(role, &mut cid_manager)?;
// Our initial source connection ID is advertised in transport parameters.
tps.local.set_bytes(
tparams::INITIAL_SOURCE_CONNECTION_ID,
local_initial_source_cid.to_vec(),
);
// One transport parameters handler is shared by the connection, `Crypto`, and `Streams`.
let tphandler = Rc::new(RefCell::new(tps));
let crypto = Crypto::new(
conn_params.get_versions().initial(),
agent,
protocols.iter().map(P::as_ref).map(String::from).collect(),
Rc::clone(&tphandler),
)?;
let stats = StatsCell::default();
// The events are shared with the datagram and stream handling.
let events = ConnectionEvents::default();
let quic_datagrams = QuicDatagrams::new(
conn_params.get_datagram_size(),
conn_params.get_outgoing_datagram_queue(),
conn_params.get_incoming_datagram_queue(),
events.clone(),
);
// `conn_params` is read by several fields below before it is moved into the struct.
let c = Self {
role,
version: conn_params.get_versions().initial(),
state: State::Init,
paths: Paths::default(),
cid_manager,
tps: tphandler.clone(),
zero_rtt_state: ZeroRttState::Init,
address_validation: AddressValidationInfo::None,
local_initial_source_cid,
remote_initial_source_cid: None,
original_destination_cid: None,
saved_datagrams: SavedDatagrams::default(),
received_untracked: false,
crypto,
acks: AckTracker::default(),
idle_timeout: IdleTimeout::new(conn_params.get_idle_timeout()),
streams: Streams::new(tphandler, role, events.clone()),
connection_ids: ConnectionIdStore::default(),
state_signaling: StateSignaling::Idle,
loss_recovery: LossRecovery::new(stats.clone(), conn_params.get_fast_pto()),
events,
new_token: NewTokenState::new(role),
stats,
qlog: NeqoQlog::disabled(),
release_resumption_token_timer: None,
conn_params,
hrtime: hrtime::Time::get(Self::LOOSE_TIMER_RESOLUTION),
quic_datagrams,
#[cfg(test)]
test_frame_writer: None,
};
// The statistics are initialized with the display form of the new connection.
c.stats.borrow_mut().init(format!("{c}"));
Ok(c)
}
/// # Errors
/// When the operation fails.
pub fn server_enable_0rtt(
&mut self,
anti_replay: &AntiReplay,
zero_rtt_checker: impl ZeroRttChecker + 'static,
) -> Res<()> {
self.crypto
.server_enable_0rtt(self.tps.clone(), anti_replay, zero_rtt_checker)
}
/// Enable Encrypted Client Hello (ECH) on a server. All of the arguments are
/// passed to the crypto state unchanged.
///
/// # Errors
/// When the operation fails.
pub fn server_enable_ech(
&mut self,
config: u8,
public_name: &str,
sk: &PrivateKey,
pk: &PublicKey,
) -> Res<()> {
self.crypto.server_enable_ech(config, public_name, sk, pk)
}
/// Get the active ECH configuration, which is empty if ECH is disabled.
/// The value is read from the crypto state.
#[must_use]
pub fn ech_config(&self) -> &[u8] {
self.crypto.ech_config()
}
/// Enable Encrypted Client Hello (ECH) on a client, using the provided
/// encoded list of ECH configurations. The list is passed to the crypto
/// state unchanged.
///
/// # Errors
/// When the operation fails.
pub fn client_enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> {
self.crypto.client_enable_ech(ech_config_list)
}
/// Set or clear the qlog for this connection. Loss recovery and the paths
/// each receive their own clone of `qlog`.
pub fn set_qlog(&mut self, qlog: NeqoQlog) {
    let for_recovery = qlog.clone();
    let for_paths = qlog.clone();
    self.loss_recovery.set_qlog(for_recovery);
    self.paths.set_qlog(for_paths);
    self.qlog = qlog;
}
/// Get the qlog (if any) for this connection. This is the value that was
/// last passed to `set_qlog()`; a new connection starts with
/// `NeqoQlog::disabled()`.
pub fn qlog_mut(&mut self) -> &mut NeqoQlog {
&mut self.qlog
}
/// Get the original destination connection id for this connection. This
/// will always be present for `Role::Client` but not if `Role::Server` is in
/// `State::Init`.
///
/// `new_client()` sets this value; every connection otherwise starts with `None`.
#[must_use]
pub const fn odcid(&self) -> Option<&ConnectionId> {
self.original_destination_cid.as_ref()
}
/// Set a local transport parameter, possibly overriding a default value.
/// This only sets transport parameters without dealing with other aspects of
/// setting the value.
///
/// # Errors
/// When the transport parameter is invalid.
/// # Panics
/// This panics if the transport parameter is known to this crate.
pub fn set_local_tparam(&self, tp: TransportParameterId, value: TransportParameter) -> Res<()> {
#[cfg(not(test))]
{
assert!(!tparams::INTERNAL_TRANSPORT_PARAMETERS.contains(&tp));
}
if *self.state() == State::Init {
self.tps.borrow_mut().local.set(tp, value);
Ok(())
} else {
qerror!("Current state: {:?}", self.state());
qerror!("Cannot set local tparam when not in an initial connection state.");
Err(Error::ConnectionState)
}
}
/// Record the connection IDs that are relevant after a Retry was sent.
///
/// `odcid` is the original choice of the peer for our connection ID, which
/// comes from the Retry token. `remote_cid` is the value from the Source
/// Connection ID field of an incoming packet, which is what the peer wants us
/// to use now. `retry_cid` is what we asked the peer to use when sending the
/// Retry.
pub(crate) fn set_retry_cids(
    &mut self,
    odcid: &ConnectionId,
    remote_cid: ConnectionId,
    retry_cid: &ConnectionId,
) {
    debug_assert_eq!(self.role, Role::Server);
    qtrace!(
        [self],
        "Retry CIDs: odcid={} remote={} retry={}",
        odcid,
        remote_cid,
        retry_cid
    );
    {
        // Our own choices are advertised in transport parameters.
        let mut tps = self.tps.borrow_mut();
        tps.local
            .set_bytes(tparams::ORIGINAL_DESTINATION_CONNECTION_ID, odcid.to_vec());
        tps.local
            .set_bytes(tparams::RETRY_SOURCE_CONNECTION_ID, retry_cid.to_vec());
    }
    // The choice of the peer is kept so that it can be validated later.
    self.remote_initial_source_cid = Some(remote_cid);
}
/// Whether a Retry was sent. This is determined by checking whether the local
/// transport parameters include a retry source connection ID, which is what
/// `set_retry_cids()` sets.
fn retry_sent(&self) -> bool {
self.tps
.borrow()
.local
.get_bytes(tparams::RETRY_SOURCE_CONNECTION_ID)
.is_some()
}
/// Set ALPN preferences. Strings that appear earlier in the list are given
/// higher preference.
///
/// This function performs no connection state check of its own; the
/// protocols are passed directly to the TLS agent.
/// # Errors
/// When the operation fails, which is usually due to bad inputs or bad connection state.
pub fn set_alpn(&mut self, protocols: &[impl AsRef<str>]) -> Res<()> {
self.crypto.tls.set_alpn(protocols)?;
Ok(())
}
/// Enable a set of ciphers.
/// # Errors
/// When the operation fails, which is usually due to bad inputs or bad connection state.
pub fn set_ciphers(&mut self, ciphers: &[Cipher]) -> Res<()> {
if self.state != State::Init {
qerror!([self], "Cannot enable ciphers in state {:?}", self.state);
return Err(Error::ConnectionState);
}
self.crypto.tls.set_ciphers(ciphers)?;
Ok(())
}
/// Enable a set of key exchange groups.
/// # Errors
/// When the operation fails, which is usually due to bad inputs or bad connection state.
pub fn set_groups(&mut self, groups: &[Group]) -> Res<()> {
if self.state != State::Init {
qerror!([self], "Cannot enable groups in state {:?}", self.state);
return Err(Error::ConnectionState);
}
self.crypto.tls.set_groups(groups)?;
Ok(())
}
/// Set the number of additional key shares to send in the client hello.
/// This is only possible in `State::Init`.
/// # Errors
/// `Error::ConnectionState` when the connection has left `State::Init`;
/// otherwise, when the TLS agent rejects the value.
pub fn send_additional_key_shares(&mut self, count: usize) -> Res<()> {
    if self.state != State::Init {
        // Name the operation that was attempted so that the log is not misleading.
        qerror!(
            [self],
            "Cannot set additional key shares in state {:?}",
            self.state
        );
        return Err(Error::ConnectionState);
    }
    self.crypto.tls.send_additional_key_shares(count)?;
    Ok(())
}
/// Build a resumption token from the crypto state, any token taken from
/// `new_token`, the remote transport parameters, the QUIC version, and an
/// RTT in milliseconds.
///
/// # Panics
/// When the remote transport parameters are absent or when the crypto state
/// fails to produce a token.
fn make_resumption_token(&mut self) -> ResumptionToken {
    debug_assert_eq!(self.role, Role::Client);
    debug_assert!(self.crypto.has_resumption_token());
    // Values less than GRANULARITY are ignored when using the token, so use 0 where needed.
    let rtt = self.paths.primary().map_or_else(
        // Without a path, there is no RTT to report.
        || Duration::from_millis(0),
        |primary| {
            let path = primary.borrow();
            let estimate = path.rtt().estimate();
            // Without a real RTT sample, never encode a guess that is as large
            // as, or larger than, the default initial RTT. (The guess can be
            // very large under lossy conditions.)
            if path.rtt().is_guesstimate() && estimate >= INITIAL_RTT {
                Duration::from_millis(0)
            } else {
                estimate
            }
        },
    );
    let rtt_ms = u64::try_from(rtt.as_millis()).unwrap_or(0);
    let new_token = self.new_token.take_token();
    let tps = self.tps.borrow();
    let remote_tps = tps
        .remote
        .as_ref()
        .expect("should have transport parameters");
    self.crypto
        .create_resumption_token(new_token, remote_tps, self.version, rtt_ms)
        .unwrap()
}
/// Whether the connection is in `State::Confirmed`.
fn confirmed(&self) -> bool {
self.state == State::Confirmed
}
/// A simple PTO value for those cases where only a value of roughly this
/// order is needed. This comes from the RTT estimate of the primary path or,
/// without a primary path, from a default RTT estimate. Do not use this for
/// loss recovery; use it only where a more precise value is not important.
fn pto(&self) -> Duration {
    let confirmed = self.confirmed();
    self.paths.primary().map_or_else(
        || RttEstimate::default().pto(confirmed),
        |path| path.borrow().rtt().pto(confirmed),
    )
}
/// Emit `client_resumption_token` events where possible and manage the timer
/// that releases a resumption token when no `NEW_TOKEN` token is available.
///
/// This does nothing on a server or before the connection reaches
/// `State::Connected`.
fn create_resumption_token(&mut self, now: Instant) {
if self.role == Role::Server || self.state < State::Connected {
return;
}
qtrace!(
[self],
"Maybe create resumption token: {} {}",
self.crypto.has_resumption_token(),
self.new_token.has_token()
);
// Pair up resumption tokens with `NEW_TOKEN` tokens for as long as both are available.
while self.crypto.has_resumption_token() && self.new_token.has_token() {
let token = self.make_resumption_token();
self.events.client_resumption_token(token);
}
// If we have a resumption ticket check or set a timer.
if self.crypto.has_resumption_token() {
// `arm` is whether the timer needs to be (re)started.
let arm = if let Some(expiration_time) = self.release_resumption_token_timer {
if expiration_time <= now {
// The timer expired: release one token without a `NEW_TOKEN` token.
let token = self.make_resumption_token();
self.events.client_resumption_token(token);
self.release_resumption_token_timer = None;
// This means that we release one session ticket every 3 PTOs
// if no NEW_TOKEN frame is received.
self.crypto.has_resumption_token()
} else {
// The timer is still running; leave it alone.
false
}
} else {
// No timer is running yet.
true
};
if arm {
self.release_resumption_token_timer = Some(now + 3 * self.pto());
}
}
}
/// Retrieve a resumption token immediately, without waiting for a
/// `NEW_TOKEN` frame to arrive.
///
/// The correct way to obtain a resumption token is to wait for the
/// `ConnectionEvent::ResumptionToken` event. That event is emitted once both
/// a resumption token and a `NEW_TOKEN` frame have arrived. Some servers
/// don't send `NEW_TOKEN` frames, in which case the event is only emitted
/// after 3xPTO. This is especially a problem for short-lived connections,
/// which can be closed before any events are released.
///
/// Returns `None` if no resumption token is available. If more tokens remain
/// after taking one, the release timer is restarted.
///
/// # Panics
///
/// If this is called on a server.
pub fn take_resumption_token(&mut self, now: Instant) -> Option<ResumptionToken> {
    assert_eq!(self.role, Role::Client);
    if !self.crypto.has_resumption_token() {
        return None;
    }
    let token = self.make_resumption_token();
    if self.crypto.has_resumption_token() {
        self.release_resumption_token_timer = Some(now + 3 * self.pto());
    }
    Some(token)
}
/// Enable resumption, using a token previously provided.
/// This can only be called once and only on the client.
/// After calling the function, it should be possible to attempt 0-RTT
/// if the token supports that.
///
/// The token is decoded as: a 4-byte QUIC version, a varint RTT in
/// milliseconds, a length-prefixed block of transport parameters, a
/// length-prefixed Initial (address validation) token, and then the TLS
/// token, which is everything that remains.
///
/// Note that a failure of the TLS agent to accept the TLS token, or a
/// failure to start the handshake, is absorbed into the connection state;
/// this function returns `Ok(())` in those cases.
/// # Errors
/// When the operation fails, which is usually due to bad inputs or bad connection state.
pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> {
if self.state != State::Init {
qerror!([self], "set token in state {:?}", self.state);
return Err(Error::ConnectionState);
}
if self.role == Role::Server {
return Err(Error::ConnectionState);
}
qinfo!(
[self],
"resumption token {}",
hex_snip_middle(token.as_ref())
);
let mut dec = Decoder::from(token.as_ref());
// The version that the token was issued for must be one that is enabled.
let version = Version::try_from(u32::try_from(
dec.decode_uint(4).ok_or(Error::InvalidResumptionToken)?,
)?)?;
qtrace!([self], " version {:?}", version);
if !self.conn_params.get_versions().all().contains(&version) {
return Err(Error::DisabledVersion);
}
let rtt = Duration::from_millis(dec.decode_varint().ok_or(Error::InvalidResumptionToken)?);
qtrace!([self], " RTT {:?}", rtt);
let tp_slice = dec.decode_vvec().ok_or(Error::InvalidResumptionToken)?;
qtrace!([self], " transport parameters {}", hex(tp_slice));
let mut dec_tp = Decoder::from(tp_slice);
let tp =
TransportParameters::decode(&mut dec_tp).map_err(|_| Error::InvalidResumptionToken)?;
let init_token = dec.decode_vvec().ok_or(Error::InvalidResumptionToken)?;
qtrace!([self], " Initial token {}", hex(init_token));
let tok = dec.decode_remainder();
qtrace!([self], " TLS token {}", hex(tok));
match self.crypto.tls {
Agent::Client(ref mut c) => {
let res = c.enable_resumption(tok);
if let Err(e) = res {
// The error is recorded in the connection state, not returned.
self.absorb_error::<Error>(now, Err(Error::from(e)));
return Ok(());
}
}
Agent::Server(_) => return Err(Error::WrongRole),
}
// Everything in the token was acceptable, so apply the remembered values.
self.version = version;
self.conn_params.get_versions_mut().set_initial(version);
self.tps.borrow_mut().set_version(version);
self.tps.borrow_mut().remote_0rtt = Some(tp);
if !init_token.is_empty() {
self.address_validation = AddressValidationInfo::NewToken(init_token.to_vec());
}
// Seed the RTT estimate of the primary path with the remembered RTT.
self.paths
.primary()
.ok_or(Error::InternalError)?
.borrow_mut()
.rtt_mut()
.set_initial(rtt);
self.set_initial_limits();
// Start up TLS, which has the effect of setting up all the necessary
// state for 0-RTT. This only stages the CRYPTO frames.
let res = self.client_start(now);
self.absorb_error(now, res);
Ok(())
}
/// Enable the generation of `NEW_TOKEN` tokens on a server. Only a weak
/// reference to `validation` is kept.
///
/// # Panics
/// If this is called on a client.
pub(crate) fn set_validation(&mut self, validation: &Rc<RefCell<AddressValidation>>) {
qtrace!([self], "Enabling NEW_TOKEN");
assert_eq!(self.role, Role::Server);
self.address_validation = AddressValidationInfo::Server(Rc::downgrade(validation));
}
/// Send a TLS session ticket AND a `NEW_TOKEN` frame (if possible).
/// # Errors
/// When the operation fails, which is usually due to bad inputs or bad connection state.
pub fn send_ticket(&mut self, now: Instant, extra: &[u8]) -> Res<()> {
if self.role == Role::Client {
return Err(Error::WrongRole);
}
let tps = &self.tps;
if let Agent::Server(ref mut s) = self.crypto.tls {
let mut enc = Encoder::default();
enc.encode_vvec_with(|enc_inner| {
tps.borrow().local.encode(enc_inner);
});
enc.encode(extra);
let records = s.send_ticket(now, enc.as_ref())?;
qdebug!([self], "send session ticket {}", hex(&enc));
self.crypto.buffer_records(records)?;
} else {
unreachable!();
}
// If we are able, also send a NEW_TOKEN frame.
// This should be recording all remote addresses that are valid,
// but there are just 0 or 1 in the current implementation.
if let Some(path) = self.paths.primary() {
if let Some(token) = self
.address_validation
.generate_new_token(path.borrow().remote_address(), now)
{
self.new_token.send_new_token(token);
}
Ok(())
} else {
Err(Error::NotConnected)
}
}
/// Get information about the TLS connection from the TLS agent, if the agent
/// has any to offer.
#[must_use]
pub fn tls_info(&self) -> Option<&SecretAgentInfo> {
self.crypto.tls.info()
}
/// Get preliminary information about the TLS connection from the TLS agent.
/// # Errors
/// When there is no information to obtain.
pub fn tls_preinfo(&self) -> Res<SecretAgentPreInfo> {
Ok(self.crypto.tls.preinfo()?)
}
/// Get the peer's certificate chain and other info, as provided by the TLS
/// agent. This is `None` when the agent has nothing to provide.
#[must_use]
pub fn peer_certificate(&self) -> Option<CertificateInfo> {
self.crypto.tls.peer_certificate()
}
/// Called by the application when the peer cert has been verified.
///
/// This passes `status` to the TLS agent, continues the handshake, and then
/// processes any datagrams that were saved. An error from the handshake is
/// absorbed into the connection state.
///
/// This panics if there is no active peer. It's OK to call this
/// when authentication isn't needed, that will likely only cause
/// the connection to fail. However, if no packets have been
/// exchanged, it's not OK.
pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) {
qdebug!([self], "Authenticated {:?}", status);
self.crypto.tls.authenticated(status);
let res = self.handshake(now, self.version, PacketNumberSpace::Handshake, None);
self.absorb_error(now, res);
self.process_saved(now);
}
/// Get the role of the connection: client or server.
#[must_use]
pub const fn role(&self) -> Role {
self.role
}
/// Get the state of the connection. A new connection starts in `State::Init`.
#[must_use]
pub const fn state(&self) -> &State {
&self.state
}
/// The QUIC version in use. This starts as the initial version from the
/// connection parameters; `enable_resumption()` replaces it with the version
/// from the resumption token.
#[must_use]
pub const fn version(&self) -> Version {
self.version
}
/// Get the 0-RTT state of the connection. A new connection starts in
/// `ZeroRttState::Init`.
#[must_use]
pub const fn zero_rtt_state(&self) -> ZeroRttState {
self.zero_rtt_state
}
/// Get a snapshot of collected statistics. If there is a primary path, the
/// RTT and RTT variance in the snapshot are those of that path.
#[must_use]
pub fn stats(&self) -> Stats {
    let mut snapshot = self.stats.borrow().clone();
    if let Some(primary) = self.paths.primary() {
        let path = primary.borrow();
        let rtt = path.rtt();
        snapshot.rtt = rtt.estimate();
        snapshot.rttvar = rtt.rttvar();
    }
    snapshot
}
/// This function wraps a call to another function and sets the connection state
/// properly if that call fails.
///
/// `res` is always returned unchanged. When it is an error, the connection is
/// closed in a way that depends on the current state; `path`, if provided, is
/// preferred over the primary path for signaling the close. In debug builds,
/// the message that is passed along when closing is the debug form of the
/// error; in other builds it is empty.
fn capture_error<T>(
&mut self,
path: Option<PathRef>,
now: Instant,
frame_type: FrameType,
res: Res<T>,
) -> Res<T> {
if let Err(v) = &res {
#[cfg(debug_assertions)]
let msg = format!("{v:?}");
#[cfg(not(debug_assertions))]
let msg = "";
let error = CloseReason::Transport(v.clone());
match &self.state {
// Already closing or closed: keep the first error and only log this one.
State::Closing { error: err, .. }
| State::Draining { error: err, .. }
| State::Closed(err) => {
qwarn!([self], "Closing again after error {:?}", err);
}
State::Init => {
// We have not even sent anything just close the connection without sending any
// error. This may happen when client_start fails.
self.set_state(State::Closed(error), now);
}
State::WaitInitial => {
// We don't have any state yet, so don't bother with
// the closing state, just send one CONNECTION_CLOSE.
if let Some(path) = path.or_else(|| self.paths.primary()) {
self.state_signaling
.close(path, error.clone(), frame_type, msg);
}
self.set_state(State::Closed(error), now);
}
_ => {
if let Some(path) = path.or_else(|| self.paths.primary()) {
self.state_signaling
.close(path, error.clone(), frame_type, msg);
// `Error::KeysExhausted` skips the closing period and goes straight to closed.
if matches!(v, Error::KeysExhausted) {
self.set_state(State::Closed(error), now);
} else {
self.set_state(
State::Closing {
error,
timeout: self.get_closing_period_time(now),
},
now,
);
}
} else {
// There is no path to signal on, so just close.
self.set_state(State::Closed(error), now);
}
}
}
}
res
}
/// For use with `process_input()`. Errors there can be ignored, but this
/// needs to ensure that the state is updated.
///
/// This is `capture_error()` with no specific path and a frame type of 0,
/// with the result turned into an `Option`.
fn absorb_error<T>(&mut self, now: Instant, res: Res<T>) -> Option<T> {
self.capture_error(None, now, 0, res).ok()
}
/// Run everything that depends on the passage of time: the closing and
/// draining period, the idle timeout, stream cleanup, key updates, loss
/// recovery, the release of resumption tokens, and path timeouts.
fn process_timer(&mut self, now: Instant) {
match &self.state {
// Only the client runs timers while waiting for Initial packets.
State::WaitInitial => debug_assert_eq!(self.role, Role::Client),
// If Closing or Draining, check if it is time to move to Closed.
State::Closing { error, timeout } | State::Draining { error, timeout } => {
if *timeout <= now {
let st = State::Closed(error.clone());
self.set_state(st, now);
qinfo!("Closing timer expired");
return;
}
}
State::Closed(_) => {
qdebug!("Timer fired while closed");
return;
}
_ => (),
}
// The idle timeout is checked even while closing or draining.
let pto = self.pto();
if self.idle_timeout.expired(now, pto) {
qinfo!([self], "idle timeout expired");
self.set_state(
State::Closed(CloseReason::Transport(Error::IdleTimeout)),
now,
);
return;
}
if self.state.closing() {
qtrace!([self], "Closing, not processing other timers");
return;
}
self.streams.cleanup_closed_streams();
let res = self.crypto.states.check_key_update(now);
self.absorb_error(now, res);
// Let loss recovery declare packets lost on the primary path.
if let Some(path) = self.paths.primary() {
let lost = self.loss_recovery.timeout(&path, now);
self.handle_lost_packets(&lost);
qlog::packets_lost(&self.qlog, &lost, now);
}
// Only look at resumption tokens if the release timer is running.
if self.release_resumption_token_timer.is_some() {
self.create_resumption_token(now);
}
// `process_timeout` returning false means that no usable path remains.
if !self
.paths
.process_timeout(now, pto, &mut self.stats.borrow_mut())
{
qinfo!([self], "last available path failed");
self.absorb_error::<Error>(now, Err(Error::NoAvailablePath));
}
}
/// Whether the given [`ConnectionIdRef`] is a valid local [`ConnectionId`],
/// according to the connection ID manager of this connection.
#[must_use]
pub fn is_valid_local_cid(&self, cid: ConnectionIdRef) -> bool {
self.cid_manager.is_valid(cid)
}
/// Process a single input datagram on the connection. This is the same as
/// calling `process_multiple_input()` with just that datagram.
pub fn process_input(&mut self, d: Datagram<impl AsRef<[u8]>>, now: Instant) {
self.process_multiple_input(iter::once(d), now);
}
/// Process a batch of input datagrams on the connection.
///
/// After the whole batch has been handled, any saved datagrams are
/// processed and closed streams are cleaned up. An empty batch does nothing.
pub fn process_multiple_input(
    &mut self,
    dgrams: impl IntoIterator<Item = Datagram<impl AsRef<[u8]>>>,
    now: Instant,
) {
    let mut pending = dgrams.into_iter().peekable();
    if pending.peek().is_some() {
        for dgram in pending {
            self.input(dgram, now, now);
        }
        self.process_saved(now);
        self.streams.cleanup_closed_streams();
    }
}
/// Get the time that we next need to be called back, relative to `now`.
///
/// `paced` is whether sending was blocked on the pacer, in which case the
/// pacing timer is also considered. The requested timer resolution
/// (`hrtime`) is updated as a side effect.
///
/// # Panics
/// If no timer at all is available, which is when there is no ACK timer, no
/// primary path, and no key update timer.
fn next_delay(&mut self, now: Instant, paced: bool) -> Duration {
qtrace!([self], "Get callback delay {:?}", now);
// Only one timer matters when closing...
if let State::Closing { timeout, .. } | State::Draining { timeout, .. } = self.state {
self.hrtime.update(Self::LOOSE_TIMER_RESOLUTION);
return timeout.duration_since(now);
}
// Collect every timer that applies; the earliest one determines the delay.
let mut delays = SmallVec::<[_; 7]>::new();
if let Some(ack_time) = self.acks.ack_time(now) {
qtrace!([self], "Delayed ACK timer {:?}", ack_time);
delays.push(ack_time);
}
// The remaining timers, other than the key update timer, need the primary path.
if let Some(p) = self.paths.primary() {
let path = p.borrow();
let rtt = path.rtt();
let pto = rtt.pto(self.confirmed());
let idle_time = self.idle_timeout.expiry(now, pto);
qtrace!([self], "Idle timer {:?}", idle_time);
delays.push(idle_time);
if self.streams.need_keep_alive() {
if let Some(keep_alive_time) = self.idle_timeout.next_keep_alive(now, pto) {
qtrace!([self], "Keep alive timer {:?}", keep_alive_time);
delays.push(keep_alive_time);
}
}
if let Some(lr_time) = self.loss_recovery.next_timeout(&path) {
qtrace!([self], "Loss recovery timer {:?}", lr_time);
delays.push(lr_time);
}
if paced {
if let Some(pace_time) = path.sender().next_paced(rtt.estimate()) {
qtrace!([self], "Pacing timer {:?}", pace_time);
delays.push(pace_time);
}
}
if let Some(path_time) = self.paths.next_timeout(pto) {
qtrace!([self], "Path probe timer {:?}", path_time);
delays.push(path_time);
}
}
if let Some(key_update_time) = self.crypto.states.update_time() {
qtrace!([self], "Key update timer {:?}", key_update_time);
delays.push(key_update_time);
}
// `release_resumption_token_timer` is not considered here, because
// it is not important enough to force the application to set a
// timeout for it. It is expected that other activities will
// drive it.
let earliest = delays.into_iter().min().unwrap();
// TODO(agrover, mt) - need to analyze and fix #47
// rather than just clamping to zero here.
debug_assert!(earliest > now);
let delay = earliest.saturating_duration_since(now);
qdebug!([self], "delay duration {:?}", delay);
// Ask for a timer resolution of a quarter of the delay.
self.hrtime.update(delay / 4);
delay
}
/// Get output packets, as a result of receiving packets, or actions taken
/// by the application.
/// Returns datagrams to send, and how long to wait before calling again
/// even if no incoming packets.
///
/// A client in `State::Init` starts the handshake here. A server in
/// `State::Init` or `State::WaitInitial` has nothing to do and returns
/// `Output::None`. In every other case, timers are processed before any
/// output is generated.
#[must_use = "Output of the process_output function must be handled"]
pub fn process_output(&mut self, now: Instant) -> Output {
qtrace!([self], "process_output {:?} {:?}", self.state, now);
match (&self.state, self.role) {
(State::Init, Role::Client) => {
let res = self.client_start(now);
self.absorb_error(now, res);
}
(State::Init | State::WaitInitial, Role::Server) => {
return Output::None;
}
_ => {
self.process_timer(now);
}
}
match self.output(now) {
SendOption::Yes(dgram) => Output::Datagram(dgram),
// Nothing to send: work out whether and when a callback is needed.
SendOption::No(paced) => match self.state {
State::Init | State::Closed(_) => Output::None,
State::Closing { timeout, .. } | State::Draining { timeout, .. } => {
Output::Callback(timeout.duration_since(now))
}
_ => Output::Callback(self.next_delay(now, paced)),
},
}
}
/// Test-only variant of `process_output` that lets the supplied writer
/// pack something extra into the output.
///
/// The writer is installed only for the duration of a single
/// `process_output` call and is removed again before returning.
#[cfg(test)]
pub fn test_write_frames<W>(&mut self, writer: W, now: Instant) -> Output
where
    W: test_internal::FrameWriter + 'static,
{
    // Install the writer so that output generation can pick it up.
    self.test_frame_writer = Some(Box::new(writer));
    let output = self.process_output(now);
    // Always uninstall it again so later calls are unaffected.
    self.test_frame_writer = None;
    output
}
/// Feed an optional incoming datagram to the connection, then generate output.
///
/// When a datagram is supplied it is handled first, followed by any saved
/// datagrams that are available for processing.  The returned value is
/// whatever `process_output` produces afterwards.
#[must_use = "Output of the process function must be handled"]
pub fn process(&mut self, dgram: Option<Datagram<impl AsRef<[u8]>>>, now: Instant) -> Output {
    if let Some(incoming) = dgram {
        // A datagram handed in here is treated as received right now, so the
        // receive time and the current time are the same instant.
        self.input(incoming, now, now);
        self.process_saved(now);
    }

    // The named binding is only needed when the fuzzing corpus is built.
    #[allow(clippy::let_and_return)]
    let result = self.process_output(now);

    #[cfg(all(feature = "build-fuzzing-corpus", test))]
    if self.test_frame_writer.is_none() {
        if let Some(sent) = result.clone().dgram() {
            neqo_common::write_item_to_fuzzing_corpus("packet", &sent);
        }
    }

    result
}
fn handle_retry(&mut self, packet: &PublicPacket, now: Instant) -> Res<()> {
qinfo!([self], "received Retry");
if matches!(self.address_validation, AddressValidationInfo::Retry { .. }) {
self.stats.borrow_mut().pkt_dropped("Extra Retry");
return Ok(());
}
if packet.token().is_empty() {
self.stats.borrow_mut().pkt_dropped("Retry without a token");
return Ok(());
}
if !packet.is_valid_retry(
self.original_destination_cid
.as_ref()
.ok_or(Error::InvalidRetry)?,
) {
self.stats
.borrow_mut()
.pkt_dropped("Retry with bad integrity tag");
return Ok(());
}
// At this point, we should only have the connection ID that we generated.
// Update to the one that the server prefers.
let Some(path) = self.paths.primary() else {
self.stats
.borrow_mut()
.pkt_dropped("Retry without an existing path");
return Ok(());
};
path.borrow_mut().set_remote_cid(packet.scid());
let retry_scid = ConnectionId::from(packet.scid());
qinfo!(
[self],
"Valid Retry received, token={} scid={}",
hex(packet.token()),
retry_scid
);
let lost_packets = self.loss_recovery.retry(&path, now);
self.handle_lost_packets(&lost_packets);
self.crypto.states.init(
self.conn_params.get_versions().compatible(),
self.role,
&retry_scid,
)?;
self.address_validation = AddressValidationInfo::Retry {
token: packet.token().to_vec(),
retry_source_cid: retry_scid,
};
Ok(())
}
/// Discard the keys for a packet number space and, if that actually
/// discarded something, drop the associated recovery and ACK state too.
fn discard_keys(&mut self, space: PacketNumberSpace, now: Instant) {
    // Nothing else to clean up unless the crypto layer reports a discard.
    if !self.crypto.discard(space) {
        return;
    }
    qdebug!([self], "Drop packet number space {}", space);
    if let Some(primary) = self.paths.primary() {
        self.loss_recovery.discard(&primary, space, now);
    }
    self.acks.drop_space(space);
}
/// Check whether the datagram ends with a stateless reset token that the
/// given path recognises.
///
/// Only the final 16 bytes of the datagram are examined.
fn is_stateless_reset(&self, path: &PathRef, d: &Datagram<impl AsRef<[u8]>>) -> bool {
    // Don't try if the datagram is too small to hold a token, or if the
    // connection is not connected.
    if d.len() >= 16 && self.state.connected() {
        let tail = &d.as_ref()[d.len() - 16..];
        <&[u8; 16]>::try_from(tail).is_ok_and(|token| path.borrow().is_stateless_reset(token))
    } else {
        false
    }
}
/// React to a stateless reset, if the datagram contains one.
///
/// The check is only made when `first` is set.  On a match the connection
/// moves to the draining state.
///
/// # Errors
/// `Error::StatelessReset` when a stateless reset was detected.
fn check_stateless_reset(
    &mut self,
    path: &PathRef,
    d: &Datagram<impl AsRef<[u8]>>,
    first: bool,
    now: Instant,
) -> Res<()> {
    if !(first && self.is_stateless_reset(path, d)) {
        return Ok(());
    }

    // Failing to process a packet in a datagram might
    // indicate that there is a stateless reset present.
    qdebug!([self], "Stateless reset: {}", hex(&d[d.len() - 16..]));
    self.state_signaling.reset();

    let timeout = self.get_closing_period_time(now);
    self.set_state(
        State::Draining {
            error: CloseReason::Transport(Error::StatelessReset),
            timeout,
        },
        now,
    );
    Err(Error::StatelessReset)
}
/// Run every saved datagram that has become available through `input`.
///
/// Each saved datagram is replayed with the time at which it was originally
/// saved as its receive time and `now` as the current time.
fn process_saved(&mut self, now: Instant) {
    while let Some(cspace) = self.saved_datagrams.available() {
        qdebug!([self], "process saved for space {:?}", cspace);
        // Saved datagrams only become available once receive keys exist.
        debug_assert!(self.crypto.states.rx_hp(self.version, cspace).is_some());

        let pending = self.saved_datagrams.take_saved();
        for entry in pending {
            qtrace!([self], "input saved @{:?}: {:?}", entry.t, entry.d);
            self.input(entry.d, entry.t, now);
        }
    }
}
/// Save the unprocessed tail of a datagram for which keys are not yet
/// available.
///
/// Only the last `remaining` bytes of `d` are kept; addressing and TOS
/// information is carried over from the original datagram.
#[allow(clippy::needless_pass_by_value)] // To consume an owned datagram below.
fn save_datagram(
    &mut self,
    cspace: CryptoSpace,
    d: Datagram<impl AsRef<[u8]>>,
    remaining: usize,
    now: Instant,
) {
    // Copy out just the part that has not been processed yet.
    let start = d.len() - remaining;
    let tail = d[start..].to_vec();
    let unprocessed = Datagram::new(d.source(), d.destination(), d.tos(), tail);

    self.saved_datagrams.save(cspace, unprocessed, now);
    self.stats.borrow_mut().saved_datagrams += 1;
}
/// Perform version negotiation.
fn version_negotiation(&mut self, supported: &[WireVersion], now: Instant) -> Res<()> {
debug_assert_eq!(self.role, Role::Client);
if let Some(version) = self.conn_params.get_versions().preferred(supported) {
assert_ne!(self.version, version);
qinfo!([self], "Version negotiation: trying {:?}", version);
let path = self.paths.primary().ok_or(Error::NoAvailablePath)?;
let local_addr = path.borrow().local_address();
let remote_addr = path.borrow().remote_address();
let conn_params = self
.conn_params
.clone()
.versions(version, self.conn_params.get_versions().all().to_vec());
let mut c = Self::new_client(
self.crypto.server_name().ok_or(Error::VersionNegotiation)?,
self.crypto.protocols(),
self.cid_manager.generator(),
local_addr,
remote_addr,
conn_params,
now,
)?;
c.conn_params
.get_versions_mut()
.set_initial(self.conn_params.get_versions().initial());
mem::swap(self, &mut c);
qlog::client_version_information_negotiated(
&self.qlog,
self.conn_params.get_versions().all(),
supported,
version,
now,
);
Ok(())
} else {
qinfo!([self], "Version negotiation: failed with {:?}", supported);
// This error goes straight to closed.
self.set_state(
State::Closed(CloseReason::Transport(Error::VersionNegotiation)),
now,
);
Err(Error::VersionNegotiation)
}
}
/// Perform any processing that we might have to do on packets prior to
/// attempting to remove protection.
///
/// `dcid` is the destination connection ID of an earlier packet in the same
/// datagram, if any; a coalesced packet with a different one is dropped.
///
/// The returned `PreprocessResult` tells the caller what to do next:
/// `Continue` means the packet should be processed further, while `Next`
/// and `End` mean this packet is not processed any further (dropped packets
/// are counted in the statistics).
///
/// # Errors
/// `Error::ProtocolViolation` if a server receives an Initial without a
/// version, `Error::InternalError` if the original destination connection
/// ID is missing while checking a Version Negotiation packet, and any error
/// propagated from crypto initialisation, version negotiation, or Retry
/// handling.
#[allow(clippy::too_many_lines)] // Yeah, it's a work in progress.
fn preprocess_packet(
&mut self,
packet: &PublicPacket,
path: &PathRef,
dcid: Option<&ConnectionId>,
now: Instant,
) -> Res<PreprocessResult> {
// All packets coalesced into one datagram must share a destination CID.
if dcid.is_some_and(|d| d != &packet.dcid()) {
self.stats
.borrow_mut()
.pkt_dropped("Coalesced packet has different DCID");
return Ok(PreprocessResult::Next);
}
if (packet.packet_type() == PacketType::Initial
|| packet.packet_type() == PacketType::Handshake)
&& self.role == Role::Client
&& !path.borrow().is_primary()
{
// If we have received a packet from a different address than we have sent to
// we should ignore the packet. In such a case a path will be a newly created
// temporary path, not the primary path.
return Ok(PreprocessResult::Next);
}
// First pass: handling that depends on packet type, state, and role together.
match (packet.packet_type(), &self.state, &self.role) {
(PacketType::Initial, State::Init, Role::Server) => {
let version = packet.version().ok_or(Error::ProtocolViolation)?;
if !packet.is_valid_initial()
|| !self.conn_params.get_versions().all().contains(&version)
{
self.stats.borrow_mut().pkt_dropped("Invalid Initial");
return Ok(PreprocessResult::Next);
}
qinfo!(
[self],
"Received valid Initial packet with scid {:?} dcid {:?}",
packet.scid(),
packet.dcid()
);
// Record the client's selected CID so that it can be accepted until
// the client starts using a real connection ID.
let dcid = ConnectionId::from(packet.dcid());
self.crypto.states.init_server(version, &dcid)?;
self.original_destination_cid = Some(dcid);
self.set_state(State::WaitInitial, now);
// We need to make sure that we set this transport parameter.
// This has to happen prior to processing the packet so that
// the TLS handshake has all it needs.
if !self.retry_sent() {
self.tps.borrow_mut().local.set_bytes(
tparams::ORIGINAL_DESTINATION_CONNECTION_ID,
packet.dcid().to_vec(),
);
}
}
(PacketType::VersionNegotiation, State::WaitInitial, Role::Client) => {
if let Ok(versions) = packet.supported_versions() {
if versions.is_empty()
|| versions.contains(&self.version().wire_version())
|| versions.contains(&0)
|| &packet.scid() != self.odcid().ok_or(Error::InternalError)?
|| matches!(self.address_validation, AddressValidationInfo::Retry { .. })
{
// Ignore VersionNegotiation packets that are empty, that contain the
// current version or version 0.
// Or don't have the right connection ID.
// Or are received after a Retry.
self.stats.borrow_mut().pkt_dropped("Invalid VN");
} else {
self.version_negotiation(&versions, now)?;
}
} else {
self.stats.borrow_mut().pkt_dropped("VN with no versions");
};
// Whether or not the Version Negotiation packet was acted upon, it is
// never processed any further.
return Ok(PreprocessResult::End);
}
(PacketType::Retry, State::WaitInitial, Role::Client) => {
self.handle_retry(packet, now)?;
return Ok(PreprocessResult::Next);
}
(PacketType::Handshake | PacketType::Short, State::WaitInitial, Role::Client) => {
// This packet can't be processed now, but it could be a sign
// that Initial packets were lost.
// Resend Initial CRYPTO frames immediately a few times just
// in case. As we don't have an RTT estimate yet, this helps
// when there is a short RTT and losses. Also mark all 0-RTT
// data as lost.
// This is limited to the first packet of a datagram (`dcid` is unset),
// to packets addressed to a valid connection ID, and to the period in
// which the saved datagram count has not exceeded `EXTRA_INITIALS`.
if dcid.is_none()
&& self.cid_manager.is_valid(packet.dcid())
&& self.stats.borrow().saved_datagrams <= EXTRA_INITIALS
{
self.crypto.resend_unacked(PacketNumberSpace::Initial);
self.resend_0rtt(now);
}
}
// Any Version Negotiation or Retry packet not handled by an earlier arm, and
// any packet for another version, is dropped with its type as the reason.
(PacketType::VersionNegotiation | PacketType::Retry | PacketType::OtherVersion, ..) => {
self.stats
.borrow_mut()
.pkt_dropped(format!("{:?}", packet.packet_type()));
return Ok(PreprocessResult::Next);
}
_ => {}
}
// Second pass: decide based on the state alone.  Note that the first pass
// may have changed the state (a server moves from Init to WaitInitial).
let res = match self.state {
State::Init => {
self.stats
.borrow_mut()
.pkt_dropped("Received while in Init state");
PreprocessResult::Next
}
State::WaitInitial => PreprocessResult::Continue,
State::WaitVersion | State::Handshaking | State::Connected | State::Confirmed => {
// From here on, only packets sent to a valid connection ID are accepted.
if self.cid_manager.is_valid(packet.dcid()) {
if self.role == Role::Server && packet.packet_type() == PacketType::Handshake {
// Server has received a Handshake packet -> discard Initial keys and states
self.discard_keys(PacketNumberSpace::Initial, now);
}
PreprocessResult::Continue
} else {
self.stats
.borrow_mut()
.pkt_dropped(format!("Invalid DCID {:?}", packet.dcid()));
PreprocessResult::Next
}
}
State::Closing { .. } => {
// Don't bother processing the packet. Instead ask to get a
// new close frame.
self.state_signaling.send_close();
PreprocessResult::Next
}
State::Draining { .. } | State::Closed(..) => {
// Do nothing.
self.stats
.borrow_mut()
.pkt_dropped(format!("State {:?}", self.state));
PreprocessResult::Next
}
};
Ok(res)
}
/// Follow-up work after an Initial, Handshake, `ZeroRtt`, or Short packet
/// has been successfully processed.
///
/// This accounts for the ECN marking of the datagram, starts the handshake
/// if the connection was still waiting for the first Initial, and then
/// either handles migration (when connected) or, on the server, marks the
/// path as valid when the packet demonstrates that the peer can receive on it.
fn postprocess_packet(
    &mut self,
    path: &PathRef,
    d: &Datagram<impl AsRef<[u8]>>,
    packet: &PublicPacket,
    migrate: bool,
    now: Instant,
) {
    let pn_space = PacketNumberSpace::from(packet.packet_type());
    if let Some(tracker) = self.acks.get_mut(pn_space) {
        // Add this datagram's ECN mark to the per-space counts and mirror
        // the result into the statistics.
        let marks = tracker.ecn_marks();
        *marks += d.tos().into();
        self.stats.borrow_mut().ecn_rx = *marks;
    } else {
        qtrace!("Not tracking ECN for dropped packet number space");
    }

    if self.state == State::WaitInitial {
        self.start_handshake(path, packet, now);
    }

    if self.state.connected() {
        self.handle_migration(path, d, migrate, now);
        return;
    }

    // We only allow one path during setup, so apply handshake
    // path validation to this path.
    let validates_path = self.role != Role::Client
        && (packet.packet_type() == PacketType::Handshake
            || (packet.dcid().len() >= 8 && packet.dcid() == self.local_initial_source_cid));
    if validates_path {
        path.borrow_mut().set_valid(now);
    }
}
/// Take a datagram as input.
///
/// Two times are supplied: `received` is when the datagram was received and
/// `now` is the current time.  Any error from processing the datagram is
/// captured into the connection state rather than returned.
fn input(&mut self, d: Datagram<impl AsRef<[u8]>>, received: Instant, now: Instant) {
    // First determine the path that this datagram belongs to.
    let local = d.destination();
    let remote = d.source();
    let cc_algorithm = self.conn_params.get_cc_algorithm();
    let pacing = self.conn_params.pacing_enabled();
    let path = self
        .paths
        .find_path(local, remote, cc_algorithm, pacing, now);

    // Count the bytes before the datagram is consumed by processing.
    path.borrow_mut().add_received(d.len());

    let outcome = self.input_path(&path, d, received);
    self.capture_error(Some(path), now, 0, outcome).ok();
}
/// Process every packet in a datagram that was received on `path`.
///
/// Packets are decoded one at a time.  Each is preprocessed, decrypted,
/// checked for duplication, and then has its frames processed.  A datagram
/// whose remaining packets cannot be decrypted yet because keys are pending
/// is saved for later.  Undecryptable packets are dropped without an error,
/// but the datagram is checked for a stateless reset.
///
/// Note that `now` is the time at which the datagram was received, which the
/// caller may pass separately from the current time.
///
/// # Errors
/// Errors from preprocessing or frame processing, `Error::KeysExhausted`,
/// `Error::ProtocolViolation` for a decrypted packet in a packet number
/// space that is no longer tracked, and `Error::StatelessReset`.
fn input_path(
    &mut self,
    path: &PathRef,
    d: Datagram<impl AsRef<[u8]>>,
    now: Instant,
) -> Res<()> {
    let mut slc = d.as_ref();
    // The DCID of the previous packet in this datagram; `None` until the
    // first packet has been handled.
    let mut dcid = None;

    qtrace!([self], "{} input {}", path.borrow(), hex(&d));
    let pto = path.borrow().rtt().pto(self.confirmed());

    // Handle each packet in the datagram.
    while !slc.is_empty() {
        self.stats.borrow_mut().packets_rx += 1;
        let (packet, remainder) =
            match PublicPacket::decode(slc, self.cid_manager.decoder().as_ref()) {
                Ok((packet, remainder)) => (packet, remainder),
                Err(e) => {
                    qinfo!([self], "Garbage packet: {}", e);
                    qtrace!([self], "Garbage packet contents: {}", hex(slc));
                    self.stats.borrow_mut().pkt_dropped("Garbage packet");
                    break;
                }
            };
        match self.preprocess_packet(&packet, path, dcid.as_ref(), now)? {
            PreprocessResult::Continue => (),
            // Stop looking at this datagram, but still run the final
            // stateless reset check below.
            PreprocessResult::Next => break,
            // Stop immediately, skipping the final stateless reset check.
            PreprocessResult::End => return Ok(()),
        }

        qtrace!([self], "Received unverified packet {:?}", packet);

        match packet.decrypt(&mut self.crypto.states, now + pto) {
            Ok(payload) => {
                // OK, we have a valid packet.
                self.idle_timeout.on_packet_received(now);
                dump_packet(
                    self,
                    path,
                    "-> RX",
                    payload.packet_type(),
                    payload.pn(),
                    &payload[..],
                    d.tos(),
                    d.len(),
                );

                #[cfg(feature = "build-fuzzing-corpus")]
                if packet.packet_type() == PacketType::Initial {
                    let target = if self.role == Role::Client {
                        "server_initial"
                    } else {
                        "client_initial"
                    };
                    neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]);
                }

                qlog::packet_received(&self.qlog, &packet, &payload, now);
                let space = PacketNumberSpace::from(payload.packet_type());
                if let Some(space) = self.acks.get_mut(space) {
                    if space.is_duplicate(payload.pn()) {
                        qdebug!("Duplicate packet {}-{}", space, payload.pn());
                        self.stats.borrow_mut().dups_rx += 1;
                    } else {
                        match self.process_packet(path, &payload, now) {
                            Ok(migrate) => {
                                self.postprocess_packet(path, &d, &packet, migrate, now);
                            }
                            Err(e) => {
                                self.ensure_error_path(path, &packet, now);
                                return Err(e);
                            }
                        }
                    }
                } else {
                    // The packet number goes with "packet" and the space with
                    // "untracked space"; the arguments follow that order.
                    qdebug!(
                        [self],
                        "Received packet {} for untracked space {}",
                        payload.pn(),
                        space
                    );
                    return Err(Error::ProtocolViolation);
                }
            }
            Err(e) => {
                match e {
                    Error::KeysPending(cspace) => {
                        // This packet can't be decrypted because we don't have the keys yet.
                        // Don't check this packet for a stateless reset, just return.
                        let remaining = slc.len();
                        self.save_datagram(cspace, d, remaining, now);
                        return Ok(());
                    }
                    Error::KeysExhausted => {
                        // Exhausting read keys is fatal.
                        return Err(e);
                    }
                    Error::KeysDiscarded(cspace) => {
                        // This was a valid-appearing Initial packet: maybe probe with
                        // a Handshake packet to keep the handshake moving.
                        self.received_untracked |=
                            self.role == Role::Client && cspace == CryptoSpace::Initial;
                    }
                    _ => (),
                }
                // Decryption failure, or not having keys is not fatal.
                // If the state isn't available, or we can't decrypt the packet, drop
                // the rest of the datagram on the floor, but don't generate an error.
                self.check_stateless_reset(path, &d, dcid.is_none(), now)?;
                self.stats.borrow_mut().pkt_dropped("Decryption failure");
                qlog::packet_dropped(&self.qlog, &packet, now);
            }
        }
        slc = remainder;
        dcid = Some(ConnectionId::from(packet.dcid()));
    }
    // If no packet was handled at all, the datagram might be a stateless reset.
    self.check_stateless_reset(path, &d, dcid.is_none(), now)?;
    Ok(())
}
/// Process a packet. Returns true if the packet might initiate migration.
fn process_packet(
&mut self,
path: &PathRef,
packet: &DecryptedPacket,
now: Instant,
) -> Res<bool> {
(!packet.is_empty())
.then_some(())
.ok_or(Error::ProtocolViolation)?;
// TODO(ekr@rtfm.com): Have the server blow away the initial
// crypto state if this fails? Otherwise, we will get a panic
// on the assert for doesn't exist.
// OK, we have a valid packet.
// Get the next packet number we'll send, for ACK verification.
// TODO: Once PR #2118 lands, this can move to `input_frame`. For now, it needs to be here,
// because we can drop packet number spaces as we parse through the packet, and if an ACK
// frame follows a CRYPTO frame that makes us drop a space, we need to know this
// packet number to verify the ACK against.
let next_pn = self
.crypto
.states
.select_tx(self.version, PacketNumberSpace::from(packet.packet_type()))
.map_or(0, |(_, tx)| tx.next_pn());
let mut ack_eliciting = false;
let mut probing = true;
let mut d = Decoder::from(&packet[..]);
while d.remaining() > 0 {
#[cfg(feature = "build-fuzzing-corpus")]
let pos = d.offset();
let f = Frame::decode(&mut d)?;
#[cfg(feature = "build-fuzzing-corpus")]
neqo_common::write_item_to_fuzzing_corpus("frame", &packet[pos..d.offset()]);
ack_eliciting |= f.ack_eliciting();
probing &= f.path_probing();
let t = f.get_type();
if let Err(e) = self.input_frame(
path,
packet.version(),
packet.packet_type(),
f,
next_pn,
now,
) {
self.capture_error(Some(Rc::clone(path)), now, t, Err(e))?;
}
--> --------------------
--> maximum size reached
--> --------------------
[ Dauer der Verarbeitung: 0.56 Sekunden
(vorverarbeitet)
]
|
2026-04-04
|
|
|
|
|