Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/neqo-http3/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 17 kB image not shown  

Quelle  server_events.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

#![allow(clippy::module_name_repetitions)]

use std::{
    cell::RefCell,
    collections::VecDeque,
    ops::{Deref, DerefMut},
    rc::Rc,
};

use neqo_common::{qdebug, Encoder, Header};
use neqo_transport::{
    server::ConnectionRef, AppError, Connection, DatagramTracking, StreamId, StreamType,
};

use crate::{
    connection::{Http3State, WebTransportSessionAcceptAction},
    connection_server::Http3ServerHandler,
    features::extended_connect::SessionCloseReason,
    Http3StreamInfo, Http3StreamType, Priority, Res,
};

#[derive(Debug, Clone)]
pub struct StreamHandler {
    pub conn: ConnectionRef,
    pub handler: Rc<RefCell<Http3ServerHandler>>,
    pub stream_info: Http3StreamInfo,
}

impl ::std::fmt::Display for StreamHandler {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        let conn: &Connection = &self.conn.borrow();
        write!(f, "conn={} stream_info={:?}", conn, self.stream_info)
    }
}

impl std::hash::Hash for StreamHandler {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.conn.hash(state);
        state.write_u64(self.stream_info.stream_id().as_u64());
        _ = state.finish();
    }
}

impl PartialEq for StreamHandler {
    fn eq(&self, other: &Self) -> bool {
        self.conn == other.conn && self.stream_info.stream_id() == other.stream_info.stream_id()
    }
}

impl Eq for StreamHandler {}

impl StreamHandler {
    pub const fn stream_id(&self) -> StreamId {
        self.stream_info.stream_id()
    }

    /// Supply a response header to a request.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn send_headers(&self, headers: &[Header]) -> Res<()> {
        self.handler.borrow_mut().send_headers(
            self.stream_id(),
            headers,
            &mut self.conn.borrow_mut(),
        )
    }

    /// Supply response data to a request.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn send_data(&self, buf: &[u8]) -> Res<usize> {
        self.handler
            .borrow_mut()
            .send_data(self.stream_id(), buf, &mut self.conn.borrow_mut())
    }

    /// Bytes sendable on stream at the QUIC layer.
    ///
    /// Note that this does not yet account for HTTP3 frame headers.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn available(&self) -> Res<usize> {
        let stream_id = self.stream_id();
        let n = self.conn.borrow_mut().stream_avail_send_space(stream_id)?;
        Ok(n)
    }

    /// Close sending side.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn stream_close_send(&self) -> Res<()> {
        self.handler
            .borrow_mut()
            .stream_close_send(self.stream_id(), &mut self.conn.borrow_mut())
    }

    /// Request a peer to stop sending a stream.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn stream_stop_sending(&self, app_error: AppError) -> Res<()> {
        qdebug!(
            [self],
            "stop sending stream_id:{} error:{}.",
            self.stream_info.stream_id(),
            app_error
        );
        self.handler.borrow_mut().stream_stop_sending(
            self.stream_info.stream_id(),
            app_error,
            &mut self.conn.borrow_mut(),
        )
    }

    /// Reset sending side of a stream.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn stream_reset_send(&self, app_error: AppError) -> Res<()> {
        qdebug!(
            [self],
            "reset send stream_id:{} error:{}.",
            self.stream_info.stream_id(),
            app_error
        );
        self.handler.borrow_mut().stream_reset_send(
            self.stream_info.stream_id(),
            app_error,
            &mut self.conn.borrow_mut(),
        )
    }

    /// Reset a stream/request.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore
    pub fn cancel_fetch(&self, app_error: AppError) -> Res<()> {
        qdebug!([self], "reset error:{}.", app_error);
        self.handler.borrow_mut().cancel_fetch(
            self.stream_info.stream_id(),
            app_error,
            &mut self.conn.borrow_mut(),
        )
    }
}

#[derive(Debug, Clone)]
pub struct Http3OrWebTransportStream {
    stream_handler: StreamHandler,
}

impl ::std::fmt::Display for Http3OrWebTransportStream {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "Stream server {:?}", self.stream_handler)
    }
}

impl Http3OrWebTransportStream {
    pub(crate) const fn new(
        conn: ConnectionRef,
        handler: Rc<RefCell<Http3ServerHandler>>,
        stream_info: Http3StreamInfo,
    ) -> Self {
        Self {
            stream_handler: StreamHandler {
                conn,
                handler,
                stream_info,
            },
        }
    }

    /// Supply a response header to a request.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn send_headers(&self, headers: &[Header]) -> Res<()> {
        self.stream_handler.send_headers(headers)
    }

    /// Supply response data to a request.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn send_data(&self, data: &[u8]) -> Res<usize> {
        qdebug!([self], "Set new response.");
        self.stream_handler.send_data(data)
    }

    /// Close sending side.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn stream_close_send(&self) -> Res<()> {
        qdebug!([self], "Set new response.");
        self.stream_handler.stream_close_send()
    }
}

impl Deref for Http3OrWebTransportStream {
    type Target = StreamHandler;
    #[must_use]
    fn deref(&self) -> &Self::Target {
        &self.stream_handler
    }
}

impl DerefMut for Http3OrWebTransportStream {
    fn deref_mut(&mut self) -> &mut StreamHandler {
        &mut self.stream_handler
    }
}

impl std::hash::Hash for Http3OrWebTransportStream {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.stream_handler.hash(state);
        _ = state.finish();
    }
}

impl PartialEq for Http3OrWebTransportStream {
    fn eq(&self, other: &Self) -> bool {
        self.stream_handler == other.stream_handler
    }
}

impl Eq for Http3OrWebTransportStream {}

#[derive(Debug, Clone)]
pub struct WebTransportRequest {
    stream_handler: StreamHandler,
}

impl ::std::fmt::Display for WebTransportRequest {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "WebTransport session {}", self.stream_handler)
    }
}

impl WebTransportRequest {
    pub(crate) const fn new(
        conn: ConnectionRef,
        handler: Rc<RefCell<Http3ServerHandler>>,
        stream_id: StreamId,
    ) -> Self {
        Self {
            stream_handler: StreamHandler {
                conn,
                handler,
                stream_info: Http3StreamInfo::new(stream_id, Http3StreamType::Http),
            },
        }
    }

    #[must_use]
    pub fn state(&self) -> Http3State {
        self.stream_handler.handler.borrow().state()
    }

    /// Respond to a `WebTransport` session request.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn response(&self, accept: &WebTransportSessionAcceptAction) -> Res<()> {
        qdebug!([self], "Set a response for a WebTransport session.");
        self.stream_handler
            .handler
            .borrow_mut()
            .webtransport_session_accept(
                &mut self.stream_handler.conn.borrow_mut(),
                self.stream_handler.stream_info.stream_id(),
                accept,
            )
    }

    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    /// Also return an error if the stream was closed on the transport layer,
    /// but that information is not yet consumed on the  http/3 layer.
    pub fn close_session(&self, error: u32, message: &str) -> Res<()> {
        self.stream_handler
            .handler
            .borrow_mut()
            .webtransport_close_session(
                &mut self.stream_handler.conn.borrow_mut(),
                self.stream_handler.stream_info.stream_id(),
                error,
                message,
            )
    }

    #[must_use]
    pub const fn stream_id(&self) -> StreamId {
        self.stream_handler.stream_id()
    }

    /// Close sending side.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    pub fn create_stream(&self, stream_type: StreamType) -> Res<Http3OrWebTransportStream> {
        let session_id = self.stream_handler.stream_id();
        let id = self
            .stream_handler
            .handler
            .borrow_mut()
            .webtransport_create_stream(
                &mut self.stream_handler.conn.borrow_mut(),
                session_id,
                stream_type,
            )?;

        Ok(Http3OrWebTransportStream::new(
            self.stream_handler.conn.clone(),
            self.stream_handler.handler.clone(),
            Http3StreamInfo::new(id, Http3StreamType::WebTransport(session_id)),
        ))
    }

    /// Send `WebTransport` datagram.
    ///
    /// # Errors
    ///
    /// It may return `InvalidStreamId` if a stream does not exist anymore.
    /// The function returns `TooMuchData` if the supply buffer is bigger than
    /// the allowed remote datagram size.
    pub fn send_datagram(&self, buf: &[u8], id: impl Into<DatagramTracking>) -> Res<()> {
        let session_id = self.stream_handler.stream_id();
        self.stream_handler
            .handler
            .borrow_mut()
            .webtransport_send_datagram(
                &mut self.stream_handler.conn.borrow_mut(),
                session_id,
                buf,
                id,
            )
    }

    #[must_use]
    pub fn remote_datagram_size(&self) -> u64 {
        self.stream_handler.conn.borrow().remote_datagram_size()
    }

    /// Returns the current max size of a datagram that can fit into a packet.
    /// The value will change over time depending on the encoded size of the
    /// packet number, ack frames, etc.
    ///
    /// # Errors
    ///
    /// The function returns `NotAvailable` if datagrams are not enabled.
    ///
    /// # Panics
    ///
    /// This cannot panic. The max varint length is 8.
    pub fn max_datagram_size(&self) -> Res<u64> {
        let max_size = self.stream_handler.conn.borrow().max_datagram_size()?;
        Ok(max_size
            - u64::try_from(Encoder::varint_len(
                self.stream_handler.stream_id().as_u64(),
            ))
            .unwrap())
    }
}

impl Deref for WebTransportRequest {
    type Target = StreamHandler;
    #[must_use]
    fn deref(&self) -> &Self::Target {
        &self.stream_handler
    }
}

impl DerefMut for WebTransportRequest {
    fn deref_mut(&mut self) -> &mut StreamHandler {
        &mut self.stream_handler
    }
}

impl std::hash::Hash for WebTransportRequest {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.stream_handler.hash(state);
        _ = state.finish();
    }
}

impl PartialEq for WebTransportRequest {
    fn eq(&self, other: &Self) -> bool {
        self.stream_handler == other.stream_handler
    }
}

impl Eq for WebTransportRequest {}

#[derive(Debug, Clone)]
pub enum WebTransportServerEvent {
    NewSession {
        session: WebTransportRequest,
        headers: Vec<Header>,
    },
    SessionClosed {
        session: WebTransportRequest,
        reason: SessionCloseReason,
        headers: Option<Vec<Header>>,
    },
    NewStream(Http3OrWebTransportStream),
    Datagram {
        session: WebTransportRequest,
        datagram: Vec<u8>,
    },
}

#[derive(Debug, Clone)]
pub enum Http3ServerEvent {
    /// Headers are ready.
    Headers {
        stream: Http3OrWebTransportStream,
        headers: Vec<Header>,
        fin: bool,
    },
    /// Request data is ready.
    Data {
        stream: Http3OrWebTransportStream,
        data: Vec<u8>,
        fin: bool,
    },
    DataWritable {
        stream: Http3OrWebTransportStream,
    },
    StreamReset {
        stream: Http3OrWebTransportStream,
        error: AppError,
    },
    StreamStopSending {
        stream: Http3OrWebTransportStream,
        error: AppError,
    },
    /// When individual connection change state. It is only used for tests.
    StateChange {
        conn: ConnectionRef,
        state: Http3State,
    },
    PriorityUpdate {
        stream_id: StreamId,
        priority: Priority,
    },
    WebTransport(WebTransportServerEvent),
}

#[derive(Debug, Default, Clone)]
pub struct Http3ServerEvents {
    events: Rc<RefCell<VecDeque<Http3ServerEvent>>>,
}

impl Http3ServerEvents {
    fn insert(&self, event: Http3ServerEvent) {
        self.events.borrow_mut().push_back(event);
    }

    /// Take all events
    pub fn events(&self) -> impl Iterator<Item = Http3ServerEvent> {
        self.events.replace(VecDeque::new()).into_iter()
    }

    /// Whether there is request pending.
    pub fn has_events(&self) -> bool {
        !self.events.borrow().is_empty()
    }

    /// Take the next event if present.
    pub fn next_event(&self) -> Option<Http3ServerEvent> {
        self.events.borrow_mut().pop_front()
    }

    /// Insert a `Headers` event.
    pub(crate) fn headers(
        &self,
        request: Http3OrWebTransportStream,
        headers: Vec<Header>,
        fin: bool,
    ) {
        self.insert(Http3ServerEvent::Headers {
            stream: request,
            headers,
            fin,
        });
    }

    /// Insert a `StateChange` event.
    pub(crate) fn connection_state_change(&self, conn: ConnectionRef, state: Http3State) {
        self.insert(Http3ServerEvent::StateChange { conn, state });
    }

    /// Insert a `Data` event.
    pub(crate) fn data(
        &self,
        conn: ConnectionRef,
        handler: Rc<RefCell<Http3ServerHandler>>,
        stream_info: Http3StreamInfo,
        data: Vec<u8>,
        fin: bool,
    ) {
        self.insert(Http3ServerEvent::Data {
            stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
            data,
            fin,
        });
    }

    pub(crate) fn data_writable(
        &self,
        conn: ConnectionRef,
        handler: Rc<RefCell<Http3ServerHandler>>,
        stream_info: Http3StreamInfo,
    ) {
        self.insert(Http3ServerEvent::DataWritable {
            stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
        });
    }

    pub(crate) fn stream_reset(
        &self,
        conn: ConnectionRef,
        handler: Rc<RefCell<Http3ServerHandler>>,
        stream_info: Http3StreamInfo,
        error: AppError,
    ) {
        self.insert(Http3ServerEvent::StreamReset {
            stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
            error,
        });
    }

    pub(crate) fn stream_stop_sending(
        &self,
        conn: ConnectionRef,
        handler: Rc<RefCell<Http3ServerHandler>>,
        stream_info: Http3StreamInfo,
        error: AppError,
    ) {
        self.insert(Http3ServerEvent::StreamStopSending {
            stream: Http3OrWebTransportStream::new(conn, handler, stream_info),
            error,
        });
    }

    pub(crate) fn priority_update(&self, stream_id: StreamId, priority: Priority) {
        self.insert(Http3ServerEvent::PriorityUpdate {
            stream_id,
            priority,
        });
    }

    pub(crate) fn webtransport_new_session(
        &self,
        session: WebTransportRequest,
        headers: Vec<Header>,
    ) {
        self.insert(Http3ServerEvent::WebTransport(
            WebTransportServerEvent::NewSession { session, headers },
        ));
    }

    pub(crate) fn webtransport_session_closed(
        &self,
        session: WebTransportRequest,
        reason: SessionCloseReason,
        headers: Option<Vec<Header>>,
    ) {
        self.insert(Http3ServerEvent::WebTransport(
            WebTransportServerEvent::SessionClosed {
                session,
                reason,
                headers,
            },
        ));
    }

    pub(crate) fn webtransport_new_stream(&self, stream: Http3OrWebTransportStream) {
        self.insert(Http3ServerEvent::WebTransport(
            WebTransportServerEvent::NewStream(stream),
        ));
    }

    pub(crate) fn webtransport_datagram(&self, session: WebTransportRequest, datagram: Vec<u8>) {
        self.insert(Http3ServerEvent::WebTransport(
            WebTransportServerEvent::Datagram { session, datagram },
        ));
    }
}

[ Dauer der Verarbeitung: 0.42 Sekunden  ]