Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  task_local_set.rs   Sprache: unbekannt

 
#![allow(unknown_lints, unexpected_cfgs)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use futures::{
    future::{pending, ready},
    FutureExt,
};

use tokio::runtime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{self, LocalSet};
use tokio::time;

#[cfg(not(target_os = "wasi"))]
use std::cell::Cell;
use std::sync::atomic::AtomicBool;
#[cfg(not(target_os = "wasi"))]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
#[cfg(not(target_os = "wasi"))]
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;

/// Checks that a task spawned with `spawn_local` inside `run_until` can be
/// awaited to completion on the current-thread test runtime.
#[tokio::test(flavor = "current_thread")]
async fn local_current_thread_scheduler() {
    let local = LocalSet::new();
    local
        .run_until(async {
            let handle = task::spawn_local(async {});
            handle.await.unwrap();
        })
        .await;
}

/// Checks that both the `run_until` future and a task spawned from it observe
/// the thread-local marker set by the thread running this test.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    // Mark the current thread; every assertion below reads this marker.
    ON_RT_THREAD.with(|flag| flag.set(true));

    let local = LocalSet::new();
    local
        .run_until(async {
            assert!(ON_RT_THREAD.with(Cell::get));

            let handle = task::spawn_local(async {
                assert!(ON_RT_THREAD.with(Cell::get));
            });
            handle.await.unwrap();
        })
        .await;
}

/// Checks that a task queued on a `LocalSet` sees the thread-local marker of
/// the thread that awaits the set directly.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_threadpool() {
    thread_local! {
        static ON_LOCAL_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    // Mark the current thread before anything is spawned.
    ON_LOCAL_THREAD.with(|flag| flag.set(true));

    let set = LocalSet::new();
    set.spawn_local(async move {
        let on_local_thread = ON_LOCAL_THREAD.with(Cell::get);
        assert!(on_local_thread);
    });

    // Awaiting the set itself drives the task spawned above.
    set.await;
}

#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_timers() {
    static RAN1: AtomicBool = AtomicBool::new(false);
    static RAN2: AtomicBool = AtomicBool::new(false);

    let local = LocalSet::new();
    local.spawn_local(async move {
        time::sleep(Duration::from_millis(5)).await;
        RAN1.store(true, Ordering::SeqCst);
    });
    local.spawn_local(async move {
        time::sleep(Duration::from_millis(10)).await;
        RAN2.store(true, Ordering::SeqCst);
    });
    local.await;
    assert!(RAN1.load(Ordering::SeqCst));
    assert!(RAN2.load(Ordering::SeqCst));
}

/// Checks that awaiting a `LocalSet` runs every local task to completion,
/// including one that is spawned from inside another local task.
#[tokio::test]
async fn localset_future_drives_all_local_futs() {
    static RAN1: AtomicBool = AtomicBool::new(false);
    static RAN2: AtomicBool = AtomicBool::new(false);
    static RAN3: AtomicBool = AtomicBool::new(false);

    let set = LocalSet::new();

    set.spawn_local(async move {
        // Nested spawn: this task is created while the outer one is running.
        task::spawn_local(async {
            task::yield_now().await;
            RAN3.store(true, Ordering::SeqCst);
        });

        task::yield_now().await;
        RAN1.store(true, Ordering::SeqCst);
    });

    set.spawn_local(async move {
        task::yield_now().await;
        RAN2.store(true, Ordering::SeqCst);
    });

    set.await;

    let first_ran = RAN1.load(Ordering::SeqCst);
    let second_ran = RAN2.load(Ordering::SeqCst);
    let nested_ran = RAN3.load(Ordering::SeqCst);
    assert!(first_ran);
    assert!(second_ran);
    assert!(nested_ran);
}

#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_timer() {
    // Verifies that runtime services such as the timer are usable from a
    // local task, and that the task observes the same thread-local marker
    // before and after sleeping.
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let local = LocalSet::new();
    local
        .run_until(async {
            assert!(ON_RT_THREAD.with(Cell::get));

            task::spawn_local(async move {
                assert!(ON_RT_THREAD.with(Cell::get));
                time::sleep(Duration::from_millis(10)).await;
                assert!(ON_RT_THREAD.with(Cell::get));
            })
            .await
            .unwrap();
        })
        .await;
}
/// Checks that a task spawned while only the `enter` guard is held can later
/// be awaited once the set is driven with `block_on`.
#[test]
fn enter_guard_spawn() {
    let set = LocalSet::new();
    let _guard = set.enter();

    // Spawned before the set is driven; only the guard is active here.
    let handle = task::spawn_local(async { true });

    let mut builder = runtime::Builder::new_current_thread();
    builder.enable_all();
    let rt = builder.build().unwrap();

    // Run the local task set.
    set.block_on(&rt, async move {
        let spawned_result = handle.await.unwrap();
        assert!(spawned_result);
    });
}

#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
#[test]
// Expected to panic: the thread that calls `block_on` is not allowed to use
// in-place blocking while inside `block_on`.
#[should_panic]
fn local_threadpool_blocking_in_place() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let mut builder = runtime::Builder::new_current_thread();
    builder.enable_all();
    let rt = builder.build().unwrap();

    let local = LocalSet::new();
    local.block_on(&rt, async {
        assert!(ON_RT_THREAD.with(Cell::get));

        task::spawn_local(async move {
            assert!(ON_RT_THREAD.with(Cell::get));
            task::block_in_place(|| {});
            assert!(ON_RT_THREAD.with(Cell::get));
        })
        .await
        .unwrap();
    });
}

/// Checks that a `spawn_blocking` closure started from a local task does not
/// see the thread-local marker, while the local task sees it both before and
/// after awaiting the blocking work.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_blocking_run() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let local = LocalSet::new();
    local
        .run_until(async {
            assert!(ON_RT_THREAD.with(Cell::get));

            task::spawn_local(async move {
                assert!(ON_RT_THREAD.with(Cell::get));

                let blocking = task::spawn_blocking(|| {
                    assert!(
                        !ON_RT_THREAD.with(Cell::get),
                        "blocking must not run on the local task set's thread"
                    );
                });
                blocking.await.unwrap();

                assert!(ON_RT_THREAD.with(Cell::get));
            })
            .await
            .unwrap();
        })
        .await;
}

/// Spawns 128 local tasks and checks that each one observes the thread-local
/// marker set by the thread running this test.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn all_spawns_are_local() {
    use futures::future;
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let local = LocalSet::new();
    local
        .run_until(async {
            assert!(ON_RT_THREAD.with(Cell::get));

            // Spawn everything first, then wait for all handles together.
            let mut handles = Vec::new();
            for _ in 0..128 {
                handles.push(task::spawn_local(async {
                    assert!(ON_RT_THREAD.with(Cell::get));
                }));
            }

            future::join_all(handles)
                .await
                .into_iter()
                .for_each(|joined| joined.unwrap());
        })
        .await;
}

/// Checks that local tasks spawned four levels deep all observe the
/// thread-local marker set by the thread running this test.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn nested_spawn_is_local() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let local = LocalSet::new();
    local
        .run_until(async {
            assert!(ON_RT_THREAD.with(Cell::get));

            let level_one = task::spawn_local(async {
                assert!(ON_RT_THREAD.with(Cell::get));

                let level_two = task::spawn_local(async {
                    assert!(ON_RT_THREAD.with(Cell::get));

                    let level_three = task::spawn_local(async {
                        assert!(ON_RT_THREAD.with(Cell::get));

                        let level_four = task::spawn_local(async {
                            assert!(ON_RT_THREAD.with(Cell::get));
                        });
                        level_four.await.unwrap();
                    });
                    level_three.await.unwrap();
                });
                level_two.await.unwrap();
            });
            level_one.await.unwrap();
        })
        .await;
}

/// Checks that a local task's join handle can be awaited from a task created
/// with `task::spawn`, while the local task itself still observes the
/// thread-local marker of the thread that called `block_on`.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[test]
fn join_local_future_elsewhere() {
    thread_local! {
        static ON_RT_THREAD: Cell<bool> = const { Cell::new(false) };
    }

    ON_RT_THREAD.with(|flag| flag.set(true));

    let rt = runtime::Runtime::new().unwrap();
    let set = LocalSet::new();
    set.block_on(&rt, async move {
        let (release_tx, release_rx) = oneshot::channel();

        let local_join = task::spawn_local(async move {
            assert!(
                ON_RT_THREAD.with(Cell::get),
                "local task must run on local thread, no matter where it is awaited"
            );
            // Stay alive until the other task signals.
            release_rx.await.unwrap();

            "hello world"
        });

        let worker_join = task::spawn(async move {
            assert!(
                !ON_RT_THREAD.with(Cell::get),
                "spawned task should be on a worker"
            );

            release_tx.send(()).expect("task shouldn't have ended yet");

            local_join.await.expect("task should complete successfully");
        });

        worker_join.await.unwrap()
    });
}

// Tests for <https://github.com/tokio-rs/tokio/issues/4973>
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_in_thread_local() {
    thread_local! {
        static LOCAL_SET: LocalSet = LocalSet::new();
    }

    // The sender is kept alive until this function returns, so the receiver
    // awaited on the spawned thread stays pending for that long.
    let (_keep_alive, parked) = oneshot::channel::<()>();
    let rt_handle = tokio::runtime::Handle::current();

    // The thread is intentionally not joined.
    std::thread::spawn(move || {
        LOCAL_SET.with(|set| {
            let until_closed = set.run_until(async move {
                let _ = parked.await;
            });
            rt_handle.block_on(until_closed)
        });
    });
}

/// Checks that dropping the `LocalSet` and the runtime releases a value owned
/// by a local task that never completes.
#[test]
fn drop_cancels_tasks() {
    use std::rc::Rc;

    // This test reproduces issue #1842
    let rt = rt();
    let witness = Rc::new(());
    let held_by_task = Rc::clone(&witness);

    let (started_tx, started_rx) = oneshot::channel();

    let local = LocalSet::new();
    local.spawn_local(async move {
        // Take ownership so the clone lives inside this task's future.
        let _held = held_by_task;

        started_tx.send(()).unwrap();
        futures::future::pending::<()>().await;
    });

    // Drive the set only until the task above has signalled that it started.
    local.block_on(&rt, async {
        started_rx.await.unwrap();
    });

    drop(local);
    drop(rt);

    // Only `witness` itself should still hold a reference.
    assert_eq!(1, Rc::strong_count(&witness));
}

/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
///
/// # Panics
///
/// Panics if `f` has not finished after `timeout`, or if `f` itself panics
/// (the panic is surfaced when the spawned thread is joined).
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
    use std::sync::mpsc::RecvTimeoutError;

    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let thread = std::thread::spawn(move || {
        f();

        // Send a message on the channel so that the test thread can
        // determine if we have entered an infinite loop:
        done_tx.send(()).unwrap();
    });

    // Since the failure mode of this test is an infinite loop, rather than
    // something we can easily make assertions about, we'll run it in a
    // thread. When the test thread finishes, it will send a message on a
    // channel to this thread. We'll wait for that message for at most
    // `timeout`, and if we don't receive it, we assume the test thread has
    // hung. Callers should pick a generous timeout so that a slow CI machine
    // does not trigger a false failure.
    match done_rx.recv_timeout(timeout) {
        // `Duration`'s `Debug` output already carries its unit (e.g. `60s`),
        // so no unit is appended in the message.
        Err(RecvTimeoutError::Timeout) => panic!(
            "test did not complete within {:?}, \
             we have (probably) entered an infinite loop!",
            timeout,
        ),
        // Did the test thread panic? We'll find out for sure when we `join`
        // with it.
        Err(RecvTimeoutError::Disconnected) => {}
        // Test completed successfully!
        Ok(()) => {}
    }

    thread.join().expect("test thread should not panic!")
}

#[cfg_attr(
    target_os = "wasi",
    ignore = "`unwrap()` in `with_timeout()` panics on Wasi"
)]
#[test]
fn drop_cancels_remote_tasks() {
    // This test reproduces issue #1885.
    let body = || {
        let (tx, mut rx) = mpsc::channel::<()>(1024);

        let rt = rt();

        let local = LocalSet::new();
        local.spawn_local(async move {
            // Keep receiving until `recv` yields `None`.
            loop {
                if rx.recv().await.is_none() {
                    break;
                }
            }
        });

        // Drive the set briefly so the receiving task gets polled.
        local.block_on(&rt, async {
            time::sleep(Duration::from_millis(1)).await;
        });

        drop(tx);

        // This enters an infinite loop if the remote notified tasks are not
        // properly cancelled.
        drop(local);
    };

    with_timeout(Duration::from_secs(60), body);
}

#[cfg_attr(
    target_os = "wasi",
    ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
)]
#[test]
fn local_tasks_wake_join_all() {
    // This test reproduces issue #2460.
    with_timeout(Duration::from_secs(60), || {
        use futures::future::join_all;
        use tokio::task::LocalSet;

        let rt = rt();
        let set = LocalSet::new();

        // 128 outer tasks, each of which spawns and awaits one inner task.
        let handles: Vec<_> = (0..128)
            .map(|_| {
                set.spawn_local(async move {
                    tokio::task::spawn_local(async move {}).await.unwrap();
                })
            })
            .collect();

        let all_joined = set.run_until(join_all(handles));
        rt.block_on(all_joined);
    });
}

#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
#[test]
fn local_tasks_are_polled_after_tick() {
    // The inner test is timing-dependent, so it gets up to five attempts:
    // four with panics caught, then a final one whose panic fails the test.
    const GUARDED_ATTEMPTS: usize = 4;

    let passed = (0..GUARDED_ATTEMPTS)
        .any(|_| std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner).is_ok());

    if !passed {
        // All guarded attempts failed; run once more without catching.
        local_tasks_are_polled_after_tick_inner();
    }
}

/// Single attempt of the timing-sensitive check driven by
/// `local_tasks_are_polled_after_tick`.
///
/// One task sends `EXPECTED` oneshot receivers over an unbounded channel and
/// later completes every matching sender. The `run_until` future counts each
/// received item in `RX1` and spawns a local task per item that bumps `RX2`
/// once its oneshot resolves. The sending task keeps polling the counters
/// until both equal `EXPECTED`.
#[cfg(not(target_os = "wasi"))] // Wasi doesn't support panic recovery
#[tokio::main(flavor = "current_thread")]
async fn local_tasks_are_polled_after_tick_inner() {
    // Reproduces issues #1899 and #1900

    // RX1 counts items received from the channel; RX2 counts local tasks
    // whose oneshot has resolved.
    static RX1: AtomicUsize = AtomicUsize::new(0);
    static RX2: AtomicUsize = AtomicUsize::new(0);
    const EXPECTED: usize = 500;

    // The statics persist between attempts, so reset them on every run.
    RX1.store(0, SeqCst);
    RX2.store(0, SeqCst);

    let (tx, mut rx) = mpsc::unbounded_channel();

    let local = LocalSet::new();

    local
        .run_until(async {
            // `tx` is moved into this task.
            let task2 = task::spawn(async move {
                // Wait a bit
                time::sleep(Duration::from_millis(10)).await;

                let mut oneshots = Vec::with_capacity(EXPECTED);

                // Send values
                for _ in 0..EXPECTED {
                    let (oneshot_tx, oneshot_rx) = oneshot::channel();
                    oneshots.push(oneshot_tx);
                    tx.send(oneshot_rx).unwrap();
                }

                time::sleep(Duration::from_millis(10)).await;

                // Complete every oneshot whose receiver was sent above.
                for tx in oneshots.drain(..) {
                    tx.send(()).unwrap();
                }

                // Poll the counters every 20ms until both reach `EXPECTED`.
                loop {
                    time::sleep(Duration::from_millis(20)).await;
                    let rx1 = RX1.load(SeqCst);
                    let rx2 = RX2.load(SeqCst);

                    if rx1 == EXPECTED && rx2 == EXPECTED {
                        break;
                    }
                }
            });

            // Runs until `recv` returns `None`.
            while let Some(oneshot) = rx.recv().await {
                RX1.fetch_add(1, SeqCst);

                // One local task per received oneshot; it is not awaited here.
                task::spawn_local(async move {
                    oneshot.await.unwrap();
                    RX2.fetch_add(1, SeqCst);
                });
            }

            task2.await.unwrap();
        })
        .await;
}

/// Drops a `LocalSet` that still holds three unfinished tasks chained through
/// two oneshot channels; the test passes if the drop returns without any task
/// reaching its `unreachable!()`.
#[tokio::test]
async fn acquire_mutex_in_drop() {
    use futures::future::pending;

    let (first_tx, first_rx) = oneshot::channel();
    let (second_tx, second_rx) = oneshot::channel();
    let set = LocalSet::new();

    // Waits on the second channel and must never get past the await.
    set.spawn_local(async move {
        let _ = second_rx.await;
        unreachable!();
    });

    // Waits on the first channel and owns the second channel's sender.
    set.spawn_local(async move {
        let _ = first_rx.await;
        second_tx.send(()).unwrap();
        unreachable!();
    });

    // Spawn a task that will never notify
    set.spawn_local(async move {
        pending::<()>().await;
        first_tx.send(()).unwrap();
    });

    // Tick the loop
    let one_tick = set.run_until(async {
        task::yield_now().await;
    });
    one_tick.await;

    // Drop the LocalSet
    drop(set);
}

/// Races a `run_until` that can never finish on its own against a future that
/// spawns a ready task on the same set and awaits it; the second arm must be
/// the one that completes.
#[tokio::test]
async fn spawn_wakes_localset() {
    let local = LocalSet::new();
    futures::select! {
        _ = local.run_until(pending::<()>()).fuse() => unreachable!(),
        ret = async {
            let handle = local.spawn_local(ready(()));
            handle.await.unwrap()
        }.fuse() => ret
    }
}

/// Checks that a local task which sleeps is woken while the `enter` guard of
/// its `LocalSet` is held.
/// Reproduces <https://github.com/tokio-rs/tokio/issues/5020>.
#[tokio::test]
async fn sleep_with_local_enter_guard() {
    let set = LocalSet::new();
    let _guard = set.enter();

    let (done_tx, done_rx) = oneshot::channel();

    let driver = set.run_until(async move {
        // The spawned task sleeps for zero time, then reports back.
        tokio::task::spawn_local(async move {
            time::sleep(Duration::ZERO).await;

            done_tx.send(()).expect("failed to send");
        });

        let received = done_rx.await;
        assert_eq!(received, Ok(()));
    });
    driver.await;
}

/// Stores a runtime together with a `LocalSet` in a thread-local and spawns a
/// local task through them.
#[test]
fn store_local_set_in_thread_local_with_runtime() {
    use tokio::runtime::Runtime;

    /// A current-thread runtime bundled with the `LocalSet` it drives.
    struct RtAndLocalSet {
        rt: Runtime,
        local: LocalSet,
    }

    impl RtAndLocalSet {
        fn new() -> Self {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let local = LocalSet::new();
            Self { rt, local }
        }

        /// Spawns one empty local task from inside `run_until`.
        async fn inner_method(&self) {
            let spawn_one = async move {
                tokio::task::spawn_local(async {});
            };
            self.local.run_until(spawn_one).await
        }

        /// Drives `inner_method` to completion on the bundled runtime.
        fn method(&self) {
            let fut = self.inner_method();
            self.rt.block_on(fut);
        }
    }

    thread_local! {
        static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
    }

    CURRENT.with(RtAndLocalSet::method);
}

// Tests that are only compiled when the `tokio_unstable` cfg is set.
#[cfg(tokio_unstable)]
mod unstable {
    use tokio::runtime::UnhandledPanic;
    use tokio::task::LocalSet;

    /// Configures the `LocalSet` with `UnhandledPanic::ShutdownRuntime` and
    /// spawns a local task that panics. The test is expected to panic with
    /// the message given in `should_panic`.
    #[tokio::test]
    #[should_panic(
        expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
    )]
    async fn shutdown_on_panic() {
        LocalSet::new()
            .unhandled_panic(UnhandledPanic::ShutdownRuntime)
            .run_until(async {
                tokio::task::spawn_local(async {
                    panic!("boom");
                });

                // Never completes on its own, so the test can only end via
                // the expected panic.
                futures::future::pending::<()>().await;
            })
            .await;
    }

    // This test checks that, when the task driving `run_until` has already
    // consumed budget, the `run_until` future has less budget than a "spawned"
    // task.
    //
    // "Budget" is a fuzzy metric as the Tokio runtime is able to change values
    // internally. This is why the test uses indirection to test this.
    #[tokio::test]
    async fn run_until_does_not_get_own_budget() {
        // Consume some budget
        tokio::task::consume_budget().await;

        LocalSet::new()
            .run_until(async {
                // Reference measurement, taken inside a task created with
                // `tokio::spawn`.
                let spawned = tokio::spawn(async {
                    let mut spawned_n = 0;

                    {
                        // Counts loop iterations made during a single poll.
                        let mut spawned = tokio_test::task::spawn(async {
                            loop {
                                spawned_n += 1;
                                tokio::task::consume_budget().await;
                            }
                        });
                        // Poll once
                        assert!(!spawned.poll().is_ready());
                    }

                    spawned_n
                });

                // Same measurement, taken directly inside the `run_until`
                // future.
                let mut run_until_n = 0;
                {
                    let mut run_until = tokio_test::task::spawn(async {
                        loop {
                            run_until_n += 1;
                            tokio::task::consume_budget().await;
                        }
                    });
                    // Poll once
                    assert!(!run_until.poll().is_ready());
                }

                let spawned_n = spawned.await.unwrap();
                // Both loops must have run at least once, and the spawned
                // task must have run strictly more iterations.
                assert_ne!(spawned_n, 0);
                assert_ne!(run_until_n, 0);
                assert!(spawned_n > run_until_n);
            })
            .await
    }
}

/// Builds a current-thread runtime with `enable_all` applied; panics if the
/// build fails.
fn rt() -> runtime::Runtime {
    let mut builder = runtime::Builder::new_current_thread();
    builder.enable_all();
    builder.build().unwrap()
}

[ Dauer der Verarbeitung: 0.12 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge