Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  server.rs   Sprache: unbekannt

 
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

#[cfg(target_os = "linux")]
use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo};
use audioipc::messages::SerializableHandle;
use audioipc::messages::{
    CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
    DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams,
    StreamInitParams, StreamParams,
};
use audioipc::shm::SharedMem;
use audioipc::{ipccore, rpccore, sys, PlatformHandle};
use cubeb::InputProcessingParams;
use cubeb_core as cubeb;
use cubeb_core::ffi;
use std::convert::TryInto;
use std::ffi::CStr;
use std::mem::size_of;
use std::os::raw::{c_long, c_void};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cell::RefCell, sync::Mutex};
use std::{panic, slice};

use crate::errors::*;

fn error(error: cubeb::Error) -> ClientMessage {
    ClientMessage::Error(error.raw_code())
}

struct CubebDeviceCollectionManager {
    servers: Mutex<Vec<(Rc<DeviceCollectionChangeCallback>, cubeb::DeviceType)>>,
}

impl CubebDeviceCollectionManager {
    fn new() -> CubebDeviceCollectionManager {
        CubebDeviceCollectionManager {
            servers: Mutex::new(Vec::new()),
        }
    }

    fn register(
        &self,
        context: &cubeb::Context,
        server: &Rc<DeviceCollectionChangeCallback>,
        devtype: cubeb::DeviceType,
    ) -> cubeb::Result<()> {
        let mut servers = self.servers.lock().unwrap();
        if servers.is_empty() {
            self.internal_register(context, true)?;
        }
        servers.push((server.clone(), devtype));
        Ok(())
    }

    fn unregister(
        &self,
        context: &cubeb::Context,
        server: &Rc<DeviceCollectionChangeCallback>,
        devtype: cubeb::DeviceType,
    ) -> cubeb::Result<()> {
        let mut servers = self.servers.lock().unwrap();
        servers.retain(|(s, d)| !Rc::ptr_eq(s, server) || d != &devtype);
        if servers.is_empty() {
            self.internal_register(context, false)?;
        }
        Ok(())
    }

    fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> {
        for &(dir, cb) in &[
            (
                cubeb::DeviceType::INPUT,
                device_collection_changed_input_cb_c as _,
            ),
            (
                cubeb::DeviceType::OUTPUT,
                device_collection_changed_output_cb_c as _,
            ),
        ] {
            unsafe {
                context.register_device_collection_changed(
                    dir,
                    if enable { Some(cb) } else { None },
                    if enable {
                        self as *const CubebDeviceCollectionManager as *mut c_void
                    } else {
                        std::ptr::null_mut()
                    },
                )?;
            }
        }
        Ok(())
    }

    unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
        let servers = self.servers.lock().unwrap();
        servers.iter().for_each(|(s, d)| {
            if d.contains(cubeb::DeviceType::from_bits_truncate(device_type)) {
                s.device_collection_changed_callback(device_type)
            }
        });
    }
}

impl Drop for CubebDeviceCollectionManager {
    fn drop(&mut self) {
        assert!(self.servers.lock().unwrap().is_empty());
    }
}

struct DevIdMap {
    devices: Vec<usize>,
}

// A cubeb_devid is an opaque type which may be implemented with a stable
// pointer in a cubeb backend.  cubeb_devids received remotely must be
// validated before use, so DevIdMap provides a simple 1:1 mapping between a
// cubeb_devid and an IPC-transportable value suitable for use as a unique
// handle.
impl DevIdMap {
    fn new() -> DevIdMap {
        let mut d = DevIdMap {
            devices: Vec::with_capacity(32),
        };
        // A null cubeb_devid is used for selecting the default device.
        // Pre-populate the mapping with 0 -> 0 to handle nulls.
        d.devices.push(0);
        d
    }

    // Given a cubeb_devid, return a unique stable value suitable for use
    // over IPC.
    fn make_handle(&mut self, devid: usize) -> usize {
        if let Some(i) = self.devices.iter().position(|&d| d == devid) {
            return i;
        }
        self.devices.push(devid);
        self.devices.len() - 1
    }

    // Given a handle produced by `make_handle`, return the associated
    // cubeb_devid.  Invalid handles result in a panic.
    fn handle_to_id(&self, handle: usize) -> usize {
        self.devices[handle]
    }
}

struct CubebContextState {
    // `manager` must be dropped before the `context` is destroyed.
    manager: CubebDeviceCollectionManager,
    context: cubeb::Result<cubeb::Context>,
}

thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = const { RefCell::new(None) });

fn cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context> {
    let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
    let context_name = Some(params.context_name.as_c_str());
    let backend_name = params.backend_name.as_deref();
    let r = cubeb::Context::init(context_name, backend_name);
    r.inspect_err(|e| {
        info!("cubeb::Context::init failed r={:?}", e);
    })
}

fn with_local_context<T, F>(f: F) -> T
where
    F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,
{
    CONTEXT_KEY.with(|k| {
        let mut state = k.borrow_mut();
        if state.is_none() {
            *state = Some(CubebContextState {
                manager: CubebDeviceCollectionManager::new(),
                context: cubeb_init_from_context_params(),
            });
        }
        let CubebContextState { manager, context } = state.as_mut().unwrap();
        // Always reattempt to initialize cubeb, OS config may have changed.
        if context.is_err() {
            *context = cubeb_init_from_context_params();
        }
        f(context, manager)
    })
}

struct DeviceCollectionClient;

impl rpccore::Client for DeviceCollectionClient {
    type ServerMessage = DeviceCollectionReq;
    type ClientMessage = DeviceCollectionResp;
}

struct CallbackClient;

impl rpccore::Client for CallbackClient {
    type ServerMessage = CallbackReq;
    type ClientMessage = CallbackResp;
}

struct ServerStreamCallbacks {
    /// Size of input frame in bytes
    input_frame_size: u16,
    /// Size of output frame in bytes
    output_frame_size: u16,
    /// Shared memory buffer for transporting audio data to/from client
    shm: SharedMem,
    /// RPC interface for data_callback (on OS audio thread) to server callback thread
    data_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
    /// RPC interface for state_callback (on any thread) to server callback thread
    state_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
    /// RPC interface for device_change_callback (on any thread) to server callback thread
    device_change_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
    /// Indicates stream is connected to client side.  Callbacks received before
    /// the stream is in the connected state cannot be sent to the client side, so
    /// are logged and otherwise ignored.
    connected: AtomicBool,
}

impl ServerStreamCallbacks {
    fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize {
        trace!(
            "Stream data callback: {} {} {}",
            nframes,
            input.len(),
            output.len()
        );
        if !self.connected.load(Ordering::Acquire) {
            warn!("Stream data callback triggered before stream connected");
            return cubeb::ffi::CUBEB_ERROR.try_into().unwrap();
        }

        if self.input_frame_size != 0 {
            if input.len() > self.shm.get_size() {
                debug!(
                    "bad input size: input={} shm={}",
                    input.len(),
                    self.shm.get_size()
                );
                return cubeb::ffi::CUBEB_ERROR.try_into().unwrap();
            }
            unsafe {
                self.shm
                    .get_mut_slice(input.len())
                    .unwrap()
                    .copy_from_slice(input);
            }
        }

        if self.output_frame_size != 0 && output.len() > self.shm.get_size() {
            debug!(
                "bad output size: output={} shm={}",
                output.len(),
                self.shm.get_size()
            );
            return cubeb::ffi::CUBEB_ERROR.try_into().unwrap();
        }

        if nframes == 0 {
            // Optimization: skip the RPC call when there are no frames.
            return 0;
        }

        let r = self.data_callback_rpc.call(CallbackReq::Data {
            nframes,
            input_frame_size: self.input_frame_size as usize,
            output_frame_size: self.output_frame_size as usize,
        });

        match r {
            Ok(CallbackResp::Data(frames)) => {
                if frames >= 0 && self.output_frame_size != 0 {
                    let nbytes = frames as usize * self.output_frame_size as usize;
                    unsafe {
                        output[..nbytes].copy_from_slice(self.shm.get_slice(nbytes).unwrap());
                    }
                }
                frames
            }
            _ => {
                debug!("Unexpected message {:?} during data_callback", r);
                cubeb::ffi::CUBEB_ERROR.try_into().unwrap()
            }
        }
    }

    fn state_callback(&self, state: cubeb::State) {
        trace!("Stream state callback: {:?}", state);
        if !self.connected.load(Ordering::Acquire) {
            warn!("Stream state callback triggered before stream connected");
            return;
        }

        let r = self
            .state_callback_rpc
            .call(CallbackReq::State(state.into()));
        match r {
            Ok(CallbackResp::State) => {}
            _ => {
                debug!("Unexpected message {:?} during state callback", r);
            }
        }
    }

    fn device_change_callback(&self) {
        trace!("Stream device change callback");
        if !self.connected.load(Ordering::Acquire) {
            warn!("Stream device_change callback triggered before stream connected");
            return;
        }
        let r = self
            .device_change_callback_rpc
            .call(CallbackReq::DeviceChange);
        match r {
            Ok(CallbackResp::DeviceChange) => {}
            _ => {
                debug!("Unexpected message {:?} during device change callback", r);
            }
        }
    }
}

static SHM_ID: AtomicUsize = AtomicUsize::new(0);

// Generate a temporary shm_id fragment that is unique to the process.  This
// path is used temporarily to create a shm segment, which is then
// immediately deleted from the filesystem while retaining handles to the
// shm to be shared between the server and client.
fn get_shm_id() -> String {
    format!(
        "cubeb-shm-{}-{}",
        std::process::id(),
        SHM_ID.fetch_add(1, Ordering::SeqCst)
    )
}

struct ServerStream {
    stream: Option<cubeb::Stream>,
    cbs: Box<ServerStreamCallbacks>,
    client_pipe: Option<PlatformHandle>,
}

impl Drop for ServerStream {
    fn drop(&mut self) {
        // `stream` *must* be dropped before `cbs`.
        drop(self.stream.take());
    }
}

struct DeviceCollectionChangeCallback {
    rpc: rpccore::Proxy<DeviceCollectionReq, DeviceCollectionResp>,
}

impl DeviceCollectionChangeCallback {
    fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
        // TODO: Assert device_type is in devtype.
        debug!(
            "Sending device collection ({:?}) changed event",
            device_type
        );
        let _ = self
            .rpc
            .call(DeviceCollectionReq::DeviceChange(device_type));
    }
}

pub struct CubebServer {
    callback_thread: ipccore::EventLoopHandle,
    device_collection_thread: ipccore::EventLoopHandle,
    streams: slab::Slab<ServerStream>,
    remote_pid: Option<u32>,
    device_collection_change_callbacks: Option<Rc<DeviceCollectionChangeCallback>>,
    devidmap: DevIdMap,
    shm_area_size: usize,
}

impl Drop for CubebServer {
    fn drop(&mut self) {
        if let Some(device_collection_change_callbacks) = &self.device_collection_change_callbacks {
            debug!("CubebServer: dropped with device_collection_change_callbacks registered");
            CONTEXT_KEY.with(|k| {
                let mut state = k.borrow_mut();
                if let Some(CubebContextState {
                    manager,
                    context: Ok(context),
                }) = state.as_mut()
                {
                    for devtype in [cubeb::DeviceType::INPUT, cubeb::DeviceType::OUTPUT] {
                        let r = manager.unregister(
                            context,
                            device_collection_change_callbacks,
                            devtype,
                        );
                        if r.is_err() {
                            debug!("CubebServer: unregister failed: {:?}", r);
                        }
                    }
                }
            })
        }
    }
}

#[allow(unknown_lints)] // non_send_fields_in_send_ty is Nightly-only as of 2021-11-29.
#[allow(clippy::non_send_fields_in_send_ty)]
// XXX: required for server setup, verify this is safe.
unsafe impl Send for CubebServer {}

impl rpccore::Server for CubebServer {
    type ServerMessage = ServerMessage;
    type ClientMessage = ClientMessage;

    fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
        if let ServerMessage::ClientConnect(pid) = req {
            self.remote_pid = Some(pid);
        }
        with_local_context(|context, manager| match *context {
            Err(_) => error(cubeb::Error::error()),
            Ok(ref context) => self.process_msg(context, manager, &req),
        })
    }
}

// Debugging for BMO 1594216/1612044.
macro_rules! try_stream {
    ($self:expr, $stm_tok:expr) => {
        if $self.streams.contains($stm_tok) {
            $self.streams[$stm_tok]
                .stream
                .as_mut()
                .expect("uninitialized stream")
        } else {
            error!(
                "{}:{}:{} - Stream({}): invalid token",
                file!(),
                line!(),
                column!(),
                $stm_tok
            );
            return error(cubeb::Error::invalid_parameter());
        }
    };
}

impl CubebServer {
    pub fn new(
        callback_thread: ipccore::EventLoopHandle,
        device_collection_thread: ipccore::EventLoopHandle,
        shm_area_size: usize,
    ) -> Self {
        CubebServer {
            callback_thread,
            device_collection_thread,
            streams: slab::Slab::<ServerStream>::new(),
            remote_pid: None,
            device_collection_change_callbacks: None,
            devidmap: DevIdMap::new(),
            shm_area_size,
        }
    }

    // Process a request coming from the client.
    fn process_msg(
        &mut self,
        context: &cubeb::Context,
        manager: &mut CubebDeviceCollectionManager,
        msg: &ServerMessage,
    ) -> ClientMessage {
        let resp: ClientMessage = match *msg {
            ServerMessage::ClientConnect(_) => {
                // remote_pid is set before cubeb initialization, just verify here.
                assert!(self.remote_pid.is_some());
                ClientMessage::ClientConnected
            }

            ServerMessage::ClientDisconnect => {
                // TODO:
                //self.connection.client_disconnect();
                ClientMessage::ClientDisconnected
            }

            ServerMessage::ContextGetBackendId => {
                ClientMessage::ContextBackendId(context.backend_id().to_string())
            }

            ServerMessage::ContextGetMaxChannelCount => context
                .max_channel_count()
                .map(ClientMessage::ContextMaxChannelCount)
                .unwrap_or_else(error),

            ServerMessage::ContextGetMinLatency(ref params) => {
                let format = cubeb::SampleFormat::from(params.format);
                let layout = cubeb::ChannelLayout::from(params.layout);

                let params = cubeb::StreamParamsBuilder::new()
                    .format(format)
                    .rate(params.rate)
                    .channels(params.channels)
                    .layout(layout)
                    .take();

                context
                    .min_latency(¶ms)
                    .map(ClientMessage::ContextMinLatency)
                    .unwrap_or_else(error)
            }

            ServerMessage::ContextGetPreferredSampleRate => context
                .preferred_sample_rate()
                .map(ClientMessage::ContextPreferredSampleRate)
                .unwrap_or_else(error),

            ServerMessage::ContextGetSupportedInputProcessingParams => context
                .supported_input_processing_params()
                .map(|params| ClientMessage::ContextSupportedInputProcessingParams(params.bits()))
                .unwrap_or_else(error),

            ServerMessage::ContextGetDeviceEnumeration(device_type) => context
                .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
                .map(|devices| {
                    let v: Vec<DeviceInfo> = devices
                        .iter()
                        .map(|i| {
                            let mut tmp: DeviceInfo = i.as_ref().into();
                            // Replace each cubeb_devid with a unique handle suitable for IPC.
                            tmp.devid = self.devidmap.make_handle(tmp.devid);
                            tmp
                        })
                        .collect();
                    ClientMessage::ContextEnumeratedDevices(v)
                })
                .unwrap_or_else(error),

            ServerMessage::StreamCreate(ref params) => self
                .process_stream_create(params)
                .unwrap_or_else(|_| error(cubeb::Error::error())),

            ServerMessage::StreamInit(stm_tok, ref params) => self
                .process_stream_init(context, stm_tok, params)
                .unwrap_or_else(|_| error(cubeb::Error::error())),

            ServerMessage::StreamDestroy(stm_tok) => {
                if self.streams.contains(stm_tok) {
                    debug!("Unregistering stream {:?}", stm_tok);
                    self.streams.remove(stm_tok);
                } else {
                    // Debugging for BMO 1594216/1612044.
                    error!("StreamDestroy({}): invalid token", stm_tok);
                    return error(cubeb::Error::invalid_parameter());
                }
                ClientMessage::StreamDestroyed
            }

            ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok)
                .start()
                .map(|_| ClientMessage::StreamStarted)
                .unwrap_or_else(error),

            ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok)
                .stop()
                .map(|_| ClientMessage::StreamStopped)
                .unwrap_or_else(error),

            ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok)
                .position()
                .map(ClientMessage::StreamPosition)
                .unwrap_or_else(error),

            ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok)
                .latency()
                .map(ClientMessage::StreamLatency)
                .unwrap_or_else(error),

            ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok)
                .input_latency()
                .map(ClientMessage::StreamInputLatency)
                .unwrap_or_else(error),

            ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok)
                .set_volume(volume)
                .map(|_| ClientMessage::StreamVolumeSet)
                .unwrap_or_else(error),

            ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok)
                .set_name(name)
                .map(|_| ClientMessage::StreamNameSet)
                .unwrap_or_else(error),

            ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok)
                .current_device()
                .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
                .unwrap_or_else(error),

            ServerMessage::StreamSetInputMute(stm_tok, mute) => try_stream!(self, stm_tok)
                .set_input_mute(mute)
                .map(|_| ClientMessage::StreamInputMuteSet)
                .unwrap_or_else(error),

            ServerMessage::StreamSetInputProcessingParams(stm_tok, params) => {
                try_stream!(self, stm_tok)
                    .set_input_processing_params(InputProcessingParams::from_bits_truncate(params))
                    .map(|_| ClientMessage::StreamInputProcessingParamsSet)
                    .unwrap_or_else(error)
            }

            ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
                try_stream!(self, stm_tok)
                    .register_device_changed_callback(if enable {
                        Some(device_change_cb_c)
                    } else {
                        None
                    })
                    .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
                    .unwrap_or_else(error)
            }

            ServerMessage::ContextSetupDeviceCollectionCallback => {
                let (server_pipe, client_pipe) = match sys::make_pipe_pair() {
                    Ok((server_pipe, client_pipe)) => (server_pipe, client_pipe),
                    Err(e) => {
                        debug!(
                            "ContextSetupDeviceCollectionCallback - make_pipe_pair failed: {:?}",
                            e
                        );
                        return error(cubeb::Error::error());
                    }
                };

                // TODO: this should bind the client_pipe and send the server_pipe to the remote, but
                //       additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle.
                // TODO: Use the rpc_thread instead of an extra device_collection_thread, but a reentrant bind_client
                //       is required to support that.
                let rpc = match self
                    .device_collection_thread
                    .bind_client::<DeviceCollectionClient>(server_pipe)
                {
                    Ok(rpc) => rpc,
                    Err(e) => {
                        debug!(
                            "ContextSetupDeviceCollectionCallback - bind_client: {:?}",
                            e
                        );
                        return error(cubeb::Error::error());
                    }
                };

                self.device_collection_change_callbacks =
                    Some(Rc::new(DeviceCollectionChangeCallback { rpc }));
                let fd = RegisterDeviceCollectionChanged {
                    platform_handle: SerializableHandle::new(client_pipe, self.remote_pid.unwrap()),
                };

                ClientMessage::ContextSetupDeviceCollectionCallback(fd)
            }

            ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self
                .process_register_device_collection_changed(
                    context,
                    manager,
                    cubeb::DeviceType::from_bits_truncate(device_type),
                    enable,
                )
                .unwrap_or_else(error),

            #[cfg(target_os = "linux")]
            ServerMessage::PromoteThreadToRealTime(thread_info) => {
                let info = RtPriorityThreadInfo::deserialize(thread_info);
                match promote_thread_to_real_time(info, 0, 48000) {
                    Ok(_) => {
                        info!("Promotion of content process thread to real-time OK");
                    }
                    Err(_) => {
                        warn!("Promotion of content process thread to real-time error");
                    }
                }
                ClientMessage::ThreadPromoted
            }
        };

        trace!("process_msg: req={:?}, resp={:?}", msg, resp);

        resp
    }

    fn process_register_device_collection_changed(
        &mut self,
        context: &cubeb::Context,
        manager: &mut CubebDeviceCollectionManager,
        devtype: cubeb::DeviceType,
        enable: bool,
    ) -> cubeb::Result<ClientMessage> {
        if devtype == cubeb::DeviceType::UNKNOWN {
            return Err(cubeb::Error::invalid_parameter());
        }

        assert!(self.device_collection_change_callbacks.is_some());
        let cbs = self.device_collection_change_callbacks.as_ref().unwrap();

        if enable {
            manager.register(context, cbs, devtype)
        } else {
            manager.unregister(context, cbs, devtype)
        }
        .map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged)
    }

    // Stream create is special, so it's been separated from process_msg.
    fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> {
        fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
            params
                .map(|p| {
                    let format = p.format.into();
                    let sample_size = match format {
                        cubeb::SampleFormat::S16LE
                        | cubeb::SampleFormat::S16BE
                        | cubeb::SampleFormat::S16NE => 2,
                        cubeb::SampleFormat::Float32LE
                        | cubeb::SampleFormat::Float32BE
                        | cubeb::SampleFormat::Float32NE => 4,
                    };
                    let channel_count = p.channels as u16;
                    sample_size * channel_count
                })
                .unwrap_or(0u16)
        }

        // Create the callback handling struct which is attached the cubeb stream.
        let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
        let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());

        // Estimate a safe shmem size for this stream configuration.  If the server was configured with a fixed
        // shm_area_size override, use that instead.
        // TODO: Add a new cubeb API to query the precise buffer size required for a given stream config.
        // https://github.com/mozilla/audioipc-2/issues/124
        let shm_area_size = if self.shm_area_size == 0 {
            let frame_size = output_frame_size.max(input_frame_size) as u32;
            let in_rate = params.input_stream_params.map(|p| p.rate).unwrap_or(0);
            let out_rate = params.output_stream_params.map(|p| p.rate).unwrap_or(0);
            let rate = out_rate.max(in_rate);
            // 1s of audio, rounded up to the nearest 64kB.
            // Stream latency is capped at 1s in process_stream_init.
            (((rate * frame_size) + 0xffff) & !0xffff) as usize
        } else {
            self.shm_area_size
        };
        debug!("shm_area_size = {}", shm_area_size);

        let shm = SharedMem::new(&get_shm_id(), shm_area_size)?;
        let shm_handle = unsafe { shm.make_handle()? };

        let (server_pipe, client_pipe) = sys::make_pipe_pair()?;
        // TODO: this should bind the client_pipe and send the server_pipe to the remote, but
        //       additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle.
        let rpc = self
            .callback_thread
            .bind_client::<CallbackClient>(server_pipe)?;

        let cbs = Box::new(ServerStreamCallbacks {
            input_frame_size,
            output_frame_size,
            shm,
            state_callback_rpc: rpc.clone(),
            device_change_callback_rpc: rpc.clone(),
            data_callback_rpc: rpc,
            connected: AtomicBool::new(false),
        });

        let entry = self.streams.vacant_entry();
        let key = entry.key();
        debug!("Registering stream {:?}", key);

        entry.insert(ServerStream {
            stream: None,
            cbs,
            client_pipe: Some(client_pipe),
        });

        Ok(ClientMessage::StreamCreated(StreamCreate {
            token: key,
            shm_handle: SerializableHandle::new(shm_handle, self.remote_pid.unwrap()),
            shm_area_size,
        }))
    }

    // Stream init is special, so it's been separated from process_msg.
    fn process_stream_init(
        &mut self,
        context: &cubeb::Context,
        stm_tok: usize,
        params: &StreamInitParams,
    ) -> Result<ClientMessage> {
        // Create cubeb stream from params
        let stream_name = params
            .stream_name
            .as_ref()
            .and_then(|name| CStr::from_bytes_with_nul(name).ok());

        // Map IPC handle back to cubeb_devid.
        let input_device = self.devidmap.handle_to_id(params.input_device) as *const _;
        let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
            cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
        });

        // Map IPC handle back to cubeb_devid.
        let output_device = self.devidmap.handle_to_id(params.output_device) as *const _;
        let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
            cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
        });

        // TODO: Manage stream latency requests with respect to the RT deadlines the callback_thread was configured for.
        fn round_up_pow2(v: u32) -> u32 {
            debug_assert!(v >= 1);
            1 << (32 - (v - 1).leading_zeros())
        }
        let rate = params
            .output_stream_params
            .map(|p| p.rate)
            .unwrap_or_else(|| params.input_stream_params.map(|p| p.rate).unwrap());
        // Note: minimum latency supported by AudioIPC is currently ~5ms.  This restriction may be reduced by later IPC improvements.
        let min_latency = round_up_pow2(5 * rate / 1000);
        // Note: maximum latency is restricted by the SharedMem size.
        let max_latency = rate;
        let latency = params.latency_frames.clamp(min_latency, max_latency);
        trace!(
            "stream rate={} latency requested={} calculated={}",
            rate,
            params.latency_frames,
            latency
        );

        let server_stream = &mut self.streams[stm_tok];
        assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
        let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;

        let stream = unsafe {
            let stream = context.stream_init(
                stream_name,
                input_device,
                input_stream_params,
                output_device,
                output_stream_params,
                latency,
                Some(data_cb_c),
                Some(state_cb_c),
                user_ptr,
            );
            match stream {
                Ok(stream) => stream,
                Err(e) => {
                    debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e);
                    self.streams.remove(stm_tok);
                    return Err(e.into());
                }
            }
        };

        server_stream.stream = Some(stream);

        let client_pipe = server_stream
            .client_pipe
            .take()
            .expect("invalid state after StreamCreated");
        server_stream.cbs.connected.store(true, Ordering::Release);
        Ok(ClientMessage::StreamInitialized(SerializableHandle::new(
            client_pipe,
            self.remote_pid.unwrap(),
        )))
    }
}

// C callable callbacks
unsafe extern "C" fn data_cb_c(
    _: *mut ffi::cubeb_stream,
    user_ptr: *mut c_void,
    input_buffer: *const c_void,
    output_buffer: *mut c_void,
    nframes: c_long,
) -> c_long {
    let ok = panic::catch_unwind(|| {
        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
        let input = if input_buffer.is_null() {
            &[]
        } else {
            let nbytes = nframes * c_long::from(cbs.input_frame_size);
            slice::from_raw_parts(input_buffer as *const u8, nbytes as usize)
        };
        let output: &mut [u8] = if output_buffer.is_null() {
            &mut []
        } else {
            let nbytes = nframes * c_long::from(cbs.output_frame_size);
            slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize)
        };
        cbs.data_callback(input, output, nframes as isize) as c_long
    });
    ok.unwrap_or(cubeb::ffi::CUBEB_ERROR as c_long)
}

unsafe extern "C" fn state_cb_c(
    _: *mut ffi::cubeb_stream,
    user_ptr: *mut c_void,
    state: ffi::cubeb_state,
) {
    let ok = panic::catch_unwind(|| {
        let state = cubeb::State::from(state);
        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
        cbs.state_callback(state);
    });
    ok.expect("State callback panicked");
}

unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) {
    let ok = panic::catch_unwind(|| {
        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
        cbs.device_change_callback();
    });
    ok.expect("Device change callback panicked");
}

unsafe extern "C" fn device_collection_changed_input_cb_c(
    _: *mut ffi::cubeb,
    user_ptr: *mut c_void,
) {
    let ok = panic::catch_unwind(|| {
        let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
        manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT);
    });
    ok.expect("Collection changed (input) callback panicked");
}

unsafe extern "C" fn device_collection_changed_output_cb_c(
    _: *mut ffi::cubeb,
    user_ptr: *mut c_void,
) {
    let ok = panic::catch_unwind(|| {
        let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
        manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT);
    });
    ok.expect("Collection changed (output) callback panicked");
}

[ Dauer der Verarbeitung: 0.31 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge