Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/xpcom/tests/gtest/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 31 kB image not shown  

Quelle  TestPipes.cpp   Sprache: C

 
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */


#include <algorithm>
#include "gtest/gtest.h"
#include "Helpers.h"
#include "mozilla/gtest/MozAssertions.h"
#include "mozilla/ReentrantMonitor.h"
#include "mozilla/Printf.h"
#include "nsCOMPtr.h"
#include "nsCRT.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsIBufferedStreams.h"
#include "nsIClassInfo.h"
#include "nsICloneableInputStream.h"
#include "nsIInputStream.h"
#include "nsIOutputStream.h"
#include "nsIPipe.h"
#include "nsITellableStream.h"
#include "nsIThread.h"
#include "nsIRunnable.h"
#include "nsStreamUtils.h"
#include "nsString.h"
#include "nsThreadUtils.h"
#include "prinrval.h"

using namespace mozilla;

#define ITERATIONS 33333
char kTestPattern[] = "My hovercraft is full of eels.\n";

bool gTrace = false;

static nsresult WriteAll(nsIOutputStream* os, const char* buf, uint32_t bufLen,
                         uint32_t* lenWritten) {
  const char* p = buf;
  *lenWritten = 0;
  while (bufLen) {
    uint32_t n;
    nsresult rv = os->Write(p, bufLen, &n);
    if (NS_FAILED(rv)) return rv;
    p += n;
    bufLen -= n;
    *lenWritten += n;
  }
  return NS_OK;
}

class nsReceiver final : public Runnable {
 public:
  NS_IMETHOD Run() override {
    nsresult rv;
    char buf[101];
    uint32_t count;
    PRIntervalTime start = PR_IntervalNow();
    while (true) {
      rv = mIn->Read(buf, 100, &count);
      if (NS_FAILED(rv)) {
        printf("read failed\n");
        break;
      }
      if (count == 0) {
        //                printf("EOF count = %d\n", mCount);
        break;
      }

      if (gTrace) {
        buf[count] = '\0';
        printf("read: %s\n", buf);
      }
      mCount += count;
    }
    PRIntervalTime end = PR_IntervalNow();
    printf("read %d bytes, time = %dms\n", mCount,
           PR_IntervalToMilliseconds(end - start));
    return rv;
  }

  explicit nsReceiver(nsIInputStream* in)
      : Runnable("nsReceiver"), mIn(in), mCount(0) {}

  uint32_t GetBytesRead() { return mCount; }

 private:
  ~nsReceiver() = default;

 protected:
  nsCOMPtr<nsIInputStream> mIn;
  uint32_t mCount;
};

static nsresult TestPipe(nsIInputStream* in, nsIOutputStream* out) {
  RefPtr<nsReceiver> receiver = new nsReceiver(in);
  nsresult rv;

  nsCOMPtr<nsIThread> thread;
  rv = NS_NewNamedThread("TestPipe", getter_AddRefs(thread), receiver);
  if (NS_FAILED(rv)) return rv;

  uint32_t total = 0;
  PRIntervalTime start = PR_IntervalNow();
  for (uint32_t i = 0; i < ITERATIONS; i++) {
    uint32_t writeCount;
    SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
    uint32_t len = strlen(buf.get());
    rv = WriteAll(out, buf.get(), len, &writeCount);
    if (gTrace) {
      printf("wrote: ");
      for (uint32_t j = 0; j < writeCount; j++) {
        putc(buf.get()[j], stdout);
      }
      printf("\n");
    }
    if (NS_FAILED(rv)) return rv;
    total += writeCount;
  }
  rv = out->Close();
  if (NS_FAILED(rv)) return rv;

  PRIntervalTime end = PR_IntervalNow();

  thread->Shutdown();

  printf("wrote %d bytes, time = %dms\n", total,
         PR_IntervalToMilliseconds(end - start));
  EXPECT_EQ(receiver->GetBytesRead(), total);

  return NS_OK;
}

////////////////////////////////////////////////////////////////////////////////

class nsShortReader final : public Runnable {
 public:
  NS_IMETHOD Run() override {
    nsresult rv;
    char buf[101];
    uint32_t count;
    uint32_t total = 0;
    while (true) {
      // if (gTrace)
      //    printf("calling Read\n");
      rv = mIn->Read(buf, 100, &count);
      if (NS_FAILED(rv)) {
        printf("read failed\n");
        break;
      }
      if (count == 0) {
        break;
      }

      if (gTrace) {
        // For next |printf()| call and possible others elsewhere.
        buf[count] = '\0';

        printf("read %d bytes: %s\n", count, buf);
      }

      Received(count);
      total += count;
    }
    printf("read %d bytes\n", total);
    return rv;
  }

  explicit nsShortReader(nsIInputStream* in)
      : Runnable("nsShortReader"), mIn(in), mReceived(0) {
    mMon = new ReentrantMonitor("nsShortReader");
  }

  void Received(uint32_t count) {
    ReentrantMonitorAutoEnter mon(*mMon);
    mReceived += count;
    mon.Notify();
  }

  uint32_t WaitForReceipt(const uint32_t aWriteCount) {
    ReentrantMonitorAutoEnter mon(*mMon);
    uint32_t result = mReceived;

    while (result < aWriteCount) {
      mon.Wait();

      EXPECT_TRUE(mReceived > result);
      result = mReceived;
    }

    mReceived = 0;
    return result;
  }

 private:
  ~nsShortReader() = default;

 protected:
  nsCOMPtr<nsIInputStream> mIn;
  uint32_t mReceived;
  ReentrantMonitor* mMon;
};

static nsresult TestShortWrites(nsIInputStream* in, nsIOutputStream* out) {
  RefPtr<nsShortReader> receiver = new nsShortReader(in);
  nsresult rv;

  nsCOMPtr<nsIThread> thread;
  rv = NS_NewNamedThread("TestShortWrites", getter_AddRefs(thread), receiver);
  if (NS_FAILED(rv)) return rv;

  uint32_t total = 0;
  for (uint32_t i = 0; i < ITERATIONS; i++) {
    uint32_t writeCount;
    SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
    uint32_t len = strlen(buf.get());
    len = len * rand() / RAND_MAX;
    len = std::min(1u, len);
    rv = WriteAll(out, buf.get(), len, &writeCount);
    if (NS_FAILED(rv)) return rv;
    EXPECT_EQ(writeCount, len);
    total += writeCount;

    if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
    // printf("calling Flush\n");
    out->Flush();
    // printf("calling WaitForReceipt\n");

#ifdef DEBUG
    const uint32_t received = receiver->WaitForReceipt(writeCount);
    EXPECT_EQ(received, writeCount);
#endif
  }
  rv = out->Close();
  if (NS_FAILED(rv)) return rv;

  thread->Shutdown();

  printf("wrote %d bytes\n", total);

  return NS_OK;
}

////////////////////////////////////////////////////////////////////////////////

class nsPump final : public Runnable {
 public:
  NS_IMETHOD Run() override {
    nsresult rv;
    uint32_t count;
    while (true) {
      rv = mOut->WriteFrom(mIn, ~0U, &count);
      if (NS_FAILED(rv)) {
        printf("Write failed\n");
        break;
      }
      if (count == 0) {
        printf("EOF count = %d\n", mCount);
        break;
      }

      if (gTrace) {
        printf("Wrote: %d\n", count);
      }
      mCount += count;
    }
    mOut->Close();
    return rv;
  }

  nsPump(nsIInputStream* in, nsIOutputStream* out)
      : Runnable("nsPump"), mIn(in), mOut(out), mCount(0) {}

 private:
  ~nsPump() = default;

 protected:
  nsCOMPtr<nsIInputStream> mIn;
  nsCOMPtr<nsIOutputStream> mOut;
  uint32_t mCount;
};

TEST(Pipes, ChainedPipes)
{
  nsresult rv;
  if (gTrace) {
    printf("TestChainedPipes\n");
  }

  nsCOMPtr<nsIInputStream> in1;
  nsCOMPtr<nsIOutputStream> out1;
  NS_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999);

  nsCOMPtr<nsIInputStream> in2;
  nsCOMPtr<nsIOutputStream> out2;
  NS_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401);

  RefPtr<nsPump> pump = new nsPump(in1, out2);
  if (pump == nullptr) return;

  nsCOMPtr<nsIThread> thread;
  rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump);
  if (NS_FAILED(rv)) return;

  RefPtr<nsReceiver> receiver = new nsReceiver(in2);
  if (receiver == nullptr) return;

  nsCOMPtr<nsIThread> receiverThread;
  rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
                         receiver);
  if (NS_FAILED(rv)) return;

  uint32_t total = 0;
  for (uint32_t i = 0; i < ITERATIONS; i++) {
    uint32_t writeCount;
    SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
    uint32_t len = strlen(buf.get());
    len = len * rand() / RAND_MAX;
    len = std::max(1u, len);
    rv = WriteAll(out1, buf.get(), len, &writeCount);
    if (NS_FAILED(rv)) return;
    EXPECT_EQ(writeCount, len);
    total += writeCount;

    if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
  }
  if (gTrace) {
    printf("wrote total of %d bytes\n", total);
  }
  rv = out1->Close();
  if (NS_FAILED(rv)) return;

  thread->Shutdown();
  receiverThread->Shutdown();
}

////////////////////////////////////////////////////////////////////////////////

static void RunTests(uint32_t segSize, uint32_t segCount) {
  nsresult rv;
  nsCOMPtr<nsIInputStream> in;
  nsCOMPtr<nsIOutputStream> out;
  uint32_t bufSize = segSize * segCount;
  if (gTrace) {
    printf("Testing New Pipes: segment size %d buffer size %d\n", segSize,
           bufSize);
    printf("Testing long writes...\n");
  }
  NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
  rv = TestPipe(in, out);
  EXPECT_NS_SUCCEEDED(rv);

  if (gTrace) {
    printf("Testing short writes...\n");
  }
  NS_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize);
  rv = TestShortWrites(in, out);
  EXPECT_NS_SUCCEEDED(rv);
}

TEST(Pipes, Main)
{
  RunTests(16, 1);
  RunTests(4096, 16);
}

////////////////////////////////////////////////////////////////////////////////

namespace {

static const uint32_t DEFAULT_SEGMENT_SIZE = 4 * 1024;

// An alternate pipe testing routing that uses NS_ConsumeStream() instead of
// manual read loop.
static void TestPipe2(uint32_t aNumBytes,
                      uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  uint32_t maxSize = std::max(aNumBytes, aSegmentSize);

  NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
             maxSize);

  nsTArray<char> inputData;
  testing::CreateData(aNumBytes, inputData);
  testing::WriteAllAndClose(writer, inputData);
  testing::ConsumeAndValidateStream(reader, inputData);
}

}  // namespace

TEST(Pipes, Blocking_32k)
{ TestPipe2(32 * 1024); }

TEST(Pipes, Blocking_64k)
{ TestPipe2(64 * 1024); }

TEST(Pipes, Blocking_128k)
{ TestPipe2(128 * 1024); }

////////////////////////////////////////////////////////////////////////////////

namespace {

// Utility routine to validate pipe clone before.  There are many knobs.
//
// aTotalBytes              Total number of bytes to write to the pipe.
// aNumWrites               How many separate write calls should be made.  Bytes
//                          are evenly distributed over these write calls.
// aNumInitialClones        How many clones of the pipe input stream should be
//                          made before writing begins.
// aNumToCloseAfterWrite    How many streams should be closed after each write.
//                          One stream is always kept open.  This verifies that
//                          closing one stream does not effect other open
//                          streams.
// aNumToCloneAfterWrite    How many clones to create after each write.  Occurs
//                          after closing any streams.  This tests cloning
//                          active streams on a pipe that is being written to.
// aNumStreamToReadPerWrite How many streams to read fully after each write.
//                          This tests reading cloned streams at different rates
//                          while the pipe is being written to.
static void TestPipeClone(uint32_t aTotalBytes, uint32_t aNumWrites,
                          uint32_t aNumInitialClones,
                          uint32_t aNumToCloseAfterWrite,
                          uint32_t aNumToCloneAfterWrite,
                          uint32_t aNumStreamsToReadPerWrite,
                          uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  uint32_t maxSize = std::max(aTotalBytes, aSegmentSize);

  // Use async input streams so we can NS_ConsumeStream() the current data
  // while the pipe is still being written to.
  NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
             maxSize, truefalse);  // non-blocking - reader, writer

  nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(reader);
  ASSERT_TRUE(cloneable);
  ASSERT_TRUE(cloneable->GetCloneable());

  nsTArray<nsCString> outputDataList;

  nsTArray<nsCOMPtr<nsIInputStream>> streamList;

  // first stream is our original reader from the pipe
  streamList.AppendElement(reader);
  outputDataList.AppendElement();

  // Clone the initial input stream the specified number of times
  // before performing any writes.
  nsresult rv;
  for (uint32_t i = 0; i < aNumInitialClones; ++i) {
    nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
    rv = cloneable->Clone(getter_AddRefs(*clone));
    ASSERT_NS_SUCCEEDED(rv);
    ASSERT_TRUE(*clone);

    outputDataList.AppendElement();
  }

  nsTArray<char> inputData;
  testing::CreateData(aTotalBytes, inputData);

  const uint32_t bytesPerWrite = ((aTotalBytes - 1) / aNumWrites) + 1;
  uint32_t offset = 0;
  uint32_t remaining = aTotalBytes;
  uint32_t nextStreamToRead = 0;

  while (remaining) {
    uint32_t numToWrite = std::min(bytesPerWrite, remaining);
    testing::Write(writer, inputData, offset, numToWrite);
    offset += numToWrite;
    remaining -= numToWrite;

    // Close the specified number of streams.  This allows us to
    // test that one closed clone does not break other open clones.
    for (uint32_t i = 0; i < aNumToCloseAfterWrite && streamList.Length() > 1;
         ++i) {
      uint32_t lastIndex = streamList.Length() - 1;
      streamList[lastIndex]->Close();
      streamList.RemoveElementAt(lastIndex);
      outputDataList.RemoveElementAt(lastIndex);

      if (nextStreamToRead >= streamList.Length()) {
        nextStreamToRead = 0;
      }
    }

    // Create the specified number of clones.  This lets us verify
    // that we can create clones in the middle of pipe reading and
    // writing.
    for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
      nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
      rv = cloneable->Clone(getter_AddRefs(*clone));
      ASSERT_NS_SUCCEEDED(rv);
      ASSERT_TRUE(*clone);

      // Initialize the new output data to make whats been read to data for
      // the original stream.  First stream is always the original stream.
      nsCString* outputData = outputDataList.AppendElement();
      *outputData = outputDataList[0];
    }

    // Read the specified number of streams.  This lets us verify that we
    // can read from the clones at different rates while the pipe is being
    // written to.
    for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
      nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
      nsCString& outputData = outputDataList[nextStreamToRead];

      // Can't use ConsumeAndValidateStream() here because we're not
      // guaranteed the exact amount read.  It should just be at least
      // as many as numToWrite.
      nsAutoCString tmpOutputData;
      rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
      ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
      ASSERT_GE(tmpOutputData.Length(), numToWrite);

      outputData += tmpOutputData;

      nextStreamToRead += 1;
      if (nextStreamToRead >= streamList.Length()) {
        // Note: When we wrap around on the streams being read, its possible
        //       we will trigger a segment to be deleted from the pipe.  It
        //       would be nice to validate this here, but we don't have any
        //       QI'able interface that would let us check easily.

        nextStreamToRead = 0;
      }
    }
  }

  rv = writer->Close();
  ASSERT_NS_SUCCEEDED(rv);

  nsDependentCSubstring inputString(inputData.Elements(), inputData.Length());

  // Finally, read the remaining bytes from each stream.  This may be
  // different amounts of data depending on how much reading we did while
  // writing.  Verify that the end result matches the input data.
  for (uint32_t i = 0; i < streamList.Length(); ++i) {
    nsCOMPtr<nsIInputStream>& stream = streamList[i];
    nsCString& outputData = outputDataList[i];

    nsAutoCString tmpOutputData;
    rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
    ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
    stream->Close();

    // Append to total amount read from the stream
    outputData += tmpOutputData;

    ASSERT_EQ(inputString.Length(), outputData.Length());
    ASSERT_TRUE(inputString.Equals(outputData));
  }
}

}  // namespace

TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
{
  TestPipeClone(32 * 1024,  // total bytes
                16,         // num writes
                3,          // num initial clones
                0,          // num streams to close after each write
                0,          // num clones to add after each write
                0);         // num streams to read after each write
}

TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
{
  // Since this reads all streams on every write, it should trigger the
  // pipe cursor roll back optimization.  Currently we can only verify
  // this with logging.

  TestPipeClone(32 * 1024,  // total bytes
                16,         // num writes
                3,          // num initial clones
                0,          // num streams to close after each write
                0,          // num clones to add after each write
                4);         // num streams to read after each write
}

TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
{
  TestPipeClone(32 * 1024,  // total bytes
                16,         // num writes
                0,          // num initial clones
                0,          // num streams to close after each write
                1,          // num clones to add after each write
                0);         // num streams to read after each write
}

TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
{
  TestPipeClone(32 * 1024,  // total bytes
                16,         // num writes
                0,          // num initial clones
                0,          // num streams to close after each write
                1,          // num clones to add after each write
                1);         // num streams to read after each write
}

TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
{
  // Since this reads streams faster than we clone new ones, it should
  // trigger pipe segment deletion periodically.  Currently we can
  // only verify this with logging.

  TestPipeClone(32 * 1024,  // total bytes
                16,         // num writes
                1,          // num initial clones
                1,          // num streams to close after each write
                2,          // num clones to add after each write
                3);         // num streams to read after each write
}

TEST(Pipes, Write_AsyncWait)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
              true,  // non-blocking - reader, writer
              segmentSize, numSegments);

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  uint32_t numWritten = 0;
  nsresult rv =
      writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  RefPtr<testing::OutputStreamCallback> cb =
      new testing::OutputStreamCallback();

  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_FALSE(cb->Called());

  testing::ConsumeAndValidateStream(reader, inputData);

  ASSERT_TRUE(cb->Called());
}

TEST(Pipes, Write_AsyncWait_Clone)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
              true,  // non-blocking - reader, writer
              segmentSize, numSegments);

  nsCOMPtr<nsIInputStream> clone;
  nsresult rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
  ASSERT_NS_SUCCEEDED(rv);

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  // This attempts to write data beyond the original pipe size limit.  It
  // should fail since neither side of the clone has been read yet.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  RefPtr<testing::OutputStreamCallback> cb =
      new testing::OutputStreamCallback();

  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_FALSE(cb->Called());

  // Consume data on the original stream, but the clone still has not been read.
  testing::ConsumeAndValidateStream(reader, inputData);

  // A clone that is not being read should not stall the other input stream
  // reader.  Therefore the writer callback should trigger when the fastest
  // reader drains the other input stream.
  ASSERT_TRUE(cb->Called());

  // Attempt to write data.  This will buffer data beyond the pipe size limit in
  // order for the clone stream to still work.  This is allowed because the
  // other input stream has drained its buffered segments and is ready for more
  // data.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  // Again, this should fail since the origin stream has not been read again.
  // The pipe size should still restrict how far ahead we can buffer even
  // when there is a cloned stream not being read.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_FAILED(rv);

  cb = new testing::OutputStreamCallback();
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  // The write should again be blocked since we have written data and the
  // main reader is at its maximum advance buffer.
  ASSERT_FALSE(cb->Called());

  nsTArray<char> expectedCloneData;
  expectedCloneData.AppendElements(inputData);
  expectedCloneData.AppendElements(inputData);

  // We should now be able to consume the entire backlog of buffered data on
  // the cloned stream.
  testing::ConsumeAndValidateStream(clone, expectedCloneData);

  // Draining the clone side should also trigger the AsyncWait() writer
  // callback
  ASSERT_TRUE(cb->Called());

  // Finally, we should be able to consume the remaining data on the original
  // reader.
  testing::ConsumeAndValidateStream(reader, inputData);
}

TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
              true,  // non-blocking - reader, writer
              segmentSize, numSegments);

  nsCOMPtr<nsIInputStream> clone;
  nsresult rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
  ASSERT_NS_SUCCEEDED(rv);

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  // This attempts to write data beyond the original pipe size limit.  It
  // should fail since neither side of the clone has been read yet.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);

  RefPtr<testing::OutputStreamCallback> cb =
      new testing::OutputStreamCallback();

  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_FALSE(cb->Called());

  // Consume data on the original stream, but the clone still has not been read.
  testing::ConsumeAndValidateStream(reader, inputData);

  // A clone that is not being read should not stall the other input stream
  // reader.  Therefore the writer callback should trigger when the fastest
  // reader drains the other input stream.
  ASSERT_TRUE(cb->Called());

  // Attempt to write data.  This will buffer data beyond the pipe size limit in
  // order for the clone stream to still work.  This is allowed because the
  // other input stream has drained its buffered segments and is ready for more
  // data.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  // Again, this should fail since the origin stream has not been read again.
  // The pipe size should still restrict how far ahead we can buffer even
  // when there is a cloned stream not being read.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_FAILED(rv);

  cb = new testing::OutputStreamCallback();
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  // The write should again be blocked since we have written data and the
  // main reader is at its maximum advance buffer.
  ASSERT_FALSE(cb->Called());

  // Close the original reader input stream.  This was the fastest reader,
  // so we should have a single stream that is buffered beyond our nominal
  // limit.
  reader->Close();

  // Because the clone stream is still buffered the writable callback should
  // not be fired.
  ASSERT_FALSE(cb->Called());

  // And we should not be able to perform a write.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_FAILED(rv);

  // Create another clone stream.  Now we have two streams that exceed our
  // maximum size limit
  nsCOMPtr<nsIInputStream> clone2;
  rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
  ASSERT_NS_SUCCEEDED(rv);

  nsTArray<char> expectedCloneData;
  expectedCloneData.AppendElements(inputData);
  expectedCloneData.AppendElements(inputData);

  // We should now be able to consume the entire backlog of buffered data on
  // the cloned stream.
  testing::ConsumeAndValidateStream(clone, expectedCloneData);

  // The pipe should now be writable because we have two open streams, one of
  // which is completely drained.
  ASSERT_TRUE(cb->Called());

  // Write again to reach our limit again.
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  // The stream is again non-writeable.
  cb = new testing::OutputStreamCallback();
  rv = writer->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_FALSE(cb->Called());

  // Close the empty stream.  This is different from our previous close since
  // before we were closing a stream with some data still buffered.
  clone->Close();

  // The pipe should not be writable.  The second clone is still fully buffered
  // over our limit.
  ASSERT_FALSE(cb->Called());
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_FAILED(rv);

  // Finally consume all of the buffered data on the second clone.
  expectedCloneData.AppendElements(inputData);
  testing::ConsumeAndValidateStream(clone2, expectedCloneData);

  // Draining the final clone should make the pipe writable again.
  ASSERT_TRUE(cb->Called());
}

TEST(Pipes, Read_AsyncWait)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
              true,  // non-blocking - reader, writer
              segmentSize, numSegments);

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();

  nsresult rv = reader->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_FALSE(cb->Called());

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_TRUE(cb->Called());

  testing::ConsumeAndValidateStream(reader, inputData);
}

TEST(Pipes, Read_AsyncWait_Clone)
{
  nsCOMPtr<nsIAsyncInputStream> reader;
  nsCOMPtr<nsIAsyncOutputStream> writer;

  const uint32_t segmentSize = 1024;
  const uint32_t numSegments = 1;

  NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), true,
              true,  // non-blocking - reader, writer
              segmentSize, numSegments);

  nsCOMPtr<nsIInputStream> clone;
  nsresult rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
  ASSERT_NS_SUCCEEDED(rv);

  nsCOMPtr<nsIAsyncInputStream> asyncClone = do_QueryInterface(clone);
  ASSERT_TRUE(asyncClone);

  nsTArray<char> inputData;
  testing::CreateData(segmentSize, inputData);

  RefPtr<testing::InputStreamCallback> cb = new testing::InputStreamCallback();

  RefPtr<testing::InputStreamCallback> cb2 = new testing::InputStreamCallback();

  rv = reader->AsyncWait(cb, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_FALSE(cb->Called());

  rv = asyncClone->AsyncWait(cb2, 0, 0, nullptr);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_FALSE(cb2->Called());

  uint32_t numWritten = 0;
  rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  ASSERT_TRUE(cb->Called());
  ASSERT_TRUE(cb2->Called());

  testing::ConsumeAndValidateStream(reader, inputData);
}

namespace {

nsresult CloseDuringReadFunc(nsIInputStream* aReader, void* aClosure,
                             const char* aFromSegment, uint32_t aToOffset,
                             uint32_t aCount, uint32_t* aWriteCountOut) {
  MOZ_RELEASE_ASSERT(aReader);
  MOZ_RELEASE_ASSERT(aClosure);
  MOZ_RELEASE_ASSERT(aFromSegment);
  MOZ_RELEASE_ASSERT(aWriteCountOut);
  MOZ_RELEASE_ASSERT(aToOffset == 0);

  // This is insanity and you probably should not do this under normal
  // conditions.  We want to simulate the case where the pipe is closed
  // (possibly from other end on another thread) simultaneously with the
  // read.  This is the easiest way to do trigger this case in a synchronous
  // gtest.
  MOZ_ALWAYS_SUCCEEDS(aReader->Close());

  nsTArray<char>* buffer = static_cast<nsTArray<char>*>(aClosure);
  buffer->AppendElements(aFromSegment, aCount);

  *aWriteCountOut = aCount;

  return NS_OK;
}

void TestCloseDuringRead(uint32_t aSegmentSize, uint32_t aDataSize) {
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  const uint32_t maxSize = aSegmentSize;

  NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
             maxSize);

  nsTArray<char> inputData;

  testing::CreateData(aDataSize, inputData);

  uint32_t numWritten = 0;
  nsresult rv =
      writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
  ASSERT_NS_SUCCEEDED(rv);

  nsTArray<char> outputData;

  uint32_t numRead = 0;
  rv = reader->ReadSegments(CloseDuringReadFunc, &outputData,
                            inputData.Length(), &numRead);
  ASSERT_NS_SUCCEEDED(rv);
  ASSERT_EQ(inputData.Length(), numRead);

  ASSERT_EQ(inputData, outputData);

  uint64_t available;
  rv = reader->Available(&available);
  ASSERT_EQ(NS_BASE_STREAM_CLOSED, rv);
}

}  // namespace

TEST(Pipes, Close_During_Read_Partial_Segment)
{ TestCloseDuringRead(1024, 512); }

TEST(Pipes, Close_During_Read_Full_Segment)
{ TestCloseDuringRead(1024, 1024); }

TEST(Pipes, Interfaces)
{
  nsCOMPtr<nsIInputStream> reader;
  nsCOMPtr<nsIOutputStream> writer;

  NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer));

  nsCOMPtr<nsIAsyncInputStream> readerType1 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType1);

  nsCOMPtr<nsITellableStream> readerType2 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType2);

  nsCOMPtr<nsISearchableInputStream> readerType3 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType3);

  nsCOMPtr<nsICloneableInputStream> readerType4 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType4);

  nsCOMPtr<nsIClassInfo> readerType5 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType5);

  nsCOMPtr<nsIBufferedInputStream> readerType6 = do_QueryInterface(reader);
  ASSERT_TRUE(readerType6);
}

99%


¤ Dauer der Verarbeitung: 0.16 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.