Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/neqo-transport/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 76 kB image not shown  

Quellcode-Bibliothek recv_stream.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// Building a stream of ordered bytes to give the application from a series of
// incoming STREAM frames.

use std::{
    cell::RefCell,
    cmp::max,
    collections::BTreeMap,
    mem,
    rc::{Rc, Weak},
};

use neqo_common::{qtrace, Role};
use smallvec::SmallVec;

use crate::{
    events::ConnectionEvents,
    fc::ReceiverFlowControl,
    frame::FRAME_TYPE_STOP_SENDING,
    packet::PacketBuilder,
    recovery::{RecoveryToken, StreamRecoveryToken},
    send_stream::SendStreams,
    stats::FrameStats,
    stream_id::StreamId,
    AppError, Error, Res,
};

const RX_STREAM_DATA_WINDOW: u64 = 0x10_0000; // 1MiB

// Export as usize for consistency with SEND_BUFFER_SIZE
#[allow(clippy::cast_possible_truncation)] // Yeah, nope.
pub const RECV_BUFFER_SIZE: usize = RX_STREAM_DATA_WINDOW as usize;

#[derive(Debug, Default)]
pub struct RecvStreams {
    streams: BTreeMap<StreamId, RecvStream>,
    keep_alive: Weak<()>,
}

impl RecvStreams {
    pub fn write_frames(
        &mut self,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) {
        for stream in self.streams.values_mut() {
            stream.write_frame(builder, tokens, stats);
            if builder.is_full() {
                return;
            }
        }
    }

    pub fn insert(&mut self, id: StreamId, stream: RecvStream) {
        self.streams.insert(id, stream);
    }

    #[allow(clippy::missing_errors_doc)]
    pub fn get_mut(&mut self, id: StreamId) -> Res<&mut RecvStream> {
        self.streams.get_mut(&id).ok_or(Error::InvalidStreamId)
    }

    #[allow(clippy::missing_errors_doc)]
    pub fn keep_alive(&mut self, id: StreamId, k: bool) -> Res<()> {
        let self_ka = &mut self.keep_alive;
        let s = self.streams.get_mut(&id).ok_or(Error::InvalidStreamId)?;
        s.keep_alive = if k {
            Some(self_ka.upgrade().unwrap_or_else(|| {
                let r = Rc::new(());
                *self_ka = Rc::downgrade(&r);
                r
            }))
        } else {
            None
        };
        Ok(())
    }

    #[must_use]
    pub fn need_keep_alive(&self) -> bool {
        self.keep_alive.strong_count() > 0
    }

    pub fn clear(&mut self) {
        self.streams.clear();
    }

    pub fn clear_terminal(&mut self, send_streams: &SendStreams, role: Role) -> (u64, u64) {
        let mut removed_bidi = 0;
        let mut removed_uni = 0;
        self.streams.retain(|id, s| {
            let dead = s.is_terminal() && (id.is_uni() || !send_streams.exists(*id));
            if dead && id.is_remote_initiated(role) {
                if id.is_bidi() {
                    removed_bidi += 1;
                } else {
                    removed_uni += 1;
                }
            }
            !dead
        });

        (removed_bidi, removed_uni)
    }
}

/// Holds data not yet read by application. Orders and dedupes data ranges
/// from incoming STREAM frames.
#[derive(Debug, Default)]
pub struct RxStreamOrderer {
    data_ranges: BTreeMap<u64, Vec<u8>>, // (start_offset, data)
    retired: u64,                        // Number of bytes the application has read
    received: u64,                       // The number of bytes has stored in `data_ranges`
}

impl RxStreamOrderer {
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Process an incoming stream frame off the wire. This may result in data
    /// being available to upper layers if frame is not out of order (ooo) or
    /// if the frame fills a gap.
    /// # Panics
    /// Only when `u64` values cannot be converted to `usize`, which only
    /// happens on 32-bit machines that hold far too much data at the same time.
    pub fn inbound_frame(&mut self, mut new_start: u64, mut new_data: &[u8]) {
        qtrace!("Inbound data offset={} len={}", new_start, new_data.len());

        // Get entry before where new entry would go, so we can see if we already
        // have the new bytes.
        // Avoid copies and duplicated data.
        let new_end = new_start + u64::try_from(new_data.len()).unwrap();

        if new_end <= self.retired {
            // Range already read by application, this frame is very late and unneeded.
            return;
        }

        if new_start < self.retired {
            new_data = &new_data[usize::try_from(self.retired - new_start).unwrap()..];
            new_start = self.retired;
        }

        if new_data.is_empty() {
            // No data to insert
            return;
        }

        let extend = if let Some((&prev_start, prev_vec)) =
            self.data_ranges.range_mut(..=new_start).next_back()
        {
            let prev_end = prev_start + u64::try_from(prev_vec.len()).unwrap();
            if new_end > prev_end {
                // PPPPPP    ->  PPPPPP
                //   NNNNNN            NN
                // NNNNNNNN            NN
                // Add a range containing only new data
                // (In-order frames will take this path, with no overlap)
                let overlap = prev_end.saturating_sub(new_start);
                qtrace!(
                    "New frame {}-{} received, overlap: {}",
                    new_start,
                    new_end,
                    overlap
                );
                new_start += overlap;
                new_data = &new_data[usize::try_from(overlap).unwrap()..];
                // If it is small enough, extend the previous buffer.
                // This can't always extend, because otherwise the buffer could end up
                // growing indefinitely without being released.
                prev_vec.len() < 4096 && prev_end == new_start
            } else {
                // PPPPPP    ->  PPPPPP
                //   NNNN
                // NNNN
                // Do nothing
                qtrace!(
                    "Dropping frame with already-received range {}-{}",
                    new_start,
                    new_end
                );
                return;
            }
        } else {
            qtrace!("New frame {}-{} received", new_start, new_end);
            false
        };

        let mut to_add = new_data;
        if self
            .data_ranges
            .last_entry()
            .is_some_and(|e| *e.key() >= new_start)
        {
            // Is this at the end (common case)?  If so, nothing to do in this block
            // Common case:
            //  PPPPPP        -> PPPPPP
            //        NNNNNNN          NNNNNNN
            // or
            //  PPPPPP             -> PPPPPP
            //             NNNNNNN               NNNNNNN
            //
            // Not the common case, handle possible overlap with next entries
            //  PPPPPP       AAA      -> PPPPPP
            //        NNNNNNN                  NNNNNNN
            // or
            //  PPPPPP     AAAA      -> PPPPPP     AAAA
            //        NNNNNNN                 NNNNN
            // or (this is where to_remove is used)
            //  PPPPPP    AA       -> PPPPPP
            //        NNNNNNN               NNNNNNN

            let mut to_remove = SmallVec::<[_; 8]>::new();

            for (&next_start, next_data) in self.data_ranges.range_mut(new_start..) {
                let next_end = next_start + u64::try_from(next_data.len()).unwrap();
                let overlap = new_end.saturating_sub(next_start);
                if overlap == 0 {
                    // Fills in the hole, exactly (probably common)
                    break;
                } else if next_end >= new_end {
                    qtrace!(
                        "New frame {}-{} overlaps with next frame by {}, truncating",
                        new_start,
                        new_end,
                        overlap
                    );
                    let truncate_to = new_data.len() - usize::try_from(overlap).unwrap();
                    to_add = &new_data[..truncate_to];
                    break;
                }
                qtrace!(
                    "New frame {}-{} spans entire next frame {}-{}, replacing",
                    new_start,
                    new_end,
                    next_start,
                    next_end
                );
                to_remove.push(next_start);
                // Continue, since we may have more overlaps
            }

            for start in to_remove {
                self.data_ranges.remove(&start);
            }
        }

        if !to_add.is_empty() {
            self.received += u64::try_from(to_add.len()).unwrap();
            if extend {
                let (_, buf) = self
                    .data_ranges
                    .range_mut(..=new_start)
                    .next_back()
                    .unwrap();
                buf.extend_from_slice(to_add);
            } else {
                self.data_ranges.insert(new_start, to_add.to_vec());
            }
        }
    }

    /// Are any bytes readable?
    #[must_use]
    pub fn data_ready(&self) -> bool {
        self.data_ranges
            .keys()
            .next()
            .is_some_and(|&start| start <= self.retired)
    }

    /// How many bytes are readable?
    fn bytes_ready(&self) -> usize {
        let mut prev_end = self.retired;
        self.data_ranges
            .iter()
            .map(|(start_offset, data)| {
                // All ranges don't overlap but we could have partially
                // retired some of the first entry's data.
                let data_len = data.len() as u64 - self.retired.saturating_sub(*start_offset);
                (start_offset, data_len)
            })
            .take_while(|(start_offset, data_len)| {
                if **start_offset <= prev_end {
                    prev_end += data_len;
                    true
                } else {
                    false
                }
            })
            // Accumulate, but saturate at usize::MAX.
            .fold(0, |acc: usize, (_, data_len)| {
                acc.saturating_add(usize::try_from(data_len).unwrap_or(usize::MAX))
            })
    }

    /// Bytes read by the application.
    #[must_use]
    pub const fn retired(&self) -> u64 {
        self.retired
    }

    #[must_use]
    pub const fn received(&self) -> u64 {
        self.received
    }

    /// Data bytes buffered. Could be more than `bytes_readable` if there are
    /// ranges missing.
    fn buffered(&self) -> u64 {
        self.data_ranges
            .iter()
            .map(|(&start, data)| data.len() as u64 - (self.retired.saturating_sub(start)))
            .sum()
    }

    /// Copy received data (if any) into the buffer. Returns bytes copied.
    fn read(&mut self, buf: &mut [u8]) -> usize {
        qtrace!("Reading {} bytes, {} available", buf.len(), self.buffered());
        let mut copied = 0;

        for (&range_start, range_data) in &mut self.data_ranges {
            let mut keep = false;
            if self.retired >= range_start {
                // Frame data has new contiguous bytes.
                let copy_offset =
                    usize::try_from(max(range_start, self.retired) - range_start).unwrap();
                assert!(range_data.len() >= copy_offset);
                let available = range_data.len() - copy_offset;
                let space = buf.len() - copied;
                let copy_bytes = if available > space {
                    keep = true;
                    space
                } else {
                    available
                };

                if copy_bytes > 0 {
                    let copy_slc = &range_data[copy_offset..copy_offset + copy_bytes];
                    buf[copied..copied + copy_bytes].copy_from_slice(copy_slc);
                    copied += copy_bytes;
                    self.retired += u64::try_from(copy_bytes).unwrap();
                }
            } else {
                // The data in the buffer isn't contiguous.
                keep = true;
            }
            if keep {
                let mut keep = self.data_ranges.split_off(&range_start);
                mem::swap(&mut self.data_ranges, &mut keep);
                return copied;
            }
        }

        self.data_ranges.clear();
        copied
    }

    /// Extend the given Vector with any available data.
    pub fn read_to_end(&mut self, buf: &mut Vec<u8>) -> usize {
        let orig_len = buf.len();
        buf.resize(orig_len + self.bytes_ready(), 0);
        self.read(&mut buf[orig_len..])
    }
}

/// QUIC receiving states, based on -transport 3.2.
#[derive(Debug)]
// Because a dead_code warning is easier than clippy::unused_self, see https://github.com/rust-lang/rust/issues/68408
enum RecvStreamState {
    Recv {
        fc: ReceiverFlowControl<StreamId>,
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
        recv_buf: RxStreamOrderer,
    },
    SizeKnown {
        fc: ReceiverFlowControl<StreamId>,
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
        recv_buf: RxStreamOrderer,
    },
    DataRecvd {
        fc: ReceiverFlowControl<StreamId>,
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
        recv_buf: RxStreamOrderer,
    },
    DataRead {
        final_received: u64,
        final_read: u64,
    },
    AbortReading {
        fc: ReceiverFlowControl<StreamId>,
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
        final_size_reached: bool,
        frame_needed: bool,
        err: AppError,
        final_received: u64,
        final_read: u64,
    },
    WaitForReset {
        fc: ReceiverFlowControl<StreamId>,
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
        final_received: u64,
        final_read: u64,
    },
    ResetRecvd {
        final_received: u64,
        final_read: u64,
    },
    // Defined by spec but we don't use it: ResetRead
}

impl RecvStreamState {
    fn new(
        max_bytes: u64,
        stream_id: StreamId,
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
    ) -> Self {
        Self::Recv {
            fc: ReceiverFlowControl::new(stream_id, max_bytes),
            recv_buf: RxStreamOrderer::new(),
            session_fc,
        }
    }

    const fn name(&self) -> &str {
        match self {
            Self::Recv { .. } => "Recv",
            Self::SizeKnown { .. } => "SizeKnown",
            Self::DataRecvd { .. } => "DataRecvd",
            Self::DataRead { .. } => "DataRead",
            Self::AbortReading { .. } => "AbortReading",
            Self::WaitForReset { .. } => "WaitForReset",
            Self::ResetRecvd { .. } => "ResetRecvd",
        }
    }

    const fn recv_buf(&self) -> Option<&RxStreamOrderer> {
        match self {
            Self::Recv { recv_buf, .. }
            | Self::SizeKnown { recv_buf, .. }
            | Self::DataRecvd { recv_buf, .. } => Some(recv_buf),
            Self::DataRead { .. }
            | Self::AbortReading { .. }
            | Self::WaitForReset { .. }
            | Self::ResetRecvd { .. } => None,
        }
    }

    fn flow_control_consume_data(&mut self, consumed: u64, fin: bool) -> Res<()> {
        let (fc, session_fc, final_size_reached, retire_data) = match self {
            Self::Recv { fc, session_fc, .. } => (fc, session_fc, false, false),
            Self::WaitForReset { fc, session_fc, .. } => (fc, session_fc, false, true),
            Self::SizeKnown { fc, session_fc, .. } | Self::DataRecvd { fc, session_fc, .. } => {
                (fc, session_fc, true, false)
            }
            Self::AbortReading {
                fc,
                session_fc,
                final_size_reached,
                ..
            } => {
                let old_final_size_reached = *final_size_reached;
                *final_size_reached |= fin;
                (fc, session_fc, old_final_size_reached, true)
            }
            Self::DataRead { .. } | Self::ResetRecvd { .. } => {
                return Ok(());
            }
        };

        // Check final size:
        let final_size_ok = match (fin, final_size_reached) {
            (true, true) => consumed == fc.consumed(),
            (false, true) => consumed <= fc.consumed(),
            (true, false) => consumed >= fc.consumed(),
            (false, false) => true,
        };

        if !final_size_ok {
            return Err(Error::FinalSizeError);
        }

        let new_bytes_consumed = fc.set_consumed(consumed)?;
        session_fc.borrow_mut().consume(new_bytes_consumed)?;
        if retire_data {
            // Let's also retire this data since the stream has been aborted
            RecvStream::flow_control_retire_data(fc.consumed() - fc.retired(), fc, session_fc);
        }
        Ok(())
    }
}

// See https://www.w3.org/TR/webtransport/#receive-stream-stats
#[derive(Debug, Clone, Copy)]
pub struct RecvStreamStats {
    // An indicator of progress on how many of the server application’s bytes
    // intended for this stream have been received so far.
    // Only sequential bytes up to, but not including, the first missing byte,
    // are counted. This number can only increase.
    pub bytes_received: u64,
    // The total number of bytes the application has successfully read from this
    // stream. This number can only increase, and is always less than or equal
    // to bytes_received.
    pub bytes_read: u64,
}

impl RecvStreamStats {
    #[must_use]
    pub const fn new(bytes_received: u64, bytes_read: u64) -> Self {
        Self {
            bytes_received,
            bytes_read,
        }
    }

    #[must_use]
    pub const fn bytes_received(&self) -> u64 {
        self.bytes_received
    }

    #[must_use]
    pub const fn bytes_read(&self) -> u64 {
        self.bytes_read
    }
}

