Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/neqo-http3/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 261 kB image not shown  

Impressum connection_client.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{
    cell::RefCell,
    fmt::{Debug, Display},
    iter, mem,
    net::SocketAddr,
    rc::Rc,
    time::Instant,
};

use neqo_common::{
    event::Provider as EventProvider, hex, hex_with_len, qdebug, qinfo, qlog::NeqoQlog, qtrace,
    Datagram, Decoder, Encoder, Header, MessageType, Role,
};
use neqo_crypto::{agent::CertificateInfo, AuthenticationStatus, ResumptionToken, SecretAgentInfo};
use neqo_qpack::Stats as QpackStats;
use neqo_transport::{
    streams::SendOrder, AppError, Connection, ConnectionEvent, ConnectionId, ConnectionIdGenerator,
    DatagramTracking, Output, RecvStreamStats, SendStreamStats, Stats as TransportStats, StreamId,
    StreamType, Version, ZeroRttState,
};

use crate::{
    client_events::{Http3ClientEvent, Http3ClientEvents},
    connection::{Http3Connection, Http3State, RequestDescription},
    frames::HFrame,
    push_controller::{PushController, RecvPushEvents},
    recv_message::{RecvMessage, RecvMessageInfo},
    request_target::AsRequestTarget,
    settings::HSettings,
    Error, Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler,
    ReceiveOutput, Res,
};

// This is used for filtering send_streams and recv_Streams with a stream_ids greater than or equal
// a given id. Only the same type (bidirectional or unidirectionsl) streams are filtered.
fn id_gte<U>(base: StreamId) -> impl FnMut((&StreamId, &U)) -> Option<StreamId> + 'static
where
    U: ?Sized,
{
    move |(id, _)| {
        if *id >= base && !(id.is_bidi() ^ base.is_bidi()) {
            Some(*id)
        } else {
            None
        }
    }
}

const fn alpn_from_quic_version(version: Version) -> &'static str {
    match version {
        Version::Version2 | Version::Version1 => "h3",
        Version::Draft29 => "h3-29",
        Version::Draft30 => "h3-30",
        Version::Draft31 => "h3-31",
        Version::Draft32 => "h3-32",
    }
}

/// # The HTTP/3 client API
///
/// This module implements the HTTP/3 client API. The main implementation of the protocol is in
/// [connection.rs](https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs) which
/// implements common behavior for the client-side and the server-side. `Http3Client` structure
/// implements the public API and set of functions that differ between the client and the server.
///
/// The API is used for:
/// - create and close an endpoint:
///   - [`Http3Client::new`]
///   - [`Http3Client::new_with_conn`]
///   - [`Http3Client::close`]
/// - configuring an endpoint:
///   - [`Http3Client::authenticated`]
///   - [`Http3Client::enable_ech`]
///   - [`Http3Client::enable_resumption`]
///   - [`Http3Client::initiate_key_update`]
///   - [`Http3Client::set_qlog`]
/// - retrieving information about a connection:
/// - [`Http3Client::peer_certificate`]
///   - [`Http3Client::qpack_decoder_stats`]
///   - [`Http3Client::qpack_encoder_stats`]
///   - [`Http3Client::transport_stats`]
///   - [`Http3Client::state`]
///   - [`Http3Client::take_resumption_token`]
///   - [`Http3Client::tls_info`]
/// - driving HTTP/3 session:
///   - [`Http3Client::process_output`]
///   - [`Http3Client::process_input`]
///   - [`Http3Client::process`]
/// - create requests, send/receive data, and cancel requests:
///   - [`Http3Client::fetch`]
///   - [`Http3Client::send_data`]
///   - [`Http3Client::read_data`]
///   - [`Http3Client::stream_close_send`]
///   - [`Http3Client::cancel_fetch`]
///   - [`Http3Client::stream_reset_send`]
///   - [`Http3Client::stream_stop_sending`]
///   - [`Http3Client::set_stream_max_data`]
/// - priority feature:
///   - [`Http3Client::priority_update`]
/// - `WebTransport` feature:
///   - [`Http3Client::webtransport_create_session`]
///   - [`Http3Client::webtransport_close_session`]
///   - [`Http3Client::webtransport_create_stream`]
///   - [`Http3Client::webtransport_enabled`]
///
/// ## Examples
///
/// ### Fetching a resource
///
/// ```ignore
/// let mut client = Http3Client::new(...);
///
/// // Perform a handshake
/// ...
///
/// let req = client
///     .fetch(
///         Instant::now(),
///         "GET",
///         &("https", "something.com", "/"),
///         &[Header::new("example1", "value1"), Header::new("example1", "value2")],
///         Priority::default(),
///     )
///     .unwrap();
///
/// client.stream_close_send(req).unwrap();
///
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::HeaderReady { stream_id, headers, interim, fin } => {
///                 println!("New response headers received for stream {:?} [fin={?}, interim={:?}]: {:?}",
///                     stream_id,
///                     fin,
///                     interim,
///                     headers,
///                 );
///             }
///             Http3ClientEvent::DataReadable { stream_id } => {
///                 println!("New data available on stream {}", stream_id);
///                let mut buf = [0; 100];
///                let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
///                 println!("Read {:?} bytes from stream {:?} [fin={?}]",
///                     amount,
///                     stream_id,
///                     fin,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
///
/// ### Creating a `WebTransport` session
///
/// ```ignore
/// let mut client = Http3Client::new(...);
///
/// // Perform a handshake
/// ...
///
/// // Create a session
/// let wt_session_id = client
///     .webtransport_create_session(now(), &("https", "something.com", "/"), &[])
///     .unwrap();
///
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::WebTransport(WebTransportEvent::Session{
///                 stream_id,
///                 status,
///                 ..
///             }) => {
///                 println!("The response from the server: WebTransport session ID {:?} status={:?}",
///                     stream_id,
///                     status,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
///
/// ### `WebTransport`: create a stream, send and receive data on the stream
///
/// ```ignore
/// const BUF_CLIENT: &[u8] = &[0; 10];
/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above.
///
/// // create a  stream
/// let wt_stream_id = client
///     .webtransport_create_stream(wt_session_id, StreamType::BiDi)
///     .unwrap();
///
/// // send data
/// let data_sent = client.send_data(wt_stream_id, BUF_CLIENT).unwrap();
/// assert_eq!(data_sent, BUF_CLIENT.len());
///
/// // close stream for sending
/// client.stream_close_send(wt_stream_id).unwrap();
///
/// // wait for data from the server
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::DataReadable{ stream_id } => {
///                 println!("Data receivedd form the server on WebTransport stream ID {:?}",
///                     stream_id,
///                 );
///                 let mut buf = [0; 100];
///                 let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
///                 println!("Read {:?} bytes from stream {:?} [fin={?}]",
///                     amount,
///                     stream_id,
///                     fin,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
///
/// ### `WebTransport`: receive a new stream form the server
///
/// ```ignore
/// // wt_session_id is the session ID of a newly created WebTransport session, see the example above.
///
/// // wait for a new stream from the server
/// loop {
///     // exchange packets
///     ...
///
///     while let Some(event) = client.next_event() {
///         match event {
///             Http3ClientEvent::WebTransport(WebTransportEvent::NewStream {
///                 stream_id,
///                 session_id,
///             }) => {
///                 println!("New stream received on session{:?}, stream id={:?} stream type={:?}",
///                     sesson_id.stream_id(),
///                     stream_id.stream_id(),
///                     stream_id.stream_type()
///                 );
///             }
///             Http3ClientEvent::DataReadable{ stream_id } => {
///                 println!("Data receivedd form the server on WebTransport stream ID {:?}",
///                     stream_id,
///                 );
///                 let mut buf = [0; 100];
///                 let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
///                 println!("Read {:?} bytes from stream {:?} [fin={:?}]",
///                     amount,
///                     stream_id,
///                     fin,
///                 );
///             }
///             _ => {
///                 println!("Unhandled event {:?}", event);
///             }
///         }
///     }
/// }
/// ```
pub struct Http3Client {
    conn: Connection,
    base_handler: Http3Connection,
    events: Http3ClientEvents,
    push_handler: Rc<RefCell<PushController>>,
}

impl Display for Http3Client {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "Http3 client")
    }
}

impl Http3Client {
    /// # Errors
    ///
    /// Making a `neqo-transport::connection` may produce an error. This can only be a crypto error
    /// if the crypto context can't be created or configured.
    pub fn new(
        server_name: impl Into<String>,
        cid_manager: Rc<RefCell<dyn ConnectionIdGenerator>>,
        local_addr: SocketAddr,
        remote_addr: SocketAddr,
        http3_parameters: Http3Parameters,
        now: Instant,
    ) -> Res<Self> {
        Ok(Self::new_with_conn(
            Connection::new_client(
                server_name,
                &[alpn_from_quic_version(
                    http3_parameters
                        .get_connection_parameters()
                        .get_versions()
                        .initial(),
                )],
                cid_manager,
                local_addr,
                remote_addr,
                http3_parameters.get_connection_parameters().clone(),
                now,
            )?,
            http3_parameters,
        ))
    }

    /// This is a similar function to `new`. In this case, `neqo-transport::connection` has been
    /// already created.
    ///
    /// It is recommended to use `new` instead.
    #[must_use]
    pub fn new_with_conn(c: Connection, http3_parameters: Http3Parameters) -> Self {
        let events = Http3ClientEvents::default();
        let webtransport = http3_parameters.get_webtransport();
        let push_streams = http3_parameters.get_max_concurrent_push_streams();
        let mut base_handler = Http3Connection::new(http3_parameters, Role::Client);
        if webtransport {
            base_handler.set_features_listener(events.clone());
        }
        Self {
            conn: c,
            events: events.clone(),
            push_handler: Rc::new(RefCell::new(PushController::new(push_streams, events))),
            base_handler,
        }
    }

    #[must_use]
    pub const fn role(&self) -> Role {
        self.conn.role()
    }

    /// The function returns the current state of the connection.
    #[must_use]
    pub fn state(&self) -> Http3State {
        self.base_handler.state()
    }

    #[must_use]
    pub fn tls_info(&self) -> Option<&SecretAgentInfo> {
        self.conn.tls_info()
    }

    /// Get the peer's certificate.
    #[must_use]
    pub fn peer_certificate(&self) -> Option<CertificateInfo> {
        self.conn.peer_certificate()
    }

    /// This called when peer certificates have been verified.
    ///
    /// `Http3ClientEvent::AuthenticationNeeded` event is emitted when peer’s certificates are
    /// available and need to be verified. When the verification is completed this function is
    /// called. To inform HTTP/3 session of the verification results.
    pub fn authenticated(&mut self, status: AuthenticationStatus, now: Instant) {
        self.conn.authenticated(status, now);
    }

    pub fn set_qlog(&mut self, qlog: NeqoQlog) {
        self.conn.set_qlog(qlog);
    }

    /// Enable encrypted client hello (ECH).
    ///
    /// # Errors
    ///
    /// Fails when the configuration provided is bad.
    pub fn enable_ech(&mut self, ech_config_list: impl AsRef<[u8]>) -> Res<()> {
        self.conn.client_enable_ech(ech_config_list)?;
        Ok(())
    }

    /// Get the connection id, which is useful for disambiguating connections to
    /// the same origin.
    ///
    /// # Panics
    ///
    /// Never, because clients always have this field.
    #[must_use]
    pub fn connection_id(&self) -> &ConnectionId {
        self.conn.odcid().expect("Client always has odcid")
    }

    fn encode_resumption_token(&self, token: &ResumptionToken) -> Option<ResumptionToken> {
        self.base_handler.get_settings().map(|settings| {
            let mut enc = Encoder::default();
            settings.encode_frame_contents(&mut enc);
            enc.encode(token.as_ref());
            ResumptionToken::new(enc.into(), token.expiration_time())
        })
    }

    /// The correct way to obtain a resumption token is to wait for the
    /// `Http3ClientEvent::ResumptionToken` event. To emit the event we are waiting for a
    /// resumtion token and a `NEW_TOKEN` frame to arrive. Some servers don't send `NEW_TOKEN`
    /// frames and in this case, we wait for 3xPTO before emitting an event. This is especially a
    /// problem for short-lived connections, where the connection is closed before any events are
    /// released. This function retrieves the token, without waiting for a `NEW_TOKEN` frame to
    /// arrive.
    ///
    /// In addition to the token, HTTP/3 settings are encoded into the token before giving it to
    /// the application(`encode_resumption_token`). When the resumption token is supplied to a new
    /// connection the HTTP/3 setting will be decoded and used until the setting are received from
    /// the server.
    pub fn take_resumption_token(&mut self, now: Instant) -> Option<ResumptionToken> {
        self.conn
            .take_resumption_token(now)
            .and_then(|t| self.encode_resumption_token(&t))
    }

    /// This may be call if an application has a resumption token. This must be called before
    /// connection starts.
    ///
    /// The resumption token also contains encoded HTTP/3 settings. The settings will be decoded
    /// and used until the setting are received from the server.
    ///
    /// # Errors
    ///
    /// An error is return if token cannot be decoded or a connection is is a wrong state.
    ///
    /// # Panics
    ///
    /// On closing if the base handler can't handle it (debug only).
    pub fn enable_resumption(&mut self, now: Instant, token: impl AsRef<[u8]>) -> Res<()> {
        if self.base_handler.state != Http3State::Initializing {
            return Err(Error::InvalidState);
        }
        let mut dec = Decoder::from(token.as_ref());
        let Some(settings_slice) = dec.decode_vvec() else {
            return Err(Error::InvalidResumptionToken);
        };
        qtrace!([self], "  settings {}", hex_with_len(settings_slice));
        let mut dec_settings = Decoder::from(settings_slice);
        let mut settings = HSettings::default();
        Error::map_error(
            settings.decode_frame_contents(&mut dec_settings),
            Error::InvalidResumptionToken,
        )?;
        let tok = dec.decode_remainder();
        qtrace!([self], "  Transport token {}", hex(tok));
        self.conn.enable_resumption(now, tok)?;
        if self.conn.state().closed() {
            let state = self.conn.state().clone();
            let res = self
                .base_handler
                .handle_state_change(&mut self.conn, &state);
            debug_assert_eq!(Ok(true), res);
            return Err(Error::FatalError);
        }
        if self.conn.zero_rtt_state() == ZeroRttState::Sending {
            self.base_handler
                .set_0rtt_settings(&mut self.conn, settings)?;
            self.events
                .connection_state_change(self.base_handler.state());
            self.push_handler
                .borrow_mut()
                .maybe_send_max_push_id_frame(&mut self.base_handler);
        }
        Ok(())
    }

    /// This is call to close a connection.
    pub fn close<S>(&mut self, now: Instant, error: AppError, msg: S)
    where
        S: AsRef<str> + Display,
    {
        qinfo!([self], "Close the connection error={} msg={}.", error, msg);
        if !matches!(
            self.base_handler.state,
            Http3State::Closing(_) | Http3State::Closed(_)
        ) {
            self.push_handler.borrow_mut().clear();
            self.conn.close(now, error, msg);
            self.base_handler.close(error);
            self.events
                .connection_state_change(self.base_handler.state());
        }
    }

    /// Attempt to force a key update.
    ///
    /// # Errors
    ///
    /// If the connection isn't confirmed, or there is an outstanding key update, this
    /// returns `Err(Error::TransportError(neqo_transport::Error::KeyUpdateBlocked))`.
    pub fn initiate_key_update(&mut self) -> Res<()> {
        self.conn.initiate_key_update()?;
        Ok(())
    }

    // API: Request/response

    /// The function fetches a resource using `method`, `target` and `headers`. A response body
    /// may be added by calling `send_data`. `stream_close_send` must be sent to finish the request
    /// even if request data are not sent.
    ///
    /// # Errors
    ///
    /// If a new stream cannot be created an error will be return.
    ///
    /// # Panics
    ///
    /// `SendMessage` implements `http_stream` so it will not panic.
    pub fn fetch<'x, 't: 'x, T>(
        &mut self,
        now: Instant,
        method: &'t str,
        target: &'t T,
        headers: &'t [Header],
        priority: Priority,
    ) -> Res<StreamId>
    where
        T: AsRequestTarget<'x> + ?Sized + Debug,
    {
        let output = self.base_handler.fetch(
            &mut self.conn,
            Box::new(self.events.clone()),
            Box::new(self.events.clone()),
            Some(Rc::clone(&self.push_handler)),
            &RequestDescription {
                method,
                connect_type: None,
                target,
                headers,
                priority,
            },
        );
        if let Err(e) = &output {
            if e.connection_error() {
                self.close(now, e.code(), "");
            }
        }
        output
    }

    /// Send an [`PRIORITY_UPDATE`-frame][1] on next `Http3Client::process_output()` call.
    /// Returns if the priority got changed.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist
    ///
    /// [1]: https://datatracker.ietf.org/doc/html/draft-kazuho-httpbis-priority-04#section-5.2
    pub fn priority_update(&mut self, stream_id: StreamId, priority: Priority) -> Res<bool> {
        self.base_handler.queue_update_priority(stream_id, priority)
    }

    /// An application may cancel a stream(request).
    /// Both sides, the receiviing and sending side, sending and receiving side, will be closed.
    ///
    /// # Errors
    ///
    /// An error will be return if a stream does not exist.
    pub fn cancel_fetch(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
        qinfo!([self], "reset_stream {} error={}.", stream_id, error);
        self.base_handler
            .cancel_fetch(stream_id, error, &mut self.conn)
    }

    /// This is call when application is done sending a request.
    ///
    /// # Errors
    ///
    /// An error will be return if stream does not exist.
    pub fn stream_close_send(&mut self, stream_id: StreamId) -> Res<()> {
        qdebug!([self], "Close sending side stream={}.", stream_id);
        self.base_handler
            .stream_close_send(&mut self.conn, stream_id)
    }

    /// # Errors
    ///
    /// An error will be return if a stream does not exist.
    pub fn stream_reset_send(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
        qinfo!([self], "stream_reset_send {} error={}.", stream_id, error);
        self.base_handler
            .stream_reset_send(&mut self.conn, stream_id, error)
    }

    /// # Errors
    ///
    /// An error will be return if a stream does not exist.
    pub fn stream_stop_sending(&mut self, stream_id: StreamId, error: AppError) -> Res<()> {
        qinfo!([self], "stream_stop_sending {} error={}.", stream_id, error);
        self.base_handler
            .stream_stop_sending(&mut self.conn, stream_id, error)
    }

    /// This function is used for regular HTTP requests and `WebTransport` streams.
    /// In the case of regular HTTP requests, the request body is supplied using this function, and
    /// headers are supplied through the `fetch` function.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist,
    /// `AlreadyClosed` if the stream has already been closed.
    /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
    /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
    /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
    /// supplied.
    pub fn send_data(&mut self, stream_id: StreamId, buf: &[u8]) -> Res<usize> {
        qinfo!(
            [self],
            "send_data from stream {} sending {} bytes.",
            stream_id,
            buf.len()
        );
        self.base_handler
            .send_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .send_data(&mut self.conn, buf)
    }

    /// Response data are read directly into a buffer supplied as a parameter of this function to
    /// avoid copying data.
    ///
    /// # Errors
    ///
    /// It returns an error if a stream does not exist or an error happen while reading a stream,
    /// e.g. early close, protocol error, etc.
    pub fn read_data(
        &mut self,
        now: Instant,
        stream_id: StreamId,
        buf: &mut [u8],
    ) -> Res<(usize, bool)> {
        qdebug!([self], "read_data from stream {}.", stream_id);
        let res = self.base_handler.read_data(&mut self.conn, stream_id, buf);
        if let Err(e) = &res {
            if e.connection_error() {
                self.close(now, e.code(), "");
            }
        }
        res
    }

    // API: Push streams

    /// Cancel a push
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist.
    pub fn cancel_push(&mut self, push_id: u64) -> Res<()> {
        self.push_handler
            .borrow_mut()
            .cancel(push_id, &mut self.conn, &mut self.base_handler)
    }

    /// Push response data are read directly into a buffer supplied as a parameter of this function
    /// to avoid copying data.
    ///
    /// # Errors
    ///
    /// It returns an error if a stream does not exist(`InvalidStreamId`) or an error has happened
    /// while reading a stream, e.g. early close, protocol error, etc.
    pub fn push_read_data(
        &mut self,
        now: Instant,
        push_id: u64,
        buf: &mut [u8],
    ) -> Res<(usize, bool)> {
        let stream_id = self
            .push_handler
            .borrow_mut()
            .get_active_stream_id(push_id)
            .ok_or(Error::InvalidStreamId)?;
        self.conn.stream_keep_alive(stream_id, true)?;
        self.read_data(now, stream_id, buf)
    }

    // API WebTransport
    //
    /// # Errors
    ///
    /// If `WebTransport` cannot be created, e.g. the `WebTransport` support is
    /// not negotiated or the HTTP/3 connection is closed.
    pub fn webtransport_create_session<'x, 't: 'x, T>(
        &mut self,
        now: Instant,
        target: &'t T,
        headers: &'t [Header],
    ) -> Res<StreamId>
    where
        T: AsRequestTarget<'x> + ?Sized + Debug,
    {
        let output = self.base_handler.webtransport_create_session(
            &mut self.conn,
            Box::new(self.events.clone()),
            target,
            headers,
        );

        if let Err(e) = &output {
            if e.connection_error() {
                self.close(now, e.code(), "");
            }
        }
        output
    }

    /// Close `WebTransport` cleanly
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist,
    /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
    /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
    /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
    /// supplied.
    pub fn webtransport_close_session(
        &mut self,
        session_id: StreamId,
        error: u32,
        message: &str,
    ) -> Res<()> {
        self.base_handler
            .webtransport_close_session(&mut self.conn, session_id, error, message)
    }

    /// # Errors
    ///
    /// This may return an error if the particular session does not exist
    /// or the connection is not in the active state.
    pub fn webtransport_create_stream(
        &mut self,
        session_id: StreamId,
        stream_type: StreamType,
    ) -> Res<StreamId> {
        self.base_handler.webtransport_create_stream_local(
            &mut self.conn,
            session_id,
            stream_type,
            Box::new(self.events.clone()),
            Box::new(self.events.clone()),
        )
    }

    /// Send `WebTransport` datagram.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    /// The function returns `TooMuchData` if the supply buffer is bigger than
    /// the allowed remote datagram size.
    pub fn webtransport_send_datagram(
        &mut self,
        session_id: StreamId,
        buf: &[u8],
        id: impl Into<DatagramTracking>,
    ) -> Res<()> {
        qtrace!("webtransport_send_datagram session:{:?}", session_id);
        self.base_handler
            .webtransport_send_datagram(session_id, &mut self.conn, buf, id)
    }

    /// Returns the current max size of a datagram that can fit into a packet.
    /// The value will change over time depending on the encoded size of the
    /// packet number, ack frames, etc.
    ///
    /// # Errors
    ///
    /// The function returns `NotAvailable` if datagrams are not enabled.
    ///
    /// # Panics
    ///
    /// This cannot panic. The max varint length is 8.
    pub fn webtransport_max_datagram_size(&self, session_id: StreamId) -> Res<u64> {
        Ok(self.conn.max_datagram_size()?
            - u64::try_from(Encoder::varint_len(session_id.as_u64())).unwrap())
    }

    /// Sets the `SendOrder` for a given stream
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    ///
    /// # Panics
    ///
    /// This cannot panic.
    pub fn webtransport_set_sendorder(
        &mut self,
        stream_id: StreamId,
        sendorder: Option<SendOrder>,
    ) -> Res<()> {
        Http3Connection::stream_set_sendorder(&mut self.conn, stream_id, sendorder)
    }

    /// Sets the `Fairness` for a given stream
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    ///
    /// # Panics
    ///
    /// This cannot panic.
    pub fn webtransport_set_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> {
        Http3Connection::stream_set_fairness(&mut self.conn, stream_id, fairness)
    }

    /// Returns the current `SendStreamStats` of a `WebTransportSendStream`.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist.
    pub fn webtransport_send_stream_stats(&mut self, stream_id: StreamId) -> Res<SendStreamStats> {
        self.base_handler
            .send_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .stats(&mut self.conn)
    }

    /// Returns the current `RecvStreamStats` of a `WebTransportRecvStream`.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist.
    pub fn webtransport_recv_stream_stats(&mut self, stream_id: StreamId) -> Res<RecvStreamStats> {
        self.base_handler
            .recv_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .stats(&mut self.conn)
    }

    /// This function combines  `process_input` and `process_output` function.
    pub fn process(&mut self, dgram: Option<Datagram<impl AsRef<[u8]>>>, now: Instant) -> Output {
        qtrace!([self], "Process.");
        if let Some(d) = dgram {
            self.process_input(d, now);
        }
        self.process_output(now)
    }

    /// The function should be called when there is a new UDP packet available. The function will
    /// handle the packet payload.
    ///
    /// First, the payload will be handled by the QUIC layer. Afterward, `process_http3` will be
    /// called to handle new [`ConnectionEvent`][1]s.
    ///
    /// After this function is called `process_output` should be called to check whether new
    /// packets need to be sent or if a timer needs to be updated.
    ///
    /// [1]: ../neqo_transport/enum.ConnectionEvent.html
    pub fn process_input(&mut self, dgram: Datagram<impl AsRef<[u8]>>, now: Instant) {
        self.process_multiple_input(iter::once(dgram), now);
    }

    pub fn process_multiple_input(
        &mut self,
        dgrams: impl IntoIterator<Item = Datagram<impl AsRef<[u8]>>>,
        now: Instant,
    ) {
        let mut dgrams = dgrams.into_iter().peekable();
        qtrace!([self], "Process multiple datagrams");
        if dgrams.peek().is_none() {
            return;
        }
        self.conn.process_multiple_input(dgrams, now);
        self.process_http3(now);
    }

    /// Process HTTP3 layer.
    /// When `process_output`, `process_input`, or `process` is called we must call this function
    /// as well. The functions calls `Http3Client::check_connection_events` to handle events from
    /// the QUC layer and calls `Http3Connection::process_sending` to ensure that HTTP/3 layer
    /// data, e.g. control frames, are sent.
    fn process_http3(&mut self, now: Instant) {
        qtrace!([self], "Process http3 internal.");
        match self.base_handler.state() {
            Http3State::ZeroRtt | Http3State::Connected | Http3State::GoingAway(..) => {
                let res = self.check_connection_events();
                if self.check_result(now, &res) {
                    return;
                }
                self.push_handler
                    .borrow_mut()
                    .maybe_send_max_push_id_frame(&mut self.base_handler);
                let res = self.base_handler.process_sending(&mut self.conn);
                self.check_result(now, &res);
            }
            Http3State::Closed { .. } => {}
            _ => {
                let res = self.check_connection_events();
                _ = self.check_result(now, &res);
            }
        }
    }

    /// The function should be called to check if there is a new UDP packet to be sent. It should
    /// be called after a new packet is received and processed and after a timer expires (QUIC
    /// needs timers to handle events like PTO detection and timers are not implemented by the neqo
    /// library, but instead must be driven by the application).
    ///
    /// `process_output` can return:
    /// - a [`Output::Datagram(Datagram)`][1]: data that should be sent as a UDP payload,
    /// - a [`Output::Callback(Duration)`][1]: the duration of a  timer. `process_output` should be
    ///   called at least after the time expires,
    /// - [`Output::None`][1]: this is returned when `Nttp3Client` is done and can be destroyed.
    ///
    /// The application should call this function repeatedly until a timer value or None is
    /// returned. After that, the application should call the function again if a new UDP packet is
    /// received and processed or the timer value expires.
    ///
    /// The HTTP/3 neqo implementation drives the HTTP/3 and QUIC layers, therefore this function
    /// will call both layers:
    ///  - First it calls HTTP/3 layer processing (`process_http3`) to make sure the layer writes
    ///    data to QUIC layer or cancels streams if needed.
    ///  - Then QUIC layer processing is called - [`Connection::process_output`][3]. This produces a
    ///    packet or a timer value. It may also produce new [`ConnectionEvent`][2]s, e.g. connection
    ///    state-change event.
    ///  - Therefore the HTTP/3 layer processing (`process_http3`) is called again.
    ///
    /// [1]: ../neqo_transport/enum.Output.html
    /// [2]: ../neqo_transport/struct.ConnectionEvents.html
    /// [3]: ../neqo_transport/struct.Connection.html#method.process_output
    pub fn process_output(&mut self, now: Instant) -> Output {
        qtrace!([self], "Process output.");

        // Maybe send() stuff on http3-managed streams
        self.process_http3(now);

        let out = self.conn.process_output(now);

        // Update H3 for any transport state changes and events
        self.process_http3(now);

        out
    }

    /// This function takes the provided result and check for an error.
    /// An error results in closing the connection.
    fn check_result<ERR>(&mut self, now: Instant, res: &Res<ERR>) -> bool {
        match &res {
            Err(Error::HttpGoaway) => {
                qinfo!([self], "Connection error: goaway stream_id increased.");
                self.close(
                    now,
                    Error::HttpGeneralProtocol.code(),
                    "Connection error: goaway stream_id increased",
                );
                true
            }
            Err(e) => {
                qinfo!([self], "Connection error: {}.", e);
                self.close(now, e.code(), format!("{e}"));
                true
            }
            _ => false,
        }
    }

    /// This function checks [`ConnectionEvent`][2]s emitted by the QUIC layer, e.g. connection
    /// change state events, new incoming stream data is available, a stream is was reset, etc.
    /// The HTTP/3 layer needs to handle these events. Most of the events are handled by
    /// [`Http3Connection`][1] by calling appropriate functions, e.g. `handle_state_change`,
    /// `handle_stream_reset`, etc. [`Http3Connection`][1] handle functionalities that are common
    /// for the client and server side. Some of the functionalities are specific to the client and
    /// they are handled by `Http3Client`. For example, [`ConnectionEvent::RecvStreamReadable`][3]
    /// event is handled by `Http3Client::handle_stream_readable`. The  function calls
    /// `Http3Connection::handle_stream_readable` and then hands the return value as appropriate
    /// for the client-side.
    ///
    /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs
    /// [2]: ../neqo_transport/enum.ConnectionEvent.html
    /// [3]: ../neqo_transport/enum.ConnectionEvent.html#variant.RecvStreamReadable
    fn check_connection_events(&mut self) -> Res<()> {
        qtrace!([self], "Check connection events.");
        while let Some(e) = self.conn.next_event() {
            qdebug!([self], "check_connection_events - event {:?}.", e);
            match e {
                ConnectionEvent::NewStream { stream_id } => {
                    // During this event we only add a new stream to the Http3Connection stream
                    // list, with NewStreamHeadReader stream handler.
                    // This function will not read from the stream and try to decode the stream.
                    // RecvStreamReadable  will be emitted after this event and reading, i.e.
                    // decoding of a stream will happen during that event.
                    self.base_handler.add_new_stream(stream_id);
                }
                ConnectionEvent::SendStreamWritable { stream_id } => {
                    if let Some(s) = self.base_handler.send_streams.get_mut(&stream_id) {
                        s.stream_writable();
                    }
                }
                ConnectionEvent::RecvStreamReadable { stream_id } => {
                    self.handle_stream_readable(stream_id)?;
                }
                ConnectionEvent::RecvStreamReset {
                    stream_id,
                    app_error,
                } => self
                    .base_handler
                    .handle_stream_reset(stream_id, app_error, &mut self.conn)?,
                ConnectionEvent::SendStreamStopSending {
                    stream_id,
                    app_error,
                } => self.base_handler.handle_stream_stop_sending(
                    stream_id,
                    app_error,
                    &mut self.conn,
                )?,

                ConnectionEvent::SendStreamCreatable { stream_type } => {
                    self.events.new_requests_creatable(stream_type);
                }
                ConnectionEvent::AuthenticationNeeded => self.events.authentication_needed(),
                ConnectionEvent::EchFallbackAuthenticationNeeded { public_name } => {
                    self.events.ech_fallback_authentication_needed(public_name);
                }
                ConnectionEvent::StateChange(state) => {
                    if self
                        .base_handler
                        .handle_state_change(&mut self.conn, &state)?
                    {
                        self.events
                            .connection_state_change(self.base_handler.state());
                    }
                }
                ConnectionEvent::ZeroRttRejected => {
                    self.base_handler.handle_zero_rtt_rejected()?;
                    self.events.zero_rtt_rejected();
                    self.push_handler.borrow_mut().handle_zero_rtt_rejected();
                }
                ConnectionEvent::ResumptionToken(token) => {
                    if let Some(t) = self.encode_resumption_token(&token) {
                        self.events.resumption_token(t);
                    }
                }
                ConnectionEvent::Datagram(dgram) => {
                    self.base_handler.handle_datagram(&dgram);
                }
                ConnectionEvent::SendStreamComplete { .. }
                | ConnectionEvent::OutgoingDatagramOutcome { .. }
                | ConnectionEvent::IncomingDatagramDropped => {}
            }
        }
        Ok(())
    }

    /// This function handled new data available on a stream. It calls
    /// `Http3Client::handle_stream_readable` and handles its response. Reading streams are mostly
    /// handled by [`Http3Connection`][1] because most part of it is common for the client and
    /// server. The following actions need to be handled by the client-specific code:
    ///  - `ReceiveOutput::NewStream(NewStreamType::Push(_))` - the server cannot receive a push
    ///    stream,
    ///  - `ReceiveOutput::NewStream(NewStreamType::Http)` - client cannot  receive a
    ///    server-initiated HTTP request,
    ///  - `ReceiveOutput::NewStream(NewStreamType::WebTransportStream(_))` - because
    ///    `Http3ClientEvents`is needed and events handler is specific to the client.
    ///  - `ReceiveOutput::ControlFrames(control_frames)` - some control frame handling differs
    ///    between the  client and the server:
    ///     - `HFrame::CancelPush` - only the client-side may receive it,
    ///     - `HFrame::MaxPushId { .. }`, `HFrame::PriorityUpdateRequest { .. } ` and
    ///       `HFrame::PriorityUpdatePush` can only be receive on the server side,
    ///     - `HFrame::Goaway { stream_id }` needs specific handling by the client by the protocol
    ///       specification.
    ///
    /// [1]: https://github.com/mozilla/neqo/blob/main/neqo-http3/src/connection.rs
    fn handle_stream_readable(&mut self, stream_id: StreamId) -> Res<()> {
        match self
            .base_handler
            .handle_stream_readable(&mut self.conn, stream_id)?
        {
            ReceiveOutput::NewStream(NewStreamType::Push(push_id)) => {
                self.handle_new_push_stream(stream_id, push_id)
            }
            ReceiveOutput::NewStream(NewStreamType::Http(_)) => Err(Error::HttpStreamCreation),
            ReceiveOutput::NewStream(NewStreamType::WebTransportStream(session_id)) => {
                self.base_handler.webtransport_create_stream_remote(
                    StreamId::from(session_id),
                    stream_id,
                    Box::new(self.events.clone()),
                    Box::new(self.events.clone()),
                )?;
                let res = self
                    .base_handler
                    .handle_stream_readable(&mut self.conn, stream_id)?;
                debug_assert!(matches!(res, ReceiveOutput::NoOutput));
                Ok(())
            }
            ReceiveOutput::ControlFrames(control_frames) => {
                for f in control_frames {
                    match f {
                        HFrame::CancelPush { push_id } => self
                            .push_handler
                            .borrow_mut()
                            .handle_cancel_push(push_id, &mut self.conn, &mut self.base_handler),
                        HFrame::MaxPushId { .. }
                        | HFrame::PriorityUpdateRequest { .. }
                        | HFrame::PriorityUpdatePush { .. } => Err(Error::HttpFrameUnexpected),
                        HFrame::Goaway { stream_id } => self.handle_goaway(stream_id),
                        _ => {
                            unreachable!(
                                "we should only put MaxPushId, Goaway and PriorityUpdates into control_frames."
                            );
                        }
                    }?;
                }
                Ok(())
            }
            _ => Ok(()),
        }
    }

    fn handle_new_push_stream(&mut self, stream_id: StreamId, push_id: u64) -> Res<()> {
        if !self.push_handler.borrow().can_receive_push() {
            return Err(Error::HttpId);
        }

        // Add a new push stream to `PushController`. `add_new_push_stream` may return an error
        // (this will be a connection error) or a bool.
        // If false is returned that means that the stream should be reset because the push has
        // been already canceled (CANCEL_PUSH frame or canceling push from the application).
        if !self
            .push_handler
            .borrow_mut()
            .add_new_push_stream(push_id, stream_id)?
        {
            // We are not interested in the result of stream_stop_sending, we are not interested
            // in this stream.
            mem::drop(
                self.conn
                    .stream_stop_sending(stream_id, Error::HttpRequestCancelled.code()),
            );
            return Ok(());
        }

        self.base_handler.add_recv_stream(
            stream_id,
            Box::new(RecvMessage::new(
                &RecvMessageInfo {
                    message_type: MessageType::Response,
                    stream_type: Http3StreamType::Push,
                    stream_id,
                    first_frame_type: None,
                },
                Rc::clone(&self.base_handler.qpack_decoder),
                Box::new(RecvPushEvents::new(push_id, Rc::clone(&self.push_handler))),
                None,
                // TODO: think about the right prority for the push streams.
                PriorityHandler::new(true, Priority::default()),
            )),
        );
        let res = self
            .base_handler
            .handle_stream_readable(&mut self.conn, stream_id)?;
        debug_assert!(matches!(res, ReceiveOutput::NoOutput));
        Ok(())
    }

    fn handle_goaway(&mut self, goaway_stream_id: StreamId) -> Res<()> {
        qinfo!([self], "handle_goaway {}", goaway_stream_id);

        if goaway_stream_id.is_uni() || goaway_stream_id.is_server_initiated() {
            return Err(Error::HttpId);
        }

        match self.base_handler.state {
            Http3State::Connected => {
                self.base_handler.state = Http3State::GoingAway(goaway_stream_id);
            }
            Http3State::GoingAway(ref mut stream_id) => {
                if goaway_stream_id > *stream_id {
                    return Err(Error::HttpGoaway);
                }
                *stream_id = goaway_stream_id;
            }
            Http3State::Closing(..) | Http3State::Closed(..) => {}
            _ => unreachable!("Should not receive Goaway frame in this state."),
        }

        // Issue reset events for streams >= goaway stream id
        let send_ids: Vec<StreamId> = self
            .base_handler
            .send_streams
            .iter()
            .filter_map(id_gte(goaway_stream_id))
            .collect();
        for id in send_ids {
            // We do not care about streams that are going to be closed.
            mem::drop(self.base_handler.handle_stream_stop_sending(
                id,
                Error::HttpRequestRejected.code(),
                &mut self.conn,
            ));
        }

        let recv_ids: Vec<StreamId> = self
            .base_handler
            .recv_streams
            .iter()
            .filter_map(id_gte(goaway_stream_id))
            .collect();
        for id in recv_ids {
            // We do not care about streams that are going to be closed.
            mem::drop(self.base_handler.handle_stream_reset(
                id,
                Error::HttpRequestRejected.code(),
                &mut self.conn,
            ));
        }

        self.events.goaway_received();

        Ok(())
    }

    /// Increases `max_stream_data` for a `stream_id`.
    ///
    /// # Errors
    ///
    /// Returns `InvalidStreamId` if a stream does not exist or the receiving
    /// side is closed.
    pub fn set_stream_max_data(&mut self, stream_id: StreamId, max_data: u64) -> Res<()> {
        self.conn.set_stream_max_data(stream_id, max_data)?;
        Ok(())
    }

    #[must_use]
    pub fn qpack_decoder_stats(&self) -> QpackStats {
        self.base_handler.qpack_decoder.borrow().stats()
    }

    #[must_use]
    pub fn qpack_encoder_stats(&self) -> QpackStats {
        self.base_handler.qpack_encoder.borrow().stats()
    }

    #[must_use]
    pub fn transport_stats(&self) -> TransportStats {
        self.conn.stats()
    }

    #[must_use]
    pub const fn webtransport_enabled(&self) -> bool {
        self.base_handler.webtransport_enabled()
    }
}

impl EventProvider for Http3Client {
    type Event = Http3ClientEvent;

    /// Return true if there are outstanding events.
    fn has_events(&self) -> bool {
        self.events.has_events()
    }

    /// Get events that indicate state changes on the connection. This method
    /// correctly handles cases where handling one event can obsolete
    /// previously-queued events, or cause new events to be generated.
    fn next_event(&mut self) -> Option<Self::Event> {
        self.events.next_event()
    }
}

#[cfg(test)]
mod tests {
    use std::{mem, time::Duration};

    use neqo_common::{event::Provider, qtrace, Datagram, Decoder, Encoder};
    use neqo_crypto::{AllowZeroRtt, AntiReplay, ResumptionToken};
    use neqo_qpack::{encoder::QPackEncoder, QpackSettings};
    use neqo_transport::{
        CloseReason, ConnectionEvent, ConnectionParameters, Output, State, StreamId, StreamType,
        Version, MIN_INITIAL_PACKET_SIZE, RECV_BUFFER_SIZE, SEND_BUFFER_SIZE,
    };
    use test_fixture::{
        anti_replay, default_server_h3, fixture_init, new_server, now,
        CountingConnectionIdGenerator, DEFAULT_ADDR, DEFAULT_ALPN_H3, DEFAULT_KEYS,
        DEFAULT_SERVER_NAME,
    };

    use super::{
        AuthenticationStatus, Connection, Error, HSettings, Header, Http3Client, Http3ClientEvent,
        Http3Parameters, Http3State, Rc, RefCell,
    };
    use crate::{
        frames::{HFrame, H3_FRAME_TYPE_SETTINGS, H3_RESERVED_FRAME_TYPES},
        qpack_encoder_receiver::EncoderRecvStream,
        settings::{HSetting, HSettingType, H3_RESERVED_SETTINGS},
        Http3Server, Priority, RecvStream,
    };

    fn assert_closed(client: &Http3Client, expected: &Error) {
        match client.state() {
            Http3State::Closing(err) | Http3State::Closed(err) => {
                assert_eq!(err, CloseReason::Application(expected.code()));
            }
            _ => panic!("Wrong state {:?}", client.state()),
        };
    }

    /// Create a http3 client with default configuration.
    pub fn default_http3_client() -> Http3Client {
        default_http3_client_param(100)
    }

    pub fn default_http3_client_param(max_table_size: u64) -> Http3Client {
        fixture_init();
        Http3Client::new(
            DEFAULT_SERVER_NAME,
            Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
            DEFAULT_ADDR,
            DEFAULT_ADDR,
            Http3Parameters::default()
                .connection_parameters(
                    // Disable compatible upgrade, which complicates tests.
                    ConnectionParameters::default()
                        .versions(Version::default(), vec![Version::default()]),
                )
                .max_table_size_encoder(max_table_size)
                .max_table_size_decoder(max_table_size)
                .max_blocked_streams(100)
                .max_concurrent_push_streams(5),
            now(),
        )
        .expect("create a default client")
    }

    const CONTROL_STREAM_TYPE: &[u8] = &[0x0];

    // Encoder stream data
    const ENCODER_STREAM_DATA: &[u8] = &[0x2];
    const ENCODER_STREAM_CAP_INSTRUCTION: &[u8] = &[0x3f, 0x45];

    // Encoder stream data with a change capacity instruction(0x3f, 0x45 = change capacity to 100)
    // This data will be send when 0-RTT is used and we already have a max_table_capacity from
    // resumed settings.
    const ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION: &[u8] = &[0x2, 0x3f, 0x45];

    const ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST: &[u8] = &[
        0x2, 0x3f, 0x45, 0x67, 0xa7, 0xd4, 0xe5, 0x1c, 0x85, 0xb1, 0x1f, 0x86, 0xa7, 0xd7, 0x71,
        0xd1, 0x69, 0x7f,
    ];

    // Decoder stream data
    const DECODER_STREAM_DATA: &[u8] = &[0x3];

    const PUSH_STREAM_TYPE: &[u8] = &[0x1];

    const CLIENT_SIDE_CONTROL_STREAM_ID: StreamId = StreamId::new(2);
    const CLIENT_SIDE_ENCODER_STREAM_ID: StreamId = StreamId::new(6);
    const CLIENT_SIDE_DECODER_STREAM_ID: StreamId = StreamId::new(10);

    struct TestServer {
        settings: HFrame,
        conn: Connection,
        control_stream_id: Option<StreamId>,
        encoder: Rc<RefCell<QPackEncoder>>,
        encoder_receiver: EncoderRecvStream,
        encoder_stream_id: Option<StreamId>,
        decoder_stream_id: Option<StreamId>,
    }

    impl TestServer {
        pub fn new() -> Self {
            Self::new_with_settings(&[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ])
        }

        pub fn new_with_settings(server_settings: &[HSetting]) -> Self {
            fixture_init();
            let max_table_size = server_settings
                .iter()
                .find(|s| s.setting_type == HSettingType::MaxTableCapacity)
                .map_or(100, |s| s.value);
            let max_blocked_streams = u16::try_from(
                server_settings
                    .iter()
                    .find(|s| s.setting_type == HSettingType::BlockedStreams)
                    .map_or(100, |s| s.value),
            )
            .unwrap();
            let qpack = Rc::new(RefCell::new(QPackEncoder::new(
                &QpackSettings {
                    max_table_size_encoder: max_table_size,
                    max_table_size_decoder: max_table_size,
                    max_blocked_streams,
                },
                true,
            )));
            Self {
                settings: HFrame::Settings {
                    settings: HSettings::new(server_settings),
                },
                conn: default_server_h3(),
                control_stream_id: None,
                encoder: Rc::clone(&qpack),
                encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack),
                encoder_stream_id: None,
                decoder_stream_id: None,
            }
        }

        pub fn new_with_conn(conn: Connection) -> Self {
            let qpack = Rc::new(RefCell::new(QPackEncoder::new(
                &QpackSettings {
                    max_table_size_encoder: 128,
                    max_table_size_decoder: 128,
                    max_blocked_streams: 0,
                },
                true,
            )));
            Self {
                settings: HFrame::Settings {
                    settings: HSettings::new(&[]),
                },
                conn,
                control_stream_id: None,
                encoder: Rc::clone(&qpack),
                encoder_receiver: EncoderRecvStream::new(CLIENT_SIDE_DECODER_STREAM_ID, qpack),
                encoder_stream_id: None,
                decoder_stream_id: None,
            }
        }

        pub fn create_qpack_streams(&mut self) {
            // Create a QPACK encoder stream
            self.encoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap());
            self.encoder
                .borrow_mut()
                .add_send_stream(self.encoder_stream_id.unwrap());
            self.encoder
                .borrow_mut()
                .send_encoder_updates(&mut self.conn)
                .unwrap();

            // Create decoder stream
            self.decoder_stream_id = Some(self.conn.stream_create(StreamType::UniDi).unwrap());
            assert_eq!(
                self.conn
                    .stream_send(self.decoder_stream_id.unwrap(), DECODER_STREAM_DATA)
                    .unwrap(),
                1
            );
        }

        pub fn create_control_stream(&mut self) {
            // Create control stream
            let control = self.conn.stream_create(StreamType::UniDi).unwrap();
            qtrace!(["TestServer"], "control stream: {}", control);
            self.control_stream_id = Some(control);
            // Send stream type on the control stream.
            assert_eq!(
                self.conn
                    .stream_send(self.control_stream_id.unwrap(), CONTROL_STREAM_TYPE)
                    .unwrap(),
                1
            );

            // Encode a settings frame and send it.
            let mut enc = Encoder::default();
            self.settings.encode(&mut enc);
            assert_eq!(
                self.conn
                    .stream_send(self.control_stream_id.unwrap(), enc.as_ref())
                    .unwrap(),
                enc.len()
            );
        }

        pub fn check_client_control_qpack_streams_no_resumption(&mut self) {
            self.check_client_control_qpack_streams(
                ENCODER_STREAM_DATA,
                EXPECTED_REQUEST_HEADER_FRAME,
                false,
                true,
            );
        }

        pub fn check_control_qpack_request_streams_resumption(
            &mut self,
            expect_encoder_stream_data: &[u8],
            expect_request_header: &[u8],
            expect_request: bool,
        ) {
            self.check_client_control_qpack_streams(
                expect_encoder_stream_data,
                expect_request_header,
                expect_request,
                false,
            );
        }

        // Check that server has received correct settings and qpack streams.
        pub fn check_client_control_qpack_streams(
            &mut self,
            expect_encoder_stream_data: &[u8],
            expect_request_header: &[u8],
            expect_request: bool,
            expect_connected: bool,
        ) {
            let mut connected = false;
            let mut control_stream = false;
            let mut qpack_decoder_stream = false;
            let mut qpack_encoder_stream = false;
            let mut request = false;
            while let Some(e) = self.conn.next_event() {
                match e {
                    ConnectionEvent::NewStream { stream_id }
                    | ConnectionEvent::SendStreamWritable { stream_id } => {
                        if expect_request {
                            assert!(matches!(stream_id.as_u64(), 2 | 6 | 10 | 0));
                        } else {
                            assert!(matches!(stream_id.as_u64(), 2 | 6 | 10));
                        }
                    }
                    ConnectionEvent::RecvStreamReadable { stream_id } => {
                        if stream_id == CLIENT_SIDE_CONTROL_STREAM_ID {
                            self.check_control_stream();
                            control_stream = true;
                        } else if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID {
                            // the qpack encoder stream
                            self.read_and_check_stream_data(
                                stream_id,
                                expect_encoder_stream_data,
                                false,
                            );
                            qpack_encoder_stream = true;
                        } else if stream_id == CLIENT_SIDE_DECODER_STREAM_ID {
                            // the qpack decoder stream
                            self.read_and_check_stream_data(stream_id, DECODER_STREAM_DATA, false);
                            qpack_decoder_stream = true;
                        } else if stream_id == 0 {
                            assert!(expect_request);
                            self.read_and_check_stream_data(stream_id, expect_request_header, true);
                            request = true;
                        } else {
                            panic!("unexpected event");
                        }
                    }
                    ConnectionEvent::StateChange(State::Connected) => connected = true,
                    ConnectionEvent::StateChange(_)
                    | ConnectionEvent::SendStreamCreatable { .. } => {}
                    _ => panic!("unexpected event"),
                }
            }
            assert_eq!(connected, expect_connected);
            assert!(control_stream);
            assert!(qpack_encoder_stream);
            assert!(qpack_decoder_stream);
            assert_eq!(request, expect_request);
        }

        // Check that the control stream contains default values.
        // Expect a SETTINGS frame, some grease, and a MAX_PUSH_ID frame.
        // The default test configuration uses:
        //  - max_table_capacity = 100
        //  - max_blocked_streams = 100
        // and a maximum of 5 push streams.
        fn check_control_stream(&mut self) {
            let mut buf = [0_u8; 100];
            let (amount, fin) = self
                .conn
                .stream_recv(CLIENT_SIDE_CONTROL_STREAM_ID, &mut buf)
                .unwrap();
            let mut dec = Decoder::from(&buf[..amount]);
            assert_eq!(dec.decode_varint().unwrap(), 0); // control stream type
            assert_eq!(dec.decode_varint().unwrap(), 4); // SETTINGS
            assert_eq!(
                dec.decode_vvec().unwrap(),
                &[1, 0x40, 0x64, 7, 0x40, 0x64, 0xab, 0x60, 0x37, 0x42, 0x00]
            );

            assert_eq!((dec.decode_varint().unwrap() - 0x21) % 0x1f, 0); // Grease
            assert!(dec.decode_vvec().unwrap().len() < 8);

            assert_eq!(dec.decode_varint().unwrap(), 0xd); // MAX_PUSH_ID
            assert_eq!(dec.decode_vvec().unwrap(), &[5]);

            assert_eq!(dec.remaining(), 0);
            assert!(!fin);
        }

        pub fn read_and_check_stream_data(
            &mut self,
            stream_id: StreamId,
            expected_data: &[u8],
            expected_fin: bool,
        ) {
            let mut buf = [0_u8; 100];
            let (amount, fin) = self.conn.stream_recv(stream_id, &mut buf).unwrap();
            assert_eq!(fin, expected_fin);
            assert_eq!(amount, expected_data.len());
            assert_eq!(&buf[..amount], expected_data);
        }

        pub fn encode_headers(
            &mut self,
            stream_id: StreamId,
            headers: &[Header],
            encoder: &mut Encoder,
        ) {
            let header_block =
                self.encoder
                    .borrow_mut()
                    .encode_header_block(&mut self.conn, headers, stream_id);
            let hframe = HFrame::Headers {
                header_block: header_block.as_ref().to_vec(),
            };
            hframe.encode(encoder);
        }
    }

    fn handshake_only(client: &mut Http3Client, server: &mut TestServer) -> Output {
        assert_eq!(client.state(), Http3State::Initializing);
        let out = client.process_output(now());
        assert_eq!(client.state(), Http3State::Initializing);

        assert_eq!(*server.conn.state(), State::Init);
        let out = server.conn.process(out.dgram(), now());
        assert_eq!(*server.conn.state(), State::Handshaking);

        let out = client.process(out.dgram(), now());
        let out = server.conn.process(out.dgram(), now());
        assert!(out.as_dgram_ref().is_none());

        let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded);
        assert!(client.events().any(authentication_needed));
        client.authenticated(AuthenticationStatus::Ok, now());
        out
    }

    // Perform only Quic transport handshake.
    fn connect_only_transport_with(client: &mut Http3Client, server: &mut TestServer) {
        let out = handshake_only(client, server);

        let out = client.process(out.dgram(), now());
        let connected = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::Connected));
        assert!(client.events().any(connected));

        assert_eq!(client.state(), Http3State::Connected);
        server.conn.process_input(out.dgram().unwrap(), now());
        assert!(server.conn.state().connected());
    }

    // Perform only Quic transport handshake.
    fn connect_only_transport() -> (Http3Client, TestServer) {
        let mut client = default_http3_client();
        let mut server = TestServer::new();
        connect_only_transport_with(&mut client, &mut server);
        (client, server)
    }

    fn send_and_receive_client_settings(client: &mut Http3Client, server: &mut TestServer) {
        // send and receive client settings
        let out = client.process_output(now());
        server.conn.process_input(out.dgram().unwrap(), now());
        server.check_client_control_qpack_streams_no_resumption();
    }

    // Perform Quic transport handshake and exchange Http3 settings.
    fn connect_with(client: &mut Http3Client, server: &mut TestServer) {
        connect_only_transport_with(client, server);

        send_and_receive_client_settings(client, server);

        server.create_control_stream();

        server.create_qpack_streams();
        // Send the server's control and qpack streams data.
        let out = server.conn.process(None::<Datagram>, now());
        client.process_input(out.dgram().unwrap(), now());

        // assert no error occured.
        assert_eq!(client.state(), Http3State::Connected);
    }

    // Perform Quic transport handshake and exchange Http3 settings.
    fn connect_with_connection_parameters(
        server_conn_params: ConnectionParameters,
    ) -> (Http3Client, TestServer) {
        // connecting with default max_table_size
        let mut client = default_http3_client_param(100);
        let server = Connection::new_server(
            test_fixture::DEFAULT_KEYS,
            test_fixture::DEFAULT_ALPN_H3,
            Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
            server_conn_params,
        )
        .unwrap();
        let mut server = TestServer::new_with_conn(server);
        connect_with(&mut client, &mut server);
        (client, server)
    }

    // Perform Quic transport handshake and exchange Http3 settings.
    fn connect() -> (Http3Client, TestServer) {
        let mut client = default_http3_client();
        let mut server = TestServer::new();
        connect_with(&mut client, &mut server);
        (client, server)
    }

    // Fetch request fetch("GET", "https", "something.com", "/", headers).
    fn make_request(
        client: &mut Http3Client,
        close_sending_side: bool,
        headers: &[Header],
    ) -> StreamId {
        let request_stream_id = client
            .fetch(
                now(),
                "GET",
                "https://something.com/",
                headers,
                Priority::default(),
            )
            .unwrap();
        if close_sending_side {
            client.stream_close_send(request_stream_id).unwrap();
        }
        request_stream_id
    }

    // For fetch request fetch("GET", "https", "something.com", "/", &[])
    // the following request header frame will be sent:
    const EXPECTED_REQUEST_HEADER_FRAME: &[u8] = &[
        0x01, 0x10, 0x00, 0x00, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e,
        0x43, 0xd3, 0xc1,
    ];

    // For fetch request fetch("GET", "https", "something.com", "/", &[(String::from("myheaders",
    // "myvalue"))]) the following request header frame will be sent:
    const EXPECTED_REQUEST_HEADER_FRAME_VERSION2: &[u8] = &[
        0x01, 0x11, 0x02, 0x80, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e,
        0x43, 0xd3, 0xc1, 0x10,
    ];

    const HTTP_HEADER_FRAME_0: &[u8] = &[0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x30];

    // The response header from HTTP_HEADER_FRAME (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x30)
    // are decoded into:
    fn check_response_header_0(header: &[Header]) {
        let expected_response_header_0 = &[
            Header::new(":status", "200"),
            Header::new("content-length", "0"),
        ];
        assert_eq!(header, expected_response_header_0);
    }

    const HTTP_RESPONSE_1: &[u8] = &[
        // headers
        0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // the first data frame
        0x0, 0x3, 0x61, 0x62, 0x63, // the second data frame
        0x0, 0x4, 0x64, 0x65, 0x66, 0x67,
    ];

    const HTTP_RESPONSE_HEADER_ONLY_1: &[u8] = &[
        // headers
        0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37,
    ];
    const HTTP_RESPONSE_DATA_FRAME_1_ONLY_1: &[u8] = &[0x0, 0x3, 0x61, 0x62, 0x63];

    const HTTP_RESPONSE_DATA_FRAME_2_ONLY_1: &[u8] = &[0x0, 0x4, 0x64, 0x65, 0x66, 0x67];

    // The response header from HTTP_RESPONSE_1 (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x36) are
    // decoded into:
    fn check_response_header_1(header: &[Header]) {
        let expected_response_header_1 = &[
            Header::new(":status", "200"),
            Header::new("content-length", "7"),
        ];
        assert_eq!(header, expected_response_header_1);
    }

    const EXPECTED_RESPONSE_DATA_1: &[u8] = &[0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67];

    const HTTP_RESPONSE_2: &[u8] = &[
        // headers
        0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x33, // the data frame
        0x0, 0x3, 0x61, 0x62, 0x63,
    ];

    const HTTP_RESPONSE_HEADER_ONLY_2: &[u8] = &[
        // headers
        0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x33,
    ];

    const HTTP_RESPONSE_DATA_FRAME_ONLY_2: &[u8] = &[
        // the data frame
        0x0, 0x3, 0x61, 0x62, 0x63,
    ];

    // The response header from HTTP_RESPONSE_2 (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x36) are
    // decoded into:
    fn check_response_header_2(header: &[Header]) {
        let expected_response_header_2 = &[
            Header::new(":status", "200"),
            Header::new("content-length", "3"),
        ];
        assert_eq!(header, expected_response_header_2);
    }

    // The data frame payload from HTTP_RESPONSE_2 is:
    const EXPECTED_RESPONSE_DATA_2_FRAME_1: &[u8] = &[0x61, 0x62, 0x63];

    fn make_request_and_exchange_pkts(
        client: &mut Http3Client,
        server: &mut TestServer,
        close_sending_side: bool,
    ) -> StreamId {
        let request_stream_id = make_request(client, close_sending_side, &[]);

        let out = client.process_output(now());
        server.conn.process_input(out.dgram().unwrap(), now());

        // find the new request/response stream and send frame v on it.
        while let Some(e) = server.conn.next_event() {
            match e {
                ConnectionEvent::NewStream { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(stream_id.stream_type(), StreamType::BiDi);
                }
                ConnectionEvent::RecvStreamReadable { stream_id } => {
                    if stream_id == CLIENT_SIDE_ENCODER_STREAM_ID {
                        server.read_and_check_stream_data(
                            stream_id,
                            ENCODER_STREAM_CAP_INSTRUCTION,
                            false,
                        );
                    } else {
                        assert_eq!(stream_id, request_stream_id);
                        server.read_and_check_stream_data(
                            stream_id,
                            EXPECTED_REQUEST_HEADER_FRAME,
                            close_sending_side,
                        );
                    }
                }
                _ => {}
            }
        }
        let dgram = server.conn.process_output(now()).dgram();
        if let Some(d) = dgram {
            client.process_input(d, now());
        }
        request_stream_id
    }

    fn connect_and_send_request(close_sending_side: bool) -> (Http3Client, TestServer, StreamId) {
        let (mut client, mut server) = connect();
        let request_stream_id =
            make_request_and_exchange_pkts(&mut client, &mut server, close_sending_side);
        assert_eq!(request_stream_id, 0);

        (client, server, request_stream_id)
    }

    fn server_send_response_and_exchange_packet(
        client: &mut Http3Client,
        server: &mut TestServer,
        stream_id: StreamId,
        response: impl AsRef<[u8]>,
        close_stream: bool,
    ) {
        _ = server
            .conn
            .stream_send(stream_id, response.as_ref())
            .unwrap();
        if close_stream {
            server.conn.stream_close_send(stream_id).unwrap();
        }
        let out = server.conn.process(None::<Datagram>, now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));
    }

    const PUSH_PROMISE_DATA: &[u8] = &[
        0x00, 0x00, 0xd1, 0xd7, 0x50, 0x89, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x2e, 0x43, 0xd3,
        0xc1,
    ];

    fn check_pushpromise_header(header: &[Header]) {
        let expected_response_header_1 = &[
            Header::new(":method", "GET"),
            Header::new(":scheme", "https"),
            Header::new(":authority", "something.com"),
            Header::new(":path", "/"),
        ];
        assert_eq!(header, expected_response_header_1);
    }

    // Send a push promise with push_id and request_stream_id.
    fn send_push_promise(conn: &mut Connection, stream_id: StreamId, push_id: u64) {
        let frame = HFrame::PushPromise {
            push_id,
            header_block: PUSH_PROMISE_DATA.to_vec(),
        };
        let mut d = Encoder::default();
        frame.encode(&mut d);
        _ = conn.stream_send(stream_id, d.as_ref()).unwrap();
    }

    fn send_push_data_and_exchange_packets(
        client: &mut Http3Client,
        server: &mut TestServer,
        push_id: u8,
        close_push_stream: bool,
    ) -> StreamId {
        let push_stream_id = send_push_data(&mut server.conn, push_id, close_push_stream);

        let out = server.conn.process_output(now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));

        push_stream_id
    }

    fn send_push_promise_and_exchange_packets(
        client: &mut Http3Client,
        server: &mut TestServer,
        stream_id: StreamId,
        push_id: u64,
    ) {
        send_push_promise(&mut server.conn, stream_id, push_id);

        let out = server.conn.process_output(now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));
    }

    fn send_cancel_push_and_exchange_packets(
        client: &mut Http3Client,
        server: &mut TestServer,
        push_id: u64,
    ) {
        let frame = HFrame::CancelPush { push_id };
        let mut d = Encoder::default();
        frame.encode(&mut d);
        server
            .conn
            .stream_send(server.control_stream_id.unwrap(), d.as_ref())
            .unwrap();

        let out = server.conn.process_output(now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));
    }

    const PUSH_DATA: &[u8] = &[
        // headers
        0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x34, // the data frame.
        0x0, 0x4, 0x61, 0x62, 0x63, 0x64,
    ];

    // The response header from PUSH_DATA (0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x34) are
    // decoded into:
    fn check_push_response_header(header: &[Header]) {
        let expected_push_response_header = [
            Header::new(":status", "200"),
            Header::new("content-length", "4"),
        ];
        assert_eq!(header, &expected_push_response_header[..]);
    }

    // The data frame payload from PUSH_DATA is:
    const EXPECTED_PUSH_RESPONSE_DATA_FRAME: &[u8] = &[0x61, 0x62, 0x63, 0x64];

    // Send push data on a push stream:
    //  1) push_stream_type PUSH_STREAM_TYPE
    //  2) push_id
    //  3) PUSH_DATA that contains encoded headers and a data frame.
    // This function can only handle small push_id numbers that fit in a varint of length 1 byte.
    fn send_data_on_push(
        conn: &mut Connection,
        push_stream_id: StreamId,
        push_id: u8,
        data: impl AsRef<[u8]>,
        close_push_stream: bool,
    ) {
        // send data
        _ = conn.stream_send(push_stream_id, PUSH_STREAM_TYPE).unwrap();
        _ = conn.stream_send(push_stream_id, &[push_id]).unwrap();
        _ = conn.stream_send(push_stream_id, data.as_ref()).unwrap();
        if close_push_stream {
            conn.stream_close_send(push_stream_id).unwrap();
        }
    }

    // Send push data on a push stream:
    //  1) push_stream_type PUSH_STREAM_TYPE
    //  2) push_id
    //  3) PUSH_DATA that contains encoded headers and a data frame.
    // This function can only handle small push_id numbers that fit in a varint of length 1 byte.
    fn send_push_data(conn: &mut Connection, push_id: u8, close_push_stream: bool) -> StreamId {
        send_push_with_data(conn, push_id, PUSH_DATA, close_push_stream)
    }

    // Send push data on a push stream:
    //  1) push_stream_type PUSH_STREAM_TYPE
    //  2) push_id
    //  3) and supplied push data.
    // This function can only handle small push_id numbers that fit in a varint of length 1 byte.
    fn send_push_with_data(
        conn: &mut Connection,
        push_id: u8,
        data: &[u8],
        close_push_stream: bool,
    ) -> StreamId {
        // create a push stream
        let push_stream_id = conn.stream_create(StreamType::UniDi).unwrap();
        // send data
        send_data_on_push(conn, push_stream_id, push_id, data, close_push_stream);
        push_stream_id
    }

    struct PushPromiseInfo {
        pub push_id: u64,
        pub ref_stream_id: StreamId,
    }

    // Helper function: read response when a server sends:
    // - HTTP_RESPONSE_2 on the request_stream_id stream,
    // - a number of push promises described by a list of PushPromiseInfo.
    // - and a push streams with push_id in the push_streams list.
    // All push stream contain PUSH_DATA that decodes to headers (that can be checked by calling
    // check_push_response_header) and EXPECTED_PUSH_RESPONSE_DATA_FRAME
    fn read_response_and_push_events(
        client: &mut Http3Client,
        push_promises: &[PushPromiseInfo],
        push_streams: &[u64],
        response_stream_id: StreamId,
    ) {
        let mut num_push_promises = 0;
        let mut num_push_stream_headers = 0;
        let mut num_push_stream_data = 0;
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::PushPromise {
                    push_id,
                    request_stream_id,
                    headers,
                } => {
                    assert!(push_promises
                        .iter()
                        .any(|p| p.push_id == push_id && p.ref_stream_id == request_stream_id));
                    check_pushpromise_header(&headers[..]);
                    num_push_promises += 1;
                }
                Http3ClientEvent::PushHeaderReady {
                    push_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert!(push_streams.contains(&push_id));
                    check_push_response_header(&headers);
                    num_push_stream_headers += 1;
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::PushDataReadable { push_id } => {
                    assert!(push_streams.contains(&push_id));
                    let mut buf = [0_u8; 100];
                    let (amount, fin) = client.push_read_data(now(), push_id, &mut buf).unwrap();
                    assert!(fin);
                    assert_eq!(amount, EXPECTED_PUSH_RESPONSE_DATA_FRAME.len());
                    assert_eq!(&buf[..amount], EXPECTED_PUSH_RESPONSE_DATA_FRAME);
                    num_push_stream_data += 1;
                }
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, response_stream_id);
                    check_response_header_2(&headers);
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, response_stream_id);
                    let mut buf = [0_u8; 100];
                    let (amount, _) = client.read_data(now(), stream_id, &mut buf).unwrap();
                    assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len());
                    assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1);
                }
                _ => {}
            }
        }

        assert_eq!(num_push_promises, push_promises.len());
        assert_eq!(num_push_stream_headers, push_streams.len());
        assert_eq!(num_push_stream_data, push_streams.len());
    }

    // Client: Test receiving a new control stream and a SETTINGS frame.
    #[test]
    fn client_connect_and_exchange_qpack_and_control_streams() {
        mem::drop(connect());
    }

    // Client: Test that the connection will be closed if control stream
    // has been closed.
    #[test]
    fn client_close_control_stream() {
        let (mut client, mut server) = connect();
        server
            .conn
            .stream_close_send(server.control_stream_id.unwrap())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpClosedCriticalStream);
    }

    // Client: Test that the connection will be closed if the local control stream
    // has been reset.
    #[test]
    fn client_reset_control_stream() {
        let (mut client, mut server) = connect();
        server
            .conn
            .stream_reset_send(server.control_stream_id.unwrap(), Error::HttpNoError.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpClosedCriticalStream);
    }

    // Client: Test that the connection will be closed if the server side encoder stream
    // has been reset.
    #[test]
    fn client_reset_server_side_encoder_stream() {
        let (mut client, mut server) = connect();
        server
            .conn
            .stream_reset_send(server.encoder_stream_id.unwrap(), Error::HttpNoError.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpClosedCriticalStream);
    }

    // Client: Test that the connection will be closed if the server side decoder stream
    // has been reset.
    #[test]
    fn client_reset_server_side_decoder_stream() {
        let (mut client, mut server) = connect();
        server
            .conn
            .stream_reset_send(server.decoder_stream_id.unwrap(), Error::HttpNoError.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpClosedCriticalStream);
    }

    // Client: Test that the connection will be closed if the local control stream
    // has received a stop_sending.
    #[test]
    fn client_stop_sending_control_stream() {
        let (mut client, mut server) = connect();
        server
            .conn
            .stream_stop_sending(CLIENT_SIDE_CONTROL_STREAM_ID, Error::HttpNoError.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpClosedCriticalStream);
    }

    // Client: Test that the connection will be closed if the client side encoder stream
    // has received a stop_sending.
    #[test]
    fn client_stop_sending_encoder_stream() {
        let (mut client, mut server) = connect();
        server
            .conn
            .stream_stop_sending(CLIENT_SIDE_ENCODER_STREAM_ID, Error::HttpNoError.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpClosedCriticalStream);
    }

    // Client: Test that the connection will be closed if the client side decoder stream
    // has received a stop_sending.
    #[test]
    fn client_stop_sending_decoder_stream() {
        let (mut client, mut server) = connect();
        server
            .conn
            .stream_stop_sending(CLIENT_SIDE_DECODER_STREAM_ID, Error::HttpNoError.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpClosedCriticalStream);
    }

    // Client: test missing SETTINGS frame
    // (the first frame sent is a garbage frame).
    #[test]
    fn client_missing_settings() {
        let (mut client, mut server) = connect_only_transport();
        // Create server control stream.
        let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap();
        // Send a HEADERS frame instead (which contains garbage).
        let sent = server
            .conn
            .stream_send(control_stream, &[0x0, 0x1, 0x3, 0x0, 0x1, 0x2]);
        assert_eq!(sent, Ok(6));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpMissingSettings);
    }

    // Client: receiving SETTINGS frame twice causes connection close
    // with error HTTP_UNEXPECTED_FRAME.
    #[test]
    fn client_receive_settings_twice() {
        let (mut client, mut server) = connect();
        // send the second SETTINGS frame.
        let sent = server.conn.stream_send(
            server.control_stream_id.unwrap(),
            &[0x4, 0x6, 0x1, 0x40, 0x64, 0x7, 0x40, 0x64],
        );
        assert_eq!(sent, Ok(8));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpFrameUnexpected);
    }

    fn test_wrong_frame_on_control_stream(v: &[u8]) {
        let (mut client, mut server) = connect();

        // send a frame that is not allowed on the control stream.
        _ = server
            .conn
            .stream_send(server.control_stream_id.unwrap(), v)
            .unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        assert_closed(&client, &Error::HttpFrameUnexpected);
    }

    // send DATA frame on a cortrol stream
    #[test]
    fn data_frame_on_control_stream() {
        test_wrong_frame_on_control_stream(&[0x0, 0x2, 0x1, 0x2]);
    }

    // send HEADERS frame on a cortrol stream
    #[test]
    fn headers_frame_on_control_stream() {
        test_wrong_frame_on_control_stream(&[0x1, 0x2, 0x1, 0x2]);
    }

    // send PUSH_PROMISE frame on a cortrol stream
    #[test]
    fn push_promise_frame_on_control_stream() {
        test_wrong_frame_on_control_stream(&[0x5, 0x2, 0x1, 0x2]);
    }

    // send PRIORITY_UPDATE frame on a control stream to the client
    #[test]
    fn priority_update_request_on_control_stream() {
        test_wrong_frame_on_control_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]);
    }

    #[test]
    fn priority_update_push_on_control_stream() {
        test_wrong_frame_on_control_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]);
    }

    fn test_wrong_frame_on_push_stream(v: &[u8]) {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        send_push_promise(&mut server.conn, request_stream_id, 0);
        // Create a push stream
        let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap();

        // Send the push stream type byte, push_id and frame v.
        _ = server
            .conn
            .stream_send(push_stream_id, &[0x01, 0x0])
            .unwrap();
        _ = server.conn.stream_send(push_stream_id, v).unwrap();

        let out = server.conn.process_output(now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));

        assert_closed(&client, &Error::HttpFrameUnexpected);
    }

    #[test]
    fn cancel_push_frame_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0x3, 0x1, 0x5]);
    }

    #[test]
    fn settings_frame_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0x4, 0x4, 0x6, 0x4, 0x8, 0x4]);
    }

    #[test]
    fn push_promise_frame_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0x5, 0x2, 0x1, 0x2]);
    }

    #[test]
    fn priority_update_request_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]);
    }

    #[test]
    fn priority_update_push_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]);
    }

    #[test]
    fn goaway_frame_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0x7, 0x1, 0x5]);
    }

    #[test]
    fn max_push_id_frame_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0xd, 0x1, 0x5]);
    }

    // send DATA frame before a header frame
    #[test]
    fn data_frame_on_push_stream() {
        test_wrong_frame_on_push_stream(&[0x0, 0x2, 0x1, 0x2]);
    }

    // Client: receive unknown stream type
    // This function also tests getting stream id that does not fit into a single byte.
    #[test]
    fn client_received_unknown_stream() {
        let (mut client, mut server) = connect();

        // create a stream with unknown type.
        let new_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap();
        _ = server
            .conn
            .stream_send(new_stream_id, &[0x41, 0x19, 0x4, 0x4, 0x6, 0x0, 0x8, 0x0])
            .unwrap();
        let out = server.conn.process_output(now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));

        // check for stop-sending with Error::HttpStreamCreation.
        let mut stop_sending_event_found = false;
        while let Some(e) = server.conn.next_event() {
            if let ConnectionEvent::SendStreamStopSending {
                stream_id,
                app_error,
            } = e
            {
                stop_sending_event_found = true;
                assert_eq!(stream_id, new_stream_id);
                assert_eq!(app_error, Error::HttpStreamCreation.code());
            }
        }
        assert!(stop_sending_event_found);
        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test wrong frame on req/rec stream
    fn test_wrong_frame_on_request_stream(v: &[u8]) {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        _ = server.conn.stream_send(request_stream_id, v).unwrap();

        // Generate packet with the above bad h3 input
        let out = server.conn.process_output(now());
        // Process bad input and close the connection.
        mem::drop(client.process(out.dgram(), now()));

        assert_closed(&client, &Error::HttpFrameUnexpected);
    }

    #[test]
    fn cancel_push_frame_on_request_stream() {
        test_wrong_frame_on_request_stream(&[0x3, 0x1, 0x5]);
    }

    #[test]
    fn settings_frame_on_request_stream() {
        test_wrong_frame_on_request_stream(&[0x4, 0x4, 0x6, 0x4, 0x8, 0x4]);
    }

    #[test]
    fn goaway_frame_on_request_stream() {
        test_wrong_frame_on_request_stream(&[0x7, 0x1, 0x5]);
    }

    #[test]
    fn max_push_id_frame_on_request_stream() {
        test_wrong_frame_on_request_stream(&[0xd, 0x1, 0x5]);
    }

    #[test]
    fn priority_update_request_on_request_stream() {
        test_wrong_frame_on_request_stream(&[0x80, 0x0f, 0x07, 0x00, 0x01, 0x03]);
    }

    #[test]
    fn priority_update_push_on_request_stream() {
        test_wrong_frame_on_request_stream(&[0x80, 0x0f, 0x07, 0x01, 0x01, 0x03]);
    }

    // Test reading of a slowly streamed frame. bytes are received one by one
    #[test]
    fn frame_reading() {
        let (mut client, mut server) = connect_only_transport();

        // create a control stream.
        let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap();

        // send the stream type
        let mut sent = server.conn.stream_send(control_stream, &[0x0]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // start sending SETTINGS frame
        sent = server.conn.stream_send(control_stream, &[0x4]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x4]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x6]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x0]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x8]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x0]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        assert_eq!(client.state(), Http3State::Connected);

        // Now test PushPromise
        sent = server.conn.stream_send(control_stream, &[0x5]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x5]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x4]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x61]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x62]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x63]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        sent = server.conn.stream_send(control_stream, &[0x64]);
        assert_eq!(sent, Ok(1));
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // PUSH_PROMISE on a control stream will cause an error
        assert_closed(&client, &Error::HttpFrameUnexpected);
    }

    #[test]
    fn fetch_basic() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // send response - 200  Content-Length: 7
        // with content: 'abcdefg'.
        // The content will be send in 2 DATA frames.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_1,
            true,
        );

        let http_events = client.events().collect::<Vec<_>>();
        assert_eq!(http_events.len(), 2);
        for e in http_events {
            match e {
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    check_response_header_1(&headers);
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    let mut buf = [0_u8; 100];
                    let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
                    assert!(fin);
                    assert_eq!(amount, EXPECTED_RESPONSE_DATA_1.len());
                    assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_1);
                }
                _ => {}
            }
        }

        // after this stream will be removed from hcoon. We will check this by trying to read
        // from the stream and that should fail.
        let mut buf = [0_u8; 100];
        let res = client.read_data(now(), request_stream_id, &mut buf);
        assert_eq!(res.unwrap_err(), Error::InvalidStreamId);

        client.close(now(), 0, "");
    }

    /// Force both endpoints into an idle state.
    /// Do this by opening unidirectional streams at both endpoints and sending
    /// a partial unidirectional stream type (which the receiver has to buffer),
    /// then delivering packets out of order.
    /// This forces the receiver to create an acknowledgment, which will allow
    /// the peer to become idle.
    fn force_idle(client: &mut Http3Client, server: &mut TestServer) {
        // Send a partial unidirectional stream ID.
        // Note that this can't close the stream as that causes the receiver
        // to send `MAX_STREAMS`, which would prevent it from becoming idle.
        fn dgram(c: &mut Connection) -> Datagram {
            let stream = c.stream_create(StreamType::UniDi).unwrap();
            _ = c.stream_send(stream, &[0xc0]).unwrap();
            c.process_output(now()).dgram().unwrap()
        }

        let d1 = dgram(&mut client.conn);
        let d2 = dgram(&mut client.conn);
        server.conn.process_input(d2, now());
        server.conn.process_input(d1, now());
        let d3 = dgram(&mut server.conn);
        let d4 = dgram(&mut server.conn);
        client.process_input(d4, now());
        client.process_input(d3, now());
        let ack = client.process_output(now()).dgram();
        server.conn.process_input(ack.unwrap(), now());
    }

    /// The client should keep a connection alive if it has unanswered requests.
    #[test]
    fn fetch_keep_alive() {
        let (mut client, mut server, _request_stream_id) = connect_and_send_request(true);
        force_idle(&mut client, &mut server);

        let idle_timeout = ConnectionParameters::default().get_idle_timeout();
        assert_eq!(client.process_output(now()).callback(), idle_timeout / 2);
    }

    // Helper function: read response when a server sends HTTP_RESPONSE_2.
    fn read_response(
        client: &mut Http3Client,
        server: &mut Connection,
        request_stream_id: StreamId,
    ) {
        let out = server.process_output(now());
        client.process(out.dgram(), now());

        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    check_response_header_2(&headers);
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    let mut buf = [0_u8; 100];
                    let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
                    assert!(fin);
                    assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len());
                    assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1);
                }
                _ => {}
            }
        }

        // after this stream will be removed from client. We will check this by trying to read
        // from the stream and that should fail.
        let mut buf = [0_u8; 100];
        let res = client.read_data(now(), request_stream_id, &mut buf);
        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), Error::InvalidStreamId);

        client.close(now(), 0, "");
    }

    // Data sent with a request:
    const REQUEST_BODY: &[u8] = &[0x64, 0x65, 0x66];
    // Corresponding data frame that server will receive.
    const EXPECTED_REQUEST_BODY_FRAME: &[u8] = &[0x0, 0x3, 0x64, 0x65, 0x66];

    // Send a request with the request body.
    #[test]
    fn fetch_with_data() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Get DataWritable for the request stream so that we can write the request body.
        let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. });
        assert!(client.events().any(data_writable));
        let sent = client.send_data(request_stream_id, REQUEST_BODY).unwrap();
        assert_eq!(sent, REQUEST_BODY.len());
        client.stream_close_send(request_stream_id).unwrap();

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        // find the new request/response stream and send response on it.
        while let Some(e) = server.conn.next_event() {
            match e {
                ConnectionEvent::NewStream { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(stream_id.stream_type(), StreamType::BiDi);
                }
                ConnectionEvent::RecvStreamReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);

                    // Read request body.
                    let mut buf = [0_u8; 100];
                    let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap();
                    assert!(fin);
                    assert_eq!(amount, EXPECTED_REQUEST_BODY_FRAME.len());
                    assert_eq!(&buf[..amount], EXPECTED_REQUEST_BODY_FRAME);

                    // send response - 200  Content-Length: 3
                    // with content: 'abc'.
                    _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap();
                    server.conn.stream_close_send(stream_id).unwrap();
                }
                _ => {}
            }
        }

        read_response(&mut client, &mut server.conn, request_stream_id);
    }

    // send a request with request body containing request_body. We expect to receive
    // expected_data_frame_header.
    fn fetch_with_data_length_xbytes(request_body: &[u8], expected_data_frame_header: &[u8])&nbsp;{
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Get DataWritable for the request stream so that we can write the request body.
        let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. });
        assert!(client.events().any(data_writable));
        let sent = client.send_data(request_stream_id, request_body);
        assert_eq!(sent, Ok(request_body.len()));

        // Close stream.
        client.stream_close_send(request_stream_id).unwrap();

        // We need to loop a bit until all data has been sent.
        let mut out = client.process_output(now());
        for _i in 0..20 {
            out = server.conn.process(out.dgram(), now());
            out = client.process(out.dgram(), now());
        }

        // check request body is received.
        // Then send a response.
        while let Some(e) = server.conn.next_event() {
            if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
                if stream_id == request_stream_id {
                    // Read the DATA frame.
                    let mut buf = vec![1_u8; RECV_BUFFER_SIZE];
                    let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap();
                    assert!(fin);
                    assert_eq!(
                        amount,
                        request_body.len() + expected_data_frame_header.len()
                    );

                    // Check the DATA frame header
                    assert_eq!(
                        &buf[..expected_data_frame_header.len()],
                        expected_data_frame_header
                    );

                    // Check data.
                    assert_eq!(&buf[expected_data_frame_header.len()..amount], request_body);

                    // send response - 200  Content-Length: 3
                    // with content: 'abc'.
                    _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap();
                    server.conn.stream_close_send(stream_id).unwrap();
                }
            }
        }

        read_response(&mut client, &mut server.conn, request_stream_id);
    }

    // send a request with 63 bytes. The DATA frame length field will still have 1 byte.
    #[test]
    fn fetch_with_data_length_63bytes() {
        fetch_with_data_length_xbytes(&[0_u8; 63], &[0x0, 0x3f]);
    }

    // send a request with 64 bytes. The DATA frame length field will need 2 byte.
    #[test]
    fn fetch_with_data_length_64bytes() {
        fetch_with_data_length_xbytes(&[0_u8; 64], &[0x0, 0x40, 0x40]);
    }

    // send a request with 16383 bytes. The DATA frame length field will still have 2 byte.
    #[test]
    fn fetch_with_data_length_16383bytes() {
        fetch_with_data_length_xbytes(&[0_u8; 16383], &[0x0, 0x7f, 0xff]);
    }

    // send a request with 16384 bytes. The DATA frame length field will need 4 byte.
    #[test]
    fn fetch_with_data_length_16384bytes() {
        fetch_with_data_length_xbytes(&[0_u8; 16384], &[0x0, 0x80, 0x0, 0x40, 0x0]);
    }

    // Send 2 data frames so that the second one cannot fit into the send_buf and it is only
    // partialy sent. We check that the sent data is correct.
    fn fetch_with_two_data_frames(
        first_frame: &[u8],
        expected_first_data_frame_header: &[u8],
        expected_second_data_frame_header: &[u8],
        expected_second_data_frame: &[u8],
    ) {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Get DataWritable for the request stream so that we can write the request body.
        let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. });
        assert!(client.events().any(data_writable));

        // Send the first frame.
        let sent = client.send_data(request_stream_id, first_frame);
        assert_eq!(sent, Ok(first_frame.len()));

        // The second frame cannot fit.
        let sent = client.send_data(request_stream_id, &vec![0_u8; SEND_BUFFER_SIZE]);
        assert_eq!(sent, Ok(expected_second_data_frame.len()));

        // Close stream.
        client.stream_close_send(request_stream_id).unwrap();

        let mut out = client.process_output(now());
        // We need to loop a bit until all data has been sent. Once for every 1K
        // of data.
        for _i in 0..SEND_BUFFER_SIZE / 1000 {
            out = server.conn.process(out.dgram(), now());
            out = client.process(out.dgram(), now());
        }

        // Check received frames and send a response.
        while let Some(e) = server.conn.next_event() {
            if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
                if stream_id == request_stream_id {
                    // Read DATA frames.
                    let mut buf = vec![1_u8; RECV_BUFFER_SIZE];
                    let (amount, fin) = server.conn.stream_recv(stream_id, &mut buf).unwrap();
                    assert!(fin);
                    assert_eq!(
                        amount,
                        expected_first_data_frame_header.len()
                            + first_frame.len()
                            + expected_second_data_frame_header.len()
                            + expected_second_data_frame.len()
                    );

                    // Check the first DATA frame header
                    let end = expected_first_data_frame_header.len();
                    assert_eq!(&buf[..end], expected_first_data_frame_header);

                    // Check the first frame data.
                    let start = end;
                    let end = end + first_frame.len();
                    assert_eq!(&buf[start..end], first_frame);

                    // Check the second DATA frame header
                    let start2 = end;
                    let end2 = end + expected_second_data_frame_header.len();
                    assert_eq!(&buf[start2..end2], expected_second_data_frame_header);

                    // Check the second frame data.
                    let start3 = end2;
                    let end3 = end2 + expected_second_data_frame.len();
                    assert_eq!(&buf[start3..end3], expected_second_data_frame);

                    // send response - 200  Content-Length: 3
                    // with content: 'abc'.
                    _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_2).unwrap();
                    server.conn.stream_close_send(stream_id).unwrap();
                }
            }
        }

        read_response(&mut client, &mut server.conn, request_stream_id);
    }

    fn alloc_buffer(size: usize) -> (Vec<u8>, Vec<u8>) {
        let data_frame = HFrame::Data { len: size as u64 };
        let mut enc = Encoder::default();
        data_frame.encode(&mut enc);

        (vec![0_u8; size], enc.as_ref().to_vec())
    }

    // Send 2 frames. For the second one we can only send 63 bytes.
    // After the first frame there is exactly 63+2 bytes left in the send buffer.
    #[test]
    fn fetch_two_data_frame_second_63bytes() {
        let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 88);
        fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]);
    }

    // Send 2 frames. For the second one we can only send 63 bytes.
    // After the first frame there is exactly 63+3 bytes left in the send buffer,
    // but we can only send 63 bytes.
    #[test]
    fn fetch_two_data_frame_second_63bytes_place_for_66() {
        let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 89);
        fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x3f], &[0_u8; 63]);
    }

    // Send 2 frames. For the second one we can only send 64 bytes.
    // After the first frame there is exactly 64+3 bytes left in the send buffer,
    // but we can only send 64 bytes.
    #[test]
    fn fetch_two_data_frame_second_64bytes_place_for_67() {
        let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 90);
        fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x40, 0x40], &[0_u8; 64]);
    }

    // Send 2 frames. For the second one we can only send 16383 bytes.
    // After the first frame there is exactly 16383+3 bytes left in the send buffer.
    #[test]
    fn fetch_two_data_frame_second_16383bytes() {
        let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16409);
        fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]);
    }

    // Send 2 frames. For the second one we can only send 16383 bytes.
    // After the first frame there is exactly 16383+4 bytes left in the send buffer, but we can only
    // send 16383 bytes.
    #[test]
    fn fetch_two_data_frame_second_16383bytes_place_for_16387() {
        let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16410);
        fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]);
    }

    // Send 2 frames. For the second one we can only send 16383 bytes.
    // After the first frame there is exactly 16383+5 bytes left in the send buffer, but we can only
    // send 16383 bytes.
    #[test]
    fn fetch_two_data_frame_second_16383bytes_place_for_16388() {
        let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16411);
        fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x7f, 0xff], &[0_u8; 16383]);
    }

    // Send 2 frames. For the second one we can send 16384 bytes.
    // After the first frame there is exactly 16384+5 bytes left in the send buffer, but we can send
    // 16384 bytes.
    #[test]
    fn fetch_two_data_frame_second_16384bytes_place_for_16389() {
        let (buf, hdr) = alloc_buffer(SEND_BUFFER_SIZE - 16412);
        fetch_with_two_data_frames(&buf, &hdr, &[0x0, 0x80, 0x0, 0x40, 0x0], &[0_u8; 16384]);
    }

    // Test receiving STOP_SENDING with the HttpNoError error code.
    #[test]
    fn stop_sending_early_response() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Stop sending with early_response.
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_stop_sending(request_stream_id, Error::HttpNoError.code())
        );

        // send response - 200  Content-Length: 3
        // with content: 'abc'.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );

        let mut stop_sending = false;
        let mut response_headers = false;
        let mut response_body = false;
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::StopSending { stream_id, error } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpNoError.code());
                    // assert that we cannot send any more request data.
                    assert_eq!(
                        Err(Error::InvalidStreamId),
                        client.send_data(request_stream_id, &[0_u8; 10])
                    );
                    stop_sending = true;
                }
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    check_response_header_2(&headers);
                    assert!(!fin);
                    assert!(!interim);
                    response_headers = true;
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    let mut buf = [0_u8; 100];
                    let (amount, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
                    assert!(fin);
                    assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len());
                    assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1);
                    response_body = true;
                }
                _ => {}
            }
        }
        assert!(response_headers);
        assert!(response_body);
        assert!(stop_sending);

        // after this stream will be removed from client. We will check this by trying to read
        // from the stream and that should fail.
        let mut buf = [0_u8; 100];
        let res = client.read_data(now(), request_stream_id, &mut buf);
        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), Error::InvalidStreamId);

        client.close(now(), 0, "");
    }

    // Server sends stop sending and reset.
    #[test]
    fn stop_sending_other_error_with_reset() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Stop sending with RequestRejected.
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_stop_sending(request_stream_id, Error::HttpRequestRejected.code())
        );
        // also reset with RequestRejected.
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_reset_send(request_stream_id, Error::HttpRequestRejected.code())
        );

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        let mut reset = false;
        let mut stop_sending = false;
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::StopSending { stream_id, error } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpRequestRejected.code());
                    stop_sending = true;
                }
                Http3ClientEvent::Reset {
                    stream_id,
                    error,
                    local,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpRequestRejected.code());
                    assert!(!local);
                    reset = true;
                }
                Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => {
                    panic!("We should not get any headers or data");
                }
                _ => {}
            }
        }

        assert!(reset);
        assert!(stop_sending);

        // after this stream will be removed from client. We will check this by trying to read
        // from the stream and that should fail.
        let mut buf = [0_u8; 100];
        let res = client.read_data(now(), request_stream_id, &mut buf);
        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), Error::InvalidStreamId);

        client.close(now(), 0, "");
    }

    // Server sends stop sending with RequestRejected, but it does not send reset.
    #[test]
    fn stop_sending_other_error_wo_reset() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Stop sending with RequestRejected.
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_stop_sending(request_stream_id, Error::HttpRequestRejected.code())
        );

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        let mut stop_sending = false;

        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::StopSending { stream_id, error } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpRequestRejected.code());
                    stop_sending = true;
                }
                Http3ClientEvent::Reset { .. } => {
                    panic!("We should not get StopSending.");
                }
                Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => {
                    panic!("We should not get any headers or data");
                }
                _ => {}
            }
        }

        assert!(stop_sending);

        // after this we can still read from a stream.
        let mut buf = [0_u8; 100];
        let res = client.read_data(now(), request_stream_id, &mut buf);
        assert!(res.is_ok());

        client.close(now(), 0, "");
    }

    // Server sends stop sending and reset. We have some events for that stream already
    // in client.events. The events will be removed.
    #[test]
    fn stop_sending_and_reset_other_error_with_events() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // send response - 200  Content-Length: 3
        // with content: 'abc'.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            false,
        );
        // At this moment we have some new events, i.e. a HeadersReady event

        // Send a stop sending and reset.
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_stop_sending(request_stream_id, Error::HttpRequestCancelled.code())
        );
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code())
        );

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        let mut reset = false;

        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::StopSending { stream_id, error } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpRequestCancelled.code());
                }
                Http3ClientEvent::Reset {
                    stream_id,
                    error,
                    local,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpRequestCancelled.code());
                    assert!(!local);
                    reset = true;
                }
                Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => {
                    panic!("We should not get any headers or data");
                }
                _ => {}
            }
        }

        assert!(reset);

        // after this stream will be removed from client. We will check this by trying to read
        // from the stream and that should fail.
        let mut buf = [0_u8; 100];
        let res = client.read_data(now(), request_stream_id, &mut buf);
        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), Error::InvalidStreamId);

        client.close(now(), 0, "");
    }

    // Server sends stop sending with code that is not HttpNoError.
    // We have some events for that stream already in the client.events.
    // The events will be removed.
    #[test]
    fn stop_sending_other_error_with_events() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // send response - 200  Content-Length: 3
        // with content: 'abc'.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            false,
        );
        // At this moment we have some new event, i.e. a HeadersReady event

        // Send a stop sending.
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_stop_sending(request_stream_id, Error::HttpRequestCancelled.code())
        );

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        let mut stop_sending = false;
        let mut header_ready = false;

        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::StopSending { stream_id, error } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpRequestCancelled.code());
                    stop_sending = true;
                }
                Http3ClientEvent::Reset { .. } => {
                    panic!("We should not get StopSending.");
                }
                Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => {
                    header_ready = true;
                }
                _ => {}
            }
        }

        assert!(stop_sending);
        assert!(header_ready);

        // after this, we can sill read data from a sttream.
        let mut buf = [0_u8; 100];
        let (amount, fin) = client
            .read_data(now(), request_stream_id, &mut buf)
            .unwrap();
        assert!(!fin);
        assert_eq!(amount, EXPECTED_RESPONSE_DATA_2_FRAME_1.len());
        assert_eq!(&buf[..amount], EXPECTED_RESPONSE_DATA_2_FRAME_1);

        client.close(now(), 0, "");
    }

    // Server sends a reset. We will close sending side as well.
    #[test]
    fn reset_wo_stop_sending() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Send a reset.
        assert_eq!(
            Ok(()),
            server
                .conn
                .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code())
        );

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        let mut reset = false;

        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::StopSending { .. } => {
                    panic!("We should not get StopSending.");
                }
                Http3ClientEvent::Reset {
                    stream_id,
                    error,
                    local,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    assert_eq!(error, Error::HttpRequestCancelled.code());
                    assert!(!local);
                    reset = true;
                }
                Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::DataReadable { .. } => {
                    panic!("We should not get any headers or data");
                }
                _ => {}
            }
        }

        assert!(reset);

        // after this stream will be removed from client. We will check this by trying to read
        // from the stream and that should fail.
        let mut buf = [0_u8; 100];
        let res = client.read_data(now(), request_stream_id, &mut buf);
        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), Error::InvalidStreamId);

        client.close(now(), 0, "");
    }

    fn test_incomplet_frame(buf: &[u8], error: &Error) {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            buf,
            true,
        );

        while let Some(e) = client.next_event() {
            if let Http3ClientEvent::DataReadable { stream_id } = e {
                assert_eq!(stream_id, request_stream_id);
                let mut buf_res = [0_u8; 100];
                let res = client.read_data(now(), stream_id, &mut buf_res);
                assert!(res.is_err());
                assert_eq!(res.unwrap_err(), Error::HttpFrame);
            }
        }
        assert_closed(&client, error);
    }

    // Incomplete DATA frame
    #[test]
    fn incomplet_data_frame() {
        test_incomplet_frame(&HTTP_RESPONSE_2[..12], &Error::HttpFrame);
    }

    // Incomplete HEADERS frame
    #[test]
    fn incomplet_headers_frame() {
        test_incomplet_frame(&HTTP_RESPONSE_2[..7], &Error::HttpFrame);
    }

    #[test]
    fn incomplet_unknown_frame() {
        test_incomplet_frame(&[0x21], &Error::HttpFrame);
    }

    // test goaway
    #[test]
    fn goaway() {
        let (mut client, mut server) = connect();
        let request_stream_id_1 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_1, 0);
        let request_stream_id_2 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_2, 4);
        let request_stream_id_3 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_3, 8);

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        _ = server
            .conn
            .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8])
            .unwrap();

        // find the new request/response stream and send frame v on it.
        while let Some(e) = server.conn.next_event() {
            if let ConnectionEvent::RecvStreamReadable { stream_id } = e {
                let mut buf = [0_u8; 100];
                _ = server.conn.stream_recv(stream_id, &mut buf).unwrap();
                if (stream_id == request_stream_id_1) || (stream_id == request_stream_id_2) {
                    // send response - 200  Content-Length: 7
                    // with content: 'abcdefg'.
                    // The content will be send in 2 DATA frames.
                    _ = server.conn.stream_send(stream_id, HTTP_RESPONSE_1).unwrap();
                    server.conn.stream_close_send(stream_id).unwrap();
                }
            }
        }
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        let mut stream_reset = false;
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady { headers, fin, .. } => {
                    check_response_header_1(&headers);
                    assert!(!fin);
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert!(
                        (stream_id == request_stream_id_1) || (stream_id == request_stream_id_2)
                    );
                    let mut buf = [0_u8; 100];
                    assert_eq!(
                        (EXPECTED_RESPONSE_DATA_1.len(), true),
                        client.read_data(now(), stream_id, &mut buf).unwrap()
                    );
                }
                Http3ClientEvent::Reset {
                    stream_id,
                    error,
                    local,
                } => {
                    assert_eq!(stream_id, request_stream_id_3);
                    assert_eq!(error, Error::HttpRequestRejected.code());
                    assert!(!local);
                    stream_reset = true;
                }
                _ => {}
            }
        }

        assert!(stream_reset);
        assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(8)));

        // Check that a new request cannot be made.
        assert_eq!(
            client.fetch(
                now(),
                "GET",
                &("https", "something.com", "/"),
                &[],
                Priority::default()
            ),
            Err(Error::AlreadyClosed)
        );

        client.close(now(), 0, "");
    }

    #[test]
    fn multiple_goaways() {
        let (mut client, mut server) = connect();
        let request_stream_id_1 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_1, 0);
        let request_stream_id_2 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_2, 4);
        let request_stream_id_3 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_3, 8);

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        // First send a Goaway frame with an higher number
        _ = server
            .conn
            .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8])
            .unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Check that there is one reset for stream_id 8
        let mut stream_reset_1 = 0;
        while let Some(e) = client.next_event() {
            if let Http3ClientEvent::Reset {
                stream_id,
                error,
                local,
            } = e
            {
                assert_eq!(stream_id, request_stream_id_3);
                assert_eq!(error, Error::HttpRequestRejected.code());
                assert!(!local);
                stream_reset_1 += 1;
            }
        }

        assert_eq!(stream_reset_1, 1);
        assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(8)));

        // Server sends another GOAWAY frame
        _ = server
            .conn
            .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x4])
            .unwrap();

        // Send response for stream 0
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id_1,
            HTTP_RESPONSE_1,
            true,
        );

        let mut stream_reset_2 = 0;
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady { headers, fin, .. } => {
                    check_response_header_1(&headers);
                    assert!(!fin);
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert!(stream_id == request_stream_id_1);
                    let mut buf = [0_u8; 100];
                    assert_eq!(
                        (EXPECTED_RESPONSE_DATA_1.len(), true),
                        client.read_data(now(), stream_id, &mut buf).unwrap()
                    );
                }
                Http3ClientEvent::Reset {
                    stream_id,
                    error,
                    local,
                } => {
                    assert_eq!(stream_id, request_stream_id_2);
                    assert_eq!(error, Error::HttpRequestRejected.code());
                    assert!(!local);
                    stream_reset_2 += 1;
                }
                _ => {}
            }
        }

        assert_eq!(stream_reset_2, 1);
        assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(4)));
    }

    #[test]
    fn multiple_goaways_stream_id_increased() {
        let (mut client, mut server) = connect();
        let request_stream_id_1 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_1, 0);
        let request_stream_id_2 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_2, 4);
        let request_stream_id_3 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_3, 8);

        // First send a Goaway frame with a smaller number
        _ = server
            .conn
            .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x4])
            .unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        assert_eq!(client.state(), Http3State::GoingAway(StreamId::new(4)));

        // Now send a Goaway frame with an higher number
        _ = server
            .conn
            .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x8])
            .unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        assert_closed(&client, &Error::HttpGeneralProtocol);
    }

    #[test]
    fn goaway_wrong_stream_id() {
        let (mut client, mut server) = connect();

        _ = server
            .conn
            .stream_send(server.control_stream_id.unwrap(), &[0x7, 0x1, 0x9])
            .unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        assert_closed(&client, &Error::HttpId);
    }

    // Close stream before headers.
    #[test]
    fn stream_fin_wo_headers() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        // send fin before sending any data.
        server.conn.stream_close_send(request_stream_id).unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Recv HeaderReady wo headers with fin.
        let e = client.events().next().unwrap();
        assert_eq!(
            e,
            Http3ClientEvent::Reset {
                stream_id: request_stream_id,
                error: Error::HttpGeneralProtocolStream.code(),
                local: true,
            }
        );

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    // Close stream imemediately after headers.
    #[test]
    fn stream_fin_after_headers() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_HEADER_ONLY_2,
            true,
        );

        // Recv HeaderReady with headers and fin.
        let e = client.events().next().unwrap();
        if let Http3ClientEvent::HeaderReady {
            stream_id,
            headers,
            interim,
            fin,
        } = e
        {
            assert_eq!(stream_id, request_stream_id);
            check_response_header_2(&headers);
            assert!(fin);
            assert!(!interim);
        } else {
            panic!("wrong event type");
        }

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    // Send headers, read headers and than close stream.
    // We should get HeaderReady and a DataReadable
    #[test]
    fn stream_fin_after_headers_are_read_wo_data_frame() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        // Send some good data wo fin
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_HEADER_ONLY_2,
            false,
        );

        // Recv headers wo fin
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    check_response_header_2(&headers);
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::DataReadable { .. } => {
                    panic!("We should not receive a DataGeadable event!");
                }
                _ => {}
            };
        }

        // ok NOW send fin
        server.conn.stream_close_send(request_stream_id).unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Recv DataReadable wo data with fin
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady { .. } => {
                    panic!("We should not get another HeaderReady!");
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    let mut buf = [0_u8; 100];
                    let res = client.read_data(now(), stream_id, &mut buf);
                    let (len, fin) = res.expect("should read");
                    assert_eq!(0, len);
                    assert!(fin);
                }
                _ => {}
            };
        }

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    // Send headers and an empty data frame, then close the stream.
    #[test]
    fn stream_fin_after_headers_and_a_empty_data_frame() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send headers.
        _ = server
            .conn
            .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2)
            .unwrap();
        // Send an empty data frame.
        _ = server
            .conn
            .stream_send(request_stream_id, &[0x00, 0x00])
            .unwrap();
        // ok NOW send fin
        server.conn.stream_close_send(request_stream_id).unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Recv HeaderReady with fin.
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    check_response_header_2(&headers);
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    let mut buf = [0_u8; 100];
                    assert_eq!(Ok((0, true)), client.read_data(now(), stream_id, &mut buf));
                }
                _ => {}
            };
        }

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), request_stream_id, &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    // Send headers and an empty data frame. Read headers and then close the stream.
    // We should get a HeaderReady without fin and a DataReadable wo data and with fin.
    #[test]
    fn stream_fin_after_headers_an_empty_data_frame_are_read() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        // Send some good data wo fin
        // Send headers.
        _ = server
            .conn
            .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2)
            .unwrap();
        // Send an empty data frame.
        _ = server
            .conn
            .stream_send(request_stream_id, &[0x00, 0x00])
            .unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Recv headers wo fin
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    check_response_header_2(&headers);
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::DataReadable { .. } => {
                    panic!("We should not receive a DataGeadable event!");
                }
                _ => {}
            };
        }

        // ok NOW send fin
        server.conn.stream_close_send(request_stream_id).unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Recv no data, but do get fin
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady { .. } => {
                    panic!("We should not get another HeaderReady!");
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    let mut buf = [0_u8; 100];
                    let res = client.read_data(now(), stream_id, &mut buf);
                    let (len, fin) = res.expect("should read");
                    assert_eq!(0, len);
                    assert!(fin);
                }
                _ => {}
            };
        }

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    #[test]
    fn stream_fin_after_a_data_frame() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        // Send some good data wo fin
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            false,
        );

        // Recv some good data wo fin
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    interim,
                    fin,
                } => {
                    assert_eq!(stream_id, request_stream_id);
                    check_response_header_2(&headers);
                    assert!(!fin);
                    assert!(!interim);
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    assert_eq!(stream_id, request_stream_id);
                    let mut buf = [0_u8; 100];
                    let res = client.read_data(now(), stream_id, &mut buf);
                    let (len, fin) = res.expect("should have data");
                    assert_eq!(len, EXPECTED_RESPONSE_DATA_2_FRAME_1.len());
                    assert_eq!(&buf[..len], EXPECTED_RESPONSE_DATA_2_FRAME_1);
                    assert!(!fin);
                }
                _ => {}
            };
        }

        // ok NOW send fin
        server.conn.stream_close_send(request_stream_id).unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // fin wo data should generate DataReadable
        let e = client.events().next().unwrap();
        if let Http3ClientEvent::DataReadable { stream_id } = e {
            assert_eq!(stream_id, request_stream_id);
            let mut buf = [0; 100];
            let res = client.read_data(now(), stream_id, &mut buf);
            let (len, fin) = res.expect("should read");
            assert_eq!(0, len);
            assert!(fin);
        } else {
            panic!("wrong event type");
        }

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    #[test]
    fn multiple_data_frames() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send two data frames with fin
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_1,
            true,
        );

        // Read first frame
        match client.events().nth(1).unwrap() {
            Http3ClientEvent::DataReadable { stream_id } => {
                assert_eq!(stream_id, request_stream_id);
                let mut buf = [0_u8; 100];
                assert_eq!(
                    (EXPECTED_RESPONSE_DATA_1.len(), true),
                    client.read_data(now(), stream_id, &mut buf).unwrap()
                );
            }
            x => {
                panic!("event {x:?}");
            }
        }

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    #[test]
    fn receive_grease_before_response() {
        // Construct an unknown frame.
        const UNKNOWN_FRAME_LEN: usize = 832;

        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        let mut enc = Encoder::with_capacity(UNKNOWN_FRAME_LEN + 4);
        enc.encode_varint(1028_u64); // Arbitrary type.
        enc.encode_varint(UNKNOWN_FRAME_LEN as u64);
        let mut buf: Vec<_> = enc.into();
        buf.resize(UNKNOWN_FRAME_LEN + buf.len(), 0);
        _ = server.conn.stream_send(request_stream_id, &buf).unwrap();

        // Send a headers and a data frame with fin
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );

        // Read first frame
        match client.events().nth(1).unwrap() {
            Http3ClientEvent::DataReadable { stream_id } => {
                assert_eq!(stream_id, request_stream_id);
                let mut buf = [0_u8; 100];
                let (len, fin) = client.read_data(now(), stream_id, &mut buf).unwrap();
                assert_eq!(len, EXPECTED_RESPONSE_DATA_2_FRAME_1.len());
                assert_eq!(&buf[..len], EXPECTED_RESPONSE_DATA_2_FRAME_1);
                assert!(fin);
            }
            x => {
                panic!("event {x:?}");
            }
        }
        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    #[test]
    fn read_frames_header_blocked() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let headers = vec![
            Header::new(":status", "200"),
            Header::new("my-header", "my-header"),
            Header::new("content-length", "3"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        // Send the encoder instructions, but delay them so that the stream is blocked on decoding
        // headers.
        let encoder_inst_pkt = server.conn.process_output(now());

        // Send response
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        let d_frame = HFrame::Data { len: 3 };
        d_frame.encode(&mut d);
        d.encode(&[0x61, 0x62, 0x63]);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            true,
        );

        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(!client.events().any(header_ready_event));

        // Let client receive the encoder instructions.
        mem::drop(client.process(encoder_inst_pkt.dgram(), now()));

        let out = server.conn.process_output(now());
        mem::drop(client.process(out.dgram(), now()));
        mem::drop(client.process_output(now()));

        let mut recv_header = false;
        let mut recv_data = false;
        // Now the stream is unblocked and both headers and data will be consumed.
        while let Some(e) = client.next_event() {
            match e {
                Http3ClientEvent::HeaderReady { stream_id, .. } => {
                    assert_eq!(stream_id, request_stream_id);
                    recv_header = true;
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    recv_data = true;
                    assert_eq!(stream_id, request_stream_id);
                }
                x => {
                    panic!("event {x:?}");
                }
            }
        }
        assert!(recv_header && recv_data);
    }

    #[test]
    fn read_frames_header_blocked_with_fin_after_headers() {
        let (mut hconn, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut hconn, &mut server);

        let sent_headers = vec![
            Header::new(":status", "200"),
            Header::new("my-header", "my-header"),
            Header::new("content-length", "0"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &sent_headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        // Send the encoder instructions, but delay them so that the stream is blocked on decoding
        // headers.
        let encoder_inst_pkt = server.conn.process_output(now());

        let mut d = Encoder::default();
        hframe.encode(&mut d);

        server_send_response_and_exchange_packet(
            &mut hconn,
            &mut server,
            request_stream_id,
            &d,
            true,
        );

        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(!hconn.events().any(header_ready_event));

        // Let client receive the encoder instructions.
        let _out = hconn.process(encoder_inst_pkt.dgram(), now());

        let mut recv_header = false;
        // Now the stream is unblocked. After headers we will receive a fin.
        while let Some(e) = hconn.next_event() {
            if let Http3ClientEvent::HeaderReady {
                stream_id,
                headers,
                interim,
                fin,
            } = e
            {
                assert_eq!(stream_id, request_stream_id);
                assert_eq!(headers.as_ref(), sent_headers);
                assert!(fin);
                assert!(!interim);
                recv_header = true;
            } else {
                panic!("event {e:?}");
            }
        }
        assert!(recv_header);
    }

    fn exchange_token(client: &mut Http3Client, server: &mut Connection) -> ResumptionToken {
        server.send_ticket(now(), &[]).expect("can send ticket");
        let out = server.process_output(now());
        assert!(out.as_dgram_ref().is_some());
        client.process_input(out.dgram().unwrap(), now());
        // We do not have a token so we need to wait for a resumption token timer to trigger.
        client.process_output(now() + Duration::from_millis(250));
        assert_eq!(client.state(), Http3State::Connected);
        client
            .events()
            .find_map(|e| {
                if let Http3ClientEvent::ResumptionToken(token) = e {
                    Some(token)
                } else {
                    None
                }
            })
            .unwrap()
    }

    fn start_with_0rtt() -> (Http3Client, TestServer) {
        let (mut client, mut server) = connect();
        let token = exchange_token(&mut client, &mut server.conn);

        let mut client = default_http3_client();

        let server = TestServer::new();

        assert_eq!(client.state(), Http3State::Initializing);
        client
            .enable_resumption(now(), &token)
            .expect("Set resumption token.");

        assert_eq!(client.state(), Http3State::ZeroRtt);
        let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt));
        assert!(client.events().any(zerortt_event));

        (client, server)
    }

    #[test]
    fn zero_rtt_negotiated() {
        let (mut client, mut server) = start_with_0rtt();

        let out = client.process_output(now());

        assert_eq!(client.state(), Http3State::ZeroRtt);
        assert_eq!(*server.conn.state(), State::Init);
        let out = server.conn.process(out.dgram(), now());

        // Check that control and qpack streams are received and a
        // SETTINGS frame has been received.
        // Also qpack encoder stream will send "change capacity" instruction because it has
        // the peer settings already.
        server.check_control_qpack_request_streams_resumption(
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
            EXPECTED_REQUEST_HEADER_FRAME,
            false,
        );

        assert_eq!(*server.conn.state(), State::Handshaking);
        let out = client.process(out.dgram(), now());
        assert_eq!(client.state(), Http3State::Connected);

        mem::drop(server.conn.process(out.dgram(), now()));
        assert!(server.conn.state().connected());

        assert!(client.tls_info().unwrap().resumed());
        assert!(server.conn.tls_info().unwrap().resumed());
    }

    #[test]
    fn zero_rtt_send_request() {
        let (mut client, mut server) = start_with_0rtt();

        let request_stream_id =
            make_request(&mut client, true, &[Header::new("myheaders", "myvalue")]);
        assert_eq!(request_stream_id, 0);

        let out = client.process_output(now());

        assert_eq!(client.state(), Http3State::ZeroRtt);
        assert_eq!(*server.conn.state(), State::Init);
        let out = server.conn.process(out.dgram(), now());

        // Check that control and qpack streams are received and a
        // SETTINGS frame has been received.
        // Also qpack encoder stream will send "change capacity" instruction because it has
        // the peer settings already.
        server.check_control_qpack_request_streams_resumption(
            ENCODER_STREAM_DATA_WITH_CAP_INST_AND_ENCODING_INST,
            EXPECTED_REQUEST_HEADER_FRAME_VERSION2,
            true,
        );

        assert_eq!(*server.conn.state(), State::Handshaking);
        let out = client.process(out.dgram(), now());
        assert_eq!(client.state(), Http3State::Connected);
        let out = server.conn.process(out.dgram(), now());
        assert!(server.conn.state().connected());
        let out = client.process(out.dgram(), now());
        assert!(out.dgram().is_none());

        // After the server has been connected, send a response.
        let res = server.conn.stream_send(request_stream_id, HTTP_RESPONSE_2);
        assert_eq!(res, Ok(HTTP_RESPONSE_2.len()));
        server.conn.stream_close_send(request_stream_id).unwrap();

        read_response(&mut client, &mut server.conn, request_stream_id);

        assert!(client.tls_info().unwrap().resumed());
        assert!(server.conn.tls_info().unwrap().resumed());
    }

    #[test]
    fn zero_rtt_before_resumption_token() {
        let mut client = default_http3_client();
        assert!(client
            .fetch(
                now(),
                "GET",
                &("https", "something.com", "/"),
                &[],
                Priority::default()
            )
            .is_err());
    }

    #[test]
    fn zero_rtt_send_reject() {
        let (mut client, mut server) = connect();
        let token = exchange_token(&mut client, &mut server.conn);

        let mut client = default_http3_client();
        let mut server = Connection::new_server(
            test_fixture::DEFAULT_KEYS,
            test_fixture::DEFAULT_ALPN_H3,
            Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
            ConnectionParameters::default(),
        )
        .unwrap();
        // Using a freshly initialized anti-replay context
        // should result in the server rejecting 0-RTT.
        let ar = AntiReplay::new(now(), test_fixture::ANTI_REPLAY_WINDOW, 1, 3)
            .expect("setup anti-replay");
        server
            .server_enable_0rtt(&ar, AllowZeroRtt {})
            .expect("enable 0-RTT");

        assert_eq!(client.state(), Http3State::Initializing);
        client
            .enable_resumption(now(), &token)
            .expect("Set resumption token.");
        let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt));
        assert!(client.events().any(zerortt_event));

        // Send ClientHello.
        let client_hs = client.process_output(now());
        assert!(client_hs.as_dgram_ref().is_some());

        // Create a request
        let request_stream_id = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id, 0);

        let client_0rtt = client.process_output(now());
        assert!(client_0rtt.as_dgram_ref().is_some());

        let server_hs = server.process(client_hs.dgram(), now());
        assert!(server_hs.as_dgram_ref().is_some()); // Should produce ServerHello etc...
        let server_ignored = server.process(client_0rtt.dgram(), now());
        assert!(server_ignored.as_dgram_ref().is_none());

        // The server shouldn't receive that 0-RTT data.
        let recvd_stream_evt = |e| matches!(e, ConnectionEvent::NewStream { .. });
        assert!(!server.events().any(recvd_stream_evt));

        // Client should get a rejection.
        let client_out = client.process(server_hs.dgram(), now());
        assert!(client_out.as_dgram_ref().is_some());
        let recvd_0rtt_reject = |e| e == Http3ClientEvent::ZeroRttRejected;
        assert!(client.events().any(recvd_0rtt_reject));

        // ...and the client stream should be gone.
        let res = client.stream_close_send(request_stream_id);
        assert!(res.is_err());
        assert_eq!(res.unwrap_err(), Error::InvalidStreamId);

        // Client will send Setting frame and open new qpack streams.
        mem::drop(server.process(client_out.dgram(), now()));
        TestServer::new_with_conn(server).check_client_control_qpack_streams_no_resumption();

        // Check that we can send a request and that the stream_id starts again from 0.
        assert_eq!(make_request(&mut client, false, &[]), 0);
    }

    // Connect to a server, get token and reconnect using 0-rtt. Seerver sends new Settings.
    fn zero_rtt_change_settings(
        original_settings: &[HSetting],
        resumption_settings: &[HSetting],
        expected_client_state: &Http3State,
        expected_encoder_stream_data: &[u8],
    ) {
        let mut client = default_http3_client();
        let mut server = TestServer::new_with_settings(original_settings);
        // Connect and get a token
        connect_with(&mut client, &mut server);
        let token = exchange_token(&mut client, &mut server.conn);

        let mut client = default_http3_client();
        let mut server = TestServer::new_with_settings(resumption_settings);
        assert_eq!(client.state(), Http3State::Initializing);
        client
            .enable_resumption(now(), &token)
            .expect("Set resumption token.");
        assert_eq!(client.state(), Http3State::ZeroRtt);
        let out = client.process_output(now());

        assert_eq!(client.state(), Http3State::ZeroRtt);
        assert_eq!(*server.conn.state(), State::Init);
        let out = server.conn.process(out.dgram(), now());

        // Check that control and qpack streams and a SETTINGS frame are received.
        // Also qpack encoder stream will send "change capacity" instruction because it has
        // the peer settings already.
        server.check_control_qpack_request_streams_resumption(
            expected_encoder_stream_data,
            EXPECTED_REQUEST_HEADER_FRAME,
            false,
        );

        assert_eq!(*server.conn.state(), State::Handshaking);
        let out = client.process(out.dgram(), now());
        assert_eq!(client.state(), Http3State::Connected);

        mem::drop(server.conn.process(out.dgram(), now()));
        assert!(server.conn.state().connected());

        assert!(client.tls_info().unwrap().resumed());
        assert!(server.conn.tls_info().unwrap().resumed());

        // Send new settings.
        let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap();
        let mut enc = Encoder::default();
        server.settings.encode(&mut enc);
        let mut sent = server.conn.stream_send(control_stream, CONTROL_STREAM_TYPE);
        assert_eq!(sent.unwrap(), CONTROL_STREAM_TYPE.len());
        sent = server.conn.stream_send(control_stream, enc.as_ref());
        assert_eq!(sent.unwrap(), enc.len());

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        assert_eq!(&client.state(), expected_client_state);
        assert!(server.conn.state().connected());
    }

    #[test]
    fn zero_rtt_new_server_setting_are_the_same() {
        // Send a new server settings that are the same as the old one.
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Connected,
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_omit_max_table() {
        // Send a new server settings without MaxTableCapacity
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Closing(CloseReason::Application(265)),
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_omit_blocked_streams() {
        // Send a new server settings without BlockedStreams
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Closing(CloseReason::Application(265)),
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_omit_header_list_size() {
        // Send a new server settings without MaxHeaderListSize
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
            ],
            &Http3State::Connected,
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_max_table_size_bigger() {
        // Send a new server settings MaxTableCapacity=200
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 200),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Closing(CloseReason::Application(514)),
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_max_table_size_smaller() {
        // Send a new server settings MaxTableCapacity=50
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 50),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Closing(CloseReason::Application(265)),
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_blocked_streams_bigger() {
        // Send a new server settings withBlockedStreams=200
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 200),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Connected,
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_blocked_streams_smaller() {
        // Send a new server settings withBlockedStreams=50
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 50),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Closing(CloseReason::Application(265)),
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_max_header_size_bigger() {
        // Send a new server settings with MaxHeaderListSize=20000
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 20000),
            ],
            &Http3State::Connected,
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_new_server_setting_max_headers_size_smaller() {
        // Send the new server settings with MaxHeaderListSize=5000
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 5000),
            ],
            &Http3State::Closing(CloseReason::Application(265)),
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_max_table_size_first_omitted() {
        // send server original settings without MaxTableCapacity
        // send new server setting with MaxTableCapacity
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Connected,
            ENCODER_STREAM_DATA,
        );
    }

    #[test]
    fn zero_rtt_blocked_streams_first_omitted() {
        // Send server original settings without BlockedStreams
        // Send the new server settings with BlockedStreams
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Connected,
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn zero_rtt_max_header_size_first_omitted() {
        // Send server settings without MaxHeaderListSize
        // Send new settings with MaxHeaderListSize.
        zero_rtt_change_settings(
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 10000),
            ],
            &[
                HSetting::new(HSettingType::MaxTableCapacity, 100),
                HSetting::new(HSettingType::BlockedStreams, 100),
                HSetting::new(HSettingType::MaxHeaderListSize, 10000),
            ],
            &Http3State::Closing(CloseReason::Application(265)),
            ENCODER_STREAM_DATA_WITH_CAP_INSTRUCTION,
        );
    }

    #[test]
    fn trailers_with_fin_after_headers() {
        // Make a new connection.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send HEADER frame.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_HEADER_FRAME_0,
            false,
        );

        // Check response headers.
        let mut response_headers = false;
        while let Some(e) = client.next_event() {
            if let Http3ClientEvent::HeaderReady {
                stream_id,
                headers,
                interim,
                fin,
            } = e
            {
                assert_eq!(stream_id, request_stream_id);
                check_response_header_0(&headers);
                assert!(!fin);
                assert!(!interim);
                response_headers = true;
            }
        }
        assert!(response_headers);

        // Send trailers
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_HEADER_FRAME_0,
            true,
        );

        let events: Vec<Http3ClientEvent> = client.events().collect();

        // We already had HeaderReady
        let header_ready: fn(&Http3ClientEvent) -> _ =
            |e| matches!(*e, Http3ClientEvent::HeaderReady { .. });
        assert!(!events.iter().any(header_ready));

        // Check that we have a DataReady event. Reading from the stream will return fin=true.
        let data_readable: fn(&Http3ClientEvent) -> _ =
            |e| matches!(*e, Http3ClientEvent::DataReadable { .. });
        assert!(events.iter().any(data_readable));
        let mut buf = [0_u8; 100];
        let (len, fin) = client
            .read_data(now(), request_stream_id, &mut buf)
            .unwrap();
        assert_eq!(0, len);
        assert!(fin);
    }

    #[test]
    fn trailers_with_later_fin_after_headers() {
        // Make a new connection.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send HEADER frame.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_HEADER_FRAME_0,
            false,
        );

        // Check response headers.
        let mut response_headers = false;
        while let Some(e) = client.next_event() {
            if let Http3ClientEvent::HeaderReady {
                stream_id,
                headers,
                interim,
                fin,
            } = e
            {
                assert_eq!(stream_id, request_stream_id);
                check_response_header_0(&headers);
                assert!(!fin);
                assert!(!interim);
                response_headers = true;
            }
        }
        assert!(response_headers);

        // Send trailers
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_HEADER_FRAME_0,
            false,
        );

        // Check that we do not have a DataReady event.
        let data_readable = |e| matches!(e, Http3ClientEvent::DataReadable { .. });
        assert!(!client.events().any(data_readable));

        server.conn.stream_close_send(request_stream_id).unwrap();

        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        let events: Vec<Http3ClientEvent> = client.events().collect();

        // We already had HeaderReady
        let header_ready: fn(&Http3ClientEvent) -> _ =
            |e| matches!(*e, Http3ClientEvent::HeaderReady { .. });
        assert!(!events.iter().any(header_ready));

        // Check that we have a DataReady event. Reading from the stream will return fin=true.
        let data_readable_fn: fn(&Http3ClientEvent) -> _ =
            |e| matches!(*e, Http3ClientEvent::DataReadable { .. });
        assert!(events.iter().any(data_readable_fn));
        let mut buf = [0_u8; 100];
        let (len, fin) = client
            .read_data(now(), request_stream_id, &mut buf)
            .unwrap();
        assert_eq!(0, len);
        assert!(fin);
    }

    #[test]
    fn data_after_trailers_after_headers() {
        // Make a new connection.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send HEADER frame.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_HEADER_FRAME_0,
            false,
        );

        // Check response headers.
        let mut response_headers = false;
        while let Some(e) = client.next_event() {
            if let Http3ClientEvent::HeaderReady {
                stream_id,
                headers,
                interim,
                fin,
            } = e
            {
                assert_eq!(stream_id, request_stream_id);
                check_response_header_0(&headers);
                assert!(!fin);
                assert!(!interim);
                response_headers = true;
            }
        }
        assert!(response_headers);

        // Send trailers
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_HEADER_FRAME_0,
            false,
        );

        // Check that we do not have a DataReady event.
        let data_readable = |e| matches!(e, Http3ClientEvent::DataReadable { .. });
        assert!(!client.events().any(data_readable));

        // Send Data frame.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            [0x0, 0x3, 0x61, 0x62, 0x63], // a data frame
            false,
        );

        assert_closed(&client, &Error::HttpFrameUnexpected);
    }

    #[test]
    fn transport_stream_readable_event_after_all_data() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(false);

        // Send headers.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            false,
        );

        // Send an empty data frame and a fin
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            [0x0, 0x0],
            true,
        );

        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Ok((3, true))
        );

        client.process_output(now());
    }

    #[test]
    fn no_data_ready_events_after_fin() {
        // Connect exchange headers and send a request. Also check if the correct header frame has
        // been sent.
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // send response - 200  Content-Length: 7
        // with content: 'abcdefg'.
        // The content will be send in 2 DATA frames.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_1,
            true,
        );

        let data_readable_event = |e| matches!(e, Http3ClientEvent::DataReadable { stream_id } if stream_id == request_stream_id);
        assert!(client.events().any(data_readable_event));

        let mut buf = [0_u8; 100];
        assert_eq!(
            (EXPECTED_RESPONSE_DATA_1.len(), true),
            client
                .read_data(now(), request_stream_id, &mut buf)
                .unwrap()
        );

        assert!(!client.events().any(data_readable_event));
    }

    #[test]
    fn reading_small_chunks_of_data() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // send response - 200  Content-Length: 7
        // with content: 'abcdefg'.
        // The content will be send in 2 DATA frames.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_1,
            true,
        );

        let data_readable_event = |e| matches!(e, Http3ClientEvent::DataReadable { stream_id } if stream_id == request_stream_id);
        assert!(client.events().any(data_readable_event));

        let mut buf1 = [0_u8; 1];
        assert_eq!(
            (1, false),
            client
                .read_data(now(), request_stream_id, &mut buf1)
                .unwrap()
        );
        assert!(!client.events().any(data_readable_event));

        // Now read only until the end of the first frame. The firs framee has 3 bytes.
        let mut buf2 = [0_u8; 2];
        assert_eq!(
            (2, false),
            client
                .read_data(now(), request_stream_id, &mut buf2)
                .unwrap()
        );
        assert!(!client.events().any(data_readable_event));

        // Read a half of the second frame.
        assert_eq!(
            (2, false),
            client
                .read_data(now(), request_stream_id, &mut buf2)
                .unwrap()
        );
        assert!(!client.events().any(data_readable_event));

        // Read the rest.
        // Read a half of the second frame.
        assert_eq!(
            (2, true),
            client
                .read_data(now(), request_stream_id, &mut buf2)
                .unwrap()
        );
        assert!(!client.events().any(data_readable_event));
    }

    #[test]
    fn zero_length_data_at_end() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // send response - 200  Content-Length: 7
        // with content: 'abcdefg'.
        // The content will be send in 2 DATA frames.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_1,
            false,
        );
        // Send a zero-length frame at the end of the stream.
        _ = server.conn.stream_send(request_stream_id, &[0, 0]).unwrap();
        server.conn.stream_close_send(request_stream_id).unwrap();
        let dgram = server.conn.process_output(now()).dgram();
        client.process_input(dgram.unwrap(), now());

        let data_readable_event = |e: &_| matches!(e, Http3ClientEvent::DataReadable { stream_id } if *stream_id == request_stream_id);
        assert_eq!(client.events().filter(data_readable_event).count(), 1);

        let mut buf = [0_u8; 10];
        assert_eq!(
            (7, true),
            client
                .read_data(now(), request_stream_id, &mut buf)
                .unwrap()
        );
        assert!(!client.events().any(|e| data_readable_event(&e)));
    }

    #[test]
    fn stream_blocked_no_remote_encoder_stream() {
        let (mut client, mut server) = connect_only_transport();

        send_and_receive_client_settings(&mut client, &mut server);

        server.create_control_stream();
        // Send the server's control stream data.
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        server.create_qpack_streams();
        let qpack_pkt1 = server.conn.process_output(now());
        // delay delivery of this packet.

        let request_stream_id = make_request(&mut client, true, &[]);
        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        setup_server_side_encoder(&mut client, &mut server);

        let headers = vec![
            Header::new(":status", "200"),
            Header::new("my-header", "my-header"),
            Header::new("content-length", "3"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        // Send the encoder instructions,
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Send response
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        let d_frame = HFrame::Data { len: 3 };
        d_frame.encode(&mut d);
        d.encode(&[0x61, 0x62, 0x63]);
        _ = server
            .conn
            .stream_send(request_stream_id, d.as_ref())
            .unwrap();
        server.conn.stream_close_send(request_stream_id).unwrap();

        let out = server.conn.process_output(now());
        mem::drop(client.process(out.dgram(), now()));

        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(!client.events().any(header_ready_event));

        // Let client receive the encoder instructions.
        mem::drop(client.process(qpack_pkt1.dgram(), now()));

        assert!(client.events().any(header_ready_event));
    }

    // Client: receive a push stream
    #[test]
    fn push_single() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send a push promise.
        send_push_promise(&mut server.conn, request_stream_id, 0);

        // create a push stream.
        _ = send_push_data(&mut server.conn, 0, true);

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );

        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 0,
                ref_stream_id: request_stream_id,
            }],
            &[0],
            request_stream_id,
        );

        assert_eq!(client.state(), Http3State::Connected);

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));
    }

    /// We can't keep the connection alive on the basis of a push promise,
    /// nor do we want to if the push promise is not interesting to the client.
    /// We do the next best thing, which is keep any push stream alive if the
    /// client reads from it.
    #[test]
    fn push_keep_alive() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        let idle_timeout = ConnectionParameters::default().get_idle_timeout();

        // Promise a push and deliver, but don't close the stream.
        send_push_promise(&mut server.conn, request_stream_id, 0);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );
        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 0,
                ref_stream_id: request_stream_id,
            }],
            &[], // No push streams yet.
            request_stream_id,
        );

        // The client will become idle here.
        force_idle(&mut client, &mut server);
        assert_eq!(client.process_output(now()).callback(), idle_timeout);

        // Reading push data will stop the client from being idle.
        _ = send_push_data(&mut server.conn, 0, false);
        let out = server.conn.process_output(now());
        client.process_input(out.dgram().unwrap(), now());

        let mut buf = [0; 16];
        let (read, fin) = client.push_read_data(now(), 0, &mut buf).unwrap();
        assert!(read < buf.len());
        assert!(!fin);

        force_idle(&mut client, &mut server);
        assert_eq!(client.process_output(now()).callback(), idle_timeout / 2);
    }

    #[test]
    fn push_multiple() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send a push promise.
        send_push_promise(&mut server.conn, request_stream_id, 0);
        send_push_promise(&mut server.conn, request_stream_id, 1);

        // create a push stream.
        _ = send_push_data(&mut server.conn, 0, true);

        // create a second push stream.
        _ = send_push_data(&mut server.conn, 1, true);

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );

        read_response_and_push_events(
            &mut client,
            &[
                PushPromiseInfo {
                    push_id: 0,
                    ref_stream_id: request_stream_id,
                },
                PushPromiseInfo {
                    push_id: 1,
                    ref_stream_id: request_stream_id,
                },
            ],
            &[0, 1],
            request_stream_id,
        );

        assert_eq!(client.state(), Http3State::Connected);

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));
        assert_eq!(client.cancel_push(1), Err(Error::InvalidStreamId));
    }

    #[test]
    fn push_after_headers() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send response headers
        _ = server
            .conn
            .stream_send(request_stream_id, HTTP_RESPONSE_HEADER_ONLY_2)
            .unwrap();

        // Send a push promise.
        send_push_promise(&mut server.conn, request_stream_id, 0);

        // create a push stream.
        _ = send_push_data(&mut server.conn, 0, true);

        // Send response data
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_DATA_FRAME_ONLY_2,
            true,
        );

        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 0,
                ref_stream_id: request_stream_id,
            }],
            &[0],
            request_stream_id,
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    #[test]
    fn push_after_response() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send response headers and data frames
        _ = server
            .conn
            .stream_send(request_stream_id, HTTP_RESPONSE_2)
            .unwrap();

        // Send a push promise.
        send_push_promise(&mut server.conn, request_stream_id, 0);
        // create a push stream.
        send_push_data_and_exchange_packets(&mut client, &mut server, 0, true);

        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 0,
                ref_stream_id: request_stream_id,
            }],
            &[0],
            request_stream_id,
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    fn check_push_events(client: &mut Http3Client) -> bool {
        let any_push_event = |e| {
            matches!(
                e,
                Http3ClientEvent::PushPromise { .. }
                    | Http3ClientEvent::PushHeaderReady { .. }
                    | Http3ClientEvent::PushDataReadable { .. }
            )
        };
        client.events().any(any_push_event)
    }

    fn check_data_readable(client: &mut Http3Client) -> bool {
        let any_data_event = |e| matches!(e, Http3ClientEvent::DataReadable { .. });
        client.events().any(any_data_event)
    }

    fn check_header_ready(client: &mut Http3Client) -> bool {
        let any_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        client.events().any(any_event)
    }

    fn check_header_ready_and_push_promise(client: &mut Http3Client) -> bool {
        let any_event = |e| {
            matches!(
                e,
                Http3ClientEvent::HeaderReady { .. } | Http3ClientEvent::PushPromise { .. }
            )
        };
        client.events().any(any_event)
    }

    #[test]
    fn push_stream_before_promise() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // create a push stream.
        send_push_data_and_exchange_packets(&mut client, &mut server, 0, true);

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Now send push_promise
        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0);

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );

        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 0,
                ref_stream_id: request_stream_id,
            }],
            &[0],
            request_stream_id,
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test receiving pushes out of order.
    // Push_id 5 is received first, therefore Push_id 3 will be in the PushState:Init state.
    // Start push_id 3 by receiving a push_promise and then a push stream with the push_id 3.
    #[test]
    fn push_out_of_order_1() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3);
        // Start a push stream with push_id 3.
        send_push_data_and_exchange_packets(&mut client, &mut server, 3, true);

        assert_eq!(client.state(), Http3State::Connected);

        read_response_and_push_events(
            &mut client,
            &[
                PushPromiseInfo {
                    push_id: 5,
                    ref_stream_id: request_stream_id,
                },
                PushPromiseInfo {
                    push_id: 3,
                    ref_stream_id: request_stream_id,
                },
            ],
            &[3],
            request_stream_id,
        );
        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test receiving pushes out of order.
    // Push_id 5 is received first, therefore Push_id 3 will be in the PushState:Init state.
    // Start push_id 3 by receiving a push stream with push_id 3 and then a push_promise.
    #[test]
    fn push_out_of_order_2() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5);

        send_push_data_and_exchange_packets(&mut client, &mut server, 3, true);
        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3);

        read_response_and_push_events(
            &mut client,
            &[
                PushPromiseInfo {
                    push_id: 5,
                    ref_stream_id: request_stream_id,
                },
                PushPromiseInfo {
                    push_id: 3,
                    ref_stream_id: request_stream_id,
                },
            ],
            &[3],
            request_stream_id,
        );
        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test receiving pushes out of order.
    // Push_id 5 is received first and read so that it is removed from the list,
    // therefore Push_id 3 will be in the PushState:Init state.
    // Start push_id 3 by receiving a push stream with the push_id 3 and then a push_promise.
    #[test]
    fn push_out_of_order_3() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5);
        send_push_data_and_exchange_packets(&mut client, &mut server, 5, true);
        assert_eq!(client.state(), Http3State::Connected);

        // Read push stream with push_id 5 to make it change to closed state.
        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 5,
                ref_stream_id: request_stream_id,
            }],
            &[5],
            request_stream_id,
        );

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 3);
        send_push_data_and_exchange_packets(&mut client, &mut server, 3, true);

        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 3,
                ref_stream_id: request_stream_id,
            }],
            &[3],
            request_stream_id,
        );
        assert_eq!(client.state(), Http3State::Connected);
    }

    // The next test is for receiving a second PushPromise when Push is in the PushPromise state.
    #[test]
    fn multiple_push_promise() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5);

        // make a second request.
        let request_stream_id_2 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_2, 4);

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5);

        read_response_and_push_events(
            &mut client,
            &[
                PushPromiseInfo {
                    push_id: 5,
                    ref_stream_id: request_stream_id,
                },
                PushPromiseInfo {
                    push_id: 5,
                    ref_stream_id: request_stream_id_2,
                },
            ],
            &[],
            request_stream_id,
        );
        assert_eq!(client.state(), Http3State::Connected);
    }

    // The next test is for receiving a second PushPromise when Push is in the Active state.
    #[test]
    fn multiple_push_promise_active() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5);
        send_push_data_and_exchange_packets(&mut client, &mut server, 5, true);

        // make a second request.
        let request_stream_id_2 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_2, 4);

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5);

        read_response_and_push_events(
            &mut client,
            &[
                PushPromiseInfo {
                    push_id: 5,
                    ref_stream_id: request_stream_id,
                },
                PushPromiseInfo {
                    push_id: 5,
                    ref_stream_id: request_stream_id_2,
                },
            ],
            &[5],
            request_stream_id,
        );
        assert_eq!(client.state(), Http3State::Connected);
    }

    // The next test is for receiving a second PushPromise when the push is already closed.
    // PushPromise will be ignored for the push streams that are consumed.
    #[test]
    fn multiple_push_promise_closed() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 5);
        // Start a push stream with push_id 5.
        send_push_data_and_exchange_packets(&mut client, &mut server, 5, true);

        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 5,
                ref_stream_id: request_stream_id,
            }],
            &[5],
            request_stream_id,
        );

        // make a second request.
        let request_stream_id_2 = make_request(&mut client, false, &[]);
        assert_eq!(request_stream_id_2, 4);

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id_2, 5);

        // Check that we do not have a Http3ClientEvent::PushPromise.
        let push_event = |e| matches!(e, Http3ClientEvent::PushPromise { .. });
        assert!(!client.events().any(push_event));
    }

    // Test that max_push_id is enforced when a push promise frame is received.
    #[test]
    fn exceed_max_push_id_promise() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send a push promise. max_push_id is set to 5, to trigger an error we send push_id=6.
        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 6);

        assert_closed(&client, &Error::HttpId);
    }

    // Test that max_push_id is enforced when a push stream is received.
    #[test]
    fn exceed_max_push_id_push_stream() {
        // Connect and send a request
        let (mut client, mut server) = connect();

        // Send a push stream. max_push_id is set to 5, to trigger an error we send push_id=6.
        send_push_data_and_exchange_packets(&mut client, &mut server, 6, true);

        assert_closed(&client, &Error::HttpId);
    }

    // Test that max_push_id is enforced when a cancel push frame is received.
    #[test]
    fn exceed_max_push_id_cancel_push() {
        // Connect and send a request
        let (mut client, mut server, _request_stream_id) = connect_and_send_request(true);

        // Send CANCEL_PUSH for push_id 6.
        send_cancel_push_and_exchange_packets(&mut client, &mut server, 6);

        assert_closed(&client, &Error::HttpId);
    }

    // Test that max_push_id is enforced when an app calls cancel_push.
    #[test]
    fn exceed_max_push_id_cancel_api() {
        // Connect and send a request
        let (mut client, _, _) = connect_and_send_request(true);

        assert_eq!(client.cancel_push(6), Err(Error::HttpId));
        assert_eq!(client.state(), Http3State::Connected);
    }

    #[test]
    fn max_push_id_frame_update_is_sent() {
        const MAX_PUSH_ID_FRAME: &[u8] = &[0xd, 0x1, 0x8];

        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send 3 push promises.
        send_push_promise(&mut server.conn, request_stream_id, 0);
        send_push_promise(&mut server.conn, request_stream_id, 1);
        send_push_promise(&mut server.conn, request_stream_id, 2);

        // create 3 push streams.
        send_push_data(&mut server.conn, 0, true);
        send_push_data(&mut server.conn, 1, true);
        send_push_data_and_exchange_packets(&mut client, &mut server, 2, true);

        read_response_and_push_events(
            &mut client,
            &[
                PushPromiseInfo {
                    push_id: 0,
                    ref_stream_id: request_stream_id,
                },
                PushPromiseInfo {
                    push_id: 1,
                    ref_stream_id: request_stream_id,
                },
                PushPromiseInfo {
                    push_id: 2,
                    ref_stream_id: request_stream_id,
                },
            ],
            &[0, 1, 2],
            request_stream_id,
        );

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        // Check max_push_id frame has been received
        let control_stream_readable =
            |e| matches!(e, ConnectionEvent::RecvStreamReadable{stream_id: x} if x == 2);
        assert!(server.conn.events().any(control_stream_readable));
        let mut buf = [0_u8; 100];
        let (amount, fin) = server.conn.stream_recv(StreamId::new(2), &mut buf).unwrap();
        assert!(!fin);

        assert_eq!(amount, MAX_PUSH_ID_FRAME.len());
        assert_eq!(&buf[..3], MAX_PUSH_ID_FRAME);

        // Check that we can send push_id=8 now
        send_push_promise(&mut server.conn, request_stream_id, 8);
        send_push_data(&mut server.conn, 8, true);

        let out = server.conn.process_output(now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));

        assert_eq!(client.state(), Http3State::Connected);

        read_response_and_push_events(
            &mut client,
            &[PushPromiseInfo {
                push_id: 8,
                ref_stream_id: request_stream_id,
            }],
            &[8],
            request_stream_id,
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test that 2 push streams with the same push_id are caught.
    #[test]
    fn duplicate_push_stream() {
        // Connect and send a request
        let (mut client, mut server, _request_stream_id) = connect_and_send_request(true);

        // Start a push stream with push_id 0.
        send_push_data_and_exchange_packets(&mut client, &mut server, 0, true);

        // Send it again
        send_push_data_and_exchange_packets(&mut client, &mut server, 0, true);

        assert_closed(&client, &Error::HttpId);
    }

    // Test that 2 push streams with the same push_id are caught.
    #[test]
    fn duplicate_push_stream_active() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise(&mut server.conn, request_stream_id, 0);
        send_push_data_and_exchange_packets(&mut client, &mut server, 0, true);
        // Now the push_stream is in the PushState::Active state

        send_push_data_and_exchange_packets(&mut client, &mut server, 0, true);

        assert_closed(&client, &Error::HttpId);
    }

    fn assert_stop_sending_event(
        server: &mut TestServer,
        push_stream_id: StreamId,
        expected_error: u64,
    ) {
        assert!(server.conn.events().any(|e| matches!(
            e,
            ConnectionEvent::SendStreamStopSending {
                stream_id,
                app_error,
            } if stream_id == push_stream_id && app_error == expected_error
        )));
    }

    // Test CANCEL_PUSH frame: after cancel push any new PUSH_PROMISE or push stream will be
    // ignored.
    #[test]
    fn cancel_push_ignore_promise() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_cancel_push_and_exchange_packets(&mut client, &mut server, 0);

        send_push_promise(&mut server.conn, request_stream_id, 0);
        // Start a push stream with push_id 0.
        let push_stream_id =
            send_push_data_and_exchange_packets(&mut client, &mut server, 0, false);

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        // Check that the push has been canceled by the client.
        assert_stop_sending_event(
            &mut server,
            push_stream_id,
            Error::HttpRequestCancelled.code(),
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test CANCEL_PUSH frame: after cancel push any already received PUSH_PROMISE or push stream
    // events will be removed.
    #[test]
    fn cancel_push_removes_push_events() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise(&mut server.conn, request_stream_id, 0);
        let push_stream_id =
            send_push_data_and_exchange_packets(&mut client, &mut server, 0, false);

        send_cancel_push_and_exchange_packets(&mut client, &mut server, 0);

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        // Check that the push has been canceled by the client.
        assert_stop_sending_event(
            &mut server,
            push_stream_id,
            Error::HttpRequestCancelled.code(),
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test CANCEL_PUSH frame: after cancel push any already received push stream will be canceled.
    #[test]
    fn cancel_push_frame_after_push_stream() {
        // Connect and send a request
        let (mut client, mut server, _) = connect_and_send_request(true);

        // Start a push stream with push_id 0.
        let push_stream_id =
            send_push_data_and_exchange_packets(&mut client, &mut server, 0, false);

        send_cancel_push_and_exchange_packets(&mut client, &mut server, 0);

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        // Check that the push has been canceled by the client.
        assert_stop_sending_event(
            &mut server,
            push_stream_id,
            Error::HttpRequestCancelled.code(),
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test a push stream reset after a new PUSH_PROMISE or/and push stream. The events will be
    // ignored.
    #[test]
    fn cancel_push_stream_after_push_promise_and_push_stream() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise(&mut server.conn, request_stream_id, 0);
        // Start a push stream with push_id 0.
        let push_stream_id =
            send_push_data_and_exchange_packets(&mut client, &mut server, 0, false);

        server
            .conn
            .stream_reset_send(push_stream_id, Error::HttpRequestCancelled.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test that a PUSH_PROMISE will be ignored after a push stream reset.
    #[test]
    fn cancel_push_stream_before_push_promise() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Start a push stream with push_id 0.
        let push_stream_id =
            send_push_data_and_exchange_packets(&mut client, &mut server, 0, false);

        server
            .conn
            .stream_reset_send(push_stream_id, Error::HttpRequestCancelled.code())
            .unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0);

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test that push_promise events will be removed after application calls cancel_push.
    #[test]
    fn app_cancel_push_after_push_promise() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0);

        assert!(client.cancel_push(0).is_ok());

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test that push_promise and push data events will be removed after application calls
    // cancel_push.
    #[test]
    fn app_cancel_push_after_push_promise_and_push_stream() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0);
        let push_stream_id =
            send_push_data_and_exchange_packets(&mut client, &mut server, 0, false);

        assert!(client.cancel_push(0).is_ok());
        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        // Check that the push has been canceled by the client.
        assert_stop_sending_event(
            &mut server,
            push_stream_id,
            Error::HttpRequestCancelled.code(),
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    // Test that push_promise events will be ignored after application calls cancel_push.
    #[test]
    fn app_cancel_push_before_push_promise() {
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0);
        let push_stream_id =
            send_push_data_and_exchange_packets(&mut client, &mut server, 0, false);

        assert!(client.cancel_push(0).is_ok());
        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        send_push_promise_and_exchange_packets(&mut client, &mut server, request_stream_id, 0);

        // Assert that we do not have any push event.
        assert!(!check_push_events(&mut client));

        // Check that the push has been closed, e.g. calling cancel_push should return
        // InvalidStreamId.
        assert_eq!(client.cancel_push(0), Err(Error::InvalidStreamId));

        // Check that the push has been canceled by the client.
        assert_stop_sending_event(
            &mut server,
            push_stream_id,
            Error::HttpRequestCancelled.code(),
        );

        assert_eq!(client.state(), Http3State::Connected);
    }

    fn setup_server_side_encoder_param(
        client: &mut Http3Client,
        server: &mut TestServer,
        max_blocked_streams: u64,
    ) {
        server
            .encoder
            .borrow_mut()
            .set_max_capacity(max_blocked_streams)
            .unwrap();
        server
            .encoder
            .borrow_mut()
            .set_max_blocked_streams(100)
            .unwrap();
        server
            .encoder
            .borrow_mut()
            .send_encoder_updates(&mut server.conn)
            .unwrap();
        let out = server.conn.process_output(now());
        mem::drop(client.process(out.dgram(), now()));
    }

    fn setup_server_side_encoder(client: &mut Http3Client, server: &mut TestServer) {
        setup_server_side_encoder_param(client, server, 100);
    }

    fn send_push_promise_using_encoder(
        client: &mut Http3Client,
        server: &mut TestServer,
        stream_id: StreamId,
        push_id: u64,
    ) -> Option<Datagram> {
        send_push_promise_using_encoder_with_custom_headers(
            client,
            server,
            stream_id,
            push_id,
            Header::new("my-header", "my-value"),
        )
    }

    fn send_push_promise_using_encoder_with_custom_headers(
        client: &mut Http3Client,
        server: &mut TestServer,
        stream_id: StreamId,
        push_id: u64,
        additional_header: Header,
    ) -> Option<Datagram> {
        let mut headers = vec![
            Header::new(":method", "GET"),
            Header::new(":scheme", "https"),
            Header::new(":authority", "something.com"),
            Header::new(":path", "/"),
            Header::new("content-length", "3"),
        ];
        headers.push(additional_header);

        let encoded_headers =
            server
                .encoder
                .borrow_mut()
                .encode_header_block(&mut server.conn, &headers, stream_id);
        let push_promise_frame = HFrame::PushPromise {
            push_id,
            header_block: encoded_headers.to_vec(),
        };

        // Send the encoder instructions, but delay them so that the stream is blocked on decoding
        // headers.
        let encoder_inst_pkt = server.conn.process_output(now()).dgram();
        assert!(encoder_inst_pkt.is_some());

        let mut d = Encoder::default();
        push_promise_frame.encode(&mut d);
        server_send_response_and_exchange_packet(client, server, stream_id, &d, false);

        encoder_inst_pkt
    }

    #[test]
    fn push_promise_header_decoder_block() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let encoder_inst_pkt =
            send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0);

        // PushPromise is blocked wathing for encoder instructions.
        assert!(!check_push_events(&mut client));

        // Let client receive the encoder instructions.
        let _out = client.process(encoder_inst_pkt, now());

        // PushPromise is blocked wathing for encoder instructions.
        assert!(check_push_events(&mut client));
    }

    // If PushPromise is blocked, stream data can still be received.
    #[test]
    fn push_promise_blocked_but_stream_is_not_blocked() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        // Send response headers
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_HEADER_ONLY_1,
            false,
        );

        let encoder_inst_pkt =
            send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0);

        // PushPromise is blocked wathing for encoder instructions.
        assert!(!check_push_events(&mut client));

        // Stream data can be still read
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_DATA_FRAME_1_ONLY_1,
            false,
        );

        assert!(check_data_readable(&mut client));

        // Let client receive the encoder instructions.
        let _out = client.process(encoder_inst_pkt, now());

        // PushPromise is blocked wathing for encoder instructions.
        assert!(check_push_events(&mut client));

        // Stream data can be still read
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_DATA_FRAME_2_ONLY_1,
            false,
        );

        assert!(check_data_readable(&mut client));
    }

    // The response Headers are not block if they do not refer the dynamic table.
    #[test]
    fn push_promise_does_not_block_headers() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let encoder_inst_pkt =
            send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0);

        // PushPromise is blocked wathing for encoder instructions.
        assert!(!check_push_events(&mut client));

        // Send response headers
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_HEADER_ONLY_1,
            false,
        );

        assert!(check_header_ready(&mut client));

        // Let client receive the encoder instructions.
        let _out = client.process(encoder_inst_pkt, now());

        // PushPromise is blocked wathing for encoder instructions.
        assert!(check_push_events(&mut client));
    }

    // The response Headers are blocked if they refer a dynamic table entry.
    #[test]
    fn push_promise_block_headers() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        // Insert an elemet into a dynamic table.
        // insert "content-length: 1234
        server
            .encoder
            .borrow_mut()
            .send_and_insert(&mut server.conn, b"content-length", b"1234")
            .unwrap();
        let encoder_inst_pkt1 = server.conn.process_output(now()).dgram();
        let _out = client.process(encoder_inst_pkt1, now());

        // Send a PushPromise that is blocked until encoder_inst_pkt2 is process by the client.
        let encoder_inst_pkt2 =
            send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0);

        // PushPromise is blocked wathing for encoder instructions.
        assert!(!check_push_events(&mut client));

        let response_headers = vec![
            Header::new(":status", "200"),
            Header::new("content-length", "1234"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &response_headers,
            request_stream_id,
        );
        let header_hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };
        let mut d = Encoder::default();
        header_hframe.encode(&mut d);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // The response headers are blocked.
        assert!(!check_header_ready(&mut client));

        // Let client receive the encoder instructions.
        let _out = client.process(encoder_inst_pkt2, now());

        // The response headers are blocked.
        assert!(check_header_ready_and_push_promise(&mut client));
    }

    // In this test there are 2 push promises that are blocked and the response header is
    // blocked as well. After a packet is received only the first push promises is unblocked.
    #[test]
    fn two_push_promises_and_header_block() {
        let mut client = default_http3_client_param(200);
        let mut server = TestServer::new_with_settings(&[
            HSetting::new(HSettingType::MaxTableCapacity, 200),
            HSetting::new(HSettingType::BlockedStreams, 100),
            HSetting::new(HSettingType::MaxHeaderListSize, 10000),
        ]);
        connect_only_transport_with(&mut client, &mut server);
        server.create_control_stream();
        server.create_qpack_streams();
        setup_server_side_encoder_param(&mut client, &mut server, 200);

        let request_stream_id = make_request_and_exchange_pkts(&mut client, &mut server, true);

        // Send a PushPromise that is blocked until encoder_inst_pkt2 is process by the client.
        let encoder_inst_pkt1 = send_push_promise_using_encoder_with_custom_headers(
            &mut client,
            &mut server,
            request_stream_id,
            0,
            Header::new("myn1", "myv1"),
        );

        // PushPromise is blocked wathing for encoder instructions.
        assert!(!check_push_events(&mut client));

        let encoder_inst_pkt2 = send_push_promise_using_encoder_with_custom_headers(
            &mut client,
            &mut server,
            request_stream_id,
            1,
            Header::new("myn2", "myv2"),
        );

        // PushPromise is blocked wathing for encoder instructions.
        assert!(!check_push_events(&mut client));

        let response_headers = vec![
            Header::new(":status", "200"),
            Header::new("content-length", "1234"),
            Header::new("myn3", "myv3"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &response_headers,
            request_stream_id,
        );
        let header_hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };
        let mut d = Encoder::default();
        header_hframe.encode(&mut d);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // The response headers are blocked.
        assert!(!check_header_ready(&mut client));

        // Let client receive the encoder instructions.
        let _out = client.process(encoder_inst_pkt1, now());

        assert!(check_push_events(&mut client));

        // Let client receive the encoder instructions.
        let _out = client.process(encoder_inst_pkt2, now());

        assert!(check_header_ready_and_push_promise(&mut client));
    }

    // The PushPromise blocked on header decoding will be canceled if the stream is closed.
    #[test]
    fn blocked_push_promises_canceled() {
        const STREAM_CANCELED_ID_0: &[u8] = &[0x40];

        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        mem::drop(
            send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0)
                .unwrap(),
        );

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_1,
            true,
        );

        // Read response that will make stream change to closed state.
        assert!(check_header_ready(&mut client));
        let mut buf = [0_u8; 100];
        _ = client
            .read_data(now(), request_stream_id, &mut buf)
            .unwrap();

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));
        // Check that encoder got stream_canceled instruction.
        let mut inst = [0_u8; 100];
        let (amount, fin) = server
            .conn
            .stream_recv(CLIENT_SIDE_DECODER_STREAM_ID, &mut inst)
            .unwrap();
        assert!(!fin);
        assert_eq!(amount, STREAM_CANCELED_ID_0.len());
        assert_eq!(&inst[..amount], STREAM_CANCELED_ID_0);
    }

    #[test]
    fn data_readable_in_decoder_blocked_state() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let headers = vec![
            Header::new(":status", "200"),
            Header::new("my-header", "my-header"),
            Header::new("content-length", "0"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        // Delay encoder instruction so that the stream will be blocked.
        let encoder_insts = server.conn.process_output(now());

        // Send response headers.
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // Headers are blocked waiting fro the encoder instructions.
        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(!client.events().any(header_ready_event));

        // Now send data frame. This will trigger DataRead event.
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        let d_frame = HFrame::Data { len: 0 };
        d_frame.encode(&mut d);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            true,
        );

        // Now read headers.
        mem::drop(client.process(encoder_insts.dgram(), now()));
    }

    #[test]
    fn qpack_stream_reset() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        setup_server_side_encoder(&mut client, &mut server);
        // Cancel request.
        mem::drop(client.cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code()));
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0);
        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));
        mem::drop(server.encoder_receiver.receive(&mut server.conn));
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1);
    }

    fn send_headers_using_encoder(
        client: &mut Http3Client,
        server: &mut TestServer,
        request_stream_id: StreamId,
        headers: &[Header],
        data: &[u8],
    ) -> Option<Datagram> {
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        let out = server.conn.process_output(now());

        // Send response
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        let d_frame = HFrame::Data {
            len: u64::try_from(data.len()).unwrap(),
        };
        d_frame.encode(&mut d);
        d.encode(data);
        server_send_response_and_exchange_packet(client, server, request_stream_id, &d, true);

        out.dgram()
    }

    #[test]
    fn qpack_stream_reset_recv() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        setup_server_side_encoder(&mut client, &mut server);

        // Cancel request.
        server
            .conn
            .stream_reset_send(request_stream_id, Error::HttpRequestCancelled.code())
            .unwrap();
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0);
        let out = server.conn.process_output(now());
        let out = client.process(out.dgram(), now());
        mem::drop(server.conn.process(out.dgram(), now()));
        mem::drop(server.encoder_receiver.receive(&mut server.conn));
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1);
    }

    #[test]
    fn qpack_stream_reset_during_header_qpack_blocked() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        mem::drop(
            send_headers_using_encoder(
                &mut client,
                &mut server,
                request_stream_id,
                &[
                    Header::new(":status", "200"),
                    Header::new("my-header", "my-header"),
                    Header::new("content-length", "3"),
                ],
                &[0x61, 0x62, 0x63],
            )
            .unwrap(),
        );

        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(!client.events().any(header_ready_event));

        // Cancel request.
        client
            .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code())
            .unwrap();

        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0);
        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));
        mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap());
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1);
    }

    #[test]
    fn qpack_no_stream_cancelled_after_fin() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let encoder_instruct = send_headers_using_encoder(
            &mut client,
            &mut server,
            request_stream_id,
            &[
                Header::new(":status", "200"),
                Header::new("my-header", "my-header"),
                Header::new("content-length", "3"),
            ],
            &[],
        );

        // Exchange encoder instructions
        mem::drop(client.process(encoder_instruct, now()));

        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(client.events().any(header_ready_event));
        // After this the recv_stream is in ClosePending state

        // Cancel request.
        client
            .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code())
            .unwrap();

        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0);
        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));
        mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap());
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0);
    }

    #[test]
    fn qpack_stream_reset_push_promise_header_decoder_block() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let headers = vec![
            Header::new(":status", "200"),
            Header::new("content-length", "3"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        // Send the encoder instructions.
        let out = server.conn.process_output(now());
        mem::drop(client.process(out.dgram(), now()));

        // Send PushPromise that will be blocked waiting for decoder instructions.
        mem::drop(
            send_push_promise_using_encoder(&mut client, &mut server, request_stream_id, 0)
                .unwrap(),
        );

        // Send response
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        let d_frame = HFrame::Data { len: 0 };
        d_frame.encode(&mut d);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            true,
        );

        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(client.events().any(header_ready_event));

        // Cancel request.
        client
            .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code())
            .unwrap();

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));
        mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap());
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 1);
    }

    #[test]
    fn qpack_stream_reset_dynamic_table_zero() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);
        // Cancel request.
        client
            .cancel_fetch(request_stream_id, Error::HttpRequestCancelled.code())
            .unwrap();
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0);
        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));
        mem::drop(server.encoder_receiver.receive(&mut server.conn).unwrap());
        assert_eq!(server.encoder.borrow_mut().stats().stream_cancelled_recv, 0);
    }

    #[test]
    fn multiple_streams_in_decoder_blocked_state() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let headers = vec![
            Header::new(":status", "200"),
            Header::new("my-header", "my-header"),
            Header::new("content-length", "0"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        // Delay encoder instruction so that the stream will be blocked.
        let encoder_insts = server.conn.process_output(now());

        // Send response headers.
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            true,
        );

        // Headers are blocked waiting for the encoder instructions.
        let header_ready_event = |e| matches!(e, Http3ClientEvent::HeaderReady { .. });
        assert!(!client.events().any(header_ready_event));

        // Make another request.
        let request2 = make_request_and_exchange_pkts(&mut client, &mut server, true);
        // Send response headers.
        server_send_response_and_exchange_packet(&mut client, &mut server, request2, &d, true);

        // Headers on the second request are blocked as well are blocked
        // waiting for the encoder instructions.
        assert!(!client.events().any(header_ready_event));

        // Now make the encoder instructions available.
        mem::drop(client.process(encoder_insts.dgram(), now()));

        // Header blocks for both streams should be ready.
        let mut count_responses = 0;
        while let Some(e) = client.next_event() {
            if let Http3ClientEvent::HeaderReady { stream_id, .. } = e {
                assert!((stream_id == request_stream_id) || (stream_id == request2));
                count_responses += 1;
            }
        }
        assert_eq!(count_responses, 2);
    }

    #[test]
    fn reserved_frames() {
        for f in H3_RESERVED_FRAME_TYPES {
            let mut enc = Encoder::default();
            enc.encode_varint(*f);
            test_wrong_frame_on_control_stream(enc.as_ref());
            test_wrong_frame_on_push_stream(enc.as_ref());
            test_wrong_frame_on_request_stream(enc.as_ref());
        }
    }

    #[test]
    fn send_reserved_settings() {
        for s in H3_RESERVED_SETTINGS {
            let (mut client, mut server) = connect_only_transport();
            let control_stream = server.conn.stream_create(StreamType::UniDi).unwrap();
            // Send the control stream type(0x0).
            _ = server
                .conn
                .stream_send(control_stream, CONTROL_STREAM_TYPE)
                .unwrap();
            // Create a settings frame of length 2.
            let mut enc = Encoder::default();
            enc.encode_varint(H3_FRAME_TYPE_SETTINGS);
            enc.encode_varint(2_u64);
            // The settings frame contains a reserved settings type and some value (0x1).
            enc.encode_varint(*s);
            enc.encode_varint(1_u64);
            let sent = server.conn.stream_send(control_stream, enc.as_ref());
            assert_eq!(sent, Ok(4));
            let out = server.conn.process_output(now());
            client.process(out.dgram(), now());
            assert_closed(&client, &Error::HttpSettings);
        }
    }

    #[test]
    fn response_w_1xx() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let mut d = Encoder::default();
        let headers1xx: &[Header] = &[Header::new(":status", "103")];
        server.encode_headers(request_stream_id, headers1xx, &mut d);

        let headers200: &[Header] = &[
            Header::new(":status", "200"),
            Header::new("my-header", "my-header"),
            Header::new("content-length", "3"),
        ];
        server.encode_headers(request_stream_id, headers200, &mut d);

        // Send 1xx and 200 headers response.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // Sending response data.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_DATA_FRAME_ONLY_2,
            true,
        );

        let mut events = client.events().filter_map(|e| {
            if let Http3ClientEvent::HeaderReady {
                stream_id,
                interim,
                headers,
                ..
            } = e
            {
                Some((stream_id, interim, headers))
            } else {
                None
            }
        });
        let (stream_id_1xx_rec, interim1xx_rec, headers1xx_rec) = events.next().unwrap();
        assert_eq!(
            (stream_id_1xx_rec, interim1xx_rec, headers1xx_rec.as_ref()),
            (request_stream_id, true, headers1xx)
        );

        let (stream_id_200_rec, interim200_rec, headers200_rec) = events.next().unwrap();
        assert_eq!(
            (stream_id_200_rec, interim200_rec, headers200_rec.as_ref()),
            (request_stream_id, false, headers200)
        );
        assert!(events.next().is_none());
    }

    #[test]
    fn response_wo_status() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let mut d = Encoder::default();
        let headers = vec![
            Header::new("my-header", "my-header"),
            Header::new("content-length", "3"),
        ];
        server.encode_headers(request_stream_id, &headers, &mut d);

        // Send response
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // Stream has been reset because of the malformed headers.
        let e = client.events().next().unwrap();
        assert_eq!(
            e,
            Http3ClientEvent::Reset {
                stream_id: request_stream_id,
                error: Error::InvalidHeader.code(),
                local: true,
            }
        );

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        // Check that server has received a reset.
        let stop_sending_event = |e| {
            matches!(e, ConnectionEvent::SendStreamStopSending {
            stream_id,
            app_error
        } if stream_id == request_stream_id && app_error == Error::InvalidHeader.code())
        };
        assert!(server.conn.events().any(stop_sending_event));

        // Stream should now be closed and gone
        let mut buf = [0_u8; 100];
        assert_eq!(
            client.read_data(now(), StreamId::new(0), &mut buf),
            Err(Error::InvalidStreamId)
        );
    }

    // Client: receive a push stream
    #[test]
    fn push_single_with_1xx() {
        const FIRST_PUSH_ID: u64 = 0;
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send a push promise.
        send_push_promise(&mut server.conn, request_stream_id, FIRST_PUSH_ID);
        // Create a push stream
        let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap();

        let mut d = Encoder::default();
        let headers1xx: &[Header] = &[Header::new(":status", "100")];
        server.encode_headers(push_stream_id, headers1xx, &mut d);

        let headers200: &[Header] = &[
            Header::new(":status", "200"),
            Header::new("my-header", "my-header"),
            Header::new("content-length", "3"),
        ];
        server.encode_headers(push_stream_id, headers200, &mut d);

        // create a push stream.
        send_data_on_push(
            &mut server.conn,
            push_stream_id,
            u8::try_from(FIRST_PUSH_ID).unwrap(),
            &d,
            true,
        );

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );

        let mut events = client.events().filter_map(|e| {
            if let Http3ClientEvent::PushHeaderReady {
                push_id,
                interim,
                headers,
                ..
            } = e
            {
                Some((push_id, interim, headers))
            } else {
                None
            }
        });

        let (push_id_1xx_rec, interim1xx_rec, headers1xx_rec) = events.next().unwrap();
        assert_eq!(
            (push_id_1xx_rec, interim1xx_rec, headers1xx_rec.as_ref()),
            (FIRST_PUSH_ID, true, headers1xx)
        );

        let (push_id_200_rec, interim200_rec, headers200_rec) = events.next().unwrap();
        assert_eq!(
            (push_id_200_rec, interim200_rec, headers200_rec.as_ref()),
            (FIRST_PUSH_ID, false, headers200)
        );
        assert!(events.next().is_none());
    }

    // Client: receive a push stream
    #[test]
    fn push_single_wo_status() {
        const FIRST_PUSH_ID: u64 = 0;
        // Connect and send a request
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        // Send a push promise.
        send_push_promise(&mut server.conn, request_stream_id, FIRST_PUSH_ID);
        // Create a push stream
        let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap();

        let mut d = Encoder::default();
        let headers = vec![
            Header::new("my-header", "my-header"),
            Header::new("content-length", "3"),
        ];
        server.encode_headers(request_stream_id, &headers, &mut d);

        send_data_on_push(
            &mut server.conn,
            push_stream_id,
            u8::try_from(FIRST_PUSH_ID).unwrap(),
            &d,
            false,
        );

        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            HTTP_RESPONSE_2,
            true,
        );

        // Stream has been reset because of thei malformed headers.
        let push_reset_event = |e| {
            matches!(e, Http3ClientEvent::PushReset {
            push_id,
            error,
        } if push_id == FIRST_PUSH_ID && error == Error::InvalidHeader.code())
        };

        assert!(client.events().any(push_reset_event));

        let out = client.process_output(now());
        mem::drop(server.conn.process(out.dgram(), now()));

        // Check that server has received a reset.
        let stop_sending_event = |e| {
            matches!(e, ConnectionEvent::SendStreamStopSending {
            stream_id,
            app_error
        } if stream_id == push_stream_id && app_error == Error::InvalidHeader.code())
        };
        assert!(server.conn.events().any(stop_sending_event));
    }

    fn handshake_client_error(client: &mut Http3Client, server: &mut TestServer, error: &Error)&nbsp;{
        let out = handshake_only(client, server);
        client.process(out.dgram(), now());
        assert_closed(client, error);
    }

    /// Client fails to create a control stream, since server does not allow it.
    #[test]
    fn client_control_stream_create_failed() {
        let mut client = default_http3_client();
        let mut server = TestServer::new_with_conn(new_server(
            DEFAULT_ALPN_H3,
            ConnectionParameters::default().max_streams(StreamType::UniDi, 0),
        ));
        handshake_client_error(&mut client, &mut server, &Error::StreamLimitError);
    }

    /// 2 streams isn't enough for control and QPACK streams.
    #[test]
    fn client_qpack_stream_create_failed() {
        let mut client = default_http3_client();
        let mut server = TestServer::new_with_conn(new_server(
            DEFAULT_ALPN_H3,
            ConnectionParameters::default().max_streams(StreamType::UniDi, 2),
        ));
        handshake_client_error(&mut client, &mut server, &Error::StreamLimitError);
    }

    fn do_malformed_response_test(headers: &[Header]) {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let mut d = Encoder::default();
        server.encode_headers(request_stream_id, headers, &mut d);

        // Send response
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // Stream has been reset because of the malformed headers.
        let e = client.events().next().unwrap();
        assert_eq!(
            e,
            Http3ClientEvent::Reset {
                stream_id: request_stream_id,
                error: Error::InvalidHeader.code(),
                local: true,
            }
        );
    }

    #[test]
    fn malformed_response_pseudo_header_after_regular_header() {
        do_malformed_response_test(&[
            Header::new("content-type", "text/plain"),
            Header::new(":status", "100"),
        ]);
    }

    #[test]
    fn malformed_response_undefined_pseudo_header() {
        do_malformed_response_test(&[Header::new(":status", "200"), Header::new(":cheese", "200")]);
    }

    #[test]
    fn malformed_response_duplicate_pseudo_header() {
        do_malformed_response_test(&[
            Header::new(":status", "200"),
            Header::new(":status", "100"),
            Header::new("content-type", "text/plain"),
        ]);
    }

    #[test]
    fn malformed_response_uppercase_header() {
        do_malformed_response_test(&[
            Header::new(":status", "200"),
            Header::new("content-Type", "text/plain"),
        ]);
    }

    #[test]
    fn malformed_response_excluded_header() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let mut d = Encoder::default();
        server.encode_headers(
            request_stream_id,
            &[
                Header::new(":status", "200"),
                Header::new("content-type", "text/plain"),
                Header::new("connection", "close"),
            ],
            &mut d,
        );

        // Send response
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // Stream has been reset because of the malformed headers.
        let e = client.events().next().unwrap();
        assert_eq!(
            e,
            Http3ClientEvent::HeaderReady {
                stream_id: request_stream_id,
                headers: vec![
                    Header::new(":status", "200"),
                    Header::new("content-type", "text/plain")
                ],
                interim: false,
                fin: false,
            }
        );
    }

    #[test]
    fn malformed_response_excluded_byte_in_header() {
        do_malformed_response_test(&[
            Header::new(":status", "200"),
            Header::new("content:type", "text/plain"),
        ]);
    }

    #[test]
    fn malformed_response_request_header_in_response() {
        do_malformed_response_test(&[
            Header::new(":status", "200"),
            Header::new(":method", "GET"),
            Header::new("content-type", "text/plain"),
        ]);
    }

    fn maybe_authenticate(conn: &mut Http3Client) {
        let authentication_needed = |e| matches!(e, Http3ClientEvent::AuthenticationNeeded);
        if conn.events().any(authentication_needed) {
            conn.authenticated(AuthenticationStatus::Ok, now());
        }
    }

    const MAX_TABLE_SIZE: u64 = 65536;
    const MAX_BLOCKED_STREAMS: u16 = 5;

    fn get_resumption_token(server: &mut Http3Server) -> ResumptionToken {
        let mut client = default_http3_client_param(MAX_TABLE_SIZE);

        let mut datagram = None;
        let is_done = |c: &Http3Client| matches!(c.state(), Http3State::Connected);
        while !is_done(&mut client) {
            maybe_authenticate(&mut client);
            datagram = client.process(datagram, now()).dgram();
            datagram = server.process(datagram, now()).dgram();
        }

        // exchange qpack settings, server will send a token as well.
        datagram = client.process(datagram, now()).dgram();
        datagram = server.process(datagram, now()).dgram();
        mem::drop(client.process(datagram, now()).dgram());

        client
            .events()
            .find_map(|e| {
                if let Http3ClientEvent::ResumptionToken(token) = e {
                    Some(token)
                } else {
                    None
                }
            })
            .unwrap()
    }

    // Test that decoder stream type is always sent before any other instruction also
    // in case when 0RTT is used.
    // A client will send a request that uses the dynamic table. This will trigger a header-ack
    // from a server. We will use stats to check that a header-ack has been received.
    #[test]
    fn zerortt_request_use_dynamic_table() {
        let mut server = Http3Server::new(
            now(),
            DEFAULT_KEYS,
            DEFAULT_ALPN_H3,
            anti_replay(),
            Rc::new(RefCell::new(CountingConnectionIdGenerator::default())),
            Http3Parameters::default()
                .max_table_size_encoder(MAX_TABLE_SIZE)
                .max_table_size_decoder(MAX_TABLE_SIZE)
                .max_blocked_streams(MAX_BLOCKED_STREAMS),
            None,
        )
        .unwrap();

        let token = get_resumption_token(&mut server);
        // Make a new connection.
        let mut client = default_http3_client_param(MAX_TABLE_SIZE);
        assert_eq!(client.state(), Http3State::Initializing);
        client
            .enable_resumption(now(), &token)
            .expect("Set resumption token.");

        assert_eq!(client.state(), Http3State::ZeroRtt);
        let zerortt_event = |e| matches!(e, Http3ClientEvent::StateChange(Http3State::ZeroRtt));
        assert!(client.events().any(zerortt_event));

        // Make a request that uses the dynamic table.
        _ = make_request(&mut client, true, &[Header::new("myheaders", "myvalue")]);
        // Assert that the request has used dynamic table. That will trigger a header_ack.
        assert_eq!(client.qpack_encoder_stats().dynamic_table_references, 1);

        // Exchange packets until header-ack is received.
        // These many packet exchange is needed, to get a header-ack.
        // TODO this may be optimize at Http3Server.
        let out = client.process_output(now());
        let out = server.process(out.dgram(), now());
        let out = client.process(out.dgram(), now());
        let out = server.process(out.dgram(), now());
        let out = client.process(out.dgram(), now());
        let out = server.process(out.dgram(), now());
        let out = client.process(out.dgram(), now());
        let out = server.process(out.dgram(), now());
        mem::drop(client.process(out.dgram(), now()));

        // The header ack for the first request has been received.
        assert_eq!(client.qpack_encoder_stats().header_acks_recv, 1);
    }

    fn manipulate_conrol_stream(client: &mut Http3Client, stream_id: StreamId) {
        assert_eq!(
            client
                .cancel_fetch(stream_id, Error::HttpNoError.code())
                .unwrap_err(),
            Error::InvalidStreamId
        );
        assert_eq!(
            client.stream_close_send(stream_id).unwrap_err(),
            Error::InvalidStreamId
        );
        let mut buf = [0; 2];
        assert_eq!(
            client.send_data(stream_id, &buf).unwrap_err(),
            Error::InvalidStreamId
        );
        assert_eq!(
            client.read_data(now(), stream_id, &mut buf).unwrap_err(),
            Error::InvalidStreamId
        );
    }

    #[test]
    fn manipulate_conrol_streams() {
        let (mut client, server, request_stream_id) = connect_and_send_request(false);
        manipulate_conrol_stream(&mut client, CLIENT_SIDE_CONTROL_STREAM_ID);
        manipulate_conrol_stream(&mut client, CLIENT_SIDE_ENCODER_STREAM_ID);
        manipulate_conrol_stream(&mut client, CLIENT_SIDE_DECODER_STREAM_ID);
        manipulate_conrol_stream(&mut client, server.control_stream_id.unwrap());
        manipulate_conrol_stream(&mut client, server.encoder_stream_id.unwrap());
        manipulate_conrol_stream(&mut client, server.decoder_stream_id.unwrap());
        client
            .cancel_fetch(request_stream_id, Error::HttpNoError.code())
            .unwrap();
    }

    // Client: receive a push stream
    #[test]
    fn incomple_push_stream() {
        let (mut client, mut server) = connect();

        // Create a push stream
        let push_stream_id = server.conn.stream_create(StreamType::UniDi).unwrap();
        _ = server
            .conn
            .stream_send(push_stream_id, PUSH_STREAM_TYPE)
            .unwrap();
        _ = server.conn.stream_send(push_stream_id, &[0]).unwrap();
        server.conn.stream_close_send(push_stream_id).unwrap();
        let out = server.conn.process_output(now());
        client.process(out.dgram(), now());
        assert_closed(&client, &Error::HttpGeneralProtocol);
    }

    #[test]
    fn priority_update_during_full_buffer() {
        // set a lower MAX_DATA on the server side to restrict the data the client can send
        let (mut client, mut server) = connect_with_connection_parameters(
            ConnectionParameters::default().max_data(MIN_INITIAL_PACKET_SIZE.try_into().unwrap()),
        );

        let request_stream_id = make_request_and_exchange_pkts(&mut client, &mut server, false);
        let data_writable = |e| matches!(e, Http3ClientEvent::DataWritable { .. });
        assert!(client.events().any(data_writable));
        // Send a lot of data to reach the flow control limit
        client.send_data(request_stream_id, &[0; 2000]).unwrap();

        // now queue a priority_update packet for that stream
        assert!(client
            .priority_update(request_stream_id, Priority::new(6, false))
            .unwrap());

        let md_before = server.conn.stats().frame_tx.max_data;

        // sending the http request and most most of the request data
        let out = client.process_output(now());
        let out = server.conn.process(out.dgram(), now());

        // the server responses with an ack, but the max_data didn't change
        assert_eq!(md_before, server.conn.stats().frame_tx.max_data);

        let out = client.process(out.dgram(), now());
        let out = server.conn.process(out.dgram(), now());

        // the server increased the max_data during the second read if that isn't the case
        // in the future and therefore this asserts fails, the request data on stream 0 could be
        // read to cause a max_update frame
        assert_eq!(md_before + 1, server.conn.stats().frame_tx.max_data);

        // make sure that the server didn't receive a priority_update on client control stream
        // (stream_id 2) yet
        let mut buf = [0; 32];
        assert_eq!(
            server.conn.stream_recv(StreamId::new(2), &mut buf),
            Ok((0, false))
        );

        // the client now sends the priority update
        let out = client.process(out.dgram(), now());
        server.conn.process_input(out.dgram().unwrap(), now());

        // check that the priority_update arrived at the client control stream
        let num_read = server.conn.stream_recv(StreamId::new(2), &mut buf).unwrap();
        assert_eq!(b"\x80\x0f\x07\x00\x04\x00\x75\x3d\x36", &buf[0..num_read.0]);
    }

    #[test]
    fn error_request_stream() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let headers = vec![
            Header::new(":status", "200"),
            Header::new(":method", "GET"), // <- invalid
            Header::new("my-header", "my-header"),
            Header::new("content-length", "3"),
        ];
        let encoded_headers = server.encoder.borrow_mut().encode_header_block(
            &mut server.conn,
            &headers,
            request_stream_id,
        );
        let hframe = HFrame::Headers {
            header_block: encoded_headers.to_vec(),
        };

        // Send the encoder instructions, but delay them so that the stream is blocked on decoding
        // headers.
        let encoder_inst_pkt = server.conn.process_output(now());

        // Send response
        let mut d = Encoder::default();
        hframe.encode(&mut d);
        let d_frame = HFrame::Data { len: 3 };
        d_frame.encode(&mut d);
        d.encode(b"abc");
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            true,
        );

        // Let client receive the encoder instructions.
        client.process_input(encoder_inst_pkt.dgram().unwrap(), now());

        let reset_event = |e| matches!(e, Http3ClientEvent::Reset { stream_id, .. } if stream_id == request_stream_id);
        assert!(client.events().any(reset_event));
    }

    #[test]
    fn response_w_101() {
        let (mut client, mut server, request_stream_id) = connect_and_send_request(true);

        setup_server_side_encoder(&mut client, &mut server);

        let mut d = Encoder::default();
        let headers1xx = &[Header::new(":status", "101")];
        server.encode_headers(request_stream_id, headers1xx, &mut d);

        // Send 101 response.
        server_send_response_and_exchange_packet(
            &mut client,
            &mut server,
            request_stream_id,
            &d,
            false,
        );

        // Stream has been reset because of the 101 response.
        let e = client.events().next().unwrap();
        assert_eq!(
            e,
            Http3ClientEvent::Reset {
                stream_id: request_stream_id,
                error: Error::InvalidHeader.code(),
                local: true,
            }
        );
    }
}

[Dauer der Verarbeitung: 0.222 Sekunden, vorverarbeitet 2026-04-27]