Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/neqo-http3/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 261 kB image not shown  

Impressum connection_client.rs   Sprache: unbekannt

 
Haftungsausschluß.rs KontaktUnknown {[0] [0] [0]}diese Dinge liegen außhalb unserer Verantwortung

// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{
    cell::RefCell,
    fmt::{Debug, Display},
    iter, mem,
    net::SocketAddr,
    rc::Rc,
    time::Instant,
};

use neqo_common::{
    event::Provider as EventProvider, hex, hex_with_len, qdebug, qinfo, qlog::NeqoQlog, qtrace,
    Datagram, Decoder, Encoder, Header, MessageType, Role,
};
use neqo_crypto::{agent::CertificateInfo, AuthenticationStatus, ResumptionToken, SecretAgentInfo};
use neqo_qpack::Stats as QpackStats;
use neqo_transport::{
    streams::SendOrder, AppError, Connection, ConnectionEvent, ConnectionId, ConnectionIdGenerator,
    DatagramTracking, Output, RecvStreamStats, SendStreamStats, Stats as TransportStats, StreamId,
    StreamType, Version, ZeroRttState,
};

use crate::{
    client_events::{Http3ClientEvent, Http3ClientEvents},
    connection::{Http3Connection, Http3State, RequestDescription},
    frames::HFrame,
    push_controller::{PushController, RecvPushEvents},
    recv_message::{RecvMessage, RecvMessageInfo},
    request_target::AsRequestTarget,
    settings::HSettings,
    Error, Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler,
    ReceiveOutput, Res,
};

// This is used for filtering send_streams and recv_Streams with a stream_ids greater than or equal
// a given id. Only the same type (bidirectional or unidirectionsl) streams are filtered.
fn id_gte<U>(base: StreamId) -> impl FnMut((&StreamId, &U)) -> Option<StreamId> + 'static
where
    U: ?Sized,
{
    move |(id, _)| {
        if *id >= base && !(id.is_bidi() ^ base.is_bidi()) {
            Some(*id)
        } else {
            None
        }
    }
}

const fn alpn_from_quic_version(version: Version) -> &'static str {
    match version {
        Version::Version2 | Version::Version1 => "h3",
        Version::Draft29 => "h3-29",
        Version::Draft30 => "h3-30",
        Version::Draft31 => "h3-31",
        Version::Draft32 => "h3-32",
    }
}

/// # The HTTP/3 client API
///
/// This module implements the HTTP/3 client API. The main implementation of the protocol is in
/// [connection.rs](https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs) which
/// implements common behavior for the client-side and the server-side. `Http3Client` structure
/// implements the public API and set of functions that differ between the client and the server.
///
/// The API is used for:
/// - create and close an endpoint:
///   - [`Http3Client::new`]
///   - [`Http3Client::new_with_conn`]
///   - [`Http3Client::close`]
/// - configuring an endpoint:
///   - [`Http3Client::authenticated`]
///   - [`Http3Client::enable_ech`]
///   - [`Http3Client::enable_resumption`]
///   - [`Http3Client::initiate_key_update`]
///   - [`Http3Client::set_qlog`]
/// - retrieving information about a connection:
/// - [`Http3Client::peer_certificate`]
///   - [`Http3Client::qpack_decoder_stats`]
///   - [`Http3Client::qpack_encoder_stats`]
///   - [`Http3Client::transport_stats`]
///   - [`Http3Client::state`]
///   - [`Http3Client::take_resumption_token`]
///   - [`Http3Client::tls_info`]
/// - driving HTTP/3 session:
///   - [`Http3Client::process_output`]
///   - [`Http3Client::process_input`]
///   - [`Http3Client::process`]
/// - create requests, send/receive data, and cancel requests:
///   - [`Http3Client::fetch`]
///   - [`Http3Client::send_data`]
///   - [`Http3Client::read_data`]
///   - [`Http3Client::stream_close_send`]
///   - [`Http3Client::cancel_fetch`]
///   - [`Http3Client::stream_reset_send`]
///   - [`Http3Client::stream_stop_sending`]
///   - [`Http3Client::set_stream_max_data`]
/// - priority feature:
///   - [`Http3Client::priority_update`]
/// - `WebTransport` feature:
///   - [`Http3Client::webtransport_create_session`]
///   - [`Http3Client::webtransport_close_session`]
///   - [`Http3Client::webtransport_create_stream`]
///   - [`Http3Client::webtransport_enabled`]
///
/// ## Examples
///
/// ### Fetching a resource
///
/// ```ignore
/// let mut client = Http3Client::new(...);
///
/// // Perform a handshake
/// ...
///
/// let req = client
///     .fetch(
///         Instant::now(),
///         "GET",
///         &("https", "something.com", "/"),
///         &[Header::new("example1", "value1"), Header::new("example1", "value2")],
///         Priority::default(),
///     )
///     .unwrap();
///
/// client.stream_close_send(req).unwrap();
///
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin } => {
///                 println!("New response headers received for stream {:?} [fin={?}, interim={:?}]: {:?}",
///                     stream_id,
///                     fin,
///                     interim,
///                     headers,
///                 );
///             }
///             Http3ClientEvent::DataReadable { stream_id } => {
///                 println!("New data available on stream {}", stream_id);
///                let mut buf = [0; 100];
///                let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
///                 println!("Read {:?} bytes from stream {:?} [fin={?}]",
///                     amount,
///                     stream_id,
///                     fin,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
///
/// ### Creating a `WebTransport` session
///
/// ```ignore
/// let mut client = Http3Client::new(...);
///
/// // Perform a handshake
/// ...
///
/// // Create a session
/// let wt_session_id = client
///     .webtransport_create_session(now(), &("https", "something.com", "/"), &[])
///     .unwrap();
///
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::WebTransport(WebTransportEvent::Session{
///                 stream_id,
///                 status,
///                 ..
///             }) => {
///                 println!("The response from the server: WebTransport session ID {:?} status={:?}",
///                     stream_id,
///                     status,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
///
/// ### `WebTransport`: create a stream, send and receive data on the stream
///
/// ```ignore
/// const BUF_CLIENT: &[u8] = &[0; 10];
/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above.
///
/// // create a  stream
/// let wt_stream_id = client
///     .webtransport_create_stream(wt_session_id, StreamType::BiDi)
///     .unwrap();
///
/// // send data
/// let data_sent = client.send_data(wt_stream_id, BUF_CLIENT).unwrap();
/// assert_eq!(data_sent, BUF_CLIENT.len());
///
/// // close stream for sending
/// client.stream_close_send(wt_stream_id).unwrap();
///
/// // wait for data from the server
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::DataReadable{ stream_id } => {
///                 println!("Data receivedd form the server on WebTransport stream ID {:?}",
///                     stream_id,
///                 );
///                 let mut buf = [0; 100];
///                 let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
///                 println!("Read {:?} bytes from stream {:?} [fin={?}]",
///                     amount,
///                     stream_id,
///                     fin,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
///
/// ### `WebTransport`: receive a new stream form the server
///
/// ```ignore
/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above.
///
/// // wait for a new stream from the server
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::WebTransport(WebTransportEvent::NewStream {
///                 stream_id,
///                 session_id,
///             }) => {
///                 println!("New stream received on session{:?}, stream id={:?} stream type={:?}",
///                     sesson_id.stream_id(),
///                     stream_id.stream_id(),
///                     stream_id.stream_type()
///                 );
///             }
///             Http3ClientEvent::DataReadable{ stream_id } => {
///                 println!("Data receivedd form the server on WebTransport stream ID {:?}",
///                     stream_id,
///                 );
///                 let mut buf = [0; 100];
///                 let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
///                 println!("Read {:?} bytes from stream {:?} [fin={:?}]",
///                     amount,
///                     stream_id,
///                     fin,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
pub struct Http3Client {
    conn: Connection,
    base_handler: Http3Connection,
    events: Http3ClientEvents,
    push_handler: Rc<RefCell<PushController>>,
}

impl Display for Http3Client {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "Http3 client")
    }
}

impl Http3Client {
    /// # Errors
    ///
    /// Making a `neqo-transport::connection` may produce an error. This can only be a crypto error
    /// if the crypto context can't be created or configured.
    pub fn new(
        server_name: impl Into<String>,
        cid_manager: Rc<RefCell<dyn ConnectionIdGenerator>>,
        local_addr: SocketAddr,
        remote_addr: SocketAddr,
        http3_parameters: Http3Parameters,
        now: Instant,
    ) -> Res<Self> {
        Ok(Self::new_with_conn(
            Connection::new_client(
                server_name,
                &[alpn_from_quic_version(
                    http3_parameters
                        .get_connection_parameters()
                        .get_versions()
                        .initial(),
                )],
                cid_manager,
                local_addr,
                remote_addr,
                http3_parameters.get_connection_parameters().clone(),
                now,
            )?,
            http3_parameters,
        ))
    }

    /// This is a similar function to `new`. In this case, `neqo-transport::connection` has been
    /// already created.
    ///
    /// It is recommended to use `new` instead.
    #[must_use]
    pub fn new_with_conn(c: Connection, http3_parameters: Http3Parameters) -> Self {
        let events = Http3ClientEvents::default();
        let webtransport = http3_parameters.get_webtransport();
        let push_streams = http3_parameters.get_max_concurrent_push_streams();
        let mut base_handler = Http3Connection::new(http3_parameters, Role::Client);
        if webtransport {
            base_handler.set_features_listener(events.clone());
        }
        Self {
            conn: c,
            events: events.clone(),
            push_handler: Rc::new(RefCell::new(PushController::new(push_streams, events))),
            base_handler,
        }
    }

    #[must_use]
    pub const fn role(&self) -> Role {
        self.conn.role()
    }

    /// The function returns the current state of the connection.
    #[must_use]
    pub fn state(&self) -> Http3State {
        self.base_handler.state()
    }

    #[must_use]
    pub fn tls_info(&self) -> Option<&SecretAgentInfo> {
        self.conn.tls_info()
    }

    /// Get the peer's certificate.
    #[must_use]
    pub fn peer_certificate(&self) -> Option<CertificateInfo> {
        self.conn.peer_certificate()
    }

    /// This called when peer certificates have been verified.
    ///
    /// `Http3ClientEvent::AuthenticationNeeded` event is emitted when peer’s certificates are
    /// available and need to be verified. When the verification is completed this function is
    /// called. To inform HTTP/3 session of the verification results.
    pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) {
        self.conn.authenticated(status, now);
    }

    pub fn set_qlog(&mut self, qlog: NeqoQlog) {
        self.conn.set_qlog(qlog);
    }

    /// Enable encrypted client hello (ECH).
    ///
    /// # Errors
    ///
    /// Fails when the configuration provided is bad.
    pub fn enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> {
        self.conn.client_enable_ech(ech_config_list)?;
        Ok(())
    }

    /// Get the connection id, which is useful for disambiguating connections to
    /// the same origin.
    ///
    /// # Panics
    ///
    /// Never, because clients always have this field.
    #[must_use]
    pub fn connection_id(&self) -> &ConnectionId {
        self.conn.odcid().expect("Client always has odcid")
    }

    fn encode_resumption_token(&self, token: &ResumptionToken) -> Option<ResumptionToken> {
        self.base_handler.get_settings().map(|settings| {
            let mut enc = Encoder::default();
            settings.encode_frame_contents(&mut enc);
            enc.encode(token.as_ref());
            ResumptionToken::new(enc.into(), token.expiration_time())
        })
    }

    /// The correct way to obtain a resumption token is to wait for the
    /// `Http3ClientEvent::ResumptionToken` event. To emit the event we are waiting for a
    /// resumtion token and a `NEW_TOKEN` frame to arrive. Some servers don't send `NEW_TOKEN`
    /// frames and in this case, we wait for 3xPTO before emitting an event. This is especially a
    /// problem for short-lived connections, where the connection is closed before any events are
    /// released. This function retrieves the token, without waiting for a `NEW_TOKEN` frame to
    /// arrive.
    ///
    /// In addition to the token, HTTP/3 settings are encoded into the token before giving it to
    /// the application(`encode_resumption_token`). When the resumption token is supplied to a new
    /// connection the HTTP/3 setting will be decoded and used until the setting are received from
    /// the server.
    pub fn take_resumption_token(&mut self, now: Instant) -> Option<ResumptionToken> {
        self.conn
            .take_resumption_token(now)
            .and_then(|t| self.encode_resumption_token(&t))
    }

    /// This may be call if an application has a resumption token. This must be called before
    /// connection starts.
    ///
    /// The resumption token also contains encoded HTTP/3 settings. The settings will be decoded
    /// and used until the setting are received from the server.
    ///
    /// # Errors
    ///
    /// An error is return if token cannot be decoded or a connection is is a wrong state.
    ///
    /// # Panics
    ///
    /// On closing if the base handler can't handle it (debug only).
    pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> {
        if self.base_handler.state != Http3State::Initializing {
            return Err(Error::InvalidState);
        }
        let mut dec = Decoder::from(token.as_ref());
        let Some(settings_slice) = dec.decode_vvec() else {
            return Err(Error::InvalidResumptionToken);
        };
        qtrace!([self], "  settings {}", hex_with_len(settings_slice));
        let mut dec_settings = Decoder::from(settings_slice);
        let mut settings = HSettings::default();
        Error::map_error(
            settings.decode_frame_contents(&mut dec_settings),
            Error::InvalidResumptionToken,
        )?;
        let tok = dec.decode_remainder();
        qtrace!([self], "  Transport token {}", hex(tok));
        self.conn.enable_resumption(now, tok)?;
        if self.conn.state().closed() {
            let state = self.conn.state().clone();
            let res = self
                .base_handler
                .handle_state_change(&mut self.conn, &state);
            debug_assert_eq!(Ok(true), res);
            return Err(Error::FatalError);
        }
        if self.conn.zero_rtt_state() == ZeroRttState::Sending {
            self.base_handler
                .set_0rtt_settings(&mut self.conn, settings)?;
            self.events
                .connection_state_change(self.base_handler.state());
            self.push_handler
                .borrow_mut()
                .maybe_send_max_push_id_frame(&mut self.base_handler);
        }
        Ok(())
    }

    /// This is call to close a connection.
    pub fn close<S>(&mut self, now: Instant, error: AppError, msg: S)
    where
        S: AsRef<str> + Display,
    {
        qinfo!([self], "Close the connection error={} msg={}.", error, msg);
        if !matches!(
            self.base_handler.state,
            Http3State::Closing(_) | Http3State::Closed(_)
        ) {
            self.push_handler.borrow_mut().clear();
            self.conn.close(now, error, msg);
            self.base_handler.close(error);
            self.events
                .connection_state_change(self.base_handler.state());
        }
    }

    /// Attempt to force a key update.
    ///
    /// # Errors
    ///
    /// If the connection isn't confirmed, or there is an outstanding key update, this
    /// returns `Err(Error::TransportError(neqo_transport::Error::KeyUpdateBlocked))`.
    pub fn initiate_key_update(&mut self) -> Res<()> {
        self.conn.initiate_key_update()?;
        Ok(())
    }

    // API: Request/response

    /// The function fetches a resource using `method`, `target` and `headers`. A response body
    /// may be added by calling `send_data`. `stream_close_send` must be sent to finish the request
    /// even if request data are not sent.
    ///
    /// # Errors
    ///
    /// If a new stream cannot be created an error will be return.
    ///
    /// # Panics
    ///
    /// `SendMessage` implements `http_stream` so it will not panic.
    pub fn fetch<'x, 't: 'x, T>(
        &mut self,
        now: Instant,
        method: &'t str,
        target: &'t T,
        headers: &'t [Header],
        priority: Priority,
    ) -> Res<StreamId>
    where
        T: AsRequestTarget<'x> + ?Sized + Debug,
    {
        let output = self.base_handler.fetch(
            &mut self.conn,
            Box::new(self.events.clone()),
            Box::new(self.events.clone()),
            Some(Rc::clone(&self.push_handler)),
            &RequestDescription {
                method,
                connect_type: None,
                target,
                headers,
                priority,
            },
        );
        if let Err(e) = &output {
            if e.connection_error() {
                self.close(now, e.code(), "");
            }
        }
        output
    }

    /// Send an [`PRIORITY_UPDATE`-frame][1] on next `Http3Client::process_output()` call.
    /// Returns if the priority got changed.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist
    ///
    /// [1]: https://datatracker.ietf.org/doc/html/draft-kazuho-httpbis-priority-04#section-5.2
    pub fn priority_update(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool> {
        self.base_handler.queue_update_priority(stream_id, priority)
    }

    /// An application may cancel a stream(request).
    /// Both sides, the receiviing and sending side, sending and receiving side, will be closed.
    ///
    /// # Errors
    ///
    /// An error will be return if a stream does not exist.
    pub fn cancel_fetch(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
        qinfo!([self], "reset_stream {} error={}.", stream_id, error);
        self.base_handler
            .cancel_fetch(stream_id, error, &mut self.conn)
    }

    /// This is call when application is done sending a request.
    ///
    /// # Errors
    ///
    /// An error will be return if stream does not exist.
    pub fn stream_close_send(&mut self, stream_id: StreamId) -> Res<()> {
        qdebug!([self], "Close sending side stream={}.", stream_id);
        self.base_handler
            .stream_close_send(&mut self.conn, stream_id)
    }

    /// # Errors
    ///
    /// An error will be return if a stream does not exist.
    pub fn stream_reset_send(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
        qinfo!([self], "stream_reset_send {} error={}.", stream_id, error);
        self.base_handler
            .stream_reset_send(&mut self.conn, stream_id, error)
    }

    /// # Errors
    ///
    /// An error will be return if a stream does not exist.
    pub fn stream_stop_sending(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
        qinfo!([self], "stream_stop_sending {} error={}.", stream_id, error);
        self.base_handler
            .stream_stop_sending(&mut self.conn, stream_id, error)
    }

    /// This function is used for regular HTTP requests and `WebTransport` streams.
    /// In the case of regular HTTP requests, the request body is supplied using this function, and
    /// headers are supplied through the `fetch` function.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist,
    /// `AlreadyClosed` if the stream has already been closed.
    /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
    /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
    /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
    /// supplied.
    pub fn send_data(&mut self, stream_id: StreamId, buf: &[u8]) -> Res<usize> {
        qinfo!(
            [self],
            "send_data from stream {} sending {} bytes.",
            stream_id,
            buf.len()
        );
        self.base_handler
            .send_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .send_data(&mut self.conn, buf)
    }

    /// Response data are read directly into a buffer supplied as a parameter of this function to
    /// avoid copying data.
    ///
    /// # Errors
    ///
    /// It returns an error if a stream does not exist or an error happen while reading a stream,
    /// e.g. early close, protocol error, etc.
    pub fn read_data(
        &mut self,
        now: Instant,
        stream_id: StreamId,
        buf: &mut [u8],
    ) -> Res<(usize, bool)> {
        qdebug!([self], "read_data from stream {}.", stream_id);
        let res = self.base_handler.read_data(&mut self.conn, stream_id, buf);
        if let Err(e) = &res {
            if e.connection_error() {
                self.close(now, e.code(), "");
            }
        }
        res
    }

    // API: Push streams

    /// Cancel a push
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist.
    pub fn cancel_push(&mut self, push_id: u64) -> Res<()> {
        self.push_handler
            .borrow_mut()
            .cancel(push_id, &mut self.conn, &mut self.base_handler)
    }

    /// Push response data are read directly into a buffer supplied as a parameter of this function
    /// to avoid copying data.
    ///
    /// # Errors
    ///
    /// It returns an error if a stream does not exist(`InvalidStreamId`) or an error has happened
    /// while reading a stream, e.g. early close, protocol error, etc.
    pub fn push_read_data(
        &mut self,
        now: Instant,
        push_id: u64,
        buf: &mut [u8],
    ) -> Res<(usize, bool)> {
        let stream_id = self
            .push_handler
            .borrow_mut()
            .get_active_stream_id(push_id)
            .ok_or(Error::InvalidStreamId)?;
        self.conn.stream_keep_alive(stream_id, true)?;
        self.read_data(now, stream_id, buf)
    }

    // API WebTransport
    //
    /// # Errors
    ///
    /// If `WebTransport` cannot be created, e.g. the `WebTransport` support is
    /// not negotiated or the HTTP/3 connection is closed.
    pub fn webtransport_create_session<'x, 't: 'x, T>(
        &mut self,
        now: Instant,
        target: &'t T,
        headers: &'t [Header],
    ) -> Res<StreamId>
    where
        T: AsRequestTarget<'x> + ?Sized + Debug,
    {
        let output = self.base_handler.webtransport_create_session(
            &mut self.conn,
            Box::new(self.events.clone()),
            target,
            headers,
        );

        if let Err(e) = &output {
            if e.connection_error() {
                self.close(now, e.code(), "");
            }
        }
        output
    }

    /// Close `WebTransport` cleanly
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist,
    /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
    /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
    /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
    /// supplied.
    pub fn webtransport_close_session(
        &mut self,
        session_id: StreamId,
        error: u32,
        message: &str,
    ) -> Res<()> {
        self.base_handler
            .webtransport_close_session(&mut self.conn, session_id, error, message)
    }

    /// # Errors
    ///
    /// This may return an error if the particular session does not exist
    /// or the connection is not in the active state.
    pub fn webtransport_create_stream(
        &mut self,
        session_id: StreamId,
        stream_type: StreamType,
    ) -> Res<StreamId> {
        self.base_handler.webtransport_create_stream_local(
            &mut self.conn,
            session_id,
            stream_type,
            Box::new(self.events.clone()),
            Box::new(self.events.clone()),
        )
    }

    /// Send `WebTransport` datagram.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    /// The function returns `TooMuchData` if the supply buffer is bigger than
    /// the allowed remote datagram size.
    pub fn webtransport_send_datagram(
        &mut self,
        session_id: StreamId,
        buf: &[u8],
        id: impl Into<DatagramTracking>,
    ) -> Res<()> {
        qtrace!("webtransport_send_datagram session:{:?}", session_id);
        self.base_handler
            .webtransport_send_datagram(session_id, &mut self.conn, buf, id)
    }

    /// Returns the current max size of a datagram that can fit into a packet.
    /// The value will change over time depending on the encoded size of the
    /// packet number, ack frames, etc.
    ///
    /// # Errors
    ///
    /// The function returns `NotAvailable` if datagrams are not enabled.
    ///
    /// # Panics
    ///
    /// This cannot panic. The max varint length is 8.
    pub fn webtransport_max_datagram_size(&self, session_id: StreamId) -> Res<u64> {
        Ok(self.conn.max_datagram_size()?
            - u64::try_from(Encoder::varint_len(session_id.as_u64())).unwrap())
    }

    /// Sets the `SendOrder` for a given stream
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    ///
    /// # Panics
    ///
    /// This cannot panic.
    pub fn webtransport_set_sendorder(
        &mut self,
        stream_id: StreamId,
        sendorder: Option<SendOrder>,
    ) -> Res<()> {
        Http3Connection::stream_set_sendorder(&mut self.conn, stream_id, sendorder)
    }

    /// Sets the `Fairness` for a given stream
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    ///
    /// # Panics
    ///
    /// This cannot panic.
    pub fn webtransport_set_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> {
        Http3Connection::stream_set_fairness(&mut self.conn, stream_id, fairness)
    }

    /// Returns the current `SendStreamStats` of a `WebTransportSendStream`.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist.
    pub fn webtransport_send_stream_stats(&mut self, stream_id: StreamId) -> Res<SendStreamStats> {
        self.base_handler
            .send_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .stats(&mut self.conn)
    }

    /// Returns the current `RecvStreamStats` of a `WebTransportRecvStream`.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist.
    pub fn webtransport_recv_stream_stats(&mut self, stream_id: StreamId) -> Res<RecvStreamStats> {
        self.base_handler
            .recv_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .stats(&mut self.conn)
    }

    /// This function combines  `process_input` and `process_output` function.
    pub fn process(&mut self, dgram: Option<Datagram<impl AsRef<[u8]>>>, now: Instant) -> Output {
        qtrace!([self], "Process.");
        if let Some(d) = dgram {
            self.process_input(d, now);
        }
        self.process_output(now)
    }

    /// The function should be called when there is a new UDP packet available. The function will
    /// handle the packet payload.
    ///
    /// First, the payload will be handled by the QUIC layer. Afterward, `process_http3` will be
    /// called to handle new [`ConnectionEvent`][1]s.
    ///
    /// After this function is called `process_output` should be called to check whether new
    /// packets need to be sent or if a timer needs to be updated.
    ///
    /// [1]: ../neqo_transport/enum.ConnectionEvent.html
    pub fn process_input(&mut self, dgram: Datagram<impl AsRef<[u8]>>, now: Instant) {
        self.process_multiple_input(iter::once(dgram), now);
    }

    pub fn process_multiple_input(
        &mut self,
        dgrams: impl IntoIterator<Item = Datagram<impl AsRef<[u8]>>>,
        now: Instant,
    ) {
        let mut dgrams = dgrams.into_iter().peekable();
        qtrace!([self], "Process multiple datagrams");
        if dgrams.peek().is_none() {
            return;
        }
        self.conn.process_multiple_input(dgrams, now);
        self.process_http3(now);
    }

    /// Process HTTP3 layer.
    /// When `process_output`, `process_input`, or `process` is called we must call this function
    /// as well. The functions calls `Http3Client::check_connection_events` to handle events from
    /// the QUC layer and calls `Http3Connection::process_sending` to ensure that HTTP/3 layer
    /// data, e.g. control frames, are sent.
    fn process_http3(&mut self, now: Instant) {
        qtrace!([self], "Process http3 internal.");
        match self.base_handler.state() {
            Http3State::ZeroRtt | Http3State::Connected | Http3State::GoingAway(..) => {
                let res = self.check_connection_events();
                if self.check_result(now, &res) {
                    return;
                }
                self.push_handler
                    .borrow_mut()
                    .maybe_send_max_push_id_frame(&mut self.base_handler);
                let res = self.base_handler.process_sending(&mut self.conn);
                self.check_result(now, &res);
            }
            Http3State::Closed { .. } => {}
            _ => {
                let res = self.check_connection_events();
                _ = self.check_result(now, &res);
            }
        }
    }

    /// The function should be called to check if there is a new UDP packet to be sent. It should
    /// be called after a new packet is received and processed and after a timer expires (QUIC
    /// needs timers to handle events like PTO detection and timers are not implemented by the neqo
    /// library, but instead must be driven by the application).
    ///
    /// `process_output` can return:
    /// - a [`Output::Datagram(Datagram)`][1]: data that should be sent as a UDP payload,
    /// - a [`Output::Callback(Duration)`][1]: the duration of a  timer. `process_output` should be
    ///   called at least after the time expires,
    /// - [`Output::None`][1]: this is returned when `Nttp3Client` is done and can be destroyed.
    ///
    /// The application should call this function repeatedly until a timer value or None is
    /// returned. After that, the application should call the function again if a new UDP packet is
    /// received and processed or the timer value expires.
    ///
    /// The HTTP/3 neqo implementation drives the HTTP/3 and QUIC layers, therefore this function
    /// will call both layers:
    ///  - First it calls HTTP/3 layer processing (`process_http3`) to make sure the layer writes
    ///    data to QUIC layer or cancels streams if needed.
    ///  - Then QUIC layer processing is called - [`Connection::process_output`][3]. This produces a
    ///    packet or a timer value. It may also produce new [`ConnectionEvent`][2]s, e.g. connection
    ///    state-change event.
    ///  - Therefore the HTTP/3 layer processing (`process_http3`) is called again.
    ///
    /// [1]: ../neqo_transport/enum.Output.html
    /// [2]: ../neqo_transport/struct.ConnectionEvents.html
    /// [3]: ../neqo_transport/struct.Connection.html#method.process_output
    pub fn process_output(&mut self, now: Instant) -> Output {
        qtrace!([self], "Process output.");

        // Maybe send() stuff on http3-managed streams
        self.process_http3(now);

        let out = self.conn.process_output(now);

        // Update H3 for any transport state changes and events
        self.process_http3(now);

        out
    }

    /// This function takes the provided result and check for an error.
    /// An error results in closing the connection.
    fn check_result<ERR>(&mut self, now: Instant, res: &Res<ERR>) -> bool {
        match &res {
            Err(Error::HttpGoaway) => {
                qinfo!([self], "Connection error: goaway stream_id increased.");
                self.close(
                    now,
                    Error::HttpGeneralProtocol.code(),
                    "Connection error: goaway stream_id increased",
                );
                true
            }
            Err(e) => {
                qinfo!([self], "Connection error: {}.", e);
                self.close(now, e.code(), format!("{e}"));
                true
            }
            _ => false,
        }
    }

    /// This function checks [`ConnectionEvent`][2]s emitted by the QUIC layer, e.g. connection
    /// change state events, new incoming stream data is available, a stream is was reset, etc.
    /// The HTTP/3 layer needs to handle these events. Most of the events are handled by
    /// [`Http3Connection`][1] by calling appropriate functions, e.g. `handle_state_change`,
    /// `handle_stream_reset`, etc. [`Http3Connection`][1] handle functionalities that are common
    /// for the client and server side. Some of the functionalities are specific to the client and
    /// they are handled by `Http3Client`. For example, [`ConnectionEvent::RecvStreamReadable`][3]
    /// event is handled by `Http3Client::handle_stream_readable`. The  function calls
    /// `Http3Connection::handle_stream_readable` and then hands the return value as appropriate
    /// for the client-side.
    ///
    /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs
    /// [2]: ../neqo_transport/enum.ConnectionEvent.html
    /// [3]: ../neqo_transport/enum.ConnectionEvent.html#variant.RecvStreamReadable
    fn check_connection_events(&mut self) -> Res<()> {
        qtrace!([self], "Check connection events.");
        while let Some(e) = self.conn.next_event() {
            qdebug!([self], "check_connection_events - event {:?}.", e);
            match e {
                ConnectionEvent::NewStream { stream_id } => {
                    // During this event we only add a new stream to the Http3Connection stream
                    // list, with NewStreamHeadReader stream handler.
                    // This function will not read from the stream and try to decode the stream.
                    // RecvStreamReadable  will be emitted after this event and reading, i.e.
                    // decoding of a stream will happen during that event.
                    self.base_handler.add_new_stream(stream_id);
                }
                ConnectionEvent::SendStreamWritable { stream_id } => {
                    if let Some(s) = self.base_handler.send_streams.get_mut(&stream_id) {
                        s.stream_writable();
                    }
                }
                ConnectionEvent::RecvStreamReadable { stream_id } => {
                    self.handle_stream_readable(stream_id)?;
                }
                ConnectionEvent::RecvStreamReset {
                    stream_id,
                    app_error,
                } => self
                    .base_handler
                    .handle_stream_reset(stream_id, app_error, &mut self.conn)?,
                ConnectionEvent::SendStreamStopSending {
                    stream_id,
                    app_error,
                } => self.base_handler.handle_stream_stop_sending(
                    stream_id,
                    app_error,
                    &mut self.conn,
                )?,

                ConnectionEvent::SendStreamCreatable { stream_type } => {
                    self.events.new_requests_creatable(stream_type);
                }
                ConnectionEvent::AuthenticationNeeded => self.events.authentication_needed(),
                ConnectionEvent::EchFallbackAuthenticationNeeded { public_name } => {
                    self.events.ech_fallback_authentication_needed(public_name);
                }
                ConnectionEvent::StateChange(state) => {
                    if self
                        .base_handler
                        .handle_state_change(&mut self.conn, &state)?
                    {
                        self.events
                            .connection_state_change(self.base_handler.state());
                    }
                }
                ConnectionEvent::ZeroRttRejected => {
                    self.base_handler.handle_zero_rtt_rejected()?;
                    self.events.zero_rtt_rejected();
                    self.push_handler.borrow_mut().handle_zero_rtt_rejected();
                }
                ConnectionEvent::ResumptionToken(token) => {
                    if let Some(t) = self.encode_resumption_token(&token) {
                        self.events.resumption_token(t);
                    }
                }
                ConnectionEvent::Datagram(dgram) => {
                    self.base_handler.handle_datagram(&dgram);
                }
                ConnectionEvent::SendStreamComplete { .. }
                | ConnectionEvent::OutgoingDatagramOutcome { .. }
                | ConnectionEvent::IncomingDatagramDropped => {}
            }
        }
        Ok(())
    }

    /// This function handled new data available on a stream. It calls
    /// `Http3Client::handle_stream_readable` and handles its response. Reading streams are mostly
    /// handled by [`Http3Connection`][1] because most part of it is common for the client and
    /// server. The following actions need to be handled by the client-specific code:
    ///  - `ReceiveOutput::NewStream(NewStreamType::Push(_))` - the server cannot receive a push
    ///    stream,
    ///  - `ReceiveOutput::NewStream(NewStreamType::Http)` - client cannot  receive a
    ///    server-initiated HTTP request,
    ///  - `ReceiveOutput::NewStream(NewStreamType::WebTransportStream(_))` - because
    ///    `Http3ClientEvents`is needed and events handler is specific to the client.
    ///  - `ReceiveOutput::ControlFrames(control_frames)` - some control frame handling differs
    ///    between the  client and the server:
    ///     - `HFrame::CancelPush` - only the client-side may receive it,
    ///     - `HFrame::MaxPushId { .. }`, `HFrame::PriorityUpdateRequest { .. } ` and
    ///       `HFrame::PriorityUpdatePush` can only be receive on the server side,
    ///     - `HFrame::Goaway { stream_id }` needs specific handling by the client by the protocol
    ///       specification.
    ///
    /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs
    fn handle_stream_readable(&mut self, stream_id: StreamId) -> Res<()> {
        match self
            .base_handler
            .handle_stream_readable(&mut self.conn, stream_id)?
        {
            ReceiveOutput::NewStream(NewStreamType::Push(push_id)) => {
                self.handle_new_push_stream(stream_id, push_id)
            }
            ReceiveOutput::NewStream(NewStreamType::Http(_)) => Err(Error::HttpStreamCreation),
            ReceiveOutput::NewStream(NewStreamType::WebTransportStream(session_id)) => {
                self.base_handler.webtransport_create_stream_remote(
                    StreamId::from(session_id),
                    stream_id,
                    Box::new(self.events.clone()),
                    Box::new(self.events.clone()),
                )?;
                let res = self
                    .base_handler
                    .handle_stream_readable(&mut self.conn, stream_id)?;
                debug_assert!(matches!(res, ReceiveOutput::NoOutput));
                Ok(())
            }
            ReceiveOutput::ControlFrames(control_frames) => {
                for f in control_frames {
                    match f {
                        HFrame::CancelPush { push_id } => self
                            .push_handler
                            .borrow_mut()
                            .handle_cancel_push(push_id, &mut self.conn, &mut self.base_handler),
                        HFrame::MaxPushId { .. }
                        | HFrame::PriorityUpdateRequest { .. }
                        | HFrame::PriorityUpdatePush { .. } => Err(Error::HttpFrameUnexpected),
                        HFrame::Goaway { stream_id } => self.handle_goaway(stream_id),
                        _ => {
                            unreachable!(
                                "we should only put MaxPushId, Goaway and PriorityUpdates into control_frames."
                            );
                        }
                    }?;
                }
                Ok(())
            }
            _ => Ok(()),
        }
    }

    fn handle_new_push_stream(&mut self, stream_id: StreamId, push_id: u64) -> Res<()> {
        if !self.push_handler.borrow().can_receive_push() {
            return Err(Error::HttpId);
        }

        // Add a new push stream to `PushController`. `add_new_push_stream` may return an error
        // (this will be a connection error) or a bool.
        // If false is returned that means that the stream should be reset because the push has
        // been already canceled (CANCEL_PUSH frame or canceling push from the application).
        if !self
            .push_handler
            .borrow_mut()
            .add_new_push_stream(push_id, stream_id)?
        {
            // We are not interested in the result of stream_stop_sending, we are not interested
            // in this stream.
            mem::drop(
                self.conn
                    .stream_stop_sending(stream_id, Error::HttpRequestCancelled.code()),
            );
            return Ok(());
        }

        self.base_handler.add_recv_stream(
            stream_id,
            Box::new(RecvMessage::new(
                &RecvMessageInfo {
                    message_type: MessageType::Response,
                    stream_type: Http3StreamType::Push,
                    stream_id,
                    first_frame_type: None,
                },
                Rc::clone(&self.base_handler.qpack_decoder),
                Box::new(RecvPushEvents::new(push_id, Rc::clone(&self.push_handler))),
                None,
                // TODO: think about the right prority for the push streams.
                PriorityHandler::new(true, Priority::default()),
            )),
        );
        let res = self
            .base_handler
            .handle_stream_readable(&mut self.conn, stream_id)?;
        debug_assert!(matches!(res, ReceiveOutput::NoOutput));
        Ok(())
    }

    fn handle_goaway(&mut self, goaway_stream_id: StreamId) -> Res<()> {
        qinfo!([self], "handle_goaway {}", goaway_stream_id);

        if goaway_stream_id.is_uni() || goaway_stream_id.is_server_initiated() {
            return Err(Error::HttpId);
        }

        match self.base_handler.state {
            Http3State::Connected => {
                self.base_handler.state = Http3State::GoingAway(goaway_stream_id);
            }
            Http3State::GoingAway(ref mut stream_id) => {
                if goaway_stream_id > *stream_id {
                    return Err(Error::HttpGoaway);
                }
                *stream_id = goaway_stream_id;
            }
            Http3State::Closing(..) | Http3State::Closed(..) => {}
            _ => unreachable!("Should not receive Goaway frame in this state."),
        }

        // Issue reset events for streams >= goaway stream id
        let send_ids: Vec<StreamId> = self
            .base_handler
            .send_streams
            .iter()
            .filter_map(id_gte(goaway_stream_id))
            .collect();
        for id in send_ids {
            // We do not care about streams that are going to be closed.
            mem::drop(self.base_handler.handle_stream_stop_sending(
                id,
                Error::HttpRequestRejected.code(),
                &mut self.conn,
            ));
        }

        let recv_ids: Vec<StreamId> = self
            .base_handler
            .recv_streams
            .iter()
            .filter_map(id_gte(goaway_stream_id))
            .collect();
        for id in recv_ids {
            // We do not care about streams that are going to be closed.
            mem::drop(self.base_handler.handle_stream_reset(
                id,
                Error::HttpRequestRejected.code(),
                &mut self.conn,
            ));
        }

        self.events.goaway_received();

        Ok(())
    }

    /// Increases `max_stream_data` for a `stream_id`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidStreamId` if a stream does not exist or the receiving
    /// side is closed.
    pub fn set_stream_max_data(&mut self, stream_id: StreamId, max_data: u64) -> Res<()> {
        self.conn.set_stream_max_data(stream_id, max_data)?;
        Ok(())
    }

    #[must_use]
    pub fn qpack_decoder_stats(&self) -> QpackStats {
        self.base_handler.qpack_decoder.borrow().stats()
    }

    #[must_use]
    pub fn qpack_encoder_stats(&self) -> QpackStats {
        self.base_handler.qpack_encoder.borrow().stats()
    }

    #[must_use]
    pub fn transport_stats(&self) -> TransportStats {
        self.conn.stats()
    }

    #[must_use]
    pub const fn webtransport_enabled(&self) -> bool {
        self.base_handler.webtransport_enabled()
    }
}

impl EventProvider for Http3Client {
    type Event = Http3ClientEvent;

    /// Return true if there are outstanding events.
    fn has_events(&self) -> bool {
        self.events.has_events()
    }

    /// Get events that indicate state changes on the connection. This method
    /// correctly handles cases where handling one event can obsolete
    /// previously-queued events, or cause new events to be generated.
    fn next_event(&mut self) -> Option<Self::Event> {
        self.events.next_event()
    }
}

#[cfg(test)]
mod tests {
    use std::{mem, time::Duration};

    use neqo_common::{event::Provider, qtrace, Datagram, Decoder, Encoder};
    use neqo_crypto::{AllowZeroRtt, AntiReplay, ResumptionToken};
    use neqo_qpack::{encoder::QPackEncoder, QpackSettings};
    use neqo_transport::{
        CloseReason, ConnectionEvent, ConnectionParameters, Output, State, StreamId, StreamType,
        Version, MIN_INITIAL_PACKET_SIZE, RECV_BUFFER_SIZE, SEND_BUFFER_SIZE,
    };
    use test_fixture::{
        anti_replay, default_server_h3, fixture_init, new_server, now,
        CountingConnectionIdGenerator, DEFAULT_ADDR, DEFAULT_ALPN_H3, DEFAULT_KEYS,
        DEFAULT_SERVER_NAME,
    };

    use super::{
        AuthenticationStatus, Connection, Error, HSettings, Header, Http3Client, Http3ClientEvent,
        Http3Parameters, Http3State, Rc, RefCell,
    };
    use crate::{
        frames::{HFrame, H3_FRAME_TYPE_SETTINGS, H3_RESERVED_FRAME_TYPES},
        qpack_encoder_receiver::EncoderRecvStream,
        settings::{HSetting, HSettingType, H3_RESERVED_SETTINGS},
        Http3Server, Priority, RecvStream,
    };

    fn assert_closed(client: &Http3Client, expected: &Error) {
        match client.state() {
            Http3State::Closing(err) | Http3State::Closed(err) => {
                assert_eq!(err, CloseReason::Application(expected.code()));
            }
            _ => panic!("Wrong state {:?}", client.state()),
        };
    }

    /// Create a http3 client with default configuration.
    pub fn default_http3_client() -> Http3Client {
        default_http3_client_param(100)
    }

    pub fn default_http3_client_param(max_table_size: u64) -> Http3Client {
        fixture_init();
        Http3Client::new(
            DEFAULT_SERVER_NAME,
            Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
            DEFAULT_ADDR,
            DEFAULT_ADDR,
            Http3Parameters::default()
                .connection_parameters(
                    // Disable compatible upgrade, which complicates tests.
                    ConnectionParameters::default()
                        .versions(Version::default(), vec![Version::default()]),
                )
                .max_table_size_encoder(max_table_size)
                .max_table_size_decoder(max_table_size)
                .max_blocked_streams(100)
                .max_concurrent_push_streams(5),
            now(),
        )
        .expect("create a default client")
    }

    const CONTROL_STREAM_TYPE: &[u8] = &[0x0];

    // Encoder stream data
    const ENCODER_STREAM_DATA: &[u8] = &[0x2];
    const ENCODER_STREAM_CAP_INSTRUCTION: &[u8] = &[0x3f, 0x45];

    // Encoder stream data with a change capacity instruction(0x3f, 0x45 = change capacity to 100)
    // This data will be send when 0-RTT is used and we already have a max_table_capacity from
    // resumed settings.
    const ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION: &[u8] = &[0x2, 0x3f, 0x45];

    const ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST: &[u8] = &[
        0x2, 0x3f, 0x45, 0x67, 0xa7, 0xd4, 0xe5, 0x1c, 0x85, 0xb1, 0x1f, 0x86, 0xa7, 0xd7, 0x71,
        0xd1, 0x69, 0x7f,
    ];

    // Decoder stream data
    const DECODER_STREAM_DATA: &[u8] = &[0x3];

    const PUSH_STREAM_TYPE: &[u8] = &[0x1];

    const CLIENT_SIDE_CONTROL_STREAM_ID: StreamId = StreamId::new(2);
    const CLIENT_SIDE_ENCODER_STREAM_ID: StreamId = StreamId::new(6);
    const CLIENT_SIDE_DECODER_STREAM_ID: StreamId = StreamId::new(10);

    struct TestServer {
        settings: HFrame,
        conn: Connection,
        control_stream_id: Option<StreamId>,
        encoder: Rc<RefCell<QPackEncoder>>,
        encoder_receiver: EncoderRecvStream,
        encoder_stream_id: Option<StreamId>,
        decoder_stream_id: Option<StreamId>,
    }

    impl TestServer {
        pub fn new() -> Self {
            Self::new_with_settings(&[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ])
        }

        pub fn new_with_settings(server_settings: &[HSetting]) -> Self {
            fixture_init();
            let max_table_size = server_settings
                .iter()
                .find(|s| s.setting_type == HSettingType::MaxTableCapacity)
                .map_or(100, |s| s.value);
            let max_blocked_streams = u16::try_from(
                server_settings
                    .iter()
                    .find(|s| s.setting_type == HSettingType::BlockedStreams)
                    .map_or(100, |s| s.value),
            )
            .unwrap();
            let qpack = Rc::new(RefCell::new(QPackEncoder::new(
                &QpackSettings {
                    max_table_size_encoder: max_table_size,
                    max_table_size_decoder: max_table_size,
                    max_blocked_streams,
                },
                true,
            )));
            Self {
                settings: HFrame::Settings {
                    settings: HSettings::new(server_settings),
                },
                conn: default_server_h3(),
                control_stream_id: None,
                encoder: Rc::clone(&qpack),
                encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack),
                encoder_stream_id: None,
                decoder_stream_id: None,
            }
        }

        pub fn new_with_conn(conn: Connection) -> Self {
            let qpack = Rc::new(RefCell::new(QPackEncoder::new(
                &QpackSettings {
                    max_table_size_encoder: 128,
                    max_table_size_decoder: 128,
                    max_blocked_streams: 0,
                },
                true,
            )));
            Self {
                settings: HFrame::Settings {
                    settings: HSettings::new(&[]),
                },
                conn,
                control_stream_id: None,
                encoder: Rc::clone(&qpack),
                encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack),
                encoder_stream_id: None,
                decoder_stream_id: None,
            }
        }

        pub fn create_qpack_streams(&mut self) {
            // Create a QPACK encoder stream
            self.encoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap());
            self.encoder
                .borrow_mut()
                .add_send_stream(self.encoder_stream_id.unwrap());
            self.encoder
                .borrow_mut()
                .send_encoder_updates(&mut self.conn)
                .unwrap();

            // Create decoder stream
            self.decoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap());
            assert_eq!(
                self.conn
                    .stream_send(self.decoder_stream_id.unwrap(), DECODER_STREAM_DATA)
                    .unwrap(),
                1
            );
        }

        pub fn create_control_stream(&mut self) {
            // Create control stream
            let control = self.conn.stream_create(StreamType::UniDi).unwrap();
            qtrace!(["TestServer"], "control stream: {}", control);
            self.control_stream_id = Some(control);
            // Send stream type on the control stream.
            assert_eq!(
                self.conn
                    .stream_send(self.control_stream_id.unwrap(), CONTROL_STREAM_TYPE)
                    .unwrap(),
                1
            );

            // Encode a settings frame and send it.
            let mut enc = Encoder::default();
            self.settings.encode(&mut enc);
            assert_eq!(
                self.conn
                    .stream_send(self.control_stream_id.unwrap(), enc.as_ref())
                    .unwrap(),
                enc.len()
            );
        }

        pub fn check_client_control_qpack_streams_no_resumption(&mut self) {
            self.check_client_control_qpack_streams(
                ENCODER_STREAM_DATA,
                EXPECTED_REQUEST_HEADER_FRAME,
                false,
                true,
            );
        }

        pub fn check_control_qpack_request_streams_resumption(
            &mut self,
            expect_encoder_stream_data: &[u8],
            expect_request_header: &[u8],
            expect_request: bool,
        ) {
            self.check_client_control_qpack_streams(
                expect_encoder_stream_data,
                expect_request_header,
                expect_request,
                false,
            );
        }

        // Check that server has received correct settings and qpack streams.
        pub fn check_client_control_qpack_streams(
            &mut self,
            expect_encoder_stream_data: &[u8],
            expect_request_header: &[u8],
            expect_request: bool,
            expect_connected: bool,
        ) {
            let mut connected = false;
            let mut control_stream = false;
            let mut qpack_decoder_stream = false;
            let mut qpack_encoder_stream = false;
            let mut request = false;
            while let Some(e) = self.conn.next_event() {
                match e {
                    ConnectionEvent::NewStream { stream_id }
                    | ConnectionEvent::SendStreamWritable { stream_id } => {
                        if expect_request {
                            assert!(matches!(stream_id.as_u64(), 2 | 6 | 10 | 0));
                        } else {
                            assert!(matches!(stream_id.as_u64(), 2 | 6 | 10));
                        }
                    }
                    ConnectionEvent::RecvStreamReadable { stream_id } => {
                        if stream_id == CLIENT_SIDE_CONTROL_STREAM_ID {
                            self.check_control_stream();
                            control_stream = true;
                        } else if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID {
                            // the qpack encoder stream
                            self.read_and_check_stream_data(
                                stream_id,
                                expect_encoder_stream_data,
                                false,
                            );
                            qpack_encoder_stream = true;
                        } else if stream_id == CLIENT_SIDE_DECODER_STREAM_ID {
                            // the qpack decoder stream
                            self.read_and_check_stream_data(stream_id, DECODER_STREAM_DATA, false);
                            qpack_decoder_stream = true;
                        } else if stream_id == 0 {
                            assert!(expect_request);
                            self.read_and_check_stream_data(stream_id, expect_request_header, true);
                            request = true;
                        } else {
                            panic!("unexpected event");
                        }
                    }
                    ConnectionEvent::StateChange(State::Connected) => connected = true,
                    ConnectionEvent::StateChange(_)
                    | ConnectionEvent::SendStreamCreatable { .. } => {}
                    _ => panic!("unexpected event"),
                }
            }
            assert_eq!(connected, expect_connected);
            assert!(control_stream);
            assert!(qpack_encoder_stream);
            assert!(qpack_decoder_stream);
            assert_eq!(request, expect_request);
        }

        // Check that the control stream contains default values.
        // Expect a SETTINGS frame, some grease, and a MAX_PUSH_ID frame.
        // The default test configuration uses:
        //  - max_table_capacity = 100
        //  - max_blocked_streams = 100
        // and a maximum of 5 push streams.
        fn check_control_stream(&mut self) {
            let mut buf = [0_u8; 100];
            let (amount, fin) = self
                .conn
                .stream_recv(CLIENT_SIDE_CONTROL_STREAM_ID, &mut buf)
                .unwrap();
            let mut dec = Decoder::from(&buf[..amount]);
            assert_eq!(dec.decode_varint().unwrap(), 0); // control stream type
            assert_eq!(dec.decode_varint().unwrap(), 4); // SETTINGS
            assert_eq!(
                dec.decode_vvec().unwrap(),
                &[1, 0x40, 0x64, 7, 0x40, 0x64, 0xab, 0x60, 0x37, 0x42, 0x00]
            );

            assert_eq!((dec.decode_varint().unwrap() - 0x21) % 0x1f, 0); // Grease
            assert!(dec.decode_vvec().unwrap().len() < 8);

            assert_eq!(dec.decode_varint().unwrap(), 0xd); // MAX_PUSH_ID
            assert_eq!(dec.decode_vvec().unwrap(), &[5]);

            assert_eq!(dec.remaining(), 0);
            assert!(!fin);
        }

        pub fn read_and_check_stream_data(
            &mut self,
            stream_id: StreamId,
            expected_data: &[u8],
            expected_fin: bool,
        ) {
            let mut buf = [0_u8; 100];
            let (amount, fin) = self.conn.stream_recv(stream_id, &mut buf).unwrap();
            assert_eq!(fin, expected_fin);
            assert_eq!(amount, expected_data.len());
            assert_eq!(&buf[..amount], expected_data);
        }

        pub fn encode_headers(
            &mut self,
            stream_id: StreamId,
            headers: &[Header],
            encoder: &mut Encoder,
        ) {
            let header_block =
                self.encoder
                    .borrow_mut()
                    .encode_header_block(&mut self.conn, headers, stream_id);
            let hframe = HFrame::Headers {
                header_block: header_block.as_ref().to_vec(),
            };
            hframe.encode(encoder);
        }
    }

    fn handshake_only(client: &mut Http3Client, server: &mut TestServer) -> Output {
        assert_eq!(client.state(), Http3State::Initializing);
        let out = client.process_output(now());
        assert_eq!(client.state(), Http3State::Initializing);

        assert_eq!(*server.conn.state(), State::Init);
        let out = server.conn.process(out.dgram(), now());
        assert_eq!(*server.conn.state(), State::Handshaking);

        let out = client.process(out.dgram(), now());
        let out = server.conn.process(out.dgram(), now());
        assert!(out.as_dgram_ref().is_none());

        let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded);
        assert!(client.events().any(authentication_needed));
        client.authenticated(AuthenticationStatus::Ok, now());
        out
    }

    // Perform only Quic transport handshake.
    fn connect_only_transport_with(client: &mut Http3Client, server: &mut TestServer) {
        let out = handshake_only(client, server);

        let out = client.process(out.dgram(), now());
        let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected));
        assert!(client.events().any(connected));

        assert_eq!(client.state(), Http3State::Connected);
        server.conn.process_input(out.dgram().unwrap(), now());
        assert!(server.conn.state().connected());
    }

    // Perform only Quic transport handshake.
    fn connect_only_transport() -> (Http3Client, TestServer) {
        let mut client = default_http3_client();
        let mut server = TestServer::new();
        connect_only_transport_with(&mut client, &mut server);
        (client, server)
    }

    fn send_and_receive_client_settings(client: &mut Http3Client, server: &mut TestServer) {
        // send and receive client settings
        let out = client.process_output(now());
        server.conn.process_input(out.dgram().unwrap(), now());
        server.check_client_control_qpack_streams_no_resumption();
    }

    // Perform Quic transport handshake and exchange Http3 settings.
    fn connect_with(client: &mut Http3Client, server: &mut TestServer) {
        connect_only_transport_with(client, server);

        send_and_receive_client_settings(client, server);

        server.create_control_stream();

        server.create_qpack_streams();
        // Send the server's control and qpack streams data.
        let out = server.conn.process(None::<Datagram>, now());
        client.process_input(out.dgram().unwrap(), now());

        // assert no error occured.
        assert_eq!(client.state(), Http3State::Connected);
    }

    // Perform Quic transport handshake and exchange Http3 settings.
    fn connect_with_connection_parameters(
        server_conn_params: ConnectionParameters,
    ) -> (Http3Client, TestServer) {
        // connecting with default max_table_size
        let mut client = default_http3_client_param(100);
        let server = Connection::new_server(
            test_fixture::DEFAULT_KEYS,
            test_fixture::DEFAULT_ALPN_H3,
            Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
--> --------------------

--> maximum size reached

--> --------------------

[ Seitenstruktur0.106Drucken  ]