Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  connection_server.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{rc::Rc, time::Instant};

use neqo_common::{event::Provider, qdebug, qinfo, qtrace, Header, MessageType, Role};
use neqo_transport::{
    AppError, Connection, ConnectionEvent, DatagramTracking, StreamId, StreamType,
};

use crate::{
    connection::{Http3Connection, Http3State, WebTransportSessionAcceptAction},
    frames::HFrame,
    recv_message::{RecvMessage, RecvMessageInfo},
    send_message::SendMessage,
    server_connection_events::{Http3ServerConnEvent, Http3ServerConnEvents},
    Error, Http3Parameters, Http3StreamType, NewStreamType, Priority, PriorityHandler,
    ReceiveOutput, Res,
};

#[derive(Debug)]
pub struct Http3ServerHandler {
    base_handler: Http3Connection,
    events: Http3ServerConnEvents,
    needs_processing: bool,
}

impl ::std::fmt::Display for Http3ServerHandler {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "Http3 server connection")
    }
}

impl Http3ServerHandler {
    pub(crate) fn new(http3_parameters: Http3Parameters) -> Self {
        Self {
            base_handler: Http3Connection::new(http3_parameters, Role::Server),
            events: Http3ServerConnEvents::default(),
            needs_processing: false,
        }
    }

    #[must_use]
    pub fn state(&self) -> Http3State {
        self.base_handler.state()
    }

    /// Supply a response for a request.
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist,
    /// `AlreadyClosed` if the stream has already been closed.
    /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
    /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
    /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
    /// supplied.
    pub(crate) fn send_data(
        &mut self,
        stream_id: StreamId,
        data: &[u8],
        conn: &mut Connection,
    ) -> Res<usize> {
        let n = self
            .base_handler
            .send_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .send_data(conn, data)?;
        if n > 0 {
            self.base_handler.stream_has_pending_data(stream_id);
        }
        self.needs_processing = true;
        Ok(n)
    }

    /// Supply response heeaders for a request.
    pub(crate) fn send_headers(
        &mut self,
        stream_id: StreamId,
        headers: &[Header],
        conn: &mut Connection,
    ) -> Res<()> {
        self.base_handler
            .send_streams
            .get_mut(&stream_id)
            .ok_or(Error::InvalidStreamId)?
            .http_stream()
            .ok_or(Error::InvalidStreamId)?
            .send_headers(headers, conn)?;
        self.base_handler.stream_has_pending_data(stream_id);
        self.needs_processing = true;
        Ok(())
    }

    /// This is called when application is done sending a request.
    ///
    /// # Errors
    ///
    /// An error will be returned if stream does not exist.
    pub fn stream_close_send(&mut self, stream_id: StreamId, conn: &mut Connection) -> Res<()> {
        qdebug!([self], "Close sending side stream={}.", stream_id);
        self.base_handler.stream_close_send(conn, stream_id)?;
        self.needs_processing = true;
        Ok(())
    }

    /// An application may reset a stream(request).
    /// Both sides, sending and receiving side, will be closed.
    ///
    /// # Errors
    ///
    /// An error will be return if a stream does not exist.
    pub fn cancel_fetch(
        &mut self,
        stream_id: StreamId,
        error: AppError,
        conn: &mut Connection,
    ) -> Res<()> {
        qinfo!([self], "cancel_fetch {} error={}.", stream_id, error);
        self.needs_processing = true;
        self.base_handler.cancel_fetch(stream_id, error, conn)
    }

    pub fn stream_stop_sending(
        &mut self,
        stream_id: StreamId,
        error: AppError,
        conn: &mut Connection,
    ) -> Res<()> {
        qinfo!([self], "stream_stop_sending {} error={}.", stream_id, error);
        self.needs_processing = true;
        self.base_handler
            .stream_stop_sending(conn, stream_id, error)
    }

    pub fn stream_reset_send(
        &mut self,
        stream_id: StreamId,
        error: AppError,
        conn: &mut Connection,
    ) -> Res<()> {
        qinfo!([self], "stream_reset_send {} error={}.", stream_id, error);
        self.needs_processing = true;
        self.base_handler.stream_reset_send(conn, stream_id, error)
    }

    /// Accept a `WebTransport` Session request
    pub(crate) fn webtransport_session_accept(
        &mut self,
        conn: &mut Connection,
        stream_id: StreamId,
        accept: &WebTransportSessionAcceptAction,
    ) -> Res<()> {
        self.needs_processing = true;
        self.base_handler.webtransport_session_accept(
            conn,
            stream_id,
            Box::new(self.events.clone()),
            accept,
        )
    }

    /// Close `WebTransport` cleanly
    ///
    /// # Errors
    ///
    /// `InvalidStreamId` if the stream does not exist,
    /// `TransportStreamDoesNotExist` if the transport stream does not exist (this may happen if
    /// `process_output` has not been called when needed, and HTTP3 layer has not picked up the
    /// info that the stream has been closed.) `InvalidInput` if an empty buffer has been
    /// supplied.
    pub fn webtransport_close_session(
        &mut self,
        conn: &mut Connection,
        session_id: StreamId,
        error: u32,
        message: &str,
    ) -> Res<()> {
        self.needs_processing = true;
        self.base_handler
            .webtransport_close_session(conn, session_id, error, message)
    }

    pub fn webtransport_create_stream(
        &mut self,
        conn: &mut Connection,
        session_id: StreamId,
        stream_type: StreamType,
    ) -> Res<StreamId> {
        self.needs_processing = true;
        self.base_handler.webtransport_create_stream_local(
            conn,
            session_id,
            stream_type,
            Box::new(self.events.clone()),
            Box::new(self.events.clone()),
        )
    }

    pub fn webtransport_send_datagram(
        &mut self,
        conn: &mut Connection,
        session_id: StreamId,
        buf: &[u8],
        id: impl Into<DatagramTracking>,
    ) -> Res<()> {
        self.needs_processing = true;
        self.base_handler
            .webtransport_send_datagram(session_id, conn, buf, id)
    }

    /// Process HTTTP3 layer.
    pub fn process_http3(&mut self, conn: &mut Connection, now: Instant) {
        qtrace!([self], "Process http3 internal.");
        if matches!(self.base_handler.state(), Http3State::Closed(..)) {
            return;
        }

        let res = self.check_connection_events(conn, now);
        if !self.check_result(conn, now, &res) && self.base_handler.state().active() {
            let res = self.base_handler.process_sending(conn);
            self.check_result(conn, now, &res);
        }
    }

    /// Take the next available event.
    pub(crate) fn next_event(&self) -> Option<Http3ServerConnEvent> {
        self.events.next_event()
    }

    /// Whether this connection has events to process or data to send.
    pub(crate) fn should_be_processed(&mut self) -> bool {
        if self.needs_processing {
            self.needs_processing = false;
            return true;
        }
        self.base_handler.has_data_to_send() || self.events.has_events()
    }

    // This function takes the provided result and check for an error.
    // An error results in closing the connection.
    fn check_result<ERR>(&mut self, conn: &mut Connection, now: Instant, res: &Res<ERR>) -> bool {
        match &res {
            Err(e) => {
                self.close(conn, now, e);
                true
            }
            _ => false,
        }
    }

    fn close(&mut self, conn: &mut Connection, now: Instant, err: &Error) {
        qinfo!([self], "Connection error: {}.", err);
        conn.close(now, err.code(), format!("{err}"));
        self.base_handler.close(err.code());
        self.events
            .connection_state_change(self.base_handler.state());
    }

    // If this return an error the connection must be closed.
    fn check_connection_events(&mut self, conn: &mut Connection, now: Instant) -> Res<()> {
        qtrace!([self], "Check connection events.");
        while let Some(e) = conn.next_event() {
            qdebug!([self], "check_connection_events - event {e:?}.");
            match e {
                ConnectionEvent::NewStream { stream_id } => {
                    self.base_handler.add_new_stream(stream_id);
                }
                ConnectionEvent::RecvStreamReadable { stream_id } => {
                    self.handle_stream_readable(conn, stream_id)?;
                }
                ConnectionEvent::RecvStreamReset {
                    stream_id,
                    app_error,
                } => {
                    self.base_handler
                        .handle_stream_reset(stream_id, app_error, conn)?;
                }
                ConnectionEvent::SendStreamStopSending {
                    stream_id,
                    app_error,
                } => self
                    .base_handler
                    .handle_stream_stop_sending(stream_id, app_error, conn)?,
                ConnectionEvent::StateChange(state) => {
                    if self.base_handler.handle_state_change(conn, &state)? {
                        if self.base_handler.state() == Http3State::Connected {
                            let settings = self.base_handler.save_settings();
                            conn.send_ticket(now, &settings)?;
                        }
                        self.events
                            .connection_state_change(self.base_handler.state());
                    }
                }
                ConnectionEvent::SendStreamWritable { stream_id } => {
                    if let Some(s) = self.base_handler.send_streams.get_mut(&stream_id) {
                        s.stream_writable();
                    }
                }
                ConnectionEvent::Datagram(dgram) => self.base_handler.handle_datagram(&dgram),
                ConnectionEvent::AuthenticationNeeded
                | ConnectionEvent::EchFallbackAuthenticationNeeded { .. }
                | ConnectionEvent::ZeroRttRejected
                | ConnectionEvent::ResumptionToken(..) => return Err(Error::HttpInternal(4)),
                ConnectionEvent::SendStreamComplete { .. }
                | ConnectionEvent::SendStreamCreatable { .. }
                | ConnectionEvent::OutgoingDatagramOutcome { .. }
                | ConnectionEvent::IncomingDatagramDropped => {}
            }
        }
        Ok(())
    }

    fn handle_stream_readable(&mut self, conn: &mut Connection, stream_id: StreamId) -> Res<()> {
        match self.base_handler.handle_stream_readable(conn, stream_id)? {
            ReceiveOutput::NewStream(NewStreamType::Push(_)) => Err(Error::HttpStreamCreation),
            ReceiveOutput::NewStream(NewStreamType::Http(first_frame_type)) => {
                self.base_handler.add_streams(
                    stream_id,
                    Box::new(SendMessage::new(
                        MessageType::Response,
                        Http3StreamType::Http,
                        stream_id,
                        self.base_handler.qpack_encoder.clone(),
                        Box::new(self.events.clone()),
                    )),
                    Box::new(RecvMessage::new(
                        &RecvMessageInfo {
                            message_type: MessageType::Request,
                            stream_type: Http3StreamType::Http,
                            stream_id,
                            first_frame_type: Some(first_frame_type),
                        },
                        Rc::clone(&self.base_handler.qpack_decoder),
                        Box::new(self.events.clone()),
                        None,
                        PriorityHandler::new(false, Priority::default()),
                    )),
                );
                let res = self.base_handler.handle_stream_readable(conn, stream_id)?;
                assert_eq!(ReceiveOutput::NoOutput, res);
                Ok(())
            }
            ReceiveOutput::NewStream(NewStreamType::WebTransportStream(session_id)) => {
                self.base_handler.webtransport_create_stream_remote(
                    StreamId::from(session_id),
                    stream_id,
                    Box::new(self.events.clone()),
                    Box::new(self.events.clone()),
                )?;
                let res = self.base_handler.handle_stream_readable(conn, stream_id)?;
                assert_eq!(ReceiveOutput::NoOutput, res);
                Ok(())
            }
            ReceiveOutput::ControlFrames(control_frames) => {
                for f in control_frames {
                    match f {
                        HFrame::MaxPushId { .. } => {
                            // TODO implement push
                            Ok(())
                        }
                        HFrame::Goaway { .. } | HFrame::CancelPush { .. } => {
                            Err(Error::HttpFrameUnexpected)
                        }
                        HFrame::PriorityUpdatePush { element_id, priority } => {
                            // TODO: check if the element_id references a promised push stream or
                            // is greater than the maximum Push ID.
                            self.events.priority_update(StreamId::from(element_id), priority);
                            Ok(())
                        }
                        HFrame::PriorityUpdateRequest { element_id, priority } => {
                            // check that the element_id references a request stream
                            // within the client-sided bidirectional stream limit
                            let element_stream_id = StreamId::new(element_id);
                            if !element_stream_id.is_bidi()
                                || !element_stream_id.is_client_initiated()
                                || !conn.is_stream_id_allowed(element_stream_id)
                            {
                                return Err(Error::HttpId)
                            }

                            self.events.priority_update(element_stream_id, priority);
                            Ok(())
                        }
                        _ => unreachable!(
                            "we should only put MaxPushId, Goaway and PriorityUpdates into control_frames."
                        ),
                    }?;
                }
                Ok(())
            }
            _ => Ok(()),
        }
    }

    /// Response data are read directly into a buffer supplied as a parameter of this function to
    /// avoid copying data.
    ///
    /// # Errors
    ///
    /// It returns an error if a stream does not exist or an error happen while reading a stream,
    /// e.g. early close, protocol error, etc.
    pub fn read_data(
        &mut self,
        conn: &mut Connection,
        now: Instant,
        stream_id: StreamId,
        buf: &mut [u8],
    ) -> Res<(usize, bool)> {
        qdebug!([self], "read_data from stream {}.", stream_id);
        let res = self.base_handler.read_data(conn, stream_id, buf);
        if let Err(e) = &res {
            if e.connection_error() {
                self.close(conn, now, e);
            }
        }
        res
    }
}

[ Dauer der Verarbeitung: 0.33 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge