/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// NOTE(review): fragment — the enclosing function's signature is not in
// this chunk (it returns nsresult: note `return rv;`).  Writes ITERATIONS
// pseudo-random-length prefixes of "<i> <kTestPattern>" to `out`.
uint32_t total = 0; for (uint32_t i = 0; i < ITERATIONS; i++) {
uint32_t writeCount;
SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
uint32_t len = strlen(buf.get());
// Scale len down to a pseudo-random value in [0, len].
len = len * rand() / RAND_MAX;
// NOTE(review): BUG SUSPECT — std::min(1u, len) clamps len to AT MOST 1,
// so at most one byte is written per iteration.  The parallel loop later
// in this file uses std::max(1u, len) (i.e. "at least one byte"), which
// is almost certainly the intent here as well.  Confirm and change to
// std::max.
len = std::min(1u, len);
rv = WriteAll(out, buf.get(), len, &writeCount); if (NS_FAILED(rv)) return rv;
// WriteAll must report exactly as many bytes written as requested.
EXPECT_EQ(writeCount, len);
total += writeCount;
// NOTE(review): fragment — the enclosing test function starts before this
// chunk (bare `return;`, so it returns void).  Sets up a pump thread
// (presumably copying in1 -> out2) and a receiver thread draining in2,
// then writes random-length chunks into out1 from this thread.
RefPtr<nsPump> pump = new nsPump(in1, out2); if (pump == nullptr) return;
nsCOMPtr<nsIThread> thread;
rv = NS_NewNamedThread("ChainedPipePump", getter_AddRefs(thread), pump); if (NS_FAILED(rv)) return;
RefPtr<nsReceiver> receiver = new nsReceiver(in2); if (receiver == nullptr) return;
nsCOMPtr<nsIThread> receiverThread;
rv = NS_NewNamedThread("ChainedPipeRecv", getter_AddRefs(receiverThread),
receiver); if (NS_FAILED(rv)) return;
// Write ITERATIONS chunks; each is a pseudo-random-length (>= 1 byte)
// prefix of "<i> <kTestPattern>".
uint32_t total = 0; for (uint32_t i = 0; i < ITERATIONS; i++) {
uint32_t writeCount;
SmprintfPointer buf = mozilla::Smprintf("%d %s", i, kTestPattern);
uint32_t len = strlen(buf.get());
len = len * rand() / RAND_MAX;
// Guarantee at least one byte per write.
len = std::max(1u, len);
rv = WriteAll(out1, buf.get(), len, &writeCount); if (NS_FAILED(rv)) return;
EXPECT_EQ(writeCount, len);
total += writeCount;
if (gTrace) printf("wrote %d bytes: %s\n", writeCount, buf.get());
} if (gTrace) {
printf("wrote total of %d bytes\n", total);
}
// Close the write end so the downstream pump/receiver threads see EOF.
rv = out1->Close(); if (NS_FAILED(rv)) return;
// NOTE(review): in this chunk the original newlines were lost.  The block
// comment on the next line has swallowed the start of the function
// definition: the trailing "staticvoid TestPipeClone(...)" is really
// "static void TestPipeClone(...)" and should be code on its own line,
// leaving the parameter lines below orphaned as pasted.  Several pieces
// of the body are also missing from this chunk (declarations of maxSize,
// streamList, outputDataList, cloneable, numToWrite, nextStreamToRead,
// the inputData setup and the write loop), so the braces below do not
// balance.  Code is left byte-identical; comments only.
// Utility routine to validate pipe clone before. There are many knobs. // // aTotalBytes Total number of bytes to write to the pipe. // aNumWrites How many separate write calls should be made. Bytes // are evenly distributed over these write calls. // aNumInitialClones How many clones of the pipe input stream should be // made before writing begins. // aNumToCloseAfterWrite How many streams should be closed after each write. // One stream is always kept open. This verifies that // closing one stream does not effect other open // streams. // aNumToCloneAfterWrite How many clones to create after each write. Occurs // after closing any streams. This tests cloning // active streams on a pipe that is being written to. // aNumStreamToReadPerWrite How many streams to read fully after each write. // This tests reading cloned streams at different rates // while the pipe is being written to. staticvoid TestPipeClone(uint32_t aTotalBytes, uint32_t aNumWrites,
uint32_t aNumInitialClones,
uint32_t aNumToCloseAfterWrite,
uint32_t aNumToCloneAfterWrite,
uint32_t aNumStreamsToReadPerWrite,
uint32_t aSegmentSize = DEFAULT_SEGMENT_SIZE) {
nsCOMPtr<nsIInputStream> reader;
nsCOMPtr<nsIOutputStream> writer;
// Use async input streams so we can NS_ConsumeStream() the current data // while the pipe is still being written to.
// NOTE(review): `maxSize` is not declared in this chunk — presumably
// derived from aTotalBytes/aSegmentSize in a missing piece; confirm
// against the original file.
NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer), aSegmentSize,
maxSize, true, false); // non-blocking - reader, writer
// first stream is our original reader from the pipe
streamList.AppendElement(reader);
outputDataList.AppendElement();
// Clone the initial input stream the specified number of times // before performing any writes.
nsresult rv; for (uint32_t i = 0; i < aNumInitialClones; ++i) {
nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
rv = cloneable->Clone(getter_AddRefs(*clone));
ASSERT_NS_SUCCEEDED(rv);
ASSERT_TRUE(*clone);
// Close the specified number of streams. This allows us to // test that one closed clone does not break other open clones. for (uint32_t i = 0; i < aNumToCloseAfterWrite && streamList.Length() > 1;
++i) {
// Always close the most recently appended stream; the original reader
// at index 0 is never closed (Length() > 1 guard above).
uint32_t lastIndex = streamList.Length() - 1;
streamList[lastIndex]->Close();
streamList.RemoveElementAt(lastIndex);
outputDataList.RemoveElementAt(lastIndex);
// Keep the round-robin read cursor in range after removal.
if (nextStreamToRead >= streamList.Length()) {
nextStreamToRead = 0;
}
}
// Create the specified number of clones. This lets us verify // that we can create clones in the middle of pipe reading and // writing. for (uint32_t i = 0; i < aNumToCloneAfterWrite; ++i) {
nsCOMPtr<nsIInputStream>* clone = streamList.AppendElement();
rv = cloneable->Clone(getter_AddRefs(*clone));
ASSERT_NS_SUCCEEDED(rv);
ASSERT_TRUE(*clone);
// Initialize the new output data to make whats been read to data for // the original stream. First stream is always the original stream.
nsCString* outputData = outputDataList.AppendElement();
*outputData = outputDataList[0];
}
// Read the specified number of streams. This lets us verify that we // can read from the clones at different rates while the pipe is being // written to. for (uint32_t i = 0; i < aNumStreamsToReadPerWrite; ++i) {
nsCOMPtr<nsIInputStream>& stream = streamList[nextStreamToRead];
nsCString& outputData = outputDataList[nextStreamToRead];
// Can't use ConsumeAndValidateStream() here because we're not // guaranteed the exact amount read. It should just be at least // as many as numToWrite.
nsAutoCString tmpOutputData;
rv = NS_ConsumeStream(stream, UINT32_MAX, tmpOutputData);
// WOULD_BLOCK is acceptable for a non-blocking stream with no data.
ASSERT_TRUE(rv == NS_BASE_STREAM_WOULD_BLOCK || NS_SUCCEEDED(rv));
ASSERT_GE(tmpOutputData.Length(), numToWrite);
outputData += tmpOutputData;
nextStreamToRead += 1; if (nextStreamToRead >= streamList.Length()) { // Note: When we wrap around on the streams being read, its possible // we will trigger a segment to be deleted from the pipe. It // would be nice to validate this here, but we don't have any // QI'able interface that would let us check easily.
// Finally, read the remaining bytes from each stream. This may be // different amounts of data depending on how much reading we did while // writing. Verify that the end result matches the input data. for (uint32_t i = 0; i < streamList.Length(); ++i) {
nsCOMPtr<nsIInputStream>& stream = streamList[i];
nsCString& outputData = outputDataList[i];
TEST(Pipes, Clone_BeforeWrite_ReadAtEnd)
{
  // Create all clones up front, do all the writing, and only read the
  // streams once writing has completed.
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 3;
  const uint32_t streamsClosedPerWrite = 0;
  const uint32_t clonesAddedPerWrite = 0;
  const uint32_t streamsReadPerWrite = 0;
  TestPipeClone(totalBytes, numWrites, initialClones, streamsClosedPerWrite,
                clonesAddedPerWrite, streamsReadPerWrite);
}
TEST(Pipes, Clone_BeforeWrite_ReadDuringWrite)
{
  // All four streams (original reader plus three clones) are read on every
  // write, which should exercise the pipe cursor roll-back optimization.
  // Currently that can only be verified via logging.
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 3;
  const uint32_t streamsClosedPerWrite = 0;
  const uint32_t clonesAddedPerWrite = 0;
  const uint32_t streamsReadPerWrite = 4;
  TestPipeClone(totalBytes, numWrites, initialClones, streamsClosedPerWrite,
                clonesAddedPerWrite, streamsReadPerWrite);
}
TEST(Pipes, Clone_DuringWrite_ReadAtEnd)
{
  // No up-front clones; instead one new clone is created after each write,
  // and nothing is read until all writing is done.
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 0;
  const uint32_t streamsClosedPerWrite = 0;
  const uint32_t clonesAddedPerWrite = 1;
  const uint32_t streamsReadPerWrite = 0;
  TestPipeClone(totalBytes, numWrites, initialClones, streamsClosedPerWrite,
                clonesAddedPerWrite, streamsReadPerWrite);
}
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite)
{
  // Interleave everything: after each write, add one clone and read one
  // stream while the pipe is still being written to.
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 0;
  const uint32_t streamsClosedPerWrite = 0;
  const uint32_t clonesAddedPerWrite = 1;
  const uint32_t streamsReadPerWrite = 1;
  TestPipeClone(totalBytes, numWrites, initialClones, streamsClosedPerWrite,
                clonesAddedPerWrite, streamsReadPerWrite);
}
TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
{
  // Streams are read faster (3 per write) than new clones are added
  // (2 per write, after closing 1), so pipe segment deletion should be
  // triggered periodically.  Currently that can only be verified via
  // logging.
  const uint32_t totalBytes = 32 * 1024;
  const uint32_t numWrites = 16;
  const uint32_t initialClones = 1;
  const uint32_t streamsClosedPerWrite = 1;
  const uint32_t clonesAddedPerWrite = 2;
  const uint32_t streamsReadPerWrite = 3;
  TestPipeClone(totalBytes, numWrites, initialClones, streamsClosedPerWrite,
                clonesAddedPerWrite, streamsReadPerWrite);
}
// NOTE(review): fragment — the enclosing test (pipe clone backpressure,
// where the clone catches up) starts before this chunk; `writer`, `reader`,
// `clone`, `inputData`, `expectedCloneData`, `rv` and `numWritten` are
// declared in the missing part.  Also note `cb` is created below but no
// writer->AsyncWait(cb, ...) registration is visible before cb->Called()
// is asserted — presumably lost with the missing newlines; confirm.
// This attempts to write data beyond the original pipe size limit. It // should fail since neither side of the clone has been read yet.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
RefPtr<testing::OutputStreamCallback> cb = new testing::OutputStreamCallback();
// Consume data on the original stream, but the clone still has not been read.
testing::ConsumeAndValidateStream(reader, inputData);
// A clone that is not being read should not stall the other input stream // reader. Therefore the writer callback should trigger when the fastest // reader drains the other input stream.
ASSERT_TRUE(cb->Called());
// Attempt to write data. This will buffer data beyond the pipe size limit in // order for the clone stream to still work. This is allowed because the // other input stream has drained its buffered segments and is ready for more // data.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_NS_SUCCEEDED(rv);
// Again, this should fail since the origin stream has not been read again. // The pipe size should still restrict how far ahead we can buffer even // when there is a cloned stream not being read.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_NS_FAILED(rv);
// We should now be able to consume the entire backlog of buffered data on // the cloned stream.
testing::ConsumeAndValidateStream(clone, expectedCloneData);
// Draining the clone side should also trigger the AsyncWait() writer // callback
ASSERT_TRUE(cb->Called());
// Finally, we should be able to consume the remaining data on the original // reader.
testing::ConsumeAndValidateStream(reader, inputData);
}
// NOTE(review): fragment — the enclosing test (clone backpressure with
// the original reader closed mid-stream) starts before this chunk;
// `writer`, `reader`, `clone`, `inputData`, `expectedCloneData`, `rv` and
// `numWritten` come from the missing part.  As in the previous fragment,
// `cb` is created but the initial writer->AsyncWait(cb, ...) registration
// is not visible before cb->Called() is asserted — presumably lost with
// the mangled newlines; confirm against the original file.
// This attempts to write data beyond the original pipe size limit. It // should fail since neither side of the clone has been read yet.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
RefPtr<testing::OutputStreamCallback> cb = new testing::OutputStreamCallback();
// Consume data on the original stream, but the clone still has not been read.
testing::ConsumeAndValidateStream(reader, inputData);
// A clone that is not being read should not stall the other input stream // reader. Therefore the writer callback should trigger when the fastest // reader drains the other input stream.
ASSERT_TRUE(cb->Called());
// Attempt to write data. This will buffer data beyond the pipe size limit in // order for the clone stream to still work. This is allowed because the // other input stream has drained its buffered segments and is ready for more // data.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_NS_SUCCEEDED(rv);
// Again, this should fail since the origin stream has not been read again. // The pipe size should still restrict how far ahead we can buffer even // when there is a cloned stream not being read.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_NS_FAILED(rv);
// The write should again be blocked since we have written data and the // main reader is at its maximum advance buffer.
ASSERT_FALSE(cb->Called());
// Close the original reader input stream. This was the fastest reader, // so we should have a single stream that is buffered beyond our nominal // limit.
reader->Close();
// Because the clone stream is still buffered the writable callback should // not be fired.
ASSERT_FALSE(cb->Called());
// And we should not be able to perform a write.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_NS_FAILED(rv);
// Create another clone stream. Now we have two streams that exceed our // maximum size limit
nsCOMPtr<nsIInputStream> clone2;
rv = NS_CloneInputStream(clone, getter_AddRefs(clone2));
ASSERT_NS_SUCCEEDED(rv);
// We should now be able to consume the entire backlog of buffered data on // the cloned stream.
testing::ConsumeAndValidateStream(clone, expectedCloneData);
// The pipe should now be writable because we have two open streams, one of // which is completely drained.
ASSERT_TRUE(cb->Called());
// Write again to reach our limit again.
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_NS_SUCCEEDED(rv);
// The stream is again non-writeable.
// Re-register a fresh callback to observe the next writable transition.
cb = new testing::OutputStreamCallback();
rv = writer->AsyncWait(cb, 0, 0, nullptr);
ASSERT_NS_SUCCEEDED(rv);
ASSERT_FALSE(cb->Called());
// Close the empty stream. This is different from our previous close since // before we were closing a stream with some data still buffered.
clone->Close();
// The pipe should not be writable. The second clone is still fully buffered // over our limit.
ASSERT_FALSE(cb->Called());
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_NS_FAILED(rv);
// Finally consume all of the buffered data on the second clone.
// clone2 also carries the data written after clone2 was created.
expectedCloneData.AppendElements(inputData);
testing::ConsumeAndValidateStream(clone2, expectedCloneData);
// Draining the final clone should make the pipe writable again.
ASSERT_TRUE(cb->Called());
}
// NOTE(review): fragment — the enclosing helper (which receives `aReader`)
// is not visible in this chunk.
// This is insanity and you probably should not do this under normal // conditions. We want to simulate the case where the pipe is closed // (possibly from other end on another thread) simultaneously with the // read. This is the easiest way to do trigger this case in a synchronous // gtest.
MOZ_ALWAYS_SUCCEEDS(aReader->Close());
/* NOTE(review): the German text below is website boilerplate (a content
 * disclaimer plus a note that the colored syntax display is experimental)
 * that was evidently scraped into this file by mistake.  It is not part
 * of the source; it is preserved here inside a comment so the file stays
 * valid C++, and should simply be removed.
 *
 * Die Informationen auf dieser Webseite wurden nach bestem Wissen
 * sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit,
 * noch Richtigkeit, noch Qualität der bereit gestellten Informationen
 * zugesichert.
 * Bemerkung: Die farbliche Syntaxdarstellung ist noch experimentell.
 */