Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Firefox/dom/media/webrtc/transportbridge/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 56 kB image not shown  

Quelle  MediaPipeline.cpp   Sprache: C

 
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */


// Original author: ekr@rtfm.com

#include "MediaPipeline.h"

#include <inttypes.h>
#include <math.h>
#include <sstream>
#include <utility>

#include "AudioSegment.h"
#include "AudioConverter.h"
#include "DOMMediaStream.h"
#include "ImageContainer.h"
#include "ImageTypes.h"
#include "MediaEngine.h"
#include "MediaSegment.h"
#include "MediaTrackGraph.h"
#include "MediaTrackListener.h"
#include "MediaStreamTrack.h"
#include "RtpLogger.h"
#include "VideoFrameConverter.h"
#include "VideoSegment.h"
#include "VideoStreamTrack.h"
#include "VideoUtils.h"
#include "mozilla/Logging.h"
#include "mozilla/NullPrincipal.h"
#include "mozilla/PeerIdentity.h"
#include "mozilla/Preferences.h"
#include "mozilla/SharedThreadPool.h"
#include "mozilla/Sprintf.h"
#include "mozilla/StaticPrefs_media.h"
#include "mozilla/TaskQueue.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/UniquePtrExtensions.h"
#include "mozilla/dom/RTCStatsReportBinding.h"
#include "mozilla/dom/Document.h"
#include "mozilla/gfx/Point.h"
#include "mozilla/gfx/Types.h"
#include "nsError.h"
#include "nsThreadUtils.h"
#include "transport/runnable_utils.h"
#include "jsapi/MediaTransportHandler.h"
#include "jsapi/PeerConnectionImpl.h"
#include "Tracing.h"
#include "libwebrtcglue/WebrtcImageBuffer.h"
#include "libwebrtcglue/MediaConduitInterface.h"
#include "common_video/include/video_frame_buffer.h"
#include "modules/rtp_rtcp/include/rtp_rtcp.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/source/rtp_packet_received.h"

// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
// 48KHz)
#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)
static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
                  AUDIO_SAMPLE_BUFFER_MAX_BYTES,
              "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");

using namespace mozilla;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::layers;

mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline");

namespace mozilla {

// An async inserter for audio data, to avoid running audio codec encoders
// on the MTG/input audio thread.  Basically just bounces all the audio
// data to a single audio processing/input queue.  We could if we wanted to
// use multiple threads and a TaskQueue.
class AudioProxyThread {
 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)

  explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)
      : mConduit(std::move(aConduit)),
        mTaskQueue(TaskQueue::Create(
            GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER), "AudioProxy")),
        mAudioConverter(nullptr) {
    MOZ_ASSERT(mConduit);
    MOZ_COUNT_CTOR(AudioProxyThread);
  }

  // This function is the identity if aInputRate is supported.
  // Else, it returns a rate that is supported, that ensure no loss in audio
  // quality: the sampling rate returned is always greater to the inputed
  // sampling-rate, if they differ..
  uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
    AudioSessionConduit* conduit =
        static_cast<AudioSessionConduit*>(mConduit.get());
    if (conduit->IsSamplingFreqSupported(aInputRate)) {
      return aInputRate;
    }
    if (aInputRate < 16000) {
      return 16000;
    }
    if (aInputRate < 32000) {
      return 32000;
    }
    if (aInputRate < 44100) {
      return 44100;
    }
    return 48000;
  }

  // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
  // something the conduit can work with (or send silence if the track is not
  // enabled), and send the audio in 10ms chunks to the conduit.
  void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                                 bool aEnabled) {
    MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());

    // Convert to interleaved 16-bits integer audio, with a maximum of two
    // channels (since the WebRTC.org code below makes the assumption that the
    // input audio is either mono or stereo), with a sample-rate rate that is
    // 16, 32, 44.1, or 48kHz.
    uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
    int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);

    // We take advantage of the fact that the common case (microphone directly
    // to PeerConnection, that is, a normal call), the samples are already
    // 16-bits mono, so the representation in interleaved and planar is the
    // same, and we can just use that.
    if (aEnabled && outputChannels == 1 &&
        aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
      const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
      PacketizeAndSend(samples, transmissionRate, outputChannels,
                       aChunk.mDuration);
      return;
    }

    uint32_t sampleCount = aChunk.mDuration * outputChannels;
    if (mInterleavedAudio.Length() < sampleCount) {
      mInterleavedAudio.SetLength(sampleCount);
    }

    if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
      PodZero(mInterleavedAudio.Elements(), sampleCount);
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
      DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
                           mInterleavedAudio.Elements());
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
      DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
                           mInterleavedAudio.Elements());
    }
    int16_t* inputAudio = mInterleavedAudio.Elements();
    size_t inputAudioFrameCount = aChunk.mDuration;

    AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
                            AudioConfig::FORMAT_S16);
    AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
                             transmissionRate, AudioConfig::FORMAT_S16);
    // Resample to an acceptable sample-rate for the sending side
    if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
        mAudioConverter->OutputConfig() != outputConfig) {
      mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
    }

    int16_t* processedAudio = nullptr;
    size_t framesProcessed =
        mAudioConverter->Process(inputAudio, inputAudioFrameCount);

    if (framesProcessed == 0) {
      // In place conversion not possible, use a buffer.
      framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
                                                 inputAudioFrameCount);
      processedAudio = mOutputAudio.Data();
    } else {
      processedAudio = inputAudio;
    }

    PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
                     framesProcessed);
  }

  // This packetizes aAudioData in 10ms chunks and sends it.
  // aAudioData is interleaved audio data at a rate and with a channel count
  // that is appropriate to send with the conduit.
  void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
                        uint32_t aChannels, uint32_t aFrameCount) {
    MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
    MOZ_ASSERT(aChannels == 1 || aChannels == 2);
    MOZ_ASSERT(aAudioData);

    uint32_t audio_10ms = aRate / 100;

    if (!mPacketizer || mPacketizer->mPacketSize != audio_10ms ||
        mPacketizer->mChannels != aChannels) {
      // It's the right thing to drop the bit of audio still in the packetizer:
      // we don't want to send to the conduit audio that has two different
      // rates while telling it that it has a constante rate.
      mPacketizer =
          MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
      mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
    }

    mPacketizer->Input(aAudioData, aFrameCount);

    while (mPacketizer->PacketsAvailable()) {
      mPacketizer->Output(mPacket.get());
      auto frame = std::make_unique<webrtc::AudioFrame>();
      // UpdateFrame makes a copy of the audio data.
      frame->UpdateFrame(frame->timestamp_, mPacket.get(),
                         mPacketizer->mPacketSize, aRate, frame->speech_type_,
                         frame->vad_activity_, mPacketizer->mChannels);
      mConduit->SendAudioFrame(std::move(frame));
    }
  }

  void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                       bool aEnabled) {
    RefPtr<AudioProxyThread> self = this;
    nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
        "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
          self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
        }));
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
    Unused << rv;
  }

 protected:
  virtual ~AudioProxyThread() { MOZ_COUNT_DTOR(AudioProxyThread); }

  const RefPtr<AudioSessionConduit> mConduit;
  const RefPtr<TaskQueue> mTaskQueue;
  // Only accessed on mTaskQueue
  UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
  // A buffer to hold a single packet of audio.
  UniquePtr<int16_t[]> mPacket;
  nsTArray<int16_t> mInterleavedAudio;
  AlignedShortBuffer mOutputAudio;
  UniquePtr<AudioConverter> mAudioConverter;
};

#define INIT_MIRROR(name, val) \
  name(AbstractThread::MainThread(), val, "MediaPipeline::" #name " (Mirror)")

MediaPipeline::MediaPipeline(const std::string& aPc,
                             RefPtr<MediaTransportHandler> aTransportHandler,
                             DirectionType aDirection,
                             RefPtr<AbstractThread> aCallThread,
                             RefPtr<nsISerialEventTarget> aStsThread,
                             RefPtr<MediaSessionConduit> aConduit)
    : mConduit(std::move(aConduit)),
      mDirection(aDirection),
      mCallThread(std::move(aCallThread)),
      mStsThread(std::move(aStsThread)),
      INIT_MIRROR(mActive, false),
      mLevel(0),
      mTransportHandler(std::move(aTransportHandler)),
      mRtpPacketsSent(0),
      mRtcpPacketsSent(0),
      mRtpPacketsReceived(0),
      mRtpBytesSent(0),
      mRtpBytesReceived(0),
      mPc(aPc),
      mRtpHeaderExtensionMap(new webrtc::RtpHeaderExtensionMap()),
      mPacketDumper(PacketDumper::GetPacketDumper(mPc)) {}

#undef INIT_MIRROR

MediaPipeline::~MediaPipeline() {
  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("Destroying MediaPipeline: %s", mDescription.c_str()));
}

void MediaPipeline::Shutdown() {
  MOZ_ASSERT(NS_IsMainThread());

  mActive.DisconnectIfConnected();
  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::DetachTransport_s),
                NS_DISPATCH_NORMAL);
}

void MediaPipeline::DetachTransport_s() {
  ASSERT_ON_THREAD(mStsThread);

  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("%s in %s", mDescription.c_str(), __FUNCTION__));

  disconnect_all();
  mRtpState = TransportLayer::TS_NONE;
  mRtcpState = TransportLayer::TS_NONE;
  mTransportId.clear();
  mConduit->SetTransportActive(false);
  mRtpSendEventListener.DisconnectIfExists();
  mSenderRtcpSendEventListener.DisconnectIfExists();
  mReceiverRtcpSendEventListener.DisconnectIfExists();
}

void MediaPipeline::UpdateTransport_m(
    const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
  mStsThread->Dispatch(NS_NewRunnableFunction(
      __func__, [aTransportId, filter = std::move(aFilter),
                 self = RefPtr<MediaPipeline>(this)]() mutable {
        self->UpdateTransport_s(aTransportId, std::move(filter));
      }));
}

void MediaPipeline::UpdateTransport_s(
    const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
  ASSERT_ON_THREAD(mStsThread);
  if (!mSignalsConnected) {
    mTransportHandler->SignalStateChange.connect(
        this, &MediaPipeline::RtpStateChange);
    mTransportHandler->SignalRtcpStateChange.connect(
        this, &MediaPipeline::RtcpStateChange);
    mTransportHandler->SignalEncryptedSending.connect(
        this, &MediaPipeline::EncryptedPacketSending);
    mTransportHandler->SignalPacketReceived.connect(
        this, &MediaPipeline::PacketReceived);
    mTransportHandler->SignalAlpnNegotiated.connect(
        this, &MediaPipeline::AlpnNegotiated);
    mSignalsConnected = true;
  }

  if (aTransportId != mTransportId) {
    mTransportId = aTransportId;
    mRtpState = mTransportHandler->GetState(mTransportId, false);
    mRtcpState = mTransportHandler->GetState(mTransportId, true);
    CheckTransportStates();
  }

  if (mFilter) {
    for (const auto& extension : mFilter->GetExtmap()) {
      mRtpHeaderExtensionMap->Deregister(extension.uri);
    }
  }
  if (mFilter && aFilter) {
    // Use the new filter, but don't forget any remote SSRCs that we've learned
    // by receiving traffic.
    mFilter->Update(*aFilter);
  } else {
    mFilter = std::move(aFilter);
  }
  if (mFilter) {
    for (const auto& extension : mFilter->GetExtmap()) {
      mRtpHeaderExtensionMap->RegisterByUri(extension.id, extension.uri);
    }
  }
}

void MediaPipeline::GetContributingSourceStats(
    const nsString& aInboundRtpStreamId,
    FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
  ASSERT_ON_THREAD(mStsThread);
  // Get the expiry from now
  DOMHighResTimeStamp expiry =
      RtpCSRCStats::GetExpiryFromTime(GetTimestampMaker().GetNow().ToDom());
  for (auto info : mCsrcStats) {
    if (!info.second.Expired(expiry)) {
      RTCRTPContributingSourceStats stats;
      info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
      if (!aArr.AppendElement(stats, fallible)) {
        mozalloc_handle_oom(0);
      }
    }
  }
}

void MediaPipeline::RtpStateChange(const std::string& aTransportId,
                                   TransportLayer::State aState) {
  if (mTransportId != aTransportId) {
    return;
  }
  mRtpState = aState;
  CheckTransportStates();
}

void MediaPipeline::RtcpStateChange(const std::string& aTransportId,
                                    TransportLayer::State aState) {
  if (mTransportId != aTransportId) {
    return;
  }
  mRtcpState = aState;
  CheckTransportStates();
}

void MediaPipeline::CheckTransportStates() {
  ASSERT_ON_THREAD(mStsThread);

  if (mRtpState == TransportLayer::TS_CLOSED ||
      mRtpState == TransportLayer::TS_ERROR ||
      mRtcpState == TransportLayer::TS_CLOSED ||
      mRtcpState == TransportLayer::TS_ERROR) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Warning,
            ("RTP Transport failed for pipeline %p flow %s"this,
             mDescription.c_str()));

    NS_WARNING(
        "MediaPipeline Transport failed. This is not properly cleaned up yet");
    // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
    // connection was good and now it is bad.
    // TODO(ekr@rtfm.com): Report up so that the PC knows we
    // have experienced an error.
    mConduit->SetTransportActive(false);
    mRtpSendEventListener.DisconnectIfExists();
    mSenderRtcpSendEventListener.DisconnectIfExists();
    mReceiverRtcpSendEventListener.DisconnectIfExists();
    return;
  }

  if (mRtpState == TransportLayer::TS_OPEN) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
            ("RTP Transport ready for pipeline %p flow %s"this,
             mDescription.c_str()));
  }

  if (mRtcpState == TransportLayer::TS_OPEN) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
            ("RTCP Transport ready for pipeline %p flow %s"this,
             mDescription.c_str()));
  }

  if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) {
    if (mDirection == DirectionType::TRANSMIT) {
      mRtpSendEventListener = mConduit->SenderRtpSendEvent().Connect(
          mStsThread, this, &MediaPipeline::SendPacket);
      mSenderRtcpSendEventListener = mConduit->SenderRtcpSendEvent().Connect(
          mStsThread, this, &MediaPipeline::SendPacket);
    } else {
      mConduit->ConnectReceiverRtpEvent(mRtpReceiveEvent);
      mReceiverRtcpSendEventListener =
          mConduit->ReceiverRtcpSendEvent().Connect(mStsThread, this,
                                                    &MediaPipeline::SendPacket);
    }
    mConduit->SetTransportActive(true);
    TransportReady_s();
  }
}

void MediaPipeline::SendPacket(MediaPacket&& aPacket) {
  ASSERT_ON_THREAD(mStsThread);

  const bool isRtp = aPacket.type() == MediaPacket::RTP;

  if (isRtp && mRtpState != TransportLayer::TS_OPEN) {
    return;
  }

  if (!isRtp && mRtcpState != TransportLayer::TS_OPEN) {
    return;
  }

  aPacket.sdp_level() = Some(Level());

  if (RtpLogger::IsPacketLoggingOn()) {
    RtpLogger::LogPacket(aPacket, false, mDescription);
  }

  if (isRtp) {
    mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtp, true,
                        aPacket.data(), aPacket.len());
    IncrementRtpPacketsSent(aPacket);
  } else {
    mPacketDumper->Dump(Level(), dom::mozPacketDumpType::Rtcp, true,
                        aPacket.data(), aPacket.len());
    IncrementRtcpPacketsSent();
  }

  MOZ_LOG(
      gMediaPipelineLog, LogLevel::Debug,
      ("%s sending %s packet", mDescription.c_str(), (isRtp ? "RTP" : "RTCP")));

  mTransportHandler->SendPacket(mTransportId, std::move(aPacket));
}

void MediaPipeline::IncrementRtpPacketsSent(const MediaPacket& aPacket) {
  ASSERT_ON_THREAD(mStsThread);
  ++mRtpPacketsSent;
  mRtpBytesSent += aPacket.len();

  if (!(mRtpPacketsSent % 100)) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
            ("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
             mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent));
  }
}

void MediaPipeline::IncrementRtcpPacketsSent() {
  ASSERT_ON_THREAD(mStsThread);
  ++mRtcpPacketsSent;
  if (!(mRtcpPacketsSent % 100)) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
            ("RTCP sent packet count for %s Pipeline %p: %u",
             mDescription.c_str(), this, mRtcpPacketsSent));
  }
}

void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
  ASSERT_ON_THREAD(mStsThread);
  ++mRtpPacketsReceived;
  mRtpBytesReceived += aBytes;
  if (!(mRtpPacketsReceived % 100)) {
    MOZ_LOG(
        gMediaPipelineLog, LogLevel::Info,
        ("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
         mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived));
  }
}

void MediaPipeline::PacketReceived(const std::string& aTransportId,
                                   const MediaPacket& packet) {
  ASSERT_ON_THREAD(mStsThread);

  if (mTransportId != aTransportId) {
    return;
  }

  MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN);

  if (packet.type() != MediaPacket::RTP) {
    return;
  }

  if (mDirection == DirectionType::TRANSMIT) {
    return;
  }

  if (!packet.len()) {
    return;
  }

  webrtc::RTPHeader header;
  rtc::CopyOnWriteBuffer packet_buffer(packet.data(), packet.len());
  webrtc::RtpPacketReceived parsedPacket(mRtpHeaderExtensionMap.get());
  if (!parsedPacket.Parse(packet_buffer)) {
    return;
  }
  parsedPacket.GetHeader(&header);

  if (mFilter && !mFilter->Filter(header)) {
    return;
  }

  auto now = GetTimestampMaker().GetNow();
  parsedPacket.set_arrival_time(now.ToRealtime());
  if (IsVideo()) {
    parsedPacket.set_payload_type_frequency(webrtc::kVideoPayloadTypeFrequency);
  }

  // Remove expired RtpCSRCStats
  if (!mCsrcStats.empty()) {
    auto expiry = RtpCSRCStats::GetExpiryFromTime(now.ToDom());
    for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
      if (p->second.Expired(expiry)) {
        p = mCsrcStats.erase(p);
        continue;
      }
      p++;
    }
  }

  // Add new RtpCSRCStats
  if (header.numCSRCs) {
    for (auto i = 0; i < header.numCSRCs; i++) {
      auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
      if (csrcInfo == mCsrcStats.end()) {
        mCsrcStats.insert(
            std::make_pair(header.arrOfCSRCs[i],
                           RtpCSRCStats(header.arrOfCSRCs[i], now.ToDom())));
      } else {
        csrcInfo->second.SetTimestamp(now.ToDom());
      }
    }
  }

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("%s received RTP packet.", mDescription.c_str()));
  IncrementRtpPacketsReceived(packet.len());

  RtpLogger::LogPacket(packet, true, mDescription);

  // Might be nice to pass ownership of the buffer in this case, but it is a
  // small optimization in a rare case.
  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false,
                      packet.encrypted_data(), packet.encrypted_len());

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(),
                      packet.len());

  mRtpReceiveEvent.Notify(std::move(parsedPacket), header);
}

void MediaPipeline::AlpnNegotiated(const std::string& aAlpn,
                                   bool aPrivacyRequested) {
  ASSERT_ON_THREAD(mStsThread);

  if (aPrivacyRequested && Direction() == DirectionType::RECEIVE) {
    // This will force the receive pipelines to drop data until they have
    // received a private PrincipalHandle from RTCRtpReceiver (which takes a
    // detour via main thread).
    static_cast<MediaPipelineReceive*>(this)->OnPrivacyRequested_s();
  }
}

void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId,
                                           const MediaPacket& aPacket) {
  ASSERT_ON_THREAD(mStsThread);

  if (mTransportId == aTransportId) {
    dom::mozPacketDumpType type;
    if (aPacket.type() == MediaPacket::SRTP) {
      type = dom::mozPacketDumpType::Srtp;
    } else if (aPacket.type() == MediaPacket::SRTCP) {
      type = dom::mozPacketDumpType::Srtcp;
    } else if (aPacket.type() == MediaPacket::DTLS) {
      // TODO(bug 1497936): Implement packet dump for DTLS
      return;
    } else {
      MOZ_ASSERT(false);
      return;
    }
    mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len());
  }
}

class MediaPipelineTransmit::PipelineListener
    : public DirectMediaTrackListener {
  friend class MediaPipelineTransmit;

 public:
  explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit)
      : mConduit(std::move(aConduit)),
        mActive(false),
        mEnabled(false),
        mDirectConnect(false) {}

  ~PipelineListener() {
    if (mConverter) {
      mConverter->Shutdown();
    }
  }

  void SetActive(bool aActive) {
    mActive = aActive;
    if (mConverter) {
      mConverter->SetActive(aActive);
    }
  }
  void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }

  // These are needed since nested classes don't have access to any particular
  // instance of the parent
  void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) {
    mAudioProcessing = std::move(aProxy);
  }

  void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) {
    mConverter = std::move(aConverter);
  }

  // Implement MediaTrackListener
  void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset,
                           const MediaSegment& aQueuedMedia) override;
  void NotifyEnabledStateChanged(MediaTrackGraph* aGraph,
                                 bool aEnabled) override;

  // Implement DirectMediaTrackListener
  void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset,
                               const MediaSegment& aMedia) override;
  void NotifyDirectListenerInstalled(InstallationResult aResult) override;
  void NotifyDirectListenerUninstalled() override;

 private:
  void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);

  const RefPtr<MediaSessionConduit> mConduit;
  RefPtr<AudioProxyThread> mAudioProcessing;
  RefPtr<VideoFrameConverter> mConverter;

  // active is true if there is a transport to send on
  mozilla::Atomic<bool> mActive;
  // enabled is true if the media access control permits sending
  // actual content; when false you get black/silence
  mozilla::Atomic<bool> mEnabled;

  // Written and read on the MediaTrackGraph thread
  bool mDirectConnect;
};

MediaPipelineTransmit::MediaPipelineTransmit(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    bool aIsVideo, RefPtr<MediaSessionConduit> aConduit)
    : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT,
                    std::move(aCallThread), std::move(aStsThread),
                    std::move(aConduit)),
      mWatchManager(this, AbstractThread::MainThread()),
      mIsVideo(aIsVideo),
      mListener(new PipelineListener(mConduit)),
      mDomTrack(nullptr, "MediaPipelineTransmit::mDomTrack"),
      mSendTrackOverride(nullptr, "MediaPipelineTransmit::mSendTrackOverride") {
  if (!IsVideo()) {
    mAudioProcessing =
        MakeAndAddRef<AudioProxyThread>(*mConduit->AsAudioSessionConduit());
    mListener->SetAudioProxy(mAudioProcessing);
  }

  mWatchManager.Watch(mActive, &MediaPipelineTransmit::UpdateSendState);
  mWatchManager.Watch(mDomTrack, &MediaPipelineTransmit::UpdateSendState);
  mWatchManager.Watch(mSendTrackOverride,
                      &MediaPipelineTransmit::UpdateSendState);

  mDescription = GenerateDescription();
}

void MediaPipelineTransmit::RegisterListener() {
  if (!IsVideo()) {
    return;
  }
  RefPtr videoConduit = *mConduit->AsVideoSessionConduit();
  mConverter = VideoFrameConverter::Create(
      TaskQueue::Create(GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER),
                        "VideoFrameConverter")
          .forget(),
      GetTimestampMaker(), videoConduit->LockScaling());
  mConverter->SetIdleFrameDuplicationInterval(TimeDuration::FromSeconds(1));
  videoConduit->SetTrackSource(mConverter);
  mListener->SetVideoFrameConverter(mConverter);
}

already_AddRefed<MediaPipelineTransmit> MediaPipelineTransmit::Create(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    bool aIsVideo, RefPtr<MediaSessionConduit> aConduit) {
  RefPtr<MediaPipelineTransmit> transmit = new MediaPipelineTransmit(
      aPc, std::move(aTransportHandler), std::move(aCallThread),
      std::move(aStsThread), aIsVideo, std::move(aConduit));

  transmit->RegisterListener();

  return transmit.forget();
}

MediaPipelineTransmit::~MediaPipelineTransmit() {
  mFrameListener.DisconnectIfExists();

  MOZ_ASSERT(!mTransmitting);
  MOZ_ASSERT(!mDomTrack.Ref());
}

void MediaPipelineTransmit::InitControl(
    MediaPipelineTransmitControlInterface* aControl) {
  aControl->CanonicalTransmitting().ConnectMirror(&mActive);
}

void MediaPipelineTransmit::Shutdown() {
  MediaPipeline::Shutdown();
  mWatchManager.Shutdown();
  if (mDomTrack.Ref()) {
    mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
    mDomTrack = nullptr;
  }
  mUnsettingSendTrack = false;
  UpdateSendState();
  MOZ_ASSERT(!mTransmitting);
}

void MediaPipeline::SetDescription_s(const std::string& description) {
  ASSERT_ON_THREAD(mStsThread);
  mDescription = description;
}

std::string MediaPipelineTransmit::GenerateDescription() const {
  MOZ_ASSERT(NS_IsMainThread());

  std::stringstream description;
  description << mPc << "| ";
  description << (mIsVideo ? "Transmit video[" : "Transmit audio[");

  if (mDomTrack.Ref()) {
    nsString nsTrackId;
    mDomTrack.Ref()->GetId(nsTrackId);
    description << NS_ConvertUTF16toUTF8(nsTrackId).get();
  } else if (mSendTrackOverride.Ref()) {
    description << "override " << mSendTrackOverride.Ref().get();
  } else {
    description << "no track";
  }

  description << "]";

  return description.str();
}

void MediaPipelineTransmit::UpdateSendState() {
  MOZ_ASSERT(NS_IsMainThread());

  // This runs because either mActive, mDomTrack or mSendTrackOverride changed,
  // or because mSendTrack was unset async. Based on these inputs this method
  // is responsible for hooking up mSendTrack to mListener in order to feed data
  // to the conduit.
  //
  // If we are inactive, or if the send track does not match what we want to
  // send (mDomTrack or mSendTrackOverride), we must stop feeding data to the
  // conduit. NB that removing the listener from mSendTrack is async, and we
  // must wait for it to resolve before adding mListener to another track.
  // mUnsettingSendTrack gates us until the listener has been removed from
  // mSendTrack.
  //
  // If we are active and the send track does match what we want to send, we
  // make sure mListener is added to the send track. Either now, or if we're
  // still waiting for another send track to be removed, during a future call to
  // this method.

  if (mUnsettingSendTrack) {
    // We must wait for the send track to be unset before we can set it again,
    // to avoid races. Once unset this function is triggered again.
    return;
  }

  const bool wasTransmitting = mTransmitting;

  const bool haveLiveSendTrack = mSendTrack && !mSendTrack->IsDestroyed();
  const bool haveLiveDomTrack = mDomTrack.Ref() && !mDomTrack.Ref()->Ended();
  const bool haveLiveOverrideTrack =
      mSendTrackOverride.Ref() && !mSendTrackOverride.Ref()->IsDestroyed();
  const bool mustRemoveSendTrack =
      haveLiveSendTrack && !mSendTrackOverride.Ref() &&
      (!haveLiveDomTrack || mDomTrack.Ref()->GetTrack() != mSendPortSource);

  mTransmitting = mActive && (haveLiveDomTrack || haveLiveOverrideTrack) &&
                  !mustRemoveSendTrack;

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("MediaPipeline %p UpdateSendState wasTransmitting=%d, active=%d, "
           "sendTrack=%p (%s), domTrack=%p (%s), "
           "sendTrackOverride=%p (%s), mustRemove=%d, mTransmitting=%d",
           this, wasTransmitting, mActive.Ref(), mSendTrack.get(),
           haveLiveSendTrack ? "live" : "ended", mDomTrack.Ref().get(),
           haveLiveDomTrack ? "live" : "ended", mSendTrackOverride.Ref().get(),
           haveLiveOverrideTrack ? "live" : "ended", mustRemoveSendTrack,
           mTransmitting));

  if (!wasTransmitting && mTransmitting) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Attaching pipeline %p to track %p conduit type=%s"this,
             mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
    if (mDescriptionInvalidated) {
      // Only update the description when we attach to a track, as detaching is
      // always a longer async step than updating the description. Updating on
      // detach would cause the wrong track id to be attributed in logs.
      RUN_ON_THREAD(mStsThread,
                    WrapRunnable(RefPtr<MediaPipeline>(this),
                                 &MediaPipelineTransmit::SetDescription_s,
                                 GenerateDescription()),
                    NS_DISPATCH_NORMAL);
      mDescriptionInvalidated = false;
    }
    if (mSendTrackOverride.Ref()) {
      // Special path that allows unittests to avoid mDomTrack and the graph by
      // manually calling SetSendTrack.
      mSendTrack = mSendTrackOverride.Ref();
    } else {
      mSendTrack = mDomTrack.Ref()->Graph()->CreateForwardedInputTrack(
          mDomTrack.Ref()->GetTrack()->mType);
      mSendPortSource = mDomTrack.Ref()->GetTrack();
      mSendPort = mSendTrack->AllocateInputPort(mSendPortSource.get());
    }
    if (mIsVideo) {
      mConverter->SetTrackingId(mDomTrack.Ref()->GetSource().mTrackingId);
    }
    mSendTrack->QueueSetAutoend(false);
    if (mIsVideo) {
      mSendTrack->AddDirectListener(mListener);
    }
    mSendTrack->AddListener(mListener);
  }

  if (wasTransmitting && !mTransmitting) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Detaching pipeline %p from track %p conduit type=%s"this,
             mDomTrack.Ref().get(), mIsVideo ? "video" : "audio"));
    mUnsettingSendTrack = true;
    if (mIsVideo) {
      mSendTrack->RemoveDirectListener(mListener);
    }
    mSendTrack->RemoveListener(mListener)->Then(
        GetMainThreadSerialEventTarget(), __func__,
        [this, self = RefPtr<MediaPipelineTransmit>(this)] {
          mUnsettingSendTrack = false;
          mSendTrack = nullptr;
          if (!mWatchManager.IsShutdown()) {
            mWatchManager.ManualNotify(&MediaPipelineTransmit::UpdateSendState);
          }
        });
    if (!mSendTrackOverride.Ref()) {
      // If an override is set it may be re-used.
      mSendTrack->Destroy();
      mSendPort->Destroy();
      mSendPort = nullptr;
      mSendPortSource = nullptr;
    }
  }
}

bool MediaPipelineTransmit::Transmitting() const {
  MOZ_ASSERT(NS_IsMainThread());

  return mActive;
}

bool MediaPipelineTransmit::IsVideo() const { return mIsVideo; }

void MediaPipelineTransmit::PrincipalChanged(dom::MediaStreamTrack* aTrack) {
  MOZ_ASSERT(aTrack && aTrack == mDomTrack.Ref());

  PeerConnectionWrapper pcw(mPc);
  if (pcw.impl()) {
    Document* doc = pcw.impl()->GetParentObject()->GetExtantDoc();
    if (doc) {
      UpdateSinkIdentity(doc->NodePrincipal(), pcw.impl()->GetPeerIdentity());
    } else {
      MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
              ("Can't update sink principal; document gone"));
    }
  }
}

void MediaPipelineTransmit::UpdateSinkIdentity(
    nsIPrincipal* aPrincipal, const PeerIdentity* aSinkIdentity) {
  MOZ_ASSERT(NS_IsMainThread());

  if (!mDomTrack.Ref()) {
    // Nothing to do here
    return;
  }

  bool enableTrack = aPrincipal->Subsumes(mDomTrack.Ref()->GetPrincipal());
  if (!enableTrack) {
    // first try didn't work, but there's a chance that this is still available
    // if our track is bound to a peerIdentity, and the peer connection (our
    // sink) is bound to the same identity, then we can enable the track.
    const PeerIdentity* trackIdentity = mDomTrack.Ref()->GetPeerIdentity();
    if (aSinkIdentity && trackIdentity) {
      enableTrack = (*aSinkIdentity == *trackIdentity);
    }
  }

  mListener->SetEnabled(enableTrack);
}

void MediaPipelineTransmit::TransportReady_s() {
  ASSERT_ON_THREAD(mStsThread);
  // Call base ready function.
  MediaPipeline::TransportReady_s();
  mListener->SetActive(true);
}

nsresult MediaPipelineTransmit::SetTrack(
    const RefPtr<MediaStreamTrack>& aDomTrack) {
  MOZ_ASSERT(NS_IsMainThread());
  if (mDomTrack.Ref()) {
    mDomTrack.Ref()->RemovePrincipalChangeObserver(this);
  }

  if (aDomTrack) {
    nsString nsTrackId;
    aDomTrack->GetId(nsTrackId);
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Reattaching pipeline to track %p track %s conduit type: %s",
             aDomTrack.get(), NS_ConvertUTF16toUTF8(nsTrackId).get(),
             mIsVideo ? "video" : "audio"));
  }

  mDescriptionInvalidated = true;
  mDomTrack = aDomTrack;
  if (mDomTrack.Ref()) {
    mDomTrack.Ref()->AddPrincipalChangeObserver(this);
    PrincipalChanged(mDomTrack.Ref());
  }

  return NS_OK;
}

RefPtr<dom::MediaStreamTrack> MediaPipelineTransmit::GetTrack() const {
  MOZ_ASSERT(NS_IsMainThread());
  return mDomTrack;
}

void MediaPipelineTransmit::SetSendTrackOverride(
    const RefPtr<ProcessedMediaTrack>& aSendTrack) {
  MOZ_ASSERT(NS_IsMainThread());
  MOZ_RELEASE_ASSERT(!mSendTrack);
  MOZ_RELEASE_ASSERT(!mSendPort);
  MOZ_RELEASE_ASSERT(!mSendTrackOverride.Ref());
  mDescriptionInvalidated = true;
  mSendTrackOverride = aSendTrack;
}

// Called if we're attached with AddDirectListener()
void MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData(
    MediaTrackGraph* aGraph, TrackTime aOffset, const MediaSegment& aMedia) {
  MOZ_LOG(
      gMediaPipelineLog, LogLevel::Debug,
      ("MediaPipeline::NotifyRealtimeTrackData() listener=%p, offset=%" PRId64
       ", duration=%" PRId64,
       this, aOffset, aMedia.GetDuration()));
  TRACE_COMMENT(
      "MediaPipelineTransmit::PipelineListener::NotifyRealtimeTrackData""%s",
      aMedia.GetType() == MediaSegment::VIDEO ? "Video" : "Audio");
  NewData(aMedia, aGraph->GraphRate());
}

void MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges(
    MediaTrackGraph* aGraph, TrackTime aOffset,
    const MediaSegment& aQueuedMedia) {
  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("MediaPipeline::NotifyQueuedChanges()"));

  if (aQueuedMedia.GetType() == MediaSegment::VIDEO) {
    // We always get video from the direct listener.
    return;
  }

  TRACE("MediaPipelineTransmit::PipelineListener::NotifyQueuedChanges (Audio)");

  if (mDirectConnect) {
    // ignore non-direct data if we're also getting direct data
    return;
  }

  size_t rate;
  if (aGraph) {
    rate = aGraph->GraphRate();
  } else {
    // When running tests, graph may be null. In that case use a default.
    rate = 16000;
  }
  NewData(aQueuedMedia, rate);
}

void MediaPipelineTransmit::PipelineListener::NotifyEnabledStateChanged(
    MediaTrackGraph* aGraph, bool aEnabled) {
  if (mConduit->type() != MediaSessionConduit::VIDEO) {
    return;
  }
  MOZ_ASSERT(mConverter);
  mConverter->SetTrackEnabled(aEnabled);
}

void MediaPipelineTransmit::PipelineListener::NotifyDirectListenerInstalled(
    InstallationResult aResult) {
  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("MediaPipeline::NotifyDirectListenerInstalled() listener=%p,"
           " result=%d",
           thisstatic_cast<int32_t>(aResult)));

  mDirectConnect = InstallationResult::SUCCESS == aResult;
}

void MediaPipelineTransmit::PipelineListener::
    NotifyDirectListenerUninstalled() {
  MOZ_LOG(
      gMediaPipelineLog, LogLevel::Info,
      ("MediaPipeline::NotifyDirectListenerUninstalled() listener=%p"this));

  if (mConduit->type() == MediaSessionConduit::VIDEO) {
    // Reset the converter's track-enabled state. If re-added to a new track
    // later and that track is disabled, we will be signaled explicitly.
    MOZ_ASSERT(mConverter);
    mConverter->SetTrackEnabled(true);
  }

  mDirectConnect = false;
}

void MediaPipelineTransmit::PipelineListener::NewData(
    const MediaSegment& aMedia, TrackRate aRate /* = 0 */) {
  if (mConduit->type() != (aMedia.GetType() == MediaSegment::AUDIO
                               ? MediaSessionConduit::AUDIO
                               : MediaSessionConduit::VIDEO)) {
    MOZ_ASSERT(false,
               "The media type should always be correct since the "
               "listener is locked to a specific track");
    return;
  }

  // TODO(ekr@rtfm.com): For now assume that we have only one
  // track type and it's destined for us
  // See bug 784517
  if (aMedia.GetType() == MediaSegment::AUDIO) {
    MOZ_RELEASE_ASSERT(aRate > 0);

    if (!mActive) {
      MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
              ("Discarding audio packets because transport not ready"));
      return;
    }

    const AudioSegment* audio = static_cast<const AudioSegment*>(&aMedia);
    for (AudioSegment::ConstChunkIterator iter(*audio); !iter.IsEnded();
         iter.Next()) {
      mAudioProcessing->QueueAudioChunk(aRate, *iter, mEnabled);
    }
  } else {
    const VideoSegment* video = static_cast<const VideoSegment*>(&aMedia);

    for (VideoSegment::ConstChunkIterator iter(*video); !iter.IsEnded();
         iter.Next()) {
      mConverter->QueueVideoChunk(*iter, !mEnabled);
    }
  }
}

class GenericReceiveListener : public MediaTrackListener {
 public:
  GenericReceiveListener(RefPtr<SourceMediaTrack> aSource,
                         TrackingId aTrackingId)
      : mSource(std::move(aSource)),
        mTrackingId(std::move(aTrackingId)),
        mIsAudio(mSource->mType == MediaSegment::AUDIO),
        mEnabled(false) {
    MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread());
    MOZ_DIAGNOSTIC_ASSERT(mSource, "Must be used with a SourceMediaTrack");
  }

  virtual ~GenericReceiveListener() = default;

  void Init() { mSource->AddListener(this); }
  void Shutdown() { mSource->RemoveListener(this); }

  void SetEnabled(bool aEnabled) {
    if (mEnabled == aEnabled) {
      return;
    }
    mEnabled = aEnabled;
    if (mIsAudio && !mSource->IsDestroyed()) {
      mSource->SetPullingEnabled(mEnabled);
    }
  }

 protected:
  const RefPtr<SourceMediaTrack> mSource;
  const TrackingId mTrackingId;
  const bool mIsAudio;
  // Main thread only.
  bool mEnabled;
};

MediaPipelineReceive::MediaPipelineReceive(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    RefPtr<MediaSessionConduit> aConduit)
    : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::RECEIVE,
                    std::move(aCallThread), std::move(aStsThread),
                    std::move(aConduit)),
      mWatchManager(this, AbstractThread::MainThread()) {
  mWatchManager.Watch(mActive, &MediaPipelineReceive::UpdateListener);
}

MediaPipelineReceive::~MediaPipelineReceive() = default;

void MediaPipelineReceive::InitControl(
    MediaPipelineReceiveControlInterface* aControl) {
  aControl->CanonicalReceiving().ConnectMirror(&mActive);
}

void MediaPipelineReceive::Shutdown() {
  MOZ_ASSERT(NS_IsMainThread());
  MediaPipeline::Shutdown();
  mWatchManager.Shutdown();
}

class MediaPipelineReceiveAudio::PipelineListener
    : public GenericReceiveListener {
 public:
  PipelineListener(RefPtr<SourceMediaTrack> aSource, TrackingId aTrackingId,
                   RefPtr<MediaSessionConduit> aConduit,
                   PrincipalHandle aPrincipalHandle, PrincipalPrivacy aPrivacy)
      : GenericReceiveListener(std::move(aSource), std::move(aTrackingId)),
        mConduit(std::move(aConduit)),
        // AudioSession conduit only supports 16, 32, 44.1 and 48kHz
        // This is an artificial limitation, it would however require more
        // changes to support any rates. If the sampling rate is not-supported,
        // we will use 48kHz instead.
        mRate(static_cast<AudioSessionConduit*>(mConduit.get())
                      ->IsSamplingFreqSupported(mSource->Graph()->GraphRate())
                  ? mSource->Graph()->GraphRate()
                  : WEBRTC_MAX_SAMPLE_RATE),
        mTaskQueue(TaskQueue::Create(
            GetMediaThreadPool(MediaThreadType::WEBRTC_WORKER),
            "AudioPipelineListener")),
        mPlayedTicks(0),
        mAudioFrame(std::make_unique<webrtc::AudioFrame>()),
        mPrincipalHandle(std::move(aPrincipalHandle)),
        mPrivacy(aPrivacy),
        mForceSilence(false) {}

  void Init() {
    GenericReceiveListener::Init();
    mSource->SetAppendDataSourceRate(mRate);
  }

  // Implement MediaTrackListener
  void NotifyPull(MediaTrackGraph* aGraph, TrackTime aEndOfAppendedData,
                  TrackTime aDesiredTime) override {
    NotifyPullImpl(aDesiredTime);
  }

  void OnPrivacyRequested_s() {
    if (mPrivacy == PrincipalPrivacy::Private) {
      return;
    }
    mForceSilence = true;
  }

  void SetPrivatePrincipal(PrincipalHandle aHandle) {
    MOZ_ASSERT(NS_IsMainThread());

    if (mSource->IsDestroyed()) {
      return;
    }

    mSource->QueueControlMessageWithNoShutdown(
        [self = RefPtr{this}, this,
         privatePrincipal = std::move(aHandle)]() mutable {
          if (mPrivacy == PrincipalPrivacy::Private) {
            return;
          }
          mPrincipalHandle = std::move(privatePrincipal);
          mPrivacy = PrincipalPrivacy::Private;
          mForceSilence = false;
        });
  }

 private:
  ~PipelineListener() = default;

  void NotifyPullImpl(TrackTime aDesiredTime) {
    TRACE_COMMENT("PiplineListener::NotifyPullImpl""PipelineListener %p",
                  this);
    uint32_t samplesPer10ms = mRate / 100;

    // mSource's rate is not necessarily the same as the graph rate, since there
    // are sample-rate constraints on the inbound audio: only 16, 32, 44.1 and
    // 48kHz are supported. The audio frames we get here is going to be
    // resampled when inserted into the graph. aDesiredTime and mPlayedTicks are
    // in the graph rate.

    while (mPlayedTicks < aDesiredTime) {
      // This fetches 10ms of data, either mono or stereo
      MediaConduitErrorCode err =
          static_cast<AudioSessionConduit*>(mConduit.get())
              ->GetAudioFrame(mRate, mAudioFrame.get());

      if (err != kMediaConduitNoError) {
        // Insert silence on conduit/GIPS failure (extremely unlikely)
        MOZ_LOG(gMediaPipelineLog, LogLevel::Error,
                ("Audio conduit failed (%d) to return data @ %" PRId64
                 " (desired %" PRId64 " -> %f)",
                 err, mPlayedTicks, aDesiredTime,
                 mSource->TrackTimeToSeconds(aDesiredTime)));
        constexpr size_t mono = 1;
        mAudioFrame->UpdateFrame(
            mAudioFrame->timestamp_, nullptr, samplesPer10ms, mRate,
            mAudioFrame->speech_type_, mAudioFrame->vad_activity_,
            std::max(mono, mAudioFrame->num_channels()));
      }

      MOZ_LOG(
          gMediaPipelineLog, LogLevel::Debug,
          ("Audio conduit returned buffer for %zu channels, %zu frames",
           mAudioFrame->num_channels(), mAudioFrame->samples_per_channel()));

      AudioSegment segment;
      if (mForceSilence || mAudioFrame->muted()) {
        segment.AppendNullData(mAudioFrame->samples_per_channel());
      } else {
        CheckedInt<size_t> bufferSize(sizeof(uint16_t));
        bufferSize *= mAudioFrame->samples_per_channel();
        bufferSize *= mAudioFrame->num_channels();
        RefPtr<SharedBuffer> samples = SharedBuffer::Create(bufferSize);
        int16_t* samplesData = static_cast<int16_t*>(samples->Data());
        AutoTArray<int16_t*, 2> channels;
        AutoTArray<const int16_t*, 2> outputChannels;

        channels.SetLength(mAudioFrame->num_channels());

        size_t offset = 0;
        for (size_t i = 0; i < mAudioFrame->num_channels(); i++) {
          channels[i] = samplesData + offset;
          offset += mAudioFrame->samples_per_channel();
        }

        DeinterleaveAndConvertBuffer(
            mAudioFrame->data(), mAudioFrame->samples_per_channel(),
            mAudioFrame->num_channels(), channels.Elements());

        outputChannels.AppendElements(channels);

        segment.AppendFrames(samples.forget(), outputChannels,
                             mAudioFrame->samples_per_channel(),
                             mPrincipalHandle);
      }

      // Handle track not actually added yet or removed/finished
      if (TrackTime appended = mSource->AppendData(&segment)) {
        mPlayedTicks += appended;
      } else {
        MOZ_LOG(gMediaPipelineLog, LogLevel::Error, ("AppendData failed"));
        // we can't un-read the data, but that's ok since we don't want to
        // buffer - but don't i-loop!
        break;
      }
    }
  }

  const RefPtr<MediaSessionConduit> mConduit;
  // This conduit's sampling rate. This is either 16, 32, 44.1 or 48kHz, and
  // tries to be the same as the graph rate. If the graph rate is higher than
  // 48kHz, mRate is capped to 48kHz. If mRate does not match the graph rate,
  // audio is resampled to the graph rate.
  const TrackRate mRate;
  const RefPtr<TaskQueue> mTaskQueue;
  // Number of frames of data that has been added to the SourceMediaTrack in
  // the graph's rate. Graph thread only.
  TrackTicks mPlayedTicks;
  // Allocation of an audio frame used as a scratch buffer when reading data out
  // of libwebrtc for forwarding into the graph. Graph thread only.
  std::unique_ptr<webrtc::AudioFrame> mAudioFrame;
  // Principal handle used when appending data to the SourceMediaTrack. Graph
  // thread only.
  PrincipalHandle mPrincipalHandle;
  // Privacy of mPrincipalHandle. Graph thread only.
  PrincipalPrivacy mPrivacy;
  // Set to true on the sts thread if privacy is requested when ALPN was
  // negotiated. Set to false again when mPrincipalHandle is private.
  Atomic<bool> mForceSilence;
};

MediaPipelineReceiveAudio::MediaPipelineReceiveAudio(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    RefPtr<AudioSessionConduit> aConduit, RefPtr<SourceMediaTrack> aSource,
    TrackingId aTrackingId, PrincipalHandle aPrincipalHandle,
    PrincipalPrivacy aPrivacy)
    : MediaPipelineReceive(aPc, std::move(aTransportHandler),
                           std::move(aCallThread), std::move(aStsThread),
                           std::move(aConduit)),
      mListener(aSource ? new PipelineListener(
                              std::move(aSource), std::move(aTrackingId),
                              mConduit, std::move(aPrincipalHandle), aPrivacy)
                        : nullptr) {
  mDescription = mPc + "| Receive audio";
  if (mListener) {
    mListener->Init();
  }
}

void MediaPipelineReceiveAudio::Shutdown() {
  MOZ_ASSERT(NS_IsMainThread());
  MediaPipelineReceive::Shutdown();
  if (mListener) {
    mListener->Shutdown();
  }
}

void MediaPipelineReceiveAudio::OnPrivacyRequested_s() {
  ASSERT_ON_THREAD(mStsThread);
  if (mListener) {
    mListener->OnPrivacyRequested_s();
  }
}

void MediaPipelineReceiveAudio::SetPrivatePrincipal(PrincipalHandle aHandle) {
  MOZ_ASSERT(NS_IsMainThread());
  if (mListener) {
    mListener->SetPrivatePrincipal(std::move(aHandle));
  }
}

void MediaPipelineReceiveAudio::UpdateListener() {
  MOZ_ASSERT(NS_IsMainThread());
  if (mListener) {
    mListener->SetEnabled(mActive.Ref());
  }
}

class MediaPipelineReceiveVideo::PipelineListener
    : public GenericReceiveListener {
 public:
  PipelineListener(RefPtr<SourceMediaTrack> aSource, TrackingId aTrackingId,
                   PrincipalHandle aPrincipalHandle, PrincipalPrivacy aPrivacy)
      : GenericReceiveListener(std::move(aSource), std::move(aTrackingId)),
        mImageContainer(MakeAndAddRef<ImageContainer>(
            ImageUsageType::Webrtc, ImageContainer::ASYNCHRONOUS)),
        mMutex("MediaPipelineReceiveVideo::PipelineListener::mMutex"),
        mPrincipalHandle(std::move(aPrincipalHandle)),
        mPrivacy(aPrivacy) {}
  void OnPrivacyRequested_s() {
    MutexAutoLock lock(mMutex);
    if (mPrivacy == PrincipalPrivacy::Private) {
      return;
    }
    mForceDropFrames = true;
  }

  void SetPrivatePrincipal(PrincipalHandle aHandle) {
    MutexAutoLock lock(mMutex);
    if (mPrivacy == PrincipalPrivacy::Private) {
      return;
    }
    mPrincipalHandle = std::move(aHandle);
    mPrivacy = PrincipalPrivacy::Private;
    mForceDropFrames = false;
  }

  void RenderVideoFrame(const webrtc::VideoFrame& aVideoFrame) {
    PrincipalHandle principal;
    {
      MutexAutoLock lock(mMutex);
      if (mForceDropFrames) {
        return;
      }
      principal = mPrincipalHandle;
    }
    RefPtr<Image> image;
    const webrtc::VideoFrameBuffer& buffer = *aVideoFrame.video_frame_buffer();
    if (buffer.type() == webrtc::VideoFrameBuffer::Type::kNative) {
      // We assume that only native handles are used with the
      // WebrtcMediaDataCodec decoder.
      const ImageBuffer* imageBuffer = static_cast<const ImageBuffer*>(&buffer);
      image = imageBuffer->GetNativeImage();
    } else {
      MOZ_ASSERT(buffer.type() == webrtc::VideoFrameBuffer::Type::kI420);
      rtc::scoped_refptr<const webrtc::I420BufferInterface> i420(
          buffer.GetI420());

      MOZ_ASSERT(i420->DataY());
      // Create a video frame using |buffer|.
      PerformanceRecorder<CopyVideoStage> rec(
          "MediaPipelineReceiveVideo::CopyToImage"_ns, mTrackingId,
          i420->width(), i420->height());

      RefPtr<PlanarYCbCrImage> yuvImage =
          mImageContainer->CreatePlanarYCbCrImage();

      PlanarYCbCrData yuvData;
      yuvData.mYChannel = const_cast<uint8_t*>(i420->DataY());
      yuvData.mYStride = i420->StrideY();
      MOZ_ASSERT(i420->StrideU() == i420->StrideV());
      yuvData.mCbCrStride = i420->StrideU();
      yuvData.mCbChannel = const_cast<uint8_t*>(i420->DataU());
      yuvData.mCrChannel = const_cast<uint8_t*>(i420->DataV());
      yuvData.mPictureRect = IntRect(0, 0, i420->width(), i420->height());
      yuvData.mStereoMode = StereoMode::MONO;
      // This isn't the best default.
      yuvData.mYUVColorSpace = gfx::YUVColorSpace::BT601;
      yuvData.mChromaSubsampling =
          gfx::ChromaSubsampling::HALF_WIDTH_AND_HEIGHT;

      if (NS_FAILED(yuvImage->CopyData(yuvData))) {
        MOZ_ASSERT(false);
        return;
      }
      rec.Record();

      image = std::move(yuvImage);
    }

    Maybe<webrtc::Timestamp> receiveTime;
    for (const auto& packet : aVideoFrame.packet_infos()) {
      if (!receiveTime || *receiveTime < packet.receive_time()) {
        receiveTime = Some(packet.receive_time());
      }
    }

    VideoSegment segment;
    auto size = image->GetSize();
    auto processingDuration =
        aVideoFrame.processing_time()
            ? media::TimeUnit::FromMicroseconds(
                  aVideoFrame.processing_time()->Elapsed().us())
            : media::TimeUnit::Invalid();
    segment.AppendWebrtcRemoteFrame(
        image.forget(), size, principal,
        /* aForceBlack */ false, TimeStamp::Now(), processingDuration,
        aVideoFrame.rtp_timestamp(), aVideoFrame.ntp_time_ms(),
        receiveTime ? receiveTime->us() : 0);
    mSource->AppendData(&segment);
  }

 private:
  RefPtr<layers::ImageContainer> mImageContainer;
  Mutex mMutex;
  PrincipalHandle mPrincipalHandle MOZ_GUARDED_BY(mMutex);
  PrincipalPrivacy mPrivacy MOZ_GUARDED_BY(mMutex);
  // Set to true on the sts thread if privacy is requested when ALPN was
  // negotiated. Set to false again when mPrincipalHandle is private.
  bool mForceDropFrames MOZ_GUARDED_BY(mMutex) = false;
};

class MediaPipelineReceiveVideo::PipelineRenderer
    : public mozilla::VideoRenderer {
 public:
  explicit PipelineRenderer(MediaPipelineReceiveVideo* aPipeline)
      : mPipeline(aPipeline) {}

  void Detach() { mPipeline = nullptr; }

  // Implement VideoRenderer
  void FrameSizeChange(unsigned int aWidth, unsigned int aHeight) override {}
  void RenderVideoFrame(const webrtc::VideoFrame& aVideoFrame) override {
    mPipeline->mListener->RenderVideoFrame(aVideoFrame);
  }

 private:
  MediaPipelineReceiveVideo* mPipeline;  // Raw pointer to avoid cycles
};

MediaPipelineReceiveVideo::MediaPipelineReceiveVideo(
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<AbstractThread> aCallThread, RefPtr<nsISerialEventTarget> aStsThread,
    RefPtr<VideoSessionConduit> aConduit, RefPtr<SourceMediaTrack> aSource,
    TrackingId aTrackingId, PrincipalHandle aPrincipalHandle,
    PrincipalPrivacy aPrivacy)
    : MediaPipelineReceive(aPc, std::move(aTransportHandler),
                           std::move(aCallThread), std::move(aStsThread),
                           std::move(aConduit)),
      mRenderer(new PipelineRenderer(this)),
      mListener(aSource ? new PipelineListener(
                              std::move(aSource), std::move(aTrackingId),
                              std::move(aPrincipalHandle), aPrivacy)
                        : nullptr) {
  mDescription = mPc + "| Receive video";
  if (mListener) {
    mListener->Init();
  }
  static_cast<VideoSessionConduit*>(mConduit.get())->AttachRenderer(mRenderer);
}

void MediaPipelineReceiveVideo::Shutdown() {
  MOZ_ASSERT(NS_IsMainThread());
  MediaPipelineReceive::Shutdown();
  if (mListener) {
    mListener->Shutdown();
  }

  // stop generating video and thus stop invoking the PipelineRenderer
  // and PipelineListener - the renderer has a raw ptr to the Pipeline to
  // avoid cycles, and the render callbacks are invoked from a different
  // thread so simple null-checks would cause TSAN bugs without locks.
  static_cast<VideoSessionConduit*>(mConduit.get())->DetachRenderer();
}

void MediaPipelineReceiveVideo::OnPrivacyRequested_s() {
  ASSERT_ON_THREAD(mStsThread);
  if (mListener) {
    mListener->OnPrivacyRequested_s();
  }
}

void MediaPipelineReceiveVideo::SetPrivatePrincipal(PrincipalHandle aHandle) {
  MOZ_ASSERT(NS_IsMainThread());
  if (mListener) {
    mListener->SetPrivatePrincipal(std::move(aHandle));
  }
}

void MediaPipelineReceiveVideo::UpdateListener() {
  MOZ_ASSERT(NS_IsMainThread());
  if (mListener) {
    mListener->SetEnabled(mActive.Ref());
  }
}

const dom::RTCStatsTimestampMaker& MediaPipeline::GetTimestampMaker() const {
  return mConduit->GetTimestampMaker();
}

DOMHighResTimeStamp MediaPipeline::RtpCSRCStats::GetExpiryFromTime(
    const DOMHighResTimeStamp aTime) {
  // DOMHighResTimeStamp is a unit measured in ms
  return aTime + EXPIRY_TIME_MILLISECONDS;
}

MediaPipeline::RtpCSRCStats::RtpCSRCStats(const uint32_t aCsrc,
                                          const DOMHighResTimeStamp aTime)
    : mCsrc(aCsrc), mTimestamp(aTime) {}

void MediaPipeline::RtpCSRCStats::GetWebidlInstance(
    dom::RTCRTPContributingSourceStats& aWebidlObj,
    const nsString& aInboundRtpStreamId) const {
  nsString statId = u"csrc_"_ns + aInboundRtpStreamId;
  statId.AppendLiteral("_");
  statId.AppendInt(mCsrc);
  aWebidlObj.mId.Construct(statId);
  aWebidlObj.mType.Construct(RTCStatsType::Csrc);
  aWebidlObj.mTimestamp.Construct(mTimestamp);
  aWebidlObj.mContributorSsrc.Construct(mCsrc);
  aWebidlObj.mInboundRtpStreamId.Construct(aInboundRtpStreamId);
}

}  // namespace mozilla

Messung V0.5
C=93 H=92 G=92

¤ Dauer der Verarbeitung: 0.23 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.