Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/third_party/rust/neqo-qpack/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 58 kB image not shown  

Quelle  encoder.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::collections::{HashMap, HashSet, VecDeque};

use neqo_common::{qdebug, qerror, qlog::NeqoQlog, qtrace, Header};
use neqo_transport::{Connection, Error as TransportError, StreamId};

use crate::{
    decoder_instructions::{DecoderInstruction, DecoderInstructionReader},
    encoder_instructions::EncoderInstruction,
    header_block::HeaderEncoder,
    qlog,
    qpack_send_buf::QpackData,
    reader::ReceiverConnWrapper,
    stats::Stats,
    table::{HeaderTable, LookupResult, ADDITIONAL_TABLE_ENTRY_SIZE},
    Error, QpackSettings, Res,
};

pub const QPACK_UNI_STREAM_TYPE_ENCODER: u64 = 0x2;

#[derive(Debug, PartialEq)]
enum LocalStreamState {
    NoStream,
    Uninitialized(StreamId),
    Initialized(StreamId),
}

impl LocalStreamState {
    pub const fn stream_id(&self) -> Option<StreamId> {
        match self {
            Self::NoStream => None,
            Self::Uninitialized(stream_id) | Self::Initialized(stream_id) => Some(*stream_id),
        }
    }
}

#[derive(Debug)]
pub struct QPackEncoder {
    table: HeaderTable,
    max_table_size: u64,
    max_entries: u64,
    instruction_reader: DecoderInstructionReader,
    local_stream: LocalStreamState,
    max_blocked_streams: u16,
    // Remember header blocks that are referring to dynamic table.
    // There can be multiple header blocks in one stream, headers, trailer, push stream request,
    // etc. This HashMap maps a stream ID to a list of header blocks. Each header block is a
    // list of referenced dynamic table entries.
    unacked_header_blocks: HashMap<StreamId, VecDeque<HashSet<u64>>>,
    blocked_stream_cnt: u16,
    use_huffman: bool,
    next_capacity: Option<u64>,
    stats: Stats,
}

impl QPackEncoder {
    #[must_use]
    pub fn new(qpack_settings: &QpackSettings, use_huffman: bool) -> Self {
        Self {
            table: HeaderTable::new(true),
            max_table_size: qpack_settings.max_table_size_encoder,
            max_entries: 0,
            instruction_reader: DecoderInstructionReader::new(),
            local_stream: LocalStreamState::NoStream,
            max_blocked_streams: 0,
            unacked_header_blocks: HashMap::new(),
            blocked_stream_cnt: 0,
            use_huffman,
            next_capacity: None,
            stats: Stats::default(),
        }
    }

    /// This function is use for setting encoders table max capacity. The value is received as
    /// a `SETTINGS_QPACK_MAX_TABLE_CAPACITY` setting parameter.
    ///
    /// # Errors
    ///
    /// `EncoderStream` if value is too big.
    /// `ChangeCapacity` if table capacity cannot be reduced.
    pub fn set_max_capacity(&mut self, cap: u64) -> Res<()> {
        if cap > (1 << 30) - 1 {
            return Err(Error::EncoderStream);
        }

        if cap == self.table.capacity() {
            return Ok(());
        }

        qdebug!(
            [self],
            "Set max capacity to new capacity:{} old:{} max_table_size={}.",
            cap,
            self.table.capacity(),
            self.max_table_size,
        );

        let new_cap = std::cmp::min(self.max_table_size, cap);
        // we also set our table to the max allowed.
        self.change_capacity(new_cap);
        Ok(())
    }

    /// This function is use for setting encoders max blocked streams. The value is received as
    /// a `SETTINGS_QPACK_BLOCKED_STREAMS` setting parameter.
    ///
    /// # Errors
    ///
    /// `EncoderStream` if value is too big.
    pub fn set_max_blocked_streams(&mut self, blocked_streams: u64) -> Res<()> {
        self.max_blocked_streams = u16::try_from(blocked_streams).or(Err(Error::EncoderStream))?;
        Ok(())
    }

    /// Reads decoder instructions.
    ///
    /// # Errors
    ///
    /// May return: `ClosedCriticalStream` if stream has been closed or `DecoderStream`
    /// in case of any other transport error.
    pub fn receive(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
        self.read_instructions(conn, stream_id)
            .map_err(|e| map_error(&e))
    }

    fn read_instructions(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
        qdebug!([self], "read a new instruction");
        loop {
            let mut recv = ReceiverConnWrapper::new(conn, stream_id);
            match self.instruction_reader.read_instructions(&mut recv) {
                Ok(instruction) => self.call_instruction(instruction, conn.qlog_mut())?,
                Err(Error::NeedMoreData) => break Ok(()),
                Err(e) => break Err(e),
            }
        }
    }

    fn recalculate_blocked_streams(&mut self) {
        let acked_inserts_cnt = self.table.get_acked_inserts_cnt();
        self.blocked_stream_cnt = 0;
        for hb_list in self.unacked_header_blocks.values_mut() {
            debug_assert!(!hb_list.is_empty());
            if hb_list.iter().flatten().any(|e| *e >= acked_inserts_cnt) {
                self.blocked_stream_cnt += 1;
            }
        }
    }

    #[allow(clippy::map_err_ignore)]
    fn insert_count_instruction(&mut self, increment: u64) -> Res<()> {
        self.table
            .increment_acked(increment)
            .map_err(|_| Error::DecoderStream)?;
        self.recalculate_blocked_streams();
        Ok(())
    }

    fn header_ack(&mut self, stream_id: StreamId) {
        self.stats.header_acks_recv += 1;
        let mut new_acked = self.table.get_acked_inserts_cnt();
        if let Some(hb_list) = self.unacked_header_blocks.get_mut(&stream_id) {
            if let Some(ref_list) = hb_list.pop_back() {
                for iter in ref_list {
                    self.table.remove_ref(iter);
                    if iter >= new_acked {
                        new_acked = iter + 1;
                    }
                }
            } else {
                debug_assert!(false, "We should have at least one header block.");
            }
            if hb_list.is_empty() {
                self.unacked_header_blocks.remove(&stream_id);
            }
        }
        if new_acked > self.table.get_acked_inserts_cnt() {
            self.insert_count_instruction(new_acked - self.table.get_acked_inserts_cnt())
                .expect("This should neve happen");
        }
    }

    fn stream_cancellation(&mut self, stream_id: StreamId) {
        self.stats.stream_cancelled_recv += 1;
        let mut was_blocker = false;
        if let Some(mut hb_list) = self.unacked_header_blocks.remove(&stream_id) {
            debug_assert!(!hb_list.is_empty());
            while let Some(ref_list) = hb_list.pop_front() {
                for iter in ref_list {
                    self.table.remove_ref(iter);
                    was_blocker = was_blocker || (iter >= self.table.get_acked_inserts_cnt());
                }
            }
        }
        if was_blocker {
            debug_assert!(self.blocked_stream_cnt > 0);
            self.blocked_stream_cnt -= 1;
        }
    }

    fn call_instruction(&mut self, instruction: DecoderInstruction, qlog: &NeqoQlog) -> Res<()> {
        qdebug!([self], "call intruction {:?}", instruction);
        match instruction {
            DecoderInstruction::InsertCountIncrement { increment } => {
                qlog::qpack_read_insert_count_increment_instruction(
                    qlog,
                    increment,
                    &increment.to_be_bytes(),
                );

                self.insert_count_instruction(increment)
            }
            DecoderInstruction::HeaderAck { stream_id } => {
                self.header_ack(stream_id);
                Ok(())
            }
            DecoderInstruction::StreamCancellation { stream_id } => {
                self.stream_cancellation(stream_id);
                Ok(())
            }
            DecoderInstruction::NoInstruction => Ok(()),
        }
    }

    /// Inserts a new entry into a table and sends the corresponding instruction to a peer. An entry
    /// is added only if it is possible to send the corresponding instruction immediately, i.e.
    /// the encoder stream is not blocked by the flow control (or stream internal buffer(this is
    /// very unlikely)).
    ///
    /// # Errors
    ///
    /// `EncoderStreamBlocked` if the encoder stream is blocked by the flow control.
    /// `DynamicTableFull` if the dynamic table does not have enough space for the entry.
    /// The function can return transport errors: `InvalidStreamId`, `InvalidInput` and
    /// `FinalSizeError`.
    ///
    /// # Panics
    ///
    /// When the insertion fails (it should not).
    pub fn send_and_insert(
        &mut self,
        conn: &mut Connection,
        name: &[u8],
        value: &[u8],
    ) -> Res<u64> {
        qdebug!([self], "insert {:?} {:?}.", name, value);

        let entry_size = name.len() + value.len() + ADDITIONAL_TABLE_ENTRY_SIZE;

        if !self.table.insert_possible(entry_size) {
            return Err(Error::DynamicTableFull);
        }

        let mut buf = QpackData::default();
        EncoderInstruction::InsertWithNameLiteral { name, value }
            .marshal(&mut buf, self.use_huffman);

        let stream_id = self.local_stream.stream_id().ok_or(Error::Internal)?;

        let sent = conn
            .stream_send_atomic(stream_id, &buf)
            .map_err(|e| map_stream_send_atomic_error(&e))?;
        if !sent {
            return Err(Error::EncoderStreamBlocked);
        }

        self.stats.dynamic_table_inserts += 1;

        match self.table.insert(name, value) {
            Ok(inx) => Ok(inx),
            Err(e) => {
                debug_assert!(false);
                Err(e)
            }
        }
    }

    fn change_capacity(&mut self, value: u64) {
        qdebug!([self], "change capacity: {}", value);
        self.next_capacity = Some(value);
    }

    fn maybe_send_change_capacity(
        &mut self,
        conn: &mut Connection,
        stream_id: StreamId,
    ) -> Res<()> {
        if let Some(cap) = self.next_capacity {
            // Check if it is possible to reduce the capacity, e.g. if enough space can be make free
            // for the reduction.
            if cap < self.table.capacity() && !self.table.can_evict_to(cap) {
                return Err(Error::DynamicTableFull);
            }
            let mut buf = QpackData::default();
            EncoderInstruction::Capacity { value: cap }.marshal(&mut buf, self.use_huffman);
            if !conn.stream_send_atomic(stream_id, &buf)? {
                return Err(Error::EncoderStreamBlocked);
            }
            if self.table.set_capacity(cap).is_err() {
                debug_assert!(
                    false,
                    "can_evict_to should have checked and make sure this operation is possible"
                );
                return Err(Error::InternalError);
            }
            self.max_entries = cap / 32;
            self.next_capacity = None;
        }
        Ok(())
    }

    /// Sends any qpack encoder instructions.
    ///
    /// # Errors
    ///
    ///   returns `EncoderStream` in case of an error.
    pub fn send_encoder_updates(&mut self, conn: &mut Connection) -> Res<()> {
        match self.local_stream {
            LocalStreamState::NoStream => {
                qerror!("Send call but there is no stream yet.");
                Ok(())
            }
            LocalStreamState::Uninitialized(stream_id) => {
                let mut buf = QpackData::default();
                buf.encode_varint(QPACK_UNI_STREAM_TYPE_ENCODER);
                if !conn.stream_send_atomic(stream_id, &buf[..])? {
                    return Err(Error::EncoderStreamBlocked);
                }
                self.local_stream = LocalStreamState::Initialized(stream_id);
                self.maybe_send_change_capacity(conn, stream_id)
            }
            LocalStreamState::Initialized(stream_id) => {
                self.maybe_send_change_capacity(conn, stream_id)
            }
        }
    }

    fn is_stream_blocker(&self, stream_id: StreamId) -> bool {
        self.unacked_header_blocks
            .get(&stream_id)
            .is_some_and(|hb_list| {
                debug_assert!(!hb_list.is_empty());
                hb_list
                    .iter()
                    .flatten()
                    .max()
                    .is_some_and(|max_ref| *max_ref >= self.table.get_acked_inserts_cnt())
            })
    }

    /// Encodes headers
    ///
    /// # Errors
    ///
    /// `ClosedCriticalStream` if the encoder stream is closed.
    /// `InternalError` if an unexpected error occurred.
    ///
    /// # Panics
    ///
    /// If there is a programming error.
    pub fn encode_header_block(
        &mut self,
        conn: &mut Connection,
        h: &[Header],
        stream_id: StreamId,
    ) -> HeaderEncoder {
        qdebug!([self], "encoding headers.");

        // Try to send capacity instructions if present.
        // This code doesn't try to deal with errors, it just tries
        // to write to the encoder stream AND if it can't uses
        // literal instructions.
        // The errors can be:
        //   1) `EncoderStreamBlocked` - this is an error that can occur.
        //   2) `InternalError` - this is unexpected error.
        //   3) `ClosedCriticalStream` - this is error that should close the HTTP/3 session.
        // The last 2 errors are ignored here and will be picked up
        // by the main loop.
        let mut encoder_blocked = self.send_encoder_updates(conn).is_err();

        let mut encoded_h =
            HeaderEncoder::new(self.table.base(), self.use_huffman, self.max_entries);

        let stream_is_blocker = self.is_stream_blocker(stream_id);
        let can_block = self.blocked_stream_cnt < self.max_blocked_streams || stream_is_blocker;

        let mut ref_entries = HashSet::new();

        for iter in h {
            let name = iter.name().as_bytes().to_vec();
            let value = iter.value().as_bytes().to_vec();
            qtrace!("encoding {:x?} {:x?}.", name, value);

            if let Some(LookupResult {
                index,
                static_table,
                value_matches,
            }) = self.table.lookup(&name, &value, can_block)
            {
                qtrace!(
                    [self],
                    "found a {} entry, value-match={}",
                    if static_table { "static" } else { "dynamic" },
                    value_matches
                );
                if value_matches {
                    if static_table {
                        encoded_h.encode_indexed_static(index);
                    } else {
                        encoded_h.encode_indexed_dynamic(index);
                    }
                } else {
                    encoded_h.encode_literal_with_name_ref(static_table, index, &value);
                }
                if !static_table && ref_entries.insert(index) {
                    self.table.add_ref(index);
                }
            } else if can_block && !encoder_blocked {
                // Insert using an InsertWithNameLiteral instruction. This entry name does not match
                // any name in the tables therefore we cannot use any other
                // instruction.
                if let Ok(index) = self.send_and_insert(conn, &name, &value) {
                    encoded_h.encode_indexed_dynamic(index);
                    ref_entries.insert(index);
                    self.table.add_ref(index);
                } else {
                    // This code doesn't try to deal with errors, it just tries
                    // to write to the encoder stream AND if it can't uses
                    // literal instructions.
                    // The errors can be:
                    //   1) `EncoderStreamBlocked` - this is an error that can occur.
                    //   2) `DynamicTableFull` - this is an error that can occur.
                    //   3) `InternalError` - this is unexpected error.
                    //   4) `ClosedCriticalStream` - this is error that should close the HTTP/3
                    //      session.
                    // The last 2 errors are ignored here and will be picked up
                    // by the main loop.
                    // As soon as one of the instructions cannot be written or the table is full, do
                    // not try again.
                    encoder_blocked = true;
                    encoded_h.encode_literal_with_name_literal(&name, &value);
                }
            } else {
                encoded_h.encode_literal_with_name_literal(&name, &value);
            }
        }

        encoded_h.encode_header_block_prefix();

        if !stream_is_blocker {
            // The streams was not a blocker, check if the stream is a blocker now.
            if let Some(max_ref) = ref_entries.iter().max() {
                if *max_ref >= self.table.get_acked_inserts_cnt() {
                    debug_assert!(self.blocked_stream_cnt <= self.max_blocked_streams);
                    self.blocked_stream_cnt += 1;
                }
            }
        }

        if !ref_entries.is_empty() {
            self.unacked_header_blocks
                .entry(stream_id)
                .or_default()
                .push_front(ref_entries);
            self.stats.dynamic_table_references += 1;
        }
        encoded_h
    }

    /// Encoder stream has been created. Add the stream id.
    ///
    /// # Panics
    ///
    /// If a stream has already been added.
    pub fn add_send_stream(&mut self, stream_id: StreamId) {
        if self.local_stream == LocalStreamState::NoStream {
            self.local_stream = LocalStreamState::Uninitialized(stream_id);
        } else {
            panic!("Adding multiple local streams");
        }
    }

    #[must_use]
    pub fn stats(&self) -> Stats {
        self.stats.clone()
    }

    #[must_use]
    pub const fn local_stream_id(&self) -> Option<StreamId> {
        self.local_stream.stream_id()
    }

    #[cfg(test)]
    const fn blocked_stream_cnt(&self) -> u16 {
        self.blocked_stream_cnt
    }
}

impl ::std::fmt::Display for QPackEncoder {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "QPackEncoder")
    }
}

fn map_error(err: &Error) -> Error {
    if *err == Error::ClosedCriticalStream {
        Error::ClosedCriticalStream
    } else {
        Error::DecoderStream
    }
}

fn map_stream_send_atomic_error(err: &TransportError) -> Error {
    match err {
        TransportError::InvalidStreamId | TransportError::FinalSizeError => {
            Error::ClosedCriticalStream
        }
        _ => {
            debug_assert!(false, "Unexpected error");
            Error::InternalError
        }
    }
}

#[cfg(test)]
mod tests {
    use std::mem;

    use neqo_transport::{ConnectionParameters, StreamId, StreamType};
    use test_fixture::{default_client, default_server, handshake, new_server, now, DEFAULT_ALPN};

    use super::{Connection, Error, Header, QPackEncoder, Res};
    use crate::QpackSettings;

    struct TestEncoder {
        encoder: QPackEncoder,
        send_stream_id: StreamId,
        recv_stream_id: StreamId,
        conn: Connection,
        peer_conn: Connection,
    }

    impl TestEncoder {
        pub fn change_capacity(&mut self, capacity: u64) -> Res<()> {
            self.encoder.set_max_capacity(capacity).unwrap();
            // We will try to really change the table only when we send the change capacity
            // instruction.
            self.encoder.send_encoder_updates(&mut self.conn)
        }

        pub fn insert(&mut self, header: &[u8], value: &[u8], inst: &[u8]) {
            let res = self.encoder.send_and_insert(&mut self.conn, header, value);
            assert!(res.is_ok());
            self.send_instructions(inst);
        }

        pub fn encode_header_block(
            &mut self,
            stream_id: StreamId,
            headers: &[Header],
            expected_encoding: &[u8],
            inst: &[u8],
        ) {
            let buf = self
                .encoder
                .encode_header_block(&mut self.conn, headers, stream_id);
            assert_eq!(&buf[..], expected_encoding);
            self.send_instructions(inst);
        }

        pub fn send_instructions(&mut self, encoder_instruction: &[u8]) {
            self.encoder.send_encoder_updates(&mut self.conn).unwrap();
            let out = self.conn.process_output(now());
            let out2 = self.peer_conn.process(out.dgram(), now());
            mem::drop(self.conn.process(out2.dgram(), now()));
            let mut buf = [0_u8; 100];
            let (amount, fin) = self
                .peer_conn
                .stream_recv(self.send_stream_id, &mut buf)
                .unwrap();
            assert!(!fin);
            assert_eq!(buf[..amount], encoder_instruction[..]);
        }
    }

    fn connect_generic(huffman: bool, max_data: Option<u64>) -> TestEncoder {
        let mut conn = default_client();
        let mut peer_conn = max_data.map_or_else(default_server, |max| {
            new_server(
                DEFAULT_ALPN,
                ConnectionParameters::default()
                    .max_stream_data(StreamType::UniDi, true, max)
                    .max_stream_data(StreamType::BiDi, true, max)
                    .max_stream_data(StreamType::BiDi, false, max),
            )
        });
        handshake(&mut conn, &mut peer_conn);

        // create a stream
        let recv_stream_id = peer_conn.stream_create(StreamType::UniDi).unwrap();
        let send_stream_id = conn.stream_create(StreamType::UniDi).unwrap();

        // create an encoder
        let mut encoder = QPackEncoder::new(
            &QpackSettings {
                max_table_size_encoder: 1500,
                max_table_size_decoder: 0,
                max_blocked_streams: 0,
            },
            huffman,
        );
        encoder.add_send_stream(send_stream_id);

        TestEncoder {
            encoder,
            send_stream_id,
            recv_stream_id,
            conn,
            peer_conn,
        }
    }

    fn connect(huffman: bool) -> TestEncoder {
        connect_generic(huffman, None)
    }

    fn connect_flow_control(max_data: u64) -> TestEncoder {
        connect_generic(true, Some(max_data))
    }

    fn recv_instruction(encoder: &mut TestEncoder, decoder_instruction: &[u8]) {
        encoder
            .peer_conn
            .stream_send(encoder.recv_stream_id, decoder_instruction)
            .unwrap();
        let out = encoder.peer_conn.process_output(now());
        mem::drop(encoder.conn.process(out.dgram(), now()));
        assert!(encoder
            .encoder
            .read_instructions(&mut encoder.conn, encoder.recv_stream_id)
            .is_ok());
    }

    const CAP_INSTRUCTION_200: &[u8] = &[0x02, 0x3f, 0xa9, 0x01];
    const CAP_INSTRUCTION_60: &[u8] = &[0x02, 0x3f, 0x1d];
    const CAP_INSTRUCTION_1000: &[u8] = &[0x02, 0x3f, 0xc9, 0x07];
    const CAP_INSTRUCTION_1500: &[u8] = &[0x02, 0x3f, 0xbd, 0x0b];

    const HEADER_CONTENT_LENGTH: &[u8] = &[
        0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
    ];
    const VALUE_1: &[u8] = &[0x31, 0x32, 0x33, 0x34];
    const VALUE_2: &[u8] = &[0x31, 0x32, 0x33, 0x34, 0x35];

    // HEADER_CONTENT_LENGTH and VALUE_1 encoded by instruction insert_with_name_literal.
    const HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL: &[u8] = &[
        0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
        0x04, 0x31, 0x32, 0x33, 0x34,
    ];

    // HEADER_CONTENT_LENGTH and VALUE_2 encoded by instruction insert_with_name_literal.
    const HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL: &[u8] = &[
        0x4e, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x2d, 0x6c, 0x65, 0x6e, 0x67, 0x74, 0x68,
        0x05, 0x31, 0x32, 0x33, 0x34, 0x35,
    ];

    // Indexed Header Field that refers to the first entry in the dynamic table.
    const ENCODE_INDEXED_REF_DYNAMIC: &[u8] = &[0x02, 0x00, 0x80];

    const STREAM_1: StreamId = StreamId::new(1);
    const STREAM_2: StreamId = StreamId::new(2);
    const HEADER_ACK_STREAM_ID_1: &[u8] = &[0x81];
    const HEADER_ACK_STREAM_ID_2: &[u8] = &[0x82];
    const STREAM_CANCELED_ID_1: &[u8] = &[0x41];

    // test insert_with_name_literal which fails because there is not enough space in the table
    #[test]
    fn insert_with_name_literal_1() {
        let mut encoder = connect(false);

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
        assert_eq!(Error::DynamicTableFull, res.unwrap_err());
        encoder.send_instructions(&[0x02]);
    }

    // test insert_with_name_literal - succeeds
    #[test]
    fn insert_with_name_literal_2() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());
        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);
    }

    #[test]
    fn change_capacity() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());
        encoder.send_instructions(CAP_INSTRUCTION_200);
    }

    struct TestElement {
        pub headers: Vec<Header>,
        pub header_block: &'static [u8],
        pub encoder_inst: &'static [u8],
    }

    #[test]
    fn header_block_encoder_non() {
        let test_cases: [TestElement; 6] = [
            // test a header with ref to static - encode_indexed
            TestElement {
                headers: vec![Header::new(":method", "GET")],
                header_block: &[0x00, 0x00, 0xd1],
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref
            TestElement {
                headers: vec![Header::new(":path", "/somewhere")],
                header_block: &[
                    0x00, 0x00, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65, 0x72,
                    0x65,
                ],
                encoder_inst: &[],
            },
            // test adding a new header and encode_post_base_index, also test
            // fix_header_block_prefix
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: &[0x02, 0x80, 0x10],
                encoder_inst: &[
                    0x49, 0x6d, 0x79, 0x2d, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x08, 0x6d, 0x79,
                    0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65,
                ],
            },
            // test encode_indexed with a ref to dynamic table.
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: ENCODE_INDEXED_REF_DYNAMIC,
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref.
            TestElement {
                headers: vec![Header::new("my-header", "my-value2")],
                header_block: &[
                    0x02, 0x00, 0x40, 0x09, 0x6d, 0x79, 0x2d, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32,
                ],
                encoder_inst: &[],
            },
            // test multiple headers
            TestElement {
                headers: vec![
                    Header::new(":method", "GET"),
                    Header::new(":path", "/somewhere"),
                    Header::new(":authority", "example.com"),
                    Header::new(":scheme", "https"),
                ],
                header_block: &[
                    0x00, 0x01, 0xd1, 0x51, 0x0a, 0x2f, 0x73, 0x6f, 0x6d, 0x65, 0x77, 0x68, 0x65,
                    0x72, 0x65, 0x50, 0x0b, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63,
                    0x6f, 0x6d, 0xd7,
                ],
                encoder_inst: &[],
            },
        ];

        let mut encoder = connect(false);

        encoder.encoder.set_max_blocked_streams(100).unwrap();
        encoder.encoder.set_max_capacity(200).unwrap();

        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        for t in &test_cases {
            let buf = encoder
                .encoder
                .encode_header_block(&mut encoder.conn, &t.headers, STREAM_1);
            assert_eq!(&buf[..], t.header_block);
            encoder.send_instructions(t.encoder_inst);
        }
    }

    #[test]
    fn header_block_encoder_huffman() {
        let test_cases: [TestElement; 6] = [
            // test a header with ref to static - encode_indexed
            TestElement {
                headers: vec![Header::new(":method", "GET")],
                header_block: &[0x00, 0x00, 0xd1],
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref
            TestElement {
                headers: vec![Header::new(":path", "/somewhere")],
                header_block: &[
                    0x00, 0x00, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85,
                ],
                encoder_inst: &[],
            },
            // test adding a new header and encode_post_base_index, also test
            // fix_header_block_prefix
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: &[0x02, 0x80, 0x10],
                encoder_inst: &[
                    0x67, 0xa7, 0xd2, 0xd3, 0x94, 0x72, 0x16, 0xcf, 0x86, 0xa7, 0xd2, 0xdd, 0xc7,
                    0x45, 0xa5,
                ],
            },
            // test encode_indexed with a ref to dynamic table.
            TestElement {
                headers: vec![Header::new("my-header", "my-value")],
                header_block: ENCODE_INDEXED_REF_DYNAMIC,
                encoder_inst: &[],
            },
            // test encode_literal_with_name_ref.
            TestElement {
                headers: vec![Header::new("my-header", "my-value2")],
                header_block: &[
                    0x02, 0x00, 0x40, 0x87, 0xa7, 0xd2, 0xdd, 0xc7, 0x45, 0xa5, 0x17,
                ],
                encoder_inst: &[],
            },
            // test multiple headers
            TestElement {
                headers: vec![
                    Header::new(":method", "GET"),
                    Header::new(":path", "/somewhere"),
                    Header::new(":authority", "example.com"),
                    Header::new(":scheme", "https"),
                ],
                header_block: &[
                    0x00, 0x01, 0xd1, 0x51, 0x87, 0x61, 0x07, 0xa4, 0xbe, 0x27, 0x2d, 0x85, 0x50,
                    0x88, 0x2f, 0x91, 0xd3, 0x5d, 0x05, 0x5c, 0x87, 0xa7, 0xd7,
                ],
                encoder_inst: &[],
            },
        ];

        let mut encoder = connect(true);

        encoder.encoder.set_max_blocked_streams(100).unwrap();
        encoder.encoder.set_max_capacity(200).unwrap();

        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        for t in &test_cases {
            let buf = encoder
                .encoder
                .encode_header_block(&mut encoder.conn, &t.headers, STREAM_1);
            assert_eq!(&buf[..], t.header_block);
            encoder.send_instructions(t.encoder_inst);
        }
    }

    // Test inserts block on waiting for an insert count increment.
    #[test]
    fn insertion_blocked_on_insert_count_feedback() {
        let mut encoder = connect(false);

        encoder.encoder.set_max_capacity(60).unwrap();

        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // insert "content-length: 12345 which will fail because the ntry in the table cannot be
        // evicted.
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_err());
        encoder.send_instructions(&[]);

        // receive an insert count increment.
        recv_instruction(&mut encoder, &[0x01]);

        // insert "content-length: 12345 again it will succeed.
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
    }

    // Test inserts block on waiting for acks
    // test the table insertion is blocked:
    // 0 - waiting for a header ack
    // 2 - waiting for a stream cancel.
    fn test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(wait: u8) {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(60).is_ok());
        // test the change capacity instruction.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // receive an insert count increment.
        recv_instruction(&mut encoder, &[0x01]);

        // send a header block
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            STREAM_1,
        );
        assert_eq!(&buf[..], ENCODE_INDEXED_REF_DYNAMIC);
        encoder.send_instructions(&[]);

        // insert "content-length: 12345 which will fail because the entry in the table cannot be
        // evicted
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_err());
        encoder.send_instructions(&[]);

        if wait == 0 {
            // receive a header_ack.
            recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);
        } else {
            // receive a stream canceled
            recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);
        }

        // insert "content-length: 12345 again it will succeed.
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);
        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);
    }

    #[test]
    fn header_ack() {
        test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(0);
    }

    #[test]
    fn stream_canceled() {
        test_insertion_blocked_on_waiting_for_header_ack_or_stream_cancel(1);
    }

    fn assert_is_index_to_dynamic(buf: &[u8]) {
        assert_eq!(buf[2] & 0xc0, 0x80);
    }

    fn assert_is_index_to_dynamic_post(buf: &[u8]) {
        assert_eq!(buf[2] & 0xf0, 0x10);
    }

    fn assert_is_index_to_static_name_only(buf: &[u8]) {
        assert_eq!(buf[2] & 0xf0, 0x50);
    }

    fn assert_is_literal_value_literal_name(buf: &[u8]) {
        assert_eq!(buf[2] & 0xf0, 0x20);
    }

    #[test]
    fn max_block_streams1() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(60).is_ok());

        // change capacity to 60.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);

        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        encoder.encoder.set_max_blocked_streams(1).unwrap();

        // send a header block, it refers to unacked entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        encoder.send_instructions(&[]);

        // The next one will not use the dynamic entry because it is exceeding the
        // max_blocked_streams limit.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            StreamId::new(2),
        );
        assert_is_index_to_static_name_only(&buf);

        encoder.send_instructions(&[]);
        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // another header block to already blocked stream can still use the entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
    }

    #[test]
    fn max_block_streams2() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());

        // change capacity to 200.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);

        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // insert "content-length: 12345
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_2);

        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_2_NAME_LITERAL);

        encoder.encoder.set_max_blocked_streams(1).unwrap();

        // send a header block, it refers to unacked entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);

        // encode another header block for the same stream that will refer to the second entry
        // in the dynamic table.
        // This should work because the stream is already a blocked stream
        // send a header block, it refers to unacked entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "12345")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);
    }

    #[test]
    fn max_block_streams3() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());

        // change capacity to 200.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        encoder.encoder.set_max_blocked_streams(1).unwrap();

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

        // send a header block, that creates an new entry and refers to it.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // The next one will not create a new entry because the encoder is on max_blocked_streams
        // limit.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name2", "value2")],
            STREAM_2,
        );
        assert_is_literal_value_literal_name(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // another header block to already blocked stream can still create a new entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name2", "value2")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
    }

    #[test]
    fn max_block_streams4() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());

        // change capacity to 200.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        encoder.encoder.set_max_blocked_streams(1).unwrap();

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

        // send a header block, that creates an new entry and refers to it.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // another header block to already blocked stream can still create a new entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name2", "value2")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // receive a header_ack for the first header block.
        recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);

        // The stream is still blocking because the second header block is not acked.
        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
    }

    #[test]
    fn max_block_streams5() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());

        // change capacity to 200.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        encoder.encoder.set_max_blocked_streams(1).unwrap();

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

        // send a header block, that creates an new entry and refers to it.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // another header block to already blocked stream can still create a new entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // receive a header_ack for the first header block.
        recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);

        // The stream is not blocking anymore because header ack also acks the instruction.
        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
    }

    #[test]
    fn max_block_streams6() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());

        // change capacity to 200.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        encoder.encoder.set_max_blocked_streams(2).unwrap();

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

        // send a header block, that creates an new entry and refers to it.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // header block for the next stream will create an new entry as well.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name2", "value2")],
            STREAM_2,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

        // receive a header_ack for the second header block. This will ack the first as well
        recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_2);

        // The stream is not blocking anymore because header ack also acks the instruction.
        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);
    }

    #[test]
    fn max_block_streams7() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());

        // change capacity to 200.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        encoder.encoder.set_max_blocked_streams(2).unwrap();

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

        // send a header block, that creates an new entry and refers to it.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // header block for the next stream will create an new entry as well.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_2,
        );
        assert_is_index_to_dynamic(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

        // receive a stream cancel for the first stream.
        // This will remove the first stream as blocking but it will not mark the instruction as
        // acked. and the second steam will still be blocking.
        recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);

        // The stream is not blocking anymore because header ack also acks the instruction.
        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
    }

    #[test]
    fn max_block_stream8() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(200).is_ok());

        // change capacity to 200.
        encoder.send_instructions(CAP_INSTRUCTION_200);

        encoder.encoder.set_max_blocked_streams(2).unwrap();

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 0);

        // send a header block, that creates an new entry and refers to it.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);

        // header block for the next stream will refer to the same entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name1", "value1")],
            STREAM_2,
        );
        assert_is_index_to_dynamic(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

        // send another header block on stream 1.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("name2", "value2")],
            STREAM_1,
        );
        assert_is_index_to_dynamic_post(&buf);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 2);

        // stream 1 is block on entries 1 and 2; stream 2 is block only on 1.
        // receive an Insert Count Increment for the first entry.
        // After that only stream 1 will be blocking.
        recv_instruction(&mut encoder, &[0x01]);

        assert_eq!(encoder.encoder.blocked_stream_cnt(), 1);
    }

    #[test]
    fn dynamic_table_can_evict1() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(60).is_ok());

        // change capacity to 60.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        encoder.encoder.set_max_blocked_streams(2).unwrap();

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);

        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // send a header block, it refers to unacked entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);

        // trying to evict the entry will failed.
        assert!(encoder.change_capacity(10).is_err());

        // receive an Insert Count Increment for the entry.
        recv_instruction(&mut encoder, &[0x01]);

        // trying to evict the entry will failed. The stream is still referring to it.
        assert!(encoder.change_capacity(10).is_err());

        // receive a header_ack for the header block.
        recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);

        // now entry can be evicted.
        assert!(encoder.change_capacity(10).is_ok());
    }

    #[test]
    fn dynamic_table_can_evict2() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(60).is_ok());

        // change capacity to 60.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        encoder.encoder.set_max_blocked_streams(2).unwrap();

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);

        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // send a header block, it refers to unacked entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);

        // trying to evict the entry will failed.
        assert!(encoder.change_capacity(10).is_err());

        // receive an Insert Count Increment for the entry.
        recv_instruction(&mut encoder, &[0x01]);

        // trying to evict the entry will failed. The stream is still referring to it.
        assert!(encoder.change_capacity(10).is_err());

        // receive a stream cancelled.
        recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);

        // now entry can be evicted.
        assert!(encoder.change_capacity(10).is_ok());
    }

    #[test]
    fn dynamic_table_can_evict3() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(60).is_ok());

        // change capacity to 60.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        encoder.encoder.set_max_blocked_streams(2).unwrap();

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);

        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // trying to evict the entry will failed, because the entry is not acked.
        assert!(encoder.change_capacity(10).is_err());

        // receive an Insert Count Increment for the entry.
        recv_instruction(&mut encoder, &[0x01]);

        // now entry can be evicted.
        assert!(encoder.change_capacity(10).is_ok());
    }

    #[test]
    fn dynamic_table_can_evict4() {
        let mut encoder = connect(false);

        assert!(encoder.encoder.set_max_capacity(60).is_ok());

        // change capacity to 60.
        encoder.send_instructions(CAP_INSTRUCTION_60);

        encoder.encoder.set_max_blocked_streams(2).unwrap();

        // insert "content-length: 1234
        let res =
            encoder
                .encoder
                .send_and_insert(&mut encoder.conn, HEADER_CONTENT_LENGTH, VALUE_1);

        assert!(res.is_ok());
        encoder.send_instructions(HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL);

        // send a header block, it refers to unacked entry.
        let buf = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[Header::new("content-length", "1234")],
            STREAM_1,
        );
        assert_is_index_to_dynamic(&buf);

        // trying to evict the entry will failed. The stream is still referring to it and
        // entry is not acked.
        assert!(encoder.change_capacity(10).is_err());

        // receive a header_ack for the header block. This will also ack the instruction.
        recv_instruction(&mut encoder, HEADER_ACK_STREAM_ID_1);

        // now entry can be evicted.
        assert!(encoder.change_capacity(10).is_ok());
    }

    #[test]
    fn encoder_flow_controlled_blocked() {
        const SMALL_MAX_DATA: u64 = 20;
        const ONE_INSTRUCTION_1: &[u8] = &[
            0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x7f, 0x83, 0x8, 0x99, 0x6b,
        ];
        const ONE_INSTRUCTION_2: &[u8] = &[
            0x67, 0x41, 0xe9, 0x2a, 0x67, 0x35, 0x53, 0x37, 0x83, 0x8, 0x99, 0x6b,
        ];

        let mut encoder = connect_flow_control(SMALL_MAX_DATA);

        // change capacity to 1000 and max_block streams to 20.
        encoder.encoder.set_max_blocked_streams(20).unwrap();
        assert!(encoder.encoder.set_max_capacity(1000).is_ok());
        encoder.send_instructions(CAP_INSTRUCTION_1000);

        // Encode a header block with 2 headers. The first header will be added to the dynamic
        // table. The second will not be added to the dynamic table, because the
        // corresponding instruction cannot be written immediately due to the flow control
        // limit.
        let buf1 = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[
                Header::new("something", "1234"),
                Header::new("something2", "12345678910"),
            ],
            STREAM_1,
        );

        // Assert that the first header is encoded as an index to the dynamic table (a post form).
        assert_eq!(buf1[2], 0x10);
        // Assert that the second header is encoded as a literal with a name literal
        assert_eq!(buf1[3] & 0xf0, 0x20);

        // Try to encode another header block. Here both headers will be encoded as a literal with a
        // name literal
        let buf2 = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[
                Header::new("something3", "1234"),
                Header::new("something4", "12345678910"),
            ],
            STREAM_2,
        );
        assert_eq!(buf2[2] & 0xf0, 0x20);

        // Ensure that we have sent only one instruction for (String::from("something", "1234"))
        encoder.send_instructions(ONE_INSTRUCTION_1);

        // exchange a flow control update.
        let out = encoder.peer_conn.process_output(now());
        mem::drop(encoder.conn.process(out.dgram(), now()));

        // Try writing a new header block. Now, headers will be added to the dynamic table again,
        // because instructions can be sent.
        let buf3 = encoder.encoder.encode_header_block(
            &mut encoder.conn,
            &[
                Header::new("something5", "1234"),
                Header::new("something6", "12345678910"),
            ],
            StreamId::new(3),
        );
        // Assert that the first header is encoded as an index to the dynamic table (a post form).
        assert_eq!(buf3[2], 0x10);
        // Assert that the second header is encoded as a literal with a name literal
        assert_eq!(buf3[3] & 0xf0, 0x20);

        // Asset that one instruction has been sent
        encoder.send_instructions(ONE_INSTRUCTION_2);
    }

    #[test]
    fn encoder_max_capacity_limit() {
        let mut encoder = connect(false);

        // change capacity to 2000.
        assert!(encoder.encoder.set_max_capacity(2000).is_ok());
        encoder.send_instructions(CAP_INSTRUCTION_1500);
    }

    #[test]
    fn do_not_evict_entry_that_are_referred_only_by_the_same_header_blocked_encoding() {
        let mut encoder = connect(false);

        encoder.encoder.set_max_blocked_streams(20).unwrap();
        assert!(encoder.change_capacity(50).is_ok());

        encoder
            .encoder
            .send_and_insert(&mut encoder.conn, b"something5", b"1234")
            .unwrap();

        encoder
            .encoder
            .send_encoder_updates(&mut encoder.conn)
            .unwrap();
        let out = encoder.conn.process_output(now());
        mem::drop(encoder.peer_conn.process(out.dgram(), now()));
        // receive an insert count increment.
        recv_instruction(&mut encoder, &[0x01]);

        // The first header will use the table entry and the second will use the literal
        // encoding because the first entry is referred to and cannot be evicted.
        assert_eq!(
            encoder
                .encoder
                .encode_header_block(
                    &mut encoder.conn,
                    &[
                        Header::new("something5", "1234"),
                        Header::new("something6", "1234"),
                    ],
                    StreamId::new(3),
                )
                .to_vec(),
            &[
                0x02, 0x00, 0x80, 0x27, 0x03, 0x73, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x69, 0x6e, 0x67,
                0x36, 0x04, 0x31, 0x32, 0x33, 0x34
            ]
        );
        // Also check that ther is no new instruction send by the encoder.
        assert!(encoder.conn.process_output(now()).dgram().is_none());
    }

    #[test]
    fn streams_cancel_cleans_up_unacked_header_blocks() {
        let mut encoder = connect(false);

        encoder.encoder.set_max_blocked_streams(10).unwrap();
        assert!(encoder.change_capacity(60).is_ok());
        encoder.send_instructions(CAP_INSTRUCTION_60);

        // insert "content-length: 1234
        encoder.insert(
            HEADER_CONTENT_LENGTH,
            VALUE_1,
            HEADER_CONTENT_LENGTH_VALUE_1_NAME_LITERAL,
        );

        // send a header block
        encoder.encode_header_block(
            StreamId::new(1),
            &[Header::new("content-length", "1234")],
            ENCODE_INDEXED_REF_DYNAMIC,
            &[],
        );

        // receive a stream canceled instruction.
        recv_instruction(&mut encoder, STREAM_CANCELED_ID_1);

        recv_instruction(&mut encoder, &[0x01]);
    }
}

[ zur Elbe Produktseite wechseln0.56Quellennavigators  Analyse erneut starten  ]