Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/third_party/rust/hyper/src/proto/h2/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 15 kB image not shown  

Quelle  ping.rs   Sprache: unbekannt

 
/// HTTP2 Ping usage
///
/// hyper uses HTTP2 pings for two purposes:
///
/// 1. Adaptive flow control using BDP
/// 2. Connection keep-alive
///
/// Both cases are optional.
///
/// # BDP Algorithm
///
/// 1. When receiving a DATA frame, if a BDP ping isn't outstanding:
///   1a. Record current time.
///   1b. Send a BDP ping.
/// 2. Increment the number of received bytes.
/// 3. When the BDP ping ack is received:
///   3a. Record duration from sent time.
///   3b. Merge RTT with a running average.
///   3c. Calculate bdp as bytes/rtt.
///   3d. If bdp is over 2/3 max, set new max to bdp and update windows.

#[cfg(feature = "runtime")]
use std::fmt;
#[cfg(feature = "runtime")]
use std::future::Future;
#[cfg(feature = "runtime")]
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{self, Poll};
use std::time::Duration;
#[cfg(not(feature = "runtime"))]
use std::time::Instant;

use h2::{Ping, PingPong};
#[cfg(feature = "runtime")]
use tokio::time::{Instant, Sleep};
use tracing::{debug, trace};

type WindowSize = u32;

pub(super) fn disabled() -> Recorder {
    Recorder { shared: None }
}

pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger) {
    debug_assert!(
        config.is_enabled(),
        "ping channel requires bdp or keep-alive config",
    );

    let bdp = config.bdp_initial_window.map(|wnd| Bdp {
        bdp: wnd,
        max_bandwidth: 0.0,
        rtt: 0.0,
        ping_delay: Duration::from_millis(100),
        stable_count: 0,
    });

    let (bytes, next_bdp_at) = if bdp.is_some() {
        (Some(0), Some(Instant::now()))
    } else {
        (None, None)
    };

    #[cfg(feature = "runtime")]
    let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
        interval,
        timeout: config.keep_alive_timeout,
        while_idle: config.keep_alive_while_idle,
        timer: Box::pin(tokio::time::sleep(interval)),
        state: KeepAliveState::Init,
    });

    #[cfg(feature = "runtime")]
    let last_read_at = keep_alive.as_ref().map(|_| Instant::now());

    let shared = Arc::new(Mutex::new(Shared {
        bytes,
        #[cfg(feature = "runtime")]
        last_read_at,
        #[cfg(feature = "runtime")]
        is_keep_alive_timed_out: false,
        ping_pong,
        ping_sent_at: None,
        next_bdp_at,
    }));

    (
        Recorder {
            shared: Some(shared.clone()),
        },
        Ponger {
            bdp,
            #[cfg(feature = "runtime")]
            keep_alive,
            shared,
        },
    )
}

#[derive(Clone)]
pub(super) struct Config {
    pub(super) bdp_initial_window: Option<WindowSize>,
    /// If no frames are received in this amount of time, a PING frame is sent.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_interval: Option<Duration>,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    #[cfg(feature = "runtime")]
    pub(super) keep_alive_while_idle: bool,
}

#[derive(Clone)]
pub(crate) struct Recorder {
    shared: Option<Arc<Mutex<Shared>>>,
}

pub(super) struct Ponger {
    bdp: Option<Bdp>,
    #[cfg(feature = "runtime")]
    keep_alive: Option<KeepAlive>,
    shared: Arc<Mutex<Shared>>,
}

struct Shared {
    ping_pong: PingPong,
    ping_sent_at: Option<Instant>,

    // bdp
    /// If `Some`, bdp is enabled, and this tracks how many bytes have been
    /// read during the current sample.
    bytes: Option<usize>,
    /// We delay a variable amount of time between BDP pings. This allows us
    /// to send less pings as the bandwidth stabilizes.
    next_bdp_at: Option<Instant>,

    // keep-alive
    /// If `Some`, keep-alive is enabled, and the Instant is how long ago
    /// the connection read the last frame.
    #[cfg(feature = "runtime")]
    last_read_at: Option<Instant>,

    #[cfg(feature = "runtime")]
    is_keep_alive_timed_out: bool,
}

struct Bdp {
    /// Current BDP in bytes
    bdp: u32,
    /// Largest bandwidth we've seen so far.
    max_bandwidth: f64,
    /// Round trip time in seconds
    rtt: f64,
    /// Delay the next ping by this amount.
    ///
    /// This will change depending on how stable the current bandwidth is.
    ping_delay: Duration,
    /// The count of ping round trips where BDP has stayed the same.
    stable_count: u32,
}

#[cfg(feature = "runtime")]
struct KeepAlive {
    /// If no frames are received in this amount of time, a PING frame is sent.
    interval: Duration,
    /// After sending a keepalive PING, the connection will be closed if
    /// a pong is not received in this amount of time.
    timeout: Duration,
    /// If true, sends pings even when there are no active streams.
    while_idle: bool,

    state: KeepAliveState,
    timer: Pin<Box<Sleep>>,
}

#[cfg(feature = "runtime")]
enum KeepAliveState {
    Init,
    Scheduled,
    PingSent,
}

pub(super) enum Ponged {
    SizeUpdate(WindowSize),
    #[cfg(feature = "runtime")]
    KeepAliveTimedOut,
}

#[cfg(feature = "runtime")]
#[derive(Debug)]
pub(super) struct KeepAliveTimedOut;

// ===== impl Config =====

impl Config {
    pub(super) fn is_enabled(&self) -> bool {
        #[cfg(feature = "runtime")]
        {
            self.bdp_initial_window.is_some() || self.keep_alive_interval.is_some()
        }

        #[cfg(not(feature = "runtime"))]
        {
            self.bdp_initial_window.is_some()
        }
    }
}

// ===== impl Recorder =====

impl Recorder {
    pub(crate) fn record_data(&self, len: usize) {
        let shared = if let Some(ref shared) = self.shared {
            shared
        } else {
            return;
        };

        let mut locked = shared.lock().unwrap();

        #[cfg(feature = "runtime")]
        locked.update_last_read_at();

        // are we ready to send another bdp ping?
        // if not, we don't need to record bytes either

        if let Some(ref next_bdp_at) = locked.next_bdp_at {
            if Instant::now() < *next_bdp_at {
                return;
            } else {
                locked.next_bdp_at = None;
            }
        }

        if let Some(ref mut bytes) = locked.bytes {
            *bytes += len;
        } else {
            // no need to send bdp ping if bdp is disabled
            return;
        }

        if !locked.is_ping_sent() {
            locked.send_ping();
        }
    }

    pub(crate) fn record_non_data(&self) {
        #[cfg(feature = "runtime")]
        {
            let shared = if let Some(ref shared) = self.shared {
                shared
            } else {
                return;
            };

            let mut locked = shared.lock().unwrap();

            locked.update_last_read_at();
        }
    }

    /// If the incoming stream is already closed, convert self into
    /// a disabled reporter.
    #[cfg(feature = "client")]
    pub(super) fn for_stream(self, stream: &h2::RecvStream) -> Self {
        if stream.is_end_stream() {
            disabled()
        } else {
            self
        }
    }

    pub(super) fn ensure_not_timed_out(&self) -> crate::Result<()> {
        #[cfg(feature = "runtime")]
        {
            if let Some(ref shared) = self.shared {
                let locked = shared.lock().unwrap();
                if locked.is_keep_alive_timed_out {
                    return Err(KeepAliveTimedOut.crate_error());
                }
            }
        }

        // else
        Ok(())
    }
}

// ===== impl Ponger =====

impl Ponger {
    pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
        let now = Instant::now();
        let mut locked = self.shared.lock().unwrap();
        #[cfg(feature = "runtime")]
        let is_idle = self.is_idle();

        #[cfg(feature = "runtime")]
        {
            if let Some(ref mut ka) = self.keep_alive {
                ka.schedule(is_idle, &locked);
                ka.maybe_ping(cx, &mut locked);
            }
        }

        if !locked.is_ping_sent() {
            // XXX: this doesn't register a waker...?
            return Poll::Pending;
        }

        match locked.ping_pong.poll_pong(cx) {
            Poll::Ready(Ok(_pong)) => {
                let start = locked
                    .ping_sent_at
                    .expect("pong received implies ping_sent_at");
                locked.ping_sent_at = None;
                let rtt = now - start;
                trace!("recv pong");

                #[cfg(feature = "runtime")]
                {
                    if let Some(ref mut ka) = self.keep_alive {
                        locked.update_last_read_at();
                        ka.schedule(is_idle, &locked);
                    }
                }

                if let Some(ref mut bdp) =  self.bdp {
                    let bytes = locked.bytes.expect("bdp enabled implies bytes");
                    locked.bytes = Some(0); // reset
                    trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);

                    let update = bdp.calculate(bytes, rtt);
                    locked.next_bdp_at = Some(now + bdp.ping_delay);
                    if let Some(update) = update {
                        return Poll::Ready(Ponged::SizeUpdate(update))
                    }
                }
            }
            Poll::Ready(Err(e)) => {
                debug!("pong error: {}", e);
            }
            Poll::Pending => {
                #[cfg(feature = "runtime")]
                {
                    if let Some(ref mut ka) = self.keep_alive {
                        if let Err(KeepAliveTimedOut) = ka.maybe_timeout(cx) {
                            self.keep_alive = None;
                            locked.is_keep_alive_timed_out = true;
                            return Poll::Ready(Ponged::KeepAliveTimedOut);
                        }
                    }
                }
            }
        }

        // XXX: this doesn't register a waker...?
        Poll::Pending
    }

    #[cfg(feature = "runtime")]
    fn is_idle(&self) -> bool {
        Arc::strong_count(&self.shared) <= 2
    }
}

// ===== impl Shared =====

impl Shared {
    fn send_ping(&mut self) {
        match self.ping_pong.send_ping(Ping::opaque()) {
            Ok(()) => {
                self.ping_sent_at = Some(Instant::now());
                trace!("sent ping");
            }
            Err(err) => {
                debug!("error sending ping: {}", err);
            }
        }
    }

    fn is_ping_sent(&self) -> bool {
        self.ping_sent_at.is_some()
    }

    #[cfg(feature = "runtime")]
    fn update_last_read_at(&mut self) {
        if self.last_read_at.is_some() {
            self.last_read_at = Some(Instant::now());
        }
    }

    #[cfg(feature = "runtime")]
    fn last_read_at(&self) -> Instant {
        self.last_read_at.expect("keep_alive expects last_read_at")
    }
}

// ===== impl Bdp =====

/// Any higher than this likely will be hitting the TCP flow control.
const BDP_LIMIT: usize = 1024 * 1024 * 16;

impl Bdp {
    fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
        // No need to do any math if we're at the limit.
        if self.bdp as usize == BDP_LIMIT {
            self.stabilize_delay();
            return None;
        }

        // average the rtt
        let rtt = seconds(rtt);
        if self.rtt == 0.0 {
            // First sample means rtt is first rtt.
            self.rtt = rtt;
        } else {
            // Weigh this rtt as 1/8 for a moving average.
            self.rtt += (rtt - self.rtt) * 0.125;
        }

        // calculate the current bandwidth
        let bw = (bytes as f64) / (self.rtt * 1.5);
        trace!("current bandwidth = {:.1}B/s", bw);

        if bw < self.max_bandwidth {
            // not a faster bandwidth, so don't update
            self.stabilize_delay();
            return None;
        } else {
            self.max_bandwidth = bw;
        }

        // if the current `bytes` sample is at least 2/3 the previous
        // bdp, increase to double the current sample.
        if bytes >= self.bdp as usize * 2 / 3 {
            self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
            trace!("BDP increased to {}", self.bdp);

            self.stable_count = 0;
            self.ping_delay /= 2;
            Some(self.bdp)
        } else {
            self.stabilize_delay();
            None
        }
    }

    fn stabilize_delay(&mut self) {
        if self.ping_delay < Duration::from_secs(10) {
            self.stable_count += 1;

            if self.stable_count >= 2 {
                self.ping_delay *= 4;
                self.stable_count = 0;
            }
        }
    }
}

fn seconds(dur: Duration) -> f64 {
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;
    let secs = dur.as_secs() as f64;
    secs + (dur.subsec_nanos() as f64) / NANOS_PER_SEC
}

// ===== impl KeepAlive =====

#[cfg(feature = "runtime")]
impl KeepAlive {
    fn schedule(&mut self, is_idle: bool, shared: &Shared) {
        match self.state {
            KeepAliveState::Init => {
                if !self.while_idle && is_idle {
                    return;
                }

                self.state = KeepAliveState::Scheduled;
                let interval = shared.last_read_at() + self.interval;
                self.timer.as_mut().reset(interval);
            }
            KeepAliveState::PingSent => {
                if shared.is_ping_sent() {
                    return;
                }

                self.state = KeepAliveState::Scheduled;
                let interval = shared.last_read_at() + self.interval;
                self.timer.as_mut().reset(interval);
            }
            KeepAliveState::Scheduled => (),
        }
    }

    fn maybe_ping(&mut self, cx: &mut task::Context<'_>, shared: &mut Shared) {
        match self.state {
            KeepAliveState::Scheduled => {
                if Pin::new(&mut self.timer).poll(cx).is_pending() {
                    return;
                }
                // check if we've received a frame while we were scheduled
                if shared.last_read_at() + self.interval > self.timer.deadline() {
                    self.state = KeepAliveState::Init;
                    cx.waker().wake_by_ref(); // schedule us again
                    return;
                }
                trace!("keep-alive interval ({:?}) reached", self.interval);
                shared.send_ping();
                self.state = KeepAliveState::PingSent;
                let timeout = Instant::now() + self.timeout;
                self.timer.as_mut().reset(timeout);
            }
            KeepAliveState::Init | KeepAliveState::PingSent => (),
        }
    }

    fn maybe_timeout(&mut self, cx: &mut task::Context<'_>) -> Result<(), KeepAliveTimedOut> {
        match self.state {
            KeepAliveState::PingSent => {
                if Pin::new(&mut self.timer).poll(cx).is_pending() {
                    return Ok(());
                }
                trace!("keep-alive timeout ({:?}) reached", self.timeout);
                Err(KeepAliveTimedOut)
            }
            KeepAliveState::Init | KeepAliveState::Scheduled => Ok(()),
        }
    }
}

// ===== impl KeepAliveTimedOut =====

#[cfg(feature = "runtime")]
impl KeepAliveTimedOut {
    pub(super) fn crate_error(self) -> crate::Error {
        crate::Error::new(crate::error::Kind::Http2).with(self)
    }
}

#[cfg(feature = "runtime")]
impl fmt::Display for KeepAliveTimedOut {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("keep-alive timed out")
    }
}

#[cfg(feature = "runtime")]
impl std::error::Error for KeepAliveTimedOut {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&crate::error::TimedOut)
    }
}

[ Dauer der Verarbeitung: 0.27 Sekunden  (vorverarbeitet)  ]