Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  push_controller.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{
    cell::RefCell,
    collections::VecDeque,
    fmt::{Debug, Display},
    mem,
    rc::Rc,
    slice::SliceIndex,
};

use neqo_common::{qerror, qinfo, qtrace, Header};
use neqo_transport::{Connection, StreamId};

use crate::{
    client_events::{Http3ClientEvent, Http3ClientEvents},
    connection::Http3Connection,
    frames::HFrame,
    CloseType, Error, Http3StreamInfo, HttpRecvStreamEvents, RecvStreamEvents, Res,
};

/// `PushStates`:
///   `Init`: there is no push stream nor a push promise. This state is only used to keep track of
/// opened and closed           push streams.
///   `PushPromise`: the push has only ever receive a pushpromise frame
///   `OnlyPushStream`: there is only a push stream. All push stream events, i.e. `PushHeaderReady`
/// and                     `PushDataReadable` will be delayed until a push promise is received
/// (they are kept in                     `events`).
///   `Active`: there is a push steam and at least one push promise frame.
///   `Close`: the push stream has been closed or reset already.
#[derive(Debug, PartialEq, Clone)]
enum PushState {
    Init,
    PushPromise {
        headers: Vec<Header>,
    },
    OnlyPushStream {
        stream_id: StreamId,
        events: Vec<Http3ClientEvent>,
    },
    Active {
        stream_id: StreamId,
        headers: Vec<Header>,
    },
    Closed,
}

/// `ActivePushStreams` holds information about push streams.
///
/// `first_push_id` holds a `push_id` of the first element in `push_streams` if it is present.
/// `push_id` smaller than `first_push_id` have been already closed. `push_id` => `first_push_id`
/// are in `push_streams` or they are not yet opened.
#[derive(Debug)]
struct ActivePushStreams {
    push_streams: VecDeque<PushState>,
    first_push_id: u64,
}

impl ActivePushStreams {
    pub const fn new() -> Self {
        Self {
            push_streams: VecDeque::new(),
            first_push_id: 0,
        }
    }

    /// Returns None if a stream has been closed already.
    pub fn get_mut(
        &mut self,
        push_id: u64,
    ) -> Option<&mut <usize as SliceIndex<[PushState]>>::Output> {
        if push_id < self.first_push_id {
            return None;
        }

        let inx = usize::try_from(push_id - self.first_push_id).unwrap();
        if inx >= self.push_streams.len() {
            self.push_streams.resize(inx + 1, PushState::Init);
        }
        match self.push_streams.get_mut(inx) {
            Some(PushState::Closed) => None,
            e => e,
        }
    }

    /// Returns None if a stream has been closed already.
    pub fn get(&mut self, push_id: u64) -> Option<&mut PushState> {
        self.get_mut(push_id)
    }

    /// Returns the State of a closed push stream or None for already closed streams.
    pub fn close(&mut self, push_id: u64) -> Option<PushState> {
        match self.get_mut(push_id) {
            None | Some(PushState::Closed) => None,
            Some(s) => {
                let res = mem::replace(s, PushState::Closed);
                while self.push_streams.front() == Some(&PushState::Closed) {
                    self.push_streams.pop_front();
                    self.first_push_id += 1;
                }
                Some(res)
            }
        }
    }

    #[must_use]
    pub fn number_done(&self) -> u64 {
        self.first_push_id
            + u64::try_from(
                self.push_streams
                    .iter()
                    .filter(|&e| e == &PushState::Closed)
                    .count(),
            )
            .unwrap()
    }

    pub fn clear(&mut self) {
        self.first_push_id = 0;
        self.push_streams.clear();
    }
}

/// `PushController` keeps information about push stream states.
///
/// A `PushStream` calls `add_new_push_stream` that may change the push state from Init to
/// `OnlyPushStream` or from `PushPromise` to `Active`. If a stream has already been closed
/// `add_new_push_stream` returns false (the `PushStream` will close the transport stream).
/// A `PushStream` calls `push_stream_reset` if the transport stream has been canceled.
/// When a push stream is done it calls `close`.
///
/// The `PushController` handles:
///  `PUSH_PROMISE` frame: frames may change the push state from Init to `PushPromise` and from
/// `OnlyPushStream` to                        `Active`. Frames for a closed steams are ignored.
///  `CANCEL_PUSH` frame: (`handle_cancel_push` will be called). If a push is in state `PushPromise`
/// or `Active`, any                       posted events will be removed and a `PushCanceled` event
/// will be posted. If a push is in                       state `OnlyPushStream` or `Active` the
/// transport stream and the `PushStream` will be closed.                       The frame will be
/// ignored for already closed pushes.  Application calling cancel: the actions are similar to the
/// `CANCEL_PUSH` frame. The difference is that                              `PushCanceled` will not
/// be posted and a `CANCEL_PUSH` frame may be sent.
#[derive(Debug)]
pub struct PushController {
    max_concurent_push: u64,
    current_max_push_id: u64,
    // push_streams holds the states of push streams.
    // We keep a stream until the stream has been closed.
    push_streams: ActivePushStreams,
    // The keeps the next consecutive push_id that should be open.
    // All push_id < next_push_id_to_open are in the push_stream lists. If they are not in the list
    // they have been already closed.
    conn_events: Http3ClientEvents,
}

impl PushController {
    pub const fn new(max_concurent_push: u64, conn_events: Http3ClientEvents) -> Self {
        Self {
            max_concurent_push,
            current_max_push_id: 0,
            push_streams: ActivePushStreams::new(),
            conn_events,
        }
    }
}

impl Display for PushController {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "Push controler")
    }
}

impl PushController {
    /// A new `push_promise` has been received.
    ///
    /// # Errors
    ///
    /// `HttpId` if `push_id` greater than it is allowed has been received.
    pub fn new_push_promise(
        &mut self,
        push_id: u64,
        ref_stream_id: StreamId,
        new_headers: Vec<Header>,
    ) -> Res<()> {
        qtrace!(
            [self],
            "New push promise push_id={} headers={:?} max_push={}",
            push_id,
            new_headers,
            self.max_concurent_push
        );

        self.check_push_id(push_id)?;

        match self.push_streams.get_mut(push_id) {
            None => {
                qtrace!("Push has been closed already {}.", push_id);
                Ok(())
            }
            Some(push_state) => match push_state {
                PushState::Init => {
                    self.conn_events
                        .push_promise(push_id, ref_stream_id, new_headers.clone());
                    *push_state = PushState::PushPromise {
                        headers: new_headers,
                    };
                    Ok(())
                }
                PushState::PushPromise { headers } | PushState::Active { headers, .. } => {
                    if new_headers != *headers {
                        return Err(Error::HttpGeneralProtocol);
                    }
                    self.conn_events
                        .push_promise(push_id, ref_stream_id, new_headers);
                    Ok(())
                }
                PushState::OnlyPushStream { stream_id, events } => {
                    let stream_id_tmp = *stream_id;
                    self.conn_events
                        .push_promise(push_id, ref_stream_id, new_headers.clone());

                    for e in events.drain(..) {
                        self.conn_events.insert(e);
                    }
                    *push_state = PushState::Active {
                        stream_id: stream_id_tmp,
                        headers: new_headers,
                    };
                    Ok(())
                }
                PushState::Closed => unreachable!("This is only internal; it is transfer to None"),
            },
        }
    }

    pub fn add_new_push_stream(&mut self, push_id: u64, stream_id: StreamId) -> Res<bool> {
        qtrace!(
            "A new push stream with push_id={} stream_id={}",
            push_id,
            stream_id
        );

        self.check_push_id(push_id)?;

        self.push_streams.get_mut(push_id).map_or_else(
            || {
                qinfo!("Push has been closed already.");
                Ok(false)
            },
            |push_state| match push_state {
                PushState::Init => {
                    *push_state = PushState::OnlyPushStream {
                        stream_id,
                        events: Vec::new(),
                    };
                    Ok(true)
                }
                PushState::PushPromise { headers } => {
                    let tmp = mem::take(headers);
                    *push_state = PushState::Active {
                        stream_id,
                        headers: tmp,
                    };
                    Ok(true)
                }
                // The following state have already have a push stream:
                // PushState::OnlyPushStream | PushState::Active
                _ => {
                    qerror!("Duplicate push stream.");
                    Err(Error::HttpId)
                }
            },
        )
    }

    fn check_push_id(&self, push_id: u64) -> Res<()> {
        // Check if push id is greater than what we allow.
        if push_id > self.current_max_push_id {
            qerror!("Push id is greater than current_max_push_id.");
            Err(Error::HttpId)
        } else {
            Ok(())
        }
    }

    pub fn handle_cancel_push(
        &mut self,
        push_id: u64,
        conn: &mut Connection,
        base_handler: &mut Http3Connection,
    ) -> Res<()> {
        qtrace!("CANCEL_PUSH frame has been received, push_id={}", push_id);

        self.check_push_id(push_id)?;

        match self.push_streams.close(push_id) {
            None => {
                qtrace!("Push has already been closed (push_id={}).", push_id);
                Ok(())
            }
            Some(ps) => match ps {
                PushState::Init => Ok(()),
                PushState::PushPromise { .. } => {
                    self.conn_events.remove_events_for_push_id(push_id);
                    self.conn_events.push_canceled(push_id);
                    Ok(())
                }
                PushState::OnlyPushStream { stream_id, .. }
                | PushState::Active { stream_id, .. } => {
                    mem::drop(base_handler.stream_stop_sending(
                        conn,
                        stream_id,
                        Error::HttpRequestCancelled.code(),
                    ));
                    self.conn_events.remove_events_for_push_id(push_id);
                    self.conn_events.push_canceled(push_id);
                    Ok(())
                }
                PushState::Closed => unreachable!("This is only internal; it is transfer to None"),
            },
        }
    }

    pub fn close(&mut self, push_id: u64) {
        qtrace!("Push stream has been closed.");
        if let Some(push_state) = self.push_streams.close(push_id) {
            debug_assert!(matches!(push_state, PushState::Active { .. }));
        } else {
            debug_assert!(false, "Closing non existing push stream!");
        }
    }

    pub fn cancel(
        &mut self,
        push_id: u64,
        conn: &mut Connection,
        base_handler: &mut Http3Connection,
    ) -> Res<()> {
        qtrace!("Cancel push_id={}", push_id);

        self.check_push_id(push_id)?;

        match self.push_streams.get(push_id) {
            None => {
                qtrace!("Push has already been closed.");
                // If we have some events for the push_id in the event queue, the caller still does
                // not not know that the push has been closed. Otherwise return
                // InvalidStreamId.
                if self.conn_events.has_push(push_id) {
                    self.conn_events.remove_events_for_push_id(push_id);
                    Ok(())
                } else {
                    Err(Error::InvalidStreamId)
                }
            }
            Some(PushState::PushPromise { .. }) => {
                self.conn_events.remove_events_for_push_id(push_id);
                base_handler.queue_control_frame(&HFrame::CancelPush { push_id });
                self.push_streams.close(push_id);
                Ok(())
            }
            Some(PushState::Active { stream_id, .. }) => {
                self.conn_events.remove_events_for_push_id(push_id);
                // Cancel the stream. The transport stream may already be done, so ignore an error.
                mem::drop(base_handler.stream_stop_sending(
                    conn,
                    *stream_id,
                    Error::HttpRequestCancelled.code(),
                ));
                self.push_streams.close(push_id);
                Ok(())
            }
            Some(_) => Err(Error::InvalidStreamId),
        }
    }

    pub fn push_stream_reset(&mut self, push_id: u64, close_type: CloseType) {
        qtrace!("Push stream has been reset, push_id={}", push_id);

        if let Some(push_state) = self.push_streams.get(push_id) {
            match push_state {
                PushState::OnlyPushStream { .. } => {
                    self.push_streams.close(push_id);
                }
                PushState::Active { .. } => {
                    self.push_streams.close(push_id);
                    self.conn_events.remove_events_for_push_id(push_id);
                    if let CloseType::LocalError(app_error) = close_type {
                        self.conn_events.push_reset(push_id, app_error);
                    } else {
                        self.conn_events.push_canceled(push_id);
                    }
                }
                _ => {
                    debug_assert!(
                        false,
                        "Reset cannot actually happen because we do not have a stream."
                    );
                }
            }
        }
    }

    pub fn get_active_stream_id(&mut self, push_id: u64) -> Option<StreamId> {
        match self.push_streams.get(push_id) {
            Some(PushState::Active { stream_id, .. }) => Some(*stream_id),
            _ => None,
        }
    }

    pub fn maybe_send_max_push_id_frame(&mut self, base_handler: &mut Http3Connection) {
        let push_done = self.push_streams.number_done();
        if self.max_concurent_push > 0
            && (self.current_max_push_id - push_done) <= (self.max_concurent_push / 2)
        {
            self.current_max_push_id = push_done + self.max_concurent_push;
            base_handler.queue_control_frame(&HFrame::MaxPushId {
                push_id: self.current_max_push_id,
            });
        }
    }

    pub fn handle_zero_rtt_rejected(&mut self) {
        self.current_max_push_id = 0;
    }

    pub fn clear(&mut self) {
        self.push_streams.clear();
    }

    pub const fn can_receive_push(&self) -> bool {
        self.max_concurent_push > 0
    }

    pub fn new_stream_event(&mut self, push_id: u64, event: Http3ClientEvent) {
        match self.push_streams.get_mut(push_id) {
            None => {
                debug_assert!(false, "Push has been closed already.");
            }
            Some(PushState::OnlyPushStream { events, .. }) => {
                events.push(event);
            }
            Some(PushState::Active { .. }) => {
                self.conn_events.insert(event);
            }
            Some(_) => {
                debug_assert!(false, "No record of a stream!");
            }
        }
    }
}

/// `RecvPushEvents` relays a push stream events to `PushController`.
/// It informs `PushController` when a push stream is done or canceled.
/// Also when headers or data is ready and `PushController` decide whether to post
/// `PushHeaderReady` and `PushDataReadable` events or to postpone them if
/// a `push_promise` has not been yet received for the stream.
#[derive(Debug)]
pub struct RecvPushEvents {
    push_id: u64,
    push_handler: Rc<RefCell<PushController>>,
}

impl RecvPushEvents {
    pub const fn new(push_id: u64, push_handler: Rc<RefCell<PushController>>) -> Self {
        Self {
            push_id,
            push_handler,
        }
    }
}

impl RecvStreamEvents for RecvPushEvents {
    fn data_readable(&self, _stream_info: Http3StreamInfo) {
        self.push_handler.borrow_mut().new_stream_event(
            self.push_id,
            Http3ClientEvent::PushDataReadable {
                push_id: self.push_id,
            },
        );
    }

    fn recv_closed(&self, _stream_info: Http3StreamInfo, close_type: CloseType) {
        match close_type {
            CloseType::ResetApp(_) => {}
            CloseType::ResetRemote(_) | CloseType::LocalError(_) => self
                .push_handler
                .borrow_mut()
                .push_stream_reset(self.push_id, close_type),
            CloseType::Done => self.push_handler.borrow_mut().close(self.push_id),
        }
    }
}

impl HttpRecvStreamEvents for RecvPushEvents {
    fn header_ready(
        &self,
        _stream_info: Http3StreamInfo,
        headers: Vec<Header>,
        interim: bool,
        fin: bool,
    ) {
        self.push_handler.borrow_mut().new_stream_event(
            self.push_id,
            Http3ClientEvent::PushHeaderReady {
                push_id: self.push_id,
                headers,
                interim,
                fin,
            },
        );
    }
}

[ Dauer der Verarbeitung: 0.47 Sekunden  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge