Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  test.rs   Sprache: unbekannt

 
#![cfg(test)]

use crate::ThreadPoolBuilder;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::{thread, time};

#[test]
fn broadcast_global() {
    let v = crate::broadcast(|ctx| ctx.index());
    assert!(v.into_iter().eq(0..crate::current_num_threads()));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
    let (tx, rx) = channel();
    crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());

    let mut v: Vec<_> = rx.into_iter().collect();
    v.sort_unstable();
    assert!(v.into_iter().eq(0..crate::current_num_threads()));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let v = pool.broadcast(|ctx| ctx.index());
    assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
    let (tx, rx) = channel();
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());

    let mut v: Vec<_> = rx.into_iter().collect();
    v.sort_unstable();
    assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
    assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
    let (tx, rx) = channel();
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));

    let mut v: Vec<_> = rx.into_iter().collect();
    v.sort_unstable();
    assert!(v.into_iter().eq(0..7));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
    let count = AtomicUsize::new(0);
    let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool1.install(|| {
        pool2.broadcast(|_| {
            pool1.broadcast(|_| {
                count.fetch_add(1, Ordering::Relaxed);
            })
        })
    });
    assert_eq!(count.into_inner(), 3 * 7);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
    let (tx, rx) = channel();
    let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
    let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool1.spawn({
        let pool1 = Arc::clone(&pool1);
        move || {
            pool2.spawn_broadcast(move |_| {
                let tx = tx.clone();
                pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
            })
        }
    });
    assert_eq!(rx.into_iter().count(), 3 * 7);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
    let count = AtomicUsize::new(0);
    let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool1.install(|| {
        thread::sleep(time::Duration::from_secs(1));
        pool2.broadcast(|_| {
            thread::sleep(time::Duration::from_secs(1));
            pool1.broadcast(|_| {
                thread::sleep(time::Duration::from_millis(100));
                count.fetch_add(1, Ordering::Relaxed);
            })
        })
    });
    assert_eq!(count.into_inner(), 3 * 7);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
    let (tx, rx) = channel();
    let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
    let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    pool1.spawn({
        let pool1 = Arc::clone(&pool1);
        move || {
            thread::sleep(time::Duration::from_secs(1));
            pool2.spawn_broadcast(move |_| {
                let tx = tx.clone();
                thread::sleep(time::Duration::from_secs(1));
                pool1.spawn_broadcast(move |_| {
                    thread::sleep(time::Duration::from_millis(100));
                    tx.send(()).unwrap();
                })
            })
        }
    });
    assert_eq!(rx.into_iter().count(), 3 * 7);
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
    let count = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let result = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            count.fetch_add(1, Ordering::Relaxed);
            if ctx.index() == 3 {
                panic!("Hello, world!");
            }
        })
    });
    assert_eq!(count.into_inner(), 7);
    assert!(result.is_err(), "broadcast panic should propagate!");
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
    let (tx, rx) = channel();
    let (panic_tx, panic_rx) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(7)
        .panic_handler(move |e| panic_tx.send(e).unwrap())
        .build()
        .unwrap();
    pool.spawn_broadcast(move |ctx| {
        tx.send(()).unwrap();
        if ctx.index() == 3 {
            panic!("Hello, world!");
        }
    });
    drop(pool); // including panic_tx
    assert_eq!(rx.into_iter().count(), 7);
    assert_eq!(panic_rx.into_iter().count(), 1);
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
    let count = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let result = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            count.fetch_add(1, Ordering::Relaxed);
            if ctx.index() % 2 == 0 {
                panic!("Hello, world!");
            }
        })
    });
    assert_eq!(count.into_inner(), 7);
    assert!(result.is_err(), "broadcast panic should propagate!");
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
    let (tx, rx) = channel();
    let (panic_tx, panic_rx) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(7)
        .panic_handler(move |e| panic_tx.send(e).unwrap())
        .build()
        .unwrap();
    pool.spawn_broadcast(move |ctx| {
        tx.send(()).unwrap();
        if ctx.index() % 2 == 0 {
            panic!("Hello, world!");
        }
    });
    drop(pool); // including panic_tx
    assert_eq!(rx.into_iter().count(), 7);
    assert_eq!(panic_rx.into_iter().count(), 4);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
    let test_duration = time::Duration::from_secs(1);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let start = time::Instant::now();
    while start.elapsed() < test_duration {
        pool.broadcast(|ctx| {
            // A slight spread of sleep duration increases the chance that one
            // of the threads will race in the pool's idle sleep afterward.
            thread::sleep(time::Duration::from_micros(ctx.index() as u64));
        });
    }
}

#[test]
fn broadcast_after_spawn_broadcast() {
    let (tx, rx) = channel();

    // Queue a non-blocking spawn_broadcast.
    crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());

    // This blocking broadcast runs after all prior broadcasts.
    crate::broadcast(|_| {});

    // The spawn_broadcast **must** have run by now on all threads.
    let mut v: Vec<_> = rx.try_iter().collect();
    v.sort_unstable();
    assert!(v.into_iter().eq(0..crate::current_num_threads()));
}

#[test]
fn broadcast_after_spawn() {
    let (tx, rx) = channel();

    // Queue a regular spawn on a thread-local deque.
    crate::registry::in_worker(move |_, _| {
        crate::spawn(move || tx.send(22).unwrap());
    });

    // Broadcast runs after the local deque is empty.
    crate::broadcast(|_| {});

    // The spawn **must** have run by now.
    assert_eq!(22, rx.try_recv().unwrap());
}

[ Dauer der Verarbeitung: 0.11 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge