Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  streams.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

// Stream management for a connection.
use std::{cell::RefCell, cmp::Ordering, rc::Rc};

use neqo_common::{qtrace, qwarn, Role};

use crate::{
    fc::{LocalStreamLimits, ReceiverFlowControl, RemoteStreamLimits, SenderFlowControl},
    frame::Frame,
    packet::PacketBuilder,
    recovery::{RecoveryToken, StreamRecoveryToken},
    recv_stream::{RecvStream, RecvStreams},
    send_stream::{SendStream, SendStreams, TransmissionPriority},
    stats::FrameStats,
    stream_id::{StreamId, StreamType},
    tparams::{self, TransportParametersHandler},
    ConnectionEvents, Error, Res,
};

pub type SendOrder = i64;

#[derive(Copy, Clone)]
pub struct StreamOrder {
    pub sendorder: Option<SendOrder>,
}

// We want highest to lowest, with None being higher than any value
impl Ord for StreamOrder {
    fn cmp(&self, other: &Self) -> Ordering {
        if self.sendorder.is_some() && other.sendorder.is_some() {
            // We want reverse order (high to low) when both values are specified.
            other.sendorder.cmp(&self.sendorder)
        } else {
            self.sendorder.cmp(&other.sendorder)
        }
    }
}

impl PartialOrd for StreamOrder {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for StreamOrder {
    fn eq(&self, other: &Self) -> bool {
        self.sendorder == other.sendorder
    }
}

impl Eq for StreamOrder {}

pub struct Streams {
    role: Role,
    tps: Rc<RefCell<TransportParametersHandler>>,
    events: ConnectionEvents,
    sender_fc: Rc<RefCell<SenderFlowControl<()>>>,
    receiver_fc: Rc<RefCell<ReceiverFlowControl<()>>>,
    remote_stream_limits: RemoteStreamLimits,
    local_stream_limits: LocalStreamLimits,
    pub(crate) send: SendStreams,
    pub(crate) recv: RecvStreams,
}

impl Streams {
    pub fn new(
        tps: Rc<RefCell<TransportParametersHandler>>,
        role: Role,
        events: ConnectionEvents,
    ) -> Self {
        let limit_bidi = tps
            .borrow()
            .local
            .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI);
        let limit_uni = tps
            .borrow()
            .local
            .get_integer(tparams::INITIAL_MAX_STREAMS_UNI);
        let max_data = tps.borrow().local.get_integer(tparams::INITIAL_MAX_DATA);
        Self {
            role,
            tps,
            events,
            sender_fc: Rc::new(RefCell::new(SenderFlowControl::new((), 0))),
            receiver_fc: Rc::new(RefCell::new(ReceiverFlowControl::new((), max_data))),
            remote_stream_limits: RemoteStreamLimits::new(limit_bidi, limit_uni, role),
            local_stream_limits: LocalStreamLimits::new(role),
            send: SendStreams::default(),
            recv: RecvStreams::default(),
        }
    }

    #[must_use]
    pub fn is_stream_id_allowed(&self, stream_id: StreamId) -> bool {
        self.remote_stream_limits[stream_id.stream_type()].is_allowed(stream_id)
    }

    pub fn zero_rtt_rejected(&mut self) {
        self.clear_streams();
        debug_assert_eq!(
            self.remote_stream_limits[StreamType::BiDi].max_active(),
            self.tps
                .borrow()
                .local
                .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI)
        );
        debug_assert_eq!(
            self.remote_stream_limits[StreamType::UniDi].max_active(),
            self.tps
                .borrow()
                .local
                .get_integer(tparams::INITIAL_MAX_STREAMS_UNI)
        );
        self.local_stream_limits = LocalStreamLimits::new(self.role);
    }

    /// # Errors
    /// When the frame is invalid.
    pub fn input_frame(&mut self, frame: &Frame, stats: &mut FrameStats) -> Res<()> {
        match frame {
            Frame::ResetStream {
                stream_id,
                application_error_code,
                final_size,
            } => {
                stats.reset_stream += 1;
                if let (_, Some(rs)) = self.obtain_stream(*stream_id)? {
                    rs.reset(*application_error_code, *final_size)?;
                }
            }
            Frame::StopSending {
                stream_id,
                application_error_code,
            } => {
                stats.stop_sending += 1;
                self.events
                    .send_stream_stop_sending(*stream_id, *application_error_code);
                if let (Some(ss), _) = self.obtain_stream(*stream_id)? {
                    ss.reset(*application_error_code);
                }
            }
            Frame::Stream {
                fin,
                stream_id,
                offset,
                data,
                ..
            } => {
                stats.stream += 1;
                if let (_, Some(rs)) = self.obtain_stream(*stream_id)? {
                    rs.inbound_stream_frame(*fin, *offset, data)?;
                }
            }
            Frame::MaxData { maximum_data } => {
                stats.max_data += 1;
                self.handle_max_data(*maximum_data);
            }
            Frame::MaxStreamData {
                stream_id,
                maximum_stream_data,
            } => {
                qtrace!(
                    "Stream {} Received MaxStreamData {}",
                    *stream_id,
                    *maximum_stream_data
                );
                stats.max_stream_data += 1;
                if let (Some(ss), _) = self.obtain_stream(*stream_id)? {
                    ss.set_max_stream_data(*maximum_stream_data);
                }
            }
            Frame::MaxStreams {
                stream_type,
                maximum_streams,
            } => {
                stats.max_streams += 1;
                self.handle_max_streams(*stream_type, *maximum_streams);
            }
            Frame::DataBlocked { data_limit } => {
                // Should never happen since we set data limit to max
                qwarn!("Received DataBlocked with data limit {}", data_limit);
                stats.data_blocked += 1;
                self.handle_data_blocked();
            }
            Frame::StreamDataBlocked { stream_id, .. } => {
                qtrace!("Received StreamDataBlocked");
                stats.stream_data_blocked += 1;
                // Terminate connection with STREAM_STATE_ERROR if send-only
                // stream (-transport 19.13)
                if stream_id.is_send_only(self.role) {
                    return Err(Error::StreamStateError);
                }

                if let (_, Some(rs)) = self.obtain_stream(*stream_id)? {
                    rs.send_flowc_update();
                }
            }
            Frame::StreamsBlocked { .. } => {
                stats.streams_blocked += 1;
                // We send an update evry time we retire a stream. There is no need to
                // trigger flow updates here.
            }
            _ => unreachable!("This is not a stream Frame"),
        }
        Ok(())
    }

    fn write_maintenance_frames(
        &mut self,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) {
        // Send `DATA_BLOCKED` as necessary.
        self.sender_fc
            .borrow_mut()
            .write_frames(builder, tokens, stats);
        if builder.is_full() {
            return;
        }

        // Send `MAX_DATA` as necessary.
        self.receiver_fc
            .borrow_mut()
            .write_frames(builder, tokens, stats);
        if builder.is_full() {
            return;
        }

        self.recv.write_frames(builder, tokens, stats);

        self.remote_stream_limits[StreamType::BiDi].write_frames(builder, tokens, stats);
        if builder.is_full() {
            return;
        }
        self.remote_stream_limits[StreamType::UniDi].write_frames(builder, tokens, stats);
        if builder.is_full() {
            return;
        }

        self.local_stream_limits[StreamType::BiDi].write_frames(builder, tokens, stats);
        if builder.is_full() {
            return;
        }

        self.local_stream_limits[StreamType::UniDi].write_frames(builder, tokens, stats);
    }

    pub fn write_frames(
        &mut self,
        priority: TransmissionPriority,
        builder: &mut PacketBuilder,
        tokens: &mut Vec<RecoveryToken>,
        stats: &mut FrameStats,
    ) {
        if priority == TransmissionPriority::Important {
            self.write_maintenance_frames(builder, tokens, stats);
            if builder.is_full() {
                return;
            }
        }

        self.send.write_frames(priority, builder, tokens, stats);
    }

    pub fn lost(&mut self, token: &StreamRecoveryToken) {
        match token {
            StreamRecoveryToken::Stream(st) => self.send.lost(st),
            StreamRecoveryToken::ResetStream { stream_id } => self.send.reset_lost(*stream_id),
            StreamRecoveryToken::StreamDataBlocked { stream_id, limit } => {
                self.send.blocked_lost(*stream_id, *limit);
            }
            StreamRecoveryToken::MaxStreamData {
                stream_id,
                max_data,
            } => {
                if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) {
                    rs.max_stream_data_lost(*max_data);
                }
            }
            StreamRecoveryToken::StopSending { stream_id } => {
                if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) {
                    rs.stop_sending_lost();
                }
            }
            StreamRecoveryToken::StreamsBlocked { stream_type, limit } => {
                self.local_stream_limits[*stream_type].frame_lost(*limit);
            }
            StreamRecoveryToken::MaxStreams {
                stream_type,
                max_streams,
            } => {
                self.remote_stream_limits[*stream_type].frame_lost(*max_streams);
            }
            StreamRecoveryToken::DataBlocked(limit) => {
                self.sender_fc.borrow_mut().frame_lost(*limit);
            }
            StreamRecoveryToken::MaxData(maximum_data) => {
                self.receiver_fc.borrow_mut().frame_lost(*maximum_data);
            }
        }
    }

    pub fn acked(&mut self, token: &StreamRecoveryToken) {
        match token {
            StreamRecoveryToken::Stream(st) => self.send.acked(st),
            StreamRecoveryToken::ResetStream { stream_id } => self.send.reset_acked(*stream_id),
            StreamRecoveryToken::StopSending { stream_id } => {
                if let Ok((_, Some(rs))) = self.obtain_stream(*stream_id) {
                    rs.stop_sending_acked();
                }
            }
            // We only worry when these are lost
            StreamRecoveryToken::DataBlocked(_)
            | StreamRecoveryToken::StreamDataBlocked { .. }
            | StreamRecoveryToken::MaxStreamData { .. }
            | StreamRecoveryToken::StreamsBlocked { .. }
            | StreamRecoveryToken::MaxStreams { .. }
            | StreamRecoveryToken::MaxData(_) => (),
        }
    }

    pub fn clear_streams(&mut self) {
        self.send.clear();
        self.recv.clear();
    }

    pub fn cleanup_closed_streams(&mut self) {
        // filter the list, removing closed streams
        self.send.remove_terminal();

        let (removed_bidi, removed_uni) = self.recv.clear_terminal(&self.send, self.role);

        // Send max_streams updates if we removed remote-initiated recv streams.
        // The updates will be send if any steams has been removed.
        self.remote_stream_limits[StreamType::BiDi].add_retired(removed_bidi);
        self.remote_stream_limits[StreamType::UniDi].add_retired(removed_uni);
    }

    fn ensure_created_if_remote(&mut self, stream_id: StreamId) -> Res<()> {
        if !stream_id.is_remote_initiated(self.role)
            || !self.remote_stream_limits[stream_id.stream_type()].is_new_stream(stream_id)?
        {
            // If it is not a remote stream and stream already exist.
            return Ok(());
        }

        let tp = match stream_id.stream_type() {
            // From the local perspective, this is a remote- originated BiDi stream. From
            // the remote perspective, this is a local-originated BiDi stream. Therefore,
            // look at the local transport parameters for the
            // INITIAL_MAX_STREAM_DATA_BIDI_REMOTE value to decide how much this endpoint
            // will allow its peer to send.
            StreamType::BiDi => tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE,
            StreamType::UniDi => tparams::INITIAL_MAX_STREAM_DATA_UNI,
        };
        let recv_initial_max_stream_data = self.tps.borrow().local.get_integer(tp);

        while self.remote_stream_limits[stream_id.stream_type()].is_new_stream(stream_id)? {
            let next_stream_id =
                self.remote_stream_limits[stream_id.stream_type()].take_stream_id();
            self.events.new_stream(next_stream_id);

            self.recv.insert(
                next_stream_id,
                RecvStream::new(
                    next_stream_id,
                    recv_initial_max_stream_data,
                    Rc::clone(&self.receiver_fc),
                    self.events.clone(),
                ),
            );

            if next_stream_id.is_bidi() {
                // From the local perspective, this is a remote- originated BiDi stream.
                // From the remote perspective, this is a local-originated BiDi stream.
                // Therefore, look at the remote's transport parameters for the
                // INITIAL_MAX_STREAM_DATA_BIDI_LOCAL value to decide how much this endpoint
                // is allowed to send its peer.
                let send_initial_max_stream_data = self
                    .tps
                    .borrow()
                    .remote()
                    .get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);
                self.send.insert(
                    next_stream_id,
                    SendStream::new(
                        next_stream_id,
                        send_initial_max_stream_data,
                        Rc::clone(&self.sender_fc),
                        self.events.clone(),
                    ),
                );
            }
        }
        Ok(())
    }

    /// Get or make a stream, and implicitly open additional streams as
    /// indicated by its stream id.
    /// # Errors
    /// When the stream cannot be created due to stream limits.
    pub fn obtain_stream(
        &mut self,
        stream_id: StreamId,
    ) -> Res<(Option<&mut SendStream>, Option<&mut RecvStream>)> {
        self.ensure_created_if_remote(stream_id)?;
        Ok((
            self.send.get_mut(stream_id).ok(),
            self.recv.get_mut(stream_id).ok(),
        ))
    }

    /// # Errors
    /// When the stream does not exist.
    pub fn set_sendorder(&mut self, stream_id: StreamId, sendorder: Option<SendOrder>) -> Res<()> {
        self.send.set_sendorder(stream_id, sendorder)
    }

    /// # Errors
    /// When the stream does not exist.
    pub fn set_fairness(&mut self, stream_id: StreamId, fairness: bool) -> Res<()> {
        self.send.set_fairness(stream_id, fairness)
    }

    /// # Errors
    /// When a stream cannot be created, which might be temporary.
    pub fn stream_create(&mut self, st: StreamType) -> Res<StreamId> {
        match self.local_stream_limits.take_stream_id(st) {
            None => Err(Error::StreamLimitError),
            Some(new_id) => {
                let send_limit_tp = match st {
                    StreamType::UniDi => tparams::INITIAL_MAX_STREAM_DATA_UNI,
                    StreamType::BiDi => tparams::INITIAL_MAX_STREAM_DATA_BIDI_REMOTE,
                };
                let send_limit = self.tps.borrow().remote().get_integer(send_limit_tp);
                let stream = SendStream::new(
                    new_id,
                    send_limit,
                    Rc::clone(&self.sender_fc),
                    self.events.clone(),
                );
                self.send.insert(new_id, stream);

                if st == StreamType::BiDi {
                    // From the local perspective, this is a local- originated BiDi stream. From the
                    // remote perspective, this is a remote-originated BiDi stream. Therefore, look
                    // at the local transport parameters for the
                    // INITIAL_MAX_STREAM_DATA_BIDI_LOCAL value to decide how
                    // much this endpoint will allow its peer to send.
                    let recv_initial_max_stream_data = self
                        .tps
                        .borrow()
                        .local
                        .get_integer(tparams::INITIAL_MAX_STREAM_DATA_BIDI_LOCAL);

                    self.recv.insert(
                        new_id,
                        RecvStream::new(
                            new_id,
                            recv_initial_max_stream_data,
                            Rc::clone(&self.receiver_fc),
                            self.events.clone(),
                        ),
                    );
                }
                Ok(new_id)
            }
        }
    }

    pub fn handle_max_data(&mut self, maximum_data: u64) {
        let previous_limit = self.sender_fc.borrow().available();
        let Some(current_limit) = self.sender_fc.borrow_mut().update(maximum_data) else {
            return;
        };

        for (_id, ss) in &mut self.send {
            ss.maybe_emit_writable_event(previous_limit, current_limit);
        }
    }

    pub fn handle_data_blocked(&self) {
        self.receiver_fc.borrow_mut().send_flowc_update();
    }

    pub fn set_initial_limits(&mut self) {
        _ = self.local_stream_limits[StreamType::BiDi].update(
            self.tps
                .borrow()
                .remote()
                .get_integer(tparams::INITIAL_MAX_STREAMS_BIDI),
        );
        _ = self.local_stream_limits[StreamType::UniDi].update(
            self.tps
                .borrow()
                .remote()
                .get_integer(tparams::INITIAL_MAX_STREAMS_UNI),
        );

        // As a client, there are two sets of initial limits for sending stream data.
        // If the second limit is higher and streams have been created, then
        // ensure that streams are not blocked on the lower limit.
        if self.role == Role::Client {
            self.send.update_initial_limit(self.tps.borrow().remote());
        }

        self.sender_fc.borrow_mut().update(
            self.tps
                .borrow()
                .remote()
                .get_integer(tparams::INITIAL_MAX_DATA),
        );

        if self.local_stream_limits[StreamType::BiDi].available() > 0 {
            self.events.send_stream_creatable(StreamType::BiDi);
        }
        if self.local_stream_limits[StreamType::UniDi].available() > 0 {
            self.events.send_stream_creatable(StreamType::UniDi);
        }
    }

    pub fn handle_max_streams(&mut self, stream_type: StreamType, maximum_streams: u64) {
        let increased = self.local_stream_limits[stream_type]
            .update(maximum_streams)
            .is_some();
        if increased {
            self.events.send_stream_creatable(stream_type);
        }
    }

    /// # Errors
    /// When the stream does not exist.
    pub fn get_send_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut SendStream> {
        self.send.get_mut(stream_id)
    }

    /// # Errors
    /// When the stream does not exist.
    pub fn get_send_stream(&self, stream_id: StreamId) -> Res<&SendStream> {
        self.send.get(stream_id)
    }

    /// # Errors
    /// When the stream does not exist.
    pub fn get_recv_stream_mut(&mut self, stream_id: StreamId) -> Res<&mut RecvStream> {
        self.recv.get_mut(stream_id)
    }

    /// # Errors
    /// When the stream does not exist.
    pub fn keep_alive(&mut self, stream_id: StreamId, keep: bool) -> Res<()> {
        self.recv.keep_alive(stream_id, keep)
    }

    #[must_use]
    pub fn need_keep_alive(&self) -> bool {
        self.recv.need_keep_alive()
    }
}

[ zur Elbe Produktseite wechseln0.30Quellennavigators  Analyse erneut starten  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge