Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  lib.rs   Sprache: unbekannt

 
use crossbeam_utils::thread;
use std::any::Any;
use std::env;
use std::io;

/// Represents the types of errors that may occur while using build-parallel.
#[derive(Debug)]
pub enum Error<E> {
    /// Error occurred while internally performing I/O.
    IOError(io::Error),
    /// Error occurred during build callback.
    BuildError(E),
    /// Panic occurred during build callback.
    BuildPanic(Box<dyn Any + Send + 'static>),
}

fn compile_object<T, R, E, F>(f: F, obj: &T) -> Result<R, Error<E>>
where
    T: 'static + Sync,
    R: 'static + Sync + Send,
    E: 'static + Sync + Send,
    F: Fn(&T) -> Result<R, E> + Sync + Send,
{
    f(obj).map_err(Error::BuildError)
}

pub fn compile_objects<T, R, E, F>(f: &F, objs: &[T]) -> Result<Vec<R>, Error<E>>
where
    T: 'static + Sync,
    R: 'static + Sync + Send,
    E: 'static + Sync + Send,
    F: Fn(&T) -> Result<R, E> + Sync + Send,
{
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
    use std::sync::Once;

    // Limit our parallelism globally with a jobserver. Start off by
    // releasing our own token for this process so we can have a bit of an
    // easier to write loop below. If this fails, though, then we're likely
    // on Windows with the main implicit token, so we just have a bit extra
    // parallelism for a bit and don't reacquire later.
    let server = jobserver();
    let reacquire = server.release_raw().is_ok();

    let res = thread::scope(|s| {
        // When compiling objects in parallel we do a few dirty tricks to speed
        // things up:
        //
        // * First is that we use the `jobserver` crate to limit the parallelism
        //   of this build script. The `jobserver` crate will use a jobserver
        //   configured by Cargo for build scripts to ensure that parallelism is
        //   coordinated across C compilations and Rust compilations. Before we
        //   compile anything we make sure to wait until we acquire a token.
        //
        //   Note that this jobserver is cached globally so we only used one per
        //   process and only worry about creating it once.
        //
        // * Next we use a raw `thread::spawn` per thread to actually compile
        //   objects in parallel. We only actually spawn a thread after we've
        //   acquired a token to perform some work
        //
        // * Finally though we want to keep the dependencies of this crate
        //   pretty light, so we avoid using a safe abstraction like `rayon` and
        //   instead rely on some bits of `unsafe` code. We know that this stack
        //   frame persists while everything is compiling so we use all the
        //   stack-allocated objects without cloning/reallocating. We use a
        //   transmute to `State` with a `'static` lifetime to persist
        //   everything we need across the boundary, and the join-on-drop
        //   semantics of `JoinOnDrop` should ensure that our stack frame is
        //   alive while threads are alive.
        //
        // With all that in mind we compile all objects in a loop here, after we
        // acquire the appropriate tokens, Once all objects have been compiled
        // we join on all the threads and propagate the results of compilation.
        //
        // Note that as a slight optimization we try to break out as soon as
        // possible as soon as any compilation fails to ensure that errors get
        // out to the user as fast as possible.
        let error = AtomicBool::new(false);
        let mut handles = Vec::new();
        for obj in objs {
            if error.load(SeqCst) {
                break;
            }
            let token = server.acquire().map_err(Error::IOError)?;
            let state = State { obj, error: &error };
            let state = unsafe { std::mem::transmute::<State<T>, State<'static, T>>(state) };
            handles.push(s.spawn(|_| {
                let state: State<T> = state; // erase the `'static` lifetime
                let result = compile_object(f, state.obj);
                if result.is_err() {
                    state.error.store(true, SeqCst);
                }
                drop(token); // make sure our jobserver token is released after the compile
                result
            }));
        }

        let mut output = Vec::new();
        for handle in handles {
            match handle.join().map_err(Error::BuildPanic)? {
                Ok(r) => output.push(r),
                Err(err) => return Err(err),
            }
        }

        Ok(output)
    })
    .map_err(Error::BuildPanic)?;

    // Reacquire our process's token before we proceed, which we released
    // before entering the loop above.
    if reacquire {
        server.acquire_raw().map_err(Error::IOError)?;
    }

    return res;

    /// Shared state from the parent thread to the child thread. This
    /// package of pointers is temporarily transmuted to a `'static`
    /// lifetime to cross the thread boundary and then once the thread is
    /// running we erase the `'static` to go back to an anonymous lifetime.
    struct State<'a, O> {
        obj: &'a O,
        error: &'a AtomicBool,
    }

    /// Returns a suitable `jobserver::Client` used to coordinate
    /// parallelism between build scripts.
    fn jobserver() -> &'static jobserver::Client {
        static INIT: Once = Once::new();
        static mut JOBSERVER: Option<jobserver::Client> = None;

        fn _assert_sync<T: Sync>() {}
        _assert_sync::<jobserver::Client>();

        unsafe {
            INIT.call_once(|| {
                let server = default_jobserver();
                JOBSERVER = Some(server);
            });
            JOBSERVER.as_ref().unwrap()
        }
    }

    unsafe fn default_jobserver() -> jobserver::Client {
        // Try to use the environmental jobserver which Cargo typically
        // initializes for us...
        if let Some(client) = jobserver::Client::from_env() {
            return client;
        }

        // ... but if that fails for whatever reason fall back to the number
        // of cpus on the system or the `NUM_JOBS` env var.
        let mut parallelism = num_cpus::get();
        if let Ok(amt) = env::var("NUM_JOBS") {
            if let Ok(amt) = amt.parse() {
                parallelism = amt;
            }
        }

        // If we create our own jobserver then be sure to reserve one token
        // for ourselves.
        let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
        client.acquire_raw().expect("failed to acquire initial");
        client
    }
}

#[test]
fn it_works() {
    struct Object;
    let mut v = Vec::new();
    for _ in 0..4000 {
        v.push(Object);
    }
    compile_objects::<Object, (), (), _>(
        &|_| {
            println!("compile {:?}", std::thread::current().id());
            Ok(())
        },
        &v,
    )
    .unwrap();
}

#[test]
fn test_build_error() {
    struct Object;
    let mut v = Vec::new();
    v.push(Object);
    let err = compile_objects::<Object, (), (), _>(
        &|_| {
            return Err(());
        },
        &v,
    )
    .unwrap_err();

    match err {
        Error::BuildError(_) => {},
        _ => panic!("Unexpected error."),
    }
}

#[test]
fn test_build_panic() {
    struct Object;
    let mut v = Vec::new();
    v.push(Object);
    let err = compile_objects::<Object, (), (), _>(
        &|_| {
            panic!("Panic.");
        },
        &v,
    )
    .unwrap_err();

    match err {
        Error::BuildPanic(_) => {},
        _ => panic!("Unexpected error."),
    }
}

[ Dauer der Verarbeitung: 0.20 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge