/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "Http3WebTransportStream.h"
#include "HttpLog.h"
#include "Http3Session.h"
#include "Http3WebTransportSession.h"
#include "mozilla/TimeStamp.h"
#include "nsHttpHandler.h"
#include "nsIOService.h"
#include "nsIPipe.h"
#include "nsSocketTransportService2.h"
#include "nsIWebTransportStream.h"
namespace mozilla::net {
namespace {
// A do-nothing nsAHttpTransaction. Every override below either has an empty
// body or returns a neutral value (nullptr, 0, false or NS_OK). The stream
// constructors in this file hand a fresh instance to Http3StreamBase.
class DummyWebTransportStreamTransaction : public nsAHttpTransaction {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  DummyWebTransportStreamTransaction() = default;

  // --- Connection related hooks: nothing is stored or reported. ---
  void SetConnection(nsAHttpConnection*) override {}
  nsAHttpConnection* Connection() override { return nullptr; }
  void GetSecurityCallbacks(nsIInterfaceRequestor**) override {}
  void OnTransportStatus(nsITransport*, nsresult, int64_t) override {}

  // --- State queries: never done, never failed, no capabilities. ---
  bool IsDone() override { return false; }
  nsresult Status() override { return NS_OK; }
  uint32_t Caps() override { return 0; }

  // --- Data movement: both directions succeed without touching the
  // byte-count out-parameter. ---
  [[nodiscard]] nsresult ReadSegments(nsAHttpSegmentReader*, uint32_t,
                                      uint32_t*) override {
    return NS_OK;
  }
  [[nodiscard]] nsresult WriteSegments(nsAHttpSegmentWriter*, uint32_t,
                                       uint32_t*) override {
    return NS_OK;
  }

  // --- Lifecycle and request metadata: nothing to close or expose. ---
  void Close(nsresult) override {}
  nsHttpConnectionInfo* ConnectionInfo() override { return nullptr; }
  void SetProxyConnectFailed() override {}
  nsHttpRequestHead* RequestHead() override { return nullptr; }
  uint32_t Http1xTransactionCount() override { return 0; }
  [[nodiscard]] nsresult TakeSubTransactions(
      nsTArray<RefPtr<nsAHttpTransaction>>&) override {
    return NS_OK;
  }

 private:
  virtual ~DummyWebTransportStreamTransaction() = default;
};

NS_IMPL_ISUPPORTS(DummyWebTransportStreamTransaction, nsISupportsWeakReference)
// Snapshot of a send stream's byte counters. The timestamp is taken with
// TimeStamp::Now() when the snapshot object is constructed; the getters only
// copy the stored values out.
class WebTransportSendStreamStats : public nsIWebTransportSendStreamStats {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  // aSent:  total number of bytes sent.
  // aAcked: total number of bytes acknowledged.
  explicit WebTransportSendStreamStats(uint64_t aSent, uint64_t aAcked)
      : mTimeStamp(TimeStamp::Now()),
        mTotalSent(aSent),
        mTotalAcknowledged(aAcked) {}

  // Writes the construction time of this snapshot to |aTimestamp|.
  NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
    *aTimestamp = mTimeStamp;
    return NS_OK;
  }

  // Writes the recorded sent-byte count to |aBytesSent|.
  NS_IMETHOD GetBytesSent(uint64_t* aBytesSent) override {
    *aBytesSent = mTotalSent;
    return NS_OK;
  }

  // Writes the recorded acknowledged-byte count to |aBytesAcknowledged|.
  NS_IMETHOD GetBytesAcknowledged(uint64_t* aBytesAcknowledged) override {
    *aBytesAcknowledged = mTotalAcknowledged;
    return NS_OK;
  }

 private:
  virtual ~WebTransportSendStreamStats() = default;

  TimeStamp mTimeStamp;
  uint64_t mTotalSent;
  uint64_t mTotalAcknowledged;
};

NS_IMPL_ISUPPORTS(WebTransportSendStreamStats, nsIWebTransportSendStreamStats)
// Snapshot of a receive stream's byte counter. The timestamp is taken with
// TimeStamp::Now() when the snapshot object is constructed; the getters only
// copy the stored values out.
class WebTransportReceiveStreamStats
    : public nsIWebTransportReceiveStreamStats {
 public:
  NS_DECL_THREADSAFE_ISUPPORTS

  // aReceived: total number of bytes received.
  explicit WebTransportReceiveStreamStats(uint64_t aReceived)
      : mTimeStamp(TimeStamp::Now()), mTotalReceived(aReceived) {}

  // Writes the construction time of this snapshot to |aTimestamp|.
  NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
    *aTimestamp = mTimeStamp;
    return NS_OK;
  }

  // Writes the recorded received-byte count to |aByteReceived|.
  NS_IMETHOD GetBytesReceived(uint64_t* aByteReceived) override {
    *aByteReceived = mTotalReceived;
    return NS_OK;
  }

 private:
  virtual ~WebTransportReceiveStreamStats() = default;

  TimeStamp mTimeStamp;
  uint64_t mTotalReceived;
};

NS_IMPL_ISUPPORTS(WebTransportReceiveStreamStats,
                  nsIWebTransportReceiveStreamStats)
} // namespace
// XPCOM nsISupports implementation for Http3WebTransportStream. The two
// listed interfaces are the pipe-readiness callbacks whose methods,
// OnInputStreamReady and OnOutputStreamReady, are defined further down in
// this file.
NS_IMPL_ISUPPORTS(Http3WebTransportStream, nsIInputStreamCallback,
nsIOutputStreamCallback)
// Constructor for a stream in the OUTGOING role.
//
// aSession   - session passed on to Http3StreamBase.
// aSessionId - stored in mSessionId.
// aType      - stored in mStreamType.
// aCallback  - stored in mStreamReadyCallback; OnReadSegment invokes it with
//              either this stream or an error code once activation has been
//              attempted.
//
// The base class is given a DummyWebTransportStreamTransaction as its
// transaction.
Http3WebTransportStream::Http3WebTransportStream(
    Http3Session* aSession, uint64_t aSessionId, WebTransportStreamType aType,
    std::function<void(Result<RefPtr<Http3WebTransportStream>, nsresult>&&)>&&
        aCallback)
    : Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
      mSessionId(aSessionId),
      mStreamType(aType),
      mStreamRole(OUTGOING),
      mStreamReadyCallback(std::move(aCallback)) {
  LOG(("Http3WebTransportStream outgoing ctor %p", this));
}
// Constructor for a stream in the INCOMING role.
//
// aSession   - session passed on to Http3StreamBase.
// aSessionId - stored in mSessionId.
// aType      - stored in mStreamType.
// aStreamId  - stored in mStreamId.
//
// There is no ready-callback for this role, so mStreamReadyCallback is left
// empty.
Http3WebTransportStream::Http3WebTransportStream(Http3Session* aSession,
                                                 uint64_t aSessionId,
                                                 WebTransportStreamType aType,
                                                 uint64_t aStreamId)
    : Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
      mSessionId(aSessionId),
      mStreamType(aType),
      mStreamRole(INCOMING),
      // The send side starts in WAITING_DATA, i.e. it is waiting for
      // Http3WebTransportStream::OnInputStreamReady to be called.
      mSendState(WAITING_DATA),
      mStreamReadyCallback(nullptr) {
  LOG(("Http3WebTransportStream incoming ctor %p", this));
  mStreamId = aStreamId;
}
// Destructor: only logs the address of the object being destroyed.
Http3WebTransportStream::~Http3WebTransportStream() {
  LOG(("Http3WebTransportStream dtor %p", this));
}
// Asks the session to activate this stream, passing the address of mStreamId
// so the session can fill it in. Must run on the socket thread.
// Returns whatever the session reports.
nsresult Http3WebTransportStream::TryActivating() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  const nsresult activation =
      mSession->TryActivatingWebTransportStream(&mStreamId, this);
  return activation;
}
// nsIInputStreamCallback: the stream registered through AsyncWait became
// ready. Unless sending has already finished, switch to SENDING and tell the
// session that this stream has data to write. Always returns NS_OK.
NS_IMETHODIMP Http3WebTransportStream::OnInputStreamReady(
    nsIAsyncInputStream* aStream) {
  LOG1(
      ("Http3WebTransportStream::OnInputStreamReady [this=%p stream=%p "
       "state=%d]",
       this, aStream, mSendState));

  // In SEND_DONE the stream is already closed and there is nothing to
  // schedule.
  if (mSendState != SEND_DONE) {
    mSendState = SENDING;
    mSession->StreamHasDataToWrite(this);
  }
  return NS_OK;
}
// nsIOutputStreamCallback: the output stream registered through AsyncWait
// became ready. If the stream still has a session, re-register it there via
// ConnectSlowConsumer. Always returns NS_OK.
NS_IMETHODIMP
Http3WebTransportStream::OnOutputStreamReady(nsIAsyncOutputStream* aOutStream) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  if (mSession) {
    mSession->ConnectSlowConsumer(this);
  }
  return NS_OK;
}
// Creates the pipe used for outgoing data and starts waiting for data on its
// read end.
//
// The two pipe ends are published to mSendStreamPipeIn / mSendStreamPipeOut
// while holding mMutex. On a successful AsyncWait the send state becomes
// WAITING_DATA and NS_OK is returned; otherwise the AsyncWait failure code is
// returned and the send state is left untouched.
nsresult Http3WebTransportStream::InitOutputPipe() {
  nsCOMPtr<nsIAsyncInputStream> pipeReader;
  nsCOMPtr<nsIAsyncOutputStream> pipeWriter;
  NS_NewPipe2(getter_AddRefs(pipeReader), getter_AddRefs(pipeWriter), true,
              true, nsIOService::gDefaultSegmentSize,
              nsIOService::gDefaultSegmentCount);

  {
    MutexAutoLock lock(mMutex);
    mSendStreamPipeIn = std::move(pipeReader);
    mSendStreamPipeOut = std::move(pipeWriter);
  }

  // Have OnInputStreamReady called (on the socket transport service) once the
  // read end has something for us.
  const nsresult waitResult =
      mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
  if (NS_SUCCEEDED(waitResult)) {
    mSendState = WAITING_DATA;
    return NS_OK;
  }
  return waitResult;
}
// Creates the pipe used for incoming data.
//
// The two pipe ends are published to mReceiveStreamPipeIn /
// mReceiveStreamPipeOut while holding mMutex, then the receive state is set
// to READING. Always returns NS_OK.
nsresult Http3WebTransportStream::InitInputPipe() {
  nsCOMPtr<nsIAsyncInputStream> pipeReader;
  nsCOMPtr<nsIAsyncOutputStream> pipeWriter;
  NS_NewPipe2(getter_AddRefs(pipeReader), getter_AddRefs(pipeWriter), true,
              true, nsIOService::gDefaultSegmentSize,
              nsIOService::gDefaultSegmentCount);

  {
    MutexAutoLock lock(mMutex);
    mReceiveStreamPipeIn = std::move(pipeReader);
    mReceiveStreamPipeOut = std::move(pipeWriter);
  }

  mRecvState = READING;
  return NS_OK;
}
// Returns add-ref'ed pointers to the write end of the send pipe and the read
// end of the receive pipe.
//
// Both members are copied while holding mMutex; the copies are handed out
// after the lock has been released. An out-parameter receives nullptr when
// the corresponding pipe has not been created.
void Http3WebTransportStream::GetWriterAndReader(
    nsIAsyncOutputStream** aOutOutputStream,
    nsIAsyncInputStream** aOutInputStream) {
  nsCOMPtr<nsIAsyncOutputStream> writer;
  nsCOMPtr<nsIAsyncInputStream> reader;

  {
    MutexAutoLock lock(mMutex);
    writer = mSendStreamPipeOut;
    reader = mReceiveStreamPipeIn;
  }

  writer.forget(aOutOutputStream);
  reader.forget(aOutInputStream);
}
// Builds a new stats object from the current mTotalSent and
// mTotalAcknowledged counters and transfers ownership to the caller.
// Must run on the socket thread.
already_AddRefed<nsIWebTransportSendStreamStats>
Http3WebTransportStream::GetSendStreamStats() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  nsCOMPtr<nsIWebTransportSendStreamStats> snapshot(
      new WebTransportSendStreamStats(mTotalSent, mTotalAcknowledged));
  return snapshot.forget();
}
// Builds a new stats object from the current mTotalReceived counter and
// transfers ownership to the caller. Must run on the socket thread.
already_AddRefed<nsIWebTransportReceiveStreamStats>
Http3WebTransportStream::GetReceiveStreamStats() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");

  nsCOMPtr<nsIWebTransportReceiveStreamStats> snapshot(
      new WebTransportReceiveStreamStats(mTotalReceived));
  return snapshot.forget();
}
// One step of the send-side state machine. Must run on the socket thread.
//
// What happens depends on mSendState:
//  - WAITING_TO_ACTIVATE: try to activate the stream in the session. On
//    success create the output pipe (and, for BiDi streams, the input pipe)
//    and report the result through mStreamReadyCallback.
//  - SENDING: pass |buf| / |count| to the session and add the number of bytes
//    it reports in |countRead| to mTotalSent.
//  - WAITING_DATA: nothing to do.
//  - SEND_DONE: unexpected; reported as NS_ERROR_UNEXPECTED.
//
// buf, count - data to send; only used in the SENDING state.
// countRead  - only written (by the session) in the SENDING state; callers
//              are expected to initialise it.
//
// Returns the resulting status, which is also stored in mSocketOutCondition.
//
// mStreamReadyCallback is cleared after its first use, so every invocation
// below is guarded: calling an empty std::function is an error, and the
// MOZ_ASSERTs in the SEND_DONE branch do not protect release builds.
nsresult Http3WebTransportStream::OnReadSegment(const char* buf, uint32_t count,
                                                uint32_t* countRead) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::OnReadSegment count=%u state=%d [this=%p]",
       count, mSendState, this));

  nsresult rv = NS_OK;
  switch (mSendState) {
    case WAITING_TO_ACTIVATE:
      rv = TryActivating();
      if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
        // Not an error: the callback is kept for a later attempt.
        LOG3(
            ("Http3WebTransportStream::OnReadSegment %p cannot activate now. "
             "queued.\n",
             this));
        break;
      }
      if (NS_FAILED(rv)) {
        LOG3(
            ("Http3WebTransportStream::OnReadSegment %p cannot activate "
             "error=0x%" PRIx32 ".",
             this, static_cast<uint32_t>(rv)));
        if (mStreamReadyCallback) {
          mStreamReadyCallback(Err(rv));
          mStreamReadyCallback = nullptr;
        }
        break;
      }

      rv = InitOutputPipe();
      if (NS_SUCCEEDED(rv) && mStreamType == WebTransportStreamType::BiDi) {
        rv = InitInputPipe();
      }
      if (NS_FAILED(rv)) {
        LOG3(
            ("Http3WebTransportStream::OnReadSegment %p failed to create pipe "
             "error=0x%" PRIx32 ".",
             this, static_cast<uint32_t>(rv)));
        mSendState = SEND_DONE;
        if (mStreamReadyCallback) {
          mStreamReadyCallback(Err(rv));
          mStreamReadyCallback = nullptr;
        }
        break;
      }

      // Successfully activated.
      if (mStreamReadyCallback) {
        mStreamReadyCallback(RefPtr{this});
        mStreamReadyCallback = nullptr;
      }
      break;

    case SENDING: {
      rv = mSession->SendRequestBody(mStreamId, buf, count, countRead);
      LOG3(
          ("Http3WebTransportStream::OnReadSegment %p sending body returns "
           "error=0x%" PRIx32 ".",
           this, static_cast<uint32_t>(rv)));
      mTotalSent += *countRead;
    } break;

    case WAITING_DATA:
      // Still waiting
      LOG3((
          "Http3WebTransportStream::OnReadSegment %p Still waiting for data...",
          this));
      break;

    case SEND_DONE:
      LOG3(("Http3WebTransportStream::OnReadSegment %p called after SEND_DONE ",
            this));
      MOZ_ASSERT(false, "We are done sending this request!");
      MOZ_ASSERT(mStreamReadyCallback);
      rv = NS_ERROR_UNEXPECTED;
      // The callback has normally been consumed by the time SEND_DONE is
      // reached, so only notify if it is still set.
      if (mStreamReadyCallback) {
        mStreamReadyCallback(Err(rv));
        mStreamReadyCallback = nullptr;
      }
      break;
  }

  mSocketOutCondition = rv;
  return mSocketOutCondition;
}
// static
// Segment callback passed to the send pipe's ReadSegments (see
// Http3WebTransportStream::ReadSegments). |closure| is the
// Http3WebTransportStream that started the read; the segment is forwarded to
// its OnReadSegment and that result is returned. |stream| and |offset| are
// not used.
nsresult Http3WebTransportStream::ReadRequestSegment(
    nsIInputStream* stream, void* closure, const char* buf, uint32_t offset,
    uint32_t count, uint32_t* countRead) {
  auto* self = static_cast<Http3WebTransportStream*>(closure);
  const nsresult result = self->OnReadSegment(buf, count, countRead);
  LOG(("Http3WebTransportStream::ReadRequestSegment %p read=%u", self,
       *countRead));
  return result;
}
// Drives the send side of the stream. Must run on the socket thread.
//
// Each loop iteration performs one step according to mSendState (attempt
// activation, or pull one batch of data out of mSendStreamPipeIn through
// ReadRequestSegment / OnReadSegment) and then decides from the results
// whether to loop again. The loop ends on an error, when the pipe would
// block, when there is nothing more to send, or when gHttpHandler is no
// longer active.
//
// Returns NS_OK immediately if the stream is already in SEND_DONE; otherwise
// returns the status of the last iteration after the adjustments made below.
nsresult Http3WebTransportStream::ReadSegments() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread" );
LOG(("Http3WebTransportStream::ReadSegments [this=%p]" , this ));
nsresult rv = NS_OK;
uint32_t sendBytes = 0;
bool again = true ;
do {
// Start every iteration from a clean slate. mSocketOutCondition is
// overwritten by OnReadSegment with the status of the lower layer.
sendBytes = 0;
rv = mSocketOutCondition = NS_OK;
LOG(("Http3WebTransportStream::ReadSegments state=%d [this=%p]" , mSendState,
this ));
switch (mSendState) {
case WAITING_TO_ACTIVATE: {
// Call OnReadSegment with an empty buffer purely to run its activation
// step. Its return value is only logged here; the outcome is observed
// through mSendState and mSocketOutCondition.
LOG3(
("Http3WebTransportStream %p ReadSegments forcing OnReadSegment "
"call\n" ,
this ));
uint32_t wasted = 0;
nsresult rv2 = OnReadSegment("" , 0, &wasted);
LOG3((" OnReadSegment returned 0x%08" PRIx32,
static_cast <uint32_t>(rv2)));
// InitOutputPipe moves the state to WAITING_DATA on success. Any other
// state means activation did not complete, so do not try to send.
if (mSendState != WAITING_DATA) {
break ;
}
}
[[fallthrough]];
case WAITING_DATA:
[[fallthrough]];
case SENDING: {
// This combination skips the send pipe entirely and reports success.
if (mStreamRole == INCOMING &&
mStreamType == WebTransportStreamType::UniDi) {
rv = NS_OK;
break ;
}
mSendState = SENDING;
// Pull up to one default-sized segment out of the send pipe; each
// segment is handed to OnReadSegment through ReadRequestSegment.
rv = mSendStreamPipeIn->ReadSegments(ReadRequestSegment, this ,
nsIOService::gDefaultSegmentSize,
&sendBytes);
} break ;
case SEND_DONE: {
return NS_OK;
}
default :
sendBytes = 0;
rv = NS_OK;
break ;
}
LOG(("Http3WebTransportStream::ReadSegments rv=0x%" PRIx32
" read=%u sock-cond=%" PRIx32 " again=%d mSendFin=%d [this=%p]" ,
static_cast <uint32_t>(rv), sendBytes,
static_cast <uint32_t>(mSocketOutCondition), again, mSendFin, this ));
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
// Queued tasks (added by SendFin / Reset while in SENDING) are handled the
// same way as EOF: rv and sendBytes are cleared so that, unless
// mSocketOutCondition holds a failure, the "!sendBytes" branch below runs
// and executes them.
if (rv == NS_BASE_STREAM_CLOSED || !mPendingTasks.IsEmpty()) {
rv = NS_OK;
sendBytes = 0;
}
if (NS_FAILED(rv)) {
// if the writer didn't want to write any more data, then
// wait for the transaction to call ResumeSend.
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// Go back to WAITING_DATA and re-arm the pipe callback;
// OnInputStreamReady switches to SENDING again when data arrives.
mSendState = WAITING_DATA;
rv = mSendStreamPipeIn->AsyncWait(this , 0, 0, gSocketTransportService);
}
again = false ;
// Got a WebTransport specific error
// The error is translated into a stream reset and not propagated to the
// caller.
if (rv >= NS_ERROR_WEBTRANSPORT_CODE_BASE &&
rv <= NS_ERROR_WEBTRANSPORT_CODE_END) {
uint8_t errorCode = GetWebTransportErrorFromNSResult(rv);
mSendState = SEND_DONE;
Reset(WebTransportErrorToHttp3Error(errorCode));
rv = NS_OK;
}
} else if (NS_FAILED(mSocketOutCondition)) {
// The pipe read succeeded but the lower layer reported a failure through
// OnReadSegment. Would-block is not an error for the caller.
if (mSocketOutCondition != NS_BASE_STREAM_WOULD_BLOCK) {
rv = mSocketOutCondition;
}
again = false ;
} else if (!sendBytes) {
// Nothing was read and nothing failed: sending is finished.
mSendState = SEND_DONE;
rv = NS_OK;
again = false ;
if (!mPendingTasks.IsEmpty()) {
LOG(("Has pending tasks to do" ));
// Move the list into a local first so that mPendingTasks is empty while
// the tasks run.
nsTArray<std::function<void ()>> tasks = std::move(mPendingTasks);
for (const auto & task : tasks) {
task();
}
}
// Tell the underlying stream we're done
SendFin();
}
// write more to the socket until error or end-of-request...
} while (again && gHttpHandler->Active());
return rv;
}
// One step of the receive-side state machine. Must run on the socket thread.
//
//  - READING: ask the session for up to |count| bytes into |buf|. Bytes
//    received are added to mTotalReceived; a FIN together with data moves to
//    RECEIVED_FIN, a FIN without data moves to RECV_DONE and yields
//    NS_BASE_STREAM_CLOSED, no data and no FIN yields
//    NS_BASE_STREAM_WOULD_BLOCK.
//  - RECEIVED_FIN: yields NS_BASE_STREAM_CLOSED and moves to RECV_DONE.
//  - any other state: NS_ERROR_UNEXPECTED.
//
// Returns the resulting status, which is also stored in mSocketInCondition.
nsresult Http3WebTransportStream::OnWriteSegment(char* buf, uint32_t count,
                                                 uint32_t* countWritten) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::OnWriteSegment [this=%p, state=%d", this,
       static_cast<uint32_t>(mRecvState)));

  nsresult rv = NS_OK;
  if (mRecvState == READING) {
    rv = mSession->ReadResponseData(mStreamId, buf, count, countWritten, &mFin);
    // NOTE(review): when nothing was written, a failure code returned by
    // ReadResponseData is replaced below - confirm this is intended.
    if (*countWritten != 0) {
      mTotalReceived += *countWritten;
      if (mFin) {
        mRecvState = RECEIVED_FIN;
      }
    } else if (mFin) {
      mRecvState = RECV_DONE;
      rv = NS_BASE_STREAM_CLOSED;
    } else {
      rv = NS_BASE_STREAM_WOULD_BLOCK;
    }
  } else if (mRecvState == RECEIVED_FIN) {
    rv = NS_BASE_STREAM_CLOSED;
    mRecvState = RECV_DONE;
  } else {
    // RECV_DONE and every remaining state.
    rv = NS_ERROR_UNEXPECTED;
  }

  // Remember the error received from lower layers. A stream pipe may overwrite
  // it.
  // If rv == NS_OK this will reset mSocketInCondition.
  mSocketInCondition = rv;
  return rv;
}
// static
// Segment callback passed to the receive pipe's WriteSegments (see
// Http3WebTransportStream::WriteSegments). |closure| is the
// Http3WebTransportStream that started the write; the buffer is forwarded to
// its OnWriteSegment and that result is returned. The byte count is logged
// only on success. |stream| and |offset| are not used.
nsresult Http3WebTransportStream::WritePipeSegment(nsIOutputStream* stream,
                                                   void* closure, char* buf,
                                                   uint32_t offset,
                                                   uint32_t count,
                                                   uint32_t* countWritten) {
  auto* self = static_cast<Http3WebTransportStream*>(closure);
  const nsresult result = self->OnWriteSegment(buf, count, countWritten);
  if (NS_SUCCEEDED(result)) {
    LOG(("Http3WebTransportStream::WritePipeSegment %p written=%u", self,
         *countWritten));
  }
  return result;
}
// Drives the receive side of the stream. Must run on the socket thread.
//
// Repeatedly asks mReceiveStreamPipeOut to fill itself through
// WritePipeSegment / OnWriteSegment until an error occurs, the pipe or the
// lower layer would block, or gHttpHandler is no longer active.
//
// Returns NS_OK when there is no receive pipe. Would-block and
// NS_BASE_STREAM_CLOSED are mapped to NS_OK below (closing the pipe in the
// latter case); other failures are returned to the caller.
nsresult Http3WebTransportStream::WriteSegments() {
MOZ_ASSERT(OnSocketThread(), "not on socket thread" );
if (!mReceiveStreamPipeOut) {
return NS_OK;
}
LOG(("Http3WebTransportStream::WriteSegments [this=%p]" , this ));
nsresult rv = NS_OK;
uint32_t countWrittenSingle = 0;
bool again = true ;
do {
// OnWriteSegment stores the status of the lower layer in
// mSocketInCondition; clear it before each attempt.
mSocketInCondition = NS_OK;
countWrittenSingle = 0;
rv = mReceiveStreamPipeOut->WriteSegments(WritePipeSegment, this ,
nsIOService::gDefaultSegmentSize,
&countWrittenSingle);
LOG(("Http3WebTransportStream::WriteSegments rv=0x%" PRIx32
" countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]" ,
static_cast <uint32_t>(rv), countWrittenSingle,
static_cast <uint32_t>(mSocketInCondition), this ));
if (NS_FAILED(rv)) {
// Failure reported by the pipe itself.
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// Ask to be called back (OnOutputStreamReady) on the socket thread
// target. Without a target the wait cannot be registered, which is
// reported as NS_ERROR_UNEXPECTED.
nsCOMPtr<nsIEventTarget> target;
Unused << gHttpHandler->GetSocketThreadTarget(getter_AddRefs(target));
if (target) {
mReceiveStreamPipeOut->AsyncWait(this , 0, 0, target);
rv = NS_OK;
} else {
rv = NS_ERROR_UNEXPECTED;
}
}
if (rv == NS_BASE_STREAM_CLOSED) {
mReceiveStreamPipeOut->Close();
rv = NS_OK;
}
again = false ;
} else if (NS_FAILED(mSocketInCondition)) {
// The pipe call succeeded but OnWriteSegment recorded a failure from the
// lower layer. Would-block leaves rv untouched; a closed stream closes
// the pipe and is reported as success.
if (mSocketInCondition != NS_BASE_STREAM_WOULD_BLOCK) {
rv = mSocketInCondition;
if (rv == NS_BASE_STREAM_CLOSED) {
mReceiveStreamPipeOut->Close();
rv = NS_OK;
}
}
again = false ;
}
// read more from the socket until error...
} while (again && gHttpHandler->Active());
return rv;
}
// True once both directions have reached their terminal state
// (SEND_DONE for sending, RECV_DONE for receiving).
bool Http3WebTransportStream::Done() const {
  const bool sendFinished = mSendState == SEND_DONE;
  const bool recvFinished = mRecvState == RECV_DONE;
  return sendFinished && recvFinished;
}
// Shuts the stream down. Must run on the socket thread.
//
// Releases the transaction, cancels any pending AsyncWait on the pipe ends
// owned by this side and closes them with |aResult|, marks both directions
// as done and finally drops the session reference.
void Http3WebTransportStream::Close(nsresult aResult) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::Close [this=%p]", this));

  mTransaction = nullptr;

  // Read end of the send pipe: unregister our callback, then close.
  if (mSendStreamPipeIn) {
    mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr);
    mSendStreamPipeIn->CloseWithStatus(aResult);
  }

  // Write end of the receive pipe: unregister our callback, then close.
  if (mReceiveStreamPipeOut) {
    mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr);
    mReceiveStreamPipeOut->CloseWithStatus(aResult);
  }

  mRecvState = RECV_DONE;
  mSendState = SEND_DONE;
  mSession = nullptr;
}
// Closes the sending side of the stream. Must run on the socket thread.
//
// Does nothing if a FIN was already requested, the stream was reset, or
// there is no session. While data is still being sent (SENDING) the close is
// queued in mPendingTasks, which ReadSegments runs later. In WAITING_DATA or
// SEND_DONE the sending side is closed right away.
void Http3WebTransportStream::SendFin() {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::SendFin [this=%p mSendState=%d]", this,
       mSendState));

  if (mSendFin || !mSession || mResetError) {
    // Already closed.
    return;
  }
  mSendFin = true;

  if (mSendState == SENDING) {
    mPendingTasks.AppendElement([self = RefPtr{this}]() {
      self->mSession->CloseSendingSide(self->mStreamId);
    });
  } else if (mSendState == WAITING_DATA || mSendState == SEND_DONE) {
    if (mSendState == WAITING_DATA) {
      mSendState = SEND_DONE;
    }
    mSession->CloseSendingSide(mStreamId);
    // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
    mSession->StreamHasDataToWrite(this);
  } else {
    MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
  }
}
// Resets the sending side of the stream with |aErrorCode|. Must run on the
// socket thread.
//
// Does nothing if the stream was already reset, a FIN was already requested,
// or there is no session. The error code is remembered in mResetError.
//
// While data is still being sent (SENDING) the reset is queued in
// mPendingTasks; when that task runs it dispatches a runnable to the current
// thread which performs the reset. In WAITING_DATA or SEND_DONE the reset is
// performed right away.
void Http3WebTransportStream::Reset(uint64_t aErrorCode) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::Reset [this=%p, mSendState=%d]", this,
       mSendState));
  if (mResetError || !mSession || mSendFin) {
    // The stream is already reset.
    return;
  }

  mResetError = Some(aErrorCode);

  switch (mSendState) {
    case SENDING: {
      LOG(("Http3WebTransportStream::Reset [this=%p] reset after sending data",
           this));
      mPendingTasks.AppendElement([self = RefPtr{this}]() {
        // "Reset" needs a special treatment here. If we are sending data and
        // ResetWebTransportStream is called before Http3Session::ProcessOutput,
        // neqo will drop the last piece of data.
        NS_DispatchToCurrentThread(
            NS_NewRunnableFunction("Http3WebTransportStream::Reset", [self]() {
              // This runs asynchronously. Close() clears mSession, so the
              // stream may have been closed since the runnable was
              // dispatched; in that case there is nothing left to reset.
              if (!self->mSession) {
                return;
              }
              self->mSession->ResetWebTransportStream(self, *self->mResetError);
              self->mSession->StreamHasDataToWrite(self);
              self->mSession->ConnectSlowConsumer(self);
            }));
      });
    } break;
    case WAITING_DATA:
      mSendState = SEND_DONE;
      [[fallthrough]];
    case SEND_DONE:
      mSession->ResetWebTransportStream(this, *mResetError);
      // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
      mSession->StreamHasDataToWrite(this);
      mSession->ConnectSlowConsumer(this);
      break;
    default:
      MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
      break;
  }
}
// Sends a stop-sending request for this stream with |aErrorCode|. Must run
// on the socket thread.
//
// Ignored while the stream is still WAITING_TO_ACTIVATE, when a stop-sending
// error was already recorded, or when there is no session. Otherwise the
// code is remembered in mStopSendingError and passed to the session.
void Http3WebTransportStream::SendStopSending(uint8_t aErrorCode) {
  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
  LOG(("Http3WebTransportStream::SendStopSending [this=%p, mSendState=%d]",
       this, mSendState));

  const bool notYetActivated = mSendState == WAITING_TO_ACTIVATE;
  if (notYetActivated || mStopSendingError || !mSession) {
    return;
  }

  mStopSendingError = Some(aErrorCode);
  mSession->StreamStopSending(this, *mStopSendingError);
  // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
  mSession->StreamHasDataToWrite(this);
}
// Forwards the requested send order for this stream to the session.
//
// Close() clears mSession, so a call that arrives after the stream has been
// closed is ignored, in line with the other session-forwarding methods in
// this file (SendFin, Reset, SendStopSending).
void Http3WebTransportStream::SetSendOrder(Maybe<int64_t> aSendOrder) {
  if (!mSession) {
    return;
  }
  mSession->SetSendOrder(this, aSendOrder);
}
} // namespace mozilla::net
quality 97%
¤ Dauer der Verarbeitung: 0.17 Sekunden
(vorverarbeitet)
¤
*© Formatika GbR, Deutschland