/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ /** * The multiplex stream concatenates a list of input streams into a single * stream. */ #include "IPC/IPCMessageUtils.h" #include "mozilla/net/NeckoMessageUtils.h" #include "nsMultiplexInputStream.h" #include "nsIMultiplexInputStream.h" #include "nsISeekableStream.h" #include "nsCOMPtr.h" #include "nsCOMArray.h" #include "nsIIPCSerializable.h" #include "nsIClassInfoImpl.h" class nsMultiplexInputStream : public nsIMultiplexInputStream, public nsISeekableStream, public nsIIPCSerializable { public: nsMultiplexInputStream(); NS_DECL_ISUPPORTS NS_DECL_NSIINPUTSTREAM NS_DECL_NSIMULTIPLEXINPUTSTREAM NS_DECL_NSISEEKABLESTREAM NS_DECL_NSIIPCSERIALIZABLE private: ~nsMultiplexInputStream() {} struct ReadSegmentsState { nsIInputStream* mThisStream; PRUint32 mOffset; nsWriteSegmentFun mWriter; void* mClosure; bool mDone; }; static NS_METHOD ReadSegCb(nsIInputStream* aIn, void* aClosure, const char* aFromRawSegment, PRUint32 aToOffset, PRUint32 aCount, PRUint32 *aWriteCount); nsCOMArray mStreams; PRUint32 mCurrentStream; bool mStartedReadingCurrent; nsresult mStatus; }; NS_IMPL_THREADSAFE_ADDREF(nsMultiplexInputStream) NS_IMPL_THREADSAFE_RELEASE(nsMultiplexInputStream) NS_IMPL_CLASSINFO(nsMultiplexInputStream, NULL, nsIClassInfo::THREADSAFE, NS_MULTIPLEXINPUTSTREAM_CID) NS_IMPL_QUERY_INTERFACE4_CI(nsMultiplexInputStream, nsIMultiplexInputStream, nsIInputStream, nsISeekableStream, nsIIPCSerializable) NS_IMPL_CI_INTERFACE_GETTER4(nsMultiplexInputStream, nsIMultiplexInputStream, nsIInputStream, nsISeekableStream, nsIIPCSerializable) nsMultiplexInputStream::nsMultiplexInputStream() : mCurrentStream(0), mStartedReadingCurrent(false), mStatus(NS_OK) { } /* readonly attribute unsigned long count; */ NS_IMETHODIMP nsMultiplexInputStream::GetCount(PRUint32 *aCount) { *aCount = mStreams.Count(); return NS_OK; } /* void appendStream (in nsIInputStream stream); */ NS_IMETHODIMP nsMultiplexInputStream::AppendStream(nsIInputStream *aStream) { return mStreams.AppendObject(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY; } /* void insertStream (in nsIInputStream stream, in unsigned long index); */ NS_IMETHODIMP nsMultiplexInputStream::InsertStream(nsIInputStream *aStream, PRUint32 aIndex) { bool result = mStreams.InsertObjectAt(aStream, aIndex); NS_ENSURE_TRUE(result, NS_ERROR_OUT_OF_MEMORY); if (mCurrentStream > aIndex || (mCurrentStream == aIndex && mStartedReadingCurrent)) ++mCurrentStream; return NS_OK; } /* void removeStream (in unsigned long index); */ NS_IMETHODIMP nsMultiplexInputStream::RemoveStream(PRUint32 aIndex) { bool result = mStreams.RemoveObjectAt(aIndex); NS_ENSURE_TRUE(result, NS_ERROR_NOT_AVAILABLE); if (mCurrentStream > aIndex) --mCurrentStream; else if (mCurrentStream == aIndex) mStartedReadingCurrent = false; return NS_OK; } /* nsIInputStream getStream (in unsigned long index); */ NS_IMETHODIMP nsMultiplexInputStream::GetStream(PRUint32 aIndex, nsIInputStream **_retval) { *_retval = mStreams.SafeObjectAt(aIndex); NS_ENSURE_TRUE(*_retval, NS_ERROR_NOT_AVAILABLE); NS_ADDREF(*_retval); return NS_OK; } /* void close (); */ NS_IMETHODIMP nsMultiplexInputStream::Close() { mStatus = NS_BASE_STREAM_CLOSED; nsresult rv = NS_OK; PRUint32 len = mStreams.Count(); for (PRUint32 i = 0; i < len; ++i) { nsresult rv2 = mStreams[i]->Close(); // We still want to close all streams, but we should return an error if (NS_FAILED(rv2)) rv = rv2; } return rv; } /* unsigned long available (); */ NS_IMETHODIMP nsMultiplexInputStream::Available(PRUint32 *_retval) { if (NS_FAILED(mStatus)) return mStatus; nsresult rv; PRUint32 avail = 0; PRUint32 len = mStreams.Count(); for (PRUint32 i = mCurrentStream; i < len; i++) { PRUint32 streamAvail; rv = mStreams[i]->Available(&streamAvail); NS_ENSURE_SUCCESS(rv, rv); avail += streamAvail; } *_retval = avail; return NS_OK; } /* [noscript] unsigned long read (in charPtr buf, in unsigned long count); */ NS_IMETHODIMP nsMultiplexInputStream::Read(char * aBuf, PRUint32 aCount, PRUint32 *_retval) { // It is tempting to implement this method in terms of ReadSegments, but // that would prevent this class from being used with streams that only // implement Read (e.g., file streams). *_retval = 0; if (mStatus == NS_BASE_STREAM_CLOSED) return NS_OK; if (NS_FAILED(mStatus)) return mStatus; nsresult rv = NS_OK; PRUint32 len = mStreams.Count(); while (mCurrentStream < len && aCount) { PRUint32 read; rv = mStreams[mCurrentStream]->Read(aBuf, aCount, &read); // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF. // (This is a bug in those stream implementations) if (rv == NS_BASE_STREAM_CLOSED) { NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED"); rv = NS_OK; read = 0; } else if (NS_FAILED(rv)) break; if (read == 0) { ++mCurrentStream; mStartedReadingCurrent = false; } else { NS_ASSERTION(aCount >= read, "Read more than requested"); *_retval += read; aCount -= read; aBuf += read; mStartedReadingCurrent = true; } } return *_retval ? NS_OK : rv; } /* [noscript] unsigned long readSegments (in nsWriteSegmentFun writer, * in voidPtr closure, * in unsigned long count); */ NS_IMETHODIMP nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void *aClosure, PRUint32 aCount, PRUint32 *_retval) { if (mStatus == NS_BASE_STREAM_CLOSED) { *_retval = 0; return NS_OK; } if (NS_FAILED(mStatus)) return mStatus; NS_ASSERTION(aWriter, "missing aWriter"); nsresult rv = NS_OK; ReadSegmentsState state; state.mThisStream = this; state.mOffset = 0; state.mWriter = aWriter; state.mClosure = aClosure; state.mDone = false; PRUint32 len = mStreams.Count(); while (mCurrentStream < len && aCount) { PRUint32 read; rv = mStreams[mCurrentStream]->ReadSegments(ReadSegCb, &state, aCount, &read); // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF. // (This is a bug in those stream implementations) if (rv == NS_BASE_STREAM_CLOSED) { NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED"); rv = NS_OK; read = 0; } // if |aWriter| decided to stop reading segments... if (state.mDone || NS_FAILED(rv)) break; // if stream is empty, then advance to the next stream. if (read == 0) { ++mCurrentStream; mStartedReadingCurrent = false; } else { NS_ASSERTION(aCount >= read, "Read more than requested"); state.mOffset += read; aCount -= read; mStartedReadingCurrent = true; } } // if we successfully read some data, then this call succeeded. *_retval = state.mOffset; return state.mOffset ? NS_OK : rv; } NS_METHOD nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure, const char* aFromRawSegment, PRUint32 aToOffset, PRUint32 aCount, PRUint32 *aWriteCount) { nsresult rv; ReadSegmentsState* state = (ReadSegmentsState*)aClosure; rv = (state->mWriter)(state->mThisStream, state->mClosure, aFromRawSegment, aToOffset + state->mOffset, aCount, aWriteCount); if (NS_FAILED(rv)) state->mDone = true; return rv; } /* readonly attribute boolean nonBlocking; */ NS_IMETHODIMP nsMultiplexInputStream::IsNonBlocking(bool *aNonBlocking) { PRUint32 len = mStreams.Count(); if (len == 0) { // Claim to be non-blocking, since we won't block the caller. // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK, // so maybe we should claim to be blocking? It probably doesn't // matter in practice. *aNonBlocking = true; return NS_OK; } for (PRUint32 i = 0; i < len; ++i) { nsresult rv = mStreams[i]->IsNonBlocking(aNonBlocking); NS_ENSURE_SUCCESS(rv, rv); // If one is non-blocking the entire stream becomes non-blocking // (except that we don't implement nsIAsyncInputStream, so there's // not much for the caller to do if Read returns "would block") if (*aNonBlocking) return NS_OK; } return NS_OK; } /* void seek (in PRInt32 whence, in PRInt32 offset); */ NS_IMETHODIMP nsMultiplexInputStream::Seek(PRInt32 aWhence, PRInt64 aOffset) { if (NS_FAILED(mStatus)) return mStatus; nsresult rv; // rewinding to start is easy, and should be the most common case if (aWhence == NS_SEEK_SET && aOffset == 0) { PRUint32 i, last; last = mStartedReadingCurrent ? mCurrentStream+1 : mCurrentStream; for (i = 0; i < last; ++i) { nsCOMPtr stream = do_QueryInterface(mStreams[i]); NS_ENSURE_TRUE(stream, NS_ERROR_NO_INTERFACE); rv = stream->Seek(NS_SEEK_SET, 0); NS_ENSURE_SUCCESS(rv, rv); } mCurrentStream = 0; mStartedReadingCurrent = false; return NS_OK; } // other Seeks not implemented yet return NS_ERROR_NOT_IMPLEMENTED; } /* PRUint32 tell (); */ NS_IMETHODIMP nsMultiplexInputStream::Tell(PRInt64 *_retval) { if (NS_FAILED(mStatus)) return mStatus; nsresult rv; PRInt64 ret64 = 0; PRUint32 i, last; last = mStartedReadingCurrent ? mCurrentStream+1 : mCurrentStream; for (i = 0; i < last; ++i) { nsCOMPtr stream = do_QueryInterface(mStreams[i]); NS_ENSURE_TRUE(stream, NS_ERROR_NO_INTERFACE); PRInt64 pos; rv = stream->Tell(&pos); NS_ENSURE_SUCCESS(rv, rv); ret64 += pos; } *_retval = ret64; return NS_OK; } /* void setEOF (); */ NS_IMETHODIMP nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED; } nsresult nsMultiplexInputStreamConstructor(nsISupports *outer, REFNSIID iid, void **result) { *result = nsnull; if (outer) return NS_ERROR_NO_AGGREGATION; nsMultiplexInputStream *inst = new nsMultiplexInputStream(); if (!inst) return NS_ERROR_OUT_OF_MEMORY; NS_ADDREF(inst); nsresult rv = inst->QueryInterface(iid, result); NS_RELEASE(inst); return rv; } bool nsMultiplexInputStream::Read(const IPC::Message *aMsg, void **aIter) { using IPC::ReadParam; PRUint32 count; if (!ReadParam(aMsg, aIter, &count)) return false; for (PRUint32 i = 0; i < count; i++) { IPC::InputStream inputStream; if (!ReadParam(aMsg, aIter, &inputStream)) return false; nsCOMPtr stream(inputStream); nsresult rv = AppendStream(stream); if (NS_FAILED(rv)) return false; } if (!ReadParam(aMsg, aIter, &mCurrentStream) || !ReadParam(aMsg, aIter, &mStartedReadingCurrent) || !ReadParam(aMsg, aIter, &mStatus)) return false; return true; } void nsMultiplexInputStream::Write(IPC::Message *aMsg) { using IPC::WriteParam; PRUint32 count = mStreams.Count(); WriteParam(aMsg, count); for (PRUint32 i = 0; i < count; i++) { IPC::InputStream inputStream(mStreams.ObjectAt(i)); WriteParam(aMsg, inputStream); } WriteParam(aMsg, mCurrentStream); WriteParam(aMsg, mStartedReadingCurrent); WriteParam(aMsg, mStatus); }