/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// Helper for queueing up actions to be run once the mutex has been unlocked. // Actions will be run in-order. class MOZ_SCOPED_CAPABILITY DataPipeAutoLock { public: explicit DataPipeAutoLock(Mutex& aMutex) MOZ_CAPABILITY_ACQUIRE(aMutex)
    : mMutex(aMutex) {
  // RAII: the mutex is acquired for the lifetime of this guard.
  mMutex.Lock();
}
// Non-copyable: exactly one guard owns the lock acquisition.
DataPipeAutoLock(const DataPipeAutoLock&) = delete;
DataPipeAutoLock& operator=(const DataPipeAutoLock&) = delete;
// NOTE(review): the DataPipeAutoLock definition above is truncated in this
// view (no destructor/unlock logic visible), and the declaration below
// appears to belong to a *different* class — it is `final` and excludes the
// mutex, which matches a port-observer type such as DataPipeLink. Confirm
// against the full file.
void OnPortStatusChanged() final MOZ_EXCLUDES(*mMutex);
// Add a task to notify the callback after `aLock` is unlocked. // // This method is safe to call multiple times, as after the first time it is // called, `mCallback` will be cleared. void NotifyOnUnlock(DataPipeAutoLock& aLock) MOZ_REQUIRES(*mMutex) {
DoNotifyOnUnlock(aLock, mCallback.forget(), mCallbackTarget.forget());
}
// NOTE(review): this is the tail of a method (the one that reports consumed
// bytes back to the peer); its signature — which declares `aLock` and
// `aBytes` — lies above the visible region of this file.
//
// `mPort` may be destroyed by `SetPeerError` after the DataPipe is unlocked
// but before we send the message. The strong controller and port references
// will allow us to try to send the message anyway, and it will be safely
// dropped if the port has already been closed. CONSUMED messages are safe
// to deliver out-of-order, so we don't need to worry about ordering here.
aLock.AddUnlockAction([controller = RefPtr{mPort.Controller()},
                       port = mPort.Port(), aBytes]() mutable {
  // Build a CONSUMED control message carrying the freed byte count.
  auto message = MakeUnique<IPC::Message>(
      MSG_ROUTING_NONE, DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE);
  IPC::MessageWriter writer(*message);
  WriteParam(&writer, aBytes);
  // Best-effort send: silently dropped if the port was already closed.
  controller->SendUserMessage(port, std::move(message));
});
}
// Record that our peer has closed or errored with `aStatus`, tear down the
// port connection, and wake any waiters once `aLock` is released.
//
// When `aSendClosed` is true, a CLOSED control message carrying the status
// is sent to the peer before the port is destroyed. May only be called once
// per link (asserted below).
void SetPeerError(DataPipeAutoLock& aLock, nsresult aStatus,
                  bool aSendClosed = false) MOZ_REQUIRES(*mMutex) {
  MOZ_LOG(gDataPipeLog, LogLevel::Debug,
          ("SetPeerError(%s%s) %s", GetStaticErrorName(aStatus),
           aSendClosed ? ", send" : "", Describe(aLock).get()));

  // The peer status may transition away from success only once.
  MOZ_ASSERT(NS_SUCCEEDED(mPeerStatus));
  // Normalize success codes to "closed" so mPeerStatus always records a
  // failing status from here on.
  mPeerStatus = NS_FAILED(aStatus) ? aStatus : NS_BASE_STREAM_CLOSED;

  // Move `mPort` into the unlock action: when the action (and with it the
  // `ScopedPort`) is destroyed, the port closes, clearing the observer
  // reference from the ports layer back to this type.
  aLock.AddUnlockAction([port = std::move(mPort), aStatus, aSendClosed] {
    if (!aSendClosed) {
      return;  // Nothing to send; port still closed on destruction.
    }
    auto message = MakeUnique<IPC::Message>(MSG_ROUTING_NONE,
                                            DATA_PIPE_CLOSED_MESSAGE_TYPE);
    IPC::MessageWriter writer(*message);
    WriteParam(&writer, aStatus);
    port.Controller()->SendUserMessage(port.Port(), std::move(message));
  });

  // Ensure anyone waiting on pipe state observes the closure.
  NotifyOnUnlock(aLock);
}
// NOTE(review): body fragment of the port-status message pump (presumably
// the OnPortStatusChanged implementation declared earlier); the enclosing
// signature and the declaration of `lock` are above this view. Drains
// queued control messages while the peer is still healthy.
while (NS_SUCCEEDED(mPeerStatus)) {
  UniquePtr<IPC::Message> message;
  // A failed GetMessage means the port is dead: treat it as a peer close.
  if (!mPort.Controller()->GetMessage(mPort.Port(), &message)) {
    SetPeerError(lock, NS_BASE_STREAM_CLOSED);
    return;
  }
  if (!message) {
    return;  // no more messages
  }
  IPC::MessageReader reader(*message);
  switch (message->type()) {
    case DATA_PIPE_CLOSED_MESSAGE_TYPE: {
      // Peer sent an explicit close carrying its final status code. A
      // malformed payload is treated as an unexpected error.
      nsresult status = NS_OK;
      if (!ReadParam(&reader, &status)) {
        NS_WARNING("Unable to parse nsresult error from peer");
        status = NS_ERROR_UNEXPECTED;
      }
      MOZ_LOG(gDataPipeLog, LogLevel::Debug,
              ("Got CLOSED(%s) %s", GetStaticErrorName(status),
               Describe(lock).get()));
      SetPeerError(lock, status);
      return;
    }
    case DATA_PIPE_BYTES_CONSUMED_MESSAGE_TYPE: {
      // Peer freed buffer space; grow `mAvailable` accordingly.
      uint32_t consumed = 0;
      if (!ReadParam(&reader, &consumed)) {
        NS_WARNING("Unable to parse bytes consumed from peer");
        SetPeerError(lock, NS_ERROR_UNEXPECTED);
        return;
      }
      MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
              ("Got CONSUMED(%u) %s", consumed, Describe(lock).get()));
      // Checked arithmetic: overflow, or growing past capacity, indicates
      // a misbehaving (or malicious) peer.
      auto newAvailable = CheckedUint32{mAvailable} + consumed;
      if (!newAvailable.isValid() || newAvailable.value() > mCapacity) {
        NS_WARNING("Illegal bytes consumed message received from peer");
        SetPeerError(lock, NS_ERROR_UNEXPECTED);
        return;
      }
      mAvailable = newAvailable.value();
      // Only wake waiters interested in data (not closure-only callbacks).
      if (!mCallbackClosureOnly) {
        NotifyOnUnlock(lock);
      }
      break;
    }
    default: {
      NS_WARNING("Illegal message type received from peer");
      SetPeerError(lock, NS_ERROR_UNEXPECTED);
      return;
    }
  }
}
}
// NOTE(review): tail of a close routine (presumably
// DataPipeBase::CloseInternal — the call sites at CheckStatus and below use
// that name); its signature, declaring `aLock` and `aStatus`, is above this
// view.
// Set our status to an errored status.
mStatus = NS_SUCCEEDED(aStatus) ? NS_BASE_STREAM_CLOSED : aStatus;
// Transfer our strong link reference out of the member before notifying.
RefPtr<DataPipeLink> link = mLink.forget();
AssertSameMutex(link->mMutex);
link->NotifyOnUnlock(aLock);
// If our peer hasn't disappeared yet, clean up our connection to it.
if (NS_SUCCEEDED(link->mPeerStatus)) {
  // Forward our (now failing) status and ask for a CLOSED message to be
  // sent to the peer.
  link->SetPeerError(aLock, mStatus, /* aSendClosed */ true);
}
}
// NOTE(review): fragment of a segment-processing loop (presumably
// DataPipeBase::ProcessSegmentsInternal). The signature declaring
// `aProcessedCount`/`aCount`, and the code that consumes `start`/`iter`/
// `end` and closes this `while`, lie outside the visible region.
while (*aProcessedCount < aCount) {
  // The pipe mutex is re-acquired for each segment iteration.
  DataPipeAutoLock lock(*mMutex);
  mMutex->AssertCurrentThreadOwns();
  MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
          ("ProcessSegments(%u of %u) %s", *aProcessedCount, aCount,
           Describe(lock).get()));

  nsresult status = CheckStatus(lock);
  if (NS_FAILED(status)) {
    // Partial progress still reports success; otherwise a plain close maps
    // to NS_OK and real errors propagate.
    if (*aProcessedCount > 0) {
      return NS_OK;
    }
    return status == NS_BASE_STREAM_CLOSED ? NS_OK : status;
  }

  RefPtr<DataPipeLink> link = mLink;
  AssertSameMutex(link->mMutex);
  if (!link->mAvailable) {
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(link->mPeerStatus),
                          "CheckStatus will have returned an error");
    return *aProcessedCount > 0 ? NS_OK : NS_BASE_STREAM_WOULD_BLOCK;
  }

  MOZ_RELEASE_ASSERT(!link->mProcessingSegment,
                     "Only one thread may be processing a segment at a time");

  // Extract an iterator over the next contiguous region of the shared
  // memory buffer: bounded by the caller's remaining request, the bytes
  // available, and the distance to the end of the ring buffer.
  char* start = static_cast<char*>(link->mShmem->Memory()) + link->mOffset;
  char* iter = start;
  char* end = start + std::min({aCount - *aProcessedCount, link->mAvailable,
                                link->mCapacity - link->mOffset});

  // Record the consumed region from our segment when exiting this scope,
  // telling our peer how many bytes were consumed. Hold on to `mLink` to keep
  // the shmem mapped and make sure we can clean up even if we're closed while
  // processing the shmem region.
  link->mProcessingSegment = true;
  auto scopeExit = MakeScopeExit([&] {
    mMutex->AssertCurrentThreadOwns();  // should still be held
    AssertSameMutex(link->mMutex);
    MOZ_RELEASE_ASSERT(link->mProcessingSegment);
    link->mProcessingSegment = false;
    uint32_t totalProcessed = iter - start;
    if (totalProcessed > 0) {
      // Advance the ring-buffer offset, wrapping at capacity.
      link->mOffset += totalProcessed;
      MOZ_RELEASE_ASSERT(link->mOffset <= link->mCapacity);
      if (link->mOffset == link->mCapacity) {
        link->mOffset = 0;
      }
      link->mAvailable -= totalProcessed;
      // Report the freed space to the peer once the mutex is released.
      link->SendBytesConsumedOnUnlock(lock, totalProcessed);
    }
    MOZ_LOG(gDataPipeLog, LogLevel::Verbose,
            ("Processed Segment(%u of %zu) %s", totalProcessed, end - start,
             Describe(lock).get()));
  });
// NOTE(review): fragment of a callback-registration routine (looks like an
// AsyncWait-style method). `lock`, `callback`, `target` and `aClosureOnly`
// are declared above this view. Preprocessor directives have been restored
// to their own lines; all code tokens are unchanged.
if (NS_FAILED(CheckStatus(lock))) {
#ifdef DEBUG
  if (mLink) {
    AssertSameMutex(mLink->mMutex);
    // A closed pipe must not still hold a pending callback.
    MOZ_ASSERT(!mLink->mCallback);
  }
#endif
  // Pipe already closed/errored: fire the callback immediately on unlock.
  DoNotifyOnUnlock(lock, callback.forget(), target.forget());
  return;
}
AssertSameMutex(mLink->mMutex);
// NOTE: After this point, `mLink` may have previously had a callback which is
// now being cancelled, make sure we clear `mCallback` even if we're going to
// call `aCallback` immediately.
mLink->mCallback = callback.forget();
mLink->mCallbackTarget = target.forget();
mLink->mCallbackClosureOnly = aClosureOnly;
// Data is already available and the caller wants it: notify right away.
if (!aClosureOnly && mLink->mAvailable) {
  mLink->NotifyOnUnlock(lock);
}
}
// If our peer has closed or errored, we may need to close our local side to
// reflect the error code our peer provided. If we're a sender, we want to
// become closed immediately, whereas if we're a receiver we want to wait
// until our available buffer has been exhausted.
//
// NOTE: There may still be 2-stage writes/reads ongoing at this point, which
// will continue due to `mLink` being kept alive by the
// `ProcessSegmentsInternal` function.
nsresult DataPipeBase::CheckStatus(DataPipeAutoLock& aLock) {
  // Already closed locally — nothing to reconcile.
  if (NS_FAILED(mStatus)) {
    return mStatus;
  }
  AssertSameMutex(mLink->mMutex);

  const bool peerFailed = NS_FAILED(mLink->mPeerStatus);
  // Senders close immediately on peer failure; receivers first drain
  // whatever data is still available in the buffer.
  const bool nothingLeftToRead = !mLink->mReceiverSide || !mLink->mAvailable;
  if (peerFailed && nothingLeftToRead) {
    // May update `mStatus` to the peer-provided code.
    CloseInternal(aLock, mLink->mPeerStatus);
  }
  return mStatus;
}
// NOTE(review): fragment — `aParam` and `lock` are declared above this
// view; this looks like cleanup for a pipe endpoint that was never
// consumed. Confirm against the full file.
// Mark our peer as closed so we don't try to send to it when closing.
aParam->mLink->mPeerStatus = NS_ERROR_NOT_INITIALIZED;
// Close our side; because the peer was marked failed above, CloseInternal's
// peer-cleanup path (which sends a CLOSED message) is skipped.
aParam->CloseInternal(lock, NS_ERROR_NOT_INITIALIZED);
}
// Deserialize a DataPipe endpoint of type `T` from `aReader`.
//
// Reads, in order: the serialized status; on success a port, a shared-memory
// handle, and the pipe state fields; validates them; then maps the shmem.
// Returns false (after reporting a fatal error on `aReader`) on any parse or
// validation failure.
//
// FIX: the previous text contained the invalid tokens `returnfalse;` and
// `returntrue;` (missing whitespace), which do not compile; restored to
// `return false;` / `return true;`.
//
// NOTE(review): this definition is truncated in this view — the tail that
// constructs `*aResult` on the success path lies beyond the visible region,
// so the function body below is intentionally left open.
template <typename T>
bool DataPipeRead(IPC::MessageReader* aReader, RefPtr<T>* aResult) {
  nsresult rv = NS_OK;
  if (!ReadParam(aReader, &rv)) {
    aReader->FatalError("failed to read DataPipe status");
    return false;
  }
  // A failing status means the pipe was serialized in an errored/closed
  // state: no port or shmem follows on the wire.
  if (NS_FAILED(rv)) {
    *aResult = new T(rv);
    MOZ_LOG(gDataPipeLog, LogLevel::Debug,
            ("IPC Read: [status=%s]", GetStaticErrorName(rv)));
    return true;
  }
  ScopedPort port;
  if (!ReadParam(aReader, &port)) {
    aReader->FatalError("failed to read DataPipe port");
    return false;
  }
  SharedMemory::Handle shmemHandle;
  if (!ReadParam(aReader, &shmemHandle)) {
    aReader->FatalError("failed to read DataPipe shmem");
    return false;
  }
  // Due to the awkward shared memory API provided by SharedMemory, we need to
  // transfer ownership into the `shmem` here, then steal it back later in the
  // function. Bug 1797039 tracks potential changes to the RawShmem API which
  // could improve this situation.
  RefPtr shmem = new SharedMemory();
  if (!shmem->SetHandle(std::move(shmemHandle),
                        SharedMemory::RightsReadWrite)) {
    aReader->FatalError("failed to create DataPipe shmem from handle");
    return false;
  }
  uint32_t capacity = 0;
  nsresult peerStatus = NS_OK;
  uint32_t offset = 0;
  uint32_t available = 0;
  if (!ReadParam(aReader, &capacity) || !ReadParam(aReader, &peerStatus) ||
      !ReadParam(aReader, &offset) || !ReadParam(aReader, &available)) {
    aReader->FatalError("failed to read DataPipe fields");
    return false;
  }
  // Reject inconsistent state before mapping: zero capacity, an
  // out-of-range offset, or more available bytes than capacity indicates a
  // malformed (or malicious) message.
  if (!capacity || offset >= capacity || available > capacity) {
    aReader->FatalError("received DataPipe state values are inconsistent");
    return false;
  }
  if (!shmem->Map(SharedMemory::PageAlignedSize(capacity))) {
    aReader->FatalError("failed to map DataPipe shared memory region");
    return false;
  }
// Report the serialization complexity of this DataPipeReceiver to the IPC
// layer. Only `*aTransferables` is written here; the remaining out-params
// are left untouched (presumably retaining their caller-initialized
// values — confirm against the caller's contract).
void DataPipeReceiver::SerializedComplexity(uint32_t aMaxSize,
                                            uint32_t* aSizeUsed,
                                            uint32_t* aPipes,
                                            uint32_t* aTransferables) {
  // We report DataPipeReceiver as taking one transferrable to serialize,
  // rather than one pipe, as we aren't starting a new pipe for this purpose,
  // and are instead transferring an existing pipe.
  *aTransferables = 1;
}
// NOTE(review): fragment of the pipe-construction routine; its signature,
// which declares `aCapacity`, and its tail are outside this view.
RefPtr<NodeController> controller = NodeController::GetSingleton();
// During shutdown the node controller is gone and no pipes can be created.
if (!controller) {
  return NS_ERROR_ILLEGAL_DURING_SHUTDOWN;
}
// Allocate a pair of ports for messaging between the sender & receiver.
auto [senderPort, receiverPort] = controller->CreatePortPair();
// Create and allocate the shared memory region.
auto shmem = MakeRefPtr<SharedMemory>();
size_t alignedCapacity = SharedMemory::PageAlignedSize(aCapacity);
if (!shmem->Create(alignedCapacity) || !shmem->Map(alignedCapacity)) {
  return NS_ERROR_OUT_OF_MEMORY;
}
// We'll first clone then take the handle from the region so that the sender &
// receiver each have a handle. This avoids the need to duplicate the handle
// when serializing, when errors are non-recoverable.
SharedMemory::Handle senderShmemHandle = shmem->CloneHandle();
SharedMemory::Handle receiverShmemHandle = shmem->TakeHandle();
if (!senderShmemHandle || !receiverShmemHandle) {
  return NS_ERROR_OUT_OF_MEMORY;
}
// NOTE(review): the following German text is a website disclaimer that was
// accidentally pasted into this source file; as bare prose it breaks
// compilation. Commented out pending removal. Translation: "The information
// on this website has been carefully compiled to the best of our knowledge.
// However, neither completeness, correctness, nor quality of the provided
// information is guaranteed. Remark: the color syntax highlighting is still
// experimental."
//
// Die Informationen auf dieser Webseite wurden
// nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
// noch Qualität der bereit gestellten Informationen zugesichert.
// Bemerkung:
// Die farbliche Syntaxdarstellung ist noch experimentell.