Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  recv_message.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{cell::RefCell, cmp::min, collections::VecDeque, fmt::Debug, rc::Rc};

use neqo_common::{qdebug, qinfo, qtrace, Header};
use neqo_qpack::decoder::QPackDecoder;
use neqo_transport::{Connection, StreamId};

use crate::{
    frames::{hframe::HFrameType, FrameReader, HFrame, StreamReaderConnectionWrapper},
    headers_checks::{headers_valid, is_interim},
    priority::PriorityHandler,
    push_controller::PushController,
    qlog, CloseType, Error, Http3StreamInfo, Http3StreamType, HttpRecvStream, HttpRecvStreamEvents,
    MessageType, Priority, ReceiveOutput, RecvStream, Res, Stream,
};

#[allow(clippy::module_name_repetitions)]
pub struct RecvMessageInfo {
    pub message_type: MessageType,
    pub stream_type: Http3StreamType,
    pub stream_id: StreamId,
    pub first_frame_type: Option<u64>,
}

/*
 * Response stream state:
 *    WaitingForResponseHeaders : we wait for headers. in this state we can
 *                                also get a PUSH_PROMISE frame.
 *    DecodingHeaders : In this step the headers will be decoded. The stream
 *                      may be blocked in this state on encoder instructions.
 *    WaitingForData : we got HEADERS, we are waiting for one or more data
 *                     frames. In this state we can receive one or more
 *                     PUSH_PROMIS frames or a HEADERS frame carrying trailers.
 *    ReadingData : we got a DATA frame, now we letting the app read payload.
 *                  From here we will go back to WaitingForData state to wait
 *                  for more data frames or to CLosed state
 *    ClosePending : waiting for app to pick up data, after that we can delete
 * the TransactionClient.
 *    Closed
 *    ExtendedConnect: this request is for a WebTransport session. In this
 *                         state RecvMessage will not be treated as a HTTP
 *                         stream anymore. It is waiting to be transformed
 *                         into WebTransport session or to be closed.
 */
#[derive(Debug)]
enum RecvMessageState {
    WaitingForResponseHeaders { frame_reader: FrameReader },
    DecodingHeaders { header_block: Vec<u8>, fin: bool },
    WaitingForData { frame_reader: FrameReader },
    ReadingData { remaining_data_len: usize },
    WaitingForFinAfterTrailers { frame_reader: FrameReader },
    ClosePending, // Close must first be read by application
    Closed,
    ExtendedConnect,
}

#[derive(Debug)]
struct PushInfo {
    push_id: u64,
    header_block: Vec<u8>,
}

#[derive(Debug)]
pub struct RecvMessage {
    state: RecvMessageState,
    message_type: MessageType,
    stream_type: Http3StreamType,
    qpack_decoder: Rc<RefCell<QPackDecoder>>,
    conn_events: Box<dyn HttpRecvStreamEvents>,
    push_handler: Option<Rc<RefCell<PushController>>>,
    stream_id: StreamId,
    priority_handler: PriorityHandler,
    blocked_push_promise: VecDeque<PushInfo>,
}

impl ::std::fmt::Display for RecvMessage {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        write!(f, "RecvMessage stream_id:{}", self.stream_id)
    }
}

impl RecvMessage {
    pub fn new(
        message_info: &RecvMessageInfo,
        qpack_decoder: Rc<RefCell<QPackDecoder>>,
        conn_events: Box<dyn HttpRecvStreamEvents>,
        push_handler: Option<Rc<RefCell<PushController>>>,
        priority_handler: PriorityHandler,
    ) -> Self {
        Self {
            state: RecvMessageState::WaitingForResponseHeaders {
                frame_reader: message_info
                    .first_frame_type
                    .map_or_else(FrameReader::new, |frame_type| {
                        FrameReader::new_with_type(HFrameType(frame_type))
                    }),
            },
            message_type: message_info.message_type,
            stream_type: message_info.stream_type,
            qpack_decoder,
            conn_events,
            push_handler,
            stream_id: message_info.stream_id,
            priority_handler,
            blocked_push_promise: VecDeque::new(),
        }
    }

    fn handle_headers_frame(&mut self, header_block: Vec<u8>, fin: bool) -> Res<()> {
        match self.state {
            RecvMessageState::WaitingForResponseHeaders {..} => {
                if header_block.is_empty() {
                    return Err(Error::HttpGeneralProtocolStream);
                }
                    self.state = RecvMessageState::DecodingHeaders { header_block, fin };
             }
            RecvMessageState::WaitingForData { ..} => {
                // TODO implement trailers, for now just ignore them.
                self.state = RecvMessageState::WaitingForFinAfterTrailers{frame_reader: FrameReader::new()};
            }
            RecvMessageState::WaitingForFinAfterTrailers {..} => {
                return Err(Error::HttpFrameUnexpected);
            }
            _ => unreachable!("This functions is only called in WaitingForResponseHeaders | WaitingForData | WaitingForFinAfterTrailers state.")
         }
        Ok(())
    }

    fn handle_data_frame(&mut self, len: u64, fin: bool) -> Res<()> {
        match self.state {
            RecvMessageState::WaitingForResponseHeaders {..} | RecvMessageState::WaitingForFinAfterTrailers {..} => {
                return Err(Error::HttpFrameUnexpected);
            }
            RecvMessageState::WaitingForData {..} => {
                if len > 0 {
                    if fin {
                        return Err(Error::HttpFrame);
                    }
                    self.state = RecvMessageState::ReadingData {
                        remaining_data_len: usize::try_from(len).or(Err(Error::HttpFrame))?,
                    };
                }
            }
            _ => unreachable!("This functions is only called in WaitingForResponseHeaders | WaitingForData | WaitingForFinAfterTrailers state.")
        }
        Ok(())
    }

    fn add_headers(&mut self, mut headers: Vec<Header>, fin: bool) -> Res<()> {
        qtrace!([self], "Add new headers fin={}", fin);
        let interim = match self.message_type {
            MessageType::Request => false,
            MessageType::Response => is_interim(&headers)?,
        };
        headers_valid(&headers, self.message_type)?;
        if self.message_type == MessageType::Response {
            headers.retain(Header::is_allowed_for_response);
        }

        if fin && interim {
            return Err(Error::HttpGeneralProtocolStream);
        }

        let is_web_transport = self.message_type == MessageType::Request
            && headers
                .iter()
                .any(|h| h.name() == ":method" && h.value() == "CONNECT")
            && headers
                .iter()
                .any(|h| h.name() == ":protocol" && h.value() == "webtransport");
        if is_web_transport {
            self.conn_events
                .extended_connect_new_session(self.stream_id, headers);
        } else {
            self.conn_events
                .header_ready(self.get_stream_info(), headers, interim, fin);
        }

        if fin {
            self.set_closed();
        } else {
            self.state = if is_web_transport {
                self.stream_type = Http3StreamType::ExtendedConnect;
                RecvMessageState::ExtendedConnect
            } else if interim {
                RecvMessageState::WaitingForResponseHeaders {
                    frame_reader: FrameReader::new(),
                }
            } else {
                RecvMessageState::WaitingForData {
                    frame_reader: FrameReader::new(),
                }
            };
        }
        Ok(())
    }

    fn set_state_to_close_pending(&mut self, post_readable_event: bool) -> Res<()> {
        // Stream has received fin. Depending on headers state set header_ready
        // or data_readable event so that app can pick up the fin.
        qtrace!([self], "set_state_to_close_pending: state={:?}", self.state);

        match self.state {
            RecvMessageState::WaitingForResponseHeaders { .. } => {
                return Err(Error::HttpGeneralProtocolStream);
            }
            RecvMessageState::ReadingData { .. } => {}
            RecvMessageState::WaitingForData { .. }
            | RecvMessageState::WaitingForFinAfterTrailers { .. } => {
                if post_readable_event {
                    self.conn_events.data_readable(self.get_stream_info());
                }
            }
            _ => unreachable!("Closing an already closed transaction."),
        }
        if !matches!(self.state, RecvMessageState::Closed) {
            self.state = RecvMessageState::ClosePending;
        }
        Ok(())
    }

    fn handle_push_promise(&mut self, push_id: u64, header_block: Vec<u8>) -> Res<()> {
        if self.push_handler.is_none() {
            return Err(Error::HttpFrameUnexpected);
        }

        if !self.blocked_push_promise.is_empty() {
            self.blocked_push_promise.push_back(PushInfo {
                push_id,
                header_block,
            });
        } else if let Some(headers) = self
            .qpack_decoder
            .borrow_mut()
            .decode_header_block(&header_block, self.stream_id)?
        {
            self.push_handler
                .as_ref()
                .ok_or(Error::HttpFrameUnexpected)?
                .borrow_mut()
                .new_push_promise(push_id, self.stream_id, headers)?;
        } else {
            self.blocked_push_promise.push_back(PushInfo {
                push_id,
                header_block,
            });
        }
        Ok(())
    }

    fn receive_internal(&mut self, conn: &mut Connection, post_readable_event: bool) -> Res<()> {
        let label = ::neqo_common::log_subject!(::log::Level::Debug, self);
        loop {
            qdebug!([label], "state={:?}.", self.state);
            match &mut self.state {
                // In the following 3 states we need to read frames.
                RecvMessageState::WaitingForResponseHeaders { frame_reader }
                | RecvMessageState::WaitingForData { frame_reader }
                | RecvMessageState::WaitingForFinAfterTrailers { frame_reader } => {
                    match frame_reader.receive(&mut StreamReaderConnectionWrapper::new(
                        conn,
                        self.stream_id,
                    ))? {
                        (None, true) => {
                            break self.set_state_to_close_pending(post_readable_event);
                        }
                        (None, false) => break Ok(()),
                        (Some(frame), fin) => {
                            qdebug!(
                                [self],
                                "A new frame has been received: {:?}; state={:?} fin={}",
                                frame,
                                self.state,
                                fin,
                            );
                            match frame {
                                HFrame::Headers { header_block } => {
                                    self.handle_headers_frame(header_block, fin)?;
                                }
                                HFrame::Data { len } => self.handle_data_frame(len, fin)?,
                                HFrame::PushPromise {
                                    push_id,
                                    header_block,
                                } => self.handle_push_promise(push_id, header_block)?,
                                _ => break Err(Error::HttpFrameUnexpected),
                            }
                            if matches!(self.state, RecvMessageState::Closed) {
                                break Ok(());
                            }
                            if fin
                                && !matches!(self.state, RecvMessageState::DecodingHeaders { .. })
                            {
                                break self.set_state_to_close_pending(post_readable_event);
                            }
                        }
                    };
                }
                RecvMessageState::DecodingHeaders {
                    ref header_block,
                    fin,
                } => {
                    if self
                        .qpack_decoder
                        .borrow()
                        .refers_dynamic_table(header_block)?
                        && !self.blocked_push_promise.is_empty()
                    {
                        qinfo!(
                            [self],
                            "decoding header is blocked waiting for a push_promise header block."
                        );
                        break Ok(());
                    }
                    let done = *fin;
                    let d_headers = self
                        .qpack_decoder
                        .borrow_mut()
                        .decode_header_block(header_block, self.stream_id)?;
                    if let Some(headers) = d_headers {
                        self.add_headers(headers, done)?;
                        if matches!(
                            self.state,
                            RecvMessageState::Closed | RecvMessageState::ExtendedConnect
                        ) {
                            break Ok(());
                        }
                    } else {
                        qinfo!([self], "decoding header is blocked.");
                        break Ok(());
                    }
                }
                RecvMessageState::ReadingData { .. } => {
                    if post_readable_event {
                        self.conn_events.data_readable(self.get_stream_info());
                    }
                    break Ok(());
                }
                RecvMessageState::ClosePending | RecvMessageState::Closed => {
                    panic!("Stream readable after being closed!");
                }
                RecvMessageState::ExtendedConnect => {
                    // Ignore read event, this request is waiting to be picked up by a new
                    // WebTransportSession
                    break Ok(());
                }
            };
        }
    }

    fn set_closed(&mut self) {
        if !self.blocked_push_promise.is_empty() {
            self.qpack_decoder
                .borrow_mut()
                .cancel_stream(self.stream_id);
        }
        self.state = RecvMessageState::Closed;
        self.conn_events
            .recv_closed(self.get_stream_info(), CloseType::Done);
    }

    const fn closing(&self) -> bool {
        matches!(
            self.state,
            RecvMessageState::ClosePending | RecvMessageState::Closed
        )
    }

    const fn get_stream_info(&self) -> Http3StreamInfo {
        Http3StreamInfo::new(self.stream_id, Http3StreamType::Http)
    }
}

impl Stream for RecvMessage {
    fn stream_type(&self) -> Http3StreamType {
        self.stream_type
    }
}

impl RecvStream for RecvMessage {
    fn receive(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
        self.receive_internal(conn, true)?;
        Ok((
            ReceiveOutput::NoOutput,
            matches!(self.state, RecvMessageState::Closed),
        ))
    }

    fn reset(&mut self, close_type: CloseType) -> Res<()> {
        if !self.closing() || !self.blocked_push_promise.is_empty() {
            self.qpack_decoder
                .borrow_mut()
                .cancel_stream(self.stream_id);
        }
        self.conn_events
            .recv_closed(self.get_stream_info(), close_type);
        self.state = RecvMessageState::Closed;
        Ok(())
    }

    fn read_data(&mut self, conn: &mut Connection, buf: &mut [u8]) -> Res<(usize, bool)> {
        let mut written = 0;
        loop {
            match self.state {
                RecvMessageState::ReadingData {
                    ref mut remaining_data_len,
                } => {
                    let to_read = min(*remaining_data_len, buf.len() - written);
                    let (amount, fin) = conn
                        .stream_recv(self.stream_id, &mut buf[written..written + to_read])
                        .map_err(|e| Error::map_stream_recv_errors(&Error::from(e)))?;
                    qlog::h3_data_moved_up(conn.qlog_mut(), self.stream_id, amount);

                    debug_assert!(amount <= to_read);
                    *remaining_data_len -= amount;
                    written += amount;

                    if fin {
                        if *remaining_data_len > 0 {
                            return Err(Error::HttpFrame);
                        }
                        self.set_closed();
                        break Ok((written, fin));
                    } else if *remaining_data_len == 0 {
                        self.state = RecvMessageState::WaitingForData {
                            frame_reader: FrameReader::new(),
                        };
                        self.receive_internal(conn, false)?;
                    } else {
                        break Ok((written, false));
                    }
                }
                RecvMessageState::ClosePending => {
                    self.set_closed();
                    break Ok((written, true));
                }
                _ => break Ok((written, false)),
            }
        }
    }

    fn http_stream(&mut self) -> Option<&mut dyn HttpRecvStream> {
        Some(self)
    }
}

impl HttpRecvStream for RecvMessage {
    fn header_unblocked(&mut self, conn: &mut Connection) -> Res<(ReceiveOutput, bool)> {
        while let Some(p) = self.blocked_push_promise.front() {
            if let Some(headers) = self
                .qpack_decoder
                .borrow_mut()
                .decode_header_block(&p.header_block, self.stream_id)?
            {
                self.push_handler
                    .as_ref()
                    .ok_or(Error::HttpFrameUnexpected)?
                    .borrow_mut()
                    .new_push_promise(p.push_id, self.stream_id, headers)?;
                self.blocked_push_promise.pop_front();
            } else {
                return Ok((ReceiveOutput::NoOutput, false));
            }
        }

        self.receive(conn)
    }

    fn maybe_update_priority(&mut self, priority: Priority) -> bool {
        self.priority_handler.maybe_update_priority(priority)
    }

    fn priority_update_frame(&mut self) -> Option<HFrame> {
        self.priority_handler.maybe_encode_frame(self.stream_id)
    }

    fn priority_update_sent(&mut self) {
        self.priority_handler.priority_update_sent();
    }

    fn set_new_listener(&mut self, conn_events: Box<dyn HttpRecvStreamEvents>) {
        self.state = RecvMessageState::WaitingForData {
            frame_reader: FrameReader::new(),
        };
        self.conn_events = conn_events;
    }

    fn extended_connect_wait_for_response(&self) -> bool {
        matches!(self.state, RecvMessageState::ExtendedConnect)
    }
}

[ Dauer der Verarbeitung: 0.27 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge