/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
#define NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_
#include <functional>
#include <memory>
#include <set>
#include <string>
#include <vector>
#include <errno.h>
#include "nsISupports.h"
#include "nsCOMPtr.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/WeakPtr.h"
#include "nsString.h"
#include "nsThreadUtils.h"
#include "nsTArray.h"
#include "nsDeque.h"
#include "mozilla/dom/Blob.h"
#include "mozilla/Mutex.h"
#include "DataChannelProtocol.h"
#include "DataChannelListener.h"
#include "mozilla/net/NeckoTargetHolder.h"
#include "transport/sigslot.h"
#include "transport/transportlayer.h" // For TransportLayer::State
#ifndef EALREADY
# define EALREADY WSAEALREADY
#endif
extern "C" {
struct socket;
struct sctp_rcvinfo;
}
namespace mozilla {
// Forward declarations of types that are referenced in this header before
// (or without) their full definition being visible.
class DataChannelConnection;
class DataChannel;
class DataChannelOnMessageAvailable;
class MediaPacket;
class MediaTransportHandler;
namespace dom {
struct RTCStatsCollection;
}  // namespace dom
// Ready state of a single DataChannel (the type of DataChannel::mReadyState).
enum class DataChannelState {
  Connecting,
  Open,
  Closing,
  Closed,
};
// State of a DataChannelConnection (the type of
// DataChannelConnection::mState).
enum class DataChannelConnectionState {
  Connecting,
  Open,
  Closed,
};
// Reliability policy selected for a DataChannel (see DataChannel::GetType()
// and the prPolicy parameter of DataChannelConnection::Open()).
enum class DataChannelReliabilityPolicy {
  Reliable,
  LimitedRetransmissions,
  LimitedLifetime,
};
// Describes a message that is about to be sent.
//
// An OutgoingMsg is a non-owning view: it keeps a Span over the payload and a
// pointer to the caller's sctp_sendv_spa, and copies neither of them. Both
// must therefore outlive the OutgoingMsg.
class OutgoingMsg {
 public:
  OutgoingMsg(struct sctp_sendv_spa& info, Span<const uint8_t> data);

  // Defined out of line. Presumably moves the read position (mPos) forward by
  // |offset| bytes -- confirm against the implementation.
  void Advance(size_t offset);

  // The send-info structure this message refers to.
  struct sctp_sendv_spa& GetInfo() const { return *mInfo; }

  // Length of the whole payload view (independent of mPos).
  size_t GetLength() const { return mData.Length(); }

  // The part of the payload starting at the current position mPos.
  Span<const uint8_t> GetRemainingData() const { return mData.From(mPos); }

 protected:
  const Span<const uint8_t> mData;
  struct sctp_sendv_spa* const mInfo;
  size_t mPos = 0;
};
// An OutgoingMsg that can be queued.
//
// Unlike the base class, this type carries its own storage for the payload
// (mDataStorage) and for the send-info structure (mInfoStorage). Instances
// are only obtainable through CopyFrom(); the constructor is private.
class BufferedOutgoingMsg : public OutgoingMsg {
 public:
  // Builds a buffered message from |msg|. Defined out of line.
  static UniquePtr<BufferedOutgoingMsg> CopyFrom(const OutgoingMsg& msg);

 private:
  BufferedOutgoingMsg(nsTArray<uint8_t>&& data,
                      UniquePtr<struct sctp_sendv_spa>&& info);

  const nsTArray<uint8_t> mDataStorage;
  const UniquePtr<struct sctp_sendv_spa> mInfoStorage;
};
// An incoming data message that is parked until the Open or the external
// negotiation for its stream has been indicated to us.
//
// All fields are immutable after construction. mData is constructed from
// (data, length); presumably this copies the bytes -- see nsTArray.
class QueuedDataMessage {
 public:
  QueuedDataMessage(uint16_t stream, uint32_t ppid, int flags,
                    const uint8_t* data, uint32_t length)
      : mStream(stream),
        mPpid(ppid),
        mFlags(flags),
        mData(data, length) {}

  const uint16_t mStream;
  const uint32_t mPpid;
  const int mFlags;
  const nsTArray<uint8_t> mData;
};
// One per PeerConnection.
//
// Declares the connection-level state shared by its DataChannels: the channel
// table (mChannels), the sockets (mMasterSocket/mSocket) and the queues of
// pending and buffered messages. Members annotated MOZ_GUARDED_BY(mLock) and
// methods annotated MOZ_REQUIRES(mLock) are tied to mLock by the thread-safety
// annotations. The type is refcounted and neither copyable nor movable.
class DataChannelConnection final : public net::NeckoTargetHolder,
                                    public sigslot::has_slots<> {
  friend class DataChannel;
  friend class DataChannelOnMessageAvailable;
  friend class DataChannelConnectRunnable;

  // Not public: lifetime is managed through the refcounting declared below.
  virtual ~DataChannelConnection();

 public:
  enum class PendingType {
    None,  // No outgoing messages are pending.
    Dcep,  // Outgoing DCEP messages are pending.
    Data,  // Outgoing data channel messages are pending.
  };

  // Callback interface through which the connection reports events.
  class DataConnectionListener : public SupportsWeakPtr {
   public:
    virtual ~DataConnectionListener() = default;

    // Called when a new DataChannel has been opened by the other side.
    virtual void NotifyDataChannel(already_AddRefed<DataChannel> channel) = 0;

    // Called when a DataChannel transitions to state open
    virtual void NotifyDataChannelOpen(DataChannel* aChannel) = 0;

    // Called when a DataChannel (that was open at some point in the past)
    // transitions to state closed
    virtual void NotifyDataChannelClosed(DataChannel* aChannel) = 0;

    // Called when SCTP connects
    virtual void NotifySctpConnected() = 0;

    // Called when SCTP closes
    virtual void NotifySctpClosed() = 0;
  };

  // Create a new DataChannel Connection
  // Must be called on Main thread
  static Maybe<RefPtr<DataChannelConnection>> Create(
      DataConnectionListener* aListener, nsISerialEventTarget* aTarget,
      MediaTransportHandler* aHandler, const uint16_t aLocalPort,
      const uint16_t aNumStreams, const Maybe<uint64_t>& aMaxMessageSize);

  DataChannelConnection(const DataChannelConnection&) = delete;
  DataChannelConnection(DataChannelConnection&&) = delete;
  DataChannelConnection& operator=(const DataChannelConnection&) = delete;
  DataChannelConnection& operator=(DataChannelConnection&&) = delete;

  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelConnection)

  void Destroy();  // So we can spawn refs tied to runnables in shutdown
  // Finish Destroy on STS to avoid SCTP race condition with ABORT from far end
  void DestroyOnSTS(struct socket* aMasterSocket, struct socket* aSocket);
  void DestroyOnSTSFinal();

  void SetMaxMessageSize(bool aMaxMessageSizeSet, uint64_t aMaxMessageSize);
  uint64_t GetMaxMessageSize();

  void AppendStatsToReport(const UniquePtr<dom::RTCStatsCollection>& aReport,
                           const DOMHighResTimeStamp aTimestamp) const;

  bool ConnectToTransport(const std::string& aTransportId, const bool aClient,
                          const uint16_t aLocalPort,
                          const uint16_t aRemotePort);
  void TransportStateChange(const std::string& aTransportId,
                            TransportLayer::State aState);
  void CompleteConnect();
  void SetSignals(const std::string& aTransportId);

  [[nodiscard]] already_AddRefed<DataChannel> Open(
      const nsACString& label, const nsACString& protocol,
      DataChannelReliabilityPolicy prPolicy, bool inOrder, uint32_t prValue,
      DataChannelListener* aListener, nsISupports* aContext,
      bool aExternalNegotiated, uint16_t aStream);

  void Stop();
  void Close(DataChannel* aChannel);
  void CloseLocked(DataChannel* aChannel) MOZ_REQUIRES(mLock);
  void CloseAll();

  // Sends aMsg on |stream| as a non-binary message.
  // Returns a POSIX error code.
  int SendMsg(uint16_t stream, const nsACString& aMsg) {
    return SendDataMsgCommon(stream, aMsg, false);
  }

  // Sends aMsg on |stream| as a binary message.
  // Returns a POSIX error code.
  int SendBinaryMsg(uint16_t stream, const nsACString& aMsg) {
    return SendDataMsgCommon(stream, aMsg, true);
  }

  // Returns a POSIX error code.
  int SendBlob(uint16_t stream, nsIInputStream* aBlob);

  // Called on data reception from the SCTP library
  // must(?) be public so my c->c++ trampoline can call it
  // May be called with (STS thread) or without the lock
  int ReceiveCallback(struct socket* sock, void* data, size_t datalen,
                      struct sctp_rcvinfo rcv, int flags);

  void ReadBlob(already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
                nsIInputStream* aBlob);

  bool SendDeferredMessages() MOZ_REQUIRES(mLock);

  int SctpDtlsOutput(void* addr, void* buffer, size_t length, uint8_t tos,
                     uint8_t set_df);

  // Reports mShutdown when diagnostic asserts are compiled in; otherwise the
  // flag does not exist and this always returns false.
  bool InShutdown() const {
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
    return mShutdown;
#else
    return false;
#endif
  }

 private:
  // Container of DataChannels with its own mutex (mMutex); GetAll() returns
  // a clone of the array taken under that mutex.
  class Channels {
   public:
    using ChannelArray = AutoTArray<RefPtr<DataChannel>, 16>;

    Channels() : mMutex("DataChannelConnection::Channels::mMutex") {}
    Channels(const Channels&) = delete;
    Channels(Channels&&) = delete;
    Channels& operator=(const Channels&) = delete;
    Channels& operator=(Channels&&) = delete;

    void Insert(const RefPtr<DataChannel>& aChannel);
    bool Remove(const RefPtr<DataChannel>& aChannel);
    RefPtr<DataChannel> Get(uint16_t aId) const;
    ChannelArray GetAll() const {
      MutexAutoLock lock(mMutex);
      return mChannels.Clone();
    }
    RefPtr<DataChannel> GetNextChannel(uint16_t aCurrentId) const;

   private:
    // Comparator overloads for channel-vs-id and channel-vs-channel.
    struct IdComparator {
      bool Equals(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
      bool LessThan(const RefPtr<DataChannel>& aChannel, uint16_t aId) const;
      bool Equals(const RefPtr<DataChannel>& a1,
                  const RefPtr<DataChannel>& a2) const;
      bool LessThan(const RefPtr<DataChannel>& a1,
                    const RefPtr<DataChannel>& a2) const;
    };

    mutable Mutex mMutex;
    ChannelArray mChannels MOZ_GUARDED_BY(mMutex);
  };

  DataChannelConnection(DataConnectionListener* aListener,
                        nsISerialEventTarget* aTarget,
                        MediaTransportHandler* aHandler);

  bool Init(const uint16_t aLocalPort, const uint16_t aNumStreams,
            const Maybe<uint64_t>& aMaxMessageSize);

  // Reads mState; asserts that the calling thread owns mLock.
  DataChannelConnectionState GetState() const MOZ_REQUIRES(mLock) {
    mLock.AssertCurrentThreadOwns();
    return mState;
  }

  void SetState(DataChannelConnectionState aState) MOZ_REQUIRES(mLock);

  static int OnThresholdEvent(struct socket* sock, uint32_t sb_free,
                              void* ulp_info);
  static void DTLSConnectThread(void* data);
  void SendPacket(std::unique_ptr<MediaPacket>&& packet);
  void SctpDtlsInput(const std::string& aTransportId,
                     const MediaPacket& packet);
  DataChannel* FindChannelByStream(uint16_t stream) MOZ_REQUIRES(mLock);
  uint16_t FindFreeStream() const MOZ_REQUIRES(mLock);
  bool RequestMoreStreams(int32_t aNeeded = 16) MOZ_REQUIRES(mLock);
  uint32_t UpdateCurrentStreamIndex() MOZ_REQUIRES(mLock);
  uint32_t GetCurrentStreamIndex() MOZ_REQUIRES(mLock);
  int SendControlMessage(const uint8_t* data, uint32_t len, uint16_t stream)
      MOZ_REQUIRES(mLock);
  int SendOpenAckMessage(uint16_t stream) MOZ_REQUIRES(mLock);
  int SendOpenRequestMessage(const nsACString& label,
                             const nsACString& protocol, uint16_t stream,
                             bool unordered,
                             DataChannelReliabilityPolicy prPolicy,
                             uint32_t prValue) MOZ_REQUIRES(mLock);
  bool SendBufferedMessages(nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer,
                            size_t* aWritten) MOZ_REQUIRES(mLock);
  int SendMsgInternal(OutgoingMsg& msg, size_t* aWritten) MOZ_REQUIRES(mLock);
  int SendMsgInternalOrBuffer(nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer,
                              OutgoingMsg& msg, bool& buffered,
                              size_t* aWritten) MOZ_REQUIRES(mLock);
  int SendDataMsgInternalOrBuffer(DataChannel& channel, const uint8_t* data,
                                  size_t len, uint32_t ppid)
      MOZ_REQUIRES(mLock);
  int SendDataMsg(DataChannel& channel, const uint8_t* data, size_t len,
                  uint32_t ppidPartial, uint32_t ppidFinal) MOZ_REQUIRES(mLock);
  int SendDataMsgCommon(uint16_t stream, const nsACString& aMsg,
                        bool isBinary);

  void DeliverQueuedData(uint16_t stream) MOZ_REQUIRES(mLock);

  already_AddRefed<DataChannel> OpenFinish(
      already_AddRefed<DataChannel>&& aChannel) MOZ_REQUIRES(mLock);

  void ProcessQueuedOpens() MOZ_REQUIRES(mLock);
  void ClearResets() MOZ_REQUIRES(mLock);
  void SendOutgoingStreamReset() MOZ_REQUIRES(mLock);
  void ResetOutgoingStream(DataChannel& aChannel) MOZ_REQUIRES(mLock);
  void HandleOpenRequestMessage(
      const struct rtcweb_datachannel_open_request* req, uint32_t length,
      uint16_t stream) MOZ_REQUIRES(mLock);
  void HandleOpenAckMessage(const struct rtcweb_datachannel_ack* ack,
                            uint32_t length, uint16_t stream);
  void HandleUnknownMessage(uint32_t ppid, uint32_t length, uint16_t stream)
      MOZ_REQUIRES(mLock);
  uint8_t BufferMessage(nsACString& recvBuffer, const void* data,
                        uint32_t length, uint32_t ppid, int flags);
  void HandleDataMessage(const void* data, size_t length, uint32_t ppid,
                         uint16_t stream, int flags) MOZ_REQUIRES(mLock);
  void HandleDCEPMessage(const void* buffer, size_t length, uint32_t ppid,
                         uint16_t stream, int flags) MOZ_REQUIRES(mLock);
  void HandleMessage(const void* buffer, size_t length, uint32_t ppid,
                     uint16_t stream, int flags) MOZ_REQUIRES(mLock);
  void HandleAssociationChangeEvent(const struct sctp_assoc_change* sac)
      MOZ_REQUIRES(mLock);
  void HandlePeerAddressChangeEvent(const struct sctp_paddr_change* spc)
      MOZ_REQUIRES(mLock);
  void HandleRemoteErrorEvent(const struct sctp_remote_error* sre)
      MOZ_REQUIRES(mLock);
  void HandleShutdownEvent(const struct sctp_shutdown_event* sse)
      MOZ_REQUIRES(mLock);
  void HandleAdaptationIndication(const struct sctp_adaptation_event* sai)
      MOZ_REQUIRES(mLock);
  void HandlePartialDeliveryEvent(const struct sctp_pdapi_event* spde)
      MOZ_REQUIRES(mLock);
  void HandleSendFailedEvent(const struct sctp_send_failed_event* ssfe)
      MOZ_REQUIRES(mLock);
  void HandleStreamResetEvent(const struct sctp_stream_reset_event* strrst)
      MOZ_REQUIRES(mLock);
  void HandleStreamChangeEvent(const struct sctp_stream_change_event* strchg)
      MOZ_REQUIRES(mLock);
  void HandleNotification(const union sctp_notification* notif, size_t n)
      MOZ_REQUIRES(mLock);

  // True when mSTS is set and reports that we are on its thread; false when
  // mSTS is null.
  bool IsSTSThread() const {
    bool on = false;
    if (mSTS) {
      mSTS->IsOnCurrentThread(&on);
    }
    return on;
  }

  mutable Mutex mLock;
  // Avoid cycles with PeerConnectionImpl
  // Use from main thread only as WeakPtr is not threadsafe
  WeakPtr<DataConnectionListener> mListener;
  bool mSendInterleaved MOZ_GUARDED_BY(mLock) = false;
  // MainThread only
  bool mMaxMessageSizeSet = false;
  // mMaxMessageSize is only set on MainThread, but read off-main-thread
  uint64_t mMaxMessageSize MOZ_GUARDED_BY(mLock) = 0;
  // Main thread only
  Maybe<bool> mAllocateEven;
  // Data:
  // NOTE: while this container will auto-expand, increases in the number of
  // channels available from the stack must be negotiated!
  // Accessed from both main and sts, API is threadsafe
  Channels mChannels;
  // STS only
  uint32_t mCurrentStream = 0;
  // STS and main
  std::set<RefPtr<DataChannel>> mPending MOZ_GUARDED_BY(mLock);
  size_t mNegotiatedIdLimit MOZ_GUARDED_BY(mLock) = 0;
  PendingType mPendingType MOZ_GUARDED_BY(mLock) = PendingType::None;
  // holds data that's come in before a channel is open
  nsTArray<UniquePtr<QueuedDataMessage>> mQueuedData MOZ_GUARDED_BY(mLock);
  // holds outgoing control messages
  nsTArray<UniquePtr<BufferedOutgoingMsg>> mBufferedControl
      MOZ_GUARDED_BY(mLock);
  // Streams pending reset. Accessed from main and STS.
  AutoTArray<uint16_t, 4> mStreamsResetting MOZ_GUARDED_BY(mLock);
  // accessed from STS thread
  struct socket* mMasterSocket = nullptr;
  // cloned from mMasterSocket on successful Connect on STS thread
  struct socket* mSocket = nullptr;
  DataChannelConnectionState mState MOZ_GUARDED_BY(mLock) =
      DataChannelConnectionState::Closed;
  std::string mTransportId;
  bool mConnectedToTransportHandler = false;
  RefPtr<MediaTransportHandler> mTransportHandler;
  nsCOMPtr<nsIEventTarget> mSTS;
  uint16_t mLocalPort = 0;
  // Accessed from connect thread
  uint16_t mRemotePort = 0;
  nsCOMPtr<nsIThread> mInternalIOThread = nullptr;
  nsCString mRecvBuffer;
  // Workaround to prevent a message from being received on main before the
  // sender sees the decrease in bufferedAmount.
  bool mDeferSend = false;
  std::vector<std::unique_ptr<MediaPacket>> mDeferredSend;
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
  // Read by InShutdown(). Given a default so that it has a defined value
  // regardless of what the (out-of-line) constructor does, like the other
  // scalar members of this class.
  bool mShutdown = false;
#endif
  uintptr_t mId = 0;
};
// A single data channel belonging to a DataChannelConnection.
//
// The type is refcounted (destructor is private) and neither copyable nor
// movable. Several members are documented below as main-thread only; the
// inline accessors for the ready state and buffered amount assert this.
class DataChannel {
  friend class DataChannelOnMessageAvailable;
  friend class DataChannelConnection;

 public:
  // Per-channel message and byte counters (see GetTrafficCounters()).
  struct TrafficCounters {
    uint32_t mMessagesSent = 0;
    uint64_t mBytesSent = 0;
    uint32_t mMessagesReceived = 0;
    uint64_t mBytesReceived = 0;
  };

  DataChannel(DataChannelConnection* connection, uint16_t stream,
              DataChannelState state, const nsACString& label,
              const nsACString& protocol, DataChannelReliabilityPolicy policy,
              uint32_t value, bool ordered, bool negotiated,
              DataChannelListener* aListener, nsISupports* aContext);
  DataChannel(const DataChannel&) = delete;
  DataChannel(DataChannel&&) = delete;
  DataChannel& operator=(const DataChannel&) = delete;
  DataChannel& operator=(DataChannel&&) = delete;

 private:
  ~DataChannel();

 public:
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannel)

  // when we disconnect from the connection after stream RESET
  void StreamClosedLocked();

  // Complete dropping of the link between DataChannel and the connection.
  // After this, except for a few methods below listed to be safe, you can't
  // call into DataChannel.
  void ReleaseConnection();

  // Close this DataChannel.  Can be called multiple times.  MUST be called
  // before destroying the DataChannel (state must be CLOSED or CLOSING).
  void Close();

  // Set the listener (especially for channels created from the other side)
  void SetListener(DataChannelListener* aListener, nsISupports* aContext);

  // Helper for send methods that converts POSIX error codes to an ErrorResult.
  static void SendErrnoToErrorResult(int error, size_t aMessageSize,
                                     ErrorResult& aRv);

  // Send a string
  void SendMsg(const nsACString& aMsg, ErrorResult& aRv);

  // Send a binary message (TypedArray)
  void SendBinaryMsg(const nsACString& aMsg, ErrorResult& aRv);

  // Send a binary blob
  void SendBinaryBlob(dom::Blob& aBlob, ErrorResult& aRv);

  DataChannelReliabilityPolicy GetType() const { return mPrPolicy; }

  dom::Nullable<uint16_t> GetMaxPacketLifeTime() const;

  dom::Nullable<uint16_t> GetMaxRetransmits() const;

  bool GetNegotiated() const { return mNegotiated; }

  bool GetOrdered() const { return mOrdered; }

  void IncrementBufferedAmount(uint32_t aSize, ErrorResult& aRv);
  void DecrementBufferedAmount(uint32_t aSize);

  // Amount of data buffered to send
  // NOTE(review): mBufferedAmount is a size_t; the value is truncated to 32
  // bits here if it ever exceeds UINT32_MAX. The cast only makes the
  // pre-existing implicit narrowing explicit.
  uint32_t GetBufferedAmount() const {
    MOZ_ASSERT(NS_IsMainThread());
    return static_cast<uint32_t>(mBufferedAmount);
  }

  // Trigger amount for generating BufferedAmountLow events
  uint32_t GetBufferedAmountLowThreshold() const;
  void SetBufferedAmountLowThreshold(uint32_t aThreshold);

  void AnnounceOpen();
  // TODO(bug 843625): Optionally pass an error here.
  void AnnounceClosed();

  // Find out state
  DataChannelState GetReadyState() const {
    MOZ_ASSERT(NS_IsMainThread());
    return mReadyState;
  }

  // Set ready state
  void SetReadyState(DataChannelState aState);

  // Writes the (UTF-8) label into aLabel as UTF-16. Const, like the other
  // getters: it only reads the immutable mLabel.
  void GetLabel(nsAString& aLabel) const { CopyUTF8toUTF16(mLabel, aLabel); }

  // Writes the (UTF-8) protocol into aProtocol as UTF-16. Const, like the
  // other getters: it only reads the immutable mProtocol.
  void GetProtocol(nsAString& aProtocol) const {
    CopyUTF8toUTF16(mProtocol, aProtocol);
  }

  uint16_t GetStream() const { return mStream; }

  void SendOrQueue(DataChannelOnMessageAvailable* aMessage)
      MOZ_REQUIRES(mConnection->mLock);

  TrafficCounters GetTrafficCounters() const;

  bool HasSentStreamReset() const { return mHasSentStreamReset; }
  void SetHasSentStreamReset() { mHasSentStreamReset = true; }

 private:
  nsresult AddDataToBinaryMsg(const char* data, uint32_t size);
  bool EnsureValidStream(ErrorResult& aRv);
  void WithTrafficCounters(const std::function<void(TrafficCounters&)>&);

  // These are both mainthread only
  DataChannelListener* mListener;
  nsCOMPtr<nsISupports> mContext;

  RefPtr<DataChannelConnection> mConnection;
  // mainthread only
  bool mEverOpened = false;
  const nsCString mLabel;
  const nsCString mProtocol;
  // This is mainthread only
  DataChannelState mReadyState;
  uint16_t mStream;
  const DataChannelReliabilityPolicy mPrPolicy;
  const uint32_t mPrValue;
  // Accessed on main and STS
  const bool mNegotiated;
  const bool mOrdered;
  // The data channel has completed the open procedure and the client has been
  // notified about it.
  bool mHasFinishedOpen = false;
  // The channel has been opened, but the peer has not yet acked - ensures that
  // the messages are sent ordered until this is cleared.
  bool mWaitingForAck = false;
  // A too large message was attempted to be sent - closing data channel.
  bool mClosingTooLarge = false;
  bool mHasSentStreamReset = false;
  // The next three members previously had no default initializer. The
  // constructor is defined out of line, so defaults are given here to
  // guarantee a defined value either way, matching the flags above.
  bool mIsRecvBinary = false;
  size_t mBufferedThreshold = 0;
  // Read/written on main only. Decremented via message-passing, because the
  // spec requires us to queue a task for this.
  size_t mBufferedAmount = 0;
  nsCString mRecvBuffer;
  nsTArray<UniquePtr<BufferedOutgoingMsg>> mBufferedData
      MOZ_GUARDED_BY(mConnection->mLock);
  nsCOMPtr<nsISerialEventTarget> mMainThreadEventTarget;
  mutable Mutex mStatsLock;
  TrafficCounters mTrafficCounters MOZ_GUARDED_BY(mStatsLock);
};
// Runnable used to dispatch notifications of incoming data to the main
// thread. Patterned on CallOnMessageAvailable in WebSockets.
// Also used to proxy other items to MainThread.
//
// Each constructor overload fills in only the members its event needs; any
// RefPtr left out of the initializer list stays null and mData stays
// default-constructed.
class DataChannelOnMessageAvailable : public Runnable {
 public:
  enum class EventType {
    OnConnection,
    OnDisconnected,
    OnChannelCreated,
    OnDataString,
    OnDataBinary,
  };

  // Event carrying a payload for a channel.
  // XXX copying aData into mData here causes inefficiency.
  DataChannelOnMessageAvailable(EventType aType,
                                DataChannelConnection* aConnection,
                                DataChannel* aChannel, nsCString& aData)
      : Runnable("DataChannelOnMessageAvailable"),
        mType(aType),
        mChannel(aChannel),
        mConnection(aConnection),
        mData(aData) {}

  // Event that only concerns a channel; mConnection is left null.
  DataChannelOnMessageAvailable(EventType aType, DataChannel* aChannel)
      : Runnable("DataChannelOnMessageAvailable"),
        mType(aType),
        mChannel(aChannel) {}

  // Event concerning a channel on a connection, without a payload.
  // XXX mData is not set by this overload. It should only be used for
  // notifications that don't use it, but more bulletproof compile-time
  // checking would be preferable.
  DataChannelOnMessageAvailable(EventType aType,
                                DataChannelConnection* aConnection,
                                DataChannel* aChannel)
      : Runnable("DataChannelOnMessageAvailable"),
        mType(aType),
        mChannel(aChannel),
        mConnection(aConnection) {}

  // Connection-level event (for EventType::OnConnection /
  // EventType::OnDisconnected); mChannel is left null.
  DataChannelOnMessageAvailable(EventType aType,
                                DataChannelConnection* aConnection)
      : Runnable("DataChannelOnMessageAvailable"),
        mType(aType),
        mConnection(aConnection) {}

  DataChannelOnMessageAvailable(const DataChannelOnMessageAvailable&) = delete;
  DataChannelOnMessageAvailable(DataChannelOnMessageAvailable&&) = delete;
  DataChannelOnMessageAvailable& operator=(
      const DataChannelOnMessageAvailable&) = delete;
  DataChannelOnMessageAvailable& operator=(DataChannelOnMessageAvailable&&) =
      delete;

  NS_IMETHOD Run() override;

 private:
  ~DataChannelOnMessageAvailable() = default;

  EventType mType;
  // XXX should use union
  RefPtr<DataChannel> mChannel;
  RefPtr<DataChannelConnection> mConnection;
  nsCString mData;
};
}
// namespace mozilla
#endif // NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_