Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/third_party/rust/futures-channel/benches/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 3 kB image not shown  

Quelle  sync_mpsc.rs   Sprache: unbekannt

 
#![feature(test)]

extern crate test;
use crate::test::Bencher;

use {
    futures::{
        channel::mpsc::{self, Sender, UnboundedSender},
        ready,
        sink::Sink,
        stream::{Stream, StreamExt},
        task::{Context, Poll},
    },
    futures_test::task::noop_context,
    std::pin::Pin,
};

/// Single producer, single consumer
#[bench]
fn unbounded_1_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        // 1000 iterations to avoid measuring overhead of initialization
        // Result should be divided by 1000
        for i in 0..1000 {
            // Poll, not ready, park
            assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

            UnboundedSender::unbounded_send(&tx, i).unwrap();

            // Now poll ready
            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// 100 producers, single consumer
#[bench]
fn unbounded_100_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();

        // 1000 send/recv operations total, result should be divided by 1000
        for _ in 0..10 {
            for (i, x) in tx.iter().enumerate() {
                assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));

                UnboundedSender::unbounded_send(x, i).unwrap();

                assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
            }
        }
    })
}

#[bench]
fn unbounded_uncontended(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::unbounded();

        for i in 0..1000 {
            UnboundedSender::unbounded_send(&tx, i).expect("send");
            // No need to create a task, because poll is not going to park.
            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// A Stream that continuously sends incrementing number of the queue
struct TestSender {
    tx: Sender<u32>,
    last: u32, // Last number sent
}

// Could be a Future, it doesn't matter
impl Stream for TestSender {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        let mut tx = Pin::new(&mut this.tx);

        ready!(tx.as_mut().poll_ready(cx)).unwrap();
        tx.as_mut().start_send(this.last + 1).unwrap();
        this.last += 1;
        assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
        Poll::Ready(Some(this.last))
    }
}

/// Single producers, single consumer
#[bench]
fn bounded_1_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        let (tx, mut rx) = mpsc::channel(0);

        let mut tx = TestSender { tx, last: 0 };

        for i in 0..1000 {
            assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
            assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
            assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
        }
    })
}

/// 100 producers, single consumer
#[bench]
fn bounded_100_tx(b: &mut Bencher) {
    let mut cx = noop_context();
    b.iter(|| {
        // Each sender can send one item after specified capacity
        let (tx, mut rx) = mpsc::channel(0);

        let mut tx: Vec<_> = (0..100).map(|_| TestSender { tx: tx.clone(), last: 0 }).collect();

        for i in 0..10 {
            for x in &mut tx {
                // Send an item
                assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
                // Then block
                assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
                // Recv the item
                assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
            }
        }
    })
}

[ Dauer der Verarbeitung: 0.3 Sekunden  (vorverarbeitet)  ]