Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/coreaudio-sys-utils/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 12 kB image not shown  

Quelle  dispatch.rs   Sprache: unbekannt

 
use coreaudio_sys::*;

use std::ffi::CString;
use std::mem;
use std::os::raw::c_void;
use std::panic;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, OnceLock};
#[cfg(test)]
use std::thread;
#[cfg(test)]
use std::time::Duration;
use std::time::Instant;

pub const DISPATCH_QUEUE_LABEL: &str = "org.mozilla.cubeb";

pub fn get_serial_queue_singleton() -> &'static Queue {
    static SERIAL_QUEUE: OnceLock<Queue> = OnceLock::new();
    SERIAL_QUEUE.get_or_init(|| Queue::new(DISPATCH_QUEUE_LABEL))
}

pub fn debug_assert_running_serially() {
    get_serial_queue_singleton().debug_assert_is_current();
}

pub fn debug_assert_not_running_serially() {
    get_serial_queue_singleton().debug_assert_is_not_current();
}

pub fn run_serially<F, B>(work: F) -> B
where
    F: FnOnce() -> B,
{
    get_serial_queue_singleton().run_sync(|| work()).unwrap()
}

pub fn run_serially_forward_panics<F, B>(work: F) -> B
where
    F: panic::UnwindSafe + FnOnce() -> B,
{
    match run_serially(|| panic::catch_unwind(|| work())) {
        Ok(res) => res,
        Err(e) => panic::resume_unwind(e),
    }
}

// Queue: A wrapper around `dispatch_queue_t` that is always serial.
// ------------------------------------------------------------------------------------------------
#[derive(Debug)]
pub struct Queue {
    queue: Mutex<dispatch_queue_t>,
    owned: AtomicBool,
}

impl Queue {
    pub fn new_with_target(label: &str, target: &Queue) -> Self {
        const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t =
            ptr::null_mut::<dispatch_queue_attr_s>();
        let label = CString::new(label).unwrap();
        let c_string = label.as_ptr();
        let queue = {
            let target_guard = target.queue.lock().unwrap();
            Self {
                queue: Mutex::new(unsafe {
                    dispatch_queue_create_with_target(
                        c_string,
                        DISPATCH_QUEUE_SERIAL,
                        *target_guard,
                    )
                }),
                owned: AtomicBool::new(true),
            }
        };
        queue.set_should_cancel(Box::new(AtomicBool::new(false)));
        queue
    }

    pub fn new(label: &str) -> Self {
        Queue::new_with_target(label, &Queue::get_global_queue())
    }

    pub fn get_global_queue() -> Self {
        Self {
            queue: Mutex::new(unsafe { dispatch_get_global_queue(QOS_CLASS_DEFAULT as isize, 0) }),
            owned: AtomicBool::new(false),
        }
    }

    #[cfg(debug_assertions)]
    pub fn debug_assert_is_current(&self) {
        let guard = self.queue.lock().unwrap();
        unsafe {
            dispatch_assert_queue(*guard);
        }
    }

    #[cfg(not(debug_assertions))]
    pub fn debug_assert_is_current(&self) {}

    #[cfg(debug_assertions)]
    pub fn debug_assert_is_not_current(&self) {
        let guard = self.queue.lock().unwrap();
        unsafe {
            dispatch_assert_queue_not(*guard);
        }
    }

    #[cfg(not(debug_assertions))]
    pub fn debug_assert_is_not_current(&self) {}

    pub fn run_async<F>(&self, work: F)
    where
        F: Send + FnOnce(),
    {
        let guard = self.queue.lock().unwrap();
        let should_cancel = self.get_should_cancel(*guard);
        let (closure, executor) = Self::create_closure_and_executor(|| {
            if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
                return;
            }
            work();
        });
        unsafe {
            dispatch_async_f(*guard, closure, executor);
        }
    }

    pub fn run_after<F>(&self, when: Instant, work: F)
    where
        F: Send + FnOnce(),
    {
        let now = Instant::now();
        if when <= now {
            return self.run_async(work);
        }
        let nanos = (when - now).as_nanos() as i64;
        let when = unsafe { dispatch_time(DISPATCH_TIME_NOW.into(), nanos) };
        let guard = self.queue.lock().unwrap();
        let should_cancel = self.get_should_cancel(*guard);
        let (closure, executor) = Self::create_closure_and_executor(|| {
            if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
                return;
            }
            work();
        });
        unsafe {
            dispatch_after_f(when, *guard, closure, executor);
        }
    }

    pub fn run_sync<F, B>(&self, work: F) -> Option<B>
    where
        F: FnOnce() -> B,
    {
        let queue: Option<dispatch_queue_t>;
        let mut res: Option<B> = None;
        let cex: Option<(*mut c_void, dispatch_function_t)>;
        {
            let guard = self.queue.lock().unwrap();
            queue = Some(*guard);
            let should_cancel = self.get_should_cancel(*guard);
            cex = Some(Self::create_closure_and_executor(|| {
                if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
                    return;
                }
                res = Some(work());
            }));
        }
        let (closure, executor) = cex.unwrap();
        unsafe {
            dispatch_sync_f(queue.unwrap(), closure, executor);
        }
        res
    }

    pub fn run_final<F, B>(&self, work: F) -> Option<B>
    where
        F: FnOnce() -> B,
    {
        assert!(
            self.owned.load(Ordering::SeqCst),
            "Doesn't make sense to finalize global queue"
        );
        let queue: Option<dispatch_queue_t>;
        let mut res: Option<B> = None;
        let cex: Option<(*mut c_void, dispatch_function_t)>;
        {
            let guard = self.queue.lock().unwrap();
            queue = Some(*guard);
            let should_cancel = self.get_should_cancel(*guard);
            debug_assert!(
                should_cancel.is_some(),
                "dispatch context should be allocated!"
            );
            cex = Some(Self::create_closure_and_executor(|| {
                res = Some(work());
                should_cancel
                    .expect("dispatch context should be allocated!")
                    .store(true, Ordering::SeqCst);
            }));
        }
        let (closure, executor) = cex.unwrap();
        unsafe {
            dispatch_sync_f(queue.unwrap(), closure, executor);
        }
        res
    }

    fn get_should_cancel(&self, queue: dispatch_queue_t) -> Option<&mut AtomicBool> {
        if !self.owned.load(Ordering::SeqCst) {
            return None;
        }
        unsafe {
            let context =
                dispatch_get_context(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue))
                    as *mut AtomicBool;
            context.as_mut()
        }
    }

    fn set_should_cancel(&self, context: Box<AtomicBool>) {
        assert!(self.owned.load(Ordering::SeqCst));
        unsafe {
            let guard = self.queue.lock().unwrap();
            let queue = mem::transmute::<dispatch_queue_t, dispatch_object_t>(*guard);
            // Leak the context from Box.
            dispatch_set_context(queue, Box::into_raw(context) as *mut c_void);

            extern "C" fn finalizer(context: *mut c_void) {
                // Retake the leaked context into box and then drop it.
                let _ = unsafe { Box::from_raw(context as *mut AtomicBool) };
            }

            // The `finalizer` is only run if the `context` in `queue` is set by `dispatch_set_context`.
            dispatch_set_finalizer_f(queue, Some(finalizer));
        }
    }

    fn release(&self) {
        let guard = self.queue.lock().unwrap();
        let queue = *guard;
        unsafe {
            // This will release the inner `dispatch_queue_t` asynchronously.
            // TODO: It's incredibly unsafe to call `transmute` directly.
            //       Find another way to release the queue.
            dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue));
        }
    }

    fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, dispatch_function_t)
    where
        F: FnOnce(),
    {
        extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void)
        where
            F: FnOnce(),
        {
            // Retake the leaked closure.
            let closure = unsafe { Box::from_raw(unboxed_closure as *mut F) };
            // Execute the closure.
            (*closure)();
            // closure is released after finishing this function call.
        }

        let closure = Box::new(closure); // Allocate closure on heap.
        let executor: dispatch_function_t = Some(closure_executer::<F>);

        (
            Box::into_raw(closure) as *mut c_void, // Leak the closure.
            executor,
        )
    }
}

impl Drop for Queue {
    fn drop(&mut self) {
        if self.owned.load(Ordering::SeqCst) {
            self.release();
        }
    }
}

impl Clone for Queue {
    fn clone(&self) -> Self {
        assert!(
            self.owned.load(Ordering::SeqCst),
            "No need to clone a static queue"
        );
        let guard = self.queue.lock().unwrap();
        let queue = *guard;
        // TODO: It's incredibly unsafe to call `transmute` directly.
        //       Find another way to release the queue.
        unsafe {
            dispatch_retain(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue));
        }
        Self {
            queue: Mutex::new(queue),
            owned: AtomicBool::new(true),
        }
    }
}

unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}

#[test]
fn run_tasks_in_order() {
    let mut visited = Vec::<u32>::new();

    // Rust compilter doesn't allow a pointer to be passed across threads.
    // A hacky way to do that is to cast the pointer into a value, then
    // the value, which is actually an address, can be copied into threads.
    let ptr = &mut visited as *mut Vec<u32> as usize;

    fn visit(v: u32, visited_ptr: usize) {
        let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
        visited.push(v);
    }

    let queue = Queue::new("Run tasks in order");

    queue.run_sync(|| visit(1, ptr));
    queue.run_sync(|| visit(2, ptr));
    queue.run_async(|| visit(3, ptr));
    queue.run_async(|| visit(4, ptr));
    // Call sync here to block the current thread and make sure all the tasks are done.
    queue.run_sync(|| visit(5, ptr));

    assert_eq!(visited, vec![1, 2, 3, 4, 5]);
}

#[test]
fn run_final_task() {
    let mut visited = Vec::<u32>::new();

    {
        // Rust compilter doesn't allow a pointer to be passed across threads.
        // A hacky way to do that is to cast the pointer into a value, then
        // the value, which is actually an address, can be copied into threads.
        let ptr = &mut visited as *mut Vec<u32> as usize;

        fn visit(v: u32, visited_ptr: usize) {
            let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
            visited.push(v);
        }

        let queue = Queue::new("Task after run_final will be cancelled");

        queue.run_sync(|| visit(1, ptr));
        queue.run_async(|| visit(2, ptr));
        queue.run_final(|| visit(3, ptr));
        queue.run_async(|| visit(4, ptr));
        queue.run_sync(|| visit(5, ptr));
    }
    // `queue` will be dropped asynchronously and then the `finalizer` of the `queue`
    // should be fired to clean up the `context` set in the `queue`.

    assert_eq!(visited, vec![1, 2, 3]);
}

#[test]
fn sync_return_value() {
    let q = Queue::new("Test queue");
    assert_eq!(q.run_sync(|| 42), Some(42));
    assert_eq!(q.run_final(|| "foo"), Some("foo"));
    assert_eq!(q.run_sync(|| Ok::<(), u32>(())), None);
}

#[test]
fn run_after() {
    let mut visited = Vec::<u32>::new();

    {
        // Rust compilter doesn't allow a pointer to be passed across threads.
        // A hacky way to do that is to cast the pointer into a value, then
        // the value, which is actually an address, can be copied into threads.
        let ptr = &mut visited as *mut Vec<u32> as usize;

        fn visit(v: u32, visited_ptr: usize) {
            let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
            visited.push(v);
        }

        let queue = Queue::new("Task after run_final will be cancelled");

        queue.run_async(|| visit(1, ptr));
        queue.run_after(Instant::now() + Duration::from_millis(10), || visit(2, ptr));
        queue.run_after(Instant::now() + Duration::from_secs(1), || visit(3, ptr));
        queue.run_async(|| visit(4, ptr));
        thread::sleep(Duration::from_millis(100));
        queue.run_final(|| visit(5, ptr));
    }
    // `queue` will be dropped asynchronously and then the `finalizer` of the `queue`
    // should be fired to clean up the `context` set in the `queue`.

    assert_eq!(visited, vec![1, 4, 2, 5]);
}

[ Dauer der Verarbeitung: 0.24 Sekunden  (vorverarbeitet)  ]