Bug 830172 - Use a shared nsIThreadPool instead of one MFWorkQueue per WMFByteStream instance. r=padenot

This commit is contained in:
Chris Pearce 2013-02-05 10:33:52 +13:00
parent 84b08b7baf
commit 7ca4ca34ee
4 changed files with 186 additions and 148 deletions

View File

@ -48,14 +48,6 @@ HRESULT MFStartup();
HRESULT MFShutdown();
HRESULT MFPutWorkItem(DWORD aWorkQueueId,
IMFAsyncCallback *aCallback,
IUnknown *aState);
HRESULT MFAllocateWorkQueue(DWORD *aOutWorkQueueId);
HRESULT MFUnlockWorkQueue(DWORD aWorkQueueId);
HRESULT MFCreateAsyncResult(IUnknown *aUunkObject,
IMFAsyncCallback *aCallback,
IUnknown *aUnkState,

View File

@ -14,6 +14,8 @@
#include "MediaResource.h"
#include "nsISeekableStream.h"
#include "mozilla/RefPtr.h"
#include "nsIThreadPool.h"
#include "nsXPCOMCIDInternal.h"
namespace mozilla {
@ -34,11 +36,68 @@ DoGetInterface(IUnknown* aUnknown, void** aInterface)
return S_OK;
}
// Thread pool listener which ensures that MSCOM is initialized and
// deinitialized on the thread pool thread. We can call back into WMF
// on this thread, so we need MSCOM working.
class ThreadPoolListener MOZ_FINAL : public nsIThreadPoolListener {
public:
NS_DECL_ISUPPORTS
NS_DECL_NSITHREADPOOLLISTENER
};
NS_IMPL_THREADSAFE_ISUPPORTS1(ThreadPoolListener, nsIThreadPoolListener)
NS_IMETHODIMP
ThreadPoolListener::OnThreadCreated()
{
HRESULT hr = CoInitializeEx(0, COINIT_MULTITHREADED);
if (FAILED(hr)) {
NS_WARNING("Failed to initialize MSCOM on WMFByteStream thread.");
}
return NS_OK;
}
NS_IMETHODIMP
ThreadPoolListener::OnThreadShuttingDown()
{
CoUninitialize();
return NS_OK;
}
// Thread pool on which read requests are processed.
// This is created and destroyed on the main thread only.
static nsIThreadPool* sThreadPool = nullptr;
// Counter of the number of WMFByteStreams that are instantiated and that need
// the thread pool. This is read/write on the main thread only.
static int32_t sThreadPoolRefCnt = 0;
class ReleaseThreadPoolEvent MOZ_FINAL : public nsRunnable {
public:
NS_IMETHOD Run() {
NS_ASSERTION(NS_IsMainThread(), "Must be on main thread.");
NS_ASSERTION(sThreadPoolRefCnt > 0, "sThreadPoolRefCnt Should be non-negative");
sThreadPoolRefCnt--;
if (sThreadPoolRefCnt == 0) {
NS_ASSERTION(sThreadPool != nullptr, "Should have thread pool ref if sThreadPoolRefCnt==0.");
// Note: store ref to thread pool, then clear global ref, then
// Shutdown() using the stored ref. Events can run during the Shutdown()
// call, so if we release after calling Shutdown(), another event may
// have incremented the refcnt in the meantime, and have a dangling
// pointer to the now destroyed threadpool!
nsCOMPtr<nsIThreadPool> pool = sThreadPool;
NS_IF_RELEASE(sThreadPool);
pool->Shutdown();
}
return NS_OK;
}
};
WMFByteStream::WMFByteStream(MediaResource* aResource)
: mWorkQueueId(MFASYNC_CALLBACK_QUEUE_UNDEFINED),
mResource(aResource),
: mResource(aResource),
mReentrantMonitor("WMFByteStream"),
mOffset(0)
mOffset(0),
mIsShutdown(false)
{
NS_ASSERTION(NS_IsMainThread(), "Must be on main thread.");
NS_ASSERTION(mResource, "Must have a valid media resource");
@ -55,18 +114,38 @@ WMFByteStream::WMFByteStream(MediaResource* aResource)
WMFByteStream::~WMFByteStream()
{
MOZ_COUNT_DTOR(WMFByteStream);
// The WMFByteStream can be deleted from a WMF work queue thread, so we
// dispatch an event to the main thread to deref the thread pool.
nsCOMPtr<nsIRunnable> event = new ReleaseThreadPoolEvent();
NS_DispatchToMainThread(event, NS_DISPATCH_NORMAL);
}
nsresult
WMFByteStream::Init()
{
NS_ASSERTION(NS_IsMainThread(), "Must be on main thread.");
// Work queue is not yet initialized, try to create.
HRESULT hr = wmf::MFAllocateWorkQueue(&mWorkQueueId);
if (FAILED(hr)) {
NS_WARNING("WMFByteStream Failed to allocate work queue.");
return NS_ERROR_FAILURE;
if (!sThreadPool) {
nsresult rv;
nsCOMPtr<nsIThreadPool> pool = do_CreateInstance(NS_THREADPOOL_CONTRACTID, &rv);
NS_ENSURE_SUCCESS(rv, rv);
sThreadPool = pool;
NS_ADDREF(sThreadPool);
rv = sThreadPool->SetName(NS_LITERAL_CSTRING("WMFByteStream Async Read Pool"));
NS_ENSURE_SUCCESS(rv, rv);
nsCOMPtr<nsIThreadPoolListener> listener = new ThreadPoolListener();
rv = sThreadPool->SetListener(listener);
NS_ENSURE_SUCCESS(rv, rv);
}
sThreadPoolRefCnt++;
// Store a ref to the thread pool, so that we keep the pool alive as long as
// we're alive.
mThreadPool = sThreadPool;
return NS_OK;
}
@ -74,13 +153,9 @@ nsresult
WMFByteStream::Shutdown()
{
NS_ASSERTION(NS_IsMainThread(), "Must be on main thread.");
if (mWorkQueueId != MFASYNC_CALLBACK_QUEUE_UNDEFINED) {
HRESULT hr = wmf::MFUnlockWorkQueue(mWorkQueueId);
if (FAILED(hr)) {
NS_WARNING("WMFByteStream Failed to unlock work queue.");
LOG("WMFByteStream unlock work queue hr=%x %d\n", hr, hr);
return NS_ERROR_FAILURE;
}
{
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
mIsShutdown = true;
}
return NS_OK;
}
@ -94,9 +169,6 @@ WMFByteStream::QueryInterface(REFIID aIId, void **aInterface)
if (aIId == IID_IMFByteStream) {
return DoGetInterface(static_cast<IMFByteStream*>(this), aInterface);
}
if (aIId == IID_IMFAsyncCallback) {
return DoGetInterface(static_cast<IMFAsyncCallback*>(this), aInterface);
}
if (aIId == IID_IUnknown) {
return DoGetInterface(static_cast<IMFByteStream*>(this), aInterface);
}
@ -108,12 +180,38 @@ WMFByteStream::QueryInterface(REFIID aIId, void **aInterface)
NS_IMPL_THREADSAFE_ADDREF(WMFByteStream)
NS_IMPL_THREADSAFE_RELEASE(WMFByteStream)
NS_IMPL_THREADSAFE_ADDREF(WMFByteStream::AsyncReadRequestState)
NS_IMPL_THREADSAFE_RELEASE(WMFByteStream::AsyncReadRequestState)
// Stores data regarding an async read opreation.
class AsyncReadRequestState MOZ_FINAL : public IUnknown {
public:
AsyncReadRequestState(int64_t aOffset, BYTE* aBuffer, ULONG aLength)
: mOffset(aOffset),
mBuffer(aBuffer),
mBufferLength(aLength),
mBytesRead(0)
{}
// IUnknown Methods
STDMETHODIMP QueryInterface(REFIID aRIID, LPVOID *aOutObject);
STDMETHODIMP_(ULONG) AddRef();
STDMETHODIMP_(ULONG) Release();
int64_t mOffset;
BYTE* mBuffer;
ULONG mBufferLength;
ULONG mBytesRead;
// IUnknown ref counting.
nsAutoRefCnt mRefCnt;
NS_DECL_OWNINGTHREAD
};
NS_IMPL_THREADSAFE_ADDREF(AsyncReadRequestState)
NS_IMPL_THREADSAFE_RELEASE(AsyncReadRequestState)
// IUnknown Methods
STDMETHODIMP
WMFByteStream::AsyncReadRequestState::QueryInterface(REFIID aIId, void **aInterface)
AsyncReadRequestState::QueryInterface(REFIID aIId, void **aInterface)
{
LOG("WMFByteStream::AsyncReadRequestState::QueryInterface %s", GetGUIDName(aIId).get());
@ -125,6 +223,25 @@ WMFByteStream::AsyncReadRequestState::QueryInterface(REFIID aIId, void **aInterf
return E_NOINTERFACE;
}
class PerformReadEvent MOZ_FINAL : public nsRunnable {
public:
PerformReadEvent(WMFByteStream* aStream,
IMFAsyncResult* aResult,
AsyncReadRequestState* aRequestState)
: mStream(aStream),
mResult(aResult),
mRequestState(aRequestState) {}
NS_IMETHOD Run() {
mStream->PerformRead(mResult, mRequestState);
return NS_OK;
}
private:
RefPtr<WMFByteStream> mStream;
RefPtr<IMFAsyncResult> mResult;
RefPtr<AsyncReadRequestState> mRequestState;
};
// IMFByteStream Methods
STDMETHODIMP
WMFByteStream::BeginRead(BYTE *aBuffer,
@ -136,11 +253,15 @@ WMFByteStream::BeginRead(BYTE *aBuffer,
NS_ENSURE_TRUE(aCallback, E_POINTER);
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
LOG("WMFByteStream::BeginRead() mOffset=%lld tell=%lld length=%lu",
mOffset, mResource->Tell(), aLength);
LOG("WMFByteStream::BeginRead() mOffset=%lld tell=%lld length=%lu mIsShutdown=%d",
mOffset, mResource->Tell(), aLength, mIsShutdown);
if (mIsShutdown) {
return E_FAIL;
}
// Create an object to store our state.
RefPtr<IUnknown> requestState = new AsyncReadRequestState(mOffset, aBuffer, aLength);
RefPtr<AsyncReadRequestState> requestState = new AsyncReadRequestState(mOffset, aBuffer, aLength);
// Create an IMFAsyncResult, this is passed back to the caller as a token to
// retrieve the number of bytes read.
@ -151,64 +272,39 @@ WMFByteStream::BeginRead(BYTE *aBuffer,
byRef(callersResult));
NS_ENSURE_TRUE(SUCCEEDED(hr), hr);
// Queue a work item on our Windows Media Foundation work queue to call
// this object's Invoke() function which performs the read, and in turn
// invokes the caller's callback to notify the caller that the read
// operation is complete. Note: This creates another IMFAsyncResult to
// wrap callersResult, and it's that wrapping IMFAsyncResult which is
// passed to Invoke().
hr = wmf::MFPutWorkItem(mWorkQueueId, this, callersResult);
// Dispatch an event to perform the read in the thread pool.
nsCOMPtr<nsIRunnable> r = new PerformReadEvent(this, callersResult, requestState);
nsresult rv = mThreadPool->Dispatch(r, NS_DISPATCH_NORMAL);
return hr;
return NS_SUCCEEDED(rv) ? S_OK : E_FAIL;
}
// IMFAsyncCallback::Invoke implementation. This is called back on the work
// queue thread, and performs the actual read.
STDMETHODIMP
WMFByteStream::Invoke(IMFAsyncResult* aResult)
// Note: This is called on one of the thread pool's threads.
void
WMFByteStream::PerformRead(IMFAsyncResult* aResult, AsyncReadRequestState* aRequestState)
{
// Note: We assume that the WMF Work Queue that calls this function does
// so synchronously, i.e. this function call returns before any other
// work queue item runs. This is important, as if we run multiple instances
// of this function at once we'll interleave seeks and reads on the
// media resoure.
// Extract the caller's IMFAsyncResult object from the wrapping aResult object.
RefPtr<IMFAsyncResult> callerResult;
RefPtr<IUnknown> unknown;
HRESULT hr = aResult->GetState(byRef(unknown));
NS_ENSURE_TRUE(SUCCEEDED(hr), hr);
hr = unknown->QueryInterface(static_cast<IMFAsyncResult**>(byRef(callerResult)));
NS_ENSURE_TRUE(SUCCEEDED(hr), E_FAIL);
// Get the object that holds our state information for the asynchronous call.
hr = callerResult->GetObject(byRef(unknown));
NS_ENSURE_TRUE(SUCCEEDED(hr) && unknown, hr);
AsyncReadRequestState* requestState =
static_cast<AsyncReadRequestState*>(unknown.get());
// Ensure the read head is at the correct offset in the resource. It may not
// be if the SourceReader seeked.
if (mResource->Tell() != requestState->mOffset) {
if (mResource->Tell() != aRequestState->mOffset) {
nsresult rv = mResource->Seek(nsISeekableStream::NS_SEEK_SET,
requestState->mOffset);
aRequestState->mOffset);
if (NS_FAILED(rv)) {
// Let SourceReader know the read failed.
callerResult->SetStatus(E_FAIL);
wmf::MFInvokeCallback(callerResult);
aResult->SetStatus(E_FAIL);
wmf::MFInvokeCallback(aResult);
LOG("WMFByteStream::Invoke() seek to read offset failed, aborting read");
return S_OK;
return;
}
}
NS_ASSERTION(mResource->Tell() == requestState->mOffset, "State mismatch!");
NS_ASSERTION(mResource->Tell() == aRequestState->mOffset, "State mismatch!");
// Read in a loop to ensure we fill the buffer, when possible.
ULONG totalBytesRead = 0;
nsresult rv = NS_OK;
while (totalBytesRead < requestState->mBufferLength) {
BYTE* buffer = requestState->mBuffer + totalBytesRead;
while (totalBytesRead < aRequestState->mBufferLength) {
BYTE* buffer = aRequestState->mBuffer + totalBytesRead;
ULONG bytesRead = 0;
ULONG length = requestState->mBufferLength - totalBytesRead;
ULONG length = aRequestState->mBufferLength - totalBytesRead;
rv = mResource->Read(reinterpret_cast<char*>(buffer),
length,
reinterpret_cast<uint32_t*>(&bytesRead));
@ -220,16 +316,15 @@ WMFByteStream::Invoke(IMFAsyncResult* aResult)
// Record the number of bytes read, so the caller can retrieve
// it later.
requestState->mBytesRead = NS_SUCCEEDED(rv) ? totalBytesRead : 0;
callerResult->SetStatus(S_OK);
aRequestState->mBytesRead = NS_SUCCEEDED(rv) ? totalBytesRead : 0;
aResult->SetStatus(S_OK);
LOG("WMFByteStream::Invoke() read %d at %lld finished rv=%x",
requestState->mBytesRead, requestState->mOffset, rv);
aRequestState->mBytesRead, aRequestState->mOffset, rv);
// Let caller know read is complete.
wmf::MFInvokeCallback(callerResult);
return S_OK;
DebugOnly<HRESULT> hr = wmf::MFInvokeCallback(aResult);
NS_ASSERTION(SUCCEEDED(hr), "Failed to invoke callback!");
}
STDMETHODIMP
@ -355,6 +450,10 @@ WMFByteStream::Seek(MFBYTESTREAM_SEEK_ORIGIN aSeekOrigin,
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
if (mIsShutdown) {
return E_FAIL;
}
if (aSeekOrigin == msoBegin) {
mOffset = aSeekOffset;
} else {
@ -375,6 +474,10 @@ WMFByteStream::SetCurrentPosition(QWORD aPosition)
LOG("WMFByteStream::SetCurrentPosition(%lld)",
aPosition);
if (mIsShutdown) {
return E_FAIL;
}
// Note: WMF calls SetCurrentPosition() sometimes after calling BeginRead()
// but before the read has finished, and thus before it's called EndRead().
// See comment in EndRead() for more details.
@ -408,11 +511,4 @@ WMFByteStream::Write(const BYTE *, ULONG, ULONG *)
return E_NOTIMPL;
}
STDMETHODIMP
WMFByteStream::GetParameters(DWORD*, DWORD*)
{
LOG("WMFByteStream::GetParameters()");
return E_NOTIMPL;
}
} // namespace mozilla

View File

@ -14,17 +14,19 @@
#include "mozilla/Attributes.h"
#include "nsAutoPtr.h"
class nsIThreadPool;
namespace mozilla {
class MediaResource;
class AsyncReadRequestState;
// Wraps a MediaResource around an IMFByteStream interface, so that it can
// be used by the IMFSourceReader. Each WMFByteStream creates a WMF Work Queue
// on which blocking I/O is performed. The SourceReader requests reads
// asynchronously using {Begin,End}Read(). The synchronous I/O methods aren't
// used by the SourceReader, so they're not implemented on this class.
class WMFByteStream MOZ_FINAL : public IMFByteStream,
public IMFAsyncCallback
class WMFByteStream MOZ_FINAL : public IMFByteStream
{
public:
WMFByteStream(MediaResource* aResource);
@ -63,44 +65,14 @@ public:
STDMETHODIMP SetLength(QWORD);
STDMETHODIMP Write(const BYTE *, ULONG, ULONG *);
// IMFAsyncCallback methods.
// We perform an async read operation in this callback implementation.
STDMETHODIMP GetParameters(DWORD*, DWORD*);
STDMETHODIMP Invoke(IMFAsyncResult* aResult);
void PerformRead(IMFAsyncResult* aResult, AsyncReadRequestState* aRequestState);
private:
// Id of the work queue upon which we perfrom reads. This
// objects's Invoke() function is called on the work queue's thread,
// and it's there that we perform blocking IO. This has value
// MFASYNC_CALLBACK_QUEUE_UNDEFINED if the work queue hasn't been
// created yet.
DWORD mWorkQueueId;
// Stores data regarding an async read opreation.
class AsyncReadRequestState MOZ_FINAL : public IUnknown {
public:
AsyncReadRequestState(int64_t aOffset, BYTE* aBuffer, ULONG aLength)
: mOffset(aOffset),
mBuffer(aBuffer),
mBufferLength(aLength),
mBytesRead(0)
{}
// IUnknown Methods
STDMETHODIMP QueryInterface(REFIID aRIID, LPVOID *aOutObject);
STDMETHODIMP_(ULONG) AddRef();
STDMETHODIMP_(ULONG) Release();
int64_t mOffset;
BYTE* mBuffer;
ULONG mBufferLength;
ULONG mBytesRead;
// IUnknown ref counting.
nsAutoRefCnt mRefCnt;
NS_DECL_OWNINGTHREAD
};
// Reference to the thread pool in which we perform the reads asynchronously.
// Note this is pool is shared amongst all active WMFByteStreams.
nsCOMPtr<nsIThreadPool> mThreadPool;
// Resource we're wrapping. Note this object's methods are threadsafe,
// and we only call read/seek on the work queue's thread.
@ -116,6 +88,10 @@ private:
// since the read hadn't yet completed.
int64_t mOffset;
// True if the resource has been shutdown, either because the WMFReader is
// shutting down, or because the underlying MediaResource has closed.
bool mIsShutdown;
// IUnknown ref counting.
nsAutoRefCnt mRefCnt;
NS_DECL_OWNINGTHREAD

View File

@ -290,32 +290,6 @@ MFShutdown()
return (MFShutdownPtr)();
}
HRESULT
MFPutWorkItem(DWORD aQueueId,
IMFAsyncCallback *aCallback,
IUnknown *aState)
{
DECL_FUNCTION_PTR(MFPutWorkItem, DWORD, IMFAsyncCallback*, IUnknown*);
ENSURE_FUNCTION_PTR(MFPutWorkItem, Mfplat.dll)
return (MFPutWorkItemPtr)(aQueueId, aCallback, aState);
}
HRESULT
MFAllocateWorkQueue(DWORD *aOutWorkQueueId)
{
DECL_FUNCTION_PTR(MFAllocateWorkQueue, DWORD*);
ENSURE_FUNCTION_PTR(MFAllocateWorkQueue, Mfplat.dll)
return (MFAllocateWorkQueuePtr)(aOutWorkQueueId);
}
HRESULT
MFUnlockWorkQueue(DWORD aWorkQueueId)
{
DECL_FUNCTION_PTR(MFUnlockWorkQueue, DWORD);
ENSURE_FUNCTION_PTR(MFUnlockWorkQueue, Mfplat.dll);
return (MFUnlockWorkQueuePtr)(aWorkQueueId);
}
HRESULT MFCreateAsyncResult(IUnknown *aUnkObject,
IMFAsyncCallback *aCallback,
IUnknown *aUnkState,