Backed out 2 changesets (bug 1133939) because of bug 1136453

Backed out changeset 212080d51fb7 (bug 1133939)
Backed out changeset 27de4b553912 (bug 1133939)
This commit is contained in:
Ehsan Akhgari 2015-02-25 13:26:41 -05:00
parent a1e2293a3f
commit 18eb496f5f
4 changed files with 73 additions and 298 deletions

View File

@ -54,22 +54,6 @@ class nsPipeEvents;
class nsPipeInputStream;
class nsPipeOutputStream;
namespace {
enum MonitorAction
{
DoNotNotifyMonitor,
NotifyMonitor
};
enum SegmentChangeResult
{
SegmentNotChanged,
SegmentDeleted
};
} // anonymous namespace
//-----------------------------------------------------------------------------
// this class is used to delay notifications until the end of a particular
@ -176,8 +160,10 @@ public:
// synchronously wait for the pipe to become readable.
nsresult Wait();
MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
MonitorAction OnInputException(nsresult, nsPipeEvents&);
// these functions return true to indicate that the pipe's monitor should
// be notified, to wake up a blocked reader if any.
bool OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&);
bool OnInputException(nsresult, nsPipeEvents&);
nsPipeReadState& ReadState()
{
@ -253,8 +239,10 @@ public:
// synchronously wait for the pipe to become writable.
nsresult Wait();
MonitorAction OnOutputWritable(nsPipeEvents&);
MonitorAction OnOutputException(nsresult, nsPipeEvents&);
// these functions return true to indicate that the pipe's monitor should
// be notified, to wake up a blocked writer if any.
bool OnOutputWritable(nsPipeEvents&);
bool OnOutputException(nsresult, nsPipeEvents&);
private:
nsPipe* mPipe;
@ -304,10 +292,6 @@ public:
const char*& aSegment, uint32_t& aSegmentLen);
void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount,
uint32_t* aAvailableOut);
SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState);
void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents,
uint32_t* aAvailableOut);
bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
void AdvanceWriteCursor(uint32_t aCount);
@ -556,110 +540,60 @@ nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead,
// if still writing in this segment then bail because we're not done
// with the segment and have to wait for now...
if (ReadSegmentBeingWritten(aReadState)) {
if (mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor) {
NS_ASSERTION(aReadState.mReadLimit == mWriteCursor, "unexpected state");
return;
}
// Check to see if we can free up any segments. If we can, then notify
// the output stream that the pipe has room for a new segment.
if (AdvanceReadSegment(aReadState) == SegmentDeleted &&
mOutput.OnOutputWritable(events) == NotifyMonitor) {
mon.NotifyAll();
uint32_t currentSegment = aReadState.mSegment;
// Move to the next segment to read
aReadState.mSegment += 1;
// If this was the last reference to the first segment, then remove it.
if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
// shift write and read segment index (-1 indicates an empty buffer).
mWriteSegment -= 1;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
mInputList[i]->ReadState().mSegment -= 1;
}
// done with this segment
mBuffer.DeleteFirstSegment();
LOG(("III deleting first segment\n"));
}
if (mWriteSegment < aReadState.mSegment) {
// read cursor has hit the end of written data, so reset it
MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
aReadState.mReadCursor = nullptr;
aReadState.mReadLimit = nullptr;
// also, the buffer is completely empty, so reset the write cursor
if (mWriteSegment == -1) {
mWriteCursor = nullptr;
mWriteLimit = nullptr;
}
} else {
// advance read cursor and limit to next buffer segment
aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
if (mWriteSegment == aReadState.mSegment) {
aReadState.mReadLimit = mWriteCursor;
} else {
aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
}
}
// we've free'd up a segment, so notify output stream that pipe has
// room for a new segment.
if (mOutput.OnOutputWritable(events)) {
mon.Notify();
}
}
}
}
SegmentChangeResult
nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState)
{
int32_t currentSegment = aReadState.mSegment;
// Move to the next segment to read
aReadState.mSegment += 1;
SegmentChangeResult result = SegmentNotChanged;
// If this was the last reference to the first segment, then remove it.
if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
// shift write and read segment index (-1 indicates an empty buffer).
mWriteSegment -= 1;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
mInputList[i]->ReadState().mSegment -= 1;
}
// done with this segment
mBuffer.DeleteFirstSegment();
LOG(("III deleting first segment\n"));
result = SegmentDeleted;
}
if (mWriteSegment < aReadState.mSegment) {
// read cursor has hit the end of written data, so reset it
MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
aReadState.mReadCursor = nullptr;
aReadState.mReadLimit = nullptr;
// also, the buffer is completely empty, so reset the write cursor
if (mWriteSegment == -1) {
mWriteCursor = nullptr;
mWriteLimit = nullptr;
}
} else {
// advance read cursor and limit to next buffer segment
aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
if (mWriteSegment == aReadState.mSegment) {
aReadState.mReadLimit = mWriteCursor;
} else {
aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
}
}
return result;
}
void
nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents,
uint32_t* aAvailableOut)
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
*aAvailableOut = 0;
SegmentChangeResult result = SegmentNotChanged;
while(mWriteSegment >= aReadState.mSegment) {
// If the last segment to free is still being written to, we're done
// draining. We can't free any more.
if (ReadSegmentBeingWritten(aReadState)) {
break;
}
if (AdvanceReadSegment(aReadState) == SegmentDeleted) {
result = SegmentDeleted;
}
}
// if we've free'd up a segment, notify output stream that pipe has
// room for a new segment.
if (result == SegmentDeleted &&
mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
mon.NotifyAll();
}
}
bool
nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState)
{
bool beingWritten = mWriteSegment == aReadState.mSegment &&
mWriteLimit > mWriteCursor;
NS_ASSERTION(!beingWritten || aReadState.mReadLimit == mWriteCursor,
"unexpected state");
return beingWritten;
}
nsresult
nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen)
{
@ -730,7 +664,7 @@ nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten)
// notify input stream that pipe now contains additional data
bool needNotify = false;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
if (mInputList[i]->OnInputReadable(aBytesWritten, events) == NotifyMonitor) {
if (mInputList[i]->OnInputReadable(aBytesWritten, events)) {
needNotify = true;
}
}
@ -771,12 +705,12 @@ nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason)
continue;
}
MonitorAction action = mInputList[i]->OnInputException(aReason, events);
bool needNotify = mInputList[i]->OnInputException(aReason, events);
mInputList.RemoveElementAt(i);
// Notify after element is removed in case we re-enter as a result.
if (action == NotifyMonitor) {
mon.NotifyAll();
if (needNotify) {
mon.Notify();
}
return;
@ -812,13 +746,13 @@ nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly)
continue;
}
if (mInputList[i]->OnInputException(aReason, events) == NotifyMonitor) {
if (mInputList[i]->OnInputException(aReason, events)) {
needNotify = true;
}
}
mInputList = tmpInputList;
if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
if (mOutput.OnOutputException(aReason, events)) {
needNotify = true;
}
@ -1007,10 +941,10 @@ nsPipeInputStream::Wait()
return Status() == NS_BASE_STREAM_CLOSED ? NS_OK : Status();
}
MonitorAction
bool
nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents)
{
MonitorAction result = DoNotNotifyMonitor;
bool result = false;
mAvailable += aBytesWritten;
@ -1019,19 +953,19 @@ nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents
mCallback = 0;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
result = true;
}
return result;
}
MonitorAction
bool
nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents)
{
LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n",
this, aReason));
MonitorAction result = DoNotNotifyMonitor;
bool result = false;
NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
@ -1040,14 +974,14 @@ nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents)
}
// force count of available bytes to zero.
mPipe->DrainInputStream(mReadState, aEvents, &mAvailable);
mAvailable = 0;
if (mCallback) {
aEvents.NotifyInputReady(this, mCallback);
mCallback = 0;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
result = true;
}
return result;
@ -1388,10 +1322,10 @@ nsPipeOutputStream::Wait()
return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
}
MonitorAction
bool
nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
{
MonitorAction result = DoNotNotifyMonitor;
bool result = false;
mWritable = true;
@ -1400,19 +1334,19 @@ nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
mCallback = 0;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
result = true;
}
return result;
}
MonitorAction
bool
nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
{
LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n",
this, aReason));
MonitorAction result = DoNotNotifyMonitor;
bool result = false;
NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
mWritable = false;
@ -1422,7 +1356,7 @@ nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
mCallback = 0;
mCallbackFlags = 0;
} else if (mBlocked) {
result = NotifyMonitor;
result = true;
}
return result;

View File

@ -94,22 +94,4 @@ ConsumeAndValidateStream(nsIInputStream* aStream,
ASSERT_TRUE(aExpectedData.Equals(outputData));
}
NS_IMPL_ISUPPORTS(OutputStreamCallback, nsIOutputStreamCallback);
OutputStreamCallback::OutputStreamCallback()
: mCalled(false)
{
}
OutputStreamCallback::~OutputStreamCallback()
{
}
NS_IMETHODIMP
OutputStreamCallback::OnOutputStreamReady(nsIAsyncOutputStream* aStream)
{
mCalled = true;
return NS_OK;
}
} // namespace testing

View File

@ -7,7 +7,6 @@
#ifndef __Helpers_h
#define __Helpers_h
#include "nsIAsyncOutputStream.h"
#include "nsString.h"
#include <stdint.h>
@ -35,22 +34,6 @@ void
ConsumeAndValidateStream(nsIInputStream* aStream,
const nsACString& aExpectedData);
class OutputStreamCallback MOZ_FINAL : public nsIOutputStreamCallback
{
public:
OutputStreamCallback();
bool Called() const { return mCalled; }
private:
~OutputStreamCallback();
bool mCalled;
public:
NS_DECL_ISUPPORTS
NS_DECL_NSIOUTPUTSTREAMCALLBACK
};
} // namespace testing
#endif // __Helpers_h

View File

@ -660,127 +660,3 @@ TEST(Pipes, Clone_DuringWrite_ReadDuringWrite_CloseDuringWrite)
2, // num clones to add after each write
3); // num streams to read after each write
}
TEST(Pipes, Write_AsyncWait)
{
nsCOMPtr<nsIAsyncInputStream> reader;
nsCOMPtr<nsIAsyncOutputStream> writer;
const uint32_t segmentSize = 1024;
const uint32_t numSegments = 1;
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
true, true, // non-blocking - reader, writer
segmentSize, numSegments);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsTArray<char> inputData;
testing::CreateData(segmentSize, inputData);
uint32_t numWritten = 0;
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
nsRefPtr<testing::OutputStreamCallback> cb =
new testing::OutputStreamCallback();
rv = writer->AsyncWait(cb, 0, 0, nullptr);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_FALSE(cb->Called());
testing::ConsumeAndValidateStream(reader, inputData);
ASSERT_TRUE(cb->Called());
}
TEST(Pipes, Write_AsyncWait_Clone)
{
nsCOMPtr<nsIAsyncInputStream> reader;
nsCOMPtr<nsIAsyncOutputStream> writer;
const uint32_t segmentSize = 1024;
const uint32_t numSegments = 1;
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
true, true, // non-blocking - reader, writer
segmentSize, numSegments);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCOMPtr<nsIInputStream> clone;
rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsTArray<char> inputData;
testing::CreateData(segmentSize, inputData);
uint32_t numWritten = 0;
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
nsRefPtr<testing::OutputStreamCallback> cb =
new testing::OutputStreamCallback();
rv = writer->AsyncWait(cb, 0, 0, nullptr);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_FALSE(cb->Called());
testing::ConsumeAndValidateStream(reader, inputData);
ASSERT_FALSE(cb->Called());
testing::ConsumeAndValidateStream(clone, inputData);
ASSERT_TRUE(cb->Called());
}
TEST(Pipes, Write_AsyncWait_Clone_CloseOriginal)
{
nsCOMPtr<nsIAsyncInputStream> reader;
nsCOMPtr<nsIAsyncOutputStream> writer;
const uint32_t segmentSize = 1024;
const uint32_t numSegments = 1;
nsresult rv = NS_NewPipe2(getter_AddRefs(reader), getter_AddRefs(writer),
true, true, // non-blocking - reader, writer
segmentSize, numSegments);
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsCOMPtr<nsIInputStream> clone;
rv = NS_CloneInputStream(reader, getter_AddRefs(clone));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsTArray<char> inputData;
testing::CreateData(segmentSize, inputData);
uint32_t numWritten = 0;
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = writer->Write(inputData.Elements(), inputData.Length(), &numWritten);
ASSERT_EQ(NS_BASE_STREAM_WOULD_BLOCK, rv);
nsRefPtr<testing::OutputStreamCallback> cb =
new testing::OutputStreamCallback();
rv = writer->AsyncWait(cb, 0, 0, nullptr);
ASSERT_TRUE(NS_SUCCEEDED(rv));
ASSERT_FALSE(cb->Called());
testing::ConsumeAndValidateStream(clone, inputData);
ASSERT_FALSE(cb->Called());
reader->Close();
ASSERT_TRUE(cb->Called());
}