/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
nsresult OutputStreamHolder::Init(JSContext* aCx) { if (NS_IsMainThread()) { return NS_OK;
}
// We're in a worker
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
MOZ_ASSERT(workerPrivate);
workerPrivate->AssertIsOnWorkerThread();
// Note, this will create a ref-cycle between the holder and the stream. // The cycle is broken when the stream is closed or the worker begins // shutting down.
mWorkerRef =
StrongWorkerRef::Create(workerPrivate, "OutputStreamHolder",
[self = RefPtr{this}]() { self->Shutdown(); }); if (NS_WARN_IF(!mWorkerRef)) { return NS_ERROR_FAILURE;
} return NS_OK;
}
void OutputStreamHolder::Shutdown() { if (mOutput) {
mOutput->Close();
} // If we have an AsyncWait running, we'll get a callback and clear // the mAsyncWaitWorkerRef
mWorkerRef = nullptr;
}
nsresult OutputStreamHolder::AsyncWait(uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget* aEventTarget) {
mAsyncWaitWorkerRef = mWorkerRef; // Grab the strong reference for the reader but only when we are waiting for // the output stream, because it means we still have things to write. // (WAIT_CLOSURE_ONLY happens when waiting for ReadableStream to respond, at // which point the pull callback should get an indirect strong reference via // the controller argument.)
mAsyncWaitReader =
aFlags == nsIAsyncOutputStream::WAIT_CLOSURE_ONLY ? nullptr : mReader;
nsresult rv = mOutput->AsyncWait(this, aFlags, aRequestedCount, aEventTarget); if (NS_WARN_IF(NS_FAILED(rv))) {
mAsyncWaitWorkerRef = nullptr;
mAsyncWaitReader = nullptr;
} return rv;
}
NS_IMETHODIMP OutputStreamHolder::OnOutputStreamReady(
nsIAsyncOutputStream* aStream) { // We may get called back after ::Shutdown() if (!mReader) {
mAsyncWaitWorkerRef = nullptr;
MOZ_ASSERT(!mAsyncWaitReader); return NS_OK;
}
// mAsyncWaitReader may be reset during OnOutputStreamReady, make sure to let // it live during the call
RefPtr<FetchStreamReader> reader = mReader.get(); if (!reader->OnOutputStreamReady()) {
mAsyncWaitWorkerRef = nullptr;
mAsyncWaitReader = nullptr; return NS_OK;
} return NS_OK;
}
// If a context is provided, an attempt will be made to cancel the reader. The // only situation where we don't expect to have a context is when closure is // being triggered from the destructor or the WorkerRef is notifying. If // we're at the destructor, it's far too late to cancel anything. And if the // WorkerRef is being notified, the global is going away, so there's also // no need to do further JS work. void FetchStreamReader::CloseAndRelease(JSContext* aCx, nsresult aStatus) {
NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
if (mStreamClosed) { // Already closed. return;
}
RefPtr<FetchStreamReader> kungFuDeathGrip = this; if (aCx && mReader) {
ErrorResult rv; if (aStatus == NS_ERROR_DOM_WRONG_TYPE_ERR) {
rv.ThrowTypeError<MSG_FETCH_BODY_WRONG_TYPE>();
} else {
rv = aStatus;
}
JS::Rooted<JS::Value> errorValue(aCx); if (ToJSValue(aCx, std::move(rv), &errorValue)) {
IgnoredErrorResult ignoredError; // It's currently safe to cancel an already closed reader because, per the // comments in ReadableStream::cancel() conveying the spec, step 2 of // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is // "closed", return a new promise resolved with undefined.
RefPtr<Promise> cancelResultPromise =
MOZ_KnownLive(mReader)->Cancel(aCx, errorValue, ignoredError);
NS_WARNING_ASSERTION(!ignoredError.Failed(), "Failed to cancel stream during close and release"); if (cancelResultPromise) { bool setHandled = cancelResultPromise->SetAnyPromiseIsHandled();
NS_WARNING_ASSERTION(setHandled, "Failed to mark cancel promise as handled.");
(void)setHandled;
}
}
// We don't want to propagate exceptions during the cleanup.
JS_ClearPendingException(aCx);
}
mStreamClosed = true;
mGlobal = nullptr;
if (mOutput) {
mOutput->CloseWithStatus(aStatus);
mOutput->Shutdown();
mOutput = nullptr;
}
mReader = nullptr;
mBuffer.Clear();
}
// https://fetch.spec.whatwg.org/#body-incrementally-read void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
ErrorResult& aRv) {
MOZ_DIAGNOSTIC_ASSERT(!mReader);
MOZ_DIAGNOSTIC_ASSERT(aStream);
MOZ_ASSERT(!aStream->MaybeGetInputStreamIfUnread(), "FetchStreamReader is for JS streams but we got a stream based on " "nsIInputStream here. Extract nsIInputStream and read it instead " "to reduce overhead.");
aRv = mOutput->Init(aCx); if (aRv.Failed()) {
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); return;
}
// Step 2: Let reader be the result of getting a reader for body’s stream.
RefPtr<ReadableStreamDefaultReader> reader = aStream->GetReader(aRv); if (aRv.Failed()) {
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); return;
}
mReader = reader;
aRv = mOutput->AsyncWait(0, 0, mOwningEventTarget); if (NS_WARN_IF(aRv.Failed())) {
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
}
}
struct FetchReadRequest : public ReadRequest { public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest, ReadRequest)
if (!mBuffer.IsEmpty()) {
nsresult rv = WriteBuffer(); if (NS_WARN_IF(NS_FAILED(rv))) {
CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR); returnfalse;
} returntrue;
}
// Check if the output stream has already been closed. This lets us propagate // errors eagerly, and detect output stream closures even when we have no data // to write. if (NS_WARN_IF(NS_FAILED(mOutput->StreamStatus()))) {
CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR); returnfalse;
}
// We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we // notice if the reader closes.
nsresult rv = mOutput->AsyncWait(nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0,
mOwningEventTarget); if (NS_WARN_IF(NS_FAILED(rv))) {
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); returnfalse;
}
// If we already have an outstanding read request, don't start another one // concurrently. if (!mHasOutstandingReadRequest) { // https://fetch.spec.whatwg.org/#incrementally-read-loop // The below very loosely tries to implement the incrementally-read-loop // from the fetch spec. // Step 2: Read a chunk from reader given readRequest.
RefPtr<ReadRequest> readRequest = new FetchReadRequest(this);
RefPtr<ReadableStreamDefaultReader> reader = mReader;
mHasOutstandingReadRequest = true;
IgnoredErrorResult err;
reader->ReadChunk(aCx, *readRequest, err); if (NS_WARN_IF(err.Failed())) { // Let's close the stream.
mHasOutstandingReadRequest = false;
CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR); // Don't return false, as we've already called `AsyncWait`.
}
} returntrue;
}
// Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to // this step: run processBodyError given a TypeError.
RootedSpiderMonkeyInterface<Uint8Array> chunk(aCx); if (!aChunk.isObject() || !chunk.Init(&aChunk.toObject())) {
CloseAndRelease(aCx, NS_ERROR_DOM_WRONG_TYPE_ERR); return;
}
MOZ_DIAGNOSTIC_ASSERT(mBuffer.IsEmpty());
// Let's take a copy of the data. // FIXME: We could sometimes avoid this copy by trying to write `chunk` // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't // enough space in the pipe's buffer. if (!chunk.AppendDataTo(mBuffer)) {
CloseAndRelease(aCx, NS_ERROR_OUT_OF_MEMORY); return;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.