Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/third_party/rust/neqo-bin/src/client/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 10 kB image not shown  

Quelle  http09.rs   Sprache: unbekannt

 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! An [HTTP 0.9](https://www.w3.org/Protocols/HTTP/AsImplemented.html) client implementation.

use std::{
    cell::RefCell,
    collections::{HashMap, VecDeque},
    fs::File,
    io::{BufWriter, Write},
    net::SocketAddr,
    path::PathBuf,
    rc::Rc,
    time::Instant,
};

use neqo_common::{event::Provider, qdebug, qinfo, qwarn, Datagram};
use neqo_crypto::{AuthenticationStatus, ResumptionToken};
use neqo_transport::{
    CloseReason, Connection, ConnectionEvent, ConnectionIdGenerator, EmptyConnectionIdGenerator,
    Error, Output, RandomConnectionIdGenerator, State, StreamId, StreamType,
};
use url::Url;

use super::{get_output_file, qlog_new, Args, CloseState, Res};
use crate::STREAM_IO_BUFFER_SIZE;

pub struct Handler<'a> {
    streams: HashMap<StreamId, Option<BufWriter<File>>>,
    url_queue: VecDeque<Url>,
    handled_urls: Vec<Url>,
    all_paths: Vec<PathBuf>,
    args: &'a Args,
    token: Option<ResumptionToken>,
    needs_key_update: bool,
    read_buffer: Vec<u8>,
}

impl Handler<'_> {
    fn reinit(&mut self) {
        for url in self.handled_urls.drain(..) {
            self.url_queue.push_front(url);
        }
        self.streams.clear();
        self.all_paths.clear();
    }
}

impl super::Handler for Handler<'_> {
    type Client = Connection;

    fn handle(&mut self, client: &mut Self::Client) -> Res<bool> {
        while let Some(event) = client.next_event() {
            if self.needs_key_update {
                match client.initiate_key_update() {
                    Ok(()) => {
                        qdebug!("Keys updated");
                        self.needs_key_update = false;
                        self.download_urls(client);
                    }
                    Err(neqo_transport::Error::KeyUpdateBlocked) => (),
                    Err(e) => return Err(e.into()),
                }
            }

            match event {
                ConnectionEvent::AuthenticationNeeded => {
                    client.authenticated(AuthenticationStatus::Ok, Instant::now());
                }
                ConnectionEvent::RecvStreamReadable { stream_id } => {
                    self.read(client, stream_id)?;
                }
                ConnectionEvent::SendStreamWritable { stream_id } => {
                    qdebug!("stream {stream_id} writable");
                }
                ConnectionEvent::SendStreamComplete { stream_id } => {
                    qdebug!("stream {stream_id} complete");
                }
                ConnectionEvent::SendStreamCreatable { stream_type } => {
                    qdebug!("stream {stream_type:?} creatable");
                    if stream_type == StreamType::BiDi {
                        self.download_urls(client);
                    }
                }
                ConnectionEvent::StateChange(
                    State::WaitInitial | State::Handshaking | State::Connected,
                ) => {
                    qdebug!("{event:?}");
                    self.download_urls(client);
                }
                ConnectionEvent::ZeroRttRejected => {
                    qdebug!("{event:?}");
                    // All 0-RTT data was rejected. We need to retransmit it.
                    self.reinit();
                    self.download_urls(client);
                }
                ConnectionEvent::ResumptionToken(token) => {
                    self.token = Some(token);
                }
                _ => {
                    qwarn!("Unhandled event {event:?}");
                }
            }
        }

        if !self.streams.is_empty() || !self.url_queue.is_empty() {
            return Ok(false);
        }

        if self.args.resume && self.token.is_none() {
            self.token = client.take_resumption_token(Instant::now());
        }

        Ok(true)
    }

    fn take_token(&mut self) -> Option<ResumptionToken> {
        self.token.take()
    }
}

pub fn create_client(
    args: &Args,
    local_addr: SocketAddr,
    remote_addr: SocketAddr,
    hostname: &str,
    resumption_token: Option<ResumptionToken>,
) -> Res<Connection> {
    let alpn = match args.shared.alpn.as_str() {
        "hq-29" | "hq-30" | "hq-31" | "hq-32" => args.shared.alpn.as_str(),
        _ => "hq-interop",
    };
    let cid_generator: Rc<RefCell<dyn ConnectionIdGenerator>> = if args.cid_len == 0 {
        Rc::new(RefCell::new(EmptyConnectionIdGenerator::default()))
    } else {
        Rc::new(RefCell::new(RandomConnectionIdGenerator::new(
            args.cid_len.into(),
        )))
    };
    let mut client = Connection::new_client(
        hostname,
        &[alpn],
        cid_generator,
        local_addr,
        remote_addr,
        args.shared.quic_parameters.get(alpn),
        Instant::now(),
    )?;

    if let Some(tok) = resumption_token {
        client.enable_resumption(Instant::now(), tok)?;
    }

    let ciphers = args.get_ciphers();
    if !ciphers.is_empty() {
        client.set_ciphers(&ciphers)?;
    }

    client.set_qlog(qlog_new(args, hostname, client.odcid().unwrap())?);

    Ok(client)
}

impl TryFrom<&State> for CloseState {
    type Error = CloseReason;

    fn try_from(value: &State) -> Result<Self, Self::Error> {
        let (state, error) = match value {
            State::Closing { error, .. } | State::Draining { error, .. } => (Self::Closing, error),
            State::Closed(error) => (Self::Closed, error),
            _ => return Ok(Self::NotClosing),
        };

        if error.is_error() {
            Err(error.clone())
        } else {
            Ok(state)
        }
    }
}

impl super::Client for Connection {
    fn process_output(&mut self, now: Instant) -> Output {
        self.process_output(now)
    }

    fn process_multiple_input<'a>(
        &mut self,
        dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
        now: Instant,
    ) {
        self.process_multiple_input(dgrams, now);
    }

    fn close<S>(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S)
    where
        S: AsRef<str> + std::fmt::Display,
    {
        if !self.state().closed() {
            self.close(now, app_error, msg);
        }
    }

    fn is_closed(&self) -> Result<CloseState, CloseReason> {
        self.state().try_into()
    }

    fn stats(&self) -> neqo_transport::Stats {
        self.stats()
    }

    fn has_events(&self) -> bool {
        neqo_common::event::Provider::has_events(self)
    }
}

impl<'b> Handler<'b> {
    pub fn new(url_queue: VecDeque<Url>, args: &'b Args) -> Self {
        Self {
            streams: HashMap::new(),
            url_queue,
            handled_urls: Vec::new(),
            all_paths: Vec::new(),
            args,
            token: None,
            needs_key_update: args.key_update,
            read_buffer: vec![0; STREAM_IO_BUFFER_SIZE],
        }
    }

    fn download_urls(&mut self, client: &mut Connection) {
        loop {
            if self.url_queue.is_empty() {
                break;
            }
            if self.streams.len() >= self.args.concurrency {
                break;
            }
            if !self.download_next(client) {
                break;
            }
        }
    }

    fn download_next(&mut self, client: &mut Connection) -> bool {
        if self.needs_key_update {
            qdebug!("Deferring requests until after first key update");
            return false;
        }
        let url = self
            .url_queue
            .pop_front()
            .expect("download_next called with empty queue");
        match client.stream_create(StreamType::BiDi) {
            Ok(client_stream_id) => {
                qinfo!("Created stream {client_stream_id} for {url}");
                let req = format!("GET {}\r\n", url.path());
                _ = client
                    .stream_send(client_stream_id, req.as_bytes())
                    .unwrap();
                client.stream_close_send(client_stream_id).unwrap();
                let out_file =
                    get_output_file(&url, self.args.output_dir.as_ref(), &mut self.all_paths);
                self.streams.insert(client_stream_id, out_file);
                self.handled_urls.push(url);
                true
            }
            Err(e @ (Error::StreamLimitError | Error::ConnectionState)) => {
                qwarn!("Cannot create stream {e:?}");
                self.url_queue.push_front(url);
                false
            }
            Err(e) => {
                panic!("Error creating stream {e:?}");
            }
        }
    }

    /// Read and maybe print received data from a stream.
    // Returns bool: was fin received?
    fn read_from_stream(
        client: &mut Connection,
        stream_id: StreamId,
        read_buffer: &mut [u8],
        output_read_data: bool,
        maybe_out_file: &mut Option<BufWriter<File>>,
    ) -> Res<bool> {
        loop {
            let (sz, fin) = client.stream_recv(stream_id, read_buffer)?;
            if sz == 0 {
                return Ok(fin);
            }
            let read_buffer = &read_buffer[0..sz];

            if let Some(out_file) = maybe_out_file {
                out_file.write_all(read_buffer)?;
            } else if !output_read_data {
                qdebug!("READ[{stream_id}]: {} bytes", read_buffer.len());
            } else {
                qdebug!(
                    "READ[{}]: {}",
                    stream_id,
                    std::str::from_utf8(read_buffer).unwrap()
                );
            }
            if fin {
                return Ok(true);
            }
        }
    }

    fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> {
        match self.streams.get_mut(&stream_id) {
            None => {
                qwarn!("Data on unexpected stream: {stream_id}");
                return Ok(());
            }
            Some(maybe_out_file) => {
                let fin_recvd = Self::read_from_stream(
                    client,
                    stream_id,
                    &mut self.read_buffer,
                    self.args.output_read_data,
                    maybe_out_file,
                )?;

                if fin_recvd {
                    if let Some(mut out_file) = maybe_out_file.take() {
                        out_file.flush()?;
                    } else {
                        qinfo!("<FIN[{stream_id}]>");
                    }
                    self.streams.remove(&stream_id);
                    self.download_urls(client);
                }
            }
        }
        Ok(())
    }
}

[ Dauer der Verarbeitung: 0.27 Sekunden  (vorverarbeitet)  ]