Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/third_party/rust/neqo-bin/src/client/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 14 kB image not shown  

Quelle  http3.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! An HTTP 3 client implementation.

use std::{
    cell::RefCell,
    collections::{HashMap, VecDeque},
    fmt::Display,
    fs::File,
    io::{BufWriter, Write},
    net::SocketAddr,
    path::PathBuf,
    rc::Rc,
    time::Instant,
};

use neqo_common::{event::Provider, hex, qdebug, qinfo, qwarn, Datagram, Header};
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
use neqo_http3::{Error, Http3Client, Http3ClientEvent, Http3Parameters, Http3State, Priority};
use neqo_transport::{
    AppError, CloseReason, Connection, EmptyConnectionIdGenerator, Error as TransportError, Output,
    RandomConnectionIdGenerator, StreamId,
};
use url::Url;

use super::{get_output_file, qlog_new, Args, CloseState, Res};
use crate::{send_data::SendData, STREAM_IO_BUFFER_SIZE};

/// Event handler that drives HTTP/3 requests on an [`Http3Client`].
pub struct Handler<'a> {
    // NOTE(review): presumably needed because the field name repeats the
    // struct's own name (`Handler`) — confirm against the clippy lint docs.
    #[allow(clippy::struct_field_names)]
    /// URL queue plus the per-stream handlers of requests in flight.
    url_handler: UrlHandler<'a>,
    /// Last resumption token reported by the client; handed out by `take_token`.
    token: Option<ResumptionToken>,
    /// Copied from `Args::output_read_data` and forwarded to
    /// `StreamHandler::process_data_readable`.
    output_read_data: bool,
    /// Scratch buffer of `STREAM_IO_BUFFER_SIZE` bytes that stream reads fill.
    read_buffer: Vec<u8>,
}

impl<'a> Handler<'a> {
    /// Builds a handler that will request every URL in `url_queue`.
    ///
    /// No request has been issued yet: the handled-URL list, the stream
    /// handler map and the output path list all start out empty, and no
    /// resumption token is held. The read buffer is allocated once here with
    /// `STREAM_IO_BUFFER_SIZE` zeroed bytes.
    pub(crate) fn new(url_queue: VecDeque<Url>, args: &'a Args) -> Self {
        Self {
            url_handler: UrlHandler {
                url_queue,
                handled_urls: Vec::new(),
                stream_handlers: HashMap::new(),
                all_paths: Vec::new(),
                args,
            },
            token: None,
            output_read_data: args.output_read_data,
            read_buffer: vec![0; STREAM_IO_BUFFER_SIZE],
        }
    }
}

/// Creates an [`Http3Client`] configured from the command-line `args`.
///
/// Steps, in order:
/// 1. pick a connection ID generator (empty when `args.cid_len` is zero,
///    random of that length otherwise);
/// 2. create the client-side transport connection for `hostname`;
/// 3. restrict the cipher suites when `args.get_ciphers()` is non-empty;
/// 4. wrap the connection in an HTTP/3 client with the QPACK and push
///    parameters taken from `args`;
/// 5. attach a qlog, and enable ECH and session resumption when requested.
///
/// # Errors
///
/// Propagates failures from creating the connection, setting the ciphers and
/// creating the qlog.
///
/// # Panics
///
/// Panics if enabling ECH or enabling resumption fails.
pub fn create_client(
    args: &Args,
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    hostname: &str,
    resumption_token: Option<ResumptionToken>,
) -> Res<Http3Client> {
    let cid_generator: Rc<RefCell<dyn neqo_transport::ConnectionIdGenerator>> =
        match args.cid_len {
            0 => Rc::new(RefCell::new(EmptyConnectionIdGenerator::default())),
            len => Rc::new(RefCell::new(RandomConnectionIdGenerator::new(len.into()))),
        };

    let alpn = &args.shared.alpn;
    let mut conn = Connection::new_client(
        hostname,
        &[alpn],
        cid_generator,
        local_addr,
        remote_addr,
        args.shared.quic_parameters.get(alpn.as_str()),
        Instant::now(),
    )?;

    // An empty list means "keep the defaults".
    let cipher_list = args.get_ciphers();
    if !cipher_list.is_empty() {
        conn.set_ciphers(&cipher_list)?;
    }

    let shared = &args.shared;
    let http3_params = Http3Parameters::default()
        .max_table_size_encoder(shared.max_table_size_encoder)
        .max_table_size_decoder(shared.max_table_size_decoder)
        .max_blocked_streams(shared.max_blocked_streams)
        .max_concurrent_push_streams(args.max_concurrent_push_streams);
    let mut client = Http3Client::new_with_conn(conn, http3_params);

    let qlog = qlog_new(args, hostname, client.connection_id())?;
    client.set_qlog(qlog);

    if let Some(ech_config) = args.ech.as_ref() {
        client.enable_ech(ech_config).expect("enable ECH");
    }
    if let Some(token) = resumption_token {
        client
            .enable_resumption(Instant::now(), token)
            .expect("enable resumption");
    }

    Ok(client)
}

impl TryFrom<Http3State> for CloseState {
    type Error = CloseReason;

    /// Classifies an HTTP/3 connection state for the client's close handling.
    ///
    /// * `Closing` / `Closed` carrying a reason that reports `is_error()`
    ///   yield `Err` with that reason.
    /// * `Closing` / `Closed` with a non-error reason yield the matching
    ///   [`CloseState`].
    /// * Every other state yields `CloseState::NotClosing`.
    fn try_from(value: Http3State) -> Result<Self, Self::Error> {
        match value {
            Http3State::Closing(reason) | Http3State::Closed(reason) if reason.is_error() => {
                Err(reason)
            }
            Http3State::Closing(_) => Ok(Self::Closing),
            Http3State::Closed(_) => Ok(Self::Closed),
            _ => Ok(Self::NotClosing),
        }
    }
}

impl super::Client for Http3Client {
    fn is_closed(&self) -> Result<CloseState, CloseReason> {
        self.state().try_into()
    }

    fn process_output(&mut self, now: Instant) -> Output {
        self.process_output(now)
    }

    fn process_multiple_input<'a>(
        &mut self,
        dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
        now: Instant,
    ) {
        self.process_multiple_input(dgrams, now);
    }

    fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
    where
        S: AsRef<str> + Display,
    {
        self.close(now, app_error, msg);
    }

    fn stats(&self) -> neqo_transport::Stats {
        self.transport_stats()
    }

    fn has_events(&self) -> bool {
        neqo_common::event::Provider::has_events(self)
    }
}

impl Handler<'_> {
    fn reinit(&mut self) {
        for url in self.url_handler.handled_urls.drain(..) {
            self.url_handler.url_queue.push_front(url);
        }
        self.url_handler.stream_handlers.clear();
        self.url_handler.all_paths.clear();
    }
}

impl super::Handler for Handler<'_> {
    type Client = Http3Client;

    /// Drains all pending client events and reacts to each of them.
    ///
    /// * Authentication requests are answered with `AuthenticationStatus::Ok`.
    /// * Header, readable and writable events are routed to the stream's
    ///   registered handler; events for unknown streams are logged.
    /// * A finished stream is removed and further URLs are started.
    /// * `Connected` / `RequestsCreatable` start as many URLs as allowed.
    /// * On 0-RTT rejection the request state is reset and URLs are re-sent.
    /// * Resumption tokens are stored for `take_token`.
    ///
    /// Returns `Ok(true)` once no stream is in flight and no URL is queued.
    ///
    /// # Errors
    ///
    /// Propagates errors from a stream handler's `process_data_readable`.
    ///
    /// # Panics
    ///
    /// Panics if reading from a readable stream fails.
    fn handle(&mut self, client: &mut Http3Client) -> Res<bool> {
        while let Some(event) = client.next_event() {
            match event {
                Http3ClientEvent::AuthenticationNeeded => {
                    client.authenticated(AuthenticationStatus::Ok, Instant::now());
                }
                Http3ClientEvent::HeaderReady {
                    stream_id,
                    headers,
                    fin,
                    ..
                } => {
                    match self.url_handler.stream_handler(stream_id) {
                        Some(handler) => {
                            handler.process_header_ready(stream_id, fin, headers);
                        }
                        None => {
                            qwarn!("Data on unexpected stream: {stream_id}");
                        }
                    }
                    if fin {
                        self.url_handler.on_stream_fin(client, stream_id);
                    }
                }
                Http3ClientEvent::DataReadable { stream_id } => {
                    // `true` when the read loop ended because the stream finished.
                    let finished =
                        if let Some(handler) = self.url_handler.stream_handler(stream_id) {
                            loop {
                                let (len, fin) = client
                                    .read_data(Instant::now(), stream_id, &mut self.read_buffer)
                                    .expect("Read should succeed");

                                handler.process_data_readable(
                                    stream_id,
                                    fin,
                                    &self.read_buffer[..len],
                                    self.output_read_data,
                                )?;

                                if fin {
                                    break true;
                                }
                                // Nothing more to read for now.
                                if len == 0 {
                                    break false;
                                }
                            }
                        } else {
                            qwarn!("Data on unexpected stream: {stream_id}");
                            false
                        };

                    if finished {
                        self.url_handler.on_stream_fin(client, stream_id);
                    }
                }
                Http3ClientEvent::DataWritable { stream_id } => {
                    if let Some(handler) = self.url_handler.stream_handler(stream_id) {
                        handler.process_data_writable(client, stream_id);
                    } else {
                        qwarn!("Data on unexpected stream: {stream_id}");
                    }
                }
                Http3ClientEvent::StateChange(Http3State::Connected)
                | Http3ClientEvent::RequestsCreatable => {
                    qinfo!("{event:?}");
                    self.url_handler.process_urls(client);
                }
                Http3ClientEvent::ZeroRttRejected => {
                    qinfo!("{event:?}");
                    // Everything sent as 0-RTT was rejected, so start over.
                    self.reinit();
                    self.url_handler.process_urls(client);
                }
                Http3ClientEvent::ResumptionToken(new_token) => {
                    self.token = Some(new_token);
                }
                _ => {
                    qwarn!("Unhandled event {event:?}");
                }
            }
        }

        let all_done = self.url_handler.done();
        Ok(all_done)
    }

    /// Hands out the stored resumption token, leaving `None` behind.
    fn take_token(&mut self) -> Option<ResumptionToken> {
        self.token.take()
    }
}

/// Per-stream callbacks invoked by the event loop for a request stream.
trait StreamHandler {
    /// Called when response headers for `stream_id` are available; `fin` is
    /// the flag delivered with the header event.
    fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec<Header>);
    /// Called with each chunk of body `data` read from `stream_id`.
    ///
    /// `data` may be empty. `fin` is the flag returned by the read that
    /// produced `data`. The implementations in this file always return
    /// `Ok(true)`, and the event loop in this file ignores the boolean.
    fn process_data_readable(
        &mut self,
        stream_id: StreamId,
        fin: bool,
        data: &[u8],
        output_read_data: bool,
    ) -> Res<bool>;
    /// Called when `stream_id` reports that it is writable.
    fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId);
}

/// Stream handler used for `GET` requests.
struct DownloadStreamHandler {
    /// Where the response body is written; when `None` the body is only logged.
    out_file: Option<BufWriter<File>>,
}

impl StreamHandler for DownloadStreamHandler {
    /// Logs the response headers, but only when no output file is in use.
    fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec<Header>) {
        if self.out_file.is_none() {
            qdebug!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}");
        }
    }

    /// Consumes one chunk of the response body.
    ///
    /// With an output file, non-empty `data` is appended to it. Without one,
    /// the chunk is logged: as a byte count unless `output_read_data` is set,
    /// otherwise as text when it is valid UTF-8 and as hex when it is not.
    ///
    /// When `fin` is set the output file, if any, is flushed and released so
    /// that write errors surface here instead of being lost when the buffered
    /// writer is dropped; without a file the end of stream is logged.
    ///
    /// # Errors
    ///
    /// Returns an error if writing to or flushing the output file fails.
    fn process_data_readable(
        &mut self,
        stream_id: StreamId,
        fin: bool,
        data: &[u8],
        output_read_data: bool,
    ) -> Res<bool> {
        if let Some(out_file) = &mut self.out_file {
            if !data.is_empty() {
                out_file.write_all(data)?;
            }
            // No early return here: the `fin` handling below must still run
            // so the file is flushed explicitly.
        } else if !output_read_data {
            qdebug!("READ[{stream_id}]: {} bytes", data.len());
        } else if let Ok(txt) = std::str::from_utf8(data) {
            qdebug!("READ[{stream_id}]: {txt}");
        } else {
            qdebug!("READ[{}]: 0x{}", stream_id, hex(data));
        }

        if fin {
            if let Some(mut out_file) = self.out_file.take() {
                out_file.flush()?;
            } else {
                qdebug!("<FIN[{stream_id}]>");
            }
        }

        Ok(true)
    }

    /// Downloads send no request body, so there is nothing to write.
    fn process_data_writable(&mut self, _client: &mut Http3Client, _stream_id: StreamId) {}
}

/// Stream handler used for `POST` requests.
struct UploadStreamHandler {
    /// Request body still to be sent on the stream.
    data: SendData,
    /// Time the handler was created; the reference point for the logged upload time.
    start: Instant,
}

impl StreamHandler for UploadStreamHandler {
    fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec<Header>) {
        qdebug!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}");
    }

    fn process_data_readable(
        &mut self,
        stream_id: StreamId,
        _fin: bool,
        data: &[u8],
        _output_read_data: bool,
    ) -> Res<bool> {
        if let Ok(txt) = std::str::from_utf8(data) {
            let trimmed_txt = txt.trim_end_matches(char::from(0));
            let parsed: usize = trimmed_txt.parse().unwrap();
            if parsed == self.data.len() {
                let upload_time = Instant::now().duration_since(self.start);
                qinfo!("Stream ID: {stream_id:?}, Upload time: {upload_time:?}");
            }
        } else {
            panic!("Unexpected data [{}]: 0x{}", stream_id, hex(data));
        }
        Ok(true)
    }

    fn process_data_writable(&mut self, client: &mut Http3Client, stream_id: StreamId) {
        let done = self
            .data
            .send(|chunk| client.send_data(stream_id, chunk).unwrap());
        if done {
            client.stream_close_send(stream_id).unwrap();
        }
    }
}

/// Bookkeeping for the URLs to request and the streams currently serving them.
struct UrlHandler<'a> {
    /// URLs that still have to be requested, in request order.
    url_queue: VecDeque<Url>,
    /// URLs for which a request stream has been created.
    handled_urls: Vec<Url>,
    /// Handler for every stream in flight, keyed by stream ID.
    stream_handlers: HashMap<StreamId, Box<dyn StreamHandler>>,
    /// Path list passed to `get_output_file` for every download.
    // NOTE(review): presumably the output paths already handed out — confirm
    // against `get_output_file`.
    all_paths: Vec<PathBuf>,
    /// Command-line arguments (method, headers, concurrency, output directory, ...).
    args: &'a Args,
}

impl UrlHandler<'_> {
    /// Looks up the handler registered for `stream_id`, if any.
    fn stream_handler(&mut self, stream_id: StreamId) -> Option<&mut Box<dyn StreamHandler>> {
        self.stream_handlers.get_mut(&stream_id)
    }

    /// Starts requests until the queue is empty, `args.concurrency` streams
    /// are in flight, or the client refuses to create another stream.
    fn process_urls(&mut self, client: &mut Http3Client) {
        while !self.url_queue.is_empty()
            && self.stream_handlers.len() < self.args.concurrency
            && self.next_url(client)
        {
            // All the work happens in `next_url`.
        }
    }

    /// Requests the URL at the front of the queue.
    ///
    /// On success a stream handler matching the HTTP method is registered, the
    /// URL is recorded as handled and `true` is returned. When the client
    /// cannot create a stream right now (stream limits, or unavailable), the
    /// URL is put back at the front of the queue and `false` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the queue is empty, if the request fails for any other
    /// reason, if the method is neither `GET` nor `POST`, or if closing the
    /// sending side of a `GET` stream fails.
    fn next_url(&mut self, client: &mut Http3Client) -> bool {
        let url = self
            .url_queue
            .pop_front()
            .expect("download_next called with empty queue");

        let fetched = client.fetch(
            Instant::now(),
            &self.args.method,
            &url,
            &to_headers(&self.args.header),
            Priority::default(),
        );

        let client_stream_id = match fetched {
            Ok(id) => id,
            Err(
                Error::TransportError(TransportError::StreamLimitError)
                | Error::StreamLimitError
                | Error::Unavailable,
            ) => {
                // Try again later, keeping this URL first in line.
                self.url_queue.push_front(url);
                return false;
            }
            Err(e) => {
                panic!("Can't create stream {e}");
            }
        };

        qdebug!("Successfully created stream id {client_stream_id} for {url}");

        let handler: Box<dyn StreamHandler> = match self.args.method.as_str() {
            "GET" => {
                let out_file =
                    get_output_file(&url, self.args.output_dir.as_ref(), &mut self.all_paths);
                // A GET has no body, so the sending side is closed right away.
                client.stream_close_send(client_stream_id).unwrap();
                Box::new(DownloadStreamHandler { out_file })
            }
            "POST" => Box::new(UploadStreamHandler {
                data: SendData::zeroes(self.args.upload_size),
                start: Instant::now(),
            }),
            _ => unimplemented!(),
        };

        self.stream_handlers.insert(client_stream_id, handler);
        self.handled_urls.push(url);
        true
    }

    /// Returns `true` when no URL is queued and no stream is in flight.
    fn done(&self) -> bool {
        self.url_queue.is_empty() && self.stream_handlers.is_empty()
    }

    /// Drops the handler of a finished stream and uses the freed slot to
    /// start further requests.
    fn on_stream_fin(&mut self, client: &mut Http3Client, stream_id: StreamId) {
        self.stream_handlers.remove(&stream_id);
        self.process_urls(client);
    }
}

/// Turns a flat `[name, value, name, value, ...]` list into headers.
///
/// Consecutive elements are paired up in order. A trailing name without a
/// value is ignored, and an empty list yields no headers.
fn to_headers(values: &[impl AsRef<str>]) -> Vec<Header> {
    // `chunks_exact` yields only complete pairs. The earlier `scan`-based
    // version ended the iteration on its first `None`, i.e. on the first
    // name, and therefore never produced a header.
    values
        .chunks_exact(2)
        .map(|pair| Header::new(pair[0].as_ref().to_string(), pair[1].as_ref()))
        .collect()
}

[ Dauer der Verarbeitung: 0.29 Sekunden  (vorverarbeitet)  ]