/// Implement a QUIC receive stream.
#[derive(Debug)]
pub struct RecvStream {
    stream_id: StreamId,
    state: RecvStreamState,
    conn_events: ConnectionEvents,
    keep_alive: Option<Rc<()>>,
}

impl RecvStream {
    pub fn new(
        stream_id: StreamId,
        max_stream_data: u64,
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
        conn_events: ConnectionEvents,
    ) -> Self {
        Self {
            stream_id,
            state: RecvStreamState::new(max_stream_data, stream_id, session_fc),
            conn_events,
            keep_alive: None,
        }
    }

    fn set_state(&mut self, new_state: RecvStreamState) {
        debug_assert_ne!(
            mem::discriminant(&self.state),
            mem::discriminant(&new_state)
        );
        qtrace!(
            "RecvStream {} state {} -> {}",
            self.stream_id.as_u64(),
            self.state.name(),
            new_state.name()
        );

        match new_state {
            // Receiving all data, or receiving or requesting RESET_STREAM
            // is cause to stop keep-alives.
            RecvStreamState::DataRecvd { .. }
            | RecvStreamState::AbortReading { .. }
            | RecvStreamState::ResetRecvd { .. } => {
                self.keep_alive = None;
            }
            // Once all the data is read, generate an event.
            RecvStreamState::DataRead { .. } => {
                self.conn_events.recv_stream_complete(self.stream_id);
            }
            _ => {}
        }

        self.state = new_state;
    }

    #[must_use]
    pub const fn stats(&self) -> RecvStreamStats {
        match &self.state {
            RecvStreamState::Recv { recv_buf, .. }
            | RecvStreamState::SizeKnown { recv_buf, .. }
            | RecvStreamState::DataRecvd { recv_buf, .. } => {
                let received = recv_buf.received();
                let read = recv_buf.retired();
                RecvStreamStats::new(received, read)
            }
            RecvStreamState::AbortReading {
                final_received,
                final_read,
                ..
            }
            | RecvStreamState::WaitForReset {
                final_received,
                final_read,
                ..
            }
            | RecvStreamState::DataRead {
                final_received,
                final_read,
            }
            | RecvStreamState::ResetRecvd {
                final_received,
                final_read,
            } => {
                let received = *final_received;
                let read = *final_read;
                RecvStreamStats::new(received, read)
            }
        }
    }

    /// # Errors
    /// When the incoming data violates flow control limits.
    /// # Panics
    /// Only when `u64` values are so big that they can't fit in a `usize`, which
    /// only happens on a 32-bit machine that has far too much unread data.
    pub fn inbound_stream_frame(&mut self, fin: bool, offset: u64, data: &[u8]) -> Res<()> {
        // We should post a DataReadable event only once when we change from no-data-ready to
        // data-ready. Therefore remember the state before processing a new frame.
        let already_data_ready = self.data_ready();
        let new_end = offset + u64::try_from(data.len())?;

        self.state.flow_control_consume_data(new_end, fin)?;

        match &mut self.state {
            RecvStreamState::Recv {
                recv_buf,
                fc,
                session_fc,
            } => {
                recv_buf.inbound_frame(offset, data);
                if fin {
                    let all_recv =
                        fc.consumed() == recv_buf.retired() + recv_buf.bytes_ready() as u64;
                    let buf = mem::replace(recv_buf, RxStreamOrderer::new());
                    let fc_copy = mem::take(fc);
                    let session_fc_copy = mem::take(session_fc);
                    if all_recv {
                        self.set_state(RecvStreamState::DataRecvd {
                            fc: fc_copy,
                            session_fc: session_fc_copy,
                            recv_buf: buf,
                        });
                    } else {
                        self.set_state(RecvStreamState::SizeKnown {
                            fc: fc_copy,
                            session_fc: session_fc_copy,
                            recv_buf: buf,
                        });
                    }
                }
            }
            RecvStreamState::SizeKnown {
                recv_buf,
                fc,
                session_fc,
            } => {
                recv_buf.inbound_frame(offset, data);
                if fc.consumed() == recv_buf.retired() + recv_buf.bytes_ready() as u64 {
                    let buf = mem::replace(recv_buf, RxStreamOrderer::new());
                    let fc_copy = mem::take(fc);
                    let session_fc_copy = mem::take(session_fc);
                    self.set_state(RecvStreamState::DataRecvd {
                        fc: fc_copy,
                        session_fc: session_fc_copy,
                        recv_buf: buf,
                    });
                }
            }
            RecvStreamState::DataRecvd { .. }
            | RecvStreamState::DataRead { .. }
            | RecvStreamState::AbortReading { .. }
            | RecvStreamState::WaitForReset { .. }
            | RecvStreamState::ResetRecvd { .. } => {
                qtrace!("data received when we are in state {}", self.state.name());
            }
        }

        if !already_data_ready && (self.data_ready() || self.needs_to_inform_app_about_fin()) {
            self.conn_events.recv_stream_readable(self.stream_id);
        }

        Ok(())
    }

    /// # Errors
    /// When the reset occurs at an invalid point.
    pub fn reset(&mut self, application_error_code: AppError, final_size: u64) -> Res<()> {
        self.state.flow_control_consume_data(final_size, true)?;
        match &mut self.state {
            RecvStreamState::Recv {
                fc,
                session_fc,
                recv_buf,
            }
            | RecvStreamState::SizeKnown {
                fc,
                session_fc,
                recv_buf,
            } => {
                // make flow control consumes new data that not really exist.
                Self::flow_control_retire_data(final_size - fc.retired(), fc, session_fc);
                self.conn_events
                    .recv_stream_reset(self.stream_id, application_error_code);
                let received = recv_buf.received();
                let read = recv_buf.retired();
                self.set_state(RecvStreamState::ResetRecvd {
                    final_received: received,
                    final_read: read,
                });
            }
            RecvStreamState::AbortReading {
                fc,
                session_fc,
                final_received,
                final_read,
                ..
            }
            | RecvStreamState::WaitForReset {
                fc,
                session_fc,
                final_received,
                final_read,
            } => {
                // make flow control consumes new data that not really exist.
                Self::flow_control_retire_data(final_size - fc.retired(), fc, session_fc);
                self.conn_events
                    .recv_stream_reset(self.stream_id, application_error_code);
                let received = *final_received;
                let read = *final_read;
                self.set_state(RecvStreamState::ResetRecvd {
                    final_received: received,
                    final_read: read,
                });
            }
            _ => {
                // Ignore reset if in DataRecvd, DataRead, or ResetRecvd
            }
        }
        Ok(())
    }

    /// If we should tell the sender they have more credit, return an offset
    fn flow_control_retire_data(
        new_read: u64,
        fc: &mut ReceiverFlowControl<StreamId>,
        session_fc: &Rc<RefCell<ReceiverFlowControl<()>>>,
    ) {
        if new_read > 0 {
            fc.add_retired(new_read);
            session_fc.borrow_mut().add_retired(new_read);
        }
    }

    /// Send a flow control update.
    /// This is used when a peer declares that they are blocked.
    /// This sends `MAX_STREAM_DATA` if there is any increase possible.
    pub fn send_flowc_update(&mut self) {
        if let RecvStreamState::Recv { fc, .. } = &mut self.state {
            fc.send_flowc_update();
        }
    }

    pub fn set_stream_max_data(&mut self, max_data: u64) {
        if let RecvStreamState::Recv { fc, .. } = &mut self.state {
            fc.set_max_active(max_data);
        }
    }

    #[must_use]
    pub const fn is_terminal(&self) -> bool {
        matches!(
            self.state,
            RecvStreamState::ResetRecvd { .. } | RecvStreamState::DataRead { .. }
        )
    }

    // App got all data but did not get the fin signal.
    const fn needs_to_inform_app_about_fin(&self) -> bool {
        matches!(self.state, RecvStreamState::DataRecvd { .. })
    }

    fn data_ready(&self) -> bool {
        self.state
            .recv_buf()
            .is_some_and(RxStreamOrderer::data_ready)
    }

    /// # Errors
    /// `NoMoreData` if data and fin bit were previously read by the application.
    #[allow(clippy::missing_panics_doc)] // with a >16 exabyte packet on a 128-bit machine, maybe
    pub fn read(&mut self, buf: &mut [u8]) -> Res<(usize, bool)> {
        let data_recvd_state = matches!(self.state, RecvStreamState::DataRecvd { .. });
        match &mut self.state {
            RecvStreamState::Recv {
                recv_buf,
                fc,
                session_fc,
            }
            | RecvStreamState::SizeKnown {
                recv_buf,
                fc,
                session_fc,
                ..
            }
            | RecvStreamState::DataRecvd {
                recv_buf,
                fc,
                session_fc,
            } => {
                let bytes_read = recv_buf.read(buf);
                Self::flow_control_retire_data(u64::try_from(bytes_read).unwrap(), fc, session_fc);
                let fin_read = if data_recvd_state {
                    if recv_buf.buffered() == 0 {
                        let received = recv_buf.received();
                        let read = recv_buf.retired();
                        self.set_state(RecvStreamState::DataRead {
                            final_received: received,
                            final_read: read,
                        });
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };
                Ok((bytes_read, fin_read))
            }
            RecvStreamState::DataRead { .. }
            | RecvStreamState::AbortReading { .. }
            | RecvStreamState::WaitForReset { .. }
            | RecvStreamState::ResetRecvd { .. } => Err(Error::NoMoreData),
        }
    }

    pub fn stop_sending(&mut self, err: AppError) {
        qtrace!("stop_sending called when in state {}", self.state.name());
        match &mut self.state {
            RecvStreamState::Recv {
                fc,
                session_fc,
                recv_buf,
            }
            | RecvStreamState::SizeKnown {
                fc,
                session_fc,
                recv_buf,
            } => {
                // Retire data
                Self::flow_control_retire_data(fc.consumed() - fc.retired(), fc, session_fc);
                let fc_copy = mem::take(fc);
                let session_fc_copy = mem::take(session_fc);
                let received = recv_buf.received();
                let read = recv_buf.retired();
                self.set_state(RecvStreamState::AbortReading {
                    fc: fc_copy,
                    session_fc: session_fc_copy,
                    final_size_reached: matches!(self.state, RecvStreamState::SizeKnown { .. }),
                    frame_needed: true,
                    err,
                    final_received: received,
                    final_read: read,
                });
            }
            RecvStreamState::DataRecvd {
                fc,
                session_fc,
                recv_buf,
            } => {
                Self::flow_control_retire_data(fc.consumed() - fc.retired(), fc, session_fc);
                let received = recv_buf.received();
                let read = recv_buf.retired();
                self.set_state(RecvStreamState::DataRead {
                    final_received: received,
                    final_read: read,
                });
            }
            RecvStreamState::DataRead { .. }
            | RecvStreamState::AbortReading { .. }
            | RecvStreamState::WaitForReset { .. }
            | RecvStreamState::ResetRecvd { .. } => {
                // Already in terminal state
            }
        }
    }

    /// Maybe write a `MAX_STREAM_DATA` frame.
    pub fn write_frame(
        &mut self,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) {
        match &mut self.state {
            // Maybe send MAX_STREAM_DATA
            RecvStreamState::Recv { fc, .. } => fc.write_frames(builder, tokens, stats),
            // Maybe send STOP_SENDING
            RecvStreamState::AbortReading {
                frame_needed, err, ..
            } => {
                if *frame_needed
                    && builder.write_varint_frame(&[
                        FRAME_TYPE_STOP_SENDING,
                        self.stream_id.as_u64(),
                        *err,
                    ])
                {
                    tokens.push(RecoveryToken::Stream(StreamRecoveryToken::StopSending {
                        stream_id: self.stream_id,
                    }));
                    stats.stop_sending += 1;
                    *frame_needed = false;
                }
            }
            _ => {}
        }
    }

    pub fn max_stream_data_lost(&mut self, maximum_data: u64) {
        if let RecvStreamState::Recv { fc, .. } = &mut self.state {
            fc.frame_lost(maximum_data);
        }
    }

    pub fn stop_sending_lost(&mut self) {
        if let RecvStreamState::AbortReading { frame_needed, .. } = &mut self.state {
            *frame_needed = true;
        }
    }

    pub fn stop_sending_acked(&mut self) {
        if let RecvStreamState::AbortReading {
            fc,
            session_fc,
            final_size_reached,
            final_received,
            final_read,
            ..
        } = &mut self.state
        {
            let received = *final_received;
            let read = *final_read;
            if *final_size_reached {
                // We already know the final_size of the stream therefore we
                // do not need to wait for RESET.
                self.set_state(RecvStreamState::ResetRecvd {
                    final_received: received,
                    final_read: read,
                });
            } else {
                let fc_copy = mem::take(fc);
                let session_fc_copy = mem::take(session_fc);
                self.set_state(RecvStreamState::WaitForReset {
                    fc: fc_copy,
                    session_fc: session_fc_copy,
                    final_received: received,
                    final_read: read,
                });
            }
        }
    }

    #[cfg(test)]
    #[must_use]
    pub const fn has_frames_to_write(&self) -> bool {
        if let RecvStreamState::Recv { fc, .. } = &self.state {
            fc.frame_needed()
        } else {
            false
        }
    }

    #[cfg(test)]
    #[must_use]
    pub const fn fc(&self) -> Option<&ReceiverFlowControl<StreamId>> {
        match &self.state {
            RecvStreamState::Recv { fc, .. }
            | RecvStreamState::SizeKnown { fc, .. }
            | RecvStreamState::DataRecvd { fc, .. }
            | RecvStreamState::AbortReading { fc, .. }
            | RecvStreamState::WaitForReset { fc, .. } => Some(fc),
            _ => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{cell::RefCell, ops::Range, rc::Rc};

    use neqo_common::{qtrace, Encoder};

    use super::RecvStream;
    use crate::{
        fc::ReceiverFlowControl,
        packet::PacketBuilder,
        recv_stream::{RxStreamOrderer, RX_STREAM_DATA_WINDOW},
        stats::FrameStats,
        ConnectionEvents, Error, StreamId, RECV_BUFFER_SIZE,
    };

    const SESSION_WINDOW: usize = 1024;

    fn recv_ranges(ranges: &[Range<u64>], available: usize) {
        const ZEROES: &[u8] = &[0; 100];
        qtrace!("recv_ranges {:?}", ranges);

        let mut s = RxStreamOrderer::default();
        for r in ranges {
            let data = &ZEROES[..usize::try_from(r.end - r.start).unwrap()];
            s.inbound_frame(r.start, data);
        }

        let mut buf = [0xff; 100];
        let mut total_recvd = 0;
        loop {
            let recvd = s.read(&mut buf[..]);
            qtrace!("recv_ranges read {}", recvd);
            total_recvd += recvd;
            if recvd == 0 {
                assert_eq!(total_recvd, available);
                break;
            }
        }
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)] // Because that lint makes no sense here.
    fn recv_noncontiguous() {
        // Non-contiguous with the start, no data available.
        recv_ranges(&[10..20], 0);
    }

    /// Overlaps with the start of a 10..20 range of bytes.
    #[test]
    fn recv_overlap_start() {
        // Overlap the start, with a larger new value.
        // More overlap than not.
        recv_ranges(&[10..20, 4..18, 0..4], 20);
        // Overlap the start, with a larger new value.
        // Less overlap than not.
        recv_ranges(&[10..20, 2..15, 0..2], 20);
        // Overlap the start, with a smaller new value.
        // More overlap than not.
        recv_ranges(&[10..20, 8..14, 0..8], 20);
        // Overlap the start, with a smaller new value.
        // Less overlap than not.
        recv_ranges(&[10..20, 6..13, 0..6], 20);

        // Again with some of the first range split in two.
        recv_ranges(&[10..11, 11..20, 4..18, 0..4], 20);
        recv_ranges(&[10..11, 11..20, 2..15, 0..2], 20);
        recv_ranges(&[10..11, 11..20, 8..14, 0..8], 20);
        recv_ranges(&[10..11, 11..20, 6..13, 0..6], 20);

        // Again with a gap in the first range.
        recv_ranges(&[10..11, 12..20, 4..18, 0..4], 20);
        recv_ranges(&[10..11, 12..20, 2..15, 0..2], 20);
        recv_ranges(&[10..11, 12..20, 8..14, 0..8], 20);
        recv_ranges(&[10..11, 12..20, 6..13, 0..6], 20);
    }

    /// Overlaps with the end of a 10..20 range of bytes.
    #[test]
    fn recv_overlap_end() {
        // Overlap the end, with a larger new value.
        // More overlap than not.
        recv_ranges(&[10..20, 12..25, 0..10], 25);
        // Overlap the end, with a larger new value.
        // Less overlap than not.
        recv_ranges(&[10..20, 17..33, 0..10], 33);
        // Overlap the end, with a smaller new value.
        // More overlap than not.
        recv_ranges(&[10..20, 15..21, 0..10], 21);
        // Overlap the end, with a smaller new value.
        // Less overlap than not.
        recv_ranges(&[10..20, 17..25, 0..10], 25);

        // Again with some of the first range split in two.
        recv_ranges(&[10..19, 19..20, 12..25, 0..10], 25);
        recv_ranges(&[10..19, 19..20, 17..33, 0..10], 33);
        recv_ranges(&[10..19, 19..20, 15..21, 0..10], 21);
        recv_ranges(&[10..19, 19..20, 17..25, 0..10], 25);

        // Again with a gap in the first range.
        recv_ranges(&[10..18, 19..20, 12..25, 0..10], 25);
        recv_ranges(&[10..18, 19..20, 17..33, 0..10], 33);
        recv_ranges(&[10..18, 19..20, 15..21, 0..10], 21);
        recv_ranges(&[10..18, 19..20, 17..25, 0..10], 25);
    }

    /// Complete overlaps with the start of a 10..20 range of bytes.
    #[test]
    fn recv_overlap_complete() {
        // Complete overlap, more at the end.
        recv_ranges(&[10..20, 9..23, 0..9], 23);
        // Complete overlap, more at the start.
        recv_ranges(&[10..20, 3..23, 0..3], 23);
        // Complete overlap, to end.
        recv_ranges(&[10..20, 5..20, 0..5], 20);
        // Complete overlap, from start.
        recv_ranges(&[10..20, 10..27, 0..10], 27);
        // Complete overlap, from 0 and more.
        recv_ranges(&[10..20, 0..23], 23);

        // Again with the first range split in two.
        recv_ranges(&[10..14, 14..20, 9..23, 0..9], 23);
        recv_ranges(&[10..14, 14..20, 3..23, 0..3], 23);
        recv_ranges(&[10..14, 14..20, 5..20, 0..5], 20);
        recv_ranges(&[10..14, 14..20, 10..27, 0..10], 27);
        recv_ranges(&[10..14, 14..20, 0..23], 23);

        // Again with the a gap in the first range.
        recv_ranges(&[10..13, 14..20, 9..23, 0..9], 23);
        recv_ranges(&[10..13, 14..20, 3..23, 0..3], 23);
        recv_ranges(&[10..13, 14..20, 5..20, 0..5], 20);
        recv_ranges(&[10..13, 14..20, 10..27, 0..10], 27);
        recv_ranges(&[10..13, 14..20, 0..23], 23);
    }

    /// An overlap with no new bytes.
    #[test]
    fn recv_overlap_duplicate() {
        recv_ranges(&[10..20, 11..12, 0..10], 20);
        recv_ranges(&[10..20, 10..15, 0..10], 20);
        recv_ranges(&[10..20, 14..20, 0..10], 20);
        // Now with the first range split.
        recv_ranges(&[10..14, 14..20, 10..15, 0..10], 20);
        recv_ranges(&[10..15, 16..20, 21..25, 10..25, 0..10], 25);
    }

    /// Reading exactly one chunk works, when the next chunk starts immediately.
    #[test]
    fn stop_reading_at_chunk() {
        const CHUNK_SIZE: usize = 10;
        const EXTRA_SIZE: usize = 3;
        let mut s = RxStreamOrderer::new();

        // Add three chunks.
        s.inbound_frame(0, &[0; CHUNK_SIZE]);
        let offset = u64::try_from(CHUNK_SIZE).unwrap();
        s.inbound_frame(offset, &[0; EXTRA_SIZE]);
        let offset = u64::try_from(CHUNK_SIZE + EXTRA_SIZE).unwrap();
        s.inbound_frame(offset, &[0; EXTRA_SIZE]);

        // Read, providing only enough space for the first.
        let mut buf = [0; 100];
        let count = s.read(&mut buf[..CHUNK_SIZE]);
        assert_eq!(count, CHUNK_SIZE);
        let count = s.read(&mut buf[..]);
        assert_eq!(count, EXTRA_SIZE * 2);
    }

    #[test]
    fn recv_overlap_while_reading() {
        let mut s = RxStreamOrderer::new();

        // Add a chunk
        s.inbound_frame(0, &[0; 150]);
        assert_eq!(s.data_ranges.get(&0).unwrap().len(), 150);
        // Read, providing only enough space for the first 100.
        let mut buf = [0; 100];
        let count = s.read(&mut buf[..]);
        assert_eq!(count, 100);
        assert_eq!(s.retired, 100);

        // Add a second frame that overlaps.
        // This shouldn't truncate the first frame, as we're already
        // Reading from it.
        s.inbound_frame(120, &[0; 60]);
        assert_eq!(s.data_ranges.get(&0).unwrap().len(), 180);
        // Read second part of first frame and all of the second frame
        let count = s.read(&mut buf[..]);
        assert_eq!(count, 80);
    }

    /// Reading exactly one chunk works, when there is a gap.
    #[test]
    fn stop_reading_at_gap() {
        const CHUNK_SIZE: usize = 10;
        const EXTRA_SIZE: usize = 3;
        let mut s = RxStreamOrderer::new();

        // Add three chunks.
        s.inbound_frame(0, &[0; CHUNK_SIZE]);
        let offset = u64::try_from(CHUNK_SIZE + EXTRA_SIZE).unwrap();
        s.inbound_frame(offset, &[0; EXTRA_SIZE]);

        // Read, providing only enough space for the first chunk.
        let mut buf = [0; 100];
        let count = s.read(&mut buf[..CHUNK_SIZE]);
        assert_eq!(count, CHUNK_SIZE);

        // Now fill the gap and ensure that everything can be read.
        let offset = u64::try_from(CHUNK_SIZE).unwrap();
        s.inbound_frame(offset, &[0; EXTRA_SIZE]);
        let count = s.read(&mut buf[..]);
        assert_eq!(count, EXTRA_SIZE * 2);
    }

    /// Reading exactly one chunk works, when there is a gap.
    #[test]
    fn stop_reading_in_chunk() {
        const CHUNK_SIZE: usize = 10;
        const EXTRA_SIZE: usize = 3;
        let mut s = RxStreamOrderer::new();

        // Add two chunks.
        s.inbound_frame(0, &[0; CHUNK_SIZE]);
        let offset = u64::try_from(CHUNK_SIZE).unwrap();
        s.inbound_frame(offset, &[0; EXTRA_SIZE]);

        // Read, providing only enough space for some of the first chunk.
        let mut buf = [0; 100];
        let count = s.read(&mut buf[..CHUNK_SIZE - EXTRA_SIZE]);
        assert_eq!(count, CHUNK_SIZE - EXTRA_SIZE);

        let count = s.read(&mut buf[..]);
        assert_eq!(count, EXTRA_SIZE * 2);
    }

    /// Read one byte at a time.
    #[test]
    fn read_byte_at_a_time() {
        const CHUNK_SIZE: usize = 10;
        const EXTRA_SIZE: usize = 3;
        let mut s = RxStreamOrderer::new();

        // Add two chunks.
        s.inbound_frame(0, &[0; CHUNK_SIZE]);
        let offset = u64::try_from(CHUNK_SIZE).unwrap();
        s.inbound_frame(offset, &[0; EXTRA_SIZE]);

        let mut buf = [0; 1];
        for _ in 0..CHUNK_SIZE + EXTRA_SIZE {
            let count = s.read(&mut buf[..]);
            assert_eq!(count, 1);
        }
        assert_eq!(0, s.read(&mut buf[..]));
    }

    fn check_stats(stream: &RecvStream, expected_received: u64, expected_read: u64) {
        let stream_stats = stream.stats();
        assert_eq!(expected_received, stream_stats.bytes_received());
        assert_eq!(expected_read, stream_stats.bytes_read());
    }

    #[test]
    fn stream_rx() {
        let conn_events = ConnectionEvents::default();

        let mut s = RecvStream::new(
            StreamId::from(567),
            1024,
            Rc::new(RefCell::new(ReceiverFlowControl::new((), 1024 * 1024))),
            conn_events,
        );

        // test receiving a contig frame and reading it works
        s.inbound_stream_frame(false, 0, &[1; 10]).unwrap();
        assert!(s.data_ready());
        check_stats(&s, 10, 0);

        let mut buf = vec![0u8; 100];
        assert_eq!(s.read(&mut buf).unwrap(), (10, false));
        assert_eq!(s.state.recv_buf().unwrap().retired(), 10);
        assert_eq!(s.state.recv_buf().unwrap().buffered(), 0);

        check_stats(&s, 10, 10);

        // test receiving a noncontig frame
        s.inbound_stream_frame(false, 12, &[2; 12]).unwrap();
        assert!(!s.data_ready());
        assert_eq!(s.read(&mut buf).unwrap(), (0, false));
        assert_eq!(s.state.recv_buf().unwrap().retired(), 10);
        assert_eq!(s.state.recv_buf().unwrap().buffered(), 12);

        check_stats(&s, 22, 10);

        // another frame that overlaps the first
        s.inbound_stream_frame(false, 14, &[3; 8]).unwrap();
        assert!(!s.data_ready());
        assert_eq!(s.state.recv_buf().unwrap().retired(), 10);
        assert_eq!(s.state.recv_buf().unwrap().buffered(), 12);

        check_stats(&s, 22, 10);

        // fill in the gap, but with a FIN
        s.inbound_stream_frame(true, 10, &[4; 6]).unwrap_err();
        assert!(!s.data_ready());
        assert_eq!(s.read(&mut buf).unwrap(), (0, false));
        assert_eq!(s.state.recv_buf().unwrap().retired(), 10);
        assert_eq!(s.state.recv_buf().unwrap().buffered(), 12);

        check_stats(&s, 22, 10);

        // fill in the gap
        s.inbound_stream_frame(false, 10, &[5; 10]).unwrap();
        assert!(s.data_ready());
        assert_eq!(s.state.recv_buf().unwrap().retired(), 10);
        assert_eq!(s.state.recv_buf().unwrap().buffered(), 14);

        check_stats(&s, 24, 10);

        // a legit FIN
        s.inbound_stream_frame(true, 24, &[6; 18]).unwrap();
        assert_eq!(s.state.recv_buf().unwrap().retired(), 10);
        assert_eq!(s.state.recv_buf().unwrap().buffered(), 32);
        assert!(s.data_ready());
        assert_eq!(s.read(&mut buf).unwrap(), (32, true));

        check_stats(&s, 42, 42);

        // Stream now no longer readable (is in DataRead state)
        s.read(&mut buf).unwrap_err();
    }

    fn check_chunks(s: &RxStreamOrderer, expected: &[(u64, usize)]) {
        assert_eq!(s.data_ranges.len(), expected.len());
        for ((start, buf), (expected_start, expected_len)) in s.data_ranges.iter().zip(expected) {
            assert_eq!((*start, buf.len()), (*expected_start, *expected_len));
        }
    }

    // Test deduplication when the new data is at the end.
    #[test]
    fn stream_rx_dedupe_tail() {
        let mut s = RxStreamOrderer::new();

        s.inbound_frame(0, &[1; 6]);
        check_chunks(&s, &[(0, 6)]);

        // New data that overlaps entirely (starting from the head), is ignored.
        s.inbound_frame(0, &[2; 3]);
        check_chunks(&s, &[(0, 6)]);

        // New data that overlaps at the tail has any new data appended.
        s.inbound_frame(2, &[3; 6]);
        check_chunks(&s, &[(0, 8)]);

        // New data that overlaps entirely (up to the tail), is ignored.
        s.inbound_frame(4, &[4; 4]);
        check_chunks(&s, &[(0, 8)]);

        // New data that overlaps, starting from the beginning is appended too.
        s.inbound_frame(0, &[5; 10]);
        check_chunks(&s, &[(0, 10)]);

        // New data that is entirely subsumed is ignored.
        s.inbound_frame(2, &[6; 2]);
        check_chunks(&s, &[(0, 10)]);

        let mut buf = [0; 16];
        assert_eq!(s.read(&mut buf[..]), 10);
        assert_eq!(buf[..10], [1, 1, 1, 1, 1, 1, 3, 3, 5, 5]);
    }

    /// When chunks are added before existing data, they aren't merged.
    #[test]
    fn stream_rx_dedupe_head() {
        let mut s = RxStreamOrderer::new();

        s.inbound_frame(1, &[6; 6]);
        check_chunks(&s, &[(1, 6)]);

        // Insertion before an existing chunk causes truncation of the new chunk.
        s.inbound_frame(0, &[7; 6]);
        check_chunks(&s, &[(0, 1), (1, 6)]);

        // Perfect overlap with existing slices has no effect.
        s.inbound_frame(0, &[8; 7]);
        check_chunks(&s, &[(0, 1), (1, 6)]);

        let mut buf = [0; 16];
        assert_eq!(s.read(&mut buf[..]), 7);
        assert_eq!(buf[..7], [7, 6, 6, 6, 6, 6, 6]);
    }

    #[test]
    fn stream_rx_dedupe_new_tail() {
        let mut s = RxStreamOrderer::new();

        s.inbound_frame(1, &[6; 6]);
        check_chunks(&s, &[(1, 6)]);

        // Insertion before an existing chunk causes truncation of the new chunk.
        s.inbound_frame(0, &[7; 6]);
        check_chunks(&s, &[(0, 1), (1, 6)]);

        // New data at the end causes the tail to be added to the first chunk,
        // replacing later chunks entirely.
        s.inbound_frame(0, &[9; 8]);
        check_chunks(&s, &[(0, 8)]);

        let mut buf = [0; 16];
        assert_eq!(s.read(&mut buf[..]), 8);
        assert_eq!(buf[..8], [7, 9, 9, 9, 9, 9, 9, 9]);
    }

    #[test]
    fn stream_rx_dedupe_replace() {
        let mut s = RxStreamOrderer::new();

        s.inbound_frame(2, &[6; 6]);
        check_chunks(&s, &[(2, 6)]);

        // Insertion before an existing chunk causes truncation of the new chunk.
        s.inbound_frame(1, &[7; 6]);
        check_chunks(&s, &[(1, 1), (2, 6)]);

        // New data at the start and end replaces all the slices.
        s.inbound_frame(0, &[9; 10]);
        check_chunks(&s, &[(0, 10)]);

        let mut buf = [0; 16];
        assert_eq!(s.read(&mut buf[..]), 10);
        assert_eq!(buf[..10], [9; 10]);
    }

    #[test]
    fn trim_retired() {
        let mut s = RxStreamOrderer::new();

        let mut buf = [0; 18];
        s.inbound_frame(0, &[1; 10]);

        // Partially read slices are retained.
        assert_eq!(s.read(&mut buf[..6]), 6);
        check_chunks(&s, &[(0, 10)]);

        // Partially read slices are kept and so are added to.
        s.inbound_frame(3, &buf[..10]);
        check_chunks(&s, &[(0, 13)]);

        // Wholly read pieces are dropped.
        assert_eq!(s.read(&mut buf[..]), 7);
        assert!(s.data_ranges.is_empty());

        // New data that overlaps with retired data is trimmed.
        s.inbound_frame(0, &buf[..]);
        check_chunks(&s, &[(13, 5)]);
    }

    #[test]
    fn stream_flowc_update() {
        let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW);
        let mut buf = vec![0u8; RECV_BUFFER_SIZE + 100]; // Make it overlarge

        assert!(!s.has_frames_to_write());
        let big_buf = vec![0; RECV_BUFFER_SIZE];
        s.inbound_stream_frame(false, 0, &big_buf).unwrap();
        assert!(!s.has_frames_to_write());
        assert_eq!(s.read(&mut buf).unwrap(), (RECV_BUFFER_SIZE, false));
        assert!(!s.data_ready());

        // flow msg generated!
        assert!(s.has_frames_to_write());

        // consume it
        let mut builder = PacketBuilder::short(Encoder::new(), false, None::<&[u8]>);
        let mut token = Vec::new();
        s.write_frame(&mut builder, &mut token, &mut FrameStats::default());

        // it should be gone
        assert!(!s.has_frames_to_write());
    }

    fn create_stream(session_fc: u64) -> RecvStream {
        let conn_events = ConnectionEvents::default();
        RecvStream::new(
            StreamId::from(67),
            RX_STREAM_DATA_WINDOW,
            Rc::new(RefCell::new(ReceiverFlowControl::new((), session_fc))),
            conn_events,
        )
    }

    #[test]
    fn stream_max_stream_data() {
        let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW);
        assert!(!s.has_frames_to_write());
        let big_buf = vec![0; RECV_BUFFER_SIZE];
        s.inbound_stream_frame(false, 0, &big_buf).unwrap();
        s.inbound_stream_frame(false, RX_STREAM_DATA_WINDOW, &[1; 1])
            .unwrap_err();
    }

    #[test]
    fn stream_orderer_bytes_ready() {
        let mut rx_ord = RxStreamOrderer::new();

        rx_ord.inbound_frame(0, &[1; 6]);
        assert_eq!(rx_ord.bytes_ready(), 6);
        assert_eq!(rx_ord.buffered(), 6);
        assert_eq!(rx_ord.retired(), 0);

        // read some so there's an offset into the first frame
        let mut buf = [0u8; 10];
        rx_ord.read(&mut buf[..2]);
        assert_eq!(rx_ord.bytes_ready(), 4);
        assert_eq!(rx_ord.buffered(), 4);
        assert_eq!(rx_ord.retired(), 2);

        // an overlapping frame
        rx_ord.inbound_frame(5, &[2; 6]);
        assert_eq!(rx_ord.bytes_ready(), 9);
        assert_eq!(rx_ord.buffered(), 9);
        assert_eq!(rx_ord.retired(), 2);

        // a noncontig frame
        rx_ord.inbound_frame(20, &[3; 6]);
        assert_eq!(rx_ord.bytes_ready(), 9);
        assert_eq!(rx_ord.buffered(), 15);
        assert_eq!(rx_ord.retired(), 2);

        // an old frame
        rx_ord.inbound_frame(0, &[4; 2]);
        assert_eq!(rx_ord.bytes_ready(), 9);
        assert_eq!(rx_ord.buffered(), 15);
        assert_eq!(rx_ord.retired(), 2);
    }

    #[test]
    fn no_stream_flowc_event_after_exiting_recv() {
        let mut s = create_stream(1024 * RX_STREAM_DATA_WINDOW);
        let mut buf = vec![0; RECV_BUFFER_SIZE];
        // Write from buf at first.
        s.inbound_stream_frame(false, 0, &buf).unwrap();
        // Then read into it.
        s.read(&mut buf).unwrap();
        assert!(s.has_frames_to_write());
        s.inbound_stream_frame(true, RX_STREAM_DATA_WINDOW, &[])
            .unwrap();
        assert!(!s.has_frames_to_write());
    }

    fn create_stream_with_fc(
        session_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
        fc_limit: u64,
    ) -> RecvStream {
        RecvStream::new(
            StreamId::from(567),
            fc_limit,
            session_fc,
            ConnectionEvents::default(),
        )
    }

    fn create_stream_session_flow_control() -> (RecvStream, Rc<RefCell<ReceiverFlowControl<()>>>) {
        assert!(RX_STREAM_DATA_WINDOW > u64::try_from(SESSION_WINDOW).unwrap());
        let session_fc = Rc::new(RefCell::new(ReceiverFlowControl::new(
            (),
            u64::try_from(SESSION_WINDOW).unwrap(),
        )));
        (
            create_stream_with_fc(Rc::clone(&session_fc), RX_STREAM_DATA_WINDOW),
            session_fc,
        )
    }

    #[test]
    fn session_flow_control() {
        let (mut s, session_fc) = create_stream_session_flow_control();

        s.inbound_stream_frame(false, 0, &[0; SESSION_WINDOW])
            .unwrap();
        assert!(!session_fc.borrow().frame_needed());
        // The buffer is big enough to hold SESSION_WINDOW, this will make sure that we always
        // read everything from he stream.
        let mut buf = [0; 2 * SESSION_WINDOW];
        s.read(&mut buf).unwrap();
        assert!(session_fc.borrow().frame_needed());
        // consume it
        let mut builder = PacketBuilder::short(Encoder::new(), false, None::<&[u8]>);
        let mut token = Vec::new();
        session_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut token, &mut FrameStats::default());

        // Switch to SizeKnown state
        s.inbound_stream_frame(true, 2 * u64::try_from(SESSION_WINDOW).unwrap() - 1, &[0])
            .unwrap();
        assert!(!session_fc.borrow().frame_needed());
        // Receive new data that can be read.
        s.inbound_stream_frame(
            false,
            u64::try_from(SESSION_WINDOW).unwrap(),
            &[0; SESSION_WINDOW / 2 + 1],
        )
        .unwrap();
        assert!(!session_fc.borrow().frame_needed());
        s.read(&mut buf).unwrap();
        assert!(session_fc.borrow().frame_needed());
        // consume it
        let mut builder = PacketBuilder::short(Encoder::new(), false, None::<&[u8]>);
        let mut token = Vec::new();
        session_fc
            .borrow_mut()
            .write_frames(&mut builder, &mut token, &mut FrameStats::default());

        // Test DataRecvd state
        let session_fc = Rc::new(RefCell::new(ReceiverFlowControl::new(
            (),
            u64::try_from(SESSION_WINDOW).unwrap(),
        )));
        let mut s = RecvStream::new(
            StreamId::from(567),
            RX_STREAM_DATA_WINDOW,
            Rc::clone(&session_fc),
            ConnectionEvents::default(),
        );

        s.inbound_stream_frame(true, 0, &[0; SESSION_WINDOW])
            .unwrap();
        assert!(!session_fc.borrow().frame_needed());
        s.read(&mut buf).unwrap();
        assert!(session_fc.borrow().frame_needed());
    }

    #[test]
    fn session_flow_control_reset() {
        let (mut s, session_fc) = create_stream_session_flow_control();

        s.inbound_stream_frame(false, 0, &[0; SESSION_WINDOW / 2])
            .unwrap();
        assert!(!session_fc.borrow().frame_needed());

        s.reset(
            Error::NoError.code(),
            u64::try_from(SESSION_WINDOW).unwrap(),
        )
        .unwrap();
        assert!(session_fc.borrow().frame_needed());
    }

    fn check_fc<T: std::fmt::Debug>(fc: &ReceiverFlowControl<T>, consumed: u64, retired: u64) {
        assert_eq!(fc.consumed(), consumed);
        assert_eq!(fc.retired(), retired);
    }

    /// Test consuming the flow control in `RecvStreamState::Recv`
    #[test]
    fn fc_state_recv_1() {
        const SW: u64 = 1024;
        const SW_US: usize = 1024;
        let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW)));
        let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);

        check_fc(&fc.borrow(), 0, 0);
        check_fc(s.fc().unwrap(), 0, 0);

        s.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap();

        check_fc(&fc.borrow(), SW / 4, 0);
        check_fc(s.fc().unwrap(), SW / 4, 0);
    }

    /// Test consuming the flow control in `RecvStreamState::Recv`
    /// with multiple streams
    #[test]
    fn fc_state_recv_2() {
        const SW: u64 = 1024;
        const SW_US: usize = 1024;
        let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW)));
        let mut s1 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);
        let mut s2 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);

        check_fc(&fc.borrow(), 0, 0);
        check_fc(s1.fc().unwrap(), 0, 0);
        check_fc(s2.fc().unwrap(), 0, 0);

        s1.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap();

        check_fc(&fc.borrow(), SW / 4, 0);
        check_fc(s1.fc().unwrap(), SW / 4, 0);
        check_fc(s2.fc().unwrap(), 0, 0);

        s2.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap();

        check_fc(&fc.borrow(), SW / 2, 0);
        check_fc(s1.fc().unwrap(), SW / 4, 0);
        check_fc(s2.fc().unwrap(), SW / 4, 0);
    }

    /// Test retiring the flow control in `RecvStreamState::Recv`
    /// with multiple streams
    #[test]
    fn fc_state_recv_3() {
        const SW: u64 = 1024;
        const SW_US: usize = 1024;
        let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW)));
        let mut s1 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);
        let mut s2 = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);

        check_fc(&fc.borrow(), 0, 0);
        check_fc(s1.fc().unwrap(), 0, 0);
        check_fc(s2.fc().unwrap(), 0, 0);

        s1.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap();
        s2.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap();
        check_fc(&fc.borrow(), SW / 2, 0);
        check_fc(s1.fc().unwrap(), SW / 4, 0);
        check_fc(s2.fc().unwrap(), SW / 4, 0);

        // Read data
        let mut buf = [1; SW_US];
        assert_eq!(s1.read(&mut buf).unwrap(), (SW_US / 4, false));
        check_fc(&fc.borrow(), SW / 2, SW / 4);
        check_fc(s1.fc().unwrap(), SW / 4, SW / 4);
        check_fc(s2.fc().unwrap(), SW / 4, 0);

        assert_eq!(s2.read(&mut buf).unwrap(), (SW_US / 4, false));
        check_fc(&fc.borrow(), SW / 2, SW / 2);
        check_fc(s1.fc().unwrap(), SW / 4, SW / 4);
        check_fc(s2.fc().unwrap(), SW / 4, SW / 4);

        // Read when there is no more date to be read will not change fc.
        assert_eq!(s1.read(&mut buf).unwrap(), (0, false));
        check_fc(&fc.borrow(), SW / 2, SW / 2);
        check_fc(s1.fc().unwrap(), SW / 4, SW / 4);
        check_fc(s2.fc().unwrap(), SW / 4, SW / 4);

        // Receiving more data on a stream.
        s1.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4])
            .unwrap();
        check_fc(&fc.borrow(), SW * 3 / 4, SW / 2);
        check_fc(s1.fc().unwrap(), SW / 2, SW / 4);
        check_fc(s2.fc().unwrap(), SW / 4, SW / 4);

        // Read data
        assert_eq!(s1.read(&mut buf).unwrap(), (SW_US / 4, false));
        check_fc(&fc.borrow(), SW * 3 / 4, SW * 3 / 4);
        check_fc(s1.fc().unwrap(), SW / 2, SW / 2);
        check_fc(s2.fc().unwrap(), SW / 4, SW / 4);
    }

    /// Test consuming the flow control in `RecvStreamState::Recv` - duplicate data
    #[test]
    fn fc_state_recv_4() {
        const SW: u64 = 1024;
        const SW_US: usize = 1024;
        let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW)));
        let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);

        check_fc(&fc.borrow(), 0, 0);
        check_fc(s.fc().unwrap(), 0, 0);

        s.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap();

        check_fc(&fc.borrow(), SW / 4, 0);
        check_fc(s.fc().unwrap(), SW / 4, 0);

        // Receiving duplicate frames (already consumed data) will not cause an error or
        // change fc.
        s.inbound_stream_frame(false, 0, &[0; SW_US / 8]).unwrap();
        check_fc(&fc.borrow(), SW / 4, 0);
        check_fc(s.fc().unwrap(), SW / 4, 0);
    }

    /// Test consuming the flow control in `RecvStreamState::Recv` - filling a gap in the
    /// data stream.
    #[test]
    fn fc_state_recv_5() {
        const SW: u64 = 1024;
        const SW_US: usize = 1024;
        let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW)));
        let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);

        // Receive out of order data.
        s.inbound_stream_frame(false, SW / 8, &[0; SW_US / 8])
            .unwrap();
        check_fc(&fc.borrow(), SW / 4, 0);
        check_fc(s.fc().unwrap(), SW / 4, 0);

        // Filling in the gap will not change fc.
        s.inbound_stream_frame(false, 0, &[0; SW_US / 8]).unwrap();
        check_fc(&fc.borrow(), SW / 4, 0);
        check_fc(s.fc().unwrap(), SW / 4, 0);
    }

    /// Test consuming the flow control in `RecvStreamState::Recv` - receiving frame past
    /// the flow control will cause an error.
    #[test]
    fn fc_state_recv_6() {
        const SW: u64 = 1024;
        const SW_US: usize = 1024;
        let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW)));
        let mut s = create_stream_with_fc(Rc::clone(&fc), SW * 3 / 4);

        // Receiving frame past the flow control will cause an error.
        assert_eq!(
            s.inbound_stream_frame(false, 0, &[0; SW_US * 3 / 4 + 1]),
            Err(Error::FlowControlError)
        );
    }

    /// Test that the flow controls will send updates.
    #[test]
    fn fc_state_recv_7() {
        const SW: u64 = 1024;
        const SW_US: usize = 1024;
        let fc = Rc::new(RefCell::new(ReceiverFlowControl::new((), SW)));
        let mut s = create_stream_with_fc(Rc::clone(&fc), SW / 2);

        check_fc(&fc.borrow(), 0, 0);
        check_fc(s.fc().unwrap(), 0, 0);

        s.inbound_stream_frame(false, 0, &[0; SW_US / 4]).unwrap();
        let mut buf = [1; SW_US];
        assert_eq!(s.read(&mut buf).unwrap(), (SW_US / 4, false));
        check_fc(&fc.borrow(), SW / 4, SW / 4);
        check_fc(s.fc().unwrap(), SW / 4, SW / 4);

        // Still no fc update needed.
        assert!(!fc.borrow().frame_needed());
        assert!(!s.fc().unwrap().frame_needed());

        // Receive one more byte that will cause a fc update after it is read.
        s.inbound_stream_frame(false, SW / 4, &[0]).unwrap();
        check_fc(&fc.borrow(), SW / 4 + 1, SW / 4);
        check_fc(s.fc().unwrap(), SW / 4 + 1, SW / 4);
        // Only consuming data does not cause a fc update to be sent.
        assert!(!fc.borrow().frame_needed());
        assert!(!s.fc().unwrap().frame_needed());

        assert_eq!(s.read(&mut buf).unwrap(), (1, false));
        check_fc(&fc.borrow(), SW / 4 + 1, SW / 4 + 1);
        check_fc(s.fc().unwrap(), SW / 4 + 1, SW / 4 + 1);
        // Data are retired and the sttream fc will send an update.
        assert!(!fc.borrow().frame_needed());
        assert!(s.fc().unwrap().frame_needed());

        // Receive more data to increase fc further.
        s.inbound_stream_frame(false, SW / 4, &[0; SW_US / 4])
            .unwrap();
        assert_eq!(s.read(&mut buf).unwrap(), (SW_US / 4 - 1, false));
        check_fc(&fc.borrow(), SW / 2, SW / 2);
        check_fc(s.fc().unwrap(), SW / 2, SW / 2);
        assert!(!fc.borrow().frame_needed());
        assert!(s.fc().unwrap().frame_needed());

        // Write the fc update frame
        let mut builder = PacketBuilder::short(Encoder::new(), false, None::<&[u8]>);
--> --------------------

--> maximum size reached

--> --------------------

[ 0.56Quellennavigators  Projekt   ]