/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim:set ts=4 sts=2 sw=2 et cin: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// See if the pipe is closed by checking the return of Available.
uint64_t dummy64;
rv = mAsyncStream->Available(&dummy64); if (NS_FAILED(rv)) return rv;
uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
// no need to worry about multiple threads... an input stream pump lives // on only one thread at a time.
MOZ_ASSERT(mAsyncStream); if (!mWaitingForInputStreamReady && !mProcessingCallbacks) { // Ensure OnStateStop is called on the main thread only when this pump is // created on main thread. if (mState == STATE_STOP && !mOffMainThread) {
nsCOMPtr<nsISerialEventTarget> mainThread =
mLabeledMainThreadTarget
? mLabeledMainThreadTarget
: do_AddRef(mozilla::GetMainThreadSerialEventTarget()); if (mTargetThread != mainThread) {
mTargetThread = mainThread;
}
}
MOZ_ASSERT(mTargetThread);
nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread); if (NS_FAILED(rv)) {
NS_ERROR("AsyncWait failed"); return rv;
} // Any retargeting during STATE_START or START_TRANSFER is complete // after the call to AsyncWait; next callback will be on mTargetThread.
mRetargeting = false;
mWaitingForInputStreamReady = true;
} return NS_OK;
}
// although this class can only be accessed from one thread at a time, we do // allow its ownership to move from thread to thread, assuming the consumer // understands the limitations of this.
NS_IMPL_ADDREF(nsInputStreamPump)
NS_IMPL_RELEASE(nsInputStreamPump)
NS_INTERFACE_MAP_BEGIN(nsInputStreamPump)
NS_INTERFACE_MAP_ENTRY(nsIRequest)
NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableRequest)
NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
NS_INTERFACE_MAP_ENTRY(nsIInputStreamPump)
NS_INTERFACE_MAP_ENTRY_CONCRETE(nsInputStreamPump)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStreamPump)
NS_INTERFACE_MAP_END
if (NS_FAILED(mStatus)) {
LOG((" already canceled\n")); return NS_OK;
}
NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
mStatus = status;
// close input stream if (mAsyncStream) { // If mSuspendCount != 0, EnsureWaiting will be called by Resume(). // Note that while suspended, OnInputStreamReady will // not do anything, and also note that calling asyncWait // on a closed stream works and will dispatch an event immediately.
// There is a brief in-between state when we null out mAsyncStream in // OnStateStop() before calling OnStopRequest, and only afterwards set // STATE_DEAD, which we need to handle gracefully. if (--mSuspendCount == 0 && mAsyncStream) {
EnsureWaiting();
} return NS_OK;
}
// This ensures only one thread can interact with a pump at a time
NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
NS_ENSURE_ARG_POINTER(listener);
MOZ_ASSERT(NS_IsMainThread() || mOffMainThread, "nsInputStreamPump should be read from the " "main thread only.");
// mStreamOffset now holds the number of bytes currently read.
mStreamOffset = 0;
// grab event queue (we must do this here by contract, since all notifications // must go to the thread which called AsyncRead) if (NS_IsMainThread() && mLabeledMainThreadTarget) {
mTargetThread = mLabeledMainThreadTarget;
} else {
mTargetThread = mozilla::GetCurrentSerialEventTarget();
}
NS_ENSURE_STATE(mTargetThread);
rv = EnsureWaiting(); if (NS_FAILED(rv)) return rv;
if (mLoadGroup) mLoadGroup->AddRequest(this, nullptr);
// this function has been called from a PLEvent, so we can safely call // any listener or progress sink methods directly from here.
for (;;) { // There should only be one iteration of this loop happening at a time. // To prevent AsyncWait() (called during callbacks or on other threads) // from creating a parallel OnInputStreamReady(), we use: // -- a mutex; and // -- a boolean mProcessingCallbacks to detect parallel loops // when exiting the mutex for callbacks.
RecursiveMutexAutoLock lock(mMutex);
// Prevent parallel execution during callbacks, while out of mutex. if (mProcessingCallbacks) {
MOZ_ASSERT(!mProcessingCallbacks); break;
}
mProcessingCallbacks = true; if (mSuspendCount || mState == STATE_IDLE || mState == STATE_DEAD) {
mWaitingForInputStreamReady = false;
mProcessingCallbacks = false; break;
}
bool stillTransferring =
(mState == STATE_TRANSFER && nextState == STATE_TRANSFER); if (stillTransferring) {
NS_ASSERTION(NS_SUCCEEDED(mStatus), "Should not have failed status for ongoing transfer");
} else {
NS_ASSERTION(mState != nextState, "Only OnStateTransfer can be called more than once.");
} if (mRetargeting) {
NS_ASSERTION(mState != STATE_STOP, "Retargeting should not happen during OnStateStop.");
}
// Set mRetargeting so EnsureWaiting will be called. It ensures that // OnStateStop is called on the main thread. if (nextState == STATE_STOP && !NS_IsMainThread() && !mOffMainThread) {
mRetargeting = true;
}
// Unset mProcessingCallbacks here (while we have lock) so our own call to // EnsureWaiting isn't blocked by it.
mProcessingCallbacks = false;
// We must break the loop if suspended during one of the previous // operation. if (mSuspendCount) {
mState = nextState;
mWaitingForInputStreamReady = false; break;
}
// Wait asynchronously if there is still data to transfer, or we're // switching event delivery to another thread. if (stillTransferring || mRetargeting) {
mState = nextState;
mWaitingForInputStreamReady = false;
nsresult rv = EnsureWaiting(); if (NS_SUCCEEDED(rv)) break;
// Failure to start asynchronous wait: stop transfer. // Do not set mStatus if it was previously set to report a failure. if (NS_SUCCEEDED(mStatus)) {
mStatus = rv;
}
nextState = STATE_STOP;
}
// need to check the reason why the stream is ready. this is required // so our listener can check our status from OnStartRequest. // XXX async streams should have a GetStatus method! if (NS_SUCCEEDED(mStatus)) {
uint64_t avail;
rv = mAsyncStream->Available(&avail); if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
}
{
nsCOMPtr<nsIStreamListener> listener = mListener; // We're on the writing thread
AssertOnThread();
// Note: Must exit mutex for call to OnStartRequest to avoid // deadlocks when calls to RetargetDeliveryTo for multiple // nsInputStreamPumps are needed (e.g. nsHttpChannel).
RecursiveMutexAutoUnlock unlock(mMutex);
rv = listener->OnStartRequest(this);
}
// an error returned from OnStartRequest should cause us to abort; however, // we must not stomp on mStatus if already canceled. if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) mStatus = rv;
// if canceled, go directly to STATE_STOP... if (NS_FAILED(mStatus)) return STATE_STOP;
nsresult rv = CreateBufferedStreamIfNeeded(); if (NS_WARN_IF(NS_FAILED(rv))) { return STATE_STOP;
}
uint64_t avail;
rv = mAsyncStream->Available(&avail);
LOG((" Available returned [stream=%p rv=%" PRIx32 " avail=%" PRIu64 "]\n",
mAsyncStream.get(), static_cast<uint32_t>(rv), avail));
if (rv == NS_BASE_STREAM_CLOSED) {
rv = NS_OK;
avail = 0;
} elseif (NS_SUCCEEDED(rv) && avail) { // we used to limit avail to 16K - we were afraid some ODA handlers // might assume they wouldn't get more than 16K at once // we're removing that limit since it speeds up local file access. // Now there's an implicit 64K limit of 4 16K segments // NOTE: ok, so the story is as follows. OnDataAvailable impls // are by contract supposed to consume exactly |avail| bytes. // however, many do not... mailnews... stream converters... // cough, cough. the input stream pump is fairly tolerant // in this regard; however, if an ODA does not consume any // data from the stream, then we could potentially end up in // an infinite loop. we do our best here to try to catch // such an error. (see bug 189672)
// in most cases this QI will succeed (mAsyncStream is almost always // a nsPipeInputStream, which implements nsITellableStream::Tell).
int64_t offsetBefore;
nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mAsyncStream); if (tellable && NS_FAILED(tellable->Tell(&offsetBefore))) {
MOZ_ASSERT_UNREACHABLE("Tell failed on readable stream");
offsetBefore = 0;
}
{ // We may be called on non-MainThread even if mOffMainThread is // false, due to RetargetDeliveryTo(), so don't use AssertOnThread() if (mTargetThread) {
MOZ_ASSERT(mTargetThread->IsOnCurrentThread());
} else {
MOZ_ASSERT(NS_IsMainThread());
}
nsCOMPtr<nsIStreamListener> listener = mListener; // Note: Must exit mutex for call to OnStartRequest to avoid // deadlocks when calls to RetargetDeliveryTo for multiple // nsInputStreamPumps are needed (e.g. nsHttpChannel).
RecursiveMutexAutoUnlock unlock(mMutex); // We're on the writing thread for mListener and mAsyncStream. // mStreamOffset is only touched in OnStateTransfer, and AsyncRead // shouldn't be called during OnDataAvailable()
// don't enter this code if ODA failed or called Cancel if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) { // test to see if this ODA failed to consume data if (tellable) { // NOTE: if Tell fails, which can happen if the stream is // now closed, then we assume that everything was read.
int64_t offsetAfter; if (NS_FAILED(tellable->Tell(&offsetAfter))) {
offsetAfter = offsetBefore + odaAvail;
} if (offsetAfter > offsetBefore) {
mStreamOffset += (offsetAfter - offsetBefore);
} elseif (mSuspendCount == 0) { // // possible infinite loop if we continue pumping data! // // NOTE: although not allowed by nsIStreamListener, we // will allow the ODA impl to Suspend the pump. IMAP // does this :-( //
NS_ERROR("OnDataAvailable implementation consumed no data");
mStatus = NS_ERROR_UNEXPECTED;
}
} else {
mStreamOffset += odaAvail; // assume ODA behaved well
}
}
}
// an error returned from Available or OnDataAvailable should cause us to // abort; however, we must not stop on mStatus if already canceled.
if (NS_SUCCEEDED(mStatus)) { if (NS_FAILED(rv)) {
mStatus = rv;
} elseif (avail) { // if stream is now closed, advance to STATE_STOP right away. // Available may return 0 bytes available at the moment; that // would not mean that we are done. // XXX async streams should have a GetStatus method!
rv = mAsyncStream->Available(&avail); if (NS_SUCCEEDED(rv)) return STATE_TRANSFER; if (rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
}
} return STATE_STOP;
}
if (!NS_IsMainThread() && !mOffMainThread) { // This method can be called on a different thread if nsInputStreamPump // is used off the main-thread. if (NS_SUCCEEDED(mStatus) && mListener &&
mozilla::StaticPrefs::network_send_OnDataFinished_nsInputStreamPump()) {
nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
do_QueryInterface(mListener); if (retargetableListener) {
retargetableListener->OnDataFinished(mStatus);
}
}
nsresult rv = mLabeledMainThreadTarget->Dispatch(
mozilla::NewRunnableMethod("nsInputStreamPump::CallOnStateStop", this,
&nsInputStreamPump::CallOnStateStop));
NS_ENSURE_SUCCESS(rv, STATE_DEAD); return STATE_DEAD;
}
// if an error occurred, we must be sure to pass the error onto the async // stream. in some cases, this is redundant, but since close is idempotent, // this is OK. otherwise, be sure to honor the "close-when-done" option.
if (!mAsyncStream || !mListener) {
MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?"); return STATE_DEAD;
}
if (NS_FAILED(mStatus)) {
mAsyncStream->CloseWithStatus(mStatus);
} elseif (mCloseWhenDone) {
mAsyncStream->Close();
}
mAsyncStream = nullptr;
mIsPending = false;
{ // We're on the writing thread. // We believe that mStatus can't be changed on us here.
AssertOnThread();
nsCOMPtr<nsIStreamListener> listener = mListener;
nsresult status = mStatus; // Note: Must exit mutex for call to OnStartRequest to avoid // deadlocks when calls to RetargetDeliveryTo for multiple // nsInputStreamPumps are needed (e.g. nsHttpChannel).
RecursiveMutexAutoUnlock unlock(mMutex);
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.