diff --git a/xpcom/io/nsPipe3.cpp b/xpcom/io/nsPipe3.cpp index 4de5458a5a9..4ba97158e2e 100644 --- a/xpcom/io/nsPipe3.cpp +++ b/xpcom/io/nsPipe3.cpp @@ -54,22 +54,6 @@ class nsPipeEvents; class nsPipeInputStream; class nsPipeOutputStream; -namespace { - -enum MonitorAction -{ - DoNotNotifyMonitor, - NotifyMonitor -}; - -enum SegmentChangeResult -{ - SegmentNotChanged, - SegmentDeleted -}; - -} // anonymous namespace - //----------------------------------------------------------------------------- // this class is used to delay notifications until the end of a particular @@ -176,8 +160,10 @@ public: // synchronously wait for the pipe to become readable. nsresult Wait(); - MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&); - MonitorAction OnInputException(nsresult, nsPipeEvents&); + // these functions return true to indicate that the pipe's monitor should + // be notified, to wake up a blocked reader if any. + bool OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&); + bool OnInputException(nsresult, nsPipeEvents&); nsPipeReadState& ReadState() { @@ -253,8 +239,10 @@ public: // synchronously wait for the pipe to become writable. nsresult Wait(); - MonitorAction OnOutputWritable(nsPipeEvents&); - MonitorAction OnOutputException(nsresult, nsPipeEvents&); + // these functions return true to indicate that the pipe's monitor should + // be notified, to wake up a blocked writer if any. + bool OnOutputWritable(nsPipeEvents&); + bool OnOutputException(nsresult, nsPipeEvents&); private: nsPipe* mPipe; @@ -304,9 +292,6 @@ public: const char*& aSegment, uint32_t& aSegmentLen); void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount, uint32_t* aAvailableOut); - SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState); - void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents, - uint32_t* aAvailableOut); nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen); void AdvanceWriteCursor(uint32_t aCount); @@ -560,89 +545,55 @@ nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead, return; } - // Check to see if we can free up any segments. If we can, then notify - // the output stream that the pipe has room for a new segment. - if (AdvanceReadSegment(aReadState) == SegmentDeleted && - mOutput.OnOutputWritable(events) == NotifyMonitor) { - mon.NotifyAll(); + uint32_t currentSegment = aReadState.mSegment; + + // Move to the next segment to read + aReadState.mSegment += 1; + + // If this was the last reference to the first segment, then remove it. + if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) { + + // shift write and read segment index (-1 indicates an empty buffer). + mWriteSegment -= 1; + + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + mInputList[i]->ReadState().mSegment -= 1; + } + + // done with this segment + mBuffer.DeleteFirstSegment(); + LOG(("III deleting first segment\n")); + } + + if (mWriteSegment < aReadState.mSegment) { + // read cursor has hit the end of written data, so reset it + MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1)); + aReadState.mReadCursor = nullptr; + aReadState.mReadLimit = nullptr; + // also, the buffer is completely empty, so reset the write cursor + if (mWriteSegment == -1) { + mWriteCursor = nullptr; + mWriteLimit = nullptr; + } + } else { + // advance read cursor and limit to next buffer segment + aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment); + if (mWriteSegment == aReadState.mSegment) { + aReadState.mReadLimit = mWriteCursor; + } else { + aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize(); + } + } + + // we've free'd up a segment, so notify output stream that pipe has + // room for a new segment. + if (mOutput.OnOutputWritable(events)) { + mon.Notify(); } } } } -SegmentChangeResult -nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState) -{ - uint32_t currentSegment = aReadState.mSegment; - - // Move to the next segment to read - aReadState.mSegment += 1; - - SegmentChangeResult result = SegmentNotChanged; - - // If this was the last reference to the first segment, then remove it. - if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) { - - // shift write and read segment index (-1 indicates an empty buffer). - mWriteSegment -= 1; - - for (uint32_t i = 0; i < mInputList.Length(); ++i) { - mInputList[i]->ReadState().mSegment -= 1; - } - - // done with this segment - mBuffer.DeleteFirstSegment(); - LOG(("III deleting first segment\n")); - - result = SegmentDeleted; - } - - if (mWriteSegment < aReadState.mSegment) { - // read cursor has hit the end of written data, so reset it - MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1)); - aReadState.mReadCursor = nullptr; - aReadState.mReadLimit = nullptr; - // also, the buffer is completely empty, so reset the write cursor - if (mWriteSegment == -1) { - mWriteCursor = nullptr; - mWriteLimit = nullptr; - } - } else { - // advance read cursor and limit to next buffer segment - aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment); - if (mWriteSegment == aReadState.mSegment) { - aReadState.mReadLimit = mWriteCursor; - } else { - aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize(); - } - } - - return result; -} - -void -nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents, - uint32_t* aAvailableOut) -{ - ReentrantMonitorAutoEnter mon(mReentrantMonitor); - - *aAvailableOut = 0; - - SegmentChangeResult result = SegmentNotChanged; - while(mWriteSegment >= aReadState.mSegment) { - if (AdvanceReadSegment(aReadState) == SegmentDeleted) { - result = SegmentDeleted; - } - } - - // if we've free'd up a segment, notify output stream that pipe has - // room for a new segment. - if (result == SegmentDeleted && - mOutput.OnOutputWritable(aEvents) == NotifyMonitor) { - mon.NotifyAll(); - } -} - nsresult nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen) { @@ -713,7 +664,7 @@ nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) // notify input stream that pipe now contains additional data bool needNotify = false; for (uint32_t i = 0; i < mInputList.Length(); ++i) { - if (mInputList[i]->OnInputReadable(aBytesWritten, events) == NotifyMonitor) { + if (mInputList[i]->OnInputReadable(aBytesWritten, events)) { needNotify = true; } } @@ -754,12 +705,12 @@ nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason) continue; } - MonitorAction action = mInputList[i]->OnInputException(aReason, events); + bool needNotify = mInputList[i]->OnInputException(aReason, events); mInputList.RemoveElementAt(i); // Notify after element is removed in case we re-enter as a result. - if (action == NotifyMonitor) { - mon.NotifyAll(); + if (needNotify) { + mon.Notify(); } return; @@ -795,13 +746,13 @@ nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) continue; } - if (mInputList[i]->OnInputException(aReason, events) == NotifyMonitor) { + if (mInputList[i]->OnInputException(aReason, events)) { needNotify = true; } } mInputList = tmpInputList; - if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) { + if (mOutput.OnOutputException(aReason, events)) { needNotify = true; } @@ -990,10 +941,10 @@ nsPipeInputStream::Wait() return Status() == NS_BASE_STREAM_CLOSED ? NS_OK : Status(); } -MonitorAction +bool nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents) { - MonitorAction result = DoNotNotifyMonitor; + bool result = false; mAvailable += aBytesWritten; @@ -1002,19 +953,19 @@ nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = NotifyMonitor; + result = true; } return result; } -MonitorAction +bool nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents) { LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n", this, aReason)); - MonitorAction result = DoNotNotifyMonitor; + bool result = false; NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception"); @@ -1023,14 +974,14 @@ nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents) } // force count of available bytes to zero. - mPipe->DrainInputStream(mReadState, aEvents, &mAvailable); + mAvailable = 0; if (mCallback) { aEvents.NotifyInputReady(this, mCallback); mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = NotifyMonitor; + result = true; } return result; @@ -1371,10 +1322,10 @@ nsPipeOutputStream::Wait() return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; } -MonitorAction +bool nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) { - MonitorAction result = DoNotNotifyMonitor; + bool result = false; mWritable = true; @@ -1383,19 +1334,19 @@ nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = NotifyMonitor; + result = true; } return result; } -MonitorAction +bool nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents) { LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n", this, aReason)); - MonitorAction result = DoNotNotifyMonitor; + bool result = false; NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception"); mWritable = false; @@ -1405,7 +1356,7 @@ nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents) mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = NotifyMonitor; + result = true; } return result; diff --git a/xpcom/tests/gtest/Helpers.cpp b/xpcom/tests/gtest/Helpers.cpp index 8a98a5aa096..faa019b6686 100644 --- a/xpcom/tests/gtest/Helpers.cpp +++ b/xpcom/tests/gtest/Helpers.cpp @@ -94,22 +94,4 @@ ConsumeAndValidateStream(nsIInputStream* aStream, ASSERT_TRUE(aExpectedData.Equals(outputData)); } -NS_IMPL_ISUPPORTS(OutputStreamCallback, nsIOutputStreamCallback); - -OutputStreamCallback::OutputStreamCallback() - : mCalled(false) -{ -} - -OutputStreamCallback::~OutputStreamCallback() -{ -} - -NS_IMETHODIMP -OutputStreamCallback::OnOutputStreamReady(nsIAsyncOutputStream* aStream) -{ - mCalled = true; - return NS_OK; -} - } // namespace testing diff --git a/xpcom/tests/gtest/Helpers.h b/xpcom/tests/gtest/Helpers.h index 1a52370007d..0e501ae63e9 100644 --- a/xpcom/tests/gtest/Helpers.h +++ b/xpcom/tests/gtest/Helpers.h @@ -7,7 +7,6 @@ #ifndef __Helpers_h #define __Helpers_h -#include "nsIAsyncOutputStream.h" #include "nsString.h" #include @@ -35,22 +34,6 @@ void ConsumeAndValidateStream(nsIInputStream* aStream, const nsACString& aExpectedData); -class OutputStreamCallback MOZ_FINAL : public nsIOutputStreamCallback -{ -public: - OutputStreamCallback(); - - bool Called() const { return mCalled; } - -private: - ~OutputStreamCallback(); - - bool mCalled; -public: - NS_DECL_ISUPPORTS - NS_DECL_NSIOUTPUTSTREAMCALLBACK -}; - } // namespace testing #endif // __Helpers_h diff --git a/xpcom/tests/gtest/TestPipes.cpp b/xpcom/tests/gtest/TestPipes.cpp index de0942c9055..7120a0b5c24 100644 --- a/xpcom/tests/gtest/TestPipes.cpp +++ b/xpcom/tests/gtest/TestPipes.cpp @@ -660,127 +660,3 @@ TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite) 2, // num clones to add after each write 3); // num streams to read after each write } - -TEST(Pipes, Write_AsyncWait) -{ - nsCOMPtr reader; - nsCOMPtr writer; - - const uint32_t segmentSize = 1024; - const uint32_t numSegments = 1; - - nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), - true, true, // non-blocking - reader, writer - segmentSize, numSegments); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - nsTArray inputData; - testing::CreateData(segmentSize, inputData); - - uint32_t numWritten = 0; - rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); - ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); - - nsRefPtr cb = - new testing::OutputStreamCallback(); - - rv = writer->AsyncWait(cb, 0, 0, nullptr); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - ASSERT_FALSE(cb->Called()); - - testing::ConsumeAndValidateStream(reader, inputData); - - ASSERT_TRUE(cb->Called()); -} - -TEST(Pipes, Write_AsyncWait_Clone) -{ - nsCOMPtr reader; - nsCOMPtr writer; - - const uint32_t segmentSize = 1024; - const uint32_t numSegments = 1; - - nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), - true, true, // non-blocking - reader, writer - segmentSize, numSegments); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - nsCOMPtr clone; - rv = NS_CloneInputStream(reader, getter_AddRefs(clone)); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - nsTArray inputData; - testing::CreateData(segmentSize, inputData); - - uint32_t numWritten = 0; - rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); - ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); - - nsRefPtr cb = - new testing::OutputStreamCallback(); - - rv = writer->AsyncWait(cb, 0, 0, nullptr); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - ASSERT_FALSE(cb->Called()); - - testing::ConsumeAndValidateStream(reader, inputData); - - ASSERT_FALSE(cb->Called()); - - testing::ConsumeAndValidateStream(clone, inputData); - - ASSERT_TRUE(cb->Called()); -} - -TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal) -{ - nsCOMPtr reader; - nsCOMPtr writer; - - const uint32_t segmentSize = 1024; - const uint32_t numSegments = 1; - - nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer), - true, true, // non-blocking - reader, writer - segmentSize, numSegments); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - nsCOMPtr clone; - rv = NS_CloneInputStream(reader, getter_AddRefs(clone)); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - nsTArray inputData; - testing::CreateData(segmentSize, inputData); - - uint32_t numWritten = 0; - rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten); - ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv); - - nsRefPtr cb = - new testing::OutputStreamCallback(); - - rv = writer->AsyncWait(cb, 0, 0, nullptr); - ASSERT_TRUE(NS_SUCCEEDED(rv)); - - ASSERT_FALSE(cb->Called()); - - testing::ConsumeAndValidateStream(clone, inputData); - - ASSERT_FALSE(cb->Called()); - - reader->Close(); - - ASSERT_TRUE(cb->Called()); -}