Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  context.rs   Sprache: unbekannt

 
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

use crate::stream;
use crate::{assert_not_in_callback, run_in_callback};
use crate::{ClientStream, AUDIOIPC_INIT_PARAMS};
#[cfg(target_os = "linux")]
use audio_thread_priority::get_current_thread_info;
#[cfg(not(target_os = "linux"))]
use audio_thread_priority::promote_current_thread_to_real_time;
use audioipc::ipccore::EventLoopHandle;
use audioipc::{ipccore, rpccore, sys, PlatformHandle};
use audioipc::{
    messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage,
    ServerMessage,
};
use cubeb_backend::{
    capi_new, ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error,
    InputProcessingParams, Ops, Result, Stream, StreamParams, StreamParamsRef,
};
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::sync::{Arc, Mutex};
use std::thread;
use std::{fmt, mem, ptr};

struct CubebClient;

impl rpccore::Client for CubebClient {
    type ServerMessage = ServerMessage;
    type ClientMessage = ClientMessage;
}

pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);

// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the
// common fields.
#[repr(C)]
pub struct ClientContext {
    _ops: *const Ops,
    rpc: rpccore::Proxy<ServerMessage, ClientMessage>,
    rpc_thread: ipccore::EventLoopThread,
    callback_thread: ipccore::EventLoopThread,
    backend_id: CString,
    device_collection_rpc: bool,
    input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
    output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
}

impl ClientContext {
    #[doc(hidden)]
    pub fn rpc_handle(&self) -> &EventLoopHandle {
        self.rpc_thread.handle()
    }

    #[doc(hidden)]
    pub fn rpc(&self) -> rpccore::Proxy<ServerMessage, ClientMessage> {
        self.rpc.clone()
    }

    #[doc(hidden)]
    pub fn callback_handle(&self) -> &EventLoopHandle {
        self.callback_thread.handle()
    }
}

#[cfg(target_os = "linux")]
fn promote_thread(rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
    match get_current_thread_info() {
        Ok(info) => {
            let bytes = info.serialize();
            let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes));
        }
        Err(_) => {
            warn!("Could not remotely promote thread to RT.");
        }
    }
}

#[cfg(not(target_os = "linux"))]
fn promote_thread(_rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
    match promote_current_thread_to_real_time(0, 48000) {
        Ok(_) => {
            info!("Audio thread promoted to real-time.");
        }
        Err(_) => {
            warn!("Could not promote thread to real-time.");
        }
    }
}

fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) {
    if let Some(func) = callback {
        let thr = thread::current();
        let name = CString::new(thr.name().unwrap()).unwrap();
        func(name.as_ptr());
    }
}

fn unregister_thread(callback: Option<extern "C" fn()>) {
    if let Some(func) = callback {
        func();
    }
}

fn promote_and_register_thread(
    rpc: &rpccore::Proxy<ServerMessage, ClientMessage>,
    callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
) {
    promote_thread(rpc);
    register_thread(callback);
}

#[derive(Default)]
struct DeviceCollectionCallback {
    cb: ffi::cubeb_device_collection_changed_callback,
    user_ptr: usize,
}

struct DeviceCollectionServer {
    input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
    output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
}

impl rpccore::Server for DeviceCollectionServer {
    type ServerMessage = DeviceCollectionReq;
    type ClientMessage = DeviceCollectionResp;

    fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
        match req {
            DeviceCollectionReq::DeviceChange(device_type) => {
                trace!(
                    "ctx_thread: DeviceChange Callback: device_type={}",
                    device_type
                );

                let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type);

                let (input_cb, input_user_ptr) = {
                    let dcb = self.input_device_callback.lock().unwrap();
                    (dcb.cb, dcb.user_ptr)
                };
                let (output_cb, output_user_ptr) = {
                    let dcb = self.output_device_callback.lock().unwrap();
                    (dcb.cb, dcb.user_ptr)
                };

                run_in_callback(|| {
                    if devtype.contains(cubeb_backend::DeviceType::INPUT) {
                        unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) }
                    }
                    if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
                        unsafe {
                            output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void)
                        }
                    }
                });

                DeviceCollectionResp::DeviceChange
            }
        }
    }
}

impl ContextOps for ClientContext {
    fn init(_context_name: Option<&CStr>) -> Result<Context> {
        assert_not_in_callback();

        let params = AUDIOIPC_INIT_PARAMS.with(|p| p.replace(None).unwrap());
        let thread_create_callback = params.thread_create_callback;
        let thread_destroy_callback = params.thread_destroy_callback;

        let server_connection =
            unsafe { sys::Pipe::from_raw_handle(PlatformHandle::new(params.server_connection)) };

        let rpc_thread = ipccore::EventLoopThread::new(
            "AudioIPC Client RPC".to_string(),
            None,
            move || register_thread(thread_create_callback),
            move || unregister_thread(thread_destroy_callback),
        )
        .map_err(|_| Error::default())?;
        let rpc = rpc_thread
            .handle()
            .bind_client::<CubebClient>(server_connection)
            .map_err(|_| Error::default())?;
        let rpc2 = rpc.clone();

        // Don't let errors bubble from here.  Later calls against this context
        // will return errors the caller expects to handle.
        let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected);

        let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId())
            .unwrap_or_else(|_| "(remote error)".to_string());
        let backend_id = CString::new(backend_id).expect("backend_id query failed");

        // TODO: remove params.pool_size from init params.
        let callback_thread = ipccore::EventLoopThread::new(
            "AudioIPC Client Callback".to_string(),
            Some(params.stack_size),
            move || promote_and_register_thread(&rpc2, thread_create_callback),
            move || unregister_thread(thread_destroy_callback),
        )
        .map_err(|_| Error::default())?;

        let ctx = Box::new(ClientContext {
            _ops: &CLIENT_OPS as *const _,
            rpc,
            rpc_thread,
            callback_thread,
            backend_id,
            device_collection_rpc: false,
            input_device_callback: Arc::new(Mutex::new(Default::default())),
            output_device_callback: Arc::new(Mutex::new(Default::default())),
        });
        Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
    }

    fn backend_id(&mut self) -> &CStr {
        assert_not_in_callback();
        self.backend_id.as_c_str()
    }

    fn max_channel_count(&mut self) -> Result<u32> {
        assert_not_in_callback();
        send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
    }

    fn min_latency(&mut self, params: StreamParams) -> Result<u32> {
        assert_not_in_callback();
        let params = messages::StreamParams::from(params.as_ref());
        send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
    }

    fn preferred_sample_rate(&mut self) -> Result<u32> {
        assert_not_in_callback();
        send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
    }

    fn supported_input_processing_params(&mut self) -> Result<InputProcessingParams> {
        assert_not_in_callback();
        send_recv!(self.rpc(),
                   ContextGetSupportedInputProcessingParams =>
                   ContextSupportedInputProcessingParams())
        .map(InputProcessingParams::from_bits_truncate)
    }

    fn enumerate_devices(
        &mut self,
        devtype: DeviceType,
        collection: &DeviceCollectionRef,
    ) -> Result<()> {
        assert_not_in_callback();
        let v: Vec<ffi::cubeb_device_info> = send_recv!(
            self.rpc(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices())?
        .into_iter()
        .map(|i| i.into())
        .collect();
        let mut vs = v.into_boxed_slice();
        let coll = unsafe { &mut *collection.as_ptr() };
        coll.device = vs.as_mut_ptr();
        coll.count = vs.len();
        // Giving away the memory owned by vs.  Don't free it!
        // Reclaimed in `device_collection_destroy`.
        mem::forget(vs);
        Ok(())
    }

    fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
        assert_not_in_callback();
        unsafe {
            let coll = &mut *collection.as_ptr();
            let mut devices = Vec::from_raw_parts(coll.device, coll.count, coll.count);
            for dev in &mut devices {
                if !dev.device_id.is_null() {
                    let _ = CString::from_raw(dev.device_id as *mut _);
                }
                if !dev.group_id.is_null() {
                    let _ = CString::from_raw(dev.group_id as *mut _);
                }
                if !dev.vendor_name.is_null() {
                    let _ = CString::from_raw(dev.vendor_name as *mut _);
                }
                if !dev.friendly_name.is_null() {
                    let _ = CString::from_raw(dev.friendly_name as *mut _);
                }
            }
            coll.device = ptr::null_mut();
            coll.count = 0;
            Ok(())
        }
    }

    fn stream_init(
        &mut self,
        stream_name: Option<&CStr>,
        input_device: DeviceId,
        input_stream_params: Option<&StreamParamsRef>,
        output_device: DeviceId,
        output_stream_params: Option<&StreamParamsRef>,
        latency_frames: u32,
        // These params aren't sent to the server
        data_callback: ffi::cubeb_data_callback,
        state_callback: ffi::cubeb_state_callback,
        user_ptr: *mut c_void,
    ) -> Result<Stream> {
        assert_not_in_callback();

        let stream_name = stream_name.map(|name| name.to_bytes_with_nul().to_vec());

        let input_stream_params = input_stream_params.map(messages::StreamParams::from);
        let output_stream_params = output_stream_params.map(messages::StreamParams::from);

        let init_params = messages::StreamInitParams {
            stream_name,
            input_device: input_device as usize,
            input_stream_params,
            output_device: output_device as usize,
            output_stream_params,
            latency_frames,
        };
        stream::init(self, init_params, data_callback, state_callback, user_ptr)
    }

    fn register_device_collection_changed(
        &mut self,
        devtype: DeviceType,
        collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
        user_ptr: *mut c_void,
    ) -> Result<()> {
        assert_not_in_callback();

        if !self.device_collection_rpc {
            let mut fd = send_recv!(self.rpc(),
                                 ContextSetupDeviceCollectionCallback =>
                                 ContextSetupDeviceCollectionCallback())?;

            let stream = unsafe { sys::Pipe::from_raw_handle(fd.platform_handle.take_handle()) };

            let server = DeviceCollectionServer {
                input_device_callback: self.input_device_callback.clone(),
                output_device_callback: self.output_device_callback.clone(),
            };

            self.rpc_handle()
                .bind_server(server, stream)
                .map_err(|_| Error::default())?;
            self.device_collection_rpc = true;
        }

        if devtype.contains(cubeb_backend::DeviceType::INPUT) {
            let mut cb = self.input_device_callback.lock().unwrap();
            cb.cb = collection_changed_callback;
            cb.user_ptr = user_ptr as usize;
        }
        if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
            let mut cb = self.output_device_callback.lock().unwrap();
            cb.cb = collection_changed_callback;
            cb.user_ptr = user_ptr as usize;
        }

        let enable = collection_changed_callback.is_some();
        send_recv!(self.rpc(),
                   ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) =>
                   ContextRegisteredDeviceCollectionChanged)
    }
}

impl Drop for ClientContext {
    fn drop(&mut self) {
        debug!("ClientContext dropped...");
        let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
    }
}

impl fmt::Debug for ClientContext {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ClientContext")
            .field("_ops", &self._ops)
            .field("rpc", &self.rpc)
            .field("core", &self.rpc_thread)
            .field("cpu_pool", &"...")
            .finish()
    }
}

[ Dauer der Verarbeitung: 0.23 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge