Quelle connection_client.rs
Sprache: unbekannt
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::{
cell::RefCell,
fmt::{Debug, Display},
iter, mem,
net::SocketAddr,
rc::Rc,
time::Instant,
};
use neqo_common::{
event::Provider as EventProvider, hex, hex_with_len, qdebug, qinfo, qlog::NeqoQlog, qtra ce,
Datagram, Decoder, Encoder, Header, MessageType, Role,
};
use neqo_crypto::{agent::CertificateInfo, AuthenticationStatus, ResumptionToken, SecretAgentInfo};
use neqo_qpack::Stats as QpackStats;
use neqo_transport::{
streams::SendOrder, AppError, Connection, ConnectionEvent, ConnectionId, ConnectionIdGenerator,
DatagramTracking, Output, RecvStreamStats, SendStreamStats, Stats as TransportStats, StreamId,
StreamType, Version, ZeroRttState,
};
use crate::{
client_events::{Http3ClientEvent, Http3ClientEvents},
connection::{Http3Connection, Http3State, RequestDescription},
frames::HFrame,
push_controller::{PushController, RecvPushEvents},
recv_message::{RecvMessage, RecvMessageInfo},
request_target::AsRequestTarget,
settings::HSettings,
Error, Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler,
ReceiveOutput, Res,
};
// This is used for filtering send_streams and recv_Streams with a stream_ids greater than or equal
// a given id. Only the same type (bidirectional or unidirectionsl) streams are filtered.
fn id_gte<U>(base: StreamId) -> impl FnMut((&StreamId, &U)) -> Option<StreamId> + 'static
where
U: ?Sized,
{
move |(id, _)| {
if *id >= base && !(id.is_bidi() ^ base.is_bidi()) {
Some(*id)
} else {
None
}
}
}
const fn alpn_from_quic_version(version: Version) -> &'static str {
match version {
Version::Version2 | Version::Version1 => "h3",
Version::Draft29 => "h3-29",
Version::Draft30 => "h3-30",
Version::Draft31 => "h3-31",
Version::Draft32 => "h3-32",
}
}
/// # The HTTP/3 client API
///
/// This module implements the HTTP/3 client API. The main implementation of the protocol is in
/// [connection.rs](https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs) which
/// implements common behavior for the client-side and the server-side. `Http3Client` structure
/// implements the public API and set of functions that differ between the client and the server.
///
/// The API is used for:
/// - create and close an endpoint:
/// - [`Http3Client::new`]
/// - [`Http3Client::new_with_conn`]
/// - [`Http3Client::close`]
/// - configuring an endpoint:
/// - [`Http3Client::authenticated`]
/// - [`Http3Client::enable_ech`]
/// - [`Http3Client::enable_resumption`]
/// - [`Http3Client::initiate_key_update`]
/// - [`Http3Client::set_qlog`]
/// - retrieving information about a connection:
/// - [`Http3Client::peer_certificate`]
/// - [`Http3Client::qpack_decoder_stats`]
/// - [`Http3Client::qpack_encoder_stats`]
/// - [`Http3Client::transport_stats`]
/// - [`Http3Client::state`]
/// - [`Http3Client::take_resumption_token`]
/// - [`Http3Client::tls_info`]
/// - driving HTTP/3 session:
/// - [`Http3Client::process_output`]
/// - [`Http3Client::process_input`]
/// - [`Http3Client::process`]
/// - create requests, send/receive data, and cancel requests:
/// - [`Http3Client::fetch`]
/// - [`Http3Client::send_data`]
/// - [`Http3Client::read_data`]
/// - [`Http3Client::stream_close_send`]
/// - [`Http3Client::cancel_fetch`]
/// - [`Http3Client::stream_reset_send`]
/// - [`Http3Client::stream_stop_sending`]
/// - [`Http3Client::set_stream_max_data`]
/// - priority feature:
/// - [`Http3Client::priority_update`]
/// - `WebTransport` feature:
/// - [`Http3Client::webtransport_create_session`]
/// - [`Http3Client::webtransport_close_session`]
/// - [`Http3Client::webtransport_create_stream`]
/// - [`Http3Client::webtransport_enabled`]
///
/// ## Examples
///
/// ### Fetching a resource
///
/// ```ignore
/// let mut client = Http3Client::new(...);
///
/// // Perform a handshake
/// ...
///
/// let req = client
/// .fetch(
/// Instant::now(),
/// "GET",
/// &("https", "something.com", "/"),
/// &[Header::new("example1", "value1"), Header::new("example1", "value2")],
/// Priority::default(),
/// )
/// .unwrap();
///
/// client.stream_close_send(req).unwrap();
///
/// loop {
/// // exchange packets
/// ...
///
/// while let Some(event) = client.next_event() {
/// match event {
/// Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin } => {
/// println!("New response headers received for stream {:?} [fin={?}, interim={:?}]: {:?}",
/// stream_id,
/// fin,
/// interim,
/// headers,
/// );
/// }
/// Http3ClientEvent::DataReadable { stream_id } => {
/// println!("New data available on stream {}", stream_id);
/// let mut buf = [0; 100];
/// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
/// println!("Read {:?} bytes from stream {:?} [fin={?}]",
/// amount,
/// stream_id,
/// fin,
/// );
/// }
/// _ => {
/// println!("Unhandled event {:?}", event);
/// }
/// }
/// }
/// }
/// ```
///
/// ### Creating a `WebTransport` session
///
/// ```ignore
/// let mut client = Http3Client::new(...);
///
/// // Perform a handshake
/// ...
///
/// // Create a session
/// let wt_session_id = client
/// .webtransport_create_session(now(), &("https", "something.com", "/"), &[])
/// .unwrap();
///
/// loop {
/// // exchange packets
/// ...
///
/// while let Some(event) = client.next_event() {
/// match event {
/// Http3ClientEvent::WebTransport(WebTransportEvent::Session{
/// stream_id,
/// status,
/// ..
/// }) => {
/// println!("The response from the server: WebTransport session ID {:?} status={:?}",
/// stream_id,
/// status,
/// );
/// }
/// _ => {
/// println!("Unhandled event {:?}", event);
/// }
/// }
/// }
/// }
/// ```
///
/// ### `WebTransport`: create a stream, send and receive data on the stream
///
/// ```ignore
/// const BUF_CLIENT: &[u8] = &[0; 10];
/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above.
///
/// // create a stream
/// let wt_stream_id = client
/// .webtransport_create_stream(wt_session_id, StreamType::BiDi)
/// .unwrap();
///
/// // send data
/// let data_sent = client.send_data(wt_stream_id, BUF_CLIENT).unwrap();
/// assert_eq!(data_sent, BUF_CLIENT.len());
///
/// // close stream for sending
/// client.stream_close_send(wt_stream_id).unwrap();
///
/// // wait for data from the server
/// loop {
/// // exchange packets
/// ...
///
/// while let Some(event) = client.next_event() {
/// match event {
/// Http3ClientEvent::DataReadable{ stream_id } => {
/// println!("Data receivedd form the server on WebTransport stream ID {:?}",
/// stream_id,
/// );
/// let mut buf = [0; 100];
/// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
/// println!("Read {:?} bytes from stream {:?} [fin={?}]",
/// amount,
/// stream_id,
/// fin,
/// );
/// }
/// _ => {
/// println!("Unhandled event {:?}", event);
/// }
/// }
/// }
/// }
/// ```
///
/// ### `WebTransport`: receive a new stream form the server
///
/// ```ignore
/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above.
///
/// // wait for a new stream from the server
/// loop {
/// // exchange packets
/// ...
///
/// while let Some(event) = client.next_event() {
/// match event {
/// Http3ClientEvent::WebTransport(WebTransportEvent::NewStream {
/// stream_id,
/// session_id,
/// }) => {
/// println!("New stream received on session{:?}, stream id={:?} stream type={:?}",
/// sesson_id.stream_id(),
/// stream_id.stream_id(),
/// stream_id.stream_type()
/// );
/// }
/// Http3ClientEvent::DataReadable{ stream_id } => {
/// println!("Data receivedd form the server on WebTransport stream ID {:?}",
/// stream_id,
/// );
/// let mut buf = [0; 100];
/// let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
/// println!("Read {:?} bytes from stream {:?} [fin={:?}]",
/// amount,
/// stream_id,
/// fin,
/// );
/// }
/// _ => {
/// println!("Unhandled event {:?}", event);
/// }
/// }
/// }
/// }
/// ```
pub struct Http3Client {
conn: Connection,
base_handler: Http3Connection,
events: Http3ClientEvents,
push_handler: Rc<RefCell<PushController>>,
}
impl Display for Http3Client {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(f, "Http3 client")
}
}
impl Http3Client {
/// # Errors
///
/// Making a `neqo-transport::connection` may produce an error. This can only be a crypto error
/// if the crypto context can't be created or configured.
pub fn new(
server_name: impl Into<String>,
cid_manager: Rc<RefCell<dyn ConnectionIdGenerator>>,
local_addr: SocketAddr,
remote_addr: SocketAddr,
http3_parameters: Http3Parameters,
now: Instant,
) -> Res<Self> {
Ok(Self::new_with_conn(
Connection::new_client(
server_name,
&[alpn_from_quic_version(
http3_parameters
.get_connection_parameters()
.get_versions()
.initial(),
)],
cid_manager,
local_addr,
remote_addr,
http3_parameters.get_connection_parameters().clone(),
now,
)?,
http3_parameters,
))
}
/// This is a similar function to `new`. In this case, `neqo-transport::connection` has been
/// already created.
///
/// It is recommended to use `new` instead.
#[must_use]
pub fn new_with_conn(c: Connection, http3_parameters: Http3Parameters) -> Self {
let events = Http3ClientEvents::default();
let webtransport = http3_parameters.get_webtransport();
let push_streams = http3_parameters.get_max_concurrent_push_streams();
let mut base_handler = Http3Connection::new(http3_parameters, Role::Client);
if webtransport {
base_handler.set_features_listener(events.clone());
}
Self {
conn: c,
events: events.clone(),
push_handler: Rc::new(RefCell::new(PushController::new(push_streams, events))),
base_handler,
}
}
#[must_use]
pub const fn role(&self) -> Role {
self.conn.role()
}
/// The function returns the current state of the connection.
#[must_use]
pub fn state(&self) -> Http3State {
self.base_handler.state()
}
#[must_use]
pub fn tls_info(&self) -> Option<&SecretAgentInfo> {
self.conn.tls_info()
}
/// Get the peer's certificate.
#[must_use]
pub fn peer_certificate(&self) -> Option<CertificateInfo> {
self.conn.peer_certificate()
}
/// This called when peer certificates have been verified.
///
/// `Http3ClientEvent::AuthenticationNeeded` event is emitted when peer’s certificates are
/// available and need to be verified. When the verification is completed this function is
/// called. To inform HTTP/3 session of the verification results.
pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) {
self.conn.authenticated(status, now);
}
pub fn set_qlog(&mut self, qlog: NeqoQlog) {
self.conn.set_qlog(qlog);
}
/// Enable encrypted client hello (ECH).
///
/// # Errors
///
/// Fails when the configuration provided is bad.
pub fn enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> {
self.conn.client_enable_ech(ech_config_list)?;
Ok(())
}
/// Get the connection id, which is useful for disambiguating connections to
/// the same origin.
///
/// # Panics
///
/// Never, because clients always have this field.
#[must_use]
pub fn connection_id(&self) -> &ConnectionId {
self.conn.odcid().expect("Client always has odcid")
}
fn encode_resumption_token(&self, token: &ResumptionToken) -> Option<ResumptionToken> {
self.base_handler.get_settings().map(|settings| {
let mut enc = Encoder::default();
settings.encode_frame_contents(&mut enc);
enc.encode(token.as_ref());
ResumptionToken::new(enc.into(), token.expiration_time())
})
}
/// The correct way to obtain a resumption token is to wait for the
/// `Http3ClientEvent::ResumptionToken` event. To emit the event we are waiting for a
/// resumtion token and a `NEW_TOKEN` frame to arrive. Some servers don't send `NEW_TOKEN`
/// frames and in this case, we wait for 3xPTO before emitting an event. This is especially a
/// problem for short-lived connections, where the connection is closed before any events are
/// released. This function retrieves the token, without waiting for a `NEW_TOKEN` frame to
/// arrive.
///
/// In addition to the token, HTTP/3 settings are encoded into the token before giving it to
/// the application(`encode_resumption_token`). When the resumption token is supplied to a new
/// connection the HTTP/3 setting will be decoded and used until the setting are received from
/// the server.
pub fn take_resumption_token(&mut self, now: Instant) -> Option<ResumptionToken> {
self.conn
.take_resumption_token(now)
.and_then(|t| self.encode_resumption_token(&t))
}
/// This may be call if an application has a resumption token. This must be called before
/// connection starts.
///
/// The resumption token also contains encoded HTTP/3 settings. The settings will be decoded
/// and used until the setting are received from the server.
///
/// # Errors
///
/// An error is return if token cannot be decoded or a connection is is a wrong state.
///
/// # Panics
///
/// On closing if the base handler can't handle it (debug only).
pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> {
if self.base_handler.state != Http3State::Initializing {
return Err(Error::InvalidState);
}
let mut dec = Decoder::from(token.as_ref());
let Some(settings_slice) = dec.decode_vvec() else {
return Err(Error::InvalidResumptionToken);
};
qtrace!([self], " settings {}", hex_with_len(settings_slice));
let mut dec_settings = Decoder::from(settings_slice);
let mut settings = HSettings::default();
Error::map_error(
settings.decode_frame_contents(&mut dec_settings),
Error::InvalidResumptionToken,
)?;
let tok = dec.decode_remainder();
qtrace!([self], " Transport token {}", hex(tok));
self.conn.enable_resumption(now, tok)?;
if self.conn.state().closed() {
let state = self.conn.state().clone();
let res = self
.base_handler
.handle_state_change(&mut self.conn, &state);
debug_assert_eq!(Ok(true), res);
return Err(Error::FatalError);
}
if self.conn.zero_rtt_state() == ZeroRttState::Sending {
self.base_handler
.set_0rtt_settings(&mut self.conn, settings)?;
self.events
.connection_state_change(self.base_handler.state());
self.push_handler
.borrow_mut()
.maybe_send_max_push_id_frame(&mut self.base_handler);
}
Ok(())
}
/// This is call to close a connection.
pub fn close<S>(&mut self, now: Instant, error: AppError, msg: S)
where
S: AsRef<str> + Display,
{
qinfo!([self], "Close the connection error={} msg={}.", error, msg);
if !matches!(
self.base_handler.state,
Http3State::Closing(_) | Http3State::Closed(_)
) {
self.push_handler.borrow_mut().clear();
self.conn.close(now, error, msg);
self.base_handler.close(error);
self.events
.connection_state_change(self.base_handler.state());
}
}
/// Attempt to force a key update.
///
/// # Errors
///
/// If the connection isn't confirmed, or there is an outstanding key update, this
/// returns `Err(Error::TransportError(neqo_transport::Error::KeyUpdateBlocked))`.
pub fn initiate_key_update(&mut self) -> Res<()> {
self.conn.initiate_key_update()?;
Ok(())
}
// API: Request/response
/// The function fetches a resource using `method`, `target` and `headers`. A response body
/// may be added by calling `send_data`. `stream_close_send` must be sent to finish the request
/// even if request data are not sent.
///
/// # Errors
///
/// If a new stream cannot be created an error will be return.
///
/// # Panics
///
/// `SendMessage` implements `http_stream` so it will not panic.
pub fn fetch<'x, 't: 'x, T>(
&mut self,
now: Instant,
method: &'t str,
target: &'t T,
headers: &'t [Header],
priority: Priority,
) -> Res<StreamId>
where
T: AsRequestTarget<'x> + ?Sized + Debug,
{
let output = self.base_handler.fetch(
&mut self.conn,
Box::new(self.events.clone()),
Box::new(self.events.clone()),
Some(Rc::clone(&self.push_handler)),
&RequestDescription {
method,
connect_type: None,
target,
headers,
priority,
},
);
if let Err(e) = &output {
if e.connection_error() {
self.close(now, e.code(), "");
}
}
output
}
/// Send an [`PRIORITY_UPDATE`-frame][1] on next `Http3Client::process_output()` call.
/// Returns if the priority got changed.
///
/// # Errors
///
/// `InvalidStreamId` if the stream does not exist
///
/// [1]: https://datatracker.ietf.org/doc/html/draft-kazuho-httpbis-priority-04#section-5.2
pub fn priority_update(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool> {
self.base_handler.queue_update_priority(stream_id, priority)
}
/// An application may cancel a stream(request).
/// Both sides, the receiviing and sending side, sending and receiving side, will be closed.
///
/// # Errors
///
/// An error will be return if a stream does not exist.
pub fn cancel_fetch(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
qinfo!([self], "reset_stream {} error={}.", stream_id, error);
self.base_handler
.cancel_fetch(stream_id, error, &mut self.conn)
}
/// This is call when application is done sending a request.
///
/// # Errors
///
/// An error will be return if stream does not exist.
pub fn stream_close_send(&mut self, stream_id: StreamId) -> Res<()> {
qdebug!([self], "Close sending side stream={}.", stream_id);
self.base_handler
.stream_close_send(&mut self.conn, stream_id)
}
/// # Errors
///
/// An error will be return if a stream does not exist.
pub fn stream_reset_send(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
qinfo!([self], "stream_reset_send {} error={}.", stream_id, error);
self.base_handler
.stream_reset_send(&mut self.conn, stream_id, error)
}
/// # Errors
///
/// An error will be return if a stream does not exist.
pub fn stream_stop_sending(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
qinfo!([self], "stream_stop_sending {} error={}.", stream_id, error);
self.base_handler
.stream_stop_sending(&mut self.conn, stream_id, error)
}
/// This function is used for regular HTTP requests and `WebTransport` streams.
/// In the case of regular HTTP requests, the request body is supplied using this function, and
/// headers are supplied through the `fetch` function.
///
/// # Errors
///
/// `InvalidStreamId` if the stream does not exist,
/// `AlreadyClosed` if the stream has already been closed.
/// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
/// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
/// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
/// supplied.
pub fn send_data(&mut self, stream_id: StreamId, buf: &[u8]) -> Res<usize> {
qinfo!(
[self],
"send_data from stream {} sending {} bytes.",
stream_id,
buf.len()
);
self.base_handler
.send_streams
.get_mut(&stream_id)
.ok_or(Error::InvalidStreamId)?
.send_data(&mut self.conn, buf)
}
/// Response data are read directly into a buffer supplied as a parameter of this function to
/// avoid copying data.
///
/// # Errors
///
/// It returns an error if a stream does not exist or an error happen while reading a stream,
/// e.g. early close, protocol error, etc.
pub fn read_data(
&mut self,
now: Instant,
stream_id: StreamId,
buf: &mut [u8],
) -> Res<(usize, bool)> {
qdebug!([self], "read_data from stream {}.", stream_id);
let res = self.base_handler.read_data(&mut self.conn, stream_id, buf);
if let Err(e) = &res {
if e.connection_error() {
self.close(now, e.code(), "");
}
}
res
}
// API: Push streams
/// Cancel a push
///
/// # Errors
///
/// `InvalidStreamId` if the stream does not exist.
pub fn cancel_push(&mut self, push_id: u64) -> Res<()> {
self.push_handler
.borrow_mut()
.cancel(push_id, &mut self.conn, &mut self.base_handler)
}
/// Push response data are read directly into a buffer supplied as a parameter of this function
/// to avoid copying data.
///
/// # Errors
///
/// It returns an error if a stream does not exist(`InvalidStreamId`) or an error has happened
/// while reading a stream, e.g. early close, protocol error, etc.
pub fn push_read_data(
&mut self,
now: Instant,
push_id: u64,
buf: &mut [u8],
) -> Res<(usize, bool)> {
let stream_id = self
.push_handler
.borrow_mut()
.get_active_stream_id(push_id)
.ok_or(Error::InvalidStreamId)?;
self.conn.stream_keep_alive(stream_id, true)?;
self.read_data(now, stream_id, buf)
}
// API WebTransport
//
/// # Errors
///
/// If `WebTransport` cannot be created, e.g. the `WebTransport` support is
/// not negotiated or the HTTP/3 connection is closed.
pub fn webtransport_create_session<'x, 't: 'x, T>(
&mut self,
now: Instant,
target: &'t T,
headers: &'t [Header],
) -> Res<StreamId>
where
T: AsRequestTarget<'x> + ?Sized + Debug,
{
let output = self.base_handler.webtransport_create_session(
&mut self.conn,
Box::new(self.events.clone()),
target,
headers,
);
if let Err(e) = &output {
if e.connection_error() {
self.close(now, e.code(), "");
}
}
output
}
/// Close `WebTransport` cleanly
///
/// # Errors
///
/// `InvalidStreamId` if the stream does not exist,
/// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
/// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
/// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
/// supplied.
pub fn webtransport_close_session(
&mut self,
session_id: StreamId,
error: u32,
message: &str,
) -> Res<()> {
self.base_handler
.webtransport_close_session(&mut self.conn, session_id, error, message)
}
/// # Errors
///
/// This may return an error if the particular session does not exist
/// or the connection is not in the active state.
pub fn webtransport_create_stream(
&mut self,
session_id: StreamId,
stream_type: StreamType,
) -> Res<StreamId> {
self.base_handler.webtransport_create_stream_local(
&mut self.conn,
session_id,
stream_type,
Box::new(self.events.clone()),
Box::new(self.events.clone()),
)
}
/// Send `WebTransport` datagram.
///
/// # Errors
///
/// It may return `InvalidStreamId` if a stream does not exist anymore.
/// The function returns `TooMuchData` if the supply buffer is bigger than
/// the allowed remote datagram size.
pub fn webtransport_send_datagram(
&mut self,
session_id: StreamId,
buf: &[u8],
id: impl Into<DatagramTracking>,
) -> Res<()> {
qtrace!("webtransport_send_datagram session:{:?}", session_id);
self.base_handler
.webtransport_send_datagram(session_id, &mut self.conn, buf, id)
}
/// Returns the current max size of a datagram that can fit into a packet.
/// The value will change over time depending on the encoded size of the
/// packet number, ack frames, etc.
///
/// # Errors
///
/// The function returns `NotAvailable` if datagrams are not enabled.
///
/// # Panics
///
/// This cannot panic. The max varint length is 8.
pub fn webtransport_max_datagram_size(&self, session_id: StreamId) -> Res<u64> {
Ok(self.conn.max_datagram_size()?
- u64::try_from(Encoder::varint_len(session_id.as_u64())).unwrap())
}
/// Sets the `SendOrder` for a given stream
///
/// # Errors
///
/// It may return `InvalidStreamId` if a stream does not exist anymore.
///
/// # Panics
///
/// This cannot panic.
pub fn webtransport_set_sendorder(
&mut self,
stream_id: StreamId,
sendorder: Option<SendOrder>,
) -> Res<()> {
Http3Connection::stream_set_sendorder(&mut self.conn, stream_id, sendorder)
}
/// Sets the `Fairness` for a given stream
///
/// # Errors
///
/// It may return `InvalidStreamId` if a stream does not exist anymore.
///
/// # Panics
///
/// This cannot panic.
pub fn webtransport_set_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> {
Http3Connection::stream_set_fairness(&mut self.conn, stream_id, fairness)
}
/// Returns the current `SendStreamStats` of a `WebTransportSendStream`.
///
/// # Errors
///
/// `InvalidStreamId` if the stream does not exist.
pub fn webtransport_send_stream_stats(&mut self, stream_id: StreamId) -> Res<SendStreamStats> {
self.base_handler
.send_streams
.get_mut(&stream_id)
.ok_or(Error::InvalidStreamId)?
.stats(&mut self.conn)
}
/// Returns the current `RecvStreamStats` of a `WebTransportRecvStream`.
///
/// # Errors
///
/// `InvalidStreamId` if the stream does not exist.
pub fn webtransport_recv_stream_stats(&mut self, stream_id: StreamId) -> Res<RecvStreamStats> {
self.base_handler
.recv_streams
.get_mut(&stream_id)
.ok_or(Error::InvalidStreamId)?
.stats(&mut self.conn)
}
/// This function combines `process_input` and `process_output` function.
pub fn process(&mut self, dgram: Option<Datagram<impl AsRef<[u8]>>>, now: Instant) -> Output {
qtrace!([self], "Process.");
if let Some(d) = dgram {
self.process_input(d, now);
}
self.process_output(now)
}
/// The function should be called when there is a new UDP packet available. The function will
/// handle the packet payload.
///
/// First, the payload will be handled by the QUIC layer. Afterward, `process_http3` will be
/// called to handle new [`ConnectionEvent`][1]s.
///
/// After this function is called `process_output` should be called to check whether new
/// packets need to be sent or if a timer needs to be updated.
///
/// [1]: ../neqo_transport/enum.ConnectionEvent.html
pub fn process_input(&mut self, dgram: Datagram<impl AsRef<[u8]>>, now: Instant) {
self.process_multiple_input(iter::once(dgram), now);
}
pub fn process_multiple_input(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<impl AsRef<[u8]>>>,
now: Instant,
) {
let mut dgrams = dgrams.into_iter().peekable();
qtrace!([self], "Process multiple datagrams");
if dgrams.peek().is_none() {
return;
}
self.conn.process_multiple_input(dgrams, now);
self.process_http3(now);
}
/// Process HTTP3 layer.
/// When `process_output`, `process_input`, or `process` is called we must call this function
/// as well. The functions calls `Http3Client::check_connection_events` to handle events from
/// the QUC layer and calls `Http3Connection::process_sending` to ensure that HTTP/3 layer
/// data, e.g. control frames, are sent.
fn process_http3(&mut self, now: Instant) {
qtrace!([self], "Process http3 internal.");
match self.base_handler.state() {
Http3State::ZeroRtt | Http3State::Connected | Http3State::GoingAway(..) => {
let res = self.check_connection_events();
if self.check_result(now, &res) {
return;
}
self.push_handler
.borrow_mut()
.maybe_send_max_push_id_frame(&mut self.base_handler);
let res = self.base_handler.process_sending(&mut self.conn);
self.check_result(now, &res);
}
Http3State::Closed { .. } => {}
_ => {
let res = self.check_connection_events();
_ = self.check_result(now, &res);
}
}
}
/// The function should be called to check if there is a new UDP packet to be sent. It should
/// be called after a new packet is received and processed and after a timer expires (QUIC
/// needs timers to handle events like PTO detection and timers are not implemented by the neqo
/// library, but instead must be driven by the application).
///
/// `process_output` can return:
/// - a [`Output::Datagram(Datagram)`][1]: data that should be sent as a UDP payload,
/// - a [`Output::Callback(Duration)`][1]: the duration of a timer. `process_output` should be
/// called at least after the time expires,
/// - [`Output::None`][1]: this is returned when `Nttp3Client` is done and can be destroyed.
///
/// The application should call this function repeatedly until a timer value or None is
/// returned. After that, the application should call the function again if a new UDP packet is
/// received and processed or the timer value expires.
///
/// The HTTP/3 neqo implementation drives the HTTP/3 and QUIC layers, therefore this function
/// will call both layers:
/// - First it calls HTTP/3 layer processing (`process_http3`) to make sure the layer writes
/// data to QUIC layer or cancels streams if needed.
/// - Then QUIC layer processing is called - [`Connection::process_output`][3]. This produces a
/// packet or a timer value. It may also produce new [`ConnectionEvent`][2]s, e.g. connection
/// state-change event.
/// - Therefore the HTTP/3 layer processing (`process_http3`) is called again.
///
/// [1]: ../neqo_transport/enum.Output.html
/// [2]: ../neqo_transport/struct.ConnectionEvents.html
/// [3]: ../neqo_transport/struct.Connection.html#method.process_output
pub fn process_output(&mut self, now: Instant) -> Output {
qtrace!([self], "Process output.");
// Maybe send() stuff on http3-managed streams
self.process_http3(now);
let out = self.conn.process_output(now);
// Update H3 for any transport state changes and events
self.process_http3(now);
out
}
/// This function takes the provided result and check for an error.
/// An error results in closing the connection.
fn check_result<ERR>(&mut self, now: Instant, res: &Res<ERR>) -> bool {
match &res {
Err(Error::HttpGoaway) => {
qinfo!([self], "Connection error: goaway stream_id increased.");
self.close(
now,
Error::HttpGeneralProtocol.code(),
"Connection error: goaway stream_id increased",
);
true
}
Err(e) => {
qinfo!([self], "Connection error: {}.", e);
self.close(now, e.code(), format!("{e}"));
true
}
_ => false,
}
}
/// This function checks [`ConnectionEvent`][2]s emitted by the QUIC layer, e.g. connection
/// change state events, new incoming stream data is available, a stream is was reset, etc.
/// The HTTP/3 layer needs to handle these events. Most of the events are handled by
/// [`Http3Connection`][1] by calling appropriate functions, e.g. `handle_state_change`,
/// `handle_stream_reset`, etc. [`Http3Connection`][1] handle functionalities that are common
/// for the client and server side. Some of the functionalities are specific to the client and
/// they are handled by `Http3Client`. For example, [`ConnectionEvent::RecvStreamReadable`][3]
/// event is handled by `Http3Client::handle_stream_readable`. The function calls
/// `Http3Connection::handle_stream_readable` and then hands the return value as appropriate
/// for the client-side.
///
/// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs
/// [2]: ../neqo_transport/enum.ConnectionEvent.html
/// [3]: ../neqo_transport/enum.ConnectionEvent.html#variant.RecvStreamReadable
fn check_connection_events(&mut self) -> Res<()> {
qtrace!([self], "Check connection events.");
while let Some(e) = self.conn.next_event() {
qdebug!([self], "check_connection_events - event {:?}.", e);
match e {
ConnectionEvent::NewStream { stream_id } => {
// During this event we only add a new stream to the Http3Connection stream
// list, with NewStreamHeadReader stream handler.
// This function will not read from the stream and try to decode the stream.
// RecvStreamReadable will be emitted after this event and reading, i.e.
// decoding of a stream will happen during that event.
self.base_handler.add_new_stream(stream_id);
}
ConnectionEvent::SendStreamWritable { stream_id } => {
if let Some(s) = self.base_handler.send_streams.get_mut(&stream_id) {
s.stream_writable();
}
}
ConnectionEvent::RecvStreamReadable { stream_id } => {
self.handle_stream_readable(stream_id)?;
}
ConnectionEvent::RecvStreamReset {
stream_id,
app_error,
} => self
.base_handler
.handle_stream_reset(stream_id, app_error, &mut self.conn)?,
ConnectionEvent::SendStreamStopSending {
stream_id,
app_error,
} => self.base_handler.handle_stream_stop_sending(
stream_id,
app_error,
&mut self.conn,
)?,
ConnectionEvent::SendStreamCreatable { stream_type } => {
self.events.new_requests_creatable(stream_type);
}
ConnectionEvent::AuthenticationNeeded => self.events.authentication_needed(),
ConnectionEvent::EchFallbackAuthenticationNeeded { public_name } => {
self.events.ech_fallback_authentication_needed(public_name);
}
ConnectionEvent::StateChange(state) => {
if self
.base_handler
.handle_state_change(&mut self.conn, &state)?
{
self.events
.connection_state_change(self.base_handler.state());
}
}
ConnectionEvent::ZeroRttRejected => {
self.base_handler.handle_zero_rtt_rejected()?;
self.events.zero_rtt_rejected();
self.push_handler.borrow_mut().handle_zero_rtt_rejected();
}
ConnectionEvent::ResumptionToken(token) => {
if let Some(t) = self.encode_resumption_token(&token) {
self.events.resumption_token(t);
}
}
ConnectionEvent::Datagram(dgram) => {
self.base_handler.handle_datagram(&dgram);
}
ConnectionEvent::SendStreamComplete { .. }
| ConnectionEvent::OutgoingDatagramOutcome { .. }
| ConnectionEvent::IncomingDatagramDropped => {}
}
}
Ok(())
}
/// This function handled new data available on a stream. It calls
/// `Http3Client::handle_stream_readable` and handles its response. Reading streams are mostly
/// handled by [`Http3Connection`][1] because most part of it is common for the client and
/// server. The following actions need to be handled by the client-specific code:
/// - `ReceiveOutput::NewStream(NewStreamType::Push(_))` - the server cannot receive a push
/// stream,
/// - `ReceiveOutput::NewStream(NewStreamType::Http)` - client cannot receive a
/// server-initiated HTTP request,
/// - `ReceiveOutput::NewStream(NewStreamType::WebTransportStream(_))` - because
/// `Http3ClientEvents`is needed and events handler is specific to the client.
/// - `ReceiveOutput::ControlFrames(control_frames)` - some control frame handling differs
/// between the client and the server:
/// - `HFrame::CancelPush` - only the client-side may receive it,
/// - `HFrame::MaxPushId { .. }`, `HFrame::PriorityUpdateRequest { .. } ` and
/// `HFrame::PriorityUpdatePush` can only be receive on the server side,
/// - `HFrame::Goaway { stream_id }` needs specific handling by the client by the protocol
/// specification.
///
/// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs
fn handle_stream_readable(&mut self, stream_id: StreamId) -> Res<()> {
match self
.base_handler
.handle_stream_readable(&mut self.conn, stream_id)?
{
ReceiveOutput::NewStream(NewStreamType::Push(push_id)) => {
self.handle_new_push_stream(stream_id, push_id)
}
ReceiveOutput::NewStream(NewStreamType::Http(_)) => Err(Error::HttpStreamCreation),
ReceiveOutput::NewStream(NewStreamType::WebTransportStream(session_id)) => {
self.base_handler.webtransport_create_stream_remote(
StreamId::from(session_id),
stream_id,
Box::new(self.events.clone()),
Box::new(self.events.clone()),
)?;
let res = self
.base_handler
.handle_stream_readable(&mut self.conn, stream_id)?;
debug_assert!(matches!(res, ReceiveOutput::NoOutput));
Ok(())
}
ReceiveOutput::ControlFrames(control_frames) => {
for f in control_frames {
match f {
HFrame::CancelPush { push_id } => self
.push_handler
.borrow_mut()
.handle_cancel_push(push_id, &mut self.conn, &mut self.base_handler),
HFrame::MaxPushId { .. }
| HFrame::PriorityUpdateRequest { .. }
| HFrame::PriorityUpdatePush { .. } => Err(Error::HttpFrameUnexpected),
HFrame::Goaway { stream_id } => self.handle_goaway(stream_id),
_ => {
unreachable!(
"we should only put MaxPushId, Goaway and PriorityUpdates into control_frames."
);
}
}?;
}
Ok(())
}
_ => Ok(()),
}
}
fn handle_new_push_stream(&mut self, stream_id: StreamId, push_id: u64) -> Res<()> {
if !self.push_handler.borrow().can_receive_push() {
return Err(Error::HttpId);
}
// Add a new push stream to `PushController`. `add_new_push_stream` may return an error
// (this will be a connection error) or a bool.
// If false is returned that means that the stream should be reset because the push has
// been already canceled (CANCEL_PUSH frame or canceling push from the application).
if !self
.push_handler
.borrow_mut()
.add_new_push_stream(push_id, stream_id)?
{
// We are not interested in the result of stream_stop_sending, we are not interested
// in this stream.
mem::drop(
self.conn
.stream_stop_sending(stream_id, Error::HttpRequestCancelled.code()),
);
return Ok(());
}
self.base_handler.add_recv_stream(
stream_id,
Box::new(RecvMessage::new(
&RecvMessageInfo {
message_type: MessageType::Response,
stream_type: Http3StreamType::Push,
stream_id,
first_frame_type: None,
},
Rc::clone(&self.base_handler.qpack_decoder),
Box::new(RecvPushEvents::new(push_id, Rc::clone(&self.push_handler))),
None,
// TODO: think about the right prority for the push streams.
PriorityHandler::new(true, Priority::default()),
)),
);
let res = self
.base_handler
.handle_stream_readable(&mut self.conn, stream_id)?;
debug_assert!(matches!(res, ReceiveOutput::NoOutput));
Ok(())
}
fn handle_goaway(&mut self, goaway_stream_id: StreamId) -> Res<()> {
qinfo!([self], "handle_goaway {}", goaway_stream_id);
if goaway_stream_id.is_uni() || goaway_stream_id.is_server_initiated() {
return Err(Error::HttpId);
}
match self.base_handler.state {
Http3State::Connected => {
self.base_handler.state = Http3State::GoingAway(goaway_stream_id);
}
Http3State::GoingAway(ref mut stream_id) => {
if goaway_stream_id > *stream_id {
return Err(Error::HttpGoaway);
}
*stream_id = goaway_stream_id;
}
Http3State::Closing(..) | Http3State::Closed(..) => {}
_ => unreachable!("Should not receive Goaway frame in this state."),
}
// Issue reset events for streams >= goaway stream id
let send_ids: Vec<StreamId> = self
.base_handler
.send_streams
.iter()
.filter_map(id_gte(goaway_stream_id))
.collect();
for id in send_ids {
// We do not care about streams that are going to be closed.
mem::drop(self.base_handler.handle_stream_stop_sending(
id,
Error::HttpRequestRejected.code(),
&mut self.conn,
));
}
let recv_ids: Vec<StreamId> = self
.base_handler
.recv_streams
.iter()
.filter_map(id_gte(goaway_stream_id))
.collect();
for id in recv_ids {
// We do not care about streams that are going to be closed.
mem::drop(self.base_handler.handle_stream_reset(
id,
Error::HttpRequestRejected.code(),
&mut self.conn,
));
}
self.events.goaway_received();
Ok(())
}
/// Increases `max_stream_data` for a `stream_id`.
///
/// # Errors
///
/// Returns `InvalidStreamId` if a stream does not exist or the receiving
/// side is closed.
pub fn set_stream_max_data(&mut self, stream_id: StreamId, max_data: u64) -> Res<()> {
self.conn.set_stream_max_data(stream_id, max_data)?;
Ok(())
}
#[must_use]
pub fn qpack_decoder_stats(&self) -> QpackStats {
self.base_handler.qpack_decoder.borrow().stats()
}
#[must_use]
pub fn qpack_encoder_stats(&self) -> QpackStats {
self.base_handler.qpack_encoder.borrow().stats()
}
#[must_use]
pub fn transport_stats(&self) -> TransportStats {
self.conn.stats()
}
#[must_use]
pub const fn webtransport_enabled(&self) -> bool {
self.base_handler.webtransport_enabled()
}
}
impl EventProvider for Http3Client {
type Event = Http3ClientEvent;
/// Return true if there are outstanding events.
fn has_events(&self) -> bool {
self.events.has_events()
}
/// Get events that indicate state changes on the connection. This method
/// correctly handles cases where handling one event can obsolete
/// previously-queued events, or cause new events to be generated.
fn next_event(&mut self) -> Option<Self::Event> {
self.events.next_event()
}
}
#[cfg(test)]
mod tests {
use std::{mem, time::Duration};
use neqo_common::{event::Provider, qtrace, Datagram, Decoder, Encoder};
use neqo_crypto::{AllowZeroRtt, AntiReplay, ResumptionToken};
use neqo_qpack::{encoder::QPackEncoder, QpackSettings};
use neqo_transport::{
CloseReason, ConnectionEvent, ConnectionParameters, Output, State, StreamId, StreamType,
Version, MIN_INITIAL_PACKET_SIZE, RECV_BUFFER_SIZE, SEND_BUFFER_SIZE,
};
use test_fixture::{
anti_replay, default_server_h3, fixture_init, new_server, now,
CountingConnectionIdGenerator, DEFAULT_ADDR, DEFAULT_ALPN_H3, DEFAULT_KEYS,
DEFAULT_SERVER_NAME,
};
use super::{
AuthenticationStatus, Connection, Error, HSettings, Header, Http3Client, Http3ClientEvent,
Http3Parameters, Http3State, Rc, RefCell,
};
use crate::{
frames::{HFrame, H3_FRAME_TYPE_SETTINGS, H3_RESERVED_FRAME_TYPES},
qpack_encoder_receiver::EncoderRecvStream,
settings::{HSetting, HSettingType, H3_RESERVED_SETTINGS},
Http3Server, Priority, RecvStream,
};
fn assert_closed(client: &Http3Client, expected: &Error) {
match client.state() {
Http3State::Closing(err) | Http3State::Closed(err) => {
assert_eq!(err, CloseReason::Application(expected.code()));
}
_ => panic!("Wrong state {:?}", client.state()),
};
}
/// Create a http3 client with default configuration.
pub fn default_http3_client() -> Http3Client {
default_http3_client_param(100)
}
pub fn default_http3_client_param(max_table_size: u64) -> Http3Client {
fixture_init();
Http3Client::new(
DEFAULT_SERVER_NAME,
Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
DEFAULT_ADDR,
DEFAULT_ADDR,
Http3Parameters::default()
.connection_parameters(
// Disable compatible upgrade, which complicates tests.
ConnectionParameters::default()
.versions(Version::default(), vec![Version::default()]),
)
.max_table_size_encoder(max_table_size)
.max_table_size_decoder(max_table_size)
.max_blocked_streams(100)
.max_concurrent_push_streams(5),
now(),
)
.expect("create a default client")
}
const CONTROL_STREAM_TYPE: &[u8] = &[0x0];
// Encoder stream data
const ENCODER_STREAM_DATA: &[u8] = &[0x2];
const ENCODER_STREAM_CAP_INSTRUCTION: &[u8] = &[0x3f, 0x45];
// Encoder stream data with a change capacity instruction(0x3f, 0x45 = change capacity to 100)
// This data will be send when 0-RTT is used and we already have a max_table_capacity from
// resumed settings.
const ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION: &[u8] = &[0x2, 0x3f, 0x45];
const ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST: &[u8] = &[
0x2, 0x3f, 0x45, 0x67, 0xa7, 0xd4, 0xe5, 0x1c, 0x85, 0xb1, 0x1f, 0x86, 0xa7, 0xd7, 0x71,
0xd1, 0x69, 0x7f,
];
// Decoder stream data
const DECODER_STREAM_DATA: &[u8] = &[0x3];
const PUSH_STREAM_TYPE: &[u8] = &[0x1];
const CLIENT_SIDE_CONTROL_STREAM_ID: StreamId = StreamId::new(2);
const CLIENT_SIDE_ENCODER_STREAM_ID: StreamId = StreamId::new(6);
const CLIENT_SIDE_DECODER_STREAM_ID: StreamId = StreamId::new(10);
struct TestServer {
settings: HFrame,
conn: Connection,
control_stream_id: Option<StreamId>,
encoder: Rc<RefCell<QPackEncoder>>,
encoder_receiver: EncoderRecvStream,
encoder_stream_id: Option<StreamId>,
decoder_stream_id: Option<StreamId>,
}
impl TestServer {
pub fn new() -> Self {
Self::new_with_settings(&[
HSetting::new(HSettingType::MaxTableCapacity, 100),
HSetting::new(HSettingType::BlockedStreams, 100),
HSetting::new(HSettingType::MaxHeaderListSize, 10000),
])
}
pub fn new_with_settings(server_settings: &[HSetting]) -> Self {
fixture_init();
let max_table_size = server_settings
.iter()
.find(|s| s.setting_type == HSettingType::MaxTableCapacity)
.map_or(100, |s| s.value);
let max_blocked_streams = u16::try_from(
server_settings
.iter()
.find(|s| s.setting_type == HSettingType::BlockedStreams)
.map_or(100, |s| s.value),
)
.unwrap();
let qpack = Rc::new(RefCell::new(QPackEncoder::new(
&QpackSettings {
max_table_size_encoder: max_table_size,
max_table_size_decoder: max_table_size,
max_blocked_streams,
},
true,
)));
Self {
settings: HFrame::Settings {
settings: HSettings::new(server_settings),
},
conn: default_server_h3(),
control_stream_id: None,
encoder: Rc::clone(&qpack),
encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack),
encoder_stream_id: None,
decoder_stream_id: None,
}
}
pub fn new_with_conn(conn: Connection) -> Self {
let qpack = Rc::new(RefCell::new(QPackEncoder::new(
&QpackSettings {
max_table_size_encoder: 128,
max_table_size_decoder: 128,
max_blocked_streams: 0,
},
true,
)));
Self {
settings: HFrame::Settings {
settings: HSettings::new(&[]),
},
conn,
control_stream_id: None,
encoder: Rc::clone(&qpack),
encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack),
encoder_stream_id: None,
decoder_stream_id: None,
}
}
pub fn create_qpack_streams(&mut self) {
// Create a QPACK encoder stream
self.encoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap());
self.encoder
.borrow_mut()
.add_send_stream(self.encoder_stream_id.unwrap());
self.encoder
.borrow_mut()
.send_encoder_updates(&mut self.conn)
.unwrap();
// Create decoder stream
self.decoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap());
assert_eq!(
self.conn
.stream_send(self.decoder_stream_id.unwrap(), DECODER_STREAM_DATA)
.unwrap(),
1
);
}
pub fn create_control_stream(&mut self) {
// Create control stream
let control = self.conn.stream_create(StreamType::UniDi).unwrap();
qtrace!(["TestServer"], "control stream: {}", control);
self.control_stream_id = Some(control);
// Send stream type on the control stream.
assert_eq!(
self.conn
.stream_send(self.control_stream_id.unwrap(), CONTROL_STREAM_TYPE)
.unwrap(),
1
);
// Encode a settings frame and send it.
let mut enc = Encoder::default();
self.settings.encode(&mut enc);
assert_eq!(
self.conn
.stream_send(self.control_stream_id.unwrap(), enc.as_ref())
.unwrap(),
enc.len()
);
}
pub fn check_client_control_qpack_streams_no_resumption(&mut self) {
self.check_client_control_qpack_streams(
ENCODER_STREAM_DATA,
EXPECTED_REQUEST_HEADER_FRAME,
false,
true,
);
}
pub fn check_control_qpack_request_streams_resumption(
&mut self,
expect_encoder_stream_data: &[u8],
expect_request_header: &[u8],
expect_request: bool,
) {
self.check_client_control_qpack_streams(
expect_encoder_stream_data,
expect_request_header,
expect_request,
false,
);
}
// Check that server has received correct settings and qpack streams.
pub fn check_client_control_qpack_streams(
&mut self,
expect_encoder_stream_data: &[u8],
expect_request_header: &[u8],
expect_request: bool,
expect_connected: bool,
) {
let mut connected = false;
let mut control_stream = false;
let mut qpack_decoder_stream = false;
let mut qpack_encoder_stream = false;
let mut request = false;
while let Some(e) = self.conn.next_event() {
match e {
ConnectionEvent::NewStream { stream_id }
| ConnectionEvent::SendStreamWritable { stream_id } => {
if expect_request {
assert!(matches!(stream_id.as_u64(), 2 | 6 | 10 | 0));
} else {
assert!(matches!(stream_id.as_u64(), 2 | 6 | 10));
}
}
ConnectionEvent::RecvStreamReadable { stream_id } => {
if stream_id == CLIENT_SIDE_CONTROL_STREAM_ID {
self.check_control_stream();
control_stream = true;
} else if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID {
// the qpack encoder stream
self.read_and_check_stream_data(
stream_id,
expect_encoder_stream_data,
false,
);
qpack_encoder_stream = true;
} else if stream_id == CLIENT_SIDE_DECODER_STREAM_ID {
// the qpack decoder stream
self.read_and_check_stream_data(stream_id, DECODER_STREAM_DATA, false);
qpack_decoder_stream = true;
} else if stream_id == 0 {
assert!(expect_request);
self.read_and_check_stream_data(stream_id, expect_request_header, true);
request = true;
} else {
panic!("unexpected event");
}
}
ConnectionEvent::StateChange(State::Connected) => connected = true,
ConnectionEvent::StateChange(_)
| ConnectionEvent::SendStreamCreatable { .. } => {}
_ => panic!("unexpected event"),
}
}
assert_eq!(connected, expect_connected);
assert!(control_stream);
assert!(qpack_encoder_stream);
assert!(qpack_decoder_stream);
assert_eq!(request, expect_request);
}
// Check that the control stream contains default values.
// Expect a SETTINGS frame, some grease, and a MAX_PUSH_ID frame.
// The default test configuration uses:
// - max_table_capacity = 100
// - max_blocked_streams = 100
// and a maximum of 5 push streams.
fn check_control_stream(&mut self) {
let mut buf = [0_u8; 100];
let (amount, fin) = self
.conn
.stream_recv(CLIENT_SIDE_CONTROL_STREAM_ID, &mut buf)
.unwrap();
let mut dec = Decoder::from(&buf[..amount]);
assert_eq!(dec.decode_varint().unwrap(), 0); // control stream type
assert_eq!(dec.decode_varint().unwrap(), 4); // SETTINGS
assert_eq!(
dec.decode_vvec().unwrap(),
&[1, 0x40, 0x64, 7, 0x40, 0x64, 0xab, 0x60, 0x37, 0x42, 0x00]
);
assert_eq!((dec.decode_varint().unwrap() - 0x21) % 0x1f, 0); // Grease
assert!(dec.decode_vvec().unwrap().len() < 8);
assert_eq!(dec.decode_varint().unwrap(), 0xd); // MAX_PUSH_ID
assert_eq!(dec.decode_vvec().unwrap(), &[5]);
assert_eq!(dec.remaining(), 0);
assert!(!fin);
}
pub fn read_and_check_stream_data(
&mut self,
stream_id: StreamId,
expected_data: &[u8],
expected_fin: bool,
) {
let mut buf = [0_u8; 100];
let (amount, fin) = self.conn.stream_recv(stream_id, &mut buf).unwrap();
assert_eq!(fin, expected_fin);
assert_eq!(amount, expected_data.len());
assert_eq!(&buf[..amount], expected_data);
}
pub fn encode_headers(
&mut self,
stream_id: StreamId,
headers: &[Header],
encoder: &mut Encoder,
) {
let header_block =
self.encoder
.borrow_mut()
.encode_header_block(&mut self.conn, headers, stream_id);
let hframe = HFrame::Headers {
header_block: header_block.as_ref().to_vec(),
};
hframe.encode(encoder);
}
}
fn handshake_only(client: &mut Http3Client, server: &mut TestServer) -> Output {
assert_eq!(client.state(), Http3State::Initializing);
let out = client.process_output(now());
assert_eq!(client.state(), Http3State::Initializing);
assert_eq!(*server.conn.state(), State::Init);
let out = server.conn.process(out.dgram(), now());
assert_eq!(*server.conn.state(), State::Handshaking);
let out = client.process(out.dgram(), now());
let out = server.conn.process(out.dgram(), now());
assert!(out.as_dgram_ref().is_none());
let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded);
assert!(client.events().any(authentication_needed));
client.authenticated(AuthenticationStatus::Ok, now());
out
}
// Perform only Quic transport handshake.
fn connect_only_transport_with(client: &mut Http3Client, server: &mut TestServer) {
let out = handshake_only(client, server);
let out = client.process(out.dgram(), now());
let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected));
assert!(client.events().any(connected));
assert_eq!(client.state(), Http3State::Connected);
server.conn.process_input(out.dgram().unwrap(), now());
assert!(server.conn.state().connected());
}
// Perform only Quic transport handshake.
fn connect_only_transport() -> (Http3Client, TestServer) {
let mut client = default_http3_client();
let mut server = TestServer::new();
connect_only_transport_with(&mut client, &mut server);
(client, server)
}
fn send_and_receive_client_settings(client: &mut Http3Client, server: &mut TestServer) {
// send and receive client settings
let out = client.process_output(now());
server.conn.process_input(out.dgram().unwrap(), now());
server.check_client_control_qpack_streams_no_resumption();
}
// Perform Quic transport handshake and exchange Http3 settings.
fn connect_with(client: &mut Http3Client, server: &mut TestServer) {
connect_only_transport_with(client, server);
send_and_receive_client_settings(client, server);
server.create_control_stream();
server.create_qpack_streams();
// Send the server's control and qpack streams data.
let out = server.conn.process(None::<Datagram>, now());
client.process_input(out.dgram().unwrap(), now());
// assert no error occured.
assert_eq!(client.state(), Http3State::Connected);
}
// Perform Quic transport handshake and exchange Http3 settings.
fn connect_with_connection_parameters(
server_conn_params: ConnectionParameters,
) -> (Http3Client, TestServer) {
// connecting with default max_table_size
let mut client = default_http3_client_param(100);
let server = Connection::new_server(
test_fixture::DEFAULT_KEYS,
test_fixture::DEFAULT_ALPN_H3,
Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
--> --------------------
--> maximum size reached
--> --------------------
[ Dauer der Verarbeitung: 0.58 Sekunden
(vorverarbeitet)
]
|
2026-04-04
|