Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  sync_mpsc.rs   Sprache: unbekannt

 
#![allow(clippy::redundant_clone)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]

#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;

#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
use tokio::test as maybe_tokio_test;

use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio_test::*;

#[cfg(not(target_family = "wasm"))]
mod support {
    pub(crate) mod mpsc_stream;
}

#[allow(unused)]
trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}

#[maybe_tokio_test]
async fn send_recv_with_buffer() {
    let (tx, mut rx) = mpsc::channel::<i32>(16);

    // Using poll_ready / try_send
    // let permit assert_ready_ok!(tx.reserve());
    let permit = tx.reserve().await.unwrap();
    permit.send(1);

    // Without poll_ready
    tx.try_send(2).unwrap();

    drop(tx);

    let val = rx.recv().await;
    assert_eq!(val, Some(1));

    let val = rx.recv().await;
    assert_eq!(val, Some(2));

    let val = rx.recv().await;
    assert!(val.is_none());
}

#[tokio::test]
#[cfg(feature = "full")]
async fn reserve_disarm() {
    let (tx, mut rx) = mpsc::channel::<i32>(2);
    let tx1 = tx.clone();
    let tx2 = tx.clone();
    let tx3 = tx.clone();
    let tx4 = tx;

    // We should be able to `poll_ready` two handles without problem
    let permit1 = assert_ok!(tx1.reserve().await);
    let permit2 = assert_ok!(tx2.reserve().await);

    // But a third should not be ready
    let mut r3 = tokio_test::task::spawn(tx3.reserve());
    assert_pending!(r3.poll());

    let mut r4 = tokio_test::task::spawn(tx4.reserve());
    assert_pending!(r4.poll());

    // Using one of the reserved slots should allow a new handle to become ready
    permit1.send(1);

    // We also need to receive for the slot to be free
    assert!(!r3.is_woken());
    rx.recv().await.unwrap();
    // Now there's a free slot!
    assert!(r3.is_woken());
    assert!(!r4.is_woken());

    // Dropping a permit should also open up a slot
    drop(permit2);
    assert!(r4.is_woken());

    let mut r1 = tokio_test::task::spawn(tx1.reserve());
    assert_pending!(r1.poll());
}

#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
    use tokio_stream::StreamExt;

    let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16);
    let mut rx = Box::pin(rx);

    tokio::spawn(async move {
        assert_ok!(tx.send(1).await);
        assert_ok!(tx.send(2).await);
    });

    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
    let (tx, mut rx) = mpsc::channel(16);

    tokio::spawn(async move {
        assert_ok!(tx.send(1).await);
        assert_ok!(tx.send(2).await);
    });

    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}

#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_many_with_buffer() {
    let (tx, mut rx) = mpsc::channel(2);
    let mut buffer = Vec::<i32>::with_capacity(3);

    // With `limit=0` does not sleep, returns immediately
    assert_eq!(0, rx.recv_many(&mut buffer, 0).await);

    let handle = tokio::spawn(async move {
        assert_ok!(tx.send(1).await);
        assert_ok!(tx.send(2).await);
        assert_ok!(tx.send(7).await);
        assert_ok!(tx.send(0).await);
    });

    let limit = 3;
    let mut recv_count = 0usize;
    while recv_count < 4 {
        recv_count += rx.recv_many(&mut buffer, limit).await;
        assert_eq!(buffer.len(), recv_count);
    }

    assert_eq!(vec![1, 2, 7, 0], buffer);
    assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
    handle.await.unwrap();
}

#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
    use std::future::Future;

    let mut t1 = tokio_test::task::spawn(());

    let (tx1, mut rx) = mpsc::channel(1);
    let tx2 = tx1.clone();

    assert_ok!(tx1.try_send(()));

    let mut r1 = Box::pin(tx1.reserve());
    t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx)));

    {
        let mut r2 = tokio_test::task::spawn(tx2.reserve());
        assert_pending!(r2.poll());

        drop(r1);

        assert!(rx.recv().await.is_some());

        assert!(r2.is_woken());
        assert!(!t1.is_woken());
    }

    drop(tx1);
    drop(tx2);

    assert!(rx.recv().await.is_none());
}

#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
    mpsc::channel::<i32>(0);
}

#[maybe_tokio_test]
async fn send_recv_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel::<i32>();

    // Using `try_send`
    assert_ok!(tx.send(1));
    assert_ok!(tx.send(2));

    assert_eq!(rx.recv().await, Some(1));
    assert_eq!(rx.recv().await, Some(2));

    drop(tx);

    assert!(rx.recv().await.is_none());
}

#[maybe_tokio_test]
async fn send_recv_many_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel::<i32>();

    let mut buffer: Vec<i32> = Vec::new();

    // With `limit=0` does not sleep, returns immediately
    rx.recv_many(&mut buffer, 0).await;
    assert_eq!(0, buffer.len());

    assert_ok!(tx.send(7));
    assert_ok!(tx.send(13));
    assert_ok!(tx.send(100));
    assert_ok!(tx.send(1002));

    rx.recv_many(&mut buffer, 0).await;
    assert_eq!(0, buffer.len());

    let mut count = 0;
    while count < 4 {
        count += rx.recv_many(&mut buffer, 1).await;
    }
    assert_eq!(count, 4);
    assert_eq!(vec![7, 13, 100, 1002], buffer);
    let final_capacity = buffer.capacity();
    assert!(final_capacity > 0);

    buffer.clear();

    assert_ok!(tx.send(5));
    assert_ok!(tx.send(6));
    assert_ok!(tx.send(7));
    assert_ok!(tx.send(2));

    // Re-use existing capacity
    count = rx.recv_many(&mut buffer, 32).await;

    assert_eq!(final_capacity, buffer.capacity());
    assert_eq!(count, 4);
    assert_eq!(vec![5, 6, 7, 2], buffer);

    drop(tx);

    // recv_many will immediately return zero if the channel
    // is closed and no more messages are waiting
    assert_eq!(0, rx.recv_many(&mut buffer, 4).await);
    assert!(rx.recv().await.is_none());
}

#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_bounded_capacity() {
    let mut buffer: Vec<String> = Vec::with_capacity(9);
    let limit = buffer.capacity();
    let (tx, mut rx) = mpsc::channel(100);

    let mut expected: Vec<String> = (0..limit)
        .map(|x: usize| format!("{x}"))
        .collect::<Vec<_>>();
    for x in expected.clone() {
        tx.send(x).await.unwrap()
    }
    tx.send("one more".to_string()).await.unwrap();

    // Here `recv_many` receives all but the last value;
    // the initial capacity is adequate, so the buffer does
    // not increase in side.
    assert_eq!(buffer.capacity(), rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
    assert_eq!(limit, buffer.capacity());

    // Receive up more values:
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    assert!(buffer.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, buffer);

    tokio::spawn(async move {
        tx.send("final".to_string()).await.unwrap();
    });

    // 'tx' is dropped, but `recv_many` is guaranteed not
    // to return 0 as the channel has outstanding permits
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    expected.push("final".to_string());
    assert_eq!(expected, buffer);
    // The channel is now closed and `recv_many` returns 0.
    assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
}

#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_unbounded_capacity() {
    let mut buffer: Vec<String> = Vec::with_capacity(9); // capacity >= 9
    let limit = buffer.capacity();
    let (tx, mut rx) = mpsc::unbounded_channel();

    let mut expected: Vec<String> = (0..limit)
        .map(|x: usize| format!("{x}"))
        .collect::<Vec<_>>();
    for x in expected.clone() {
        tx.send(x).unwrap()
    }
    tx.send("one more".to_string()).unwrap();

    // Here `recv_many` receives all but the last value;
    // the initial capacity is adequate, so the buffer does
    // not increase in side.
    assert_eq!(buffer.capacity(), rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
    assert_eq!(limit, buffer.capacity());

    // Receive up more values:
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    assert!(buffer.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, buffer);

    tokio::spawn(async move {
        tx.send("final".to_string()).unwrap();
    });

    // 'tx' is dropped, but `recv_many` is guaranteed not
    // to return 0 as the channel has outstanding permits
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    expected.push("final".to_string());
    assert_eq!(expected, buffer);
    // The channel is now closed and `recv_many` returns 0.
    assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
}

#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        assert_ok!(tx.send(1));
        assert_ok!(tx.send(2));
    });

    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}

#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
    use tokio_stream::StreamExt;

    let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>();

    let mut rx = Box::pin(rx);

    tokio::spawn(async move {
        assert_ok!(tx.send(1));
        assert_ok!(tx.send(2));
    });

    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

#[maybe_tokio_test]
async fn no_t_bounds_buffer() {
    struct NoImpls;

    let (tx, mut rx) = mpsc::channel(100);

    // sender should be Debug even though T isn't Debug
    is_debug(&tx);
    // same with Receiver
    is_debug(&rx);
    // and sender should be Clone even though T isn't Clone
    assert!(tx.clone().try_send(NoImpls).is_ok());

    assert!(rx.recv().await.is_some());
}

#[maybe_tokio_test]
async fn no_t_bounds_unbounded() {
    struct NoImpls;

    let (tx, mut rx) = mpsc::unbounded_channel();

    // sender should be Debug even though T isn't Debug
    is_debug(&tx);
    // same with Receiver
    is_debug(&rx);
    // and sender should be Clone even though T isn't Clone
    assert!(tx.clone().send(NoImpls).is_ok());

    assert!(rx.recv().await.is_some());
}

#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
    let (tx, mut rx) = mpsc::channel::<i32>(1);

    // Reserve capacity
    let p1 = assert_ok!(tx.reserve().await);

    // Send first message
    p1.send(1);

    // Not ready
    let mut p2 = tokio_test::task::spawn(tx.reserve());
    assert_pending!(p2.poll());

    // Take the value
    assert!(rx.recv().await.is_some());

    // Notified
    assert!(p2.is_woken());

    // Trying to send fails
    assert_err!(tx.try_send(1337));

    // Send second
    let permit = assert_ready_ok!(p2.poll());
    permit.send(2);

    assert!(rx.recv().await.is_some());
}

#[maybe_tokio_test]
async fn recv_close_gets_none_idle() {
    let (tx, mut rx) = mpsc::channel::<i32>(10);

    rx.close();

    assert!(rx.recv().await.is_none());

    assert_err!(tx.send(1).await);
}

#[tokio::test]
#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
    let (tx1, mut rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let permit1 = assert_ok!(tx1.reserve().await);
    let mut permit2 = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(permit2.poll());

    rx.close();

    assert!(permit2.is_woken());
    assert_ready_err!(permit2.poll());

    {
        let mut recv = tokio_test::task::spawn(rx.recv());
        assert_pending!(recv.poll());

        permit1.send(123);
        assert!(recv.is_woken());

        let v = assert_ready!(recv.poll());
        assert_eq!(v, Some(123));
    }

    assert!(rx.recv().await.is_none());
}

#[maybe_tokio_test]
async fn tx_close_gets_none() {
    let (_, mut rx) = mpsc::channel::<i32>(10);
    assert!(rx.recv().await.is_none());
}

#[maybe_tokio_test]
async fn try_send_fail() {
    let (tx, mut rx) = mpsc::channel(1);

    tx.try_send("hello").unwrap();

    // This should fail
    match assert_err!(tx.try_send("fail")) {
        TrySendError::Full(..) => {}
        _ => panic!(),
    }

    assert_eq!(rx.recv().await, Some("hello"));

    assert_ok!(tx.try_send("goodbye"));
    drop(tx);

    assert_eq!(rx.recv().await, Some("goodbye"));
    assert!(rx.recv().await.is_none());
}

#[maybe_tokio_test]
async fn try_send_fail_with_try_recv() {
    let (tx, mut rx) = mpsc::channel(1);

    tx.try_send("hello").unwrap();

    // This should fail
    match assert_err!(tx.try_send("fail")) {
        TrySendError::Full(..) => {}
        _ => panic!(),
    }

    assert_eq!(rx.try_recv(), Ok("hello"));

    assert_ok!(tx.try_send("goodbye"));
    drop(tx);

    assert_eq!(rx.try_recv(), Ok("goodbye"));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}

#[maybe_tokio_test]
async fn reserve_many_above_cap() {
    const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS;
    let (tx, _rx) = mpsc::channel::<()>(1);

    assert_err!(tx.reserve_many(2).await);
    assert_err!(tx.reserve_many(MAX_PERMITS + 1).await);
    assert_err!(tx.reserve_many(usize::MAX).await);
}

#[test]
fn try_reserve_many_zero() {
    let (tx, rx) = mpsc::channel::<()>(1);

    // Succeeds when not closed.
    assert!(assert_ok!(tx.try_reserve_many(0)).next().is_none());

    // Even when channel is full.
    tx.try_send(()).unwrap();
    assert!(assert_ok!(tx.try_reserve_many(0)).next().is_none());

    drop(rx);

    // Closed error when closed.
    assert_eq!(
        assert_err!(tx.try_reserve_many(0)),
        TrySendError::Closed(())
    );
}

#[maybe_tokio_test]
async fn reserve_many_zero() {
    let (tx, rx) = mpsc::channel::<()>(1);

    // Succeeds when not closed.
    assert!(assert_ok!(tx.reserve_many(0).await).next().is_none());

    // Even when channel is full.
    tx.send(()).await.unwrap();
    assert!(assert_ok!(tx.reserve_many(0).await).next().is_none());

    drop(rx);

    // Closed error when closed.
    assert_err!(tx.reserve_many(0).await);
}

#[maybe_tokio_test]
async fn try_reserve_many_edge_cases() {
    const MAX_PERMITS: usize = tokio::sync::Semaphore::MAX_PERMITS;

    let (tx, rx) = mpsc::channel::<()>(1);

    let mut permit = assert_ok!(tx.try_reserve_many(0));
    assert!(permit.next().is_none());

    let permit = tx.try_reserve_many(MAX_PERMITS + 1);
    match assert_err!(permit) {
        TrySendError::Full(..) => {}
        _ => panic!(),
    }

    let permit = tx.try_reserve_many(usize::MAX);
    match assert_err!(permit) {
        TrySendError::Full(..) => {}
        _ => panic!(),
    }

    // Dropping the receiver should close the channel
    drop(rx);
    assert_err!(tx.reserve_many(0).await);
}

#[maybe_tokio_test]
async fn try_reserve_fails() {
    let (tx, mut rx) = mpsc::channel(1);

    let permit = tx.try_reserve().unwrap();

    // This should fail
    match assert_err!(tx.try_reserve()) {
        TrySendError::Full(()) => {}
        _ => panic!(),
    }

    permit.send("foo");

    assert_eq!(rx.recv().await, Some("foo"));

    // Dropping permit releases the slot.
    let permit = tx.try_reserve().unwrap();
    drop(permit);

    let _permit = tx.try_reserve().unwrap();
}

#[maybe_tokio_test]
async fn reserve_many_and_send() {
    let (tx, mut rx) = mpsc::channel(100);
    for i in 0..100 {
        for permit in assert_ok!(tx.reserve_many(i).await) {
            permit.send("foo");
            assert_eq!(rx.recv().await, Some("foo"));
        }
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }
}
#[maybe_tokio_test]
async fn try_reserve_many_and_send() {
    let (tx, mut rx) = mpsc::channel(100);
    for i in 0..100 {
        for permit in assert_ok!(tx.try_reserve_many(i)) {
            permit.send("foo");
            assert_eq!(rx.recv().await, Some("foo"));
        }
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }
}

#[maybe_tokio_test]
async fn reserve_many_on_closed_channel() {
    let (tx, rx) = mpsc::channel::<()>(100);
    drop(rx);
    assert_err!(tx.reserve_many(10).await);
}

#[maybe_tokio_test]
async fn try_reserve_many_on_closed_channel() {
    let (tx, rx) = mpsc::channel::<usize>(100);
    drop(rx);
    match assert_err!(tx.try_reserve_many(10)) {
        TrySendError::Closed(()) => {}
        _ => panic!(),
    };
}

#[maybe_tokio_test]
async fn try_reserve_many_full() {
    // Reserve n capacity and send k messages
    for n in 1..100 {
        for k in 0..n {
            let (tx, mut rx) = mpsc::channel::<usize>(n);
            let permits = assert_ok!(tx.try_reserve_many(n));

            assert_eq!(permits.len(), n);
            assert_eq!(tx.capacity(), 0);

            match assert_err!(tx.try_reserve_many(1)) {
                TrySendError::Full(..) => {}
                _ => panic!(),
            };

            for permit in permits.take(k) {
                permit.send(0);
            }
            // We only used k permits on the n reserved
            assert_eq!(tx.capacity(), n - k);

            // We can reserve more permits
            assert_ok!(tx.try_reserve_many(1));

            // But not more than the current capacity
            match assert_err!(tx.try_reserve_many(n - k + 1)) {
                TrySendError::Full(..) => {}
                _ => panic!(),
            };

            for _i in 0..k {
                assert_eq!(rx.recv().await, Some(0));
            }

            // Now that we've received everything, capacity should be back to n
            assert_eq!(tx.capacity(), n);
        }
    }
}

#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
    // poll_ready reserves capacity, ensure that the capacity is released if tx
    // is dropped w/o sending a value.
    let (tx1, _rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let permit = assert_ok!(tx1.reserve().await);

    let mut reserve2 = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(reserve2.poll());

    drop(permit);

    assert!(reserve2.is_woken());
    assert_ready_ok!(reserve2.poll());
}

#[maybe_tokio_test]
async fn drop_permit_iterator_releases_permits() {
    // poll_ready reserves capacity, ensure that the capacity is released if tx
    // is dropped w/o sending a value.
    for n in 1..100 {
        let (tx1, _rx) = mpsc::channel::<i32>(n);
        let tx2 = tx1.clone();

        let permits = assert_ok!(tx1.reserve_many(n).await);

        let mut reserve2 = tokio_test::task::spawn(tx2.reserve_many(n));
        assert_pending!(reserve2.poll());

        drop(permits);

        assert!(reserve2.is_woken());

        let permits = assert_ready_ok!(reserve2.poll());
        drop(permits);

        assert_eq!(tx1.capacity(), n);
    }
}

#[maybe_tokio_test]
async fn dropping_rx_closes_channel() {
    let (tx, rx) = mpsc::channel(100);

    let msg = Arc::new(());
    assert_ok!(tx.try_send(msg.clone()));

    drop(rx);
    assert_err!(tx.reserve().await);
    assert_err!(tx.reserve_many(10).await);
    assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
fn dropping_rx_closes_channel_for_try() {
    let (tx, rx) = mpsc::channel(100);

    let msg = Arc::new(());
    tx.try_send(msg.clone()).unwrap();

    drop(rx);

    assert!(matches!(
        tx.try_send(msg.clone()),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));
    assert!(matches!(
        tx.try_reserve_owned(),
        Err(TrySendError::Closed(_))
    ));

    assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
fn unconsumed_messages_are_dropped() {
    let msg = Arc::new(());

    let (tx, rx) = mpsc::channel(100);

    tx.try_send(msg.clone()).unwrap();

    assert_eq!(2, Arc::strong_count(&msg));

    drop((tx, rx));

    assert_eq!(1, Arc::strong_count(&msg));
}

#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_recv() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let sync_code = std::thread::spawn(move || {
        assert_eq!(Some(10), rx.blocking_recv());
    });

    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            let _ = tx.send(10).await;
        });
    sync_code.join().unwrap()
}

#[tokio::test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
async fn blocking_recv_async() {
    let (_tx, mut rx) = mpsc::channel::<()>(1);
    let _ = rx.blocking_recv();
}

#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_send() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let sync_code = std::thread::spawn(move || {
        tx.blocking_send(10).unwrap();
    });

    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            assert_eq!(Some(10), rx.recv().await);
        });
    sync_code.join().unwrap()
}

#[tokio::test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
async fn blocking_send_async() {
    let (tx, _rx) = mpsc::channel::<()>(1);
    let _ = tx.blocking_send(());
}

#[tokio::test]
#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
    let (tx, mut rx) = mpsc::channel::<()>(100);
    let _tx2 = tx.clone();

    let permit = assert_ok!(tx.reserve().await);

    rx.close();

    let mut recv = tokio_test::task::spawn(rx.recv());
    assert_pending!(recv.poll());

    drop(permit);

    assert!(recv.is_woken());
    let val = assert_ready!(recv.poll());
    assert!(val.is_none());
}

#[tokio::test]
#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
    let (tx1, mut rx) = mpsc::channel::<()>(1);
    let tx2 = tx1.clone();

    let permit1 = assert_ok!(tx1.reserve().await);

    let mut permit2 = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(permit2.poll());

    rx.close();

    drop(permit1);
    assert!(permit2.is_woken());

    drop(permit2);
    assert!(rx.recv().await.is_none());
}

#[test]
fn try_recv_bounded() {
    let (tx, mut rx) = mpsc::channel(5);

    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    assert!(tx.try_send("hello").is_err());

    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    assert_eq!(Ok("hello"), rx.try_recv());
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    assert!(tx.try_send("hello").is_err());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    drop(tx);
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Ok("hello"), rx.try_recv());
    assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
}

#[test]
fn try_recv_unbounded() {
    for num in 0..100 {
        let (tx, mut rx) = mpsc::unbounded_channel();

        for i in 0..num {
            tx.send(i).unwrap();
        }

        for i in 0..num {
            assert_eq!(rx.try_recv(), Ok(i));
        }

        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        drop(tx);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }
}

#[test]
fn try_recv_close_while_empty_bounded() {
    let (tx, mut rx) = mpsc::channel::<()>(5);

    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
    drop(tx);
    assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
}

#[test]
fn try_recv_close_while_empty_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel::<()>();

    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
    drop(tx);
    assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
}

#[tokio::test(start_paused = true)]
#[cfg(feature = "full")]
async fn recv_timeout() {
    use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
    use tokio::time::Duration;

    let (tx, rx) = mpsc::channel(5);

    assert_eq!(tx.send_timeout(10, Duration::from_secs(1)).await, Ok(()));
    assert_eq!(tx.send_timeout(20, Duration::from_secs(1)).await, Ok(()));
    assert_eq!(tx.send_timeout(30, Duration::from_secs(1)).await, Ok(()));
    assert_eq!(tx.send_timeout(40, Duration::from_secs(1)).await, Ok(()));
    assert_eq!(tx.send_timeout(50, Duration::from_secs(1)).await, Ok(()));
    assert_eq!(
        tx.send_timeout(60, Duration::from_secs(1)).await,
        Err(Timeout(60))
    );

    drop(rx);
    assert_eq!(
        tx.send_timeout(70, Duration::from_secs(1)).await,
        Err(Closed(70))
    );
}

#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn recv_timeout_panic() {
    use futures::future::FutureExt;
    use tokio::time::Duration;

    let (tx, _rx) = mpsc::channel(5);
    tx.send_timeout(10, Duration::from_secs(1)).now_or_never();
}

// Tests that channel `capacity` changes and `max_capacity` stays the same
#[tokio::test]
async fn test_tx_capacity() {
    let (tx, _rx) = mpsc::channel::<()>(10);
    // both capacities are same before
    assert_eq!(tx.capacity(), 10);
    assert_eq!(tx.max_capacity(), 10);

    let _permit = tx.reserve().await.unwrap();
    // after reserve, only capacity should drop by one
    assert_eq!(tx.capacity(), 9);
    assert_eq!(tx.max_capacity(), 10);

    tx.send(()).await.unwrap();
    // after send, capacity should drop by one again
    assert_eq!(tx.capacity(), 8);
    assert_eq!(tx.max_capacity(), 10);
}

#[tokio::test]
async fn test_rx_is_closed_when_calling_close_with_sender() {
    // is_closed should return true after calling close but still has a sender
    let (_tx, mut rx) = mpsc::channel::<()>(10);
    rx.close();

    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_closed_when_dropping_all_senders() {
    // is_closed should return true after dropping all senders
    let (tx, rx) = mpsc::channel::<()>(10);
    let another_tx = tx.clone();
    let task = tokio::spawn(async move {
        drop(another_tx);
    });

    drop(tx);
    let _ = task.await;

    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_not_closed_when_there_are_senders() {
    // is_closed should return false when there is a sender
    let (_tx, rx) = mpsc::channel::<()>(10);
    assert!(!rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() {
    // is_closed should return false when there is a sender, even if enough messages have been sent to fill the channel
    let (tx, rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    assert!(!rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_closed_when_there_are_no_senders_and_there_are_messages() {
    // is_closed should return true when there are messages in the buffer, but no senders
    let (tx, rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    drop(tx);
    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_closed_when_there_are_messages_and_close_is_called() {
    // is_closed should return true when there are messages in the buffer, and close is called
    let (tx, mut rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    rx.close();
    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_not_closed_when_there_are_permits_but_not_senders() {
    // is_closed should return false when there is a permit (but no senders)
    let (tx, rx) = mpsc::channel::<()>(10);
    let _permit = tx.reserve_owned().await.expect("Failed to reserve permit");
    assert!(!rx.is_closed());
}

#[tokio::test]
async fn test_rx_is_empty_when_no_messages_were_sent() {
    let (_tx, rx) = mpsc::channel::<()>(10);
    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_not_empty_when_there_are_messages_in_the_buffer() {
    let (tx, rx) = mpsc::channel::<()>(10);
    assert!(tx.send(()).await.is_ok());
    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_not_empty_when_the_buffer_is_full() {
    let (tx, rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_not_empty_when_all_but_one_messages_are_consumed() {
    let (tx, mut rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }

    for _ in 0..9 {
        assert!(rx.recv().await.is_some());
    }

    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_empty_when_all_messages_are_consumed() {
    let (tx, mut rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    while rx.try_recv().is_ok() {}
    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_is_empty_all_senders_are_dropped_and_messages_consumed() {
    let (tx, mut rx) = mpsc::channel(10);
    for i in 0..10 {
        assert!(tx.send(i).await.is_ok());
    }
    drop(tx);

    for _ in 0..10 {
        assert!(rx.recv().await.is_some());
    }

    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_len_on_empty_channel() {
    let (_tx, rx) = mpsc::channel::<()>(100);
    assert_eq!(rx.len(), 0);
}

#[tokio::test]
async fn test_rx_len_on_empty_channel_without_senders() {
    // when all senders are dropped, a "closed" value is added to the end of the linked list.
    // here we test that the "closed" value does not change the len of the channel.

    let (tx, rx) = mpsc::channel::<()>(100);
    drop(tx);
    assert_eq!(rx.len(), 0);
}

#[tokio::test]
async fn test_rx_len_on_filled_channel() {
    let (tx, rx) = mpsc::channel(100);

    for i in 0..100 {
        assert!(tx.send(i).await.is_ok());
    }
    assert_eq!(rx.len(), 100);
}

#[tokio::test]
async fn test_rx_len_on_filled_channel_without_senders() {
    let (tx, rx) = mpsc::channel(100);

    for i in 0..100 {
        assert!(tx.send(i).await.is_ok());
    }
    drop(tx);
    assert_eq!(rx.len(), 100);
}

#[tokio::test]
async fn test_rx_len_when_consuming_all_messages() {
    let (tx, mut rx) = mpsc::channel(100);

    for i in 0..100 {
        assert!(tx.send(i).await.is_ok());
        assert_eq!(rx.len(), i + 1);
    }

    drop(tx);

    for i in (0..100).rev() {
        assert!(rx.recv().await.is_some());
        assert_eq!(rx.len(), i);
    }
}

#[tokio::test]
async fn test_rx_len_when_close_is_called() {
    let (tx, mut rx) = mpsc::channel(100);
    tx.send(()).await.unwrap();
    rx.close();

    assert_eq!(rx.len(), 1);
}

#[tokio::test]
async fn test_rx_len_when_close_is_called_before_dropping_sender() {
    let (tx, mut rx) = mpsc::channel(100);
    tx.send(()).await.unwrap();
    rx.close();
    drop(tx);

    assert_eq!(rx.len(), 1);
}

#[tokio::test]
async fn test_rx_len_when_close_is_called_after_dropping_sender() {
    let (tx, mut rx) = mpsc::channel(100);
    tx.send(()).await.unwrap();
    drop(tx);
    rx.close();

    assert_eq!(rx.len(), 1);
}

#[tokio::test]
async fn test_rx_unbounded_is_closed_when_calling_close_with_sender() {
    // is_closed should return true after calling close but still has a sender
    let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
    rx.close();

    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_unbounded_is_closed_when_dropping_all_senders() {
    // is_closed should return true after dropping all senders
    let (tx, rx) = mpsc::unbounded_channel::<()>();
    let another_tx = tx.clone();
    let task = tokio::spawn(async move {
        drop(another_tx);
    });

    drop(tx);
    let _ = task.await;

    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_unbounded_is_not_closed_when_there_are_senders() {
    // is_closed should return false when there is a sender
    let (_tx, rx) = mpsc::unbounded_channel::<()>();
    assert!(!rx.is_closed());
}

#[tokio::test]
async fn test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages() {
    // is_closed should return true when there are messages in the buffer, but no senders
    let (tx, rx) = mpsc::unbounded_channel();
    for i in 0..10 {
        assert!(tx.send(i).is_ok());
    }
    drop(tx);
    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called() {
    // is_closed should return true when there are messages in the buffer, and close is called
    let (tx, mut rx) = mpsc::unbounded_channel();
    for i in 0..10 {
        assert!(tx.send(i).is_ok());
    }
    rx.close();
    assert!(rx.is_closed());
}

#[tokio::test]
async fn test_rx_unbounded_is_empty_when_no_messages_were_sent() {
    let (_tx, rx) = mpsc::unbounded_channel::<()>();
    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer() {
    let (tx, rx) = mpsc::unbounded_channel();
    assert!(tx.send(()).is_ok());
    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    for i in 0..10 {
        assert!(tx.send(i).is_ok());
    }

    for _ in 0..9 {
        assert!(rx.recv().await.is_some());
    }

    assert!(!rx.is_empty())
}

#[tokio::test]
async fn test_rx_unbounded_is_empty_when_all_messages_are_consumed() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    for i in 0..10 {
        assert!(tx.send(i).is_ok());
    }
    while rx.try_recv().is_ok() {}
    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    for i in 0..10 {
        assert!(tx.send(i).is_ok());
    }
    drop(tx);

    for _ in 0..10 {
        assert!(rx.recv().await.is_some());
    }

    assert!(rx.is_empty())
}

#[tokio::test]
async fn test_rx_unbounded_len_on_empty_channel() {
    let (_tx, rx) = mpsc::unbounded_channel::<()>();
    assert_eq!(rx.len(), 0);
}

#[tokio::test]
async fn test_rx_unbounded_len_on_empty_channel_without_senders() {
    // when all senders are dropped, a "closed" value is added to the end of the linked list.
    // here we test that the "closed" value does not change the len of the channel.

    let (tx, rx) = mpsc::unbounded_channel::<()>();
    drop(tx);
    assert_eq!(rx.len(), 0);
}

#[tokio::test]
async fn test_rx_unbounded_len_with_multiple_messages() {
    let (tx, rx) = mpsc::unbounded_channel();

    for i in 0..100 {
        assert!(tx.send(i).is_ok());
    }
    assert_eq!(rx.len(), 100);
}

#[tokio::test]
async fn test_rx_unbounded_len_with_multiple_messages_and_dropped_senders() {
    let (tx, rx) = mpsc::unbounded_channel();

    for i in 0..100 {
        assert!(tx.send(i).is_ok());
    }
    drop(tx);
    assert_eq!(rx.len(), 100);
}

#[tokio::test]
async fn test_rx_unbounded_len_when_consuming_all_messages() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    for i in 0..100 {
        assert!(tx.send(i).is_ok());
        assert_eq!(rx.len(), i + 1);
    }

    drop(tx);

    for i in (0..100).rev() {
        assert!(rx.recv().await.is_some());
        assert_eq!(rx.len(), i);
    }
}

#[tokio::test]
async fn test_rx_unbounded_len_when_close_is_called() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send(()).unwrap();
    rx.close();

    assert_eq!(rx.len(), 1);
}

#[tokio::test]
async fn test_rx_unbounded_len_when_close_is_called_before_dropping_sender() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send(()).unwrap();
    rx.close();
    drop(tx);

    assert_eq!(rx.len(), 1);
}

#[tokio::test]
async fn test_rx_unbounded_len_when_close_is_called_after_dropping_sender() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send(()).unwrap();
    drop(tx);
    rx.close();

    assert_eq!(rx.len(), 1);
}

// Regression test for https://github.com/tokio-rs/tokio/issues/6602
#[tokio::test]
async fn test_is_empty_32_msgs() {
    let (sender, mut receiver) = mpsc::channel(33);

    for value in 1..257 {
        sender.send(value).await.unwrap();
        receiver.recv().await.unwrap();
        assert!(receiver.is_empty(), "{value}. len: {}", receiver.len());
    }
}

fn is_debug<T: fmt::Debug>(_: &T) {}

[ Dauer der Verarbeitung: 0.47 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge