Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  local_pool.rs   Sprache: unbekannt

 
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{self, lazy, poll_fn, Future};
use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

struct Pending(Rc<()>);

impl Future for Pending {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

fn pending() -> Pending {
    Pending(Rc::new(()))
}

#[test]
fn run_until_single_future() {
    let mut cnt = 0;

    {
        let mut pool = LocalPool::new();
        let fut = lazy(|_| {
            cnt += 1;
        });
        pool.run_until(fut);
    }

    assert_eq!(cnt, 1);
}

#[test]
fn run_until_ignores_spawned() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    pool.run_until(lazy(|_| ()));
}

#[test]
fn run_until_executes_spawned() {
    let (tx, rx) = oneshot::channel();
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    spawn
        .spawn_local_obj(
            Box::pin(lazy(move |_| {
                tx.send(()).unwrap();
            }))
            .into(),
        )
        .unwrap();
    pool.run_until(rx).unwrap();
}

#[test]
fn run_returns_if_empty() {
    let mut pool = LocalPool::new();
    pool.run();
    pool.run();
}

#[test]
fn run_executes_spawned() {
    let cnt = Rc::new(Cell::new(0));
    let cnt2 = cnt.clone();

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let spawn2 = pool.spawner();

    spawn
        .spawn_local_obj(
            Box::pin(lazy(move |_| {
                spawn2
                    .spawn_local_obj(
                        Box::pin(lazy(move |_| {
                            cnt2.set(cnt2.get() + 1);
                        }))
                        .into(),
                    )
                    .unwrap();
            }))
            .into(),
        )
        .unwrap();

    pool.run();

    assert_eq!(cnt.get(), 1);
}

#[test]
fn run_spawn_many() {
    const ITER: usize = 200;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for _ in 0..ITER {
        let cnt = cnt.clone();
        spawn
            .spawn_local_obj(
                Box::pin(lazy(move |_| {
                    cnt.set(cnt.get() + 1);
                }))
                .into(),
            )
            .unwrap();
    }

    pool.run();

    assert_eq!(cnt.get(), ITER);
}

#[test]
fn try_run_one_returns_if_empty() {
    let mut pool = LocalPool::new();
    assert!(!pool.try_run_one());
}

#[test]
fn try_run_one_executes_one_ready() {
    const ITER: usize = 200;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for _ in 0..ITER {
        spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();

        let cnt = cnt.clone();
        spawn
            .spawn_local_obj(
                Box::pin(lazy(move |_| {
                    cnt.set(cnt.get() + 1);
                }))
                .into(),
            )
            .unwrap();

        spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
    }

    for i in 0..ITER {
        assert_eq!(cnt.get(), i);
        assert!(pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
    }
    assert!(!pool.try_run_one());
}

#[test]
fn try_run_one_returns_on_no_progress() {
    const ITER: usize = 10;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    let waker: Rc<Cell<Option<Waker>>> = Rc::new(Cell::new(None));
    {
        let cnt = cnt.clone();
        let waker = waker.clone();
        spawn
            .spawn_local_obj(
                Box::pin(poll_fn(move |ctx| {
                    cnt.set(cnt.get() + 1);
                    waker.set(Some(ctx.waker().clone()));
                    if cnt.get() == ITER {
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                }))
                .into(),
            )
            .unwrap();
    }

    for i in 0..ITER - 1 {
        assert_eq!(cnt.get(), i);
        assert!(!pool.try_run_one());
        assert_eq!(cnt.get(), i + 1);
        let w = waker.take();
        assert!(w.is_some());
        w.unwrap().wake();
    }
    assert!(pool.try_run_one());
    assert_eq!(cnt.get(), ITER);
}

#[test]
fn try_run_one_runs_sub_futures() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let inner_spawner = spawn.clone();
    let cnt1 = cnt.clone();
    spawn
        .spawn_local_obj(
            Box::pin(poll_fn(move |_| {
                cnt1.set(cnt1.get() + 1);

                let cnt2 = cnt1.clone();
                inner_spawner
                    .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
                    .unwrap();

                Poll::Pending
            }))
            .into(),
        )
        .unwrap();

    pool.try_run_one();
    assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_returns_if_empty() {
    let mut pool = LocalPool::new();
    pool.run_until_stalled();
    pool.run_until_stalled();
}

#[test]
fn run_until_stalled_returns_multiple_times() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let cnt1 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt1.set(cnt1.get() + 1))).into()).unwrap();
    pool.run_until_stalled();
    assert_eq!(cnt.get(), 1);

    let cnt2 = cnt.clone();
    spawn.spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into()).unwrap();
    pool.run_until_stalled();
    assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();
    let cnt = Rc::new(Cell::new(0));

    let inner_spawner = spawn.clone();
    let cnt1 = cnt.clone();
    spawn
        .spawn_local_obj(
            Box::pin(poll_fn(move |_| {
                cnt1.set(cnt1.get() + 1);

                let cnt2 = cnt1.clone();
                inner_spawner
                    .spawn_local_obj(Box::pin(lazy(move |_| cnt2.set(cnt2.get() + 1))).into())
                    .unwrap();

                Poll::Pending
            }))
            .into(),
        )
        .unwrap();

    pool.run_until_stalled();
    assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_executes_all_ready() {
    const ITER: usize = if cfg!(miri) { 50 } else { 200 };
    const PER_ITER: usize = 3;

    let cnt = Rc::new(Cell::new(0));

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    for i in 0..ITER {
        for _ in 0..PER_ITER {
            spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();

            let cnt = cnt.clone();
            spawn
                .spawn_local_obj(
                    Box::pin(lazy(move |_| {
                        cnt.set(cnt.get() + 1);
                    }))
                    .into(),
                )
                .unwrap();

            // also add some pending tasks to test if they are ignored
            spawn.spawn_local_obj(Box::pin(pending()).into()).unwrap();
        }
        assert_eq!(cnt.get(), i * PER_ITER);
        pool.run_until_stalled();
        assert_eq!(cnt.get(), (i + 1) * PER_ITER);
    }
}

#[test]
#[should_panic]
fn nesting_run() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn
        .spawn_obj(
            Box::pin(lazy(|_| {
                let mut pool = LocalPool::new();
                pool.run();
            }))
            .into(),
        )
        .unwrap();

    pool.run();
}

#[test]
#[should_panic]
fn nesting_run_run_until_stalled() {
    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn
        .spawn_obj(
            Box::pin(lazy(|_| {
                let mut pool = LocalPool::new();
                pool.run_until_stalled();
            }))
            .into(),
        )
        .unwrap();

    pool.run();
}

#[test]
fn tasks_are_scheduled_fairly() {
    let state = Rc::new(RefCell::new([0, 0]));

    struct Spin {
        state: Rc<RefCell<[i32; 2]>>,
        idx: usize,
    }

    impl Future for Spin {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
            let mut state = self.state.borrow_mut();

            if self.idx == 0 {
                let diff = state[0] - state[1];

                assert!(diff.abs() <= 1);

                if state[0] >= 50 {
                    return Poll::Ready(());
                }
            }

            state[self.idx] += 1;

            if state[self.idx] >= 100 {
                return Poll::Ready(());
            }

            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    let mut pool = LocalPool::new();
    let spawn = pool.spawner();

    spawn.spawn_local_obj(Box::pin(Spin { state: state.clone(), idx: 0 }).into()).unwrap();

    spawn.spawn_local_obj(Box::pin(Spin { state, idx: 1 }).into()).unwrap();

    pool.run();
}

// Tests that the use of park/unpark in user-code has no
// effect on the expected behavior of the executor.
#[test]
fn park_unpark_independence() {
    let mut done = false;

    let future = future::poll_fn(move |cx| {
        if done {
            return Poll::Ready(());
        }
        done = true;
        cx.waker().clone().wake(); // (*)
                                   // some user-code that temporarily parks the thread
        let test = thread::current();
        let latch = Arc::new(AtomicBool::new(false));
        let signal = latch.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            signal.store(true, Ordering::SeqCst);
            test.unpark()
        });
        while !latch.load(Ordering::Relaxed) {
            thread::park();
        }
        Poll::Pending // Expect to be called again due to (*).
    });

    futures::executor::block_on(future)
}

struct SelfWaking {
    wakeups_remaining: Rc<RefCell<usize>>,
}

impl Future for SelfWaking {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if *self.wakeups_remaining.borrow() != 0 {
            *self.wakeups_remaining.borrow_mut() -= 1;
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `run_until_stalled`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_run_until_stalled() {
    let wakeups_remaining = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    for _ in 0..3 {
        let wakeups_remaining = Rc::clone(&wakeups_remaining);
        spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
    }

    // This should keep polling until there are no more wakeups.
    pool.run_until_stalled();

    assert_eq!(*wakeups_remaining.borrow(), 0);
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `try_run_one`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_try_run_one() {
    let wakeups_remaining = Rc::new(RefCell::new(10));

    let mut pool = LocalPool::new();
    let spawner = pool.spawner();
    for _ in 0..3 {
        let wakeups_remaining = Rc::clone(&wakeups_remaining);
        spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
    }

    spawner.spawn(future::ready(())).unwrap();

    // The `ready` future should complete.
    assert!(pool.try_run_one());

    // The self-waking futures are each polled once.
    assert_eq!(*wakeups_remaining.borrow(), 7);
}

[ Dauer der Verarbeitung: 0.31 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge