Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/tokio/tests/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 7 kB image not shown  

Quelle  sync_watch.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

#![allow(clippy::cognitive_complexity)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]

#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as test;

use tokio::sync::watch;
use tokio_test::task::spawn;
use tokio_test::{
    assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
};

#[test]
fn single_rx_recv() {
    let (tx, mut rx) = watch::channel("one");

    {
        // Not initially notified
        let mut t = spawn(rx.changed());
        assert_pending!(t.poll());
    }
    assert_eq!(*rx.borrow(), "one");

    {
        let mut t = spawn(rx.changed());
        assert_pending!(t.poll());

        tx.send("two").unwrap();

        assert!(t.is_woken());

        assert_ready_ok!(t.poll());
    }
    assert_eq!(*rx.borrow(), "two");

    {
        let mut t = spawn(rx.changed());
        assert_pending!(t.poll());

        drop(tx);

        assert!(t.is_woken());
        assert_ready_err!(t.poll());
    }
    assert_eq!(*rx.borrow(), "two");
}

#[test]
fn rx_version_underflow() {
    let (_tx, mut rx) = watch::channel("one");

    // Version starts at 2, validate we do not underflow
    rx.mark_changed();
    rx.mark_changed();
}

#[test]
fn rx_mark_changed() {
    let (tx, mut rx) = watch::channel("one");

    let mut rx2 = rx.clone();
    let mut rx3 = rx.clone();
    let mut rx4 = rx.clone();
    {
        rx.mark_changed();
        assert!(rx.has_changed().unwrap());

        let mut t = spawn(rx.changed());
        assert_ready_ok!(t.poll());
    }

    {
        assert!(!rx2.has_changed().unwrap());

        let mut t = spawn(rx2.changed());
        assert_pending!(t.poll());
    }

    {
        rx3.mark_changed();
        assert_eq!(*rx3.borrow(), "one");

        assert!(rx3.has_changed().unwrap());

        assert_eq!(*rx3.borrow_and_update(), "one");

        assert!(!rx3.has_changed().unwrap());

        let mut t = spawn(rx3.changed());
        assert_pending!(t.poll());
    }

    {
        tx.send("two").unwrap();
        assert!(rx4.has_changed().unwrap());
        assert_eq!(*rx4.borrow_and_update(), "two");

        rx4.mark_changed();
        assert!(rx4.has_changed().unwrap());
        assert_eq!(*rx4.borrow_and_update(), "two")
    }

    assert_eq!(*rx.borrow(), "two");
}

#[test]
fn rx_mark_unchanged() {
    let (tx, mut rx) = watch::channel("one");

    let mut rx2 = rx.clone();

    {
        assert!(!rx.has_changed().unwrap());

        rx.mark_changed();
        assert!(rx.has_changed().unwrap());

        rx.mark_unchanged();
        assert!(!rx.has_changed().unwrap());

        let mut t = spawn(rx.changed());
        assert_pending!(t.poll());
    }

    {
        assert!(!rx2.has_changed().unwrap());

        tx.send("two").unwrap();
        assert!(rx2.has_changed().unwrap());

        rx2.mark_unchanged();
        assert!(!rx2.has_changed().unwrap());
        assert_eq!(*rx2.borrow_and_update(), "two");
    }

    assert_eq!(*rx.borrow(), "two");
}

#[test]
fn multi_rx() {
    let (tx, mut rx1) = watch::channel("one");
    let mut rx2 = rx1.clone();

    {
        let mut t1 = spawn(rx1.changed());
        let mut t2 = spawn(rx2.changed());

        assert_pending!(t1.poll());
        assert_pending!(t2.poll());
    }
    assert_eq!(*rx1.borrow(), "one");
    assert_eq!(*rx2.borrow(), "one");

    let mut t2 = spawn(rx2.changed());

    {
        let mut t1 = spawn(rx1.changed());

        assert_pending!(t1.poll());
        assert_pending!(t2.poll());

        tx.send("two").unwrap();

        assert!(t1.is_woken());
        assert!(t2.is_woken());

        assert_ready_ok!(t1.poll());
    }
    assert_eq!(*rx1.borrow(), "two");

    {
        let mut t1 = spawn(rx1.changed());

        assert_pending!(t1.poll());

        tx.send("three").unwrap();

        assert!(t1.is_woken());
        assert!(t2.is_woken());

        assert_ready_ok!(t1.poll());
        assert_ready_ok!(t2.poll());
    }
    assert_eq!(*rx1.borrow(), "three");

    drop(t2);

    assert_eq!(*rx2.borrow(), "three");

    {
        let mut t1 = spawn(rx1.changed());
        let mut t2 = spawn(rx2.changed());

        assert_pending!(t1.poll());
        assert_pending!(t2.poll());

        tx.send("four").unwrap();

        assert_ready_ok!(t1.poll());
        assert_ready_ok!(t2.poll());
    }
    assert_eq!(*rx1.borrow(), "four");
    assert_eq!(*rx2.borrow(), "four");
}

#[test]
fn rx_observes_final_value() {
    // Initial value

    let (tx, mut rx) = watch::channel("one");
    drop(tx);

    {
        let mut t1 = spawn(rx.changed());
        assert_ready_err!(t1.poll());
    }
    assert_eq!(*rx.borrow(), "one");

    // Sending a value

    let (tx, mut rx) = watch::channel("one");

    tx.send("two").unwrap();

    {
        let mut t1 = spawn(rx.changed());
        assert_ready_ok!(t1.poll());
    }
    assert_eq!(*rx.borrow(), "two");

    {
        let mut t1 = spawn(rx.changed());
        assert_pending!(t1.poll());

        tx.send("three").unwrap();
        drop(tx);

        assert!(t1.is_woken());

        assert_ready_ok!(t1.poll());
    }
    assert_eq!(*rx.borrow(), "three");

    {
        let mut t1 = spawn(rx.changed());
        assert_ready_err!(t1.poll());
    }
    assert_eq!(*rx.borrow(), "three");
}

#[test]
fn poll_close() {
    let (tx, rx) = watch::channel("one");

    {
        let mut t = spawn(tx.closed());
        assert_pending!(t.poll());

        drop(rx);

        assert!(t.is_woken());
        assert_ready!(t.poll());
    }

    assert!(tx.send("two").is_err());
}

#[test]
fn borrow_and_update() {
    let (tx, mut rx) = watch::channel("one");

    assert!(!rx.has_changed().unwrap());

    tx.send("two").unwrap();
    assert!(rx.has_changed().unwrap());
    assert_ready!(spawn(rx.changed()).poll()).unwrap();
    assert_pending!(spawn(rx.changed()).poll());
    assert!(!rx.has_changed().unwrap());

    tx.send("three").unwrap();
    assert!(rx.has_changed().unwrap());
    assert_eq!(*rx.borrow_and_update(), "three");
    assert_pending!(spawn(rx.changed()).poll());
    assert!(!rx.has_changed().unwrap());

    drop(tx);
    assert_eq!(*rx.borrow_and_update(), "three");
    assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
    assert!(rx.has_changed().is_err());
}

#[test]
fn reopened_after_subscribe() {
    let (tx, rx) = watch::channel("one");
    assert!(!tx.is_closed());

    drop(rx);
    assert!(tx.is_closed());

    let rx = tx.subscribe();
    assert!(!tx.is_closed());

    drop(rx);
    assert!(tx.is_closed());
}

#[test]
#[cfg(panic = "unwind")]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn send_modify_panic() {
    let (tx, mut rx) = watch::channel("one");

    tx.send_modify(|old| *old = "two");
    assert_eq!(*rx.borrow_and_update(), "two");

    let mut rx2 = rx.clone();
    assert_eq!(*rx2.borrow_and_update(), "two");

    let mut task = spawn(rx2.changed());

    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        tx.send_modify(|old| {
            *old = "panicked";
            panic!();
        })
    }));
    assert!(result.is_err());

    assert_pending!(task.poll());
    assert_eq!(*rx.borrow(), "panicked");

    tx.send_modify(|old| *old = "three");
    assert_ready_ok!(task.poll());
    assert_eq!(*rx.borrow_and_update(), "three");
}

#[tokio::test]
async fn multiple_sender() {
    let (tx1, mut rx) = watch::channel(0);
    let tx2 = tx1.clone();

    let mut t = spawn(async {
        rx.changed().await.unwrap();
        let v1 = *rx.borrow_and_update();
        rx.changed().await.unwrap();
        let v2 = *rx.borrow_and_update();
        (v1, v2)
    });

    tx1.send(1).unwrap();
    assert_pending!(t.poll());
    tx2.send(2).unwrap();
    assert_ready_eq!(t.poll(), (1, 2));
}

#[tokio::test]
async fn receiver_is_notified_when_last_sender_is_dropped() {
    let (tx1, mut rx) = watch::channel(0);
    let tx2 = tx1.clone();

    let mut t = spawn(rx.changed());
    assert_pending!(t.poll());

    drop(tx1);
    assert!(!t.is_woken());
    drop(tx2);

    assert!(t.is_woken());
}

[ Dauer der Verarbeitung: 0.27 Sekunden  ]