/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/** * The multiplex stream concatenates a list of input streams into a single * stream.
*/
class nsMultiplexInputStream final : public nsIMultiplexInputStream, public nsISeekableStream, public nsIIPCSerializableInputStream, public nsICloneableInputStream, public nsIAsyncInputStream, public nsIInputStreamCallback, public nsIInputStreamLength, public nsIAsyncInputStreamLength { public:
nsMultiplexInputStream();
// This is used for nsIAsyncInputStream::AsyncWait void AsyncWaitCompleted();
// This is used for nsIAsyncInputStreamLength::AsyncLengthWait void AsyncWaitCompleted(int64_t aLength, const MutexAutoLock& aProofOfLock)
MOZ_REQUIRES(mLock);
// These are Atomics so that we can check them in QueryInterface without // taking a lock (to look at mStreams.Length() and the numbers above) // With no streams added yet, all of these are possible
Atomic<bool, Relaxed> mIsSeekableStream{true};
Atomic<bool, Relaxed> mIsIPCSerializableStream{true};
Atomic<bool, Relaxed> mIsCloneableStream{true};
/**
 * Queries how many bytes are available on aStream's buffered stream, retrying
 * once after a no-op seek when the stream reports itself as closed.
 *
 * @param aStream  Per-substream data; its mBufferedStream is queried and its
 *                 mSeekableStream (if set) is used for the retry.
 * @param aResult  Out-param forwarded to Available().
 * @return The result of the first Available() call, unless that call returned
 *         NS_BASE_STREAM_CLOSED and the follow-up seek succeeded, in which
 *         case the result of the second Available() call is returned.
 */
static nsresult AvailableMaybeSeek(nsMultiplexInputStream::StreamData& aStream,
                                   uint64_t* aResult) {
  const nsresult firstRv = aStream.mBufferedStream->Available(aResult);

  // Anything other than "closed" is reported as-is, and a closed stream that
  // is not seekable cannot be revived, so there is nothing to retry.
  if (firstRv != NS_BASE_STREAM_CLOSED || !aStream.mSeekableStream) {
    return firstRv;
  }

  // Blindly seek to the current position when Available() reports
  // NS_BASE_STREAM_CLOSED. If an nsIFileInputStream was closed in Read()
  // because of the CLOSE_ON_EOF flag, Seek() could reopen the file when the
  // REOPEN_ON_REWIND flag is set.
  const nsresult seekRv =
      aStream.mSeekableStream->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
  if (NS_FAILED(seekRv)) {
    // The seek did not help; keep reporting the original "closed" status.
    return firstRv;
  }

  return aStream.mBufferedStream->Available(aResult);
}
// Let's take a copy of the streams because calling Close() on them could
// trigger an nsIInputStreamCallback immediately, and we don't want to create
// a deadlock with the mutex.
{
MutexAutoLock lock(mLock);
uint32_t len = mStreams.Length(); for (uint32_t i = 0; i < len; ++i) { if (NS_WARN_IF(
!streams.AppendElement(mStreams[i].mBufferedStream, fallible))) {
mStatus = NS_BASE_STREAM_CLOSED; return NS_ERROR_OUT_OF_MEMORY;
}
}
mStatus = NS_BASE_STREAM_CLOSED;
}
nsresult rv = NS_OK;
uint32_t len = streams.Length(); for (uint32_t i = 0; i < len; ++i) {
nsresult rv2 = streams[i]->Close(); // We still want to close all streams, but we should return an error if (NS_FAILED(rv2)) {
rv = rv2;
}
}
uint32_t len = mStreams.Length(); for (uint32_t i = mCurrentStream; i < len; i++) {
uint64_t streamAvail;
rv = AvailableMaybeSeek(mStreams[i], &streamAvail); if (rv == NS_BASE_STREAM_CLOSED) { // If a stream is closed, we continue with the next one. // If this is the current stream we move to the following stream. if (mCurrentStream == i) {
NextStream();
}
// If this is the last stream, we want to return this error code. continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv; return mStatus;
}
// If the current stream is async, we have to return what we have so far // without processing the following streams. This is needed because // ::Available should return only what is currently available. In case of an // nsIAsyncInputStream, we have to call AsyncWait() in order to read more. if (mStreams[i].mAsyncStream) {
avail += streamAvail; break;
}
if (streamAvail == 0) { // Nothing to read for this stream. Let's move to the next one. continue;
}
avail += streamAvail;
}
// We still have something to read. We don't want to return an error code yet. if (avail) {
*aResult = avail; return NS_OK;
}
// Let's propagate the last error message.
mStatus = rv; return rv;
}
NS_IMETHODIMP
nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
MutexAutoLock lock(mLock); // It is tempting to implement this method in terms of ReadSegments, but // that would prevent this class from being used with streams that only // implement Read (e.g., file streams).
*aResult = 0;
if (mStatus == NS_BASE_STREAM_CLOSED) { return NS_OK;
} if (NS_FAILED(mStatus)) { return mStatus;
}
nsresult rv = NS_OK;
uint32_t len = mStreams.Length(); while (mCurrentStream < len && aCount) {
uint32_t read;
rv = mStreams[mCurrentStream].mBufferedStream->Read(aBuf, aCount, &read);
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF. // (This is a bug in those stream implementations) if (rv == NS_BASE_STREAM_CLOSED) {
MOZ_ASSERT_UNREACHABLE( "Input stream's Read method returned " "NS_BASE_STREAM_CLOSED");
rv = NS_OK;
read = 0;
} elseif (NS_FAILED(rv)) { break;
}
if (read == 0) {
NextStream();
} else {
NS_ASSERTION(aCount >= read, "Read more than requested");
*aResult += read;
aCount -= read;
aBuf += read;
mStartedReadingCurrent = true;
uint32_t len = mStreams.Length(); while (mCurrentStream < len && aCount) {
uint32_t read;
rv = mStreams[mCurrentStream].mBufferedStream->ReadSegments(
ReadSegCb, &state, aCount, &read);
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF. // (This is a bug in those stream implementations) if (rv == NS_BASE_STREAM_CLOSED) {
MOZ_ASSERT_UNREACHABLE( "Input stream's Read method returned " "NS_BASE_STREAM_CLOSED");
rv = NS_OK;
read = 0;
}
// if |aWriter| decided to stop reading segments... if (state.mDone || NS_FAILED(rv)) { break;
}
// if stream is empty, then advance to the next stream. if (read == 0) {
NextStream();
} else {
NS_ASSERTION(aCount >= read, "Read more than requested");
state.mOffset += read;
aCount -= read;
mStartedReadingCurrent = true;
mStreams[mCurrentStream].mCurrentPos += read;
}
}
// if we successfully read some data, then this call succeeded.
*aResult = state.mOffset; return state.mOffset ? NS_OK : rv;
}
uint32_t len = mStreams.Length(); if (len == 0) { // Claim to be non-blocking, since we won't block the caller.
*aNonBlocking = true; return NS_OK;
}
for (uint32_t i = 0; i < len; ++i) {
nsresult rv = mStreams[i].mBufferedStream->IsNonBlocking(aNonBlocking); if (NS_WARN_IF(NS_FAILED(rv))) { return rv;
} // If one is blocking the entire stream becomes blocking. if (!*aNonBlocking) { return NS_OK;
}
}
if (aWhence == NS_SEEK_SET) {
int64_t remaining = aOffset; if (aOffset == 0) {
mCurrentStream = 0;
} for (uint32_t i = 0; i < mStreams.Length(); ++i) {
nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream; if (!stream) { return NS_ERROR_FAILURE;
}
// See if all remaining streams should be rewound if (remaining == 0) { if (i < oldCurrentStream ||
(i == oldCurrentStream && oldStartedReadingCurrent)) {
rv = stream->Seek(NS_SEEK_SET, 0); if (NS_WARN_IF(NS_FAILED(rv))) { return rv;
}
// Get position in the current stream
int64_t streamPos; if (i > oldCurrentStream ||
(i == oldCurrentStream && !oldStartedReadingCurrent)) {
streamPos = 0;
} else {
streamPos = mStreams[i].mCurrentPos;
}
// See if we need to seek the current stream forward or backward if (remaining < streamPos) {
rv = stream->Seek(NS_SEEK_SET, remaining); if (NS_WARN_IF(NS_FAILED(rv))) { return rv;
}
remaining = 0;
} elseif (remaining > streamPos) { if (i < oldCurrentStream) { // We're already at end so no need to seek this stream
remaining -= streamPos;
NS_ASSERTION(remaining >= 0, "Remaining invalid");
} else {
uint64_t avail;
rv = AvailableMaybeSeek(mStreams[i], &avail); if (NS_WARN_IF(NS_FAILED(rv))) { return rv;
}
// The seek(END) can be completed in the current stream. if (streamLength >= DeprecatedAbs(remaining)) {
rv = stream->Seek(NS_SEEK_END, remaining); if (NS_WARN_IF(NS_FAILED(rv))) { return rv;
}
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
ret64 += mStreams[i].mCurrentPos;
#ifdef DEBUG // When we see 1 stream with currentPos = 0, all the remaining streams must // be set to 0 as well.
MOZ_ASSERT_IF(zeroFound, mStreams[i].mCurrentPos == 0); if (mStreams[i].mCurrentPos == 0) {
zeroFound = true;
} #endif
}
*aResult = ret64;
// This class is used to inform nsMultiplexInputStream that it's time to execute // the asyncWait callback. class AsyncWaitRunnable final : public DiscardableRunnable {
RefPtr<nsMultiplexInputStream> mStream;
// Let's take the first async stream if we are not already closed, and if // it has data to read or if it async. if (mStatus != NS_BASE_STREAM_CLOSED) { for (; mCurrentStream < mStreams.Length(); NextStream()) {
stream = mStreams[mCurrentStream].mAsyncStream; if (stream) { break;
}
uint64_t avail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[mCurrentStream], &avail); if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) { // Nothing to read here. Let's move on. continue;
}
// If we are here it's because we are already closed, or if the current stream // is not async. In both case we have to execute the callback. if (!stream) { if (asyncWaitCallback) {
AsyncWaitRunnable::Create(this, asyncWaitEventTarget);
} return NS_OK;
}
// When OnInputStreamReady is called, we could be in 2 scenarios: // a. there is something to read; // b. the stream is closed. // But if the stream is closed and it was not the last one, we must proceed // with the following stream in order to have something to read by the callee.
{
MutexAutoLock lock(mLock);
// The callback has been nullified in the meantime. if (!mAsyncWaitCallback) { return NS_OK;
}
if (NS_SUCCEEDED(mStatus)) {
uint64_t avail = 0;
nsresult rv = NS_OK; // Only check `Available()` if `aStream` is actually the current stream, // otherwise we'll always want to re-poll, as we got the callback for the // wrong stream. if (mCurrentStream < mStreams.Length() &&
aStream == mStreams[mCurrentStream].mAsyncStream) {
rv = aStream->Available(&avail);
} if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) { // This stream is either closed, has no data available, or is not the // current stream. If it is closed and current, move to the next stream, // otherwise re-wait on the current stream until it has data available // or becomes closed. // Unlike streams not implementing nsIAsyncInputStream, async streams // cannot use `Available() == 0` to indicate EOF, so we re-poll in that // situation. if (NS_FAILED(rv)) {
NextStream();
}
// Unlock and invoke AsyncWaitInternal to wait again. If this succeeds, // we'll be called again, otherwise fall through and notify.
MutexAutoUnlock unlock(mLock); if (NS_SUCCEEDED(AsyncWaitInternal())) { return NS_OK;
}
}
}
// If the combination of all streams when serialized independently is // sufficiently complex, we may choose to serialize it as a pipe to limit the // complexity of the payload. if (totalTransferables.value() == 0) { // If there are no transferables within our serialization, and it would // contain at least one pipe, serialize the entire payload as a pipe for // simplicity.
*aSerializeAsPipe = totalSizeUsed.value() > 0 && totalPipes.value() > 0;
} else { // Otherwise, we may want to still serialize in segments to take advantage // of the efficiency of serializing transferables. We'll only serialize as a // pipe if the total attachment count exceeds kMaxAttachmentThreshold. static constexpr uint32_t kMaxAttachmentThreshold = 8;
CheckedUint32 totalAttachments = totalPipes + totalTransferables;
*aSerializeAsPipe = !totalAttachments.isValid() ||
totalAttachments.value() > kMaxAttachmentThreshold;
}
if (*aSerializeAsPipe) {
NS_WARNING(
nsPrintfCString("Choosing to serialize multiplex stream as a pipe " "(would be %u bytes, %u pipes, %u transferables)",
totalSizeUsed.value(), totalPipes.value(),
totalTransferables.value())
.get());
*aSizeUsed = 0;
*aPipes = 1;
*aTransferables = 0;
} else {
*aSizeUsed = totalSizeUsed.value();
*aPipes = totalPipes.value();
*aTransferables = totalTransferables.value();
}
}
bool nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams) { if (aParams.type() != InputStreamParams::TMultiplexInputStreamParams) {
NS_ERROR("Received unknown parameters from the other process!"); returnfalse;
}
NS_IMETHODIMP
nsMultiplexInputStream::GetCloneable(bool* aCloneable) {
MutexAutoLock lock(mLock); // XXXnsm Cloning a multiplex stream which has started reading is not // permitted right now. if (mCurrentStream > 0 || mStartedReadingCurrent) {
*aCloneable = false; return NS_OK;
}
uint32_t len = mStreams.Length(); for (uint32_t i = 0; i < len; ++i) {
nsCOMPtr<nsICloneableInputStream> cis =
do_QueryInterface(mStreams[i].mBufferedStream); if (!cis || !cis->GetCloneable()) {
*aCloneable = false; return NS_OK;
}
}
// XXXnsm Cloning a multiplex stream which has started reading is not // permitted right now. if (mCurrentStream > 0 || mStartedReadingCurrent) { return NS_ERROR_FAILURE;
}
RefPtr<nsMultiplexInputStream> clone = new nsMultiplexInputStream();
nsresult rv;
uint32_t len = mStreams.Length(); for (uint32_t i = 0; i < len; ++i) {
nsCOMPtr<nsICloneableInputStream> substream =
do_QueryInterface(mStreams[i].mBufferedStream); if (NS_WARN_IF(!substream)) { return NS_ERROR_FAILURE;
}
if (mCurrentStream > 0 || mStartedReadingCurrent) { return NS_ERROR_NOT_AVAILABLE;
}
CheckedInt64 length = 0;
nsresult retval = NS_OK;
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
nsCOMPtr<nsIInputStreamLength> substream =
do_QueryInterface(mStreams[i].mBufferedStream); if (!substream) { // Let's use available as fallback.
uint64_t streamAvail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail); if (rv == NS_BASE_STREAM_CLOSED) { continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv; return mStatus;
}
length += streamAvail; if (!length.isValid()) { return NS_ERROR_OUT_OF_MEMORY;
}
// If one stream blocks, we all block. if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_WARN_IF(NS_FAILED(rv))) { return rv;
}
// We want to return WOULD_BLOCK if there is 1 stream that blocks. But want // to see if there are other streams with length = -1. if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
retval = NS_BASE_STREAM_WOULD_BLOCK; continue;
}
// If one of the stream doesn't know the size, we all don't know the size. if (size == -1) {
*aLength = -1; return NS_OK;
}
length += size; if (!length.isValid()) { return NS_ERROR_OUT_OF_MEMORY;
}
}
*aLength = length.value(); return retval;
}
class nsMultiplexInputStream::AsyncWaitLengthHelper final
: public nsIInputStreamLengthCallback { public:
NS_DECL_THREADSAFE_ISUPPORTS
if (mCurrentStream > 0 || mStartedReadingCurrent) { return NS_ERROR_NOT_AVAILABLE;
}
if (!aCallback) {
mAsyncWaitLengthCallback = nullptr; return NS_OK;
}
// We have a pending operation! Let's use this instead of creating a new one. if (mAsyncWaitLengthHelper) {
mAsyncWaitLengthCallback = aCallback; return NS_OK;
}
RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
nsCOMPtr<nsIAsyncInputStreamLength> asyncStream =
do_QueryInterface(mStreams[i].mBufferedStream); if (asyncStream) { if (NS_WARN_IF(!helper->AddStream(asyncStream))) { return NS_ERROR_OUT_OF_MEMORY;
} continue;
}
nsCOMPtr<nsIInputStreamLength> stream =
do_QueryInterface(mStreams[i].mBufferedStream); if (!stream) { // Let's use available as fallback.
uint64_t streamAvail = 0;
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail); if (rv == NS_BASE_STREAM_CLOSED) { continue;
}
if (NS_WARN_IF(NS_FAILED(rv))) {
mStatus = rv; return mStatus;
}
if (NS_WARN_IF(!helper->AddSize(streamAvail))) { return NS_ERROR_OUT_OF_MEMORY;
}
bool nsMultiplexInputStream::IsAsyncInputStream() const { // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the // substream implements that interface. return mIsAsyncInputStream;
}
¤ Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert. 0.27 Bemerkung:
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.