Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  rt_common.rs   Sprache: unbekannt

 
#![allow(unknown_lints, unexpected_cfgs)]
#![allow(clippy::needless_range_loop)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

// Tests to run on both current-thread & multi-thread runtime variants.

macro_rules! rt_test {
    ($($t:tt)*) => {
        mod current_thread_scheduler {
            $($t)*

            #[cfg(not(target_os="wasi"))]
            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        mod threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_4_threads {
            $($t)*

            const NUM_WORKERS: usize = 4;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(4)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }

        #[cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
        #[cfg(tokio_unstable)]
        mod alt_threaded_scheduler_1_thread {
            $($t)*

            const NUM_WORKERS: usize = 1;

            fn rt() -> Arc<Runtime> {
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(1)
                    .enable_all()
                    .build()
                    .unwrap()
                    .into()
            }
        }
    }
}

#[test]
fn send_sync_bound() {
    use tokio::runtime::Runtime;
    fn is_send<T: Send + Sync>() {}

    is_send::<Runtime>();
}

rt_test! {
    #[cfg(not(target_os="wasi"))]
    use tokio::net::{TcpListener, TcpStream};
    #[cfg(not(target_os="wasi"))]
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    use tokio::runtime::Runtime;
    use tokio::sync::oneshot;
    use tokio::{task, time};

    #[cfg(not(target_os="wasi"))]
    use tokio_test::assert_err;
    use tokio_test::assert_ok;

    use futures::future::poll_fn;
    use std::future::Future;
    use std::pin::Pin;

    #[cfg(not(target_os="wasi"))]
    use std::sync::mpsc;

    use std::sync::Arc;
    use std::task::{Context, Poll};

    #[cfg(not(target_os="wasi"))]
    use std::thread;
    use std::time::{Duration, Instant};

    #[test]
    fn block_on_sync() {
        let rt = rt();

        let mut win = false;
        rt.block_on(async {
            win = true;
        });

        assert!(win);
    }


    #[cfg(not(target_os="wasi"))]
    #[test]
    fn block_on_async() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_one_bg() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
            });

            assert_ok!(rx.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_one_join() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            let handle = tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            });

            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_two() {
        let rt = rt();

        let out = rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            tokio::spawn(async move {
                assert_ok!(tx1.send("ZOMG"));
            });

            tokio::spawn(async move {
                let msg = assert_ok!(rx1.await);
                assert_ok!(tx2.send(msg));
            });

            assert_ok!(rx2.await)
        });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_block_on() {
        use tokio::sync::mpsc;

        const ITER: usize = 200;

        let rt = rt();

        let out = rt.block_on(async {
            let (done_tx, mut done_rx) = mpsc::unbounded_channel();

            let mut txs = (0..ITER)
                .map(|i| {
                    let (tx, rx) = oneshot::channel();
                    let done_tx = done_tx.clone();

                    tokio::spawn(async move {
                        let msg = assert_ok!(rx.await);
                        assert_eq!(i, msg);
                        assert_ok!(done_tx.send(msg));
                    });

                    tx
                })
                .collect::<Vec<_>>();

            drop(done_tx);

            thread::spawn(move || {
                for (i, tx) in txs.drain(..).enumerate() {
                    assert_ok!(tx.send(i));
                }
            });

            let mut out = vec![];
            while let Some(i) = done_rx.recv().await {
                out.push(i);
            }

            out.sort_unstable();
            out
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_many_from_task() {
        use tokio::sync::mpsc;

        const ITER: usize = 500;

        let rt = rt();

        let out = rt.block_on(async {
            tokio::spawn(async move {
                let (done_tx, mut done_rx) = mpsc::unbounded_channel();

                let mut txs = (0..ITER)
                    .map(|i| {
                        let (tx, rx) = oneshot::channel();
                        let done_tx = done_tx.clone();

                        tokio::spawn(async move {
                            let msg = assert_ok!(rx.await);
                            assert_eq!(i, msg);
                            assert_ok!(done_tx.send(msg));
                        });

                        tx
                    })
                    .collect::<Vec<_>>();

                drop(done_tx);

                thread::spawn(move || {
                    for (i, tx) in txs.drain(..).enumerate() {
                        assert_ok!(tx.send(i));
                    }
                });

                let mut out = vec![];
                while let Some(i) = done_rx.recv().await {
                    out.push(i);
                }

                out.sort_unstable();
                out
            }).await.unwrap()
        });

        assert_eq!(ITER, out.len());

        for i in 0..ITER {
            assert_eq!(i, out[i]);
        }
    }

    #[test]
    fn spawn_one_from_block_on_called_on_handle() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        #[allow(clippy::async_yields_async)]
        let handle = rt.handle().block_on(async {
            tokio::spawn(async move {
                tx.send("ZOMG").unwrap();
                "DONE"
            })
        });

        let out = rt.block_on(async {
            let msg = assert_ok!(rx.await);

            let out = assert_ok!(handle.await);
            assert_eq!(out, "DONE");

            msg
        });

        assert_eq!(out, "ZOMG");
    }

    #[test]
    fn spawn_await_chain() {
        let rt = rt();

        let out = rt.block_on(async {
            assert_ok!(tokio::spawn(async {
                assert_ok!(tokio::spawn(async {
                    "hello"
                }).await)
            }).await)
        });

        assert_eq!(out, "hello");
    }

    #[test]
    fn outstanding_tasks_dropped() {
        let rt = rt();

        let cnt = Arc::new(());

        rt.block_on(async {
            let cnt = cnt.clone();

            tokio::spawn(poll_fn(move |_| {
                assert_eq!(2, Arc::strong_count(&cnt));
                Poll::<()>::Pending
            }));
        });

        assert_eq!(2, Arc::strong_count(&cnt));

        drop(rt);

        assert_eq!(1, Arc::strong_count(&cnt));
    }

    #[test]
    #[should_panic]
    fn nested_rt() {
        let rt1 = rt();
        let rt2 = rt();

        rt1.block_on(async { rt2.block_on(async { "hello" }) });
    }

    #[test]
    fn create_rt_in_block_on() {
        let rt1 = rt();
        let rt2 = rt1.block_on(async { rt() });
        let out = rt2.block_on(async { "ZOMG" });

        assert_eq!(out, "ZOMG");
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_block_on_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx, rx) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn complete_task_under_load() {
        let rt = rt();

        rt.block_on(async {
            let (tx1, rx1) = oneshot::channel();
            let (tx2, rx2) = oneshot::channel();

            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                assert_ok!(tx1.send(()));
            });

            tokio::spawn(async move {
                assert_ok!(rx1.await);
                assert_ok!(tx2.send(()));
            });

            assert_ok!(rx2.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_idle() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));

            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            assert_ok!(rx.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_other_thread_under_load() {
        let rt = rt();
        let handle = rt.clone();

        let (tx, rx) = oneshot::channel();

        thread::spawn(move || {
            handle.spawn(async move {
                assert_ok!(tx.send(()));
            });
        });

        rt.block_on(async move {
            // Spin hard
            tokio::spawn(async {
                loop {
                    yield_once().await;
                }
            });

            assert_ok!(rx.await);
        });
    }

    #[test]
    fn sleep_at_root() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            time::sleep(dur).await;
        });

        assert!(now.elapsed() >= dur);
    }

    #[test]
    fn sleep_in_spawn() {
        let rt = rt();

        let now = Instant::now();
        let dur = Duration::from_millis(50);

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            tokio::spawn(async move {
                time::sleep(dur).await;
                assert_ok!(tx.send(()));
            });

            assert_ok!(rx.await);
        });

        assert!(now.elapsed() >= dur);
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[test]
    fn block_on_socket() {
        let rt = rt();

        rt.block_on(async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            tokio::spawn(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::spawn(async move { "hello" })
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn spawn_blocking_from_blocking() {
        let rt = rt();

        let out = rt.block_on(async move {
            let inner = assert_ok!(tokio::task::spawn_blocking(|| {
                tokio::task::spawn_blocking(|| "hello")
            }).await);

            assert_ok!(inner.await)
        });

        assert_eq!(out, "hello")
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn sleep_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            assert_ok!(tokio::task::spawn_blocking(|| {
                let now = std::time::Instant::now();
                let dur = Duration::from_millis(1);

                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    tokio::time::sleep(dur).await;
                });

                assert!(now.elapsed() >= dur);
            }).await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    #[test]
    fn socket_from_blocking() {
        let rt = rt();

        rt.block_on(async move {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let peer = tokio::task::spawn_blocking(move || {
                // use the futures' block_on fn to make sure we aren't setting
                // any Tokio context
                futures::executor::block_on(async {
                    assert_ok!(TcpStream::connect(addr).await);
                });
            });

            // Wait for the client to connect
            let _ = assert_ok!(listener.accept().await);

            assert_ok!(peer.await);
        });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn always_active_parker() {
        // This test it to show that we will always have
        // an active parker even if we call block_on concurrently

        let rt = rt();
        let rt2 = rt.clone();

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let jh1 = thread::spawn(move || {
                rt.block_on(async move {
                    rx2.await.unwrap();
                    time::sleep(Duration::from_millis(5)).await;
                    tx1.send(()).unwrap();
                });
        });

        let jh2 = thread::spawn(move || {
            rt2.block_on(async move {
                tx2.send(()).unwrap();
                time::sleep(Duration::from_millis(5)).await;
                rx1.await.unwrap();
                time::sleep(Duration::from_millis(5)).await;
            });
        });

        jh1.join().unwrap();
        jh2.join().unwrap();
    }

    #[test]
    // IOCP requires setting the "max thread" concurrency value. The sane,
    // default, is to set this to the number of cores. Threads that poll I/O
    // become associated with the IOCP handle. Once those threads sleep for any
    // reason (mutex), they yield their ownership.
    //
    // This test hits an edge case on windows where more threads than cores are
    // created, none of those threads ever yield due to being at capacity, so
    // IOCP gets "starved".
    //
    // For now, this is a very edge case that is probably not a real production
    // concern. There also isn't a great/obvious solution to take. For now, the
    // test is disabled.
    #[cfg(not(windows))]
    #[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
    fn io_driver_called_when_under_load() {
        let rt = rt();

        // Create a lot of constant load. The scheduler will always be busy.
        for _ in 0..100 {
            rt.spawn(async {
                loop {
                    // Don't use Tokio's `yield_now()` to avoid special defer
                    // logic.
                    futures::future::poll_fn::<(), _>(|cx| {
                        cx.waker().wake_by_ref();
                        std::task::Poll::Pending
                    }).await;
                }
            });
        }

        // Do some I/O work
        rt.block_on(async {
            let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
            let addr = assert_ok!(listener.local_addr());

            let srv = tokio::spawn(async move {
                let (mut stream, _) = assert_ok!(listener.accept().await);
                assert_ok!(stream.write_all(b"hello world").await);
            });

            let cli = tokio::spawn(async move {
                let mut stream = assert_ok!(TcpStream::connect(addr).await);
                let mut dst = vec![0; 11];

                assert_ok!(stream.read_exact(&mut dst).await);
                assert_eq!(dst, b"hello world");
            });

            assert_ok!(srv.await);
            assert_ok!(cli.await);
        });
    }

    /// Tests that yielded tasks are not scheduled until **after** resource
    /// drivers are polled.
    ///
    /// The OS does not guarantee when I/O events are delivered, so there may be
    /// more yields than anticipated. This makes the test slightly flaky. To
    /// help avoid flakiness, we run the test 10 times and only fail it after
    /// 10 failures in a row.
    ///
    /// Note that if the test fails by panicking rather than by returning false,
    /// then we fail it immediately. That kind of failure should not happen
    /// spuriously.
    #[test]
    #[cfg(not(target_os="wasi"))]
    fn yield_defers_until_park() {
        for _ in 0..10 {
            if yield_defers_until_park_inner() {
                // test passed
                return;
            }

            // Wait a bit and run the test again.
            std::thread::sleep(std::time::Duration::from_secs(2));
        }

        panic!("yield_defers_until_park is failing consistently");
    }

    /// Implementation of `yield_defers_until_park` test. Returns `true` if the
    /// test passed.
    #[cfg(not(target_os="wasi"))]
    fn yield_defers_until_park_inner() -> bool {
        use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
        use std::sync::Barrier;

        let rt = rt();

        let flag = Arc::new(AtomicBool::new(false));
        let barrier = Arc::new(Barrier::new(NUM_WORKERS));

        rt.block_on(async {
            // Make sure other workers cannot steal tasks
            #[allow(clippy::reversed_empty_ranges)]
            for _ in 0..(NUM_WORKERS-1) {
                let flag = flag.clone();
                let barrier = barrier.clone();

                tokio::spawn(async move {
                    barrier.wait();

                    while !flag.load(SeqCst) {
                        std::thread::sleep(std::time::Duration::from_millis(1));
                    }
                });
            }

            barrier.wait();

            let (fail_test, fail_test_recv) = oneshot::channel::<()>();

            let jh = tokio::spawn(async move {
                // Create a TCP litener
                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
                let addr = listener.local_addr().unwrap();

                tokio::join!(
                    async {
                        // Done in a blocking manner intentionally.
                        let _socket = std::net::TcpStream::connect(addr).unwrap();

                        // Yield until connected
                        let mut cnt = 0;
                        while !flag.load(SeqCst){
                            tokio::task::yield_now().await;
                            cnt += 1;

                            if cnt >= 10 {
                                // yielded too many times; report failure and
                                // sleep forever so that the `fail_test` branch
                                // of the `select!` below triggers.
                                let _ = fail_test.send(());
                                futures::future::pending::<()>().await;
                                break;
                            }
                        }
                    },
                    async {
                        let _ = listener.accept().await.unwrap();
                        flag.store(true, SeqCst);
                    }
                );
            });

            // Wait until the spawned task completes or fails. If no message is
            // sent on `fail_test`, then the test succeeds. Otherwise, it fails.
            let success = fail_test_recv.await.is_err();

            if success {
                // Check for panics in spawned task.
                jh.abort();
                jh.await.unwrap();
            }

            success
        })
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        rt.block_on(async move { client_server(tx).await });

        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads or panic recovery")]
    #[cfg(panic = "unwind")]
    #[test]
    fn panic_in_task() {
        let rt = rt();
        let (tx, rx) = oneshot::channel();

        struct Boom(Option<oneshot::Sender<()>>);

        impl Future for Boom {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                panic!();
            }
        }

        impl Drop for Boom {
            fn drop(&mut self) {
                assert!(std::thread::panicking());
                self.0.take().unwrap().send(()).unwrap();
            }
        }

        rt.spawn(Boom(Some(tx)));
        assert_ok!(rt.block_on(rx));
    }

    #[test]
    #[should_panic]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support panic recovery")]
    fn panic_in_block_on() {
        let rt = rt();
        rt.block_on(async { panic!() });
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    async fn yield_once() {
        let mut yielded = false;
        poll_fn(|cx| {
            if yielded {
                Poll::Ready(())
            } else {
                yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        })
        .await
    }

    #[test]
    fn enter_and_spawn() {
        let rt = rt();
        let handle = {
            let _enter = rt.enter();
            tokio::spawn(async {})
        };

        assert_ok!(rt.block_on(handle));
    }

    #[test]
    fn eagerly_drops_futures_on_shutdown() {
        use std::sync::mpsc;

        struct Never {
            drop_tx: mpsc::Sender<()>,
        }

        impl Future for Never {
            type Output = ();

            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                Poll::Pending
            }
        }

        impl Drop for Never {
            fn drop(&mut self) {
                self.drop_tx.send(()).unwrap();
            }
        }

        let rt = rt();

        let (drop_tx, drop_rx) = mpsc::channel();
        let (run_tx, run_rx) = oneshot::channel();

        rt.block_on(async move {
            tokio::spawn(async move {
                assert_ok!(run_tx.send(()));

                Never { drop_tx }.await
            });

            assert_ok!(run_rx.await);
        });

        drop(rt);

        assert_ok!(drop_rx.recv());
    }

    #[test]
    fn wake_while_rt_is_dropping() {
        use tokio::sync::Barrier;

        struct OnDrop<F: FnMut()>(F);

        impl<F: FnMut()> Drop for OnDrop<F> {
            fn drop(&mut self) {
                (self.0)()
            }
        }

        let (tx1, rx1) = oneshot::channel();
        let (tx2, rx2) = oneshot::channel();

        let barrier = Arc::new(Barrier::new(3));
        let barrier1 = barrier.clone();
        let barrier2 = barrier.clone();

        let rt = rt();

        rt.spawn(async move {
            let mut tx2 = Some(tx2);
            let _d = OnDrop(move || {
                let _ = tx2.take().unwrap().send(());
            });

            // Ensure a waker gets stored in oneshot 1.
            let _ = tokio::join!(rx1, barrier1.wait());
        });

        rt.spawn(async move {
            let mut tx1 = Some(tx1);
            let _d = OnDrop(move || {
                let _ = tx1.take().unwrap().send(());
            });

            // Ensure a waker gets stored in oneshot 2.
            let _ = tokio::join!(rx2, barrier2.wait());
        });

        // Wait until every oneshot channel has been polled.
        rt.block_on(barrier.wait());

        // Drop the rt. Regardless of which task is dropped first, its destructor will wake the
        // other task.
        drop(rt);
    }

    #[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
    #[test]
    fn io_notify_while_shutting_down() {
        use tokio::net::UdpSocket;
        use std::sync::Arc;

        for _ in 1..10 {
            let runtime = rt();

            runtime.block_on(async {
                let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
                let addr = socket.local_addr().unwrap();
                let send_half = Arc::new(socket);
                let recv_half = send_half.clone();

                tokio::spawn(async move {
                    let mut buf = [0];
                    loop {
                        recv_half.recv_from(&mut buf).await.unwrap();
                        std::thread::sleep(Duration::from_millis(2));
                    }
                });

                tokio::spawn(async move {
                    let buf = [0];
                    loop {
                        send_half.send_to(&buf, &addr).await.unwrap();
                        tokio::time::sleep(Duration::from_millis(1)).await;
                    }
                });

                tokio::time::sleep(Duration::from_millis(5)).await;
            });
        }
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout() {
        let (tx, rx) = oneshot::channel();
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                tx.send(()).unwrap();
                thread::sleep(Duration::from_secs(10_000));
            });

            rx.await.unwrap();
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support threads
    #[test]
    fn shutdown_timeout_0() {
        let runtime = rt();

        runtime.block_on(async move {
            task::spawn_blocking(move || {
                thread::sleep(Duration::from_secs(10_000));
            });
        });

        let now = Instant::now();
        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
        assert!(now.elapsed().as_secs() < 1);
    }

    #[test]
    fn shutdown_wakeup_time() {
        let runtime = rt();

        runtime.block_on(async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        });

        Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
    }

    // This test is currently ignored on Windows because of a
    // rust-lang issue in thread local storage destructors.
    // See https://github.com/rust-lang/rust/issues/74875
    #[test]
    #[cfg(not(windows))]
    #[cfg_attr(target_os = "wasi", ignore = "Wasi does not support threads")]
    fn runtime_in_thread_local() {
        use std::cell::RefCell;
        use std::thread;

        thread_local!(
            static R: RefCell<Option<Runtime>> = const { RefCell::new(None) };
        );

        thread::spawn(|| {
            R.with(|cell| {
                let rt = rt();
                let rt = Arc::try_unwrap(rt).unwrap();
                *cell.borrow_mut() = Some(rt);
            });

            let _rt = rt();
        }).join().unwrap();
    }

    #[cfg(not(target_os="wasi"))] // Wasi does not support bind
    async fn client_server(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        tokio::spawn(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[test]
    fn local_set_block_on_socket() {
        let rt = rt();
        let local = task::LocalSet::new();

        local.block_on(&rt, async move {
            let (tx, rx) = oneshot::channel();

            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let addr = listener.local_addr().unwrap();

            task::spawn_local(async move {
                let _ = listener.accept().await;
                tx.send(()).unwrap();
            });

            TcpStream::connect(&addr).await.unwrap();
            rx.await.unwrap();
        });
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    #[test]
    fn local_set_client_server_block_on() {
        let rt = rt();
        let (tx, rx) = mpsc::channel();

        let local = task::LocalSet::new();

        local.block_on(&rt, async move { client_server_local(tx).await });

        assert_ok!(rx.try_recv());
        assert_err!(rx.try_recv());
    }

    #[cfg(not(target_os = "wasi"))] // Wasi does not support bind
    async fn client_server_local(tx: mpsc::Sender<()>) {
        let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);

        // Get the assigned address
        let addr = assert_ok!(server.local_addr());

        // Spawn the server
        task::spawn_local(async move {
            // Accept a socket
            let (mut socket, _) = server.accept().await.unwrap();

            // Write some data
            socket.write_all(b"hello").await.unwrap();
        });

        let mut client = TcpStream::connect(&addr).await.unwrap();

        let mut buf = vec![];
        client.read_to_end(&mut buf).await.unwrap();

        assert_eq!(buf, b"hello");
        tx.send(()).unwrap();
    }

    #[test]
    fn coop() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            poll_fn(|cx| {
                // At least one response should return pending.
                for _ in 0..1_000 {
                    if recv.poll_recv(cx).is_pending() {
                        return Ready(());
                    }
                }

                panic!("did not yield");
            }).await;
        });
    }

    #[test]
    fn coop_unconstrained() {
        use std::task::Poll::Ready;
        use tokio::sync::mpsc;

        let rt = rt();

        rt.block_on(async {
            let (send, mut recv) = mpsc::unbounded_channel();

            // Send a bunch of messages.
            for _ in 0..1_000 {
                send.send(()).unwrap();
            }

            tokio::task::unconstrained(poll_fn(|cx| {
                // All the responses should be ready.
                for _ in 0..1_000 {
                    assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
                }

                Ready(())
            })).await;
        });
    }

    #[cfg(tokio_unstable)]
    #[test]
    fn coop_consume_budget() {
        let rt = rt();

        rt.block_on(async {
            poll_fn(|cx| {
                let counter = Arc::new(std::sync::Mutex::new(0));
                let counter_clone = Arc::clone(&counter);
                let mut worker = Box::pin(async move {
                    // Consume the budget until a yield happens
                    for _ in 0..1000 {
                        *counter.lock().unwrap() += 1;
                        task::consume_budget().await
                    }
                });
                // Assert that the worker was yielded and it didn't manage
                // to finish the whole work (assuming the total budget of 128)
                assert!(Pin::new(&mut worker).poll(cx).is_pending());
                assert!(*counter_clone.lock().unwrap() < 1000);
                std::task::Poll::Ready(())
            }).await;
        });
    }

    // Tests that the "next task" scheduler optimization is not able to starve
    // other tasks.
    #[test]
    fn ping_pong_saturation() {
        use std::sync::atomic::{Ordering, AtomicBool};
        use tokio::sync::mpsc;

        const NUM: usize = 100;

        let rt = rt();

        let running = Arc::new(AtomicBool::new(true));

        rt.block_on(async {
            let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();

            let mut tasks = vec![];
            // Spawn a bunch of tasks that ping-pong between each other to
            // saturate the runtime.
            for _ in 0..NUM {
                let (tx1, mut rx1) = mpsc::unbounded_channel();
                let (tx2, mut rx2) = mpsc::unbounded_channel();
                let spawned_tx = spawned_tx.clone();
                let running = running.clone();
                tasks.push(task::spawn(async move {
                    spawned_tx.send(()).unwrap();


                    while running.load(Ordering::Relaxed) {
                        tx1.send(()).unwrap();
                        rx2.recv().await.unwrap();
                    }

                    // Close the channel and wait for the other task to exit.
                    drop(tx1);
                    assert!(rx2.recv().await.is_none());
                }));

                tasks.push(task::spawn(async move {
                    while rx1.recv().await.is_some() {
                        tx2.send(()).unwrap();
                    }
                }));
            }

            for _ in 0..NUM {
                spawned_rx.recv().await.unwrap();
            }

            // spawn another task and wait for it to complete
            let handle = task::spawn(async {
                for _ in 0..5 {
                    // Yielding forces it back into the local queue.
                    task::yield_now().await;
                }
            });
            handle.await.unwrap();
            running.store(false, Ordering::Relaxed);
            for t in tasks {
                t.await.unwrap();
            }
        });
    }

    #[test]
    #[cfg(not(target_os="wasi"))]
    fn shutdown_concurrent_spawn() {
        const NUM_TASKS: usize = 10_000;
        for _ in 0..5 {
            let (tx, rx) = std::sync::mpsc::channel();
            let rt = rt();

            let mut txs = vec![];

            for _ in 0..NUM_TASKS {
                let (tx, rx) = tokio::sync::oneshot::channel();
                txs.push(tx);
                rt.spawn(async move {
                    rx.await.unwrap();
                });
            }

            // Prime the tasks
            rt.block_on(async { tokio::task::yield_now().await });

            let th = std::thread::spawn(move || {
                tx.send(()).unwrap();
                for tx in txs.drain(..) {
                    let _ = tx.send(());
                }
            });

            rx.recv().unwrap();
            drop(rt);

            th.join().unwrap();
        }
    }

    #[test]
    #[cfg_attr(target_family = "wasm", ignore)]
    fn wake_by_ref_from_thread_local() {
        wake_from_thread_local(true);
    }

    #[test]
    #[cfg_attr(target_family = "wasm", ignore)]
    fn wake_by_val_from_thread_local() {
        wake_from_thread_local(false);
    }

    fn wake_from_thread_local(by_ref: bool) {
        use std::cell::RefCell;
        use std::sync::mpsc::{channel, Sender};
        use std::task::Waker;

        struct TLData {
            by_ref: bool,
            waker: Option<Waker>,
            done: Sender<bool>,
        }

        impl Drop for TLData {
            fn drop(&mut self) {
                if self.by_ref {
                    self.waker.take().unwrap().wake_by_ref();
                } else {
                    self.waker.take().unwrap().wake();
                }
                let _ = self.done.send(true);
            }
        }

        std::thread_local! {
            static TL_DATA: RefCell<Option<TLData>> = const { RefCell::new(None) };
        };

        let (send, recv) = channel();

        std::thread::spawn(move || {
            let rt = rt();
            rt.block_on(rt.spawn(poll_fn(move |cx| {
                let waker = cx.waker().clone();
                let send = send.clone();
                TL_DATA.with(|tl| {
                    tl.replace(Some(TLData {
                        by_ref,
                        waker: Some(waker),
                        done: send,
                    }));
                });
                Poll::Ready(())
            })))
            .unwrap();
        })
        .join()
        .unwrap();

        assert!(recv.recv().unwrap());
    }
}

[ Dauer der Verarbeitung: 0.31 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge