Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


SSL mod.rs   Sprache: unbekannt

 
Spracherkennung für: .rs vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

// Copyright © 2018 Mozilla Foundation
//
// This program is made available under an ISC-style license.  See the
// accompanying file LICENSE for details.
#![allow(unused_assignments)]
#![allow(unused_must_use)]
extern crate coreaudio_sys_utils;
extern crate libc;
extern crate ringbuf;

mod aggregate_device;
mod auto_release;
mod buffer_manager;
mod device_property;
mod mixer;
mod resampler;
mod utils;

use self::aggregate_device::*;
use self::auto_release::*;
use self::buffer_manager::*;
use self::coreaudio_sys_utils::aggregate_device::*;
use self::coreaudio_sys_utils::audio_device_extensions::*;
use self::coreaudio_sys_utils::audio_object::*;
use self::coreaudio_sys_utils::audio_unit::*;
use self::coreaudio_sys_utils::cf_mutable_dict::*;
use self::coreaudio_sys_utils::dispatch::*;
use self::coreaudio_sys_utils::string::*;
use self::coreaudio_sys_utils::sys::*;
use self::device_property::*;
use self::mixer::*;
use self::resampler::*;
use self::utils::*;
use backend::ringbuf::RingBuffer;
#[cfg(feature = "audio-dump")]
use cubeb_backend::ffi::cubeb_audio_dump_stream_t;
use cubeb_backend::{
    ffi, ChannelLayout, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceRef, DeviceType,
    Error, InputProcessingParams, Ops, Result, SampleFormat, State, Stream, StreamOps,
    StreamParams, StreamParamsRef, StreamPrefs,
};
use mach::mach_time::{mach_absolute_time, mach_timebase_info};
use std::cmp;
use std::ffi::{CStr, CString};
use std::fmt;
use std::mem;
use std::os::raw::{c_uint, c_void};
use std::ptr;
use std::slice;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
use std::time::{Duration, Instant};
const NO_ERR: OSStatus = 0;

const AU_OUT_BUS: AudioUnitElement = 0;
const AU_IN_BUS: AudioUnitElement = 1;

const PRIVATE_AGGREGATE_DEVICE_NAME: &str = "CubebAggregateDevice";
const VOICEPROCESSING_AGGREGATE_DEVICE_NAME: &str = "VPAUAggregateAudioDevice";

const APPLE_STUDIO_DISPLAY_USB_ID: &str = "05AC:1114";

// Testing empirically, some headsets report a minimal latency that is very low,
// but this does not work in practice. Lie and say the minimum is 128 frames.
const SAFE_MIN_LATENCY_FRAMES: u32 = 128;
const SAFE_MAX_LATENCY_FRAMES: u32 = 512;

const VPIO_IDLE_TIMEOUT: Duration = Duration::from_secs(10);

const MACOS_KERNEL_MAJOR_VERSION_MONTEREY: u32 = 21;

#[derive(Debug, PartialEq)]
enum ParseMacOSKernelVersionError {
    SysCtl,
    Malformed,
    Parsing,
}

fn macos_kernel_major_version() -> std::result::Result<u32, ParseMacOSKernelVersionError> {
    let ver = whatsys::kernel_version();
    if ver.is_none() {
        return Err(ParseMacOSKernelVersionError::SysCtl);
    }
    let ver = ver.unwrap();
    let major = ver.split('.').next();
    if major.is_none() {
        return Err(ParseMacOSKernelVersionError::Malformed);
    }
    let parsed_major = u32::from_str(major.unwrap());
    if parsed_major.is_err() {
        return Err(ParseMacOSKernelVersionError::Parsing);
    }
    Ok(parsed_major.unwrap())
}

bitflags! {
    #[allow(non_camel_case_types)]
    #[derive(Clone, Debug, PartialEq, Copy)]
    struct device_flags: u32 {
        const DEV_UNKNOWN           = 0b0000_0000; // Unknown
        const DEV_INPUT             = 0b0000_0001; // Record device like mic
        const DEV_OUTPUT            = 0b0000_0010; // Playback device like speakers
        const DEV_SELECTED_DEFAULT  = 0b0000_0100; // User selected to use the system default device
    }
}

#[cfg(feature = "audio-dump")]
fn dump_audio(stream: cubeb_audio_dump_stream_t, audio_samples: *mut c_void, count: u32) {
    unsafe {
        let rv = ffi::cubeb_audio_dump_write(stream, audio_samples, count);
        if rv != 0 {
            cubeb_alog!("Error dumping audio data");
        }
    }
}

fn make_sized_audio_channel_layout(sz: usize) -> AutoRelease<AudioChannelLayout> {
    assert!(sz >= mem::size_of::<AudioChannelLayout>());
    assert_eq!(
        (sz - mem::size_of::<AudioChannelLayout>()) % mem::size_of::<AudioChannelDescription>(),
        0
    );
    let acl = unsafe { libc::calloc(1, sz) } as *mut AudioChannelLayout;

    unsafe extern "C" fn free_acl(acl: *mut AudioChannelLayout) {
        libc::free(acl as *mut libc::c_void);
    }

    AutoRelease::new(acl, free_acl)
}

#[allow(non_camel_case_types)]
#[derive(Clone, Debug)]
struct device_info {
    id: AudioDeviceID,
    flags: device_flags,
}

impl Default for device_info {
    fn default() -> Self {
        Self {
            id: kAudioObjectUnknown,
            flags: device_flags::DEV_UNKNOWN,
        }
    }
}

#[allow(non_camel_case_types)]
#[derive(Debug)]
struct device_property_listener {
    device: AudioDeviceID,
    property: AudioObjectPropertyAddress,
    listener: audio_object_property_listener_proc,
}

impl device_property_listener {
    fn new(
        device: AudioDeviceID,
        property: AudioObjectPropertyAddress,
        listener: audio_object_property_listener_proc,
    ) -> Self {
        Self {
            device,
            property,
            listener,
        }
    }
}

#[derive(Debug, PartialEq)]
struct CAChannelLabel(AudioChannelLabel);

impl From<CAChannelLabel> for mixer::Channel {
    fn from(label: CAChannelLabel) -> mixer::Channel {
        use self::coreaudio_sys_utils::sys;
        match label.0 {
            sys::kAudioChannelLabel_Left => mixer::Channel::FrontLeft,
            sys::kAudioChannelLabel_Right => mixer::Channel::FrontRight,
            sys::kAudioChannelLabel_Center | sys::kAudioChannelLabel_Mono => {
                mixer::Channel::FrontCenter
            }
            sys::kAudioChannelLabel_LFEScreen => mixer::Channel::LowFrequency,
            sys::kAudioChannelLabel_LeftSurround => mixer::Channel::BackLeft,
            sys::kAudioChannelLabel_RightSurround => mixer::Channel::BackRight,
            sys::kAudioChannelLabel_LeftCenter => mixer::Channel::FrontLeftOfCenter,
            sys::kAudioChannelLabel_RightCenter => mixer::Channel::FrontRightOfCenter,
            sys::kAudioChannelLabel_CenterSurround => mixer::Channel::BackCenter,
            sys::kAudioChannelLabel_LeftSurroundDirect => mixer::Channel::SideLeft,
            sys::kAudioChannelLabel_RightSurroundDirect => mixer::Channel::SideRight,
            sys::kAudioChannelLabel_TopCenterSurround => mixer::Channel::TopCenter,
            sys::kAudioChannelLabel_VerticalHeightLeft => mixer::Channel::TopFrontLeft,
            sys::kAudioChannelLabel_VerticalHeightCenter => mixer::Channel::TopFrontCenter,
            sys::kAudioChannelLabel_VerticalHeightRight => mixer::Channel::TopFrontRight,
            sys::kAudioChannelLabel_TopBackLeft => mixer::Channel::TopBackLeft,
            sys::kAudioChannelLabel_TopBackCenter => mixer::Channel::TopBackCenter,
            sys::kAudioChannelLabel_TopBackRight => mixer::Channel::TopBackRight,
            sys::kAudioChannelLabel_Unknown => mixer::Channel::Discrete,
            sys::kAudioChannelLabel_Unused => mixer::Channel::Silence,
            v => {
                eprintln!("Warning: channel label value {} isn't handled", v);
                mixer::Channel::Silence
            }
        }
    }
}

fn set_notification_runloop() {
    let address = AudioObjectPropertyAddress {
        mSelector: kAudioHardwarePropertyRunLoop,
        mScope: kAudioObjectPropertyScopeGlobal,
        mElement: kAudioObjectPropertyElementMaster,
    };

    // Ask HAL to manage its own thread for notification by setting the run_loop to NULL.
    // Otherwise HAL may use main thread to fire notifications.
    let run_loop: CFRunLoopRef = ptr::null_mut();
    let size = mem::size_of::<CFRunLoopRef>();
    let status =
        audio_object_set_property_data(kAudioObjectSystemObject, &address, size, &run_loop);
    if status != NO_ERR {
        cubeb_log!("Could not make global CoreAudio notifications use their own thread.");
    }
}

fn create_device_info(devid: AudioDeviceID, devtype: DeviceType) -> Option<device_info> {
    assert_ne!(devid, kAudioObjectSystemObject);
    debug_assert_running_serially();

    let mut flags = match devtype {
        DeviceType::INPUT => device_flags::DEV_INPUT,
        DeviceType::OUTPUT => device_flags::DEV_OUTPUT,
        _ => panic!("Only accept input or output type"),
    };

    if devid == kAudioObjectUnknown {
        cubeb_log!("Using the system default device");
        flags |= device_flags::DEV_SELECTED_DEFAULT;
        get_default_device(devtype).map(|id| device_info { id, flags })
    } else {
        Some(device_info { id: devid, flags })
    }
}

fn create_stream_description(stream_params: &StreamParams) -> Result<AudioStreamBasicDescription> {
    assert!(stream_params.rate() > 0);
    assert!(stream_params.channels() > 0);

    let mut desc = AudioStreamBasicDescription::default();

    match stream_params.format() {
        SampleFormat::S16LE => {
            desc.mBitsPerChannel = 16;
            desc.mFormatFlags = kAudioFormatFlagIsSignedInteger;
        }
        SampleFormat::S16BE => {
            desc.mBitsPerChannel = 16;
            desc.mFormatFlags = kAudioFormatFlagIsSignedInteger | kAudioFormatFlagIsBigEndian;
        }
        SampleFormat::Float32LE => {
            desc.mBitsPerChannel = 32;
            desc.mFormatFlags = kAudioFormatFlagIsFloat;
        }
        SampleFormat::Float32BE => {
            desc.mBitsPerChannel = 32;
            desc.mFormatFlags = kAudioFormatFlagIsFloat | kAudioFormatFlagIsBigEndian;
        }
        _ => {
            return Err(Error::invalid_format());
        }
    }

    desc.mFormatID = kAudioFormatLinearPCM;
    desc.mFormatFlags |= kLinearPCMFormatFlagIsPacked;
    desc.mSampleRate = f64::from(stream_params.rate());
    desc.mChannelsPerFrame = stream_params.channels();

    desc.mBytesPerFrame = (desc.mBitsPerChannel / 8) * desc.mChannelsPerFrame;
    desc.mFramesPerPacket = 1;
    desc.mBytesPerPacket = desc.mBytesPerFrame * desc.mFramesPerPacket;

    desc.mReserved = 0;

    Ok(desc)
}

fn set_volume(unit: AudioUnit, volume: f32) -> Result<()> {
    assert!(!unit.is_null());
    let r = audio_unit_set_parameter(
        unit,
        kHALOutputParam_Volume,
        kAudioUnitScope_Global,
        0,
        volume,
        0,
    );
    if r == NO_ERR {
        Ok(())
    } else {
        cubeb_log!("AudioUnitSetParameter/kHALOutputParam_Volume rv={}", r);
        Err(Error::error())
    }
}

fn get_volume(unit: AudioUnit) -> Result<f32> {
    assert!(!unit.is_null());
    let mut volume: f32 = 0.0;
    let r = audio_unit_get_parameter(
        unit,
        kHALOutputParam_Volume,
        kAudioUnitScope_Global,
        0,
        &mut volume,
    );
    if r == NO_ERR {
        Ok(volume)
    } else {
        cubeb_log!("AudioUnitGetParameter/kHALOutputParam_Volume rv={}", r);
        Err(Error::error())
    }
}

fn set_input_mute(unit: AudioUnit, mute: bool) -> Result<()> {
    assert!(!unit.is_null());
    let mute: u32 = mute.into();
    let mut old_mute: u32 = 0;
    let r = audio_unit_get_property(
        unit,
        kAUVoiceIOProperty_MuteOutput,
        kAudioUnitScope_Global,
        AU_IN_BUS,
        &mut old_mute,
        &mut mem::size_of::<u32>(),
    );
    if r != NO_ERR {
        cubeb_log!(
            "AudioUnitGetProperty/kAUVoiceIOProperty_MuteOutput rv={}",
            r
        );
        return Err(Error::error());
    }
    if old_mute == mute {
        return Ok(());
    }
    let r = audio_unit_set_property(
        unit,
        kAUVoiceIOProperty_MuteOutput,
        kAudioUnitScope_Global,
        AU_IN_BUS,
        &mute,
        mem::size_of::<u32>(),
    );
    if r == NO_ERR {
        Ok(())
    } else {
        cubeb_log!(
            "AudioUnitSetProperty/kAUVoiceIOProperty_MuteOutput rv={}",
            r
        );
        Err(Error::error())
    }
}

fn set_input_processing_params(unit: AudioUnit, params: InputProcessingParams) -> Result<()> {
    assert!(!unit.is_null());
    let aec = params.contains(InputProcessingParams::ECHO_CANCELLATION);
    let ns = params.contains(InputProcessingParams::NOISE_SUPPRESSION);
    let agc = params.contains(InputProcessingParams::AUTOMATIC_GAIN_CONTROL);
    assert_eq!(aec, ns);

    let mut old_agc: u32 = 0;
    let r = audio_unit_get_property(
        unit,
        kAUVoiceIOProperty_VoiceProcessingEnableAGC,
        kAudioUnitScope_Global,
        AU_IN_BUS,
        &mut old_agc,
        &mut mem::size_of::<u32>(),
    );
    if r != NO_ERR {
        cubeb_log!(
            "AudioUnitGetProperty/kAUVoiceIOProperty_VoiceProcessingEnableAGC rv={}",
            r
        );
        return Err(Error::error());
    }

    if (old_agc == 1) != agc {
        let agc = u32::from(agc);
        let r = audio_unit_set_property(
            unit,
            kAUVoiceIOProperty_VoiceProcessingEnableAGC,
            kAudioUnitScope_Global,
            AU_IN_BUS,
            &agc,
            mem::size_of::<u32>(),
        );
        if r != NO_ERR {
            cubeb_log!(
                "AudioUnitSetProperty/kAUVoiceIOProperty_VoiceProcessingEnableAGC rv={}",
                r
            );
            return Err(Error::error());
        }
        cubeb_log!(
            "set_input_processing_params on unit {:p} - set agc: {}",
            unit,
            agc
        );
    }

    let mut old_bypass: u32 = 0;
    let r = audio_unit_get_property(
        unit,
        kAUVoiceIOProperty_BypassVoiceProcessing,
        kAudioUnitScope_Global,
        AU_IN_BUS,
        &mut old_bypass,
        &mut mem::size_of::<u32>(),
    );
    if r != NO_ERR {
        cubeb_log!(
            "AudioUnitGetProperty/kAUVoiceIOProperty_BypassVoiceProcessing rv={}",
            r
        );
        return Err(Error::error());
    }

    let bypass = u32::from(!aec);
    if old_bypass != bypass {
        let r = audio_unit_set_property(
            unit,
            kAUVoiceIOProperty_BypassVoiceProcessing,
            kAudioUnitScope_Global,
            AU_IN_BUS,
            &bypass,
            mem::size_of::<u32>(),
        );
        if r != NO_ERR {
            cubeb_log!(
                "AudioUnitSetProperty/kAUVoiceIOProperty_BypassVoiceProcessing rv={}",
                r
            );
            return Err(Error::error());
        }
        cubeb_log!(
            "set_input_processing_params on unit {:p} - set bypass: {}",
            unit,
            bypass
        );
    }

    Ok(())
}

fn minimum_resampling_input_frames(
    input_rate: f64,
    output_rate: f64,
    output_frames: usize,
) -> usize {
    assert!(!approx_eq!(f64, input_rate, 0_f64));
    assert!(!approx_eq!(f64, output_rate, 0_f64));
    if approx_eq!(f64, input_rate, output_rate) {
        return output_frames;
    }
    (input_rate * output_frames as f64 / output_rate).ceil() as usize
}

fn audiounit_make_silent(io_data: &AudioBuffer) {
    assert!(!io_data.mData.is_null());
    let bytes = unsafe {
        let ptr = io_data.mData as *mut u8;
        let len = io_data.mDataByteSize as usize;
        slice::from_raw_parts_mut(ptr, len)
    };
    for data in bytes.iter_mut() {
        *data = 0;
    }
}

extern "C" fn audiounit_input_callback(
    user_ptr: *mut c_void,
    flags: *mut AudioUnitRenderActionFlags,
    tstamp: *const AudioTimeStamp,
    bus: u32,
    input_frames: u32,
    _: *mut AudioBufferList,
) -> OSStatus {
    enum ErrorHandle {
        Return(OSStatus),
        Reinit,
    }

    assert!(input_frames > 0);
    assert_eq!(bus, AU_IN_BUS);

    assert!(!user_ptr.is_null());
    let stm = unsafe { &mut *(user_ptr as *mut AudioUnitStream) };

    if unsafe { *flags | kAudioTimeStampHostTimeValid } != 0 {
        let now = unsafe { mach_absolute_time() };
        let input_latency_frames = compute_input_latency(stm, unsafe { (*tstamp).mHostTime }, now);
        stm.total_input_latency_frames
            .store(input_latency_frames, Ordering::SeqCst);
    }

    if stm.stopped.load(Ordering::SeqCst) {
        cubeb_log!("({:p}) input stopped", stm as *const AudioUnitStream);
        return NO_ERR;
    }

    let handler = |stm: &mut AudioUnitStream,
                   flags: *mut AudioUnitRenderActionFlags,
                   tstamp: *const AudioTimeStamp,
                   bus: u32,
                   input_frames: u32|
     -> ErrorHandle {
        let input_buffer_manager = stm.core_stream_data.input_buffer_manager.as_mut().unwrap();
        assert_eq!(
            stm.core_stream_data.stm_ptr,
            user_ptr as *const AudioUnitStream
        );

        // `flags` and `tstamp` must be non-null so they can be casted into the references.
        assert!(!flags.is_null());
        let flags = unsafe { &mut (*flags) };
        assert!(!tstamp.is_null());
        let tstamp = unsafe { &(*tstamp) };

        // Create the AudioBufferList to store input.
        let mut input_buffer_list = AudioBufferList::default();
        input_buffer_list.mBuffers[0].mDataByteSize =
            stm.core_stream_data.input_dev_desc.mBytesPerFrame * input_frames;
        input_buffer_list.mBuffers[0].mData = ptr::null_mut();
        input_buffer_list.mBuffers[0].mNumberChannels =
            stm.core_stream_data.input_dev_desc.mChannelsPerFrame;
        input_buffer_list.mNumberBuffers = 1;

        debug_assert!(!stm.core_stream_data.input_unit.is_null());
        let status = audio_unit_render(
            stm.core_stream_data.input_unit,
            flags,
            tstamp,
            bus,
            input_frames,
            &mut input_buffer_list,
        );
        if (status != NO_ERR)
            && (status != kAudioUnitErr_CannotDoInCurrentContext
                || stm.core_stream_data.output_unit.is_null())
        {
            return ErrorHandle::Return(status);
        }
        let handle = if status == kAudioUnitErr_CannotDoInCurrentContext {
            assert!(!stm.core_stream_data.output_unit.is_null());
            // kAudioUnitErr_CannotDoInCurrentContext is returned when using a BT
            // headset and the profile is changed from A2DP to HFP/HSP. The previous
            // output device is no longer valid and must be reset.
            // For now state that no error occurred and feed silence, stream will be
            // resumed once reinit has completed.
            ErrorHandle::Reinit
        } else {
            assert_eq!(status, NO_ERR);

            #[cfg(feature = "audio-dump")]
            {
                dump_audio(
                    stm.core_stream_data.audio_dump_input,
                    input_buffer_list.mBuffers[0].mData,
                    input_frames * stm.core_stream_data.input_dev_desc.mChannelsPerFrame,
                );
            }

            input_buffer_manager
                .push_data(input_buffer_list.mBuffers[0].mData, input_frames as usize);
            ErrorHandle::Return(status)
        };

        // Full Duplex. We'll call data_callback in the AudioUnit output callback. Record this
        // callback for logging.
        if !stm.core_stream_data.output_unit.is_null() {
            let input_callback_data = InputCallbackData {
                bytes: input_buffer_list.mBuffers[0].mDataByteSize,
                rendered_frames: input_frames,
                total_available: input_buffer_manager.available_frames(),
                channels: input_buffer_list.mBuffers[0].mNumberChannels,
                num_buf: input_buffer_list.mNumberBuffers,
            };
            stm.core_stream_data
                .input_logging
                .as_mut()
                .unwrap()
                .push(input_callback_data);
            return handle;
        }

        cubeb_alogv!(
            "({:p}) input: buffers {}, size {}, channels {}, rendered frames {}, total frames {}.",
            stm.core_stream_data.stm_ptr,
            input_buffer_list.mNumberBuffers,
            input_buffer_list.mBuffers[0].mDataByteSize,
            input_buffer_list.mBuffers[0].mNumberChannels,
            input_frames,
            input_buffer_manager.available_frames()
        );

        // Input only. Call the user callback through resampler.
        // Resampler will deliver input buffer in the correct rate.
        assert!(input_frames as usize <= input_buffer_manager.available_frames());
        stm.frames_read.fetch_add(
            input_buffer_manager.available_frames(),
            atomic::Ordering::SeqCst,
        );
        let mut total_input_frames = input_buffer_manager.available_frames() as i64;
        let input_buffer =
            input_buffer_manager.get_linear_data(input_buffer_manager.available_frames());
        let outframes = stm.core_stream_data.resampler.fill(
            input_buffer,
            &mut total_input_frames,
            ptr::null_mut(),
            0,
        );
        if outframes < 0 {
            if !stm.stopped.swap(true, Ordering::SeqCst) {
                stm.notify_state_changed(State::Error);
                // Use a new thread, through the queue, to avoid deadlock when calling
                // AudioOutputUnitStop method from inside render callback
                stm.queue.clone().run_async(move || {
                    stm.core_stream_data.stop_audiounits();
                });
            }
            return ErrorHandle::Return(status);
        }
        if outframes < total_input_frames {
            stm.draining.store(true, Ordering::SeqCst);
        }

        handle
    };

    // If the stream is drained, do nothing.
    let handle = if !stm.draining.load(Ordering::SeqCst) {
        handler(stm, flags, tstamp, bus, input_frames)
    } else {
        ErrorHandle::Return(NO_ERR)
    };

    // If the input (input-only stream) is drained, cancel this callback. Whenever an output
    // is involved, the output callback handles stopping all units and notifying of state.
    if stm.core_stream_data.output_unit.is_null()
        && stm.draining.load(Ordering::SeqCst)
        && !stm.stopped.swap(true, Ordering::SeqCst)
    {
        cubeb_alog!("({:p}) Input-only drained.", stm as *const AudioUnitStream);
        stm.notify_state_changed(State::Drained);
        // Use a new thread, through the queue, to avoid deadlock when calling
        // AudioOutputUnitStop method from inside render callback
        let stm_ptr = user_ptr as usize;
        stm.queue.clone().run_async(move || {
            let stm = unsafe { &mut *(stm_ptr as *mut AudioUnitStream) };
            stm.core_stream_data.stop_audiounits();
        });
    }

    match handle {
        ErrorHandle::Reinit => {
            stm.reinit_async();
            NO_ERR
        }
        ErrorHandle::Return(s) => s,
    }
}

fn host_time_to_ns(ctx: &AudioUnitContext, host_time: u64) -> u64 {
    let mut rv: f64 = host_time as f64;
    rv *= ctx.host_time_to_ns_ratio.0 as f64;
    rv /= ctx.host_time_to_ns_ratio.1 as f64;
    rv as u64
}

fn compute_output_latency(stm: &AudioUnitStream, audio_output_time: u64, now: u64) -> u32 {
    const NS2S: u64 = 1_000_000_000;
    let output_hw_rate = stm.core_stream_data.output_dev_desc.mSampleRate as u64;
    let fixed_latency_ns =
        (stm.output_device_latency_frames.load(Ordering::SeqCst) as u64 * NS2S) / output_hw_rate;
    // The total output latency is the timestamp difference + the stream latency + the hardware
    // latency.
    let total_output_latency_ns =
        fixed_latency_ns + host_time_to_ns(stm.context, audio_output_time.saturating_sub(now));

    (total_output_latency_ns * output_hw_rate / NS2S) as u32
}

fn compute_input_latency(stm: &AudioUnitStream, audio_input_time: u64, now: u64) -> u32 {
    const NS2S: u64 = 1_000_000_000;
    let input_hw_rate = stm.core_stream_data.input_dev_desc.mSampleRate as u64;
    let fixed_latency_ns =
        (stm.input_device_latency_frames.load(Ordering::SeqCst) as u64 * NS2S) / input_hw_rate;
    // The total input latency is the timestamp difference + the stream latency +
    // the hardware latency.
    let total_input_latency_ns =
        host_time_to_ns(stm.context, now.saturating_sub(audio_input_time)) + fixed_latency_ns;

    (total_input_latency_ns * input_hw_rate / NS2S) as u32
}

extern "C" fn audiounit_output_callback(
    user_ptr: *mut c_void,
    flags: *mut AudioUnitRenderActionFlags,
    tstamp: *const AudioTimeStamp,
    bus: u32,
    output_frames: u32,
    out_buffer_list: *mut AudioBufferList,
) -> OSStatus {
    assert_eq!(bus, AU_OUT_BUS);
    assert!(!out_buffer_list.is_null());

    assert!(!user_ptr.is_null());
    let stm = unsafe { &mut *(user_ptr as *mut AudioUnitStream) };

    if output_frames == 0 {
        cubeb_alog!(
            "({:p}) output callback empty.",
            stm as *const AudioUnitStream
        );
        return NO_ERR;
    }

    let out_buffer_list_ref = unsafe { &mut (*out_buffer_list) };
    assert_eq!(out_buffer_list_ref.mNumberBuffers, 1);
    let buffers = unsafe {
        let ptr = out_buffer_list_ref.mBuffers.as_mut_ptr();
        let len = out_buffer_list_ref.mNumberBuffers as usize;
        slice::from_raw_parts_mut(ptr, len)
    };

    if stm.stopped.load(Ordering::SeqCst) {
        cubeb_alog!("({:p}) output stopped.", stm as *const AudioUnitStream);
        audiounit_make_silent(&buffers[0]);
        #[cfg(feature = "audio-dump")]
        {
            dump_audio(
                stm.core_stream_data.audio_dump_output,
                buffers[0].mData,
                output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
            );
        }
        return NO_ERR;
    }

    if stm.draining.load(Ordering::SeqCst) {
        // Cancel all callbacks. For input-only streams, the input callback handles
        // cancelling itself.
        audiounit_make_silent(&buffers[0]);
        #[cfg(feature = "audio-dump")]
        {
            dump_audio(
                stm.core_stream_data.audio_dump_output,
                buffers[0].mData,
                output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
            );
        }
        if !stm.stopped.swap(true, Ordering::SeqCst) {
            cubeb_alog!("({:p}) output drained.", stm as *const AudioUnitStream);
            stm.notify_state_changed(State::Drained);
            // Use a new thread, through the queue, to avoid deadlock when calling
            // AudioOutputUnitStop method from inside render callback
            stm.queue.clone().run_async(move || {
                stm.core_stream_data.stop_audiounits();
            });
        }
        return NO_ERR;
    }

    let now = unsafe { mach_absolute_time() };

    if unsafe { *flags | kAudioTimeStampHostTimeValid } != 0 {
        let output_latency_frames =
            compute_output_latency(stm, unsafe { (*tstamp).mHostTime }, now);
        stm.total_output_latency_frames
            .store(output_latency_frames, Ordering::SeqCst);
    }
    // Get output buffer
    let output_buffer = match stm.core_stream_data.mixer.as_mut() {
        None => buffers[0].mData,
        Some(mixer) => {
            // If remixing needs to occur, we can't directly work in our final
            // destination buffer as data may be overwritten or too small to start with.
            mixer.update_buffer_size(output_frames as usize);
            mixer.get_buffer_mut_ptr() as *mut c_void
        }
    };

    let prev_frames_written = stm.frames_written.load(Ordering::SeqCst);

    stm.frames_written
        .fetch_add(output_frames as usize, Ordering::SeqCst);

    // Also get the input buffer if the stream is duplex
    let (input_buffer, mut input_frames) = if !stm.core_stream_data.input_unit.is_null() {
        let input_logging = &mut stm.core_stream_data.input_logging.as_mut().unwrap();
        if input_logging.is_empty() {
            cubeb_alogv!("no audio input data in output callback");
        } else {
            while let Some(input_callback_data) = input_logging.pop() {
                cubeb_alogv!(
                    "input: buffers {}, size {}, channels {}, rendered frames {}, total frames {}.",
                    input_callback_data.num_buf,
                    input_callback_data.bytes,
                    input_callback_data.channels,
                    input_callback_data.rendered_frames,
                    input_callback_data.total_available
                );
            }
        }
        let input_buffer_manager = stm.core_stream_data.input_buffer_manager.as_mut().unwrap();
        assert_ne!(stm.core_stream_data.input_dev_desc.mChannelsPerFrame, 0);
        // If the output callback came first and this is a duplex stream, we need to
        // fill in some additional silence in the resampler.
        // Otherwise, if we had more than expected callbacks in a row, or we're
        // currently switching, we add some silence as well to compensate for the
        // fact that we're lacking some input data.
        let input_frames_needed = minimum_resampling_input_frames(
            stm.core_stream_data.input_dev_desc.mSampleRate,
            f64::from(stm.core_stream_data.output_stream_params.rate()),
            output_frames as usize,
        );
        let buffered_input_frames = input_buffer_manager.available_frames();
        // Else if the input has buffered a lot already because the output started late, we
        // need to trim the input buffer
        if prev_frames_written == 0 && buffered_input_frames > input_frames_needed {
            input_buffer_manager.trim(input_frames_needed);
            let popped_frames = buffered_input_frames - input_frames_needed;
            cubeb_alog!("Dropping {} frames in input buffer.", popped_frames);
        }

        let input_frames = if input_frames_needed > buffered_input_frames
            && (stm.switching_device.load(Ordering::SeqCst)
                || stm.reinit_pending.load(Ordering::SeqCst)
                || stm.frames_read.load(Ordering::SeqCst) == 0)
        {
            // The silent frames will be inserted in `get_linear_data` below.
            let silent_frames_to_push = input_frames_needed - buffered_input_frames;
            cubeb_alog!(
                "({:p}) Missing Frames: {} will append {} frames of input silence.",
                stm.core_stream_data.stm_ptr,
                if stm.frames_read.load(Ordering::SeqCst) == 0 {
                    "input hasn't started,"
                } else if stm.switching_device.load(Ordering::SeqCst) {
                    "device switching,"
                } else {
                    "reinit pending,"
                },
                silent_frames_to_push
            );
            input_frames_needed
        } else {
            buffered_input_frames
        };

        stm.frames_read.fetch_add(input_frames, Ordering::SeqCst);
        (
            input_buffer_manager.get_linear_data(input_frames),
            input_frames as i64,
        )
    } else {
        (ptr::null_mut::<c_void>(), 0)
    };

    cubeb_alogv!(
        "({:p}) output: buffers {}, size {}, channels {}, frames {}.",
        stm as *const AudioUnitStream,
        buffers.len(),
        buffers[0].mDataByteSize,
        buffers[0].mNumberChannels,
        output_frames
    );

    assert_ne!(output_frames, 0);
    let outframes = stm.core_stream_data.resampler.fill(
        input_buffer,
        if input_buffer.is_null() {
            ptr::null_mut()
        } else {
            &mut input_frames
        },
        output_buffer,
        i64::from(output_frames),
    );

    if outframes < 0 || outframes > i64::from(output_frames) {
        audiounit_make_silent(&buffers[0]);

        #[cfg(feature = "audio-dump")]
        {
            dump_audio(
                stm.core_stream_data.audio_dump_output,
                buffers[0].mData,
                output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
            );
        }
        if !stm.stopped.swap(true, Ordering::SeqCst) {
            stm.notify_state_changed(State::Error);
            // Use a new thread, through the queue, to avoid deadlock when calling
            // AudioOutputUnitStop method from inside render callback
            stm.queue.clone().run_async(move || {
                stm.core_stream_data.stop_audiounits();
            });
        }
        return NO_ERR;
    }

    stm.draining
        .store(outframes < i64::from(output_frames), Ordering::SeqCst);
    stm.output_callback_timing_data_write
        .write(OutputCallbackTimingData {
            frames_queued: stm.frames_queued,
            timestamp: now,
            buffer_size: outframes as u64,
        });

    stm.frames_queued += outframes as u64;

    // Post process output samples.
    if stm.draining.load(Ordering::SeqCst) {
        // Clear missing frames (silence)
        let frames_to_bytes = |frames: usize| -> usize {
            let sample_size = cubeb_sample_size(stm.core_stream_data.output_stream_params.format());
            let channel_count = stm.core_stream_data.output_stream_params.channels() as usize;
            frames * sample_size * channel_count
        };
        let out_bytes = unsafe {
            slice::from_raw_parts_mut(
                output_buffer as *mut u8,
                frames_to_bytes(output_frames as usize),
            )
        };
        let start = frames_to_bytes(outframes as usize);
        for byte in out_bytes.iter_mut().skip(start) {
            *byte = 0;
        }
    }

    // Mixing
    if stm.core_stream_data.mixer.is_some() {
        assert!(
            buffers[0].mDataByteSize
                >= stm.core_stream_data.output_dev_desc.mBytesPerFrame * output_frames
        );
        stm.core_stream_data.mixer.as_mut().unwrap().mix(
            output_frames as usize,
            buffers[0].mData,
            buffers[0].mDataByteSize as usize,
        );
    }

    #[cfg(feature = "audio-dump")]
    {
        dump_audio(
            stm.core_stream_data.audio_dump_output,
            buffers[0].mData,
            output_frames * stm.core_stream_data.output_dev_desc.mChannelsPerFrame,
        );
    }
    NO_ERR
}

#[allow(clippy::cognitive_complexity)]
extern "C" fn audiounit_property_listener_callback(
    id: AudioObjectID,
    address_count: u32,
    addresses: *const AudioObjectPropertyAddress,
    user: *mut c_void,
) -> OSStatus {
    assert_ne!(address_count, 0);

    let stm = unsafe { &mut *(user as *mut AudioUnitStream) };
    let addrs = unsafe { slice::from_raw_parts(addresses, address_count as usize) };
    if stm.switching_device.load(Ordering::SeqCst) {
        cubeb_log!(
            "Switching is already taking place. Skipping event for device {}",
            id
        );
        return NO_ERR;
    }
    stm.switching_device.store(true, Ordering::SeqCst);

    let mut explicit_device_dead = false;

    cubeb_log!(
        "({:p}) Handling {} device changed events for device {}",
        stm as *const AudioUnitStream,
        address_count,
        id
    );
    for (i, addr) in addrs.iter().enumerate() {
        let p = PropertySelector::from(addr.mSelector);
        cubeb_log!("Event #{}: {}", i, p);
        assert_ne!(p, PropertySelector::Unknown);
        if p == PropertySelector::DeviceIsAlive {
            explicit_device_dead = true;
        }
    }

    // Handle the events
    if explicit_device_dead {
        if !stm.stopped.swap(true, Ordering::SeqCst) {
            cubeb_log!("The user-selected input or output device is dead, entering error state");

            // Use a different thread, through the queue, to avoid deadlock when calling
            // Get/SetProperties method from inside notify callback
            stm.queue.clone().run_async(move || {
                stm.core_stream_data.stop_audiounits();
                stm.close_on_error();
            });
        }
        return NO_ERR;
    }
    {
        let callback = stm.device_changed_callback.lock().unwrap();
        if let Some(device_changed_callback) = *callback {
            cubeb_log!("Calling device changed callback");
            unsafe {
                device_changed_callback(stm.user_ptr);
            }
        }
    }

    cubeb_log!("Reinitializing stream with new device because of device change, async");
    stm.reinit_async();

    NO_ERR
}

fn get_default_device(devtype: DeviceType) -> Option<AudioObjectID> {
    debug_assert_running_serially();
    match get_default_device_id(devtype) {
        Err(e) => {
            cubeb_log!("Cannot get default {:?} device. Error: {}", devtype, e);
            None
        }
        Ok(id) if id == kAudioObjectUnknown => {
            cubeb_log!("Get an invalid default {:?} device: {}", devtype, id);
            None
        }
        Ok(id) => Some(id),
    }
}

fn get_default_device_id(devtype: DeviceType) -> std::result::Result<AudioObjectID, OSStatus> {
    debug_assert_running_serially();
    let address = get_property_address(
        match devtype {
            DeviceType::INPUT => Property::HardwareDefaultInputDevice,
            DeviceType::OUTPUT => Property::HardwareDefaultOutputDevice,
            _ => panic!("Unsupport type"),
        },
        DeviceType::INPUT | DeviceType::OUTPUT,
    );

    let mut devid: AudioDeviceID = kAudioObjectUnknown;
    let mut size = mem::size_of::<AudioDeviceID>();
    let status =
        audio_object_get_property_data(kAudioObjectSystemObject, &address, &mut size, &mut devid);
    if status == NO_ERR {
        Ok(devid)
    } else {
        Err(status)
    }
}

fn audiounit_convert_channel_layout(layout: &AudioChannelLayout) -> Result<Vec<mixer::Channel>> {
    if layout.mChannelLayoutTag != kAudioChannelLayoutTag_UseChannelDescriptions {
        // kAudioChannelLayoutTag_UseChannelBitmap
        // kAudioChannelLayoutTag_Mono
        // kAudioChannelLayoutTag_Stereo
        // ....
        cubeb_log!("Only handling UseChannelDescriptions for now.\n");
        return Err(Error::error());
    }

    let channel_descriptions = unsafe {
        slice::from_raw_parts(
            layout.mChannelDescriptions.as_ptr(),
            layout.mNumberChannelDescriptions as usize,
        )
    };

    let mut channels = Vec::with_capacity(layout.mNumberChannelDescriptions as usize);
    for description in channel_descriptions {
        let label = CAChannelLabel(description.mChannelLabel);
        channels.push(label.into());
    }

    Ok(channels)
}

fn audiounit_get_preferred_channel_layout(output_unit: AudioUnit) -> Result<Vec<mixer::Channel>> {
    debug_assert_running_serially();
    let mut rv = NO_ERR;
    let mut size: usize = 0;
    rv = audio_unit_get_property_info(
        output_unit,
        kAudioDevicePropertyPreferredChannelLayout,
        kAudioUnitScope_Output,
        AU_OUT_BUS,
        &mut size,
        None,
    );
    if rv != NO_ERR {
        cubeb_log!(
            "AudioUnitGetPropertyInfo/kAudioDevicePropertyPreferredChannelLayout rv={}",
            rv
        );
        return Err(Error::error());
    }
    debug_assert!(size > 0);

    let mut layout = make_sized_audio_channel_layout(size);
    rv = audio_unit_get_property(
        output_unit,
        kAudioDevicePropertyPreferredChannelLayout,
        kAudioUnitScope_Output,
        AU_OUT_BUS,
        layout.as_mut(),
        &mut size,
    );
    if rv != NO_ERR {
        cubeb_log!(
            "AudioUnitGetProperty/kAudioDevicePropertyPreferredChannelLayout rv={}",
            rv
        );
        return Err(Error::error());
    }

    audiounit_convert_channel_layout(layout.as_ref())
}

// This is for output AudioUnit only. Calling this by input-only AudioUnit is prone
// to crash intermittently.
fn audiounit_get_current_channel_layout(output_unit: AudioUnit) -> Result<Vec<mixer::Channel>> {
    debug_assert_running_serially();
    let mut rv = NO_ERR;
    let mut size: usize = 0;
    rv = audio_unit_get_property_info(
        output_unit,
        kAudioUnitProperty_AudioChannelLayout,
        kAudioUnitScope_Output,
        AU_OUT_BUS,
        &mut size,
        None,
    );
    if rv != NO_ERR {
        cubeb_log!(
            "AudioUnitGetPropertyInfo/kAudioUnitProperty_AudioChannelLayout rv={}",
            rv
        );
        return Err(Error::error());
    }
    debug_assert!(size > 0);

    let mut layout = make_sized_audio_channel_layout(size);
    rv = audio_unit_get_property(
        output_unit,
        kAudioUnitProperty_AudioChannelLayout,
        kAudioUnitScope_Output,
        AU_OUT_BUS,
        layout.as_mut(),
        &mut size,
    );
    if rv != NO_ERR {
        cubeb_log!(
            "AudioUnitGetProperty/kAudioUnitProperty_AudioChannelLayout rv={}",
            rv
        );
        return Err(Error::error());
    }

    audiounit_convert_channel_layout(layout.as_ref())
}

fn get_channel_layout(output_unit: AudioUnit) -> Result<Vec<mixer::Channel>> {
    debug_assert_running_serially();
    audiounit_get_current_channel_layout(output_unit)
        .or_else(|_| {
            // The kAudioUnitProperty_AudioChannelLayout property isn't known before
            // macOS 10.12, attempt another method.
            cubeb_log!(
                "Cannot get current channel layout for audiounit @ {:p}. Trying preferred channel layout.",
                output_unit
            );
            audiounit_get_preferred_channel_layout(output_unit)
        })
}

fn start_audiounit(unit: AudioUnit) -> Result<()> {
    let status = audio_output_unit_start(unit);
    if status == NO_ERR {
        Ok(())
    } else {
        cubeb_log!("Cannot start audiounit @ {:p}. Error: {}", unit, status);
        Err(Error::error())
    }
}

fn stop_audiounit(unit: AudioUnit) -> Result<()> {
    let status = audio_output_unit_stop(unit);
    if status == NO_ERR {
        Ok(())
    } else {
        cubeb_log!("Cannot stop audiounit @ {:p}. Error: {}", unit, status);
        Err(Error::error())
    }
}

fn create_audiounit(device: &device_info) -> Result<AudioUnit> {
    assert!(device
        .flags
        .intersects(device_flags::DEV_INPUT | device_flags::DEV_OUTPUT));
    assert!(!device
        .flags
        .contains(device_flags::DEV_INPUT | device_flags::DEV_OUTPUT));
    debug_assert_running_serially();

    let unit = create_blank_audiounit()?;
    let mut bus = AU_OUT_BUS;

    if device.flags.contains(device_flags::DEV_INPUT) {
        // Input only.
        if let Err(e) = enable_audiounit_scope(unit, DeviceType::INPUT, true) {
            cubeb_log!("Failed to enable audiounit input scope. Error: {}", e);
            dispose_audio_unit(unit);
            return Err(Error::error());
        }
        if let Err(e) = enable_audiounit_scope(unit, DeviceType::OUTPUT, false) {
            cubeb_log!("Failed to disable audiounit output scope. Error: {}", e);
            dispose_audio_unit(unit);
            return Err(Error::error());
        }
        bus = AU_IN_BUS;
    }

    if device.flags.contains(device_flags::DEV_OUTPUT) {
        // Output only.
        if let Err(e) = enable_audiounit_scope(unit, DeviceType::OUTPUT, true) {
            cubeb_log!("Failed to enable audiounit output scope. Error: {}", e);
            dispose_audio_unit(unit);
            return Err(Error::error());
        }
        if let Err(e) = enable_audiounit_scope(unit, DeviceType::INPUT, false) {
            cubeb_log!("Failed to disable audiounit input scope. Error: {}", e);
            dispose_audio_unit(unit);
            return Err(Error::error());
        }
        bus = AU_OUT_BUS;
    }

    if let Err(e) = set_device_to_audiounit(unit, device.id, bus) {
        cubeb_log!(
            "Failed to set device {} to the created audiounit. Error: {}",
            device.id,
            e
        );
        dispose_audio_unit(unit);
        return Err(Error::error());
    }

    Ok(unit)
}

fn get_voiceprocessing_audiounit(
    shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
    in_device: &device_info,
    out_device: &device_info,
) -> Result<OwningHandle<VoiceProcessingUnit>> {
    debug_assert_running_serially();
    assert!(in_device.flags.contains(device_flags::DEV_INPUT));
    assert!(!in_device.flags.contains(device_flags::DEV_OUTPUT));
    assert!(!out_device.flags.contains(device_flags::DEV_INPUT));

    let unit_handle = shared_voice_processing_unit.take_or_create();
    if let Err(e) = unit_handle {
        cubeb_log!(
            "Failed to create shared voiceprocessing audiounit. Error: {}",
            e
        );
        return Err(Error::error());
    }
    let mut unit_handle = unit_handle.unwrap();

    if let Err(e) = set_device_to_audiounit(unit_handle.as_mut().unit, in_device.id, AU_IN_BUS) {
        cubeb_log!(
            "Failed to set in device {} to the created audiounit. Error: {}",
            in_device.id,
            e
        );
        return Err(Error::error());
    }

    let has_output = out_device.id != kAudioObjectUnknown;
    if let Err(e) =
        enable_audiounit_scope(unit_handle.as_mut().unit, DeviceType::OUTPUT, has_output)
    {
        cubeb_log!("Failed to enable audiounit input scope. Error: {}", e);
        return Err(Error::error());
    }
    if has_output {
        if let Err(e) =
            set_device_to_audiounit(unit_handle.as_mut().unit, out_device.id, AU_OUT_BUS)
        {
            cubeb_log!(
                "Failed to set out device {} to the created audiounit. Error: {}",
                out_device.id,
                e
            );
            return Err(Error::error());
        }
    }

    Ok(unit_handle)
}

fn enable_audiounit_scope(
    unit: AudioUnit,
    devtype: DeviceType,
    enable_io: bool,
) -> std::result::Result<(), OSStatus> {
    assert!(!unit.is_null());

    let enable = u32::from(enable_io);
    let (scope, element) = match devtype {
        DeviceType::INPUT => (kAudioUnitScope_Input, AU_IN_BUS),
        DeviceType::OUTPUT => (kAudioUnitScope_Output, AU_OUT_BUS),
        _ => panic!(
            "Enable AudioUnit {:?} with unsupported type: {:?}",
            unit, devtype
        ),
    };
    let status = audio_unit_set_property(
        unit,
        kAudioOutputUnitProperty_EnableIO,
        scope,
        element,
        &enable,
        mem::size_of::<u32>(),
    );
    if status == NO_ERR {
        Ok(())
    } else {
        Err(status)
    }
}

fn set_device_to_audiounit(
    unit: AudioUnit,
    device_id: AudioObjectID,
    bus: AudioUnitElement,
) -> std::result::Result<(), OSStatus> {
    assert!(!unit.is_null());

    let status = audio_unit_set_property(
        unit,
        kAudioOutputUnitProperty_CurrentDevice,
        kAudioUnitScope_Global,
        bus,
        &device_id,
        mem::size_of::<AudioDeviceID>(),
    );
    if status == NO_ERR {
        Ok(())
    } else {
        Err(status)
    }
}

fn create_typed_audiounit(sub_type: c_uint) -> Result<AudioUnit> {
    let desc = AudioComponentDescription {
        componentType: kAudioUnitType_Output,
        componentSubType: sub_type,
        componentManufacturer: kAudioUnitManufacturer_Apple,
        componentFlags: 0,
        componentFlagsMask: 0,
    };

    let comp = unsafe { AudioComponentFindNext(ptr::null_mut(), &desc) };
    if comp.is_null() {
        cubeb_log!("Could not find matching audio hardware.");
        return Err(Error::error());
    }
    let mut unit: AudioUnit = ptr::null_mut();
    let status = unsafe { AudioComponentInstanceNew(comp, &mut unit) };
    if status == NO_ERR {
        assert!(!unit.is_null());
        Ok(unit)
    } else {
        cubeb_log!("Fail to get a new AudioUnit. Error: {}", status);
        Err(Error::error())
    }
}

fn create_blank_audiounit() -> Result<AudioUnit> {
    #[cfg(not(target_os = "ios"))]
    return create_typed_audiounit(kAudioUnitSubType_HALOutput);
    #[cfg(target_os = "ios")]
    return create_typed_audiounit(kAudioUnitSubType_RemoteIO);
}

fn create_voiceprocessing_audiounit() -> Result<VoiceProcessingUnit> {
    let res = create_typed_audiounit(kAudioUnitSubType_VoiceProcessingIO);
    if res.is_err() {
        return Err(Error::error());
    }

    match get_default_device(DeviceType::OUTPUT) {
        None => {
            cubeb_log!("Could not get default output device in order to undo vpio ducking");
        }
        Some(id) => {
            let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5);
            if r != NO_ERR {
                cubeb_log!(
                        "Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}",
                        id,
                        r
                    );
            }
        }
    };

    res.map(|unit| VoiceProcessingUnit { unit })
}

fn get_buffer_size(unit: AudioUnit, devtype: DeviceType) -> std::result::Result<u32, OSStatus> {
    assert!(!unit.is_null());
    let (scope, element) = match devtype {
        DeviceType::INPUT => (kAudioUnitScope_Output, AU_IN_BUS),
        DeviceType::OUTPUT => (kAudioUnitScope_Input, AU_OUT_BUS),
        _ => panic!(
            "Get buffer size of AudioUnit {:?} with unsupported type: {:?}",
            unit, devtype
        ),
    };
    let mut frames: u32 = 0;
    let mut size = mem::size_of::<u32>();
    let status = audio_unit_get_property(
        unit,
        kAudioDevicePropertyBufferFrameSize,
        scope,
        element,
        &mut frames,
        &mut size,
    );
    if status == NO_ERR {
        Ok(frames)
    } else {
        Err(status)
    }
}

fn set_buffer_size(
    unit: AudioUnit,
    devtype: DeviceType,
    frames: u32,
) -> std::result::Result<(), OSStatus> {
    assert!(!unit.is_null());
    let (scope, element) = match devtype {
        DeviceType::INPUT => (kAudioUnitScope_Output, AU_IN_BUS),
        DeviceType::OUTPUT => (kAudioUnitScope_Input, AU_OUT_BUS),
        _ => panic!(
            "Set buffer size of AudioUnit {:?} with unsupported type: {:?}",
            unit, devtype
        ),
    };
    let status = audio_unit_set_property(
        unit,
        kAudioDevicePropertyBufferFrameSize,
        scope,
        element,
        &frames,
        mem::size_of_val(&frames),
    );
    if status == NO_ERR {
        Ok(())
    } else {
        Err(status)
    }
}

#[allow(clippy::mutex_atomic)] // The mutex needs to be fed into Condvar::wait_timeout.
fn set_buffer_size_sync(unit: AudioUnit, devtype: DeviceType, frames: u32) -> Result<()> {
    let current_frames = get_buffer_size(unit, devtype).map_err(|e| {
        cubeb_log!(
            "Cannot get buffer size of AudioUnit {:?} for {:?}. Error: {}",
            unit,
            devtype,
            e
        );
        Error::error()
    })?;
    if frames == current_frames {
        cubeb_log!(
            "The buffer frame size of AudioUnit {:?} for {:?} is already {}",
            unit,
            devtype,
            frames
        );
        return Ok(());
    }

    let waiting_time = Duration::from_millis(100);
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let mut pair2 = pair.clone();
    let pair_ptr = &mut pair2;

    assert_eq!(
        audio_unit_add_property_listener(
            unit,
            kAudioDevicePropertyBufferFrameSize,
            buffer_size_changed_callback,
            pair_ptr,
        ),
        NO_ERR
    );

    let _teardown = finally(|| {
        assert_eq!(
            audio_unit_remove_property_listener_with_user_data(
                unit,
                kAudioDevicePropertyBufferFrameSize,
                buffer_size_changed_callback,
                pair_ptr,
            ),
            NO_ERR
        );
    });

    set_buffer_size(unit, devtype, frames).map_err(|e| {
        cubeb_log!(
            "Failed to set buffer size for AudioUnit {:?} for {:?}. Error: {}",
            unit,
            devtype,
            e
        );
        Error::error()
    })?;

    let (lock, cvar) = &*pair;
    let changed = lock.lock().unwrap();
    if !*changed {
        let (chg, timeout_res) = cvar.wait_timeout(changed, waiting_time).unwrap();
        if timeout_res.timed_out() {
            cubeb_log!(
                "Timed out for waiting the buffer frame size setting of AudioUnit {:?} for {:?}",
                unit,
                devtype
            );
        }
        if !*chg {
            return Err(Error::error());
        }
    }

    let new_frames = get_buffer_size(unit, devtype).map_err(|e| {
        cubeb_log!(
            "Cannot get new buffer size of AudioUnit {:?} for {:?}. Error: {}",
            unit,
            devtype,
            e
        );
        Error::error()
    })?;
    cubeb_log!(
        "The new buffer frames size of AudioUnit {:?} for {:?} is {}",
        unit,
        devtype,
        new_frames
    );

    extern "C" fn buffer_size_changed_callback(
        in_client_data: *mut c_void,
        _in_unit: AudioUnit,
        in_property_id: AudioUnitPropertyID,
        in_scope: AudioUnitScope,
        in_element: AudioUnitElement,
    ) {
        if in_scope == 0 {
            // filter out the callback for global scope.
            return;
        }
        assert!(in_element == AU_IN_BUS || in_element == AU_OUT_BUS);
        assert_eq!(in_property_id, kAudioDevicePropertyBufferFrameSize);
        let pair = unsafe { &mut *(in_client_data as *mut Arc<(Mutex<bool>, Condvar)>) };
        let (lock, cvar) = &**pair;
        let mut changed = lock.lock().unwrap();
        *changed = true;
        cvar.notify_one();
    }

    Ok(())
}

fn convert_uint32_into_string(data: u32) -> CString {
    let empty = CString::default();
    if data == 0 {
        return empty;
    }

    // Reverse 0xWXYZ into 0xZYXW.
    let mut buffer = vec![b'\x00'; 4]; // 4 bytes for uint32.
    buffer[0] = (data >> 24) as u8;
    buffer[1] = (data >> 16) as u8;
    buffer[2] = (data >> 8) as u8;
    buffer[3] = (data) as u8;

    // CString::new() will consume the input bytes vec and add a '\0' at the
    // end of the bytes. The input bytes vec must not contain any 0 bytes in
    // it in case causing memory leaks.
    CString::new(buffer).unwrap_or(empty)
}

fn get_channel_count(
    devid: AudioObjectID,
    devtype: DeviceType,
) -> std::result::Result<u32, OSStatus> {
    assert_ne!(devid, kAudioObjectUnknown);
    debug_assert_running_serially();

    let devstreams = get_device_streams(devid, devtype)?;
    let mut count: u32 = 0;
    for ds in devstreams {
        if devtype == DeviceType::INPUT
            && CoreStreamData::should_force_vpio_for_input_device(ds.device)
        {
            count += 1;
        } else {
            count += get_stream_virtual_format(ds.stream)
                .map(|f| f.mChannelsPerFrame)
                .unwrap_or(0);
        }
    }
    Ok(count)
}

fn get_range_of_sample_rates(
    devid: AudioObjectID,
    devtype: DeviceType,
) -> std::result::Result<(f64, f64), String> {
    debug_assert_running_serially();
    let result = get_ranges_of_device_sample_rate(devid, devtype);
    if let Err(e) = result {
        return Err(format!("status {}", e));
    }
    let rates = result.unwrap();
    if rates.is_empty() {
        return Err(String::from("No data"));
    }
    let (mut min, mut max) = (f64::MAX, f64::MIN);
    for rate in rates {
        if rate.mMaximum > max {
            max = rate.mMaximum;
        }
        if rate.mMinimum < min {
            min = rate.mMinimum;
        }
    }
    Ok((min, max))
}

fn get_fixed_latency(devid: AudioObjectID, devtype: DeviceType) -> u32 {
    debug_assert_running_serially();
    let device_latency = match get_device_latency(devid, devtype) {
        Ok(latency) => latency,
        Err(e) => {
            cubeb_log!(
                "Cannot get the device latency for device {} in {:?} scope. Error: {}",
                devid,
                devtype,
                e
            );
            0 // default device latency
        }
    };

    let stream_latency = get_device_streams(devid, devtype).and_then(|devstreams| {
        if devstreams.is_empty() {
            cubeb_log!(
                "No stream on device {} in {:?} scope!",
                devid,
                devtype
            );
            Ok(0) // default stream latency
        } else {
            get_stream_latency(devstreams[0].stream)
        }
    }).inspect_err(|e| {
        cubeb_log!(
            "Cannot get the stream, or the latency of the first stream on device {} in {:?} scope. Error: {}",
            devid,
            devtype,
            e
        );
    }).unwrap_or(0); // default stream latency

    device_latency + stream_latency
}

#[allow(non_upper_case_globals)]
fn get_device_group_id(
    id: AudioDeviceID,
    devtype: DeviceType,
) -> std::result::Result<CString, OSStatus> {
    debug_assert_running_serially();
    match get_device_transport_type(id, devtype) {
        Ok(kAudioDeviceTransportTypeBuiltIn) => {
            cubeb_log!(
                "The transport type is {:?}",
                convert_uint32_into_string(kAudioDeviceTransportTypeBuiltIn)
            );
            match get_custom_group_id(id, devtype) {
                Some(id) => return Ok(id),
                None => {
                    cubeb_log!("Getting model UID instead.");
                }
            };
        }
        Ok(trans_type) => {
            cubeb_log!(
                "The transport type is {:?}. Getting model UID instead.",
                convert_uint32_into_string(trans_type)
            );
        }
        Err(e) => {
            cubeb_log!(
                "Error: {} when getting transport type. Get model uid instead.",
                e
            );
        }
    }

    // Some devices (e.g. AirPods) might only set the model-uid in the global scope.
    // The query might fail if the scope is input-only or output-only.
    get_device_model_uid(id, devtype)
        .or_else(|_| get_device_model_uid(id, DeviceType::INPUT | DeviceType::OUTPUT))
        .map(|uid| uid.into_cstring())
}

fn get_custom_group_id(id: AudioDeviceID, devtype: DeviceType) -> Option<CString> {
    debug_assert_running_serially();

    const IMIC: u32 = 0x696D_6963; // "imic" (internal microphone)
    const ISPK: u32 = 0x6973_706B; // "ispk" (internal speaker)
    const EMIC: u32 = 0x656D_6963; // "emic" (external microphone)
    const HDPN: u32 = 0x6864_706E; // "hdpn" (headphone)

    match get_device_source(id, devtype) {
        s @ Ok(IMIC) | s @ Ok(ISPK) => {
            const GROUP_ID: &str = "builtin-internal-mic|spk";
            cubeb_log!(
                "Using hardcode group id: {} when source is: {:?}.",
                GROUP_ID,
                convert_uint32_into_string(s.unwrap())
            );
            return Some(CString::new(GROUP_ID).unwrap());
        }
        s @ Ok(EMIC) | s @ Ok(HDPN) => {
            const GROUP_ID: &str = "builtin-external-mic|hdpn";
            cubeb_log!(
                "Using hardcode group id: {} when source is: {:?}.",
                GROUP_ID,
                convert_uint32_into_string(s.unwrap())
            );
            return Some(CString::new(GROUP_ID).unwrap());
        }
        Ok(s) => {
            cubeb_log!(
                "No custom group id when source is: {:?}.",
                convert_uint32_into_string(s)
            );
        }
        Err(e) => {
            cubeb_log!("Error: {} when getting device source. ", e);
        }
    }
    None
}

fn get_device_label(
    id: AudioDeviceID,
    devtype: DeviceType,
) -> std::result::Result<StringRef, OSStatus> {
    debug_assert_running_serially();
    get_device_source_name(id, devtype).or_else(|_| get_device_name(id, devtype))
}

fn get_device_global_uid(id: AudioDeviceID) -> std::result::Result<StringRef, OSStatus> {
    debug_assert_running_serially();
    get_device_uid(id, DeviceType::INPUT | DeviceType::OUTPUT)
}

#[allow(clippy::cognitive_complexity)]
fn create_cubeb_device_info(
    devid: AudioObjectID,
    devtype: DeviceType,
) -> Result<ffi::cubeb_device_info> {
    if devtype != DeviceType::INPUT && devtype != DeviceType::OUTPUT {
        return Err(Error::error());
    }
    let channels = get_channel_count(devid, devtype).map_err(|e| {
        cubeb_log!("Cannot get the channel count. Error: {}", e);
        Error::error()
    })?;
    if channels == 0 {
        // Invalid type for this device.
        return Err(Error::error());
    }

    let mut dev_info = ffi::cubeb_device_info {
        max_channels: channels,
        ..Default::default()
    };

    assert!(
        mem::size_of::<ffi::cubeb_devid>() >= mem::size_of_val(&devid),
        "cubeb_devid can't represent devid"
    );
    dev_info.devid = devid as ffi::cubeb_devid;

    match get_device_uid(devid, devtype) {
        Ok(uid) => {
            let c_string = uid.into_cstring();
            dev_info.device_id = c_string.into_raw();
        }
        Err(e) => {
            cubeb_log!(
                "Cannot get the UID for device {} in {:?} scope. Error: {}",
                devid,
                devtype,
                e
            );
        }
    }

    match get_device_group_id(devid, devtype) {
        Ok(group_id) => {
            dev_info.group_id = group_id.into_raw();
        }
        Err(e) => {
            cubeb_log!(
                "Cannot get the model UID for device {} in {:?} scope. Error: {}",
                devid,
                devtype,
                e
            );
        }
    }

    let label = match get_device_label(devid, devtype) {
        Ok(label) => label.into_cstring(),
        Err(e) => {
            cubeb_log!(
                "Cannot get the label for device {} in {:?} scope. Error: {}",
                devid,
                devtype,
                e
            );
            CString::default()
        }
    };
    dev_info.friendly_name = label.into_raw();

    match get_device_manufacturer(devid, devtype) {
        Ok(vendor) => {
            let vendor = vendor.into_cstring();
            dev_info.vendor_name = vendor.into_raw();
        }
        Err(e) => {
            cubeb_log!(
                "Cannot get the manufacturer for device {} in {:?} scope. Error: {}",
                devid,
                devtype,
                e
            );
        }
    }

    dev_info.device_type = match devtype {
        DeviceType::INPUT => ffi::CUBEB_DEVICE_TYPE_INPUT,
        DeviceType::OUTPUT => ffi::CUBEB_DEVICE_TYPE_OUTPUT,
        _ => panic!("invalid type"),
    };

    dev_info.state = ffi::CUBEB_DEVICE_STATE_ENABLED;
    dev_info.preferred = match get_default_device(devtype) {
        Some(id) if id == devid => ffi::CUBEB_DEVICE_PREF_ALL,
        _ => ffi::CUBEB_DEVICE_PREF_NONE,
    };

    dev_info.format = ffi::CUBEB_DEVICE_FMT_ALL;
    dev_info.default_format = ffi::CUBEB_DEVICE_FMT_F32NE;

    match get_device_sample_rate(devid, devtype) {
        Ok(rate) => {
            dev_info.default_rate = rate as u32;
        }
        Err(e) => {
            cubeb_log!(
                "Cannot get the sample rate for device {} in {:?} scope. Error: {}",
                devid,
                devtype,
                e
            );
        }
    }

    match get_range_of_sample_rates(devid, devtype) {
        Ok((min, max)) => {
            dev_info.min_rate = min as u32;
            dev_info.max_rate = max as u32;
        }
        Err(e) => {
            cubeb_log!(
                "Cannot get the range of sample rate for device {} in {:?} scope. Error: {}",
                devid,
                devtype,
                e
            );
        }
    }

    let latency = get_fixed_latency(devid, devtype);

    let (latency_low, latency_high) = match get_device_buffer_frame_size_range(devid, devtype) {
        Ok(range) => (
            latency + range.mMinimum as u32,
            latency + range.mMaximum as u32,
        ),
        Err(e) => {
            cubeb_log!("Cannot get the buffer frame size for device {} in {:?} scope. Using default value instead. Error: {}", devid, devtype, e);
            (
                10 * dev_info.default_rate / 1000,
                100 * dev_info.default_rate / 1000,
            )
        }
    };
    dev_info.latency_lo = latency_low;
    dev_info.latency_hi = latency_high;

    Ok(dev_info)
}

fn destroy_cubeb_device_info(device: &mut ffi::cubeb_device_info) {
    // This should be mapped to the memory allocation in `create_cubeb_device_info`.
    // The `device_id`, `group_id`, `vendor_name` can be null pointer if the queries
    // failed, while `friendly_name` will be assigned to a default empty "" string.
    // Set the pointers to null in case it points to some released memory.
    unsafe {
        if !device.device_id.is_null() {
            let _ = CString::from_raw(device.device_id as *mut _);
            device.device_id = ptr::null();
        }

        if !device.group_id.is_null() {
            let _ = CString::from_raw(device.group_id as *mut _);
            device.group_id = ptr::null();
        }

        assert!(!device.friendly_name.is_null());
        let _ = CString::from_raw(device.friendly_name as *mut _);
        device.friendly_name = ptr::null();

        if !device.vendor_name.is_null() {
            let _ = CString::from_raw(device.vendor_name as *mut _);
            device.vendor_name = ptr::null();
        }
    }
}

fn audiounit_get_devices_of_type(devtype: DeviceType) -> Vec<AudioObjectID> {
    assert!(devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT));
    debug_assert_running_serially();

    let mut devices = get_devices();

    // Remove the aggregate device from the list of devices (if any).
    devices.retain(|&device| {
        // TODO: (bug 1628411) Figure out when `device` is `kAudioObjectUnknown`.
        if device == kAudioObjectUnknown {
            false
        } else if let Ok(uid) = get_device_global_uid(device) {
            let uid = uid.into_string();
            !uid.contains(PRIVATE_AGGREGATE_DEVICE_NAME)
                && !uid.contains(VOICEPROCESSING_AGGREGATE_DEVICE_NAME)
        } else {
            // Fail to get device uid.
            true
        }
    });

    // Expected sorted but did not find anything in the docs.
    devices.sort_unstable();
    if devtype.contains(DeviceType::INPUT | DeviceType::OUTPUT) {
        return devices;
    }

    let mut devices_in_scope = Vec::new();
    for device in devices {
        let label = match get_device_label(device, DeviceType::OUTPUT | DeviceType::INPUT) {
            Ok(label) => label.into_string(),
            Err(e) => format!("Unknown(error: {})", e),
        };
        let info = format!("{} ({})", device, label);

        if let Ok(channels) = get_channel_count(device, devtype) {
            cubeb_log!("Device {} has {} {:?}-channels", info, channels, devtype);
            if channels > 0 {
                devices_in_scope.push(device);
            }
        } else {
            cubeb_log!("Cannot get the channel count for device {}. Ignored.", info);
        }
    }

    devices_in_scope
}

extern "C" fn audiounit_collection_changed_callback(
    _in_object_id: AudioObjectID,
    _in_number_addresses: u32,
    _in_addresses: *const AudioObjectPropertyAddress,
    in_client_data: *mut c_void,
) -> OSStatus {
    let context = unsafe { &mut *(in_client_data as *mut AudioUnitContext) };

    let queue = context.serial_queue.clone();

    // This can be called from inside an AudioUnit function, dispatch to another queue.
    queue.run_async(move || {
        let ctx_ptr = context as *const AudioUnitContext;

        let mut devices = context.devices.lock().unwrap();

        if devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none() {
            return;
        }
        if devices.input.changed_callback.is_some() {
            let input_devices = audiounit_get_devices_of_type(DeviceType::INPUT);
            if devices.input.update_devices(input_devices) {
                unsafe {
                    devices.input.changed_callback.unwrap()(
                        ctx_ptr as *mut ffi::cubeb,
                        devices.input.callback_user_ptr,
                    );
                }
            }
        }
        if devices.output.changed_callback.is_some() {
            let output_devices = audiounit_get_devices_of_type(DeviceType::OUTPUT);
            if devices.output.update_devices(output_devices) {
                unsafe {
                    devices.output.changed_callback.unwrap()(
                        ctx_ptr as *mut ffi::cubeb,
                        devices.output.callback_user_ptr,
                    );
                }
            }
        }
    });

    NO_ERR
}

#[derive(Debug)]
struct DevicesData {
    changed_callback: ffi::cubeb_device_collection_changed_callback,
    callback_user_ptr: *mut c_void,
    devices: Vec<AudioObjectID>,
}

impl DevicesData {
    fn set(
        &mut self,
        changed_callback: ffi::cubeb_device_collection_changed_callback,
        callback_user_ptr: *mut c_void,
        devices: Vec<AudioObjectID>,
    ) {
        self.changed_callback = changed_callback;
        self.callback_user_ptr = callback_user_ptr;
        self.devices = devices;
    }

    fn update_devices(&mut self, devices: Vec<AudioObjectID>) -> bool {
        // Elements in the vector expected sorted.
        if self.devices == devices {
            return false;
        }
        self.devices = devices;
        true
    }

    fn clear(&mut self) {
        self.changed_callback = None;
        self.callback_user_ptr = ptr::null_mut();
        self.devices.clear();
    }

    fn is_empty(&self) -> bool {
        self.changed_callback.is_none()
            && self.callback_user_ptr.is_null()
            && self.devices.is_empty()
    }
}

impl Default for DevicesData {
    fn default() -> Self {
        Self {
            changed_callback: None,
            callback_user_ptr: ptr::null_mut(),
            devices: Vec::new(),
        }
    }
}

#[derive(Debug, Default)]
struct SharedDevices {
    input: DevicesData,
    output: DevicesData,
}

#[derive(Debug, Default)]
struct LatencyController {
    streams: u32,
    latency: Option<u32>,
}

impl LatencyController {
    fn add_stream(&mut self, latency: u32) -> u32 {
        self.streams += 1;
        // For the 1st stream set anything within safe min-max
        if self.streams == 1 {
            assert!(self.latency.is_none());
            // Silently clamp the latency down to the platform default, because we
            // synthetize the clock from the callbacks, and we want the clock to update often.
            self.latency = Some(latency.clamp(SAFE_MIN_LATENCY_FRAMES, SAFE_MAX_LATENCY_FRAMES));
        }
        self.latency.unwrap_or(latency)
    }

    fn subtract_stream(&mut self) {
        self.streams -= 1;
        if self.streams == 0 {
            assert!(self.latency.is_some());
            self.latency = None;
        }
    }
}

// SharedStorage<T> below looks generic but has evolved to be pretty tailored
// the observed behavior of VoiceProcessingIO audio units on macOS 14.
// Some key points are:
// - Creating the first VoiceProcessingIO unit in a process takes a long time, often > 3s.
// - Creating a second VoiceProcessingIO unit in a process is significantly faster, < 1s.
// - Disposing of a VoiceProcessingIO unit when all other VoiceProcessingIO units are
//   uninitialized will take significantly longer than disposing the remaining
//   VoiceProcessingIO units, and will have other side effects: starting another
//   VoiceProcessingIO unit after this is on par with creating the first one in the
//   process, bluetooth devices will move away from the handsfree profile, etc.
// The takeaway is that there is something internal to the VoiceProcessingIO audio unit
// that is costly to create and dispose of and its creation is triggered by creation of
// the first VoiceProcessingIO unit, and its disposal is triggered by the disposal of
// the first VoiceProcessingIO unit when no other VoiceProcessingIO units are initialized.
//
// The intended behavior of SharedStorage<T> and SharedVoiceProcessingUnitManager is therefore:
// - Retain ideally just one VoiceProcessingIO unit after stream destruction, so device
//   switching is fast. The benefit of retaining more than one is unclear.
// - Dispose of either all VoiceProcessingIO units, or none at all, such that the retained
//   VoiceProcessingIO unit really helps speed up creating and starting the next. In practice
//   this means we retain all VoiceProcessingIO units until they can all be disposed of.

#[derive(Debug)]
struct SharedStorageInternal<T> {
    // Storage for shared elements.
    elements: Vec<T>,
    // Number of elements in use, i.e. all elements created/taken and not recycled.
    outstanding_element_count: usize,
    // Used for invalidation of in-flight tasks to clear elements.
    // Incremented when something takes a shared element.
    generation: usize,
}

#[derive(Debug)]
struct SharedStorage<T> {
    queue: Queue,
    idle_timeout: Duration,
    storage: Mutex<SharedStorageInternal<T>>,
}

impl<T: Send> SharedStorage<T> {
    fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
        Self {
            queue,
            idle_timeout,
            storage: Mutex::new(SharedStorageInternal::<T> {
                elements: Vec::default(),
                outstanding_element_count: 0,
                generation: 0,
            }),
        }
    }

    fn take_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) -> Result<T> {
        if let Some(e) = guard.elements.pop() {
            cubeb_log!("Taking shared element #{}.", guard.elements.len());
            guard.outstanding_element_count += 1;
            guard.generation += 1;
            return Ok(e);
        }

        Err(Error::not_supported())
    }

    fn create_with_locked<F>(
        guard: &mut MutexGuard<'_, SharedStorageInternal<T>>,
        f: F,
    ) -> Result<T>
    where
        F: FnOnce() -> Result<T>,
    {
        let start = Instant::now();
        match f() {
            Ok(obj) => {
                cubeb_log!(
                    "Just created shared element #{}. Took {}s.",
                    guard.outstanding_element_count,
                    (Instant::now() - start).as_secs_f32()
                );
                guard.outstanding_element_count += 1;
                guard.generation += 1;
                Ok(obj)
            }
            Err(_) => {
                cubeb_log!("Creating shared element failed");
                Err(Error::error())
            }
        }
    }

    #[cfg(test)]
    fn take(&self) -> Result<T> {
        let mut guard = self.storage.lock().unwrap();
        SharedStorage::take_locked(&mut guard)
    }

    fn take_or_create_with<F>(&self, f: F) -> Result<T>
    where
        F: FnOnce() -> Result<T>,
    {
        let mut guard = self.storage.lock().unwrap();
        SharedStorage::take_locked(&mut guard)
            .or_else(|_| SharedStorage::create_with_locked(&mut guard, f))
    }

    fn recycle(&self, obj: T) {
        let mut guard = self.storage.lock().unwrap();
        guard.outstanding_element_count -= 1;
        cubeb_log!(
            "Recycling shared element #{}. Nr of live elements now {}.",
            guard.elements.len(),
            guard.outstanding_element_count
        );
        guard.elements.push(obj);
    }

    fn clear_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) {
        let count = guard.elements.len();
        let start = Instant::now();
        guard.elements.clear();
        cubeb_log!(
            "Cleared {} shared element{}. Took {}s.",
            count,
            if count == 1 { "" } else { "s" },
            (Instant::now() - start).as_secs_f32()
        );
    }

    fn clear(&self) {
        debug_assert_running_serially();
        let mut guard = self.storage.lock().unwrap();
        SharedStorage::clear_locked(&mut guard);
    }

    fn clear_if_all_idle_async(storage: &Arc<SharedStorage<T>>) {
        let (queue, outstanding_element_count, generation) = {
            let guard = storage.storage.lock().unwrap();
            (
                storage.queue.clone(),
                guard.outstanding_element_count,
                guard.generation,
            )
        };
        if outstanding_element_count > 0 {
            cubeb_log!(
                "Not clearing shared voiceprocessing unit storage because {} elements are in use. Generation={}.",
                outstanding_element_count,
                generation
            );
            return;
        }
        cubeb_log!(
            "Clearing shared voiceprocessing unit storage in {}s if still at generation {}.",
            storage.idle_timeout.as_secs_f32(),
            generation
        );
        let storage = storage.clone();
        queue.run_after(Instant::now() + storage.idle_timeout, move || {
            let mut guard = storage.storage.lock().unwrap();
            if generation != guard.generation {
                cubeb_log!(
                    "Not clearing shared voiceprocessing unit storage for generation {} as we're now at {}.",
                    generation,
                    guard.generation
                );
                return;
            }
            SharedStorage::clear_locked(&mut guard);
        });
    }
}

#[derive(Debug)]
struct OwningHandle<T>
where
    T: Send,
{
    storage: Weak<SharedStorage<T>>,
    obj: Option<T>,
}

impl<T: Send> OwningHandle<T> {
    fn new(storage: Weak<SharedStorage<T>>, obj: T) -> Self {
        Self {
            storage,
            obj: Some(obj),
        }
    }
}

impl<T: Send> AsRef<T> for OwningHandle<T> {
    fn as_ref(&self) -> &T {
        self.obj.as_ref().unwrap()
    }
}

impl<T: Send> AsMut<T> for OwningHandle<T> {
    fn as_mut(&mut self) -> &mut T {
        self.obj.as_mut().unwrap()
    }
}

impl<T: Send> Drop for OwningHandle<T> {
    fn drop(&mut self) {
        let storage = self.storage.upgrade();
        assert!(
            storage.is_some(),
            "Storage must outlive the handle, but didn't"
        );
        let storage = storage.unwrap();
        if self.obj.is_none() {
            return;
        }
        let obj = self.obj.take().unwrap();
        storage.recycle(obj);
        SharedStorage::clear_if_all_idle_async(&storage);
    }
}

#[derive(Debug)]
struct VoiceProcessingUnit {
    unit: AudioUnit,
}

impl Drop for VoiceProcessingUnit {
    fn drop(&mut self) {
        assert!(!self.unit.is_null());
        dispose_audio_unit(self.unit);
    }
}

unsafe impl Send for VoiceProcessingUnit {}

#[derive(Debug)]
struct SharedVoiceProcessingUnitManager {
    sync_storage: Mutex<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
    queue: Queue,
    idle_timeout: Duration,
}

impl SharedVoiceProcessingUnitManager {
    fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
        Self {
            sync_storage: Mutex::new(None),
            queue,
            idle_timeout,
        }
    }

    fn new(queue: Queue) -> Self {
        SharedVoiceProcessingUnitManager::with_idle_timeout(queue, VPIO_IDLE_TIMEOUT)
    }

    fn ensure_storage_locked(
        &self,
        guard: &mut MutexGuard<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
    ) {
        if guard.is_some() {
            return;
        }
        cubeb_log!("Creating shared voiceprocessing storage.");
        let storage = SharedStorage::<VoiceProcessingUnit>::with_idle_timeout(
            self.queue.clone(),
            self.idle_timeout,
        );
        let old_storage = guard.replace(Arc::from(storage));
        assert!(old_storage.is_none());
    }

    // Take an already existing, shared, vpio unit, if one is available.
    #[cfg(test)]
    fn take(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> {
        debug_assert_running_serially();
        let mut guard = self.sync_storage.lock().unwrap();
        self.ensure_storage_locked(&mut guard);
        let storage = guard.as_mut().unwrap();
        let res = storage.take();
        res.map(|u| OwningHandle::new(Arc::downgrade(storage), u))
    }

    // Take an already existing, shared, vpio unit, or create one if none are available.
    fn take_or_create(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> {
        debug_assert_running_serially();
        let mut guard = self.sync_storage.lock().unwrap();
        self.ensure_storage_locked(&mut guard);
        let storage = guard.as_mut().unwrap();
        let res = storage.take_or_create_with(create_voiceprocessing_audiounit);
        res.map(|u| OwningHandle::new(Arc::downgrade(storage), u))
    }
}

unsafe impl Send for SharedVoiceProcessingUnitManager {}
unsafe impl Sync for SharedVoiceProcessingUnitManager {}

impl Drop for SharedVoiceProcessingUnitManager {
    fn drop(&mut self) {
        debug_assert_not_running_serially();
        self.queue.run_final(|| {
            let mut guard = self.sync_storage.lock().unwrap();
            if guard.is_none() {
                return;
            }
            guard.as_mut().unwrap().clear();
        });
    }
}

pub const OPS: Ops = capi_new!(AudioUnitContext, AudioUnitStream);

// The fisrt member of the Cubeb context must be a pointer to a Ops struct. The Ops struct is an
// interface to link to all the Cubeb APIs, and the Cubeb interface use this assumption to operate
// the Cubeb APIs on different implementation.
// #[repr(C)] is used to prevent any padding from being added in the beginning of the AudioUnitContext.
#[repr(C)]
#[derive(Debug)]
pub struct AudioUnitContext {
    _ops: *const Ops,
    serial_queue: Queue,
    latency_controller: Mutex<LatencyController>,
    devices: Mutex<SharedDevices>,
    host_time_to_ns_ratio: (u32, u32),
    // Storage for a context-global vpio unit. Duplex streams that need one will take this
    // and return it when done.
    shared_voice_processing_unit: SharedVoiceProcessingUnitManager,
}

impl AudioUnitContext {
    fn new() -> Self {
        let queue_label = format!("{}.context", DISPATCH_QUEUE_LABEL);
        let serial_queue =
            Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton());
        let shared_vp_queue = Queue::new_with_target(
            format!("{}.context.shared_vpio", DISPATCH_QUEUE_LABEL).as_str(),
            &serial_queue,
        );
        let host_time_to_ns_ratio = {
            let mut timebase_info = mach_timebase_info { numer: 0, denom: 0 };
            unsafe {
                mach_timebase_info(&mut timebase_info);
            }
            (timebase_info.numer, timebase_info.denom)
        };
        Self {
            _ops: &OPS as *const _,
            serial_queue,
            latency_controller: Mutex::new(LatencyController::default()),
            devices: Mutex::new(SharedDevices::default()),
            host_time_to_ns_ratio,
            shared_voice_processing_unit: SharedVoiceProcessingUnitManager::new(shared_vp_queue),
        }
    }

    fn active_streams(&self) -> u32 {
        let controller = self.latency_controller.lock().unwrap();
        controller.streams
    }

    fn update_latency_by_adding_stream(&self, latency_frames: u32) -> u32 {
        let mut controller = self.latency_controller.lock().unwrap();
        controller.add_stream(latency_frames)
    }

    fn update_latency_by_removing_stream(&self) {
        let mut controller = self.latency_controller.lock().unwrap();
        controller.subtract_stream();
    }

    fn add_devices_changed_listener(
        &mut self,
        devtype: DeviceType,
        collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
        user_ptr: *mut c_void,
    ) -> Result<()> {
        assert!(devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT));
        assert!(collection_changed_callback.is_some());

        let context_ptr = self as *mut AudioUnitContext;
        let mut devices = self.devices.lock().unwrap();

        // Note: second register without unregister first causes 'nope' error.
        // Current implementation requires unregister before register a new cb.
        if devtype.contains(DeviceType::INPUT) && devices.input.changed_callback.is_some()
            || devtype.contains(DeviceType::OUTPUT) && devices.output.changed_callback.is_some()
        {
            return Err(Error::invalid_parameter());
        }

        if devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none() {
            let address = get_property_address(
                Property::HardwareDevices,
                DeviceType::INPUT | DeviceType::OUTPUT,
            );
            let ret = audio_object_add_property_listener(
                kAudioObjectSystemObject,
                &address,
                audiounit_collection_changed_callback,
                context_ptr,
            );
            if ret != NO_ERR {
                cubeb_log!(
                    "Cannot add devices-changed listener for {:?}, Error: {}",
                    devtype,
                    ret
                );
                return Err(Error::error());
            }
        }

        if devtype.contains(DeviceType::INPUT) {
            // Expected empty after unregister.
            assert!(devices.input.is_empty());
            devices.input.set(
                collection_changed_callback,
                user_ptr,
                audiounit_get_devices_of_type(DeviceType::INPUT),
            );
        }

        if devtype.contains(DeviceType::OUTPUT) {
            // Expected empty after unregister.
            assert!(devices.output.is_empty());
            devices.output.set(
                collection_changed_callback,
                user_ptr,
                audiounit_get_devices_of_type(DeviceType::OUTPUT),
            );
        }

        Ok(())
    }

    fn remove_devices_changed_listener(&mut self, devtype: DeviceType) -> Result<()> {
        if !devtype.intersects(DeviceType::INPUT | DeviceType::OUTPUT) {
            return Err(Error::invalid_parameter());
        }

        let context_ptr = self as *mut AudioUnitContext;
        let mut devices = self.devices.lock().unwrap();

        if devtype.contains(DeviceType::INPUT) {
            devices.input.clear();
        }

        if devtype.contains(DeviceType::OUTPUT) {
            devices.output.clear();
        }

        if devices.input.changed_callback.is_some() || devices.output.changed_callback.is_some() {
            return Ok(());
        }

        let address = get_property_address(
            Property::HardwareDevices,
            DeviceType::INPUT | DeviceType::OUTPUT,
        );
        // Note: unregister a non registered cb is not a problem, not checking.
        let r = audio_object_remove_property_listener(
            kAudioObjectSystemObject,
            &address,
            audiounit_collection_changed_callback,
            context_ptr,
        );
        if r == NO_ERR {
            Ok(())
        } else {
            cubeb_log!(
                "Cannot remove devices-changed listener for {:?}, Error: {}",
                devtype,
                r
            );
            Err(Error::error())
        }
    }
}

impl ContextOps for AudioUnitContext {
    fn init(_context_name: Option<&CStr>) -> Result<Context> {
        run_serially(set_notification_runloop);
        let mut ctx = Box::new(AudioUnitContext::new());
        let queue_label = format!("{}.context.{:p}", DISPATCH_QUEUE_LABEL, ctx.as_ref());
        ctx.serial_queue =
            Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton());
        let shared_vp_queue = Queue::new_with_target(
            format!("{}.shared_vpio", queue_label).as_str(),
            &ctx.serial_queue,
        );
        ctx.shared_voice_processing_unit = SharedVoiceProcessingUnitManager::new(shared_vp_queue);
        Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
    }

    fn backend_id(&mut self) -> &'static CStr {
        unsafe { CStr::from_ptr(b"audiounit-rust\0".as_ptr() as *const _) }
    }
    #[cfg(target_os = "ios")]
    fn max_channel_count(&mut self) -> Result<u32> {
        Ok(2u32)
    }
    #[cfg(not(target_os = "ios"))]
    fn max_channel_count(&mut self) -> Result<u32> {
        self.serial_queue
            .run_sync(|| {
                let device = match get_default_device(DeviceType::OUTPUT) {
                    None => {
                        cubeb_log!("Could not get default output device");
                        return Err(Error::error());
                    }
                    Some(id) => id,
                };
                get_channel_count(device, DeviceType::OUTPUT).map_err(|e| {
                    cubeb_log!("Cannot get the channel count. Error: {}", e);
                    Error::error()
                })
            })
            .unwrap()
    }
    #[cfg(target_os = "ios")]
    fn min_latency(&mut self, _params: StreamParams) -> Result<u32> {
        Err(not_supported());
    }
    #[cfg(not(target_os = "ios"))]
    fn min_latency(&mut self, _params: StreamParams) -> Result<u32> {
        self.serial_queue
            .run_sync(|| {
                let device = match get_default_device(DeviceType::OUTPUT) {
                    None => {
                        cubeb_log!("Could not get default output device");
                        return Err(Error::error());
                    }
                    Some(id) => id,
                };

                let range = get_device_buffer_frame_size_range(device, DeviceType::OUTPUT)
                    .map_err(|e| {
                        cubeb_log!("Could not get acceptable latency range. Error: {}", e);
                        Error::error()
                    })?;

                Ok(cmp::max(range.mMinimum as u32, SAFE_MIN_LATENCY_FRAMES))
            })
            .unwrap()
    }
    #[cfg(target_os = "ios")]
    fn preferred_sample_rate(&mut self) -> Result<u32> {
        Err(not_supported());
    }
    #[cfg(not(target_os = "ios"))]
    fn preferred_sample_rate(&mut self) -> Result<u32> {
        self.serial_queue
            .run_sync(|| {
                let device = match get_default_device(DeviceType::OUTPUT) {
                    None => {
                        cubeb_log!("Could not get default output device");
                        return Err(Error::error());
                    }
                    Some(id) => id,
                };
                let rate = get_device_sample_rate(device, DeviceType::OUTPUT).map_err(|e| {
                    cubeb_log!(
                        "Cannot get the sample rate of the default output device. Error: {}",
                        e
                    );
                    Error::error()
                })?;
                Ok(rate as u32)
            })
            .unwrap()
    }
    fn supported_input_processing_params(&mut self) -> Result<InputProcessingParams> {
        Ok(InputProcessingParams::ECHO_CANCELLATION
            | InputProcessingParams::NOISE_SUPPRESSION
            | InputProcessingParams::AUTOMATIC_GAIN_CONTROL)
    }
    fn enumerate_devices(
        &mut self,
        devtype: DeviceType,
        collection: &DeviceCollectionRef,
    ) -> Result<()> {
        let device_infos = self
            .serial_queue
            .run_sync(|| {
                let mut dev_types = vec![DeviceType::INPUT, DeviceType::OUTPUT];
                dev_types.retain(|&dt| devtype.contains(dt));
                let device_ids: Vec<(DeviceType, Vec<AudioObjectID>)> = dev_types
                    .iter()
                    .map(|&dt| (dt, audiounit_get_devices_of_type(dt)))
                    .collect();
                let count = device_ids.iter().map(|(_dt, ids)| ids.len()).sum();
                let mut device_infos = Vec::with_capacity(count);
                for (dt, dev_ids) in device_ids {
                    for dev_id in dev_ids {
                        if let Ok(info) = create_cubeb_device_info(dev_id, dt) {
                            device_infos.push(info);
                        }
                    }
                }
                device_infos
            })
            .unwrap();
        let (ptr, len) = if device_infos.is_empty() {
            (ptr::null_mut(), 0)
        } else {
            forget_vec(device_infos)
        };
        let coll = unsafe { &mut *collection.as_ptr() };
        coll.device = ptr;
        coll.count = len;
        Ok(())
    }
    fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
        assert!(!collection.as_ptr().is_null());
        let coll = unsafe { &mut *collection.as_ptr() };
        if coll.device.is_null() {
            return Ok(());
        }

        let mut devices = retake_forgotten_vec(coll.device, coll.count);
        for device in &mut devices {
            destroy_cubeb_device_info(device);
        }
        drop(devices); // Release the memory.
        coll.device = ptr::null_mut();
        coll.count = 0;
        Ok(())
    }
    fn stream_init(
        &mut self,
        _stream_name: Option<&CStr>,
        input_device: DeviceId,
        input_stream_params: Option<&StreamParamsRef>,
        output_device: DeviceId,
        output_stream_params: Option<&StreamParamsRef>,
        latency_frames: u32,
        data_callback: ffi::cubeb_data_callback,
        state_callback: ffi::cubeb_state_callback,
        user_ptr: *mut c_void,
    ) -> Result<Stream> {
        if !input_device.is_null() && input_stream_params.is_none() {
            cubeb_log!("Cannot init an input device without input stream params");
            return Err(Error::invalid_parameter());
        }

        if !output_device.is_null() && output_stream_params.is_none() {
            cubeb_log!("Cannot init an output device without output stream params");
            return Err(Error::invalid_parameter());
        }

        if input_stream_params.is_none() && output_stream_params.is_none() {
            cubeb_log!("Cannot init a stream without any stream params");
            return Err(Error::invalid_parameter());
        }

        if data_callback.is_none() {
            cubeb_log!("Cannot init a stream without a data callback");
            return Err(Error::invalid_parameter());
        }

        let in_stm_settings = if let Some(params) = input_stream_params {
            let in_device = match self
                .serial_queue
                .run_sync(|| create_device_info(input_device as AudioDeviceID, DeviceType::INPUT))
                .unwrap()
            {
                None => {
                    cubeb_log!("Fail to create device info for input");
                    return Err(Error::error());
                }
                Some(d) => d,
            };
            let stm_params = StreamParams::from(unsafe { *params.as_ptr() });
            Some((stm_params, in_device))
        } else {
            None
        };

        let out_stm_settings = if let Some(params) = output_stream_params {
            let out_device = match self
                .serial_queue
                .run_sync(|| create_device_info(output_device as AudioDeviceID, DeviceType::OUTPUT))
                .unwrap()
            {
                None => {
                    cubeb_log!("Fail to create device info for output");
                    return Err(Error::error());
                }
                Some(d) => d,
            };
            let stm_params = StreamParams::from(unsafe { *params.as_ptr() });
            Some((stm_params, out_device))
        } else {
            None
        };

        // Latency cannot change if another stream is operating in parallel. In this case
        // latency is set to the other stream value.
        let global_latency_frames = self.update_latency_by_adding_stream(latency_frames);
        if global_latency_frames != latency_frames {
            cubeb_log!(
                "Use global latency {} instead of the requested latency {}.",
                global_latency_frames,
                latency_frames
            );
        }

        let mut boxed_stream = Box::new(AudioUnitStream::new(
            self,
            user_ptr,
            data_callback,
            state_callback,
            global_latency_frames,
        ));

        // Rename the task queue to be an unique label.
        let queue_label = format!(
            "{}.stream.{:p}",
            DISPATCH_QUEUE_LABEL,
            boxed_stream.as_ref()
        );
        boxed_stream.queue = Queue::new_with_target(queue_label.as_str(), &boxed_stream.queue);

        boxed_stream.core_stream_data =
            CoreStreamData::new(boxed_stream.as_ref(), in_stm_settings, out_stm_settings);

        let result = boxed_stream
            .queue
            .clone()
            .run_sync(|| {
                boxed_stream
                    .core_stream_data
                    .setup(&mut boxed_stream.context.shared_voice_processing_unit)
            })
            .unwrap();
        if let Err(r) = result {
            cubeb_log!(
                "({:p}) Could not setup the audiounit stream.",
                boxed_stream.as_ref()
            );
            return Err(r);
        }

        let cubeb_stream = unsafe { Stream::from_ptr(Box::into_raw(boxed_stream) as *mut _) };
        cubeb_log!(
            "({:p}) Cubeb stream init successful.",
            cubeb_stream.as_ref()
        );
        Ok(cubeb_stream)
    }
    fn register_device_collection_changed(
        &mut self,
        devtype: DeviceType,
        collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
        user_ptr: *mut c_void,
    ) -> Result<()> {
        if devtype == DeviceType::UNKNOWN {
            return Err(Error::invalid_parameter());
        }
        self.serial_queue
            .clone()
            .run_sync(|| {
                if collection_changed_callback.is_some() {
                    self.add_devices_changed_listener(
                        devtype,
                        collection_changed_callback,
                        user_ptr,
                    )
                } else {
                    self.remove_devices_changed_listener(devtype)
                }
            })
            .unwrap()
    }
}

impl Drop for AudioUnitContext {
    fn drop(&mut self) {
        assert!({
            let devices = self.devices.lock().unwrap();
            devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none()
        });

        self.shared_voice_processing_unit =
            SharedVoiceProcessingUnitManager::new(self.serial_queue.clone());

        // Make sure all the pending (device-collection-changed-callback) tasks
        // in queue are done, and cancel all the tasks appended after `drop` is executed.
        let queue = self.serial_queue.clone();
        queue.run_final(|| {});

        {
            let controller = self.latency_controller.lock().unwrap();
            // Disabling this assert in release for bug 1083664 -- we seem to leak a stream
            // assert(controller.streams == 0);
            debug_assert!(controller.streams == 0);
            if controller.streams > 0 {
                cubeb_log!(
                    "({:p}) API misuse, {} streams active when context destroyed!",
                    self as *const AudioUnitContext,
                    controller.streams
                );
            }
        }
    }
}

#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for AudioUnitContext {}
unsafe impl Sync for AudioUnitContext {}

// Holds the information for an audio input callback call, for debugging purposes.
struct InputCallbackData {
    bytes: u32,
    rendered_frames: u32,
    total_available: usize,
    channels: u32,
    num_buf: u32,
}
struct InputCallbackLogger {
    prod: ringbuf::Producer<InputCallbackData>,
    cons: ringbuf::Consumer<InputCallbackData>,
}

impl InputCallbackLogger {
    fn new() -> Self {
        let ring = RingBuffer::<InputCallbackData>::new(16);
        let (prod, cons) = ring.split();
        Self { prod, cons }
    }

    fn push(&mut self, data: InputCallbackData) {
        self.prod.push(data);
    }

    fn pop(&mut self) -> Option<InputCallbackData> {
        self.cons.pop()
    }

    fn is_empty(&self) -> bool {
        self.cons.is_empty()
    }
}

impl fmt::Debug for InputCallbackLogger {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "InputCallbackLogger  {{ prod: {}, cons: {} }}",
            self.prod.len(),
            self.cons.len()
        )
    }
}

#[derive(Debug)]
struct CoreStreamData<'ctx> {
    stm_ptr: *const AudioUnitStream<'ctx>,
    aggregate_device: Option<AggregateDevice>,
    mixer: Option<Mixer>,
    resampler: Resampler,
    // Stream creation parameters.
    input_stream_params: StreamParams,
    output_stream_params: StreamParams,
    // Device settings for AudioUnits.
    input_dev_desc: AudioStreamBasicDescription,
    output_dev_desc: AudioStreamBasicDescription,
    // I/O AudioUnits.
    input_unit: AudioUnit,
    output_unit: AudioUnit,
    // Handle to shared voiceprocessing AudioUnit, if in use.
    voiceprocessing_unit_handle: Option<OwningHandle<VoiceProcessingUnit>>,
    // Info of the I/O devices.
    input_device: device_info,
    output_device: device_info,
    input_processing_params: InputProcessingParams,
    input_mute: bool,
    input_buffer_manager: Option<BufferManager>,
    units_running: bool,
    // Listeners indicating what system events are monitored.
    default_input_listener: Option<device_property_listener>,
    default_output_listener: Option<device_property_listener>,
    input_alive_listener: Option<device_property_listener>,
    input_source_listener: Option<device_property_listener>,
    output_alive_listener: Option<device_property_listener>,
    output_source_listener: Option<device_property_listener>,
    input_logging: Option<InputCallbackLogger>,
    #[cfg(feature = "audio-dump")]
    audio_dump_session: ffi::cubeb_audio_dump_session_t,
    #[cfg(feature = "audio-dump")]
    audio_dump_session_running: bool,
    #[cfg(feature = "audio-dump")]
    audio_dump_input: ffi::cubeb_audio_dump_stream_t,
    #[cfg(feature = "audio-dump")]
    audio_dump_output: ffi::cubeb_audio_dump_stream_t,
}

impl<'ctx> Default for CoreStreamData<'ctx> {
    fn default() -> Self {
        Self {
            stm_ptr: ptr::null(),
            aggregate_device: None,
            mixer: None,
            resampler: Resampler::default(),
            input_stream_params: StreamParams::from(ffi::cubeb_stream_params {
                format: ffi::CUBEB_SAMPLE_FLOAT32NE,
                rate: 0,
                channels: 0,
                layout: ffi::CUBEB_LAYOUT_UNDEFINED,
                prefs: ffi::CUBEB_STREAM_PREF_NONE,
            }),
            output_stream_params: StreamParams::from(ffi::cubeb_stream_params {
                format: ffi::CUBEB_SAMPLE_FLOAT32NE,
                rate: 0,
                channels: 0,
                layout: ffi::CUBEB_LAYOUT_UNDEFINED,
                prefs: ffi::CUBEB_STREAM_PREF_NONE,
            }),
            input_dev_desc: AudioStreamBasicDescription::default(),
            output_dev_desc: AudioStreamBasicDescription::default(),
            input_unit: ptr::null_mut(),
            output_unit: ptr::null_mut(),
            voiceprocessing_unit_handle: None,
            input_device: device_info::default(),
            output_device: device_info::default(),
            input_processing_params: InputProcessingParams::NONE,
            input_mute: false,
            input_buffer_manager: None,
            units_running: false,
            default_input_listener: None,
            default_output_listener: None,
            input_alive_listener: None,
            input_source_listener: None,
            output_alive_listener: None,
            output_source_listener: None,
            input_logging: None,
            #[cfg(feature = "audio-dump")]
            audio_dump_session: ptr::null_mut(),
            #[cfg(feature = "audio-dump")]
            audio_dump_session_running: false,
            #[cfg(feature = "audio-dump")]
            audio_dump_input: ptr::null_mut(),
            #[cfg(feature = "audio-dump")]
            audio_dump_output: ptr::null_mut(),
        }
    }
}

impl<'ctx> CoreStreamData<'ctx> {
    fn new(
        stm: &AudioUnitStream<'ctx>,
        input_stream_settings: Option<(StreamParams, device_info)>,
        output_stream_settings: Option<(StreamParams, device_info)>,
    ) -> Self {
        fn get_default_sttream_params() -> StreamParams {
            StreamParams::from(ffi::cubeb_stream_params {
                format: ffi::CUBEB_SAMPLE_FLOAT32NE,
                rate: 0,
                channels: 0,
                layout: ffi::CUBEB_LAYOUT_UNDEFINED,
                prefs: ffi::CUBEB_STREAM_PREF_NONE,
            })
        }
        let (in_stm_params, in_dev) =
            input_stream_settings.unwrap_or((get_default_sttream_params(), device_info::default()));
        let (out_stm_params, out_dev) = output_stream_settings
            .unwrap_or((get_default_sttream_params(), device_info::default()));
        Self {
            stm_ptr: stm,
            aggregate_device: None,
            mixer: None,
            resampler: Resampler::default(),
            input_stream_params: in_stm_params,
            output_stream_params: out_stm_params,
            input_dev_desc: AudioStreamBasicDescription::default(),
            output_dev_desc: AudioStreamBasicDescription::default(),
            input_unit: ptr::null_mut(),
            output_unit: ptr::null_mut(),
            voiceprocessing_unit_handle: None,
            input_device: in_dev,
            output_device: out_dev,
            input_processing_params: InputProcessingParams::NONE,
            input_mute: false,
            input_buffer_manager: None,
            units_running: false,
            default_input_listener: None,
            default_output_listener: None,
            input_alive_listener: None,
            input_source_listener: None,
            output_alive_listener: None,
            output_source_listener: None,
            input_logging: None,
            #[cfg(feature = "audio-dump")]
            audio_dump_session: ptr::null_mut(),
            #[cfg(feature = "audio-dump")]
            audio_dump_session_running: false,
            #[cfg(feature = "audio-dump")]
            audio_dump_input: ptr::null_mut(),
            #[cfg(feature = "audio-dump")]
            audio_dump_output: ptr::null_mut(),
        }
    }

    fn debug_assert_is_on_stream_queue(&self) {
        if self.stm_ptr.is_null() {
            return;
        }
        let stm = unsafe { &*self.stm_ptr };
        stm.queue.debug_assert_is_current();
    }

    fn start_audiounits(&mut self) -> Result<()> {
        self.debug_assert_is_on_stream_queue();
        // Only allowed to be called after the stream is initialized
        // and before the stream is destroyed.
        debug_assert!(!self.input_unit.is_null() || !self.output_unit.is_null());

        if !self.input_unit.is_null() {
            start_audiounit(self.input_unit)?;
        }
        if self.using_voice_processing_unit() {
            // Handle the VoiceProcessIO case where there is a single unit.

            // Always try to remember the applied input processing params. If they cannot
            // be applied in the new device pair, we notify the client of an error and it
            // will have to open a new stream.
            if let Err(r) =
                set_input_processing_params(self.input_unit, self.input_processing_params)
            {
                cubeb_log!(
                    "({:p}) Failed to set params of voiceprocessing. Error: {}",
                    self.stm_ptr,
                    r
                );
                return Err(r);
            }
            return Ok(());
        }
        if !self.output_unit.is_null() {
            start_audiounit(self.output_unit)?;
        }
        self.units_running = true;
        Ok(())
    }

    fn stop_audiounits(&mut self) {
        self.debug_assert_is_on_stream_queue();
        self.units_running = false;
        if !self.input_unit.is_null() {
            let r = stop_audiounit(self.input_unit);
            assert!(r.is_ok());
        }
        if self.using_voice_processing_unit() {
            // Handle the VoiceProcessIO case where there is a single unit.

            // Always reset input processing params to VPIO defaults in case VPIO is reused later.
            let vpio_defaults = InputProcessingParams::ECHO_CANCELLATION
                | InputProcessingParams::AUTOMATIC_GAIN_CONTROL
                | InputProcessingParams::NOISE_SUPPRESSION;
            if let Err(r) = set_input_processing_params(self.input_unit, vpio_defaults) {
                cubeb_log!(
                    "({:p}) Failed to reset params of voiceprocessing. Error: {}",
                    self.stm_ptr,
                    r
                );
            }
            return;
        }
        if !self.output_unit.is_null() {
            let r = stop_audiounit(self.output_unit);
            assert!(r.is_ok());
        }
    }

    fn has_input(&self) -> bool {
        self.input_stream_params.rate() > 0
    }

    fn has_output(&self) -> bool {
        self.output_stream_params.rate() > 0
    }

    fn using_voice_processing_unit(&self) -> bool {
        self.voiceprocessing_unit_handle.is_some()
    }

    fn same_clock_domain(&self) -> bool {
        self.debug_assert_is_on_stream_queue();
        // If not setting up a duplex stream, there is only one device,
        // no reclocking necessary.
        if !(self.has_input() && self.has_output()) {
            return true;
        }
        let input_domain = match get_clock_domain(self.input_device.id, DeviceType::INPUT) {
            Ok(clock_domain) => clock_domain,
            Err(_) => {
                cubeb_log!("Coudn't determine clock domains for input.");
                return false;
            }
        };

        let output_domain = match get_clock_domain(self.output_device.id, DeviceType::OUTPUT) {
            Ok(clock_domain) => clock_domain,
            Err(_) => {
                cubeb_log!("Coudn't determine clock domains for input.");
                return false;
            }
        };
        input_domain == output_domain
    }

    #[allow(non_upper_case_globals)]
    fn should_force_vpio_for_input_device(id: AudioDeviceID) -> bool {
        assert!(id != kAudioObjectUnknown);
        debug_assert_running_serially();
        match get_device_transport_type(id, DeviceType::INPUT) {
            Ok(kAudioDeviceTransportTypeBuiltIn) => {
                cubeb_log!(
                    "Input device {} is on the VPIO force list because it is built in, \
                     and its volume is known to be very low without VPIO whenever VPIO \
                     is hooked up to it elsewhere.",
                    id
                );
                true
            }
            _ => false,
        }
    }

    fn should_block_vpio_for_device_pair(
        &self,
        in_device: &device_info,
        out_device: &device_info,
    ) -> bool {
        self.debug_assert_is_on_stream_queue();
        cubeb_log!("Evaluating device pair against VPIO block list");
        let log_device_and_get_model_uid = |id, devtype| -> String {
            let device_model_uid = get_device_model_uid(id, devtype)
                .map(|s| s.into_string())
                .unwrap_or_default();
            cubeb_log!("{} uid=\"{}\", model_uid=\"{}\", transport_type={:?}, source={:?}, source_name=\"{}\", name=\"{}\", manufacturer=\"{}\"",
                if devtype == DeviceType::INPUT {
                    "Input"
                } else {
                    debug_assert_eq!(devtype, DeviceType::OUTPUT);
                    "Output"
                },
                get_device_uid(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
                device_model_uid,
                convert_uint32_into_string(get_device_transport_type(id, devtype).unwrap_or(0)),
                convert_uint32_into_string(get_device_source(id, devtype).unwrap_or(0)),
                get_device_source_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
                get_device_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
                get_device_manufacturer(id, devtype).map(|s| s.into_string()).unwrap_or_default());
            device_model_uid
        };

        #[allow(non_upper_case_globals)]
        let in_id = match in_device.id {
            kAudioObjectUnknown => None,
            id => Some(id),
        };
        #[allow(non_upper_case_globals)]
        let out_id = match out_device.id {
            kAudioObjectUnknown => None,
            id => Some(id),
        };

        let (in_model_uid, out_model_uid) = (
            in_id
                .map(|id| log_device_and_get_model_uid(id, DeviceType::INPUT))
                .unwrap_or_default(),
            out_id
                .or_else(|| get_default_device(DeviceType::OUTPUT))
                .map(|id| log_device_and_get_model_uid(id, DeviceType::OUTPUT))
                .unwrap_or_default(),
        );

        if in_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID)
            && out_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID)
        {
            cubeb_log!("Both input and output device is an Apple Studio Display. BLOCKED");
            return true;
        }

        cubeb_log!("Device pair is not blocked");
        false
    }

    fn create_audiounits(
        &mut self,
        shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
    ) -> Result<(device_info, device_info)> {
        self.debug_assert_is_on_stream_queue();
        let should_use_voice_processing_unit = self.has_input()
            && (self
                .input_stream_params
                .prefs()
                .contains(StreamPrefs::VOICE)
                || CoreStreamData::should_force_vpio_for_input_device(self.input_device.id))
            && !self.should_block_vpio_for_device_pair(&self.input_device, &self.output_device)
            && macos_kernel_major_version() != Ok(MACOS_KERNEL_MAJOR_VERSION_MONTEREY);

        let should_use_aggregate_device = {
            // It's impossible to create an aggregate device from an aggregate device, and it's
            // unnecessary to create an aggregate device when opening the same device input/output. In
            // all other cases, use an aggregate device.
            let mut either_already_aggregate = false;
            if self.has_input() {
                let input_is_aggregate =
                    get_device_transport_type(self.input_device.id, DeviceType::INPUT).unwrap_or(0)
                        == kAudioDeviceTransportTypeAggregate;
                if input_is_aggregate {
                    either_already_aggregate = true;
                }
                cubeb_log!(
                    "Input device ID: {} (aggregate: {:?})",
                    self.input_device.id,
                    input_is_aggregate
                );
            }
            if self.has_output() {
                let output_is_aggregate =
                    get_device_transport_type(self.output_device.id, DeviceType::OUTPUT)
                        .unwrap_or(0)
                        == kAudioDeviceTransportTypeAggregate;
                if output_is_aggregate {
                    either_already_aggregate = true;
                }
                cubeb_log!(
                    "Output device ID: {} (aggregate: {:?})",
                    self.output_device.id,
                    output_is_aggregate
                );
            }
            // Only use an aggregate device when the device are different.
            self.has_input()
                && self.has_output()
                && self.input_device.id != self.output_device.id
                && !either_already_aggregate
        };

        // Create an AudioUnit:
        // - If we're eligible to use voice processing, try creating a VoiceProcessingIO AudioUnit.
        // - If we should use an aggregate device, try creating one and input and output AudioUnits next.
        // - As last resort, create regular AudioUnits. This is also the normal non-duplex path.

        if should_use_voice_processing_unit {
            if let Ok(mut au_handle) = get_voiceprocessing_audiounit(
                shared_voice_processing_unit,
                &self.input_device,
                &self.output_device,
            ) {
                self.input_unit = au_handle.as_mut().unit;
                if self.has_output() {
                    self.output_unit = au_handle.as_mut().unit;
                }
                self.voiceprocessing_unit_handle = Some(au_handle);
                return Ok((self.input_device.clone(), self.output_device.clone()));
            }
            cubeb_log!(
                "({:p}) Failed to get VoiceProcessingIO AudioUnit. Trying a regular one.",
                self.stm_ptr
            );
        }

        if should_use_aggregate_device {
            if let Ok(device) = AggregateDevice::new(self.input_device.id, self.output_device.id) {
                let in_dev_info = {
                    device_info {
                        id: device.get_device_id(),
                        ..self.input_device
                    }
                };
                let out_dev_info = {
                    device_info {
                        id: device.get_device_id(),
                        ..self.output_device
                    }
                };

                match (
                    create_audiounit(&in_dev_info),
                    create_audiounit(&out_dev_info),
                ) {
                    (Ok(in_au), Ok(out_au)) => {
                        cubeb_log!(
                            "({:p}) Using an aggregate device {} for input and output.",
                            self.stm_ptr,
                            device.get_device_id()
                        );
                        self.aggregate_device = Some(device);
                        self.input_unit = in_au;
                        self.output_unit = out_au;
                        return Ok((in_dev_info, out_dev_info));
                    }
                    (Err(e), Ok(au)) => {
                        cubeb_log!(
                            "({:p}) Failed to create input AudioUnit for aggregate device. Error: {}.",
                            self.stm_ptr,
                            e
                        );
                        dispose_audio_unit(au);
                    }
                    (Ok(au), Err(e)) => {
                        cubeb_log!(
                            "({:p}) Failed to create output AudioUnit for aggregate device. Error: {}.",
                            self.stm_ptr,
                            e
                        );
                        dispose_audio_unit(au);
                    }
                    (Err(e), _) => {
                        cubeb_log!(
                            "({:p}) Failed to create AudioUnits for aggregate device. Error: {}.",
                            self.stm_ptr,
                            e
                        );
                    }
                }
            }
            cubeb_log!(
                "({:p}) Failed to set up aggregate device. Using regular AudioUnits.",
                self.stm_ptr
            );
        }

        if self.has_input() {
            match create_audiounit(&self.input_device) {
                Ok(in_au) => self.input_unit = in_au,
                Err(e) => {
                    cubeb_log!(
                        "({:p}) Failed to create regular AudioUnit for input. Error: {}",
                        self.stm_ptr,
                        e
                    );
                    return Err(e);
                }
            }
        }

        if self.has_output() {
            match create_audiounit(&self.output_device) {
                Ok(out_au) => self.output_unit = out_au,
                Err(e) => {
                    cubeb_log!(
                        "({:p}) Failed to create regular AudioUnit for output. Error: {}",
                        self.stm_ptr,
                        e
                    );
                    if !self.input_unit.is_null() {
                        dispose_audio_unit(self.input_unit);
                        self.input_unit = ptr::null_mut();
                    }
                    return Err(e);
                }
            }
        }

        Ok((self.input_device.clone(), self.output_device.clone()))
    }

    #[allow(clippy::cognitive_complexity)] // TODO: Refactoring.
    fn setup(
        &mut self,
        shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
    ) -> Result<()> {
        self.debug_assert_is_on_stream_queue();
        if self
            .input_stream_params
            .prefs()
            .contains(StreamPrefs::LOOPBACK)
            || self
                .output_stream_params
                .prefs()
                .contains(StreamPrefs::LOOPBACK)
        {
            cubeb_log!("({:p}) Loopback not supported for audiounit.", self.stm_ptr);
            return Err(Error::not_supported());
        }

        let same_clock_domain = self.same_clock_domain();
        let (in_dev_info, out_dev_info) = self.create_audiounits(shared_voice_processing_unit)?;
        let using_voice_processing_unit = self.using_voice_processing_unit();

        assert!(!self.stm_ptr.is_null());
        let stream = unsafe { &(*self.stm_ptr) };

        #[cfg(feature = "audio-dump")]
        unsafe {
            ffi::cubeb_audio_dump_init(&mut self.audio_dump_session);
        }

        // Configure I/O stream
        if self.has_input() {
            assert!(!self.input_unit.is_null());

            cubeb_log!(
                "({:p}) Initializing input by device info: {:?}",
                self.stm_ptr,
                in_dev_info
            );

            let device_channel_count =
                get_channel_count(self.input_device.id, DeviceType::INPUT).unwrap_or(0);
            if device_channel_count < self.input_stream_params.channels() {
                cubeb_log!(
                    "({:p}) Invalid input channel count; device={}, params={}",
                    self.stm_ptr,
                    device_channel_count,
                    self.input_stream_params.channels()
                );
                return Err(Error::invalid_parameter());
            }

            cubeb_log!(
                "({:p}) Opening input side: rate {}, channels {}, format {:?}, layout {:?}, prefs {:?}, latency in frames {}, voice processing {}.",
                self.stm_ptr,
                self.input_stream_params.rate(),
                self.input_stream_params.channels(),
                self.input_stream_params.format(),
                self.input_stream_params.layout(),
                self.input_stream_params.prefs(),
                stream.latency_frames,
                using_voice_processing_unit
            );

            // Get input device hardware information.
            let mut input_hw_desc = AudioStreamBasicDescription::default();
            let mut size = mem::size_of::<AudioStreamBasicDescription>();
            let r = audio_unit_get_property(
                self.input_unit,
                kAudioUnitProperty_StreamFormat,
                if using_voice_processing_unit {
                    // With a VPIO unit the input scope includes AEC reference channels.
                    // We need to use the output scope of the input bus.
                    kAudioUnitScope_Output
                } else {
                    // With a HAL unit the output scope for the input bus returns the number of
                    // output channels of the output device, i.e. it seems the bus is ignored.
                    kAudioUnitScope_Input
                },
                AU_IN_BUS,
                &mut input_hw_desc,
                &mut size,
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitGetProperty/input/kAudioUnitProperty_StreamFormat rv={}",
                    r
                );
                return Err(Error::error());
            }
            cubeb_log!(
                "({:p}) Input hardware description: {:?}",
                self.stm_ptr,
                input_hw_desc
            );
            // Notice: when we are using aggregate device, the input_hw_desc.mChannelsPerFrame is
            // the total of all the input channel count of the devices added in the aggregate device.
            // Due to our aggregate device settings, the data captured by the output device's input
            // channels will be put in the beginning of the raw data given by the input callback.

            // Always request all the input channels of the device, and only pass the correct
            // channels to the audio callback.
            let params = unsafe {
                let mut p = *self.input_stream_params.as_ptr();
                p.channels = input_hw_desc.mChannelsPerFrame;
                // Input AudioUnit must be configured with device's sample rate.
                // we will resample inside input callback.
                p.rate = input_hw_desc.mSampleRate as _;
                StreamParams::from(p)
            };

            self.input_dev_desc = create_stream_description(¶ms).inspect_err(|_| {
                cubeb_log!(
                    "({:p}) Setting format description for input failed.",
                    self.stm_ptr
                );
            })?;

            #[cfg(feature = "audio-dump")]
            {
                let name = format!("input-{:p}.wav", self.stm_ptr);
                let cname = CString::new(name).expect("OK");
                let rv = unsafe {
                    ffi::cubeb_audio_dump_stream_init(
                        self.audio_dump_session,
                        &mut self.audio_dump_input,
                        *params.as_ptr(),
                        cname.as_ptr(),
                    )
                };
                if rv == 0 {
                    assert_ne!(self.audio_dump_input, ptr::null_mut(),);
                    cubeb_log!("Successfully inited audio dump for input");
                } else {
                    cubeb_log!("Failed to init audio dump for input");
                }
            }

            assert_eq!(self.input_dev_desc.mSampleRate, input_hw_desc.mSampleRate);

            // Use latency to set buffer size
            assert_ne!(stream.latency_frames, 0);
            if let Err(r) =
                set_buffer_size_sync(self.input_unit, DeviceType::INPUT, stream.latency_frames)
            {
                cubeb_log!("({:p}) Error in change input buffer size.", self.stm_ptr);
                return Err(r);
            }

            let r = audio_unit_set_property(
                self.input_unit,
                kAudioUnitProperty_StreamFormat,
                kAudioUnitScope_Output,
                AU_IN_BUS,
                &self.input_dev_desc,
                mem::size_of::<AudioStreamBasicDescription>(),
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitSetProperty/input/kAudioUnitProperty_StreamFormat rv={}",
                    r
                );
                return Err(Error::error());
            }

            // Frames per buffer in the input callback.
            let r = audio_unit_set_property(
                self.input_unit,
                kAudioUnitProperty_MaximumFramesPerSlice,
                kAudioUnitScope_Global,
                AU_IN_BUS,
                &stream.latency_frames,
                mem::size_of::<u32>(),
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitSetProperty/input/kAudioUnitProperty_MaximumFramesPerSlice rv={}",
                    r
                );
                return Err(Error::error());
            }

            // When we use the aggregate device, the self.input_dev_desc.mChannelsPerFrame is the
            // total input channel count of all the device added in the aggregate device. However,
            // we only need the audio data captured by the requested input device, so we need to
            // ignore some data captured by the audio input of the requested output device (e.g.,
            // the requested output device is a USB headset with built-in mic), in the beginning of
            // the raw data taken from input callback.
            self.input_buffer_manager = Some(BufferManager::new(
                self.input_stream_params.format(),
                SAFE_MAX_LATENCY_FRAMES as usize,
                self.input_dev_desc.mChannelsPerFrame as usize,
                self.input_dev_desc
                    .mChannelsPerFrame
                    .saturating_sub(device_channel_count) as usize,
                self.input_stream_params.channels() as usize,
            ));

            let aurcbs_in = AURenderCallbackStruct {
                inputProc: Some(audiounit_input_callback),
                inputProcRefCon: self.stm_ptr as *mut c_void,
            };

            let r = audio_unit_set_property(
                self.input_unit,
                kAudioOutputUnitProperty_SetInputCallback,
                kAudioUnitScope_Global,
                AU_OUT_BUS,
                &aurcbs_in,
                mem::size_of_val(&aurcbs_in),
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitSetProperty/input/kAudioOutputUnitProperty_SetInputCallback rv={}",
                    r
                );
                return Err(Error::error());
            }

            stream.frames_read.store(0, Ordering::SeqCst);

            cubeb_log!(
                "({:p}) Input audiounit init with device {} successfully.",
                self.stm_ptr,
                in_dev_info.id
            );
        }

        if self.has_input() && !self.has_output() && using_voice_processing_unit {
            // We must configure the output side of VPIO to match the input side, even if we don't use it.
            let r = audio_unit_set_property(
                self.input_unit,
                kAudioUnitProperty_StreamFormat,
                kAudioUnitScope_Input,
                AU_OUT_BUS,
                &self.input_dev_desc,
                mem::size_of::<AudioStreamBasicDescription>(),
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitSetProperty/output/kAudioUnitProperty_StreamFormat rv={}",
                    r
                );
                return Err(Error::error());
            }
        }

        if self.has_output() {
            assert!(!self.output_unit.is_null());

            cubeb_log!(
                "({:p}) Initialize output by device info: {:?}",
                self.stm_ptr,
                out_dev_info
            );

            cubeb_log!(
                "({:p}) Opening output side: rate {}, channels {}, format {:?}, layout {:?}, prefs {:?}, latency in frames {}, voice processing {}.",
                self.stm_ptr,
                self.output_stream_params.rate(),
                self.output_stream_params.channels(),
                self.output_stream_params.format(),
                self.output_stream_params.layout(),
                self.output_stream_params.prefs(),
                stream.latency_frames,
                using_voice_processing_unit
            );

            // Get output device hardware information.
            let mut output_hw_desc = AudioStreamBasicDescription::default();
            let mut size = mem::size_of::<AudioStreamBasicDescription>();
            let r = audio_unit_get_property(
                self.output_unit,
                kAudioUnitProperty_StreamFormat,
                if using_voice_processing_unit {
                    // With a VPIO unit the output scope includes all channels in the hw.
                    // The VPIO unit however is only MONO which the input scope reflects.
                    kAudioUnitScope_Input
                } else {
                    // With a HAL unit the output scope for the output bus returns the number of
                    // output channels of the hw, as we want. The input scope seems limited to
                    // two channels.
                    kAudioUnitScope_Output
                },
                AU_OUT_BUS,
                &mut output_hw_desc,
                &mut size,
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitGetProperty/output/kAudioUnitProperty_StreamFormat rv={}",
                    r
                );
                return Err(Error::error());
            }
            cubeb_log!(
                "({:p}) Output hardware description: {:?}",
                self.stm_ptr,
                output_hw_desc
            );

            // This has been observed in the wild.
            if output_hw_desc.mChannelsPerFrame == 0 {
                cubeb_log!(
                    "({:p}) Output hardware description channel count is zero",
                    self.stm_ptr
                );
                return Err(Error::error());
            }

            // Simple case of stereo output, map to the stereo pair (that might not be the first
            // two channels). Fall back to regular mixing if this fails.
            let mut maybe_need_mixer = true;
            if self.output_stream_params.channels() == 2
                && self.output_stream_params.layout() == ChannelLayout::STEREO
            {
                let layout = AudioChannelLayout {
                    mChannelLayoutTag: kAudioChannelLayoutTag_Stereo,
                    ..Default::default()
                };
                let r = audio_unit_set_property(
                    self.output_unit,
                    kAudioUnitProperty_AudioChannelLayout,
                    kAudioUnitScope_Input,
                    AU_OUT_BUS,
                    &layout,
                    mem::size_of::<AudioChannelLayout>(),
                );
                if r != NO_ERR {
                    cubeb_log!(
                        "AudioUnitSetProperty/output/kAudioUnitProperty_AudioChannelLayout rv={}",
                        r
                    );
                }
                maybe_need_mixer = r != NO_ERR;
            }

            // Notice: when we are using aggregate device, the output_hw_desc.mChannelsPerFrame is
            // the total of all the output channel count of the devices added in the aggregate device.
            // Due to our aggregate device settings, the data recorded by the input device's output
            // channels will be appended at the end of the raw data given by the output callback.
            let params = unsafe {
                let mut p = *self.output_stream_params.as_ptr();
                p.channels = if maybe_need_mixer {
                    output_hw_desc.mChannelsPerFrame
                } else {
                    self.output_stream_params.channels()
                };
                if using_voice_processing_unit {
                    // VPIO will always use the sample rate of the input hw for both input and output,
                    // as reported to us. (We can override it but we cannot improve quality this way).
                    p.rate = self.input_dev_desc.mSampleRate as _;
                }
                StreamParams::from(p)
            };

            self.output_dev_desc = create_stream_description(¶ms).inspect_err(|_| {
                cubeb_log!(
                    "({:p}) Could not initialize the audio stream description.",
                    self.stm_ptr
                );
            })?;

            #[cfg(feature = "audio-dump")]
            {
                let name = format!("output-{:p}.wav", self.stm_ptr);
                let cname = CString::new(name).expect("OK");
                let rv = unsafe {
                    ffi::cubeb_audio_dump_stream_init(
                        self.audio_dump_session,
                        &mut self.audio_dump_output,
                        *params.as_ptr(),
                        cname.as_ptr(),
                    )
                };
                if rv == 0 {
                    assert_ne!(self.audio_dump_output, ptr::null_mut(),);
                    cubeb_log!("Successfully inited audio dump for output");
                } else {
                    cubeb_log!("Failed to init audio dump for output");
                }
            }

            let device_layout = self
                .get_output_channel_layout()
                .inspect_err(|_| {
                    cubeb_log!(
                        "({:p}) Could not get any channel layout. Defaulting to no channels.",
                        self.stm_ptr
                    );
                })
                .unwrap_or_default();

            cubeb_log!(
                "({:p} Using output device channel layout {:?}",
                self.stm_ptr,
                device_layout
            );

            if maybe_need_mixer {
                // The mixer will be set up when
                // 0. not playing simply stereo, or failing to set the channel layout to the stereo
                //    pair
                // 1. using aggregate device whose input device has output channels
                // 2. output device has more channels than we need, and stream isn't simply stereo
                // 3. output device has different layout than the one we have
                self.mixer = if self.output_dev_desc.mChannelsPerFrame
                    != self.output_stream_params.channels()
                    || device_layout != mixer::get_channel_order(self.output_stream_params.layout())
                {
                    cubeb_log!("Incompatible channel layouts detected, setting up remixer");
                    // We will be remixing the data before it reaches the output device.
                    Some(Mixer::new(
                        self.output_stream_params.format(),
                        self.output_stream_params.channels() as usize,
                        self.output_stream_params.layout(),
                        self.output_dev_desc.mChannelsPerFrame as usize,
                        device_layout,
                    ))
                } else {
                    None
                };
            }

            let r = audio_unit_set_property(
                self.output_unit,
                kAudioUnitProperty_StreamFormat,
                kAudioUnitScope_Input,
                AU_OUT_BUS,
                &self.output_dev_desc,
                mem::size_of::<AudioStreamBasicDescription>(),
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitSetProperty/output/kAudioUnitProperty_StreamFormat rv={}",
                    r
                );
                return Err(Error::error());
            }

            // Use latency to set buffer size
            assert_ne!(stream.latency_frames, 0);
            if let Err(r) =
                set_buffer_size_sync(self.output_unit, DeviceType::OUTPUT, stream.latency_frames)
            {
                cubeb_log!("({:p}) Error in change output buffer size.", self.stm_ptr);
                return Err(r);
            }

            // Frames per buffer in the input callback.
            let r = audio_unit_set_property(
                self.output_unit,
                kAudioUnitProperty_MaximumFramesPerSlice,
                kAudioUnitScope_Global,
                AU_OUT_BUS,
                &stream.latency_frames,
                mem::size_of::<u32>(),
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitSetProperty/output/kAudioUnitProperty_MaximumFramesPerSlice rv={}",
                    r
                );
                return Err(Error::error());
            }

            let aurcbs_out = AURenderCallbackStruct {
                inputProc: Some(audiounit_output_callback),
                inputProcRefCon: self.stm_ptr as *mut c_void,
            };
            let r = audio_unit_set_property(
                self.output_unit,
                kAudioUnitProperty_SetRenderCallback,
                kAudioUnitScope_Global,
                AU_OUT_BUS,
                &aurcbs_out,
                mem::size_of_val(&aurcbs_out),
            );
            if r != NO_ERR {
                cubeb_log!(
                    "AudioUnitSetProperty/output/kAudioUnitProperty_SetRenderCallback rv={}",
                    r
                );
                return Err(Error::error());
            }

            stream.frames_written.store(0, Ordering::SeqCst);

            cubeb_log!(
                "({:p}) Output audiounit init with device {} successfully.",
                self.stm_ptr,
                out_dev_info.id
            );
        }

        // We use a resampler because input AudioUnit operates
        // reliable only in the capture device sample rate.
        // Resampler will convert it to the user sample rate
        // and deliver it to the callback.
        let target_sample_rate = if self.has_input() {
            self.input_stream_params.rate()
        } else {
            assert!(self.has_output());
            self.output_stream_params.rate()
        };

        let resampler_input_params = if self.has_input() {
            let mut p = unsafe { *(self.input_stream_params.as_ptr()) };
            p.rate = self.input_dev_desc.mSampleRate as u32;
            Some(p)
        } else {
            None
        };
        let resampler_output_params = if self.has_output() {
            let mut p = unsafe { *(self.output_stream_params.as_ptr()) };
            p.rate = self.output_dev_desc.mSampleRate as u32;
            Some(p)
        } else {
            None
        };

        // Only reclock if there is an input and we couldn't use an aggregate device, and the
        // devices are not part of the same clock domain.
        let reclock_policy = if self.aggregate_device.is_none()
            && !using_voice_processing_unit
            && !same_clock_domain
        {
            cubeb_log!(
                "Reclocking duplex steam using_aggregate_device={} same_clock_domain={}",
                self.aggregate_device.is_some(),
                same_clock_domain
            );
            ffi::CUBEB_RESAMPLER_RECLOCK_INPUT
        } else {
            ffi::CUBEB_RESAMPLER_RECLOCK_NONE
        };

        self.resampler = Resampler::new(
            self.stm_ptr as *mut ffi::cubeb_stream,
            resampler_input_params,
            resampler_output_params,
            target_sample_rate,
            stream.data_callback,
            stream.user_ptr,
            ffi::CUBEB_RESAMPLER_QUALITY_DESKTOP,
            reclock_policy,
        );

        // In duplex, the input thread might be different from the output thread, and we're logging
        // everything from the output thread: relay the audio input callback information using a
        // ring buffer to diagnose issues.
        if self.has_input() && self.has_output() {
            self.input_logging = Some(InputCallbackLogger::new());
        }

        #[cfg(feature = "audio-dump")]
        {
            unsafe { ffi::cubeb_audio_dump_start(self.audio_dump_session) };
            self.audio_dump_session_running = true;
        }

        if !self.input_unit.is_null() {
            let r = audio_unit_initialize(self.input_unit);
            if r != NO_ERR {
                cubeb_log!("AudioUnitInitialize/input rv={}", r);
                return Err(Error::error());
            }

            stream.input_device_latency_frames.store(
                get_fixed_latency(self.input_device.id, DeviceType::INPUT),
                Ordering::SeqCst,
            );
        }

        if !self.output_unit.is_null() {
            if self.input_unit != self.output_unit {
                let r = audio_unit_initialize(self.output_unit);
                if r != NO_ERR {
                    cubeb_log!("AudioUnitInitialize/output rv={}", r);
                    return Err(Error::error());
                }
            }

            stream.output_device_latency_frames.store(
                get_fixed_latency(self.output_device.id, DeviceType::OUTPUT),
                Ordering::SeqCst,
            );

            let mut unit_s: f64 = 0.0;
            let mut size = mem::size_of_val(&unit_s);
            if audio_unit_get_property(
                self.output_unit,
                kAudioUnitProperty_Latency,
                kAudioUnitScope_Global,
                0,
                &mut unit_s,
                &mut size,
            ) == NO_ERR
            {
                stream.output_device_latency_frames.fetch_add(
                    (unit_s * self.output_dev_desc.mSampleRate) as u32,
                    Ordering::SeqCst,
                );
            }
        }

        if using_voice_processing_unit {
            // The VPIO AudioUnit automatically ducks other audio streams on the VPIO
            // output device. Its ramp duration is 0.5s when ducking, so unduck similarly
            // now.
            // NOTE: On MacOS 14 the ducking happens on creation of the VPIO AudioUnit.
            //       On MacOS 10.15 it happens on both creation and initialization, which
            //       is why we defer the unducking until now.
            #[allow(non_upper_case_globals)]
            let mut device = match self.output_device.id {
                kAudioObjectUnknown => None,
                id => Some(id),
            };
            device = device.or_else(|| get_default_device(DeviceType::OUTPUT));
            match device {
                None => {
                    cubeb_log!(
                        "({:p}) No output device to undo vpio ducking on",
                        self.stm_ptr
                    );
                }
                Some(id) => {
                    let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5);
                    if r != NO_ERR {
                        cubeb_log!(
                            "({:p}) Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}",
                            self.stm_ptr,
                            id,
                            r
                        );
                    }
                }
            };

            // Always try to remember the applied input mute state. If it cannot be applied
            // to the new device pair, we notify the client of an error and it will have to
            // open a new stream.
            if let Err(r) = set_input_mute(self.input_unit, self.input_mute) {
                cubeb_log!(
                    "({:p}) Failed to set mute state of voiceprocessing. Error: {}",
                    self.stm_ptr,
                    r
                );
                return Err(r);
            }
        }

        if let Err(r) = self.install_system_changed_callback() {
            cubeb_log!(
                "({:p}) Could not install the device change callback.",
                self.stm_ptr
            );
            return Err(r);
        }

        if let Err(r) = self.install_device_changed_callback() {
            cubeb_log!(
                "({:p}) Could not install all device change callback.",
                self.stm_ptr
            );
            return Err(r);
        }

        // We have either default_input_listener or input_alive_listener.
        // We cannot have both of them at the same time.
        assert!(
            !self.has_input()
                || ((self.default_input_listener.is_some() != self.input_alive_listener.is_some())
                    && (self.default_input_listener.is_some()
                        || self.input_alive_listener.is_some()))
        );

        // We have either default_output_listener or output_alive_listener.
        // We cannot have both of them at the same time.
        assert!(
            !self.has_output()
                || ((self.default_output_listener.is_some()
                    != self.output_alive_listener.is_some())
                    && (self.default_output_listener.is_some()
                        || self.output_alive_listener.is_some()))
        );

        Ok(())
    }

    fn close(&mut self) {
        self.debug_assert_is_on_stream_queue();
        if !self.input_unit.is_null() {
            audio_unit_uninitialize(self.input_unit);
            if self.using_voice_processing_unit() {
                // Handle the VoiceProcessIO case where there is a single unit.
                self.output_unit = ptr::null_mut();
            }

            // Cannot unset self.input_unit yet, since the output callback might be live
            // and reading it.
        }

        if !self.output_unit.is_null() {
            audio_unit_uninitialize(self.output_unit);
            dispose_audio_unit(self.output_unit);
            self.output_unit = ptr::null_mut();
        }

        if !self.input_unit.is_null() {
            if !self.using_voice_processing_unit() {
                // The VPIO unit is shared and must not be disposed.
                dispose_audio_unit(self.input_unit);
            }
            self.input_unit = ptr::null_mut();
        }

        // Return the VPIO unit if present.
        self.voiceprocessing_unit_handle = None;

        #[cfg(feature = "audio-dump")]
        {
            if !self.audio_dump_session.is_null() {
                unsafe {
                    ffi::cubeb_audio_dump_stop(self.audio_dump_session);
                    if !self.audio_dump_input.is_null() {
                        let rv = ffi::cubeb_audio_dump_stream_shutdown(
                            self.audio_dump_session,
                            self.audio_dump_input,
                        );
                        if rv != 0 {
                            cubeb_log!("Failed to shutdown audio dump for input");
                        }
                    }
                    if !self.audio_dump_output.is_null() {
                        let rv = ffi::cubeb_audio_dump_stream_shutdown(
                            self.audio_dump_session,
                            self.audio_dump_output,
                        );
                        if rv != 0 {
                            cubeb_log!("Failed to shutdown audio dump for output");
                        }
                    }
                    ffi::cubeb_audio_dump_shutdown(self.audio_dump_session);
                    self.audio_dump_session = ptr::null_mut();
                    self.audio_dump_session_running = false;
                }
            }
        }

        self.resampler.destroy();
        self.mixer = None;
        self.aggregate_device = None;

        if self.uninstall_system_changed_callback().is_err() {
            cubeb_log!(
                "({:p}) Could not uninstall the system changed callback",
                self.stm_ptr
            );
        }

        if self.uninstall_device_changed_callback().is_err() {
            cubeb_log!(
                "({:p}) Could not uninstall all device change listeners",
                self.stm_ptr
            );
        }
    }

    fn install_device_changed_callback(&mut self) -> Result<()> {
        self.debug_assert_is_on_stream_queue();
        assert!(!self.stm_ptr.is_null());
        let stm = unsafe { &(*self.stm_ptr) };

        if !self.output_unit.is_null() {
            assert_ne!(self.output_device.id, kAudioObjectUnknown);
            assert_ne!(self.output_device.id, kAudioObjectSystemObject);
            assert!(
                self.output_source_listener.is_none(),
                "register output_source_listener without unregistering the one in use"
            );
            assert!(
                self.output_alive_listener.is_none(),
                "register output_alive_listener without unregistering the one in use"
            );

            // Get the notification when the data source on the same device changes,
            // e.g., when the user plugs in a TRRS headset into the headphone jack.
            self.output_source_listener = Some(device_property_listener::new(
                self.output_device.id,
                get_property_address(Property::DeviceSource, DeviceType::OUTPUT),
                audiounit_property_listener_callback,
            ));
            let rv = stm.add_device_listener(self.output_source_listener.as_ref().unwrap());
            if rv != NO_ERR {
                self.output_source_listener = None;
                cubeb_log!("AudioObjectAddPropertyListener/output/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.output_device.id);
                return Err(Error::error());
            }

            // Get the notification when the output device is going away
            // if the output doesn't follow the system default.
            if !self
                .output_device
                .flags
                .contains(device_flags::DEV_SELECTED_DEFAULT)
            {
                self.output_alive_listener = Some(device_property_listener::new(
                    self.output_device.id,
                    get_property_address(
                        Property::DeviceIsAlive,
                        DeviceType::INPUT | DeviceType::OUTPUT,
                    ),
                    audiounit_property_listener_callback,
                ));
                let rv = stm.add_device_listener(self.output_alive_listener.as_ref().unwrap());
                if rv != NO_ERR {
                    self.output_alive_listener = None;
                    cubeb_log!("AudioObjectAddPropertyListener/output/kAudioDevicePropertyDeviceIsAlive rv={}, device id ={}", rv, self.output_device.id);
                    return Err(Error::error());
                }
            }
        }

        if !self.input_unit.is_null() {
            assert_ne!(self.input_device.id, kAudioObjectUnknown);
            assert_ne!(self.input_device.id, kAudioObjectSystemObject);
            assert!(
                self.input_source_listener.is_none(),
                "register input_source_listener without unregistering the one in use"
            );
            assert!(
                self.input_alive_listener.is_none(),
                "register input_alive_listener without unregistering the one in use"
            );

            // Get the notification when the data source on the same device changes,
            // e.g., when the user plugs in a TRRS mic into the headphone jack.
            self.input_source_listener = Some(device_property_listener::new(
                self.input_device.id,
                get_property_address(Property::DeviceSource, DeviceType::INPUT),
                audiounit_property_listener_callback,
            ));
            let rv = stm.add_device_listener(self.input_source_listener.as_ref().unwrap());
            if rv != NO_ERR {
                self.input_source_listener = None;
                cubeb_log!("AudioObjectAddPropertyListener/input/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.input_device.id);
                return Err(Error::error());
            }

            // Get the notification when the input device is going away
            // if the input doesn't follow the system default.
            if !self
                .input_device
                .flags
                .contains(device_flags::DEV_SELECTED_DEFAULT)
            {
                self.input_alive_listener = Some(device_property_listener::new(
                    self.input_device.id,
                    get_property_address(
                        Property::DeviceIsAlive,
                        DeviceType::INPUT | DeviceType::OUTPUT,
                    ),
                    audiounit_property_listener_callback,
                ));
                let rv = stm.add_device_listener(self.input_alive_listener.as_ref().unwrap());
                if rv != NO_ERR {
                    self.input_alive_listener = None;
                    cubeb_log!("AudioObjectAddPropertyListener/input/kAudioDevicePropertyDeviceIsAlive rv={}, device id ={}", rv, self.input_device.id);
                    return Err(Error::error());
                }
            }
        }

        Ok(())
    }

    fn install_system_changed_callback(&mut self) -> Result<()> {
        self.debug_assert_is_on_stream_queue();
        assert!(!self.stm_ptr.is_null());
        let stm = unsafe { &(*self.stm_ptr) };

        if !self.output_unit.is_null()
            && self
                .output_device
                .flags
                .contains(device_flags::DEV_SELECTED_DEFAULT)
        {
            assert!(
                self.default_output_listener.is_none(),
                "register default_output_listener without unregistering the one in use"
            );

            // Get the notification when the default output audio changes, e.g.,
            // when the user plugs in a USB headset and the system chooses it automatically as the default,
            // or when another device is chosen in the dropdown list.
            self.default_output_listener = Some(device_property_listener::new(
                kAudioObjectSystemObject,
                get_property_address(
                    Property::HardwareDefaultOutputDevice,
                    DeviceType::INPUT | DeviceType::OUTPUT,
                ),
                audiounit_property_listener_callback,
            ));
            let r = stm.add_device_listener(self.default_output_listener.as_ref().unwrap());
            if r != NO_ERR {
                self.default_output_listener = None;
                cubeb_log!("AudioObjectAddPropertyListener/output/kAudioHardwarePropertyDefaultOutputDevice rv={}", r);
                return Err(Error::error());
            }
        }

        if !self.input_unit.is_null()
            && self
                .input_device
                .flags
                .contains(device_flags::DEV_SELECTED_DEFAULT)
        {
            assert!(
                self.default_input_listener.is_none(),
                "register default_input_listener without unregistering the one in use"
            );

            // Get the notification when the default intput audio changes, e.g.,
            // when the user plugs in a USB mic and the system chooses it automatically as the default,
            // or when another device is chosen in the system preference.
            self.default_input_listener = Some(device_property_listener::new(
                kAudioObjectSystemObject,
                get_property_address(
                    Property::HardwareDefaultInputDevice,
                    DeviceType::INPUT | DeviceType::OUTPUT,
                ),
                audiounit_property_listener_callback,
            ));
            let r = stm.add_device_listener(self.default_input_listener.as_ref().unwrap());
            if r != NO_ERR {
                self.default_input_listener = None;
                cubeb_log!("AudioObjectAddPropertyListener/input/kAudioHardwarePropertyDefaultInputDevice rv={}", r);
                return Err(Error::error());
            }
        }

        Ok(())
    }

    fn uninstall_device_changed_callback(&mut self) -> Result<()> {
        self.debug_assert_is_on_stream_queue();
        if self.stm_ptr.is_null() {
            assert!(
                self.output_source_listener.is_none()
                    && self.output_alive_listener.is_none()
                    && self.input_source_listener.is_none()
                    && self.input_alive_listener.is_none()
            );
            return Ok(());
        }

        let stm = unsafe { &(*self.stm_ptr) };

        // Failing to uninstall listeners is not a fatal error.
        let mut r = Ok(());

        if self.output_source_listener.is_some() {
            let rv = stm.remove_device_listener(self.output_source_listener.as_ref().unwrap());
            if rv != NO_ERR {
                cubeb_log!("AudioObjectRemovePropertyListener/output/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.output_device.id);
                r = Err(Error::error());
            }
            self.output_source_listener = None;
        }

        if self.output_alive_listener.is_some() {
            let rv = stm.remove_device_listener(self.output_alive_listener.as_ref().unwrap());
            if rv != NO_ERR {
                cubeb_log!("AudioObjectRemovePropertyListener/output/kAudioDevicePropertyDeviceIsAlive rv={}, device id={}", rv, self.output_device.id);
                r = Err(Error::error());
            }
            self.output_alive_listener = None;
        }

        if self.input_source_listener.is_some() {
            let rv = stm.remove_device_listener(self.input_source_listener.as_ref().unwrap());
            if rv != NO_ERR {
                cubeb_log!("AudioObjectRemovePropertyListener/input/kAudioDevicePropertyDataSource rv={}, device id={}", rv, self.input_device.id);
                r = Err(Error::error());
            }
            self.input_source_listener = None;
        }

        if self.input_alive_listener.is_some() {
            let rv = stm.remove_device_listener(self.input_alive_listener.as_ref().unwrap());
            if rv != NO_ERR {
                cubeb_log!("AudioObjectRemovePropertyListener/input/kAudioDevicePropertyDeviceIsAlive rv={}, device id={}", rv, self.input_device.id);
                r = Err(Error::error());
            }
            self.input_alive_listener = None;
        }

        r
    }

    fn uninstall_system_changed_callback(&mut self) -> Result<()> {
        self.debug_assert_is_on_stream_queue();
        if self.stm_ptr.is_null() {
            assert!(
                self.default_output_listener.is_none() && self.default_input_listener.is_none()
            );
            return Ok(());
        }

        let stm = unsafe { &(*self.stm_ptr) };

        if self.default_output_listener.is_some() {
            let r = stm.remove_device_listener(self.default_output_listener.as_ref().unwrap());
            if r != NO_ERR {
                return Err(Error::error());
            }
            self.default_output_listener = None;
        }

        if self.default_input_listener.is_some() {
            let r = stm.remove_device_listener(self.default_input_listener.as_ref().unwrap());
            if r != NO_ERR {
                return Err(Error::error());
            }
            self.default_input_listener = None;
        }

        Ok(())
    }

    fn get_output_channel_layout(&self) -> Result<Vec<mixer::Channel>> {
        self.debug_assert_is_on_stream_queue();
        assert!(!self.output_unit.is_null());
        if self.using_voice_processing_unit() {
            return Ok(get_channel_order(ChannelLayout::MONO));
        }
        get_channel_layout(self.output_unit)
    }
}

impl<'ctx> Drop for CoreStreamData<'ctx> {
    fn drop(&mut self) {
        self.debug_assert_is_on_stream_queue();
        self.stop_audiounits();
        self.close();
    }
}

#[derive(Debug, Clone)]
struct OutputCallbackTimingData {
    frames_queued: u64,
    timestamp: u64,
    buffer_size: u64,
}

// The fisrt two members of the Cubeb stream must be a pointer to its Cubeb context and a void user
// defined pointer. The Cubeb interface use this assumption to operate the Cubeb APIs.
// #[repr(C)] is used to prevent any padding from being added in the beginning of the AudioUnitStream.
#[repr(C)]
#[derive(Debug)]
struct AudioUnitStream<'ctx> {
    context: &'ctx mut AudioUnitContext,
    user_ptr: *mut c_void,
    // Task queue for the stream.
    queue: Queue,

    data_callback: ffi::cubeb_data_callback,
    state_callback: ffi::cubeb_state_callback,
    device_changed_callback: Mutex<ffi::cubeb_device_changed_callback>,
    // Frame counters
    frames_queued: u64,
    // How many frames got read from the input since the stream started (includes
    // padded silence)
    frames_read: AtomicUsize,
    // How many frames got written to the output device since the stream started
    frames_written: AtomicUsize,
    stopped: AtomicBool,
    draining: AtomicBool,
    reinit_pending: AtomicBool,
    delayed_reinit: bool,
    destroy_pending: AtomicBool,
    // Latency requested by the user.
    latency_frames: u32,
    // Fixed latency, characteristic of the device.
    output_device_latency_frames: AtomicU32,
    input_device_latency_frames: AtomicU32,
    // Total latency: the latency of the device + the OS latency
    total_output_latency_frames: AtomicU32,
    total_input_latency_frames: AtomicU32,
    output_callback_timing_data_read: triple_buffer::Output<OutputCallbackTimingData>,
    output_callback_timing_data_write: triple_buffer::Input<OutputCallbackTimingData>,
    prev_position: u64,
    // This is true if a device change callback is currently running.
    switching_device: AtomicBool,
    core_stream_data: CoreStreamData<'ctx>,
}

impl<'ctx> AudioUnitStream<'ctx> {
    fn new(
        context: &'ctx mut AudioUnitContext,
        user_ptr: *mut c_void,
        data_callback: ffi::cubeb_data_callback,
        state_callback: ffi::cubeb_state_callback,
        latency_frames: u32,
    ) -> Self {
        let output_callback_timing_data =
            triple_buffer::TripleBuffer::new(OutputCallbackTimingData {
                frames_queued: 0,
                timestamp: 0,
                buffer_size: 0,
            });
        let (output_callback_timing_data_write, output_callback_timing_data_read) =
            output_callback_timing_data.split();
        let queue = context.serial_queue.clone();
        AudioUnitStream {
            context,
            user_ptr,
            queue,
            data_callback,
            state_callback,
            device_changed_callback: Mutex::new(None),
            frames_queued: 0,
            frames_read: AtomicUsize::new(0),
            frames_written: AtomicUsize::new(0),
            stopped: AtomicBool::new(true),
            draining: AtomicBool::new(false),
            reinit_pending: AtomicBool::new(false),
            delayed_reinit: false,
            destroy_pending: AtomicBool::new(false),
            latency_frames,
            output_device_latency_frames: AtomicU32::new(0),
            input_device_latency_frames: AtomicU32::new(0),
            total_output_latency_frames: AtomicU32::new(0),
            total_input_latency_frames: AtomicU32::new(0),
            output_callback_timing_data_write,
            output_callback_timing_data_read,
            prev_position: 0,
            switching_device: AtomicBool::new(false),
            core_stream_data: CoreStreamData::default(),
        }
    }

    fn add_device_listener(&self, listener: &device_property_listener) -> OSStatus {
        self.queue.debug_assert_is_current();
        audio_object_add_property_listener(
            listener.device,
            &listener.property,
            listener.listener,
            self as *const Self as *mut c_void,
        )
    }

    fn remove_device_listener(&self, listener: &device_property_listener) -> OSStatus {
        self.queue.debug_assert_is_current();
        audio_object_remove_property_listener(
            listener.device,
            &listener.property,
            listener.listener,
            self as *const Self as *mut c_void,
        )
    }

    fn notify_state_changed(&self, state: State) {
        if self.state_callback.is_none() {
            return;
        }
        let callback = self.state_callback.unwrap();
        unsafe {
            callback(
                self as *const AudioUnitStream as *mut ffi::cubeb_stream,
                self.user_ptr,
                state.into(),
            );
        }
    }

    fn reinit(&mut self) -> Result<()> {
        self.queue.debug_assert_is_current();
        // Call stop_audiounits to avoid potential data race. If there is a running data callback,
        // which locks a mutex inside CoreAudio framework, then this call will block the current
        // thread until the callback is finished since this call asks to lock a mutex inside
        // CoreAudio framework that is used by the data callback.
        if !self.stopped.load(Ordering::SeqCst) {
            self.core_stream_data.stop_audiounits();
        }

        if self.stopped.load(Ordering::SeqCst) {
            // Something stopped the stream, reinit on next start
            self.delayed_reinit = true;
            return Ok(());
        }

        debug_assert!(
            !self.core_stream_data.input_unit.is_null()
                || !self.core_stream_data.output_unit.is_null()
        );
        let vol_rv = if self.core_stream_data.output_unit.is_null() {
            Err(Error::error())
        } else {
            get_volume(self.core_stream_data.output_unit)
        };

        self.core_stream_data.close();

        // Use the new default device if this stream was set to follow the output device.
        if self.core_stream_data.has_output()
            && self
                .core_stream_data
                .output_device
                .flags
                .contains(device_flags::DEV_SELECTED_DEFAULT)
        {
            cubeb_log!("Using new default output device");
            self.core_stream_data.output_device =
                match create_device_info(kAudioObjectUnknown, DeviceType::OUTPUT) {
                    None => {
                        cubeb_log!("Fail to create device info for output");
                        return Err(Error::error());
                    }
                    Some(d) => d,
                };
        }

        // Likewise, for the input side
        if self.core_stream_data.has_input()
            && self
                .core_stream_data
                .input_device
                .flags
                .contains(device_flags::DEV_SELECTED_DEFAULT)
        {
            cubeb_log!("Using new default input device");
            self.core_stream_data.input_device =
                match create_device_info(kAudioObjectUnknown, DeviceType::INPUT) {
                    None => {
                        cubeb_log!("Fail to create device info for input");
                        return Err(Error::error());
                    }
                    Some(d) => d,
                }
        }

        cubeb_log!("Reinit: setup");
        self.core_stream_data
            .setup(&mut self.context.shared_voice_processing_unit)
            .inspect_err(|_| {
                cubeb_log!("({:p}) Setup failed.", self.core_stream_data.stm_ptr);
            })?;

        if let Ok(volume) = vol_rv {
            set_volume(self.core_stream_data.output_unit, volume);
        }

        // If the stream was running, start it again.
        if !self.stopped.load(Ordering::SeqCst) {
            self.core_stream_data.start_audiounits().inspect_err(|_| {
                cubeb_log!(
                    "({:p}) Start audiounit failed.",
                    self.core_stream_data.stm_ptr
                );
            })?;
        }

        Ok(())
    }

    fn reinit_async(&mut self) {
        if self.reinit_pending.swap(true, Ordering::SeqCst) {
            // A reinit task is already pending, nothing more to do.
            cubeb_log!(
                "({:p}) re-init stream task already pending, cancelling request",
                self as *const AudioUnitStream
            );
            return;
        }

        let queue = self.queue.clone();
        // Use a new thread, through the queue, to avoid deadlock when calling
        // Get/SetProperties method from inside notify callback
        queue.run_async(move || {
            cubeb_log!("Reinitialization of stream");
            let stm_ptr = self as *const AudioUnitStream;
            if self.destroy_pending.load(Ordering::SeqCst) {
                cubeb_log!(
                    "({:p}) stream pending destroy, cancelling reinit task",
                    stm_ptr
                );
                return;
            }

            if self.reinit().is_err() {
                self.core_stream_data.close();
                self.notify_state_changed(State::Error);
                cubeb_log!(
                    "({:p}) Could not reopen the stream after switching.",
                    stm_ptr
                );
            }
            self.switching_device.store(false, Ordering::SeqCst);
            self.reinit_pending.store(false, Ordering::SeqCst);
        });
    }

    fn close_on_error(&mut self) {
        self.queue.debug_assert_is_current();
        let stm_ptr = self as *const AudioUnitStream;

        self.core_stream_data.close();
        self.notify_state_changed(State::Error);
        cubeb_log!("({:p}) Close the stream due to an error.", stm_ptr);

        self.switching_device.store(false, Ordering::SeqCst);
    }

    fn destroy_internal(&mut self) {
        self.queue.debug_assert_is_current();
        self.core_stream_data.close();
        assert!(self.context.active_streams() >= 1);
        self.context.update_latency_by_removing_stream();
    }

    fn destroy(&mut self) {
        self.queue.debug_assert_is_current();
        if self
            .core_stream_data
            .uninstall_system_changed_callback()
            .is_err()
        {
            cubeb_log!(
                "({:p}) Could not uninstall the system changed callback",
                self as *const AudioUnitStream
            );
        }

        if self
            .core_stream_data
            .uninstall_device_changed_callback()
            .is_err()
        {
            cubeb_log!(
                "({:p}) Could not uninstall all device change listeners",
                self as *const AudioUnitStream
            );
        }

        // Execute the stream destroy work.
        self.destroy_pending.store(true, Ordering::SeqCst);

        // Call stop_audiounits to avoid potential data race. If there is a running data callback,
        // which locks a mutex inside CoreAudio framework, then this call will block the current
        // thread until the callback is finished since this call asks to lock a mutex inside
        // CoreAudio framework that is used by the data callback.
        if !self.stopped.swap(true, Ordering::SeqCst) {
            self.core_stream_data.stop_audiounits();
        }

        self.destroy_internal();

        cubeb_log!(
            "Cubeb stream ({:p}) destroyed successful.",
            self as *const AudioUnitStream
        );
    }
}

impl<'ctx> Drop for AudioUnitStream<'ctx> {
    fn drop(&mut self) {
        // Execute destroy in serial queue to avoid collision with reinit when un/plug devices
        self.queue.clone().run_final(|| {
            self.destroy();
            self.core_stream_data = CoreStreamData::default();
        });
    }
}

impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
    fn start(&mut self) -> Result<()> {
        let was_stopped = self.stopped.load(Ordering::SeqCst);
        let was_draining = self.draining.load(Ordering::SeqCst);
        self.stopped.store(false, Ordering::SeqCst);
        self.draining.store(false, Ordering::SeqCst);

        self.queue
            .clone()
            .run_sync(|| -> Result<()> {
                // Need reinitialization: device was changed when paused. It will be started after
                // reinit because self.stopped is false.
                if self.delayed_reinit {
                    let rv = self.reinit().inspect_err(|_| {
                        cubeb_log!(
                            "({:p}) delayed reinit during start failed.",
                            self.core_stream_data.stm_ptr
                        );
                    });
                    // In case of failure, restore the state
                    if rv.is_err() {
                        self.stopped.store(was_stopped, Ordering::SeqCst);
                        self.draining.store(was_draining, Ordering::SeqCst);
                        return rv;
                    }
                    self.delayed_reinit = false;
                    Ok(())
                } else {
                    // Execute start in serial queue to avoid racing with destroy or reinit.
                    let rv = self.core_stream_data.start_audiounits();
                    if rv.is_err() {
                        cubeb_log!("({:p}) start failed.", self.core_stream_data.stm_ptr);
                        self.stopped.store(was_stopped, Ordering::SeqCst);
                        self.draining.store(was_draining, Ordering::SeqCst);
                        return rv;
                    }
                    Ok(())
                }
            })
            .unwrap()?;

        self.notify_state_changed(State::Started);

        cubeb_log!(
            "Cubeb stream ({:p}) started successfully.",
            self as *const AudioUnitStream
        );
        Ok(())
    }
    fn stop(&mut self) -> Result<()> {
        if !self.stopped.swap(true, Ordering::SeqCst) {
            // Execute stop in serial queue to avoid racing with destroy or reinit.
            self.queue
                .clone()
                .run_sync(|| self.core_stream_data.stop_audiounits());

            self.notify_state_changed(State::Stopped);

            cubeb_log!(
                "Cubeb stream ({:p}) stopped successfully.",
                self as *const AudioUnitStream
            );
        }
        Ok(())
    }
    fn position(&mut self) -> Result<u64> {
        let OutputCallbackTimingData {
            frames_queued,
            timestamp,
            buffer_size,
        } = self.output_callback_timing_data_read.read().clone();
        let total_output_latency_frames =
            u64::from(self.total_output_latency_frames.load(Ordering::SeqCst));
        // If output latency is available, take it into account. Otherwise, use the number of
        // frames played.
        let position = if total_output_latency_frames != 0 {
            if total_output_latency_frames > frames_queued {
                0
            } else {
                // Interpolate here to match other cubeb backends. Only return an interpolated time
                // if we've played enough frames. If the stream is paused, clamp the interpolated
                // number of frames to the buffer size.
                const NS2S: u64 = 1_000_000_000;
                let now = unsafe { mach_absolute_time() };
                let diff = now - timestamp;
                let interpolated_frames = cmp::min(
                    host_time_to_ns(self.context, diff)
                        * self.core_stream_data.output_stream_params.rate() as u64
                        / NS2S,
                    buffer_size,
                );
                (frames_queued - total_output_latency_frames) + interpolated_frames
            }
        } else {
            frames_queued
        };

        // Ensure mononicity of the clock even when changing output device.
        if position > self.prev_position {
            self.prev_position = position;
        }
        Ok(self.prev_position)
    }
    #[cfg(target_os = "ios")]
    fn latency(&mut self) -> Result<u32> {
        Err(not_supported())
    }
    #[cfg(not(target_os = "ios"))]
    fn latency(&mut self) -> Result<u32> {
        Ok(self.total_output_latency_frames.load(Ordering::SeqCst))
    }
    #[cfg(target_os = "ios")]
    fn input_latency(&mut self) -> Result<u32> {
        Err(not_supported())
    }
    #[cfg(not(target_os = "ios"))]
    fn input_latency(&mut self) -> Result<u32> {
        let user_rate = self.core_stream_data.input_stream_params.rate();
        let hw_rate = self.core_stream_data.input_dev_desc.mSampleRate as u32;
        let frames = self.total_input_latency_frames.load(Ordering::SeqCst);
        if frames != 0 {
            if hw_rate == user_rate {
                Ok(frames)
            } else {
                Ok((frames * user_rate) / hw_rate)
            }
        } else {
            Err(Error::error())
        }
    }
    fn set_volume(&mut self, volume: f32) -> Result<()> {
        // Execute set_volume in serial queue to avoid racing with destroy or reinit.
        let result = self
            .queue
            .run_sync(|| set_volume(self.core_stream_data.output_unit, volume))
            .unwrap();

        result?;

        cubeb_log!(
            "Cubeb stream ({:p}) set volume to {}.",
            self as *const AudioUnitStream,
            volume
        );
        Ok(())
    }
    fn set_name(&mut self, _: &CStr) -> Result<()> {
        Err(Error::not_supported())
    }
    fn current_device(&mut self) -> Result<&DeviceRef> {
        Err(Error::not_supported())
    }
    fn set_input_mute(&mut self, mute: bool) -> Result<()> {
        if self.core_stream_data.input_unit.is_null() {
            return Err(Error::invalid_parameter());
        }

        if !self.core_stream_data.using_voice_processing_unit() {
            return Err(Error::error());
        }

        // Execute set_input_mute in serial queue to avoid racing with destroy or reinit.
        let mut result = Err(Error::error());
        let set = &mut result;
        let stream = &self;
        self.queue.run_sync(move || {
            *set = set_input_mute(stream.core_stream_data.input_unit, mute);
        });

        result?;

        cubeb_log!(
            "Cubeb stream ({:p}) set input mute to {}.",
            self as *const AudioUnitStream,
            mute
        );
        self.core_stream_data.input_mute = mute;
        Ok(())
    }
    fn set_input_processing_params(&mut self, params: InputProcessingParams) -> Result<()> {
        // CUBEB_ERROR_INVALID_PARAMETER if a given param is not supported by
        // this backend, or if this stream does not have an input device
        if self.core_stream_data.input_unit.is_null() {
            return Err(Error::invalid_parameter());
        }

        if self
            .context
            .supported_input_processing_params()
            .unwrap()
            .intersection(params)
            != params
        {
            return Err(Error::invalid_parameter());
        }

        // AEC and NS are active as soon as VPIO is not bypassed, therefore the only combinations
        // of those we can explicitly support are {} and {aec, ns}.
        let aec = params.contains(InputProcessingParams::ECHO_CANCELLATION);
        let ns = params.contains(InputProcessingParams::NOISE_SUPPRESSION);
        if aec != ns {
            // No control to turn on AEC without NS or vice versa.
            cubeb_log!(
                "Cubeb stream ({:p}) couldn't set input processing params {:?}. AEC != NS.",
                self as *const AudioUnitStream,
                params
            );
            return Err(Error::error());
        }

        // CUBEB_ERROR if params could not be applied
        //   note: only works with VoiceProcessingIO
        if !self.core_stream_data.using_voice_processing_unit() {
            return Err(Error::error());
        }

        // Execute set_input_processing_params in serial queue to avoid racing with destroy or reinit.
        let mut result = Err(Error::error());
        let result_ = &mut result;
        let mut deferred = false;
        let deferred_ = &mut deferred;
        let stream = &self;
        self.queue.run_sync(move || {
            if stream.core_stream_data.units_running {
                *deferred_ = true;
                *result_ = Ok(());
            } else {
                *deferred_ = false;
                *result_ = set_input_processing_params(stream.core_stream_data.input_unit, params);
            }
        });

        result?;

        cubeb_log!(
            "Cubeb stream ({:p}) {} input processing params {:?}.",
            self as *const AudioUnitStream,
            if deferred { "deferred" } else { "set" },
            params
        );
        self.core_stream_data.input_processing_params = params;
        Ok(())
    }
    #[cfg(target_os = "ios")]
    fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
        Err(not_supported())
    }
    #[cfg(not(target_os = "ios"))]
    fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
        if device.as_ptr().is_null() {
            Err(Error::error())
        } else {
            unsafe {
                let mut dev: Box<ffi::cubeb_device> = Box::from_raw(device.as_ptr() as *mut _);
                if !dev.output_name.is_null() {
                    let _ = CString::from_raw(dev.output_name as *mut _);
                    dev.output_name = ptr::null_mut();
                }
                if !dev.input_name.is_null() {
                    let _ = CString::from_raw(dev.input_name as *mut _);
                    dev.input_name = ptr::null_mut();
                }
                drop(dev);
            }
            Ok(())
        }
    }
    fn register_device_changed_callback(
        &mut self,
        device_changed_callback: ffi::cubeb_device_changed_callback,
    ) -> Result<()> {
        let mut callback = self.device_changed_callback.lock().unwrap();
        // Note: second register without unregister first causes 'nope' error.
        // Current implementation requires unregister before register a new cb.
        if device_changed_callback.is_some() && callback.is_some() {
            Err(Error::invalid_parameter())
        } else {
            *callback = device_changed_callback;
            Ok(())
        }
    }
}

#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<'ctx> Send for AudioUnitStream<'ctx> {}
unsafe impl<'ctx> Sync for AudioUnitStream<'ctx> {}

#[cfg(test)]
mod tests;

[Verzeichnis aufwärts0.155unsichere VerbindungÜbersetzung europäischer Sprachen durch Browser2026-04-25]

                                                                                                                                                                                                                                                                                                                                                                                                     


Quelle

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....
    

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge