/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// This is a nsICancelableRunnable because we can dispatch it to Workers and // those can be shut down at any time, and in these cases, Cancel() is called // instead of Run(). class nsInputStreamReadyEvent final : public CancelableRunnable, public nsIInputStreamCallback, public nsIRunnablePriority { public:
NS_DECL_ISUPPORTS_INHERITED
private:
~nsInputStreamReadyEvent() { if (!mCallback) { return;
} // // whoa!! looks like we never posted this event. take care to // release mCallback on the correct thread. if mTarget lives on the // calling thread, then we are ok. otherwise, we have to try to // proxy the Release over the right thread. if that thread is dead, // then there's nothing we can do... better to leak than crash. // bool val;
nsresult rv = mTarget->IsOnCurrentThread(&val); if (NS_FAILED(rv) || !val) {
nsCOMPtr<nsIInputStreamCallback> event = NS_NewInputStreamReadyEvent( "~nsInputStreamReadyEvent", mCallback, mTarget, mPriority);
mCallback = nullptr; if (event) {
rv = event->OnInputStreamReady(nullptr); if (NS_FAILED(rv)) {
MOZ_ASSERT_UNREACHABLE("leaking stream event");
nsISupports* sup = event;
NS_ADDREF(sup);
}
}
}
}
// This is a nsICancelableRunnable because we can dispatch it to Workers and // those can be shut down at any time, and in these cases, Cancel() is called // instead of Run(). class nsOutputStreamReadyEvent final : public CancelableRunnable, public nsIOutputStreamCallback { public:
NS_DECL_ISUPPORTS_INHERITED
private:
~nsOutputStreamReadyEvent() { if (!mCallback) { return;
} // // whoa!! looks like we never posted this event. take care to // release mCallback on the correct thread. if mTarget lives on the // calling thread, then we are ok. otherwise, we have to try to // proxy the Release over the right thread. if that thread is dead, // then there's nothing we can do... better to leak than crash. // bool val;
nsresult rv = mTarget->IsOnCurrentThread(&val); if (NS_FAILED(rv) || !val) {
nsCOMPtr<nsIOutputStreamCallback> event =
NS_NewOutputStreamReadyEvent(mCallback, mTarget);
mCallback = nullptr; if (event) {
rv = event->OnOutputStreamReady(nullptr); if (NS_FAILED(rv)) {
MOZ_ASSERT_UNREACHABLE("leaking stream event");
nsISupports* sup = event;
NS_ADDREF(sup);
}
}
}
}
// abstract stream copier... class nsAStreamCopier : public nsIInputStreamCallback, public nsIOutputStreamCallback, public CancelableRunnable { public:
NS_DECL_ISUPPORTS_INHERITED
// implemented by subclasses, returns number of bytes copied and // sets source and sink condition before returning. virtual uint32_t DoCopy(nsresult* aSourceCondition,
nsresult* aSinkCondition) = 0;
void Process() { if (!mSource || !mSink) { return;
}
// If the copy was canceled before Process() was even called, then // sourceCondition and sinkCondition should be set to error results to // ensure we don't call Finish() on a canceled nsISafeOutputStream.
MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
nsresult sourceCondition = cancelStatus;
nsresult sinkCondition = cancelStatus;
// Copy data from the source to the sink until we hit failure or have // copied all the data. for (;;) { // Note: copyFailed will be true if the source or the sink have // reported an error, or if we failed to write any bytes // because we have consumed all of our data. bool copyFailed = false; if (!canceled) {
uint32_t n = DoCopy(&sourceCondition, &sinkCondition); if (n > 0 && mProgressCallback) {
mProgressCallback(mClosure, n);
}
copyFailed =
NS_FAILED(sourceCondition) || NS_FAILED(sinkCondition) || n == 0;
MutexAutoLock lock(mLock);
canceled = mCanceled;
cancelStatus = mCancelStatus;
} if (copyFailed && !canceled) { if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) { // need to wait for more data from source. while waiting for // more source data, be sure to observe failures on output end.
mAsyncSource->AsyncWait(this, 0, 0, nullptr);
if (mAsyncSink) {
mAsyncSink->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
0, nullptr);
} break;
} if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) { // need to wait for more room in the sink. while waiting for // more room in the sink, be sure to observer failures on the // input end.
mAsyncSink->AsyncWait(this, 0, 0, nullptr);
if (mAsyncSource) {
mAsyncSource->AsyncWait( this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0, nullptr);
} break;
}
} if (copyFailed || canceled) { if (mAsyncSource) { // cancel any previously-registered AsyncWait callbacks to avoid leaks
mAsyncSource->AsyncWait(nullptr, 0, 0, nullptr);
} if (mCloseSource) { // close source if (mAsyncSource) {
mAsyncSource->CloseWithStatus(canceled ? cancelStatus
: sinkCondition);
} else {
mSource->Close();
}
}
mAsyncSource = nullptr;
mSource = nullptr;
if (mAsyncSink) { // cancel any previously-registered AsyncWait callbacks to avoid leaks
mAsyncSink->AsyncWait(nullptr, 0, 0, nullptr);
} if (mCloseSink) { // close sink if (mAsyncSink) {
mAsyncSink->CloseWithStatus(canceled ? cancelStatus
: sourceCondition);
} else { // If we have an nsISafeOutputStream, and our // sourceCondition and sinkCondition are not set to a // failure state, finish writing.
nsCOMPtr<nsISafeOutputStream> sostream = do_QueryInterface(mSink); if (sostream && NS_SUCCEEDED(sourceCondition) &&
NS_SUCCEEDED(sinkCondition)) {
sostream->Finish();
} else {
mSink->Close();
}
}
}
mAsyncSink = nullptr;
mSink = nullptr;
// notify state complete... if (mCallback) {
nsresult status; if (!canceled) {
status = sourceCondition; if (NS_SUCCEEDED(status)) {
status = sinkCondition;
} if (status == NS_BASE_STREAM_CLOSED) {
status = NS_OK;
}
} else {
status = cancelStatus;
}
mCallback(mClosure, status);
} break;
}
}
}
// clear "in process" flag and post any pending continuation event
MutexAutoLock lock(mLock);
mEventInProcess = false; if (mEventIsPending) {
mEventIsPending = false;
PostContinuationEvent_Locked();
}
return NS_OK;
}
nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
nsresult PostContinuationEvent() { // we cannot post a continuation event if there is currently // an event in process. doing so could result in Process being // run simultaneously on multiple threads, so we mark the event // as pending, and if an event is already in process then we // just let that existing event take care of posting the real // continuation event.
class nsStreamCopierIB final : public nsAStreamCopier { public:
nsStreamCopierIB() = default; virtual ~nsStreamCopierIB() = default;
struct MOZ_STACK_CLASS ReadSegmentsState { // the nsIOutputStream will outlive the ReadSegmentsState on the stack
nsIOutputStream* MOZ_NON_OWNING_REF mSink;
nsresult mSinkCondition;
};
class nsStreamCopierOB final : public nsAStreamCopier { public:
nsStreamCopierOB() = default; virtual ~nsStreamCopierOB() = default;
struct MOZ_STACK_CLASS WriteSegmentsState { // the nsIInputStream will outlive the WriteSegmentsState on the stack
nsIInputStream* MOZ_NON_OWNING_REF mSource;
nsresult mSourceCondition;
};
nsresult rv =
aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes); if (NS_FAILED(rv)) {
*aNewBytes = 0;
} // NOTE: we rely on the fact that the new slots are NOT initialized by // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct() // in nsTArray.h:
aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
// Attempt to perform the clone directly on the source stream
nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource); if (cloneable && cloneable->GetCloneable()) { if (aReplacementOut) {
*aReplacementOut = nullptr;
} return cloneable->Clone(aCloneOut);
}
// If we failed the clone and the caller does not want to replace their // original stream, then we are done. Return error. if (!aReplacementOut) { return NS_ERROR_FAILURE;
}
// The caller has opted-in to the fallback clone support that replaces // the original stream. Copy the data to a pipe and return two cloned // input streams.
NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), 0,
0, // default segment size and max size true, true); // non-blocking
// Propagate length information provided by nsIInputStreamLength. We don't use // InputStreamLengthHelper::GetSyncLength to avoid the risk of blocking when // called off-main-thread.
int64_t length = -1; if (nsCOMPtr<nsIInputStreamLength> streamLength = do_QueryInterface(aSource);
streamLength && NS_SUCCEEDED(streamLength->Length(&length)) &&
length != -1) {
reader = new mozilla::InputStreamLengthWrapper(reader.forget(), length);
}
if (nonBlocking && asyncStream) { // This stream is perfect!
asyncStream.forget(aAsyncInputStream); return NS_OK;
}
if (nonBlocking) { // If the stream is non-blocking but not async, we wrap it. return NonBlockingAsyncInputStream::Create(source.forget(),
aAsyncInputStream);
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.