Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  time_delay_queue.rs   Sprache: unbekannt

 
#![allow(clippy::blacklisted_name)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::time::{self, sleep, sleep_until, Duration, Instant};
use tokio_test::{assert_pending, assert_ready, task};
use tokio_util::time::DelayQueue;

macro_rules! poll {
    ($queue:ident) => {
        $queue.enter(|cx, mut queue| queue.poll_expired(cx))
    };
}

macro_rules! assert_ready_some {
    ($e:expr) => {{
        match assert_ready!($e) {
            Some(v) => v,
            None => panic!("None"),
        }
    }};
}

#[tokio::test]
async fn single_immediate_delay() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());
    let _key = queue.insert_at("foo", Instant::now());

    // Advance time by 1ms to handle thee rounding
    sleep(ms(1)).await;

    assert_ready_some!(poll!(queue));

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none())
}

#[tokio::test]
async fn multi_immediate_delays() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let _k = queue.insert_at("1", Instant::now());
    let _k = queue.insert_at("2", Instant::now());
    let _k = queue.insert_at("3", Instant::now());

    sleep(ms(1)).await;

    let mut res = vec![];

    while res.len() < 3 {
        let entry = assert_ready_some!(poll!(queue));
        res.push(entry.into_inner());
    }

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none());

    res.sort_unstable();

    assert_eq!("1", res[0]);
    assert_eq!("2", res[1]);
    assert_eq!("3", res[2]);
}

#[tokio::test]
async fn single_short_delay() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());
    let _key = queue.insert_at("foo", Instant::now() + ms(5));

    assert_pending!(poll!(queue));

    sleep(ms(1)).await;

    assert!(!queue.is_woken());

    sleep(ms(5)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue));
    assert_eq!(*entry.get_ref(), "foo");

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none());
}

#[tokio::test]
async fn multi_delay_at_start() {
    time::pause();

    let long = 262_144 + 9 * 4096;
    let delays = &[1000, 2, 234, long, 60, 10];

    let mut queue = task::spawn(DelayQueue::new());

    // Setup the delays
    for &i in delays {
        let _key = queue.insert_at(i, Instant::now() + ms(i));
    }

    assert_pending!(poll!(queue));
    assert!(!queue.is_woken());

    let start = Instant::now();
    for elapsed in 0..1200 {
        println!("elapsed: {:?}", elapsed);
        let elapsed = elapsed + 1;
        tokio::time::sleep_until(start + ms(elapsed)).await;

        if delays.contains(&elapsed) {
            assert!(queue.is_woken());
            assert_ready!(poll!(queue));
            assert_pending!(poll!(queue));
        } else if queue.is_woken() {
            let cascade = &[192, 960];
            assert!(
                cascade.contains(&elapsed),
                "elapsed={} dt={:?}",
                elapsed,
                Instant::now() - start
            );

            assert_pending!(poll!(queue));
        }
    }
    println!("finished multi_delay_start");
}

#[tokio::test]
async fn insert_in_past_fires_immediately() {
    println!("running insert_in_past_fires_immediately");
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());
    let now = Instant::now();

    sleep(ms(10)).await;

    queue.insert_at("foo", now);

    assert_ready!(poll!(queue));
    println!("finished insert_in_past_fires_immediately");
}

#[tokio::test]
async fn remove_entry() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let key = queue.insert_at("foo", Instant::now() + ms(5));

    assert_pending!(poll!(queue));

    let entry = queue.remove(&key);
    assert_eq!(entry.into_inner(), "foo");

    sleep(ms(10)).await;

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none());
}

#[tokio::test]
async fn reset_entry() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();
    let key = queue.insert_at("foo", now + ms(5));

    assert_pending!(poll!(queue));
    sleep(ms(1)).await;

    queue.reset_at(&key, now + ms(10));

    assert_pending!(poll!(queue));

    sleep(ms(7)).await;

    assert!(!queue.is_woken());

    assert_pending!(poll!(queue));

    sleep(ms(3)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue));
    assert_eq!(*entry.get_ref(), "foo");

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none())
}

// Reproduces tokio-rs/tokio#849.
#[tokio::test]
async fn reset_much_later() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();
    sleep(ms(1)).await;

    let key = queue.insert_at("foo", now + ms(200));
    assert_pending!(poll!(queue));

    sleep(ms(3)).await;

    queue.reset_at(&key, now + ms(10));

    sleep(ms(20)).await;

    assert!(queue.is_woken());
}

// Reproduces tokio-rs/tokio#849.
#[tokio::test]
async fn reset_twice() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());
    let now = Instant::now();

    sleep(ms(1)).await;

    let key = queue.insert_at("foo", now + ms(200));

    assert_pending!(poll!(queue));

    sleep(ms(3)).await;

    queue.reset_at(&key, now + ms(50));

    sleep(ms(20)).await;

    queue.reset_at(&key, now + ms(40));

    sleep(ms(20)).await;

    assert!(queue.is_woken());
}

/// Regression test: Given an entry inserted with a deadline in the past, so
/// that it is placed directly on the expired queue, reset the entry to a
/// deadline in the future. Validate that this leaves the entry and queue in an
/// internally consistent state by running an additional reset on the entry
/// before polling it to completion.
#[tokio::test]
async fn repeatedly_reset_entry_inserted_as_expired() {
    time::pause();
    let mut queue = task::spawn(DelayQueue::new());
    let now = Instant::now();

    let key = queue.insert_at("foo", now - ms(100));

    queue.reset_at(&key, now + ms(100));
    queue.reset_at(&key, now + ms(50));

    assert_pending!(poll!(queue));

    time::sleep_until(now + ms(60)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "foo");

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none());
}

#[tokio::test]
async fn remove_expired_item() {
    time::pause();

    let mut queue = DelayQueue::new();

    let now = Instant::now();

    sleep(ms(10)).await;

    let key = queue.insert_at("foo", now);

    let entry = queue.remove(&key);
    assert_eq!(entry.into_inner(), "foo");
}

/// Regression test: it should be possible to remove entries which fall in the
/// 0th slot of the internal timer wheel — that is, entries whose expiration
/// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
/// is equal to the wheel's current elapsed time.
#[tokio::test]
async fn remove_at_timer_wheel_threshold() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let key1 = queue.insert_at("foo", now + ms(64));
    let key2 = queue.insert_at("bar", now + ms(64));

    sleep(ms(80)).await;

    let entry = assert_ready_some!(poll!(queue)).into_inner();

    match entry {
        "foo" => {
            let entry = queue.remove(&key2).into_inner();
            assert_eq!(entry, "bar");
        }
        "bar" => {
            let entry = queue.remove(&key1).into_inner();
            assert_eq!(entry, "foo");
        }
        other => panic!("other: {:?}", other),
    }
}

#[tokio::test]
async fn expires_before_last_insert() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    queue.insert_at("foo", now + ms(10_000));

    // Delay should be set to 8.192s here.
    assert_pending!(poll!(queue));

    // Delay should be set to the delay of the new item here
    queue.insert_at("bar", now + ms(600));

    assert_pending!(poll!(queue));

    sleep(ms(600)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "bar");
}

#[tokio::test]
async fn multi_reset() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let one = queue.insert_at("one", now + ms(200));
    let two = queue.insert_at("two", now + ms(250));

    assert_pending!(poll!(queue));

    queue.reset_at(&one, now + ms(300));
    queue.reset_at(&two, now + ms(350));
    queue.reset_at(&one, now + ms(400));

    sleep(ms(310)).await;

    assert_pending!(poll!(queue));

    sleep(ms(50)).await;

    let entry = assert_ready_some!(poll!(queue));
    assert_eq!(*entry.get_ref(), "two");

    assert_pending!(poll!(queue));

    sleep(ms(50)).await;

    let entry = assert_ready_some!(poll!(queue));
    assert_eq!(*entry.get_ref(), "one");

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none())
}

#[tokio::test]
async fn expire_first_key_when_reset_to_expire_earlier() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let one = queue.insert_at("one", now + ms(200));
    queue.insert_at("two", now + ms(250));

    assert_pending!(poll!(queue));

    queue.reset_at(&one, now + ms(100));

    sleep(ms(100)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "one");
}

#[tokio::test]
async fn expire_second_key_when_reset_to_expire_earlier() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    queue.insert_at("one", now + ms(200));
    let two = queue.insert_at("two", now + ms(250));

    assert_pending!(poll!(queue));

    queue.reset_at(&two, now + ms(100));

    sleep(ms(100)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "two");
}

#[tokio::test]
async fn reset_first_expiring_item_to_expire_later() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let one = queue.insert_at("one", now + ms(200));
    let _two = queue.insert_at("two", now + ms(250));

    assert_pending!(poll!(queue));

    queue.reset_at(&one, now + ms(300));
    sleep(ms(250)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "two");
}

#[tokio::test]
async fn insert_before_first_after_poll() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let _one = queue.insert_at("one", now + ms(200));

    assert_pending!(poll!(queue));

    let _two = queue.insert_at("two", now + ms(100));

    sleep(ms(99)).await;

    assert_pending!(poll!(queue));

    sleep(ms(1)).await;

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "two");
}

#[tokio::test]
async fn insert_after_ready_poll() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    queue.insert_at("1", now + ms(100));
    queue.insert_at("2", now + ms(100));
    queue.insert_at("3", now + ms(100));

    assert_pending!(poll!(queue));

    sleep(ms(100)).await;

    assert!(queue.is_woken());

    let mut res = vec![];

    while res.len() < 3 {
        let entry = assert_ready_some!(poll!(queue));
        res.push(entry.into_inner());
        queue.insert_at("foo", now + ms(500));
    }

    res.sort_unstable();

    assert_eq!("1", res[0]);
    assert_eq!("2", res[1]);
    assert_eq!("3", res[2]);
}

#[tokio::test]
async fn reset_later_after_slot_starts() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let foo = queue.insert_at("foo", now + ms(100));

    assert_pending!(poll!(queue));

    sleep_until(now + Duration::from_millis(80)).await;

    assert!(!queue.is_woken());

    // At this point the queue hasn't been polled, so `elapsed` on the wheel
    // for the queue is still at 0 and hence the 1ms resolution slots cover
    // [0-64).  Resetting the time on the entry to 120 causes it to get put in
    // the [64-128) slot.  As the queue knows that the first entry is within
    // that slot, but doesn't know when, it must wake immediately to advance
    // the wheel.
    queue.reset_at(&foo, now + ms(120));
    assert!(queue.is_woken());

    assert_pending!(poll!(queue));

    sleep_until(now + Duration::from_millis(119)).await;
    assert!(!queue.is_woken());

    sleep(ms(1)).await;
    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "foo");
}

#[tokio::test]
async fn reset_inserted_expired() {
    time::pause();
    let mut queue = task::spawn(DelayQueue::new());
    let now = Instant::now();

    let key = queue.insert_at("foo", now - ms(100));

    // this causes the panic described in #2473
    queue.reset_at(&key, now + ms(100));

    assert_eq!(1, queue.len());

    sleep(ms(200)).await;

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "foo");

    assert_eq!(queue.len(), 0);
}

#[tokio::test]
async fn reset_earlier_after_slot_starts() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let foo = queue.insert_at("foo", now + ms(200));

    assert_pending!(poll!(queue));

    sleep_until(now + Duration::from_millis(80)).await;

    assert!(!queue.is_woken());

    // At this point the queue hasn't been polled, so `elapsed` on the wheel
    // for the queue is still at 0 and hence the 1ms resolution slots cover
    // [0-64).  Resetting the time on the entry to 120 causes it to get put in
    // the [64-128) slot.  As the queue knows that the first entry is within
    // that slot, but doesn't know when, it must wake immediately to advance
    // the wheel.
    queue.reset_at(&foo, now + ms(120));
    assert!(queue.is_woken());

    assert_pending!(poll!(queue));

    sleep_until(now + Duration::from_millis(119)).await;
    assert!(!queue.is_woken());

    sleep(ms(1)).await;
    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "foo");
}

#[tokio::test]
async fn insert_in_past_after_poll_fires_immediately() {
    time::pause();

    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    queue.insert_at("foo", now + ms(200));

    assert_pending!(poll!(queue));

    sleep(ms(80)).await;

    assert!(!queue.is_woken());
    queue.insert_at("bar", now + ms(40));

    assert!(queue.is_woken());

    let entry = assert_ready_some!(poll!(queue)).into_inner();
    assert_eq!(entry, "bar");
}

#[tokio::test]
async fn delay_queue_poll_expired_when_empty() {
    let mut delay_queue = task::spawn(DelayQueue::new());
    let key = delay_queue.insert(0, std::time::Duration::from_secs(10));
    assert_pending!(poll!(delay_queue));

    delay_queue.remove(&key);
    assert!(assert_ready!(poll!(delay_queue)).is_none());
}

#[tokio::test(start_paused = true)]
async fn compact_expire_empty() {
    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    queue.insert_at("foo1", now + ms(10));
    queue.insert_at("foo2", now + ms(10));

    sleep(ms(10)).await;

    let mut res = vec![];
    while res.len() < 2 {
        let entry = assert_ready_some!(poll!(queue));
        res.push(entry.into_inner());
    }

    queue.compact();

    assert_eq!(queue.len(), 0);
    assert_eq!(queue.capacity(), 0);
}

#[tokio::test(start_paused = true)]
async fn compact_remove_empty() {
    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    let key1 = queue.insert_at("foo1", now + ms(10));
    let key2 = queue.insert_at("foo2", now + ms(10));

    queue.remove(&key1);
    queue.remove(&key2);

    queue.compact();

    assert_eq!(queue.len(), 0);
    assert_eq!(queue.capacity(), 0);
}

#[tokio::test(start_paused = true)]
// Trigger a re-mapping of keys in the slab due to a `compact` call and
// test removal of re-mapped keys
async fn compact_remove_remapped_keys() {
    let mut queue = task::spawn(DelayQueue::new());

    let now = Instant::now();

    queue.insert_at("foo1", now + ms(10));
    queue.insert_at("foo2", now + ms(10));

    // should be assigned indices 3 and 4
    let key3 = queue.insert_at("foo3", now + ms(20));
    let key4 = queue.insert_at("foo4", now + ms(20));

    sleep(ms(10)).await;

    let mut res = vec![];
    while res.len() < 2 {
        let entry = assert_ready_some!(poll!(queue));
        res.push(entry.into_inner());
    }

    // items corresponding to `foo3` and `foo4` will be assigned
    // new indices here
    queue.compact();

    queue.insert_at("foo5", now + ms(10));

    // test removal of re-mapped keys
    let expired3 = queue.remove(&key3);
    let expired4 = queue.remove(&key4);

    assert_eq!(expired3.into_inner(), "foo3");
    assert_eq!(expired4.into_inner(), "foo4");

    queue.compact();
    assert_eq!(queue.len(), 1);
    assert_eq!(queue.capacity(), 1);
}

#[tokio::test(start_paused = true)]
async fn compact_change_deadline() {
    let mut queue = task::spawn(DelayQueue::new());

    let mut now = Instant::now();

    queue.insert_at("foo1", now + ms(10));
    queue.insert_at("foo2", now + ms(10));

    // should be assigned indices 3 and 4
    queue.insert_at("foo3", now + ms(20));
    let key4 = queue.insert_at("foo4", now + ms(20));

    sleep(ms(10)).await;

    let mut res = vec![];
    while res.len() < 2 {
        let entry = assert_ready_some!(poll!(queue));
        res.push(entry.into_inner());
    }

    // items corresponding to `foo3` and `foo4` should be assigned
    // new indices
    queue.compact();

    now = Instant::now();

    queue.insert_at("foo5", now + ms(10));
    let key6 = queue.insert_at("foo6", now + ms(10));

    queue.reset_at(&key4, now + ms(20));
    queue.reset_at(&key6, now + ms(20));

    // foo3 and foo5 will expire
    sleep(ms(10)).await;

    while res.len() < 4 {
        let entry = assert_ready_some!(poll!(queue));
        res.push(entry.into_inner());
    }

    sleep(ms(10)).await;

    while res.len() < 6 {
        let entry = assert_ready_some!(poll!(queue));
        res.push(entry.into_inner());
    }

    let entry = assert_ready!(poll!(queue));
    assert!(entry.is_none());
}

#[tokio::test(start_paused = true)]
async fn remove_after_compact() {
    let now = Instant::now();
    let mut queue = DelayQueue::new();

    let foo_key = queue.insert_at("foo", now + ms(10));
    queue.insert_at("bar", now + ms(20));
    queue.remove(&foo_key);
    queue.compact();

    let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        queue.remove(&foo_key);
    }));
    assert!(panic.is_err());
}

#[tokio::test(start_paused = true)]
async fn remove_after_compact_poll() {
    let now = Instant::now();
    let mut queue = task::spawn(DelayQueue::new());

    let foo_key = queue.insert_at("foo", now + ms(10));
    queue.insert_at("bar", now + ms(20));

    sleep(ms(10)).await;
    assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key);

    queue.compact();

    let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        queue.remove(&foo_key);
    }));
    assert!(panic.is_err());
}

fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}

[ Dauer der Verarbeitung: 0.15 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge