Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/third_party/rust/audioipc2-server/src/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 34 kB image not shown  

Quelle  server.rs   Sprache: unbekannt

 
// Copyright © 2017 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details

#[cfg(target_os = "linux")]
use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo};
use audioipc::messages::SerializableHandle;
use audioipc::messages::{
    CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
    DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams,
    StreamInitParams, StreamParams,
};
use audioipc::shm::SharedMem;
use audioipc::{ipccore, rpccore, sys, PlatformHandle};
use cubeb::InputProcessingParams;
use cubeb_core as cubeb;
use cubeb_core::ffi;
use std::convert::TryInto;
use std::ffi::CStr;
use std::mem::size_of;
use std::os::raw::{c_long, c_void};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{cell::RefCell, sync::Mutex};
use std::{panic, slice};

use crate::errors::*;

fn error(error: cubeb::Error) -> ClientMessage {
    ClientMessage::Error(error.raw_code())
}

/// Keeps track of which `DeviceCollectionChangeCallback`s want to be told
/// about device collection changes, and for which device type.
struct CubebDeviceCollectionManager {
    // Registered (callback, device type) pairs, guarded by a mutex.
    servers: Mutex<Vec<(Rc<DeviceCollectionChangeCallback>, cubeb::DeviceType)>>,
}

impl CubebDeviceCollectionManager {
    /// Creates a manager with no registrations.
    fn new() -> CubebDeviceCollectionManager {
        let servers = Mutex::new(Vec::new());
        CubebDeviceCollectionManager { servers }
    }

    /// Records that `server` wants change events for `devtype`.
    ///
    /// When the registration list is empty, the C callbacks are installed on
    /// `context` first; if that fails the error is returned and nothing is
    /// recorded.
    fn register(
        &self,
        context: &cubeb::Context,
        server: &Rc<DeviceCollectionChangeCallback>,
        devtype: cubeb::DeviceType,
    ) -> cubeb::Result<()> {
        let mut registered = self.servers.lock().unwrap();
        if registered.is_empty() {
            self.internal_register(context, true)?;
        }
        registered.push((Rc::clone(server), devtype));
        Ok(())
    }

    /// Removes every entry that pairs this exact `server` (pointer identity)
    /// with `devtype`.
    ///
    /// When no registrations remain afterwards, the C callbacks are removed
    /// from `context`.
    fn unregister(
        &self,
        context: &cubeb::Context,
        server: &Rc<DeviceCollectionChangeCallback>,
        devtype: cubeb::DeviceType,
    ) -> cubeb::Result<()> {
        let mut registered = self.servers.lock().unwrap();
        registered.retain(|(s, d)| !(Rc::ptr_eq(s, server) && *d == devtype));
        if registered.is_empty() {
            self.internal_register(context, false)?;
        }
        Ok(())
    }

    /// Installs (`enable == true`) or clears (`enable == false`) the input and
    /// output device collection changed callbacks on `context`.
    fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> {
        // While enabled, the manager's own address is handed over as the
        // callback user pointer; a null pointer is passed when disabling.
        let user_ptr = if enable {
            self as *const CubebDeviceCollectionManager as *mut c_void
        } else {
            std::ptr::null_mut()
        };
        for &(dir, cb) in &[
            (
                cubeb::DeviceType::INPUT,
                device_collection_changed_input_cb_c as _,
            ),
            (
                cubeb::DeviceType::OUTPUT,
                device_collection_changed_output_cb_c as _,
            ),
        ] {
            let callback = if enable { Some(cb) } else { None };
            unsafe {
                context.register_device_collection_changed(dir, callback, user_ptr)?;
            }
        }
        Ok(())
    }

    /// Forwards a change of `device_type` to every registered server whose
    /// device type contains it.  The registration lock is held while the
    /// servers are notified.
    unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
        let changed = cubeb::DeviceType::from_bits_truncate(device_type);
        let servers = self.servers.lock().unwrap();
        for (server, interest) in servers.iter() {
            if interest.contains(changed) {
                server.device_collection_changed_callback(device_type);
            }
        }
    }
}

impl Drop for CubebDeviceCollectionManager {
    fn drop(&mut self) {
        assert!(self.servers.lock().unwrap().is_empty());
    }
}

// A cubeb_devid is an opaque type which may be implemented with a stable
// pointer in a cubeb backend.  cubeb_devids received remotely must be
// validated before use, so DevIdMap provides a simple 1:1 mapping between a
// cubeb_devid and an IPC-transportable value suitable for use as a unique
// handle.
struct DevIdMap {
    // Known cubeb_devids; the index of an entry is its IPC handle.
    devices: Vec<usize>,
}

impl DevIdMap {
    /// Creates a map that already contains the null device id.
    fn new() -> DevIdMap {
        // A null cubeb_devid is used for selecting the default device, so
        // slot 0 is reserved for the 0 -> 0 mapping.
        let mut devices = Vec::with_capacity(32);
        devices.push(0);
        DevIdMap { devices }
    }

    /// Returns the handle for `devid`, allocating a new one the first time a
    /// given id is seen.  The same id always yields the same handle.
    fn make_handle(&mut self, devid: usize) -> usize {
        let existing = self.devices.iter().position(|&known| known == devid);
        match existing {
            Some(handle) => handle,
            None => {
                let handle = self.devices.len();
                self.devices.push(devid);
                handle
            }
        }
    }

    /// Returns the cubeb_devid stored for a handle produced by
    /// `make_handle`.  Panics if `handle` was never handed out.
    fn handle_to_id(&self, handle: usize) -> usize {
        let known = self.devices.as_slice();
        known[handle]
    }
}

// Per-thread cubeb state: the device collection manager together with the
// result of the most recent cubeb context initialization attempt.
struct CubebContextState {
    // `manager` must be dropped before the `context` is destroyed.
    manager: CubebDeviceCollectionManager,
    // Result of `cubeb_init_from_context_params`; `with_local_context`
    // retries initialization whenever this holds an error.
    context: cubeb::Result<cubeb::Context>,
}

// Thread-local slot for the state above.  It starts out as `None` and is
// filled in by `with_local_context` on first use.
thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = const { RefCell::new(None) });

/// Initializes a cubeb context from the names stored in
/// `G_CUBEB_CONTEXT_PARAMS`, logging the error when initialization fails.
fn cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context> {
    let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
    let result = cubeb::Context::init(
        Some(params.context_name.as_c_str()),
        params.backend_name.as_deref(),
    );
    if let Err(e) = &result {
        info!("cubeb::Context::init failed r={:?}", e);
    }
    result
}

/// Runs `f` with this thread's cubeb context result and device collection
/// manager, creating the thread-local state on first use.
fn with_local_context<T, F>(f: F) -> T
where
    F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,
{
    CONTEXT_KEY.with(|k| {
        let mut slot = k.borrow_mut();
        let state = slot.get_or_insert_with(|| CubebContextState {
            manager: CubebDeviceCollectionManager::new(),
            context: cubeb_init_from_context_params(),
        });
        // Always reattempt to initialize cubeb, OS config may have changed.
        if state.context.is_err() {
            state.context = cubeb_init_from_context_params();
        }
        f(&state.context, &mut state.manager)
    })
}

// Marker type selecting the message pair used on the device collection
// channel: the server sends `DeviceCollectionReq` and receives
// `DeviceCollectionResp`.
struct DeviceCollectionClient;

impl rpccore::Client for DeviceCollectionClient {
    type ServerMessage = DeviceCollectionReq;
    type ClientMessage = DeviceCollectionResp;
}

// Marker type selecting the message pair used on the stream callback
// channel: the server sends `CallbackReq` and receives `CallbackResp`.
struct CallbackClient;

impl rpccore::Client for CallbackClient {
    type ServerMessage = CallbackReq;
    type ClientMessage = CallbackResp;
}

/// Per-stream state handed to cubeb as the callback user pointer: frame
/// sizes, the shared memory buffer, and the RPC proxies used to forward the
/// data, state and device change callbacks to the client.
struct ServerStreamCallbacks {
    /// Size of input frame in bytes
    input_frame_size: u16,
    /// Size of output frame in bytes
    output_frame_size: u16,
    /// Shared memory buffer for transporting audio data to/from client
    shm: SharedMem,
    /// RPC interface for data_callback (on OS audio thread) to server callback thread
    data_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
    /// RPC interface for state_callback (on any thread) to server callback thread
    state_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
    /// RPC interface for device_change_callback (on any thread) to server callback thread
    device_change_callback_rpc: rpccore::Proxy<CallbackReq, CallbackResp>,
    /// Indicates stream is connected to client side.  Callbacks received before
    /// the stream is in the connected state cannot be sent to the client side, so
    /// are logged and otherwise ignored.
    connected: AtomicBool,
}

impl ServerStreamCallbacks {
    /// Forwards a data callback to the client through `data_callback_rpc`.
    ///
    /// `input` is copied into the shared memory buffer before the RPC is
    /// issued.  On a `CallbackResp::Data(frames)` reply, the bytes for
    /// `frames` output frames are copied from shared memory into `output`.
    ///
    /// Returns the frame count reported by the client, or `CUBEB_ERROR` when
    /// the stream is not connected, a buffer does not fit in shared memory,
    /// the client reports more frames than were requested, the output copy
    /// would be out of bounds, or the RPC yields any other reply.
    fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize {
        trace!(
            "Stream data callback: {} {} {}",
            nframes,
            input.len(),
            output.len()
        );
        let cubeb_error: isize = cubeb::ffi::CUBEB_ERROR.try_into().unwrap();

        if !self.connected.load(Ordering::Acquire) {
            warn!("Stream data callback triggered before stream connected");
            return cubeb_error;
        }

        if self.input_frame_size != 0 {
            if input.len() > self.shm.get_size() {
                debug!(
                    "bad input size: input={} shm={}",
                    input.len(),
                    self.shm.get_size()
                );
                return cubeb_error;
            }
            unsafe {
                self.shm
                    .get_mut_slice(input.len())
                    .unwrap()
                    .copy_from_slice(input);
            }
        }

        if self.output_frame_size != 0 && output.len() > self.shm.get_size() {
            debug!(
                "bad output size: output={} shm={}",
                output.len(),
                self.shm.get_size()
            );
            return cubeb_error;
        }

        if nframes == 0 {
            // Optimization: skip the RPC call when there are no frames.
            return 0;
        }

        let r = self.data_callback_rpc.call(CallbackReq::Data {
            nframes,
            input_frame_size: self.input_frame_size as usize,
            output_frame_size: self.output_frame_size as usize,
        });

        match r {
            Ok(CallbackResp::Data(frames)) => {
                // `frames` is supplied by the client: never accept more
                // frames than were requested from it.
                if frames > nframes {
                    debug!(
                        "bad frame count: frames={} nframes={}",
                        frames, nframes
                    );
                    return cubeb_error;
                }
                if frames >= 0 && self.output_frame_size != 0 {
                    let nbytes =
                        (frames as usize).saturating_mul(self.output_frame_size as usize);
                    let output_len = output.len();
                    let src = unsafe { self.shm.get_slice(nbytes) };
                    // Bounds-checked copy: reject the reply instead of
                    // panicking when either side is too small.
                    match (output.get_mut(..nbytes), src) {
                        (Some(dst), Ok(src)) => dst.copy_from_slice(src),
                        _ => {
                            debug!(
                                "bad output copy size: nbytes={} output={}",
                                nbytes, output_len
                            );
                            return cubeb_error;
                        }
                    }
                }
                frames
            }
            _ => {
                debug!("Unexpected message {:?} during data_callback", r);
                cubeb_error
            }
        }
    }

    /// Forwards a stream state change to the client through
    /// `state_callback_rpc`.  Ignored (with a warning) while the stream is
    /// not connected; unexpected replies are only logged.
    fn state_callback(&self, state: cubeb::State) {
        trace!("Stream state callback: {:?}", state);
        if !self.connected.load(Ordering::Acquire) {
            warn!("Stream state callback triggered before stream connected");
            return;
        }

        let r = self
            .state_callback_rpc
            .call(CallbackReq::State(state.into()));
        match r {
            Ok(CallbackResp::State) => {}
            _ => {
                debug!("Unexpected message {:?} during state callback", r);
            }
        }
    }

    /// Forwards a device change notification to the client through
    /// `device_change_callback_rpc`.  Ignored (with a warning) while the
    /// stream is not connected; unexpected replies are only logged.
    fn device_change_callback(&self) {
        trace!("Stream device change callback");
        if !self.connected.load(Ordering::Acquire) {
            warn!("Stream device_change callback triggered before stream connected");
            return;
        }
        let r = self
            .device_change_callback_rpc
            .call(CallbackReq::DeviceChange);
        match r {
            Ok(CallbackResp::DeviceChange) => {}
            _ => {
                debug!("Unexpected message {:?} during device change callback", r);
            }
        }
    }
}

// Process-wide counter providing the trailing number of each shm id.
static SHM_ID: AtomicUsize = AtomicUsize::new(0);

// Generate a temporary shm_id fragment that is unique to the process.  This
// path is used temporarily to create a shm segment, which is then
// immediately deleted from the filesystem while retaining handles to the
// shm to be shared between the server and client.
fn get_shm_id() -> String {
    let pid = std::process::id();
    let sequence = SHM_ID.fetch_add(1, Ordering::SeqCst);
    format!("cubeb-shm-{}-{}", pid, sequence)
}

// Server-side record of one client stream, stored in `CubebServer::streams`.
struct ServerStream {
    // The cubeb stream; `None` until `process_stream_init` has created it.
    stream: Option<cubeb::Stream>,
    // Callback state whose address is passed to cubeb as the stream's user pointer.
    cbs: Box<ServerStreamCallbacks>,
    // Client end of the callback pipe; taken and sent to the client by
    // `process_stream_init`.
    client_pipe: Option<PlatformHandle>,
}

impl Drop for ServerStream {
    fn drop(&mut self) {
        // `stream` *must* be dropped before `cbs`.
        drop(self.stream.take());
    }
}

// Per-client sink for device collection change notifications.
struct DeviceCollectionChangeCallback {
    // RPC proxy used to send `DeviceCollectionReq` messages to the client.
    rpc: rpccore::Proxy<DeviceCollectionReq, DeviceCollectionResp>,
}

impl DeviceCollectionChangeCallback {
    /// Sends a `DeviceCollectionReq::DeviceChange` for `device_type` over
    /// `rpc`.  The outcome of the call is ignored.
    fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
        // TODO: Assert device_type is in devtype.
        debug!(
            "Sending device collection ({:?}) changed event",
            device_type
        );
        let request = DeviceCollectionReq::DeviceChange(device_type);
        let _ = self.rpc.call(request);
    }
}

/// Server side of one client connection: processes `ServerMessage` requests
/// and owns the streams created by that client.
pub struct CubebServer {
    // Event loop that stream callback RPC channels are bound to.
    callback_thread: ipccore::EventLoopHandle,
    // Event loop that the device collection RPC channel is bound to.
    device_collection_thread: ipccore::EventLoopHandle,
    // Streams created by the client, indexed by the token sent back to it.
    streams: slab::Slab<ServerStream>,
    // Pid received in `ClientConnect`; `None` until that message arrives.
    remote_pid: Option<u32>,
    // Set by `ContextSetupDeviceCollectionCallback`; `None` before that.
    device_collection_change_callbacks: Option<Rc<DeviceCollectionChangeCallback>>,
    // Mapping between cubeb_devids and the handles sent over IPC.
    devidmap: DevIdMap,
    // Fixed shared memory size override; 0 means estimate per stream.
    shm_area_size: usize,
}

impl Drop for CubebServer {
    /// Unregisters this server's device collection callback from the
    /// thread-local manager for both device types.  Nothing happens when no
    /// callback was set up, or when the thread-local state is missing or
    /// holds a failed context.
    fn drop(&mut self) {
        let Some(callbacks) = &self.device_collection_change_callbacks else {
            return;
        };
        debug!("CubebServer: dropped with device_collection_change_callbacks registered");
        CONTEXT_KEY.with(|k| {
            let mut state = k.borrow_mut();
            let Some(CubebContextState {
                manager,
                context: Ok(context),
            }) = state.as_mut()
            else {
                return;
            };
            for devtype in [cubeb::DeviceType::INPUT, cubeb::DeviceType::OUTPUT] {
                let outcome = manager.unregister(context, callbacks, devtype);
                if outcome.is_err() {
                    debug!("CubebServer: unregister failed: {:?}", outcome);
                }
            }
        });
    }
}

// NOTE(review): this asserts that `CubebServer` may be moved across threads
// even though it holds an `Rc` (`device_collection_change_callbacks`), which
// is not `Send` on its own.  Soundness is not established in this file (see
// the XXX below) — confirm before relying on it.
#[allow(unknown_lints)] // non_send_fields_in_send_ty is Nightly-only as of 2021-11-29.
#[allow(clippy::non_send_fields_in_send_ty)]
// XXX: required for server setup, verify this is safe.
unsafe impl Send for CubebServer {}

impl rpccore::Server for CubebServer {
    type ServerMessage = ServerMessage;
    type ClientMessage = ClientMessage;

    /// Handles one client request and produces its reply.
    ///
    /// The pid carried by `ClientConnect` is recorded before the cubeb
    /// context is looked up.  When no context is available the reply is a
    /// generic cubeb error.
    fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
        if let ServerMessage::ClientConnect(pid) = &req {
            self.remote_pid = Some(*pid);
        }
        with_local_context(|context, manager| match context {
            Ok(ctx) => self.process_msg(ctx, manager, &req),
            Err(_) => error(cubeb::Error::error()),
        })
    }
}

// Debugging for BMO 1594216/1612044.
//
// Expands to a `&mut cubeb::Stream` for the stream stored under `$stm_tok`
// in `$self.streams`.  If the token is not present in the slab, an error is
// logged and the *enclosing function* returns an invalid-parameter
// `ClientMessage`.  Panics ("uninitialized stream") if the entry exists but
// its `stream` field is still `None`.
macro_rules! try_stream {
    ($self:expr, $stm_tok:expr) => {
        if $self.streams.contains($stm_tok) {
            $self.streams[$stm_tok]
                .stream
                .as_mut()
                .expect("uninitialized stream")
        } else {
            error!(
                "{}:{}:{} - Stream({}): invalid token",
                file!(),
                line!(),
                column!(),
                $stm_tok
            );
            return error(cubeb::Error::invalid_parameter());
        }
    };
}

impl CubebServer {
    pub fn new(
        callback_thread: ipccore::EventLoopHandle,
        device_collection_thread: ipccore::EventLoopHandle,
        shm_area_size: usize,
    ) -> Self {
        CubebServer {
            callback_thread,
            device_collection_thread,
            streams: slab::Slab::<ServerStream>::new(),
            remote_pid: None,
            device_collection_change_callbacks: None,
            devidmap: DevIdMap::new(),
            shm_area_size,
        }
    }

    // Process a request coming from the client.
    fn process_msg(
        &mut self,
        context: &cubeb::Context,
        manager: &mut CubebDeviceCollectionManager,
        msg: &ServerMessage,
    ) -> ClientMessage {
        let resp: ClientMessage = match *msg {
            ServerMessage::ClientConnect(_) => {
                // remote_pid is set before cubeb initialization, just verify here.
                assert!(self.remote_pid.is_some());
                ClientMessage::ClientConnected
            }

            ServerMessage::ClientDisconnect => {
                // TODO:
                //self.connection.client_disconnect();
                ClientMessage::ClientDisconnected
            }

            ServerMessage::ContextGetBackendId => {
                ClientMessage::ContextBackendId(context.backend_id().to_string())
            }

            ServerMessage::ContextGetMaxChannelCount => context
                .max_channel_count()
                .map(ClientMessage::ContextMaxChannelCount)
                .unwrap_or_else(error),

            ServerMessage::ContextGetMinLatency(ref params) => {
                let format = cubeb::SampleFormat::from(params.format);
                let layout = cubeb::ChannelLayout::from(params.layout);

                let params = cubeb::StreamParamsBuilder::new()
                    .format(format)
                    .rate(params.rate)
                    .channels(params.channels)
                    .layout(layout)
                    .take();

                context
                    .min_latency(¶ms)
                    .map(ClientMessage::ContextMinLatency)
                    .unwrap_or_else(error)
            }

            ServerMessage::ContextGetPreferredSampleRate => context
                .preferred_sample_rate()
                .map(ClientMessage::ContextPreferredSampleRate)
                .unwrap_or_else(error),

            ServerMessage::ContextGetSupportedInputProcessingParams => context
                .supported_input_processing_params()
                .map(|params| ClientMessage::ContextSupportedInputProcessingParams(params.bits()))
                .unwrap_or_else(error),

            ServerMessage::ContextGetDeviceEnumeration(device_type) => context
                .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
                .map(|devices| {
                    let v: Vec<DeviceInfo> = devices
                        .iter()
                        .map(|i| {
                            let mut tmp: DeviceInfo = i.as_ref().into();
                            // Replace each cubeb_devid with a unique handle suitable for IPC.
                            tmp.devid = self.devidmap.make_handle(tmp.devid);
                            tmp
                        })
                        .collect();
                    ClientMessage::ContextEnumeratedDevices(v)
                })
                .unwrap_or_else(error),

            ServerMessage::StreamCreate(ref params) => self
                .process_stream_create(params)
                .unwrap_or_else(|_| error(cubeb::Error::error())),

            ServerMessage::StreamInit(stm_tok, ref params) => self
                .process_stream_init(context, stm_tok, params)
                .unwrap_or_else(|_| error(cubeb::Error::error())),

            ServerMessage::StreamDestroy(stm_tok) => {
                if self.streams.contains(stm_tok) {
                    debug!("Unregistering stream {:?}", stm_tok);
                    self.streams.remove(stm_tok);
                } else {
                    // Debugging for BMO 1594216/1612044.
                    error!("StreamDestroy({}): invalid token", stm_tok);
                    return error(cubeb::Error::invalid_parameter());
                }
                ClientMessage::StreamDestroyed
            }

            ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok)
                .start()
                .map(|_| ClientMessage::StreamStarted)
                .unwrap_or_else(error),

            ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok)
                .stop()
                .map(|_| ClientMessage::StreamStopped)
                .unwrap_or_else(error),

            ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok)
                .position()
                .map(ClientMessage::StreamPosition)
                .unwrap_or_else(error),

            ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok)
                .latency()
                .map(ClientMessage::StreamLatency)
                .unwrap_or_else(error),

            ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok)
                .input_latency()
                .map(ClientMessage::StreamInputLatency)
                .unwrap_or_else(error),

            ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok)
                .set_volume(volume)
                .map(|_| ClientMessage::StreamVolumeSet)
                .unwrap_or_else(error),

            ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok)
                .set_name(name)
                .map(|_| ClientMessage::StreamNameSet)
                .unwrap_or_else(error),

            ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok)
                .current_device()
                .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
                .unwrap_or_else(error),

            ServerMessage::StreamSetInputMute(stm_tok, mute) => try_stream!(self, stm_tok)
                .set_input_mute(mute)
                .map(|_| ClientMessage::StreamInputMuteSet)
                .unwrap_or_else(error),

            ServerMessage::StreamSetInputProcessingParams(stm_tok, params) => {
                try_stream!(self, stm_tok)
                    .set_input_processing_params(InputProcessingParams::from_bits_truncate(params))
                    .map(|_| ClientMessage::StreamInputProcessingParamsSet)
                    .unwrap_or_else(error)
            }

            ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
                try_stream!(self, stm_tok)
                    .register_device_changed_callback(if enable {
                        Some(device_change_cb_c)
                    } else {
                        None
                    })
                    .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
                    .unwrap_or_else(error)
            }

            ServerMessage::ContextSetupDeviceCollectionCallback => {
                let (server_pipe, client_pipe) = match sys::make_pipe_pair() {
                    Ok((server_pipe, client_pipe)) => (server_pipe, client_pipe),
                    Err(e) => {
                        debug!(
                            "ContextSetupDeviceCollectionCallback - make_pipe_pair failed: {:?}",
                            e
                        );
                        return error(cubeb::Error::error());
                    }
                };

                // TODO: this should bind the client_pipe and send the server_pipe to the remote, but
                //       additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle.
                // TODO: Use the rpc_thread instead of an extra device_collection_thread, but a reentrant bind_client
                //       is required to support that.
                let rpc = match self
                    .device_collection_thread
                    .bind_client::<DeviceCollectionClient>(server_pipe)
                {
                    Ok(rpc) => rpc,
                    Err(e) => {
                        debug!(
                            "ContextSetupDeviceCollectionCallback - bind_client: {:?}",
                            e
                        );
                        return error(cubeb::Error::error());
                    }
                };

                self.device_collection_change_callbacks =
                    Some(Rc::new(DeviceCollectionChangeCallback { rpc }));
                let fd = RegisterDeviceCollectionChanged {
                    platform_handle: SerializableHandle::new(client_pipe, self.remote_pid.unwrap()),
                };

                ClientMessage::ContextSetupDeviceCollectionCallback(fd)
            }

            ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self
                .process_register_device_collection_changed(
                    context,
                    manager,
                    cubeb::DeviceType::from_bits_truncate(device_type),
                    enable,
                )
                .unwrap_or_else(error),

            #[cfg(target_os = "linux")]
            ServerMessage::PromoteThreadToRealTime(thread_info) => {
                let info = RtPriorityThreadInfo::deserialize(thread_info);
                match promote_thread_to_real_time(info, 0, 48000) {
                    Ok(_) => {
                        info!("Promotion of content process thread to real-time OK");
                    }
                    Err(_) => {
                        warn!("Promotion of content process thread to real-time error");
                    }
                }
                ClientMessage::ThreadPromoted
            }
        };

        trace!("process_msg: req={:?}, resp={:?}", msg, resp);

        resp
    }

    fn process_register_device_collection_changed(
        &mut self,
        context: &cubeb::Context,
        manager: &mut CubebDeviceCollectionManager,
        devtype: cubeb::DeviceType,
        enable: bool,
    ) -> cubeb::Result<ClientMessage> {
        if devtype == cubeb::DeviceType::UNKNOWN {
            return Err(cubeb::Error::invalid_parameter());
        }

        assert!(self.device_collection_change_callbacks.is_some());
        let cbs = self.device_collection_change_callbacks.as_ref().unwrap();

        if enable {
            manager.register(context, cbs, devtype)
        } else {
            manager.unregister(context, cbs, devtype)
        }
        .map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged)
    }

    // Stream create is special, so it's been separated from process_msg.
    fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> {
        fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
            params
                .map(|p| {
                    let format = p.format.into();
                    let sample_size = match format {
                        cubeb::SampleFormat::S16LE
                        | cubeb::SampleFormat::S16BE
                        | cubeb::SampleFormat::S16NE => 2,
                        cubeb::SampleFormat::Float32LE
                        | cubeb::SampleFormat::Float32BE
                        | cubeb::SampleFormat::Float32NE => 4,
                    };
                    let channel_count = p.channels as u16;
                    sample_size * channel_count
                })
                .unwrap_or(0u16)
        }

        // Create the callback handling struct which is attached the cubeb stream.
        let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
        let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());

        // Estimate a safe shmem size for this stream configuration.  If the server was configured with a fixed
        // shm_area_size override, use that instead.
        // TODO: Add a new cubeb API to query the precise buffer size required for a given stream config.
        // https://github.com/mozilla/audioipc-2/issues/124
        let shm_area_size = if self.shm_area_size == 0 {
            let frame_size = output_frame_size.max(input_frame_size) as u32;
            let in_rate = params.input_stream_params.map(|p| p.rate).unwrap_or(0);
            let out_rate = params.output_stream_params.map(|p| p.rate).unwrap_or(0);
            let rate = out_rate.max(in_rate);
            // 1s of audio, rounded up to the nearest 64kB.
            // Stream latency is capped at 1s in process_stream_init.
            (((rate * frame_size) + 0xffff) & !0xffff) as usize
        } else {
            self.shm_area_size
        };
        debug!("shm_area_size = {}", shm_area_size);

        let shm = SharedMem::new(&get_shm_id(), shm_area_size)?;
        let shm_handle = unsafe { shm.make_handle()? };

        let (server_pipe, client_pipe) = sys::make_pipe_pair()?;
        // TODO: this should bind the client_pipe and send the server_pipe to the remote, but
        //       additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle.
        let rpc = self
            .callback_thread
            .bind_client::<CallbackClient>(server_pipe)?;

        let cbs = Box::new(ServerStreamCallbacks {
            input_frame_size,
            output_frame_size,
            shm,
            state_callback_rpc: rpc.clone(),
            device_change_callback_rpc: rpc.clone(),
            data_callback_rpc: rpc,
            connected: AtomicBool::new(false),
        });

        let entry = self.streams.vacant_entry();
        let key = entry.key();
        debug!("Registering stream {:?}", key);

        entry.insert(ServerStream {
            stream: None,
            cbs,
            client_pipe: Some(client_pipe),
        });

        Ok(ClientMessage::StreamCreated(StreamCreate {
            token: key,
            shm_handle: SerializableHandle::new(shm_handle, self.remote_pid.unwrap()),
            shm_area_size,
        }))
    }

    // Stream init is special, so it's been separated from process_msg.
    fn process_stream_init(
        &mut self,
        context: &cubeb::Context,
        stm_tok: usize,
        params: &StreamInitParams,
    ) -> Result<ClientMessage> {
        // Create cubeb stream from params
        let stream_name = params
            .stream_name
            .as_ref()
            .and_then(|name| CStr::from_bytes_with_nul(name).ok());

        // Map IPC handle back to cubeb_devid.
        let input_device = self.devidmap.handle_to_id(params.input_device) as *const _;
        let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
            cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
        });

        // Map IPC handle back to cubeb_devid.
        let output_device = self.devidmap.handle_to_id(params.output_device) as *const _;
        let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
            cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
        });

        // TODO: Manage stream latency requests with respect to the RT deadlines the callback_thread was configured for.
        fn round_up_pow2(v: u32) -> u32 {
            debug_assert!(v >= 1);
            1 << (32 - (v - 1).leading_zeros())
        }
        let rate = params
            .output_stream_params
            .map(|p| p.rate)
            .unwrap_or_else(|| params.input_stream_params.map(|p| p.rate).unwrap());
        // Note: minimum latency supported by AudioIPC is currently ~5ms.  This restriction may be reduced by later IPC improvements.
        let min_latency = round_up_pow2(5 * rate / 1000);
        // Note: maximum latency is restricted by the SharedMem size.
        let max_latency = rate;
        let latency = params.latency_frames.clamp(min_latency, max_latency);
        trace!(
            "stream rate={} latency requested={} calculated={}",
            rate,
            params.latency_frames,
            latency
        );

        let server_stream = &mut self.streams[stm_tok];
        assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
        let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;

        let stream = unsafe {
            let stream = context.stream_init(
                stream_name,
                input_device,
                input_stream_params,
                output_device,
                output_stream_params,
                latency,
                Some(data_cb_c),
                Some(state_cb_c),
                user_ptr,
            );
            match stream {
                Ok(stream) => stream,
                Err(e) => {
                    debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e);
                    self.streams.remove(stm_tok);
                    return Err(e.into());
                }
            }
        };

        server_stream.stream = Some(stream);

        let client_pipe = server_stream
            .client_pipe
            .take()
            .expect("invalid state after StreamCreated");
        server_stream.cbs.connected.store(true, Ordering::Release);
        Ok(ClientMessage::StreamInitialized(SerializableHandle::new(
            client_pipe,
            self.remote_pid.unwrap(),
        )))
    }
}

// C callable callbacks

// Data callback registered with cubeb in `process_stream_init`.
//
// `user_ptr` is expected to be the `ServerStreamCallbacks` pointer that was
// passed to `stream_init`.  Null buffers are mapped to empty slices;
// otherwise each buffer is viewed as `nframes * frame_size` bytes
// (assumes cubeb provides at least that many bytes and a non-negative
// `nframes` — TODO confirm against the cubeb callback contract).
// A panic inside the closure is caught and reported as `CUBEB_ERROR`.
unsafe extern "C" fn data_cb_c(
    _: *mut ffi::cubeb_stream,
    user_ptr: *mut c_void,
    input_buffer: *const c_void,
    output_buffer: *mut c_void,
    nframes: c_long,
) -> c_long {
    let ok = panic::catch_unwind(|| {
        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
        let input = if input_buffer.is_null() {
            &[]
        } else {
            // Byte length of the input buffer for this callback.
            let nbytes = nframes * c_long::from(cbs.input_frame_size);
            slice::from_raw_parts(input_buffer as *const u8, nbytes as usize)
        };
        let output: &mut [u8] = if output_buffer.is_null() {
            &mut []
        } else {
            // Byte length of the output buffer for this callback.
            let nbytes = nframes * c_long::from(cbs.output_frame_size);
            slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize)
        };
        cbs.data_callback(input, output, nframes as isize) as c_long
    });
    // Never let a panic unwind into C: map it to an error return instead.
    ok.unwrap_or(cubeb::ffi::CUBEB_ERROR as c_long)
}

// State callback registered with cubeb in `process_stream_init`.
//
// `user_ptr` is expected to be the `ServerStreamCallbacks` pointer that was
// passed to `stream_init`.  Only a shared reference is created because
// `state_callback` takes `&self`; this avoids materializing a second `&mut`
// to state that the data callback also references.
// A panic inside the closure is caught and then re-raised via `expect`.
unsafe extern "C" fn state_cb_c(
    _: *mut ffi::cubeb_stream,
    user_ptr: *mut c_void,
    state: ffi::cubeb_state,
) {
    let ok = panic::catch_unwind(|| {
        let state = cubeb::State::from(state);
        let cbs = &*(user_ptr as *const ServerStreamCallbacks);
        cbs.state_callback(state);
    });
    ok.expect("State callback panicked");
}

unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) {
    let ok = panic::catch_unwind(|| {
        let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
        cbs.device_change_callback();
    });
    ok.expect("Device change callback panicked");
}

// Input device collection changed callback installed by
// `CubebDeviceCollectionManager::internal_register`.
//
// `user_ptr` is the manager's address, which `internal_register` derives
// from `&self`; only a shared reference is therefore created here, matching
// the `&self` receiver of `device_collection_changed_callback`.
// A panic inside the closure is caught and then re-raised via `expect`.
unsafe extern "C" fn device_collection_changed_input_cb_c(
    _: *mut ffi::cubeb,
    user_ptr: *mut c_void,
) {
    let ok = panic::catch_unwind(|| {
        let manager = &*(user_ptr as *const CubebDeviceCollectionManager);
        manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT);
    });
    ok.expect("Collection changed (input) callback panicked");
}

// Output device collection changed callback installed by
// `CubebDeviceCollectionManager::internal_register`.
//
// `user_ptr` is the manager's address, which `internal_register` derives
// from `&self`; only a shared reference is therefore created here, matching
// the `&self` receiver of `device_collection_changed_callback`.
// A panic inside the closure is caught and then re-raised via `expect`.
unsafe extern "C" fn device_collection_changed_output_cb_c(
    _: *mut ffi::cubeb,
    user_ptr: *mut c_void,
) {
    let ok = panic::catch_unwind(|| {
        let manager = &*(user_ptr as *const CubebDeviceCollectionManager);
        manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT);
    });
    ok.expect("Collection changed (output) callback panicked");
}

[ Dauer der Verarbeitung: 0.37 Sekunden  (vorverarbeitet)  ]