Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/tokio/tests/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 22 kB image not shown  

Quelle  rt_unstable_metrics.rs   Sprache: unbekannt

 
#![allow(unknown_lints, unexpected_cfgs)]
#![warn(rust_2018_idioms)]
#![cfg(all(
    feature = "full",
    tokio_unstable,
    not(target_os = "wasi"),
    target_has_atomic = "64"
))]

use std::future::Future;
use std::sync::{Arc, Barrier, Mutex};
use std::task::Poll;
use std::thread;
use tokio::macros::support::poll_fn;

use tokio::runtime::Runtime;
use tokio::task::consume_budget;
use tokio::time::{self, Duration};

#[test]
fn num_workers() {
    let rt = current_thread();
    assert_eq!(1, rt.metrics().num_workers());

    let rt = threaded();
    assert_eq!(2, rt.metrics().num_workers());
}

#[test]
fn num_blocking_threads() {
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    assert_eq!(1, rt.metrics().num_blocking_threads());

    let rt = threaded();
    assert_eq!(0, rt.metrics().num_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    assert_eq!(1, rt.metrics().num_blocking_threads());
}

#[test]
fn num_idle_blocking_threads() {
    let rt = current_thread();
    assert_eq!(0, rt.metrics().num_idle_blocking_threads());
    let _ = rt.block_on(rt.spawn_blocking(move || {}));
    rt.block_on(async {
        time::sleep(Duration::from_millis(5)).await;
    });

    // We need to wait until the blocking thread has become idle. Usually 5ms is
    // enough for this to happen, but not always. When it isn't enough, sleep
    // for another second. We don't always wait for a whole second since we want
    // the test suite to finish quickly.
    //
    // Note that the timeout for idle threads to be killed is 10 seconds.
    if 0 == rt.metrics().num_idle_blocking_threads() {
        rt.block_on(async {
            time::sleep(Duration::from_secs(1)).await;
        });
    }

    assert_eq!(1, rt.metrics().num_idle_blocking_threads());
}

#[test]
fn blocking_queue_depth() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .max_blocking_threads(1)
        .build()
        .unwrap();

    assert_eq!(0, rt.metrics().blocking_queue_depth());

    let ready = Arc::new(Mutex::new(()));
    let guard = ready.lock().unwrap();

    let ready_cloned = ready.clone();
    let wait_until_ready = move || {
        let _unused = ready_cloned.lock().unwrap();
    };

    let h1 = rt.spawn_blocking(wait_until_ready.clone());
    let h2 = rt.spawn_blocking(wait_until_ready);
    assert!(rt.metrics().blocking_queue_depth() > 0);

    drop(guard);

    let _ = rt.block_on(h1);
    let _ = rt.block_on(h2);

    assert_eq!(0, rt.metrics().blocking_queue_depth());
}

#[test]
fn spawned_tasks_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.spawned_tasks_count());

    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.spawned_tasks_count());
    }))
    .unwrap();

    assert_eq!(1, rt.metrics().spawned_tasks_count());

    let rt = threaded();
    let metrics = rt.metrics();
    assert_eq!(0, metrics.spawned_tasks_count());

    rt.block_on(rt.spawn(async move {
        assert_eq!(1, metrics.spawned_tasks_count());
    }))
    .unwrap();

    assert_eq!(1, rt.metrics().spawned_tasks_count());
}

#[test]
fn remote_schedule_count() {
    use std::thread;

    let rt = current_thread();
    let handle = rt.handle().clone();
    let task = thread::spawn(move || {
        handle.spawn(async {
            // DO nothing
        })
    })
    .join()
    .unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());

    let rt = threaded();
    let handle = rt.handle().clone();
    let task = thread::spawn(move || {
        handle.spawn(async {
            // DO nothing
        })
    })
    .join()
    .unwrap();

    rt.block_on(task).unwrap();

    assert_eq!(1, rt.metrics().remote_schedule_count());
}

#[test]
fn worker_thread_id_current_thread() {
    let rt = current_thread();
    let metrics = rt.metrics();

    // Check that runtime is on this thread.
    rt.block_on(async {});
    assert_eq!(Some(thread::current().id()), metrics.worker_thread_id(0));

    // Move runtime to another thread.
    let thread_id = std::thread::scope(|scope| {
        let join_handle = scope.spawn(|| {
            rt.block_on(async {});
        });
        join_handle.thread().id()
    });
    assert_eq!(Some(thread_id), metrics.worker_thread_id(0));

    // Move runtime back to this thread.
    rt.block_on(async {});
    assert_eq!(Some(thread::current().id()), metrics.worker_thread_id(0));
}

#[test]
fn worker_thread_id_threaded() {
    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(rt.spawn(async move {
        // Check that we are running on a worker thread and determine
        // the index of our worker.
        let thread_id = std::thread::current().id();
        let this_worker = (0..2)
            .position(|w| metrics.worker_thread_id(w) == Some(thread_id))
            .expect("task not running on any worker thread");

        // Force worker to another thread.
        let moved_thread_id = tokio::task::block_in_place(|| {
            assert_eq!(thread_id, std::thread::current().id());

            // Wait for worker to move to another thread.
            for _ in 0..100 {
                let new_id = metrics.worker_thread_id(this_worker).unwrap();
                if thread_id != new_id {
                    return new_id;
                }
                std::thread::sleep(Duration::from_millis(100));
            }

            panic!("worker did not move to new thread");
        });

        // After blocking task worker either stays on new thread or
        // is moved back to current thread.
        assert!(
            metrics.worker_thread_id(this_worker) == Some(moved_thread_id)
                || metrics.worker_thread_id(this_worker) == Some(thread_id)
        );
    }))
    .unwrap()
}

#[test]
fn worker_park_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(1 <= metrics.worker_park_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(1 <= metrics.worker_park_count(0));
    assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_park_unpark_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(rt.spawn(async {})).unwrap();
    drop(rt);
    assert!(2 <= metrics.worker_park_unpark_count(0));

    let rt = threaded();
    let metrics = rt.metrics();

    // Wait for workers to be parked after runtime startup.
    for _ in 0..100 {
        if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
    assert_eq!(1, metrics.worker_park_unpark_count(0));
    assert_eq!(1, metrics.worker_park_unpark_count(1));

    // Spawn a task to unpark and then park a worker.
    rt.block_on(rt.spawn(async {})).unwrap();
    for _ in 0..100 {
        if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(100));
    }
    assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1));

    // Both threads unpark for runtime shutdown.
    drop(rt);
    assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
    assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
    assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));
}

#[test]
fn worker_noop_count() {
    // There isn't really a great way to generate no-op parks as they happen as
    // false-positive events under concurrency.

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(0 < metrics.worker_noop_count(0));

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        time::sleep(Duration::from_millis(1)).await;
    });
    drop(rt);
    assert!(0 < metrics.worker_noop_count(0));
    assert!(0 < metrics.worker_noop_count(1));
}

#[test]
#[ignore] // this test is flaky, see https://github.com/tokio-rs/tokio/issues/6470
fn worker_steal_count() {
    // This metric only applies to the multi-threaded runtime.
    //
    // We use a blocking channel to backup one worker thread.
    use std::sync::mpsc::channel;

    let rt = threaded_no_lifo();
    let metrics = rt.metrics();

    rt.block_on(async {
        let (tx, rx) = channel();

        // Move to the runtime.
        tokio::spawn(async move {
            // Spawn the task that sends to the channel
            //
            // Since the lifo slot is disabled, this task is stealable.
            tokio::spawn(async move {
                tx.send(()).unwrap();
            });

            // Blocking receive on the channel.
            rx.recv().unwrap();
        })
        .await
        .unwrap();
    });

    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_steal_count(i))
        .sum();

    assert_eq!(1, n);
}

#[test]
fn worker_poll_count_and_time() {
    const N: u64 = 5;

    async fn task() {
        // Sync sleep
        std::thread::sleep(std::time::Duration::from_micros(10));
    }

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    assert_eq!(N, metrics.worker_poll_count(0));
    // Not currently supported for current-thread runtime
    assert_eq!(Duration::default(), metrics.worker_mean_poll_time(0));

    // Does not populate the histogram
    assert!(!metrics.poll_count_histogram_enabled());
    for i in 0..10 {
        assert_eq!(0, metrics.poll_count_histogram_bucket_count(0, i));
    }

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(task()).await.unwrap();
        }
    });
    drop(rt);
    // Account for the `block_on` task
    let n = (0..metrics.num_workers())
        .map(|i| metrics.worker_poll_count(i))
        .sum();

    assert_eq!(N, n);

    let n: Duration = (0..metrics.num_workers())
        .map(|i| metrics.worker_mean_poll_time(i))
        .sum();

    assert!(n > Duration::default());

    // Does not populate the histogram
    assert!(!metrics.poll_count_histogram_enabled());
    for n in 0..metrics.num_workers() {
        for i in 0..10 {
            assert_eq!(0, metrics.poll_count_histogram_bucket_count(n, i));
        }
    }
}

#[test]
fn worker_poll_count_histogram() {
    const N: u64 = 5;

    let rts = [
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .enable_metrics_poll_count_histogram()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        rt.block_on(async {
            for _ in 0..N {
                tokio::spawn(async {}).await.unwrap();
            }
        });
        drop(rt);

        let num_workers = metrics.num_workers();
        let num_buckets = metrics.poll_count_histogram_num_buckets();

        assert!(metrics.poll_count_histogram_enabled());
        assert_eq!(num_buckets, 3);

        let n = (0..num_workers)
            .flat_map(|i| (0..num_buckets).map(move |j| (i, j)))
            .map(|(worker, bucket)| metrics.poll_count_histogram_bucket_count(worker, bucket))
            .sum();
        assert_eq!(N, n);
    }
}

#[test]
fn worker_poll_count_histogram_range() {
    let max = Duration::from_nanos(u64::MAX);

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_count_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..us(50));
    assert_eq!(
        metrics.poll_count_histogram_bucket_range(1),
        us(50)..us(100)
    );
    assert_eq!(metrics.poll_count_histogram_bucket_range(2), us(100)..max);

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .enable_metrics_poll_count_histogram()
        .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Log)
        .metrics_poll_count_histogram_buckets(3)
        .metrics_poll_count_histogram_resolution(us(50))
        .build()
        .unwrap();
    let metrics = rt.metrics();

    let a = Duration::from_nanos(50000_u64.next_power_of_two());
    let b = a * 2;

    assert_eq!(metrics.poll_count_histogram_bucket_range(0), us(0)..a);
    assert_eq!(metrics.poll_count_histogram_bucket_range(1), a..b);
    assert_eq!(metrics.poll_count_histogram_bucket_range(2), b..max);
}

#[test]
fn worker_poll_count_histogram_disabled_without_explicit_enable() {
    let rts = [
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .metrics_poll_count_histogram_scale(tokio::runtime::HistogramScale::Linear)
            .metrics_poll_count_histogram_buckets(3)
            .metrics_poll_count_histogram_resolution(Duration::from_millis(50))
            .build()
            .unwrap(),
    ];

    for rt in rts {
        let metrics = rt.metrics();
        assert!(!metrics.poll_count_histogram_enabled());
    }
}

#[test]
fn worker_total_busy_duration() {
    const N: usize = 5;

    let zero = Duration::from_millis(0);

    let rt = current_thread();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {
                tokio::task::yield_now().await;
            })
            .await
            .unwrap();
        }
    });

    drop(rt);

    assert!(zero < metrics.worker_total_busy_duration(0));

    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {
                tokio::task::yield_now().await;
            })
            .await
            .unwrap();
        }
    });

    drop(rt);

    for i in 0..metrics.num_workers() {
        assert!(zero < metrics.worker_total_busy_duration(i));
    }
}

#[test]
fn worker_local_schedule_count() {
    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        tokio::spawn(async {}).await.unwrap();
    });
    drop(rt);

    assert_eq!(1, metrics.worker_local_schedule_count(0));
    assert_eq!(0, metrics.remote_schedule_count());

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        tokio::spawn(async {
            tokio::spawn(async {}).await.unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_local_schedule_count(i))
        .sum();

    assert_eq!(2, n);
    assert_eq!(1, metrics.remote_schedule_count());
}

#[test]
fn worker_overflow_count() {
    // Only applies to the threaded worker
    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async {
        // Move to the runtime
        tokio::spawn(async {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            //
            // We spawn from outside the runtime to ensure that the other worker
            // will pick it up:
            // <https://github.com/tokio-rs/tokio/issues/4730>
            tokio::task::spawn_blocking(|| {
                tokio::spawn(async move {
                    tx1.send(()).unwrap();
                    rx2.recv().unwrap();
                });
            });

            rx1.recv().unwrap();

            // Spawn many tasks
            for _ in 0..300 {
                tokio::spawn(async {});
            }

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
    drop(rt);

    let n: u64 = (0..metrics.num_workers())
        .map(|i| metrics.worker_overflow_count(i))
        .sum();

    assert_eq!(1, n);
}

#[test]
fn injection_queue_depth_current_thread() {
    use std::thread;

    let rt = current_thread();
    let handle = rt.handle().clone();
    let metrics = rt.metrics();

    thread::spawn(move || {
        handle.spawn(async {});
    })
    .join()
    .unwrap();

    assert_eq!(1, metrics.injection_queue_depth());
}

#[test]
fn injection_queue_depth_multi_thread() {
    let rt = threaded();
    let metrics = rt.metrics();

    let barrier1 = Arc::new(Barrier::new(3));
    let barrier2 = Arc::new(Barrier::new(3));

    // Spawn a task per runtime worker to block it.
    for _ in 0..2 {
        let barrier1 = barrier1.clone();
        let barrier2 = barrier2.clone();
        rt.spawn(async move {
            barrier1.wait();
            barrier2.wait();
        });
    }

    barrier1.wait();

    for i in 0..10 {
        assert_eq!(i, metrics.injection_queue_depth());
        rt.spawn(async {});
    }

    barrier2.wait();
}

#[test]
fn worker_local_queue_depth() {
    const N: usize = 100;

    let rt = current_thread();
    let metrics = rt.metrics();
    rt.block_on(async {
        for _ in 0..N {
            tokio::spawn(async {});
        }

        assert_eq!(N, metrics.worker_local_queue_depth(0));
    });

    let rt = threaded();
    let metrics = rt.metrics();
    rt.block_on(async move {
        // Move to the runtime
        tokio::spawn(async move {
            let (tx1, rx1) = std::sync::mpsc::channel();
            let (tx2, rx2) = std::sync::mpsc::channel();

            // First, we need to block the other worker until all tasks have
            // been spawned.
            tokio::spawn(async move {
                tx1.send(()).unwrap();
                rx2.recv().unwrap();
            });

            // Bump the next-run spawn
            tokio::spawn(async {});

            rx1.recv().unwrap();

            // Spawn some tasks
            for _ in 0..100 {
                tokio::spawn(async {});
            }

            let n: usize = (0..metrics.num_workers())
                .map(|i| metrics.worker_local_queue_depth(i))
                .sum();

            assert_eq!(n, N);

            tx2.send(()).unwrap();
        })
        .await
        .unwrap();
    });
}

#[test]
fn budget_exhaustion_yield() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield = false;

    // block on a task which consumes budget until it yields
    rt.block_on(poll_fn(|cx| loop {
        if did_yield {
            return Poll::Ready(());
        }

        let fut = consume_budget();
        tokio::pin!(fut);

        if fut.poll(cx).is_pending() {
            did_yield = true;
            return Poll::Pending;
        }
    }));

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[test]
fn budget_exhaustion_yield_with_joins() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(0, metrics.budget_forced_yield_count());

    let mut did_yield_1 = false;
    let mut did_yield_2 = false;

    // block on a task which consumes budget until it yields
    rt.block_on(async {
        tokio::join!(
            poll_fn(|cx| loop {
                if did_yield_1 {
                    return Poll::Ready(());
                }

                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_1 = true;
                    return Poll::Pending;
                }
            }),
            poll_fn(|cx| loop {
                if did_yield_2 {
                    return Poll::Ready(());
                }

                let fut = consume_budget();
                tokio::pin!(fut);

                if fut.poll(cx).is_pending() {
                    did_yield_2 = true;
                    return Poll::Pending;
                }
            })
        )
    });

    assert_eq!(1, rt.metrics().budget_forced_yield_count());
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_fd_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    assert_eq!(metrics.io_driver_fd_registered_count(), 0);

    let stream = tokio::net::TcpStream::connect("google.com:80");
    let stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
    assert_eq!(metrics.io_driver_fd_deregistered_count(), 0);

    drop(stream);

    assert_eq!(metrics.io_driver_fd_deregistered_count(), 1);
    assert_eq!(metrics.io_driver_fd_registered_count(), 1);
}

#[cfg(any(target_os = "linux", target_os = "macos"))]
#[test]
fn io_driver_ready_count() {
    let rt = current_thread();
    let metrics = rt.metrics();

    let stream = tokio::net::TcpStream::connect("google.com:80");
    let _stream = rt.block_on(async move { stream.await.unwrap() });

    assert_eq!(metrics.io_driver_ready_count(), 1);
}

fn current_thread() -> Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

fn threaded() -> Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap()
}

fn threaded_no_lifo() -> Runtime {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .disable_lifo_slot()
        .enable_all()
        .build()
        .unwrap()
}

fn us(n: u64) -> Duration {
    Duration::from_micros(n)
}

[ Dauer der Verarbeitung: 0.29 Sekunden  (vorverarbeitet)  ]