gecko/content/media/MediaStreamGraph.cpp
Robert O'Callahan c2b6383f48 Bug 759908. Create MediaStreamListener::NotifyConsumptionChanged. r=jesup
2012-06-01 18:26:17 +12:00


/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*-*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "MediaStreamGraph.h"
#include "mozilla/Monitor.h"
#include "mozilla/TimeStamp.h"
#include "AudioSegment.h"
#include "VideoSegment.h"
#include "nsContentUtils.h"
#include "nsIAppShell.h"
#include "nsIObserver.h"
#include "nsServiceManagerUtils.h"
#include "nsWidgetsCID.h"
#include "nsXPCOMCIDInternal.h"
#include "prlog.h"
#include "VideoUtils.h"
using namespace mozilla::layers;
namespace mozilla {
namespace {
#ifdef PR_LOGGING
PRLogModuleInfo* gMediaStreamGraphLog;
#define LOG(type, msg) PR_LOG(gMediaStreamGraphLog, type, msg)
#else
#define LOG(type, msg)
#endif
/**
* Assume we can run an iteration of the MediaStreamGraph loop in this much time
* or less.
* We try to run the control loop at this rate.
*/
const int MEDIA_GRAPH_TARGET_PERIOD_MS = 10;
/**
* Assume that we might miss our scheduled wakeup of the MediaStreamGraph by
* this much.
*/
const int SCHEDULE_SAFETY_MARGIN_MS = 10;
/**
* Try to have this much audio buffered in streams and queued to the hardware.
* The maximum delay to the end of the next control loop
* is 2*MEDIA_GRAPH_TARGET_PERIOD_MS + SCHEDULE_SAFETY_MARGIN_MS.
* There is no point in buffering more audio than this in a stream at any
* given time (until we add processing).
* This is not optimal yet.
*/
const int AUDIO_TARGET_MS = 2*MEDIA_GRAPH_TARGET_PERIOD_MS +
SCHEDULE_SAFETY_MARGIN_MS;
/**
* Try to have this much video buffered. Video frames are set
* near the end of the iteration of the control loop. The maximum delay
* to the setting of the next video frame is 2*MEDIA_GRAPH_TARGET_PERIOD_MS +
* SCHEDULE_SAFETY_MARGIN_MS. This is not optimal yet.
*/
const int VIDEO_TARGET_MS = 2*MEDIA_GRAPH_TARGET_PERIOD_MS +
SCHEDULE_SAFETY_MARGIN_MS;
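// With the values above, both AUDIO_TARGET_MS and VIDEO_TARGET_MS work out to
// 2*10 + 10 = 30ms of data buffered ahead of the current time.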
/**
* A per-stream update message passed from the media graph thread to the
* main thread.
*/
struct StreamUpdate {
PRInt64 mGraphUpdateIndex;
nsRefPtr<MediaStream> mStream;
StreamTime mNextMainThreadCurrentTime;
bool mNextMainThreadFinished;
};
/**
* This represents a message passed from the main thread to the graph thread.
* A ControlMessage always references a particular affected stream.
*/
class ControlMessage {
public:
ControlMessage(MediaStream* aStream) : mStream(aStream)
{
MOZ_COUNT_CTOR(ControlMessage);
}
// All these run on the graph thread
virtual ~ControlMessage()
{
MOZ_COUNT_DTOR(ControlMessage);
}
// Executed before we know what the action time for this message will be.
// Call NoteStreamAffected on the stream whose output will be
// modified by this message. Default implementation calls
// NoteStreamAffected(mStream).
virtual void UpdateAffectedStream();
// Executed after we know what the action time for this message will be.
virtual void Process() {}
// When we're shutting down the application, most messages are ignored but
// some cleanup messages should still be processed (on the main thread).
virtual void ProcessDuringShutdown() {}
protected:
// We do not hold a reference to mStream. The main thread will be holding
// a reference to the stream while this message is in flight. The last message
// referencing a stream is the Destroy message for that stream.
MediaStream* mStream;
};
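// Example (sketch): concrete messages subclass ControlMessage and override
// UpdateAffectedStream() and/or Process(); see CreateMessage and the Message
// classes defined inside the MediaStream methods below. A minimal hypothetical
// subclass looks like this:
//
//   class ExampleMessage : public ControlMessage {
//   public:
//     ExampleMessage(MediaStream* aStream) : ControlMessage(aStream) {}
//     virtual void UpdateAffectedStream()
//     {
//       // Runs on the graph thread before the action time is chosen;
//       // the default implementation calls NoteStreamAffected(mStream).
//       ControlMessage::UpdateAffectedStream();
//     }
//     virtual void Process()
//     {
//       // Runs on the graph thread once the action time is known.
//     }
//   };
//
// Messages are queued on the main thread via MediaStreamGraphImpl::AppendMessage
// and forwarded to the graph thread as a batch during RunInStableState.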
}
/**
* The implementation of a media stream graph. This class is private to this
* file. It's not in the anonymous namespace because MediaStream needs to
* be able to friend it.
*
* Currently we only have one per process.
*/
class MediaStreamGraphImpl : public MediaStreamGraph {
public:
MediaStreamGraphImpl();
~MediaStreamGraphImpl()
{
NS_ASSERTION(IsEmpty(),
"All streams should have been destroyed by messages from the main thread");
LOG(PR_LOG_DEBUG, ("MediaStreamGraph %p destroyed", this));
}
// Main thread only.
/**
* This runs every time we need to sync state from the media graph thread
* to the main thread while the main thread is not in the middle
* of a script. It runs during a "stable state" (per HTML5) or during
* an event posted to the main thread.
*/
void RunInStableState();
/**
* Ensure that a runnable which calls RunInStableState is posted to the
* appshell to run at the next stable state (per HTML5).
* See EnsureStableStateEventPosted.
*/
void EnsureRunInStableState();
/**
* Called to apply a StreamUpdate to its stream.
*/
void ApplyStreamUpdate(StreamUpdate* aUpdate);
/**
* Append a ControlMessage to the message queue. This queue is drained
* during RunInStableState; the messages will run on the graph thread.
*/
void AppendMessage(ControlMessage* aMessage);
/**
* Make this MediaStreamGraph enter forced-shutdown state. This state
* will be noticed by the media graph thread, which will shut down all streams
* and other state controlled by the media graph thread.
* This is called during application shutdown.
*/
void ForceShutDown();
/**
* Shutdown() this MediaStreamGraph's threads and return when they've shut down.
*/
void ShutdownThreads();
// The following methods run on the graph thread (or possibly the main thread if
// mLifecycleState > LIFECYCLE_RUNNING)
/**
* Runs main control loop on the graph thread. Normally a single invocation
* of this runs for the entire lifetime of the graph thread.
*/
void RunThread();
/**
* Call this to indicate that another iteration of the control loop is
* required on its regular schedule. The monitor must not be held.
*/
void EnsureNextIteration();
/**
* As above, but with the monitor already held.
*/
void EnsureNextIterationLocked(MonitorAutoLock& aLock);
/**
* Call this to indicate that another iteration of the control loop is
* required immediately. The monitor must already be held.
*/
void EnsureImmediateWakeUpLocked(MonitorAutoLock& aLock);
/**
* Ensure there is an event posted to the main thread to run RunInStableState.
* mMonitor must be held.
* See EnsureRunInStableState
*/
void EnsureStableStateEventPosted();
/**
* Generate messages to the main thread to update it for all state changes.
* mMonitor must be held.
*/
void PrepareUpdatesToMainThreadState();
// The following methods are the various stages of RunThread processing.
/**
* Compute a new current time for the graph and advance all on-graph-thread
* state to the new current time.
*/
void UpdateCurrentTime();
/**
* Update mLastActionTime to the time at which the current set of messages
* will take effect.
*/
void ChooseActionTime();
/**
* Update the consumption state of aStream to reflect whether its data
* is needed or not.
*/
void UpdateConsumptionState(SourceMediaStream* aStream);
/**
* Extract any state updates pending in aStream, and apply them.
*/
void ExtractPendingInput(SourceMediaStream* aStream);
/**
* Update "have enough data" flags in aStream.
*/
void UpdateBufferSufficiencyState(SourceMediaStream* aStream);
/**
* Compute the blocking states of streams from mBlockingDecisionsMadeUntilTime
* until the desired future time (determined by heuristic).
* Updates mBlockingDecisionsMadeUntilTime and sets MediaStream::mBlocked
* for all streams.
*/
void RecomputeBlocking();
// The following methods are used to help RecomputeBlocking.
/**
* Mark a stream blocked at time aTime. If this results in decisions that need
* to be revisited at some point in the future, *aEnd will be reduced to the
* first time in the future to recompute those decisions.
*/
void MarkStreamBlocked(MediaStream* aStream, GraphTime aTime, GraphTime* aEnd);
/**
* Recompute blocking for all streams for the interval starting at aTime.
* If this results in decisions that need to be revisited at some point
* in the future, *aEnd will be reduced to the first time in the future to
* recompute those decisions.
*/
void RecomputeBlockingAt(GraphTime aTime, GraphTime aEndBlockingDecisions,
GraphTime* aEnd);
/**
* Returns true if aStream will underrun at aTime for its own playback.
* aEndBlockingDecisions is when we plan to stop making blocking decisions.
* *aEnd will be reduced to the first time in the future to recompute these
* decisions.
*/
bool WillUnderrun(MediaStream* aStream, GraphTime aTime,
GraphTime aEndBlockingDecisions, GraphTime* aEnd);
/**
* Return true if there is an explicit blocker set from the current time
* indefinitely far into the future.
*/
bool IsAlwaysExplicitlyBlocked(MediaStream* aStream);
/**
* Given a graph time aTime, convert it to a stream time taking into
* account the time during which aStream is scheduled to be blocked.
*/
StreamTime GraphTimeToStreamTime(MediaStream* aStream, GraphTime aTime);
enum {
INCLUDE_TRAILING_BLOCKED_INTERVAL = 0x01
};
/**
* Given a stream time aTime, convert it to a graph time taking into
* account the time during which aStream is scheduled to be blocked.
* aTime must be <= mBlockingDecisionsMadeUntilTime since blocking decisions
* are only known up to that point.
* If aTime is exactly at the start of a blocked interval, then the blocked
* interval is included in the time returned if and only if
* aFlags includes INCLUDE_TRAILING_BLOCKED_INTERVAL.
*/
GraphTime StreamTimeToGraphTime(MediaStream* aStream, StreamTime aTime,
PRUint32 aFlags = 0);
/**
* Get the current audio position of the stream's audio output.
*/
GraphTime GetAudioPosition(MediaStream* aStream);
/**
* If aStream needs an audio stream but doesn't have one, create it.
* If aStream doesn't need an audio stream but has one, destroy it.
*/
void CreateOrDestroyAudioStream(GraphTime aAudioOutputStartTime,
MediaStream* aStream);
/**
* Update aStream->mFirstActiveTracks.
*/
void UpdateFirstActiveTracks(MediaStream* aStream);
/**
* Queue audio (mix of stream audio and silence for blocked intervals)
* to the audio output stream.
*/
void PlayAudio(MediaStream* aStream, GraphTime aFrom, GraphTime aTo);
/**
* Set the correct current video frame for stream aStream.
*/
void PlayVideo(MediaStream* aStream);
/**
* No more data will be forthcoming for aStream. The stream will end
* at the current buffer end point. The StreamBuffer's tracks must be
* explicitly set to finished by the caller.
*/
void FinishStream(MediaStream* aStream);
/**
* Compute how much stream data we would like to buffer for aStream.
*/
StreamTime GetDesiredBufferEnd(MediaStream* aStream);
/**
* Returns true when there are no active streams.
*/
bool IsEmpty() { return mStreams.IsEmpty(); }
// For use by control messages
/**
* Identify which graph update index we are currently processing.
*/
PRInt64 GetProcessingGraphUpdateIndex() { return mProcessingGraphUpdateIndex; }
/**
* Marks aStream as affected by a change in its output at desired time aTime
* (in the timeline of aStream). The change may not actually happen at this
* time; it may be delayed until later if there is buffered data we can't change.
*/
void NoteStreamAffected(MediaStream* aStream, double aTime);
/**
* Marks aStream as affected by a change in its output at the earliest
* possible time.
*/
void NoteStreamAffected(MediaStream* aStream);
/**
* Add aStream to the graph and initialize its graph-specific state.
*/
void AddStream(MediaStream* aStream);
/**
* Remove aStream from the graph. Ensures that pending updates for the
* stream destined for the main thread are flushed.
*/
void RemoveStream(MediaStream* aStream);
/**
* Compute the earliest time at which an action can be allowed to occur on any
* stream. Actions cannot be earlier than the previous action time, and
* cannot affect already-committed blocking decisions (and associated
* buffered audio).
*/
GraphTime GetEarliestActionTime()
{
return NS_MAX(mCurrentTime, NS_MAX(mLastActionTime, mBlockingDecisionsMadeUntilTime));
}
// Data members
/**
* Media graph thread.
* Readonly after initialization on the main thread.
*/
nsCOMPtr<nsIThread> mThread;
// The following state is managed on the graph thread only, unless
// mLifecycleState > LIFECYCLE_RUNNING in which case the graph thread
// is not running and this state can be used from the main thread.
nsTArray<nsRefPtr<MediaStream> > mStreams;
/**
* The time the last action was deemed to have occurred. This could be
* later than mCurrentTime if actions have to be delayed during data
* buffering, or before mCurrentTime if mCurrentTime has advanced since
* the last action happened. In ControlMessage::Process calls,
* mLastActionTime has always been updated to be >= mCurrentTime.
*/
GraphTime mLastActionTime;
/**
* The current graph time for the current iteration of the RunThread control
* loop.
*/
GraphTime mCurrentTime;
/**
* Blocking decisions have been made up to this time. We also buffer audio
* up to this time.
*/
GraphTime mBlockingDecisionsMadeUntilTime;
/**
* This is only used for logging.
*/
TimeStamp mInitialTimeStamp;
/**
* The real timestamp of the latest run of UpdateCurrentTime.
*/
TimeStamp mCurrentTimeStamp;
/**
* Which update batch we are currently processing.
*/
PRInt64 mProcessingGraphUpdateIndex;
// mMonitor guards the data below.
// MediaStreamGraph normally does its work without holding mMonitor, so it is
// not safe to just grab mMonitor from some thread and start monkeying with
// the graph. Instead, communicate with the graph thread using provided
// mechanisms such as the ControlMessage queue.
Monitor mMonitor;
// Data guarded by mMonitor (must always be accessed with mMonitor held,
// regardless of the value of mLifecycleState).
/**
* State to copy to main thread
*/
nsTArray<StreamUpdate> mStreamUpdates;
/**
* Runnables to run after the next update to main thread state.
*/
nsTArray<nsCOMPtr<nsIRunnable> > mUpdateRunnables;
struct MessageBlock {
PRInt64 mGraphUpdateIndex;
nsTArray<nsAutoPtr<ControlMessage> > mMessages;
};
/**
* A list of batches of messages to process. Each batch is processed
* as an atomic unit.
*/
nsTArray<MessageBlock> mMessageQueue;
/**
* This enum specifies where this graph is in its lifecycle. This is used
* to control shutdown.
* Shutdown is tricky because it can happen in two different ways:
* 1) Shutdown due to inactivity. RunThread() detects that it has no
* pending messages and no streams, and exits. The next RunInStableState()
* checks if there are new pending messages from the main thread (true only
* if new stream creation raced with shutdown); if there are, it revives
* RunThread(), otherwise it commits to shutting down the graph. New stream
* creation after this point will create a new graph. An async event is
* dispatched to Shutdown() the graph's threads and then delete the graph
* object.
* 2) Forced shutdown at application shutdown. A flag is set, RunThread()
* detects the flag and exits, the next RunInStableState() detects the flag,
* and dispatches the async event to Shutdown() the graph's threads. However
* the graph object is not deleted. New messages for the graph are processed
* synchronously on the main thread if necessary. When the last stream is
* destroyed, the graph object is deleted.
*/
enum LifecycleState {
// The graph thread hasn't started yet.
LIFECYCLE_THREAD_NOT_STARTED,
// RunThread() is running normally.
LIFECYCLE_RUNNING,
// In the following states, the graph thread is not running so
// all "graph thread only" state in this class can be used safely
// on the main thread.
// RunThread() has exited and we're waiting for the next
// RunInStableState(), at which point we can clean up the main-thread
// side of the graph.
LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP,
// RunInStableState() posted a ShutdownRunnable, and we're waiting for it
// to shut down the graph thread(s).
LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN,
// Graph threads have shut down but we're waiting for remaining streams
// to be destroyed. Only happens during application shutdown since normally
// we'd only shut down a graph when it has no streams.
LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION
};
LifecycleState mLifecycleState;
/**
* This enum specifies the wait state of the graph thread.
*/
enum WaitState {
// RunThread() is running normally
WAITSTATE_RUNNING,
// RunThread() is paused waiting for its next iteration, which will
// happen soon
WAITSTATE_WAITING_FOR_NEXT_ITERATION,
// RunThread() is paused indefinitely waiting for something to change
WAITSTATE_WAITING_INDEFINITELY,
// Something has signaled RunThread() to wake up immediately,
// but it hasn't done so yet
WAITSTATE_WAKING_UP
};
WaitState mWaitState;
/**
* True when another iteration of the control loop is required.
*/
bool mNeedAnotherIteration;
/**
* True when we need to do a forced shutdown during application shutdown.
*/
bool mForceShutDown;
/**
* True when we have posted an event to the main thread to run
* RunInStableState() and the event hasn't run yet.
*/
bool mPostedRunInStableStateEvent;
// Main thread only
/**
* Messages posted by the current event loop task. These are forwarded to
* the media graph thread during RunInStableState. We can't forward them
* immediately because we want all messages between stable states to be
* processed as an atomic batch.
*/
nsTArray<nsAutoPtr<ControlMessage> > mCurrentTaskMessageQueue;
/**
* True when RunInStableState has determined that mLifecycleState is >
* LIFECYCLE_RUNNING. Since only the main thread can reset mLifecycleState to
* LIFECYCLE_RUNNING, this can be relied on to not change unexpectedly.
*/
bool mDetectedNotRunning;
/**
* True when a stable state runner has been posted to the appshell to run
* RunInStableState at the next stable state.
*/
bool mPostedRunInStableState;
};
/**
* The singleton graph instance.
*/
static MediaStreamGraphImpl* gGraph;
StreamTime
MediaStreamGraphImpl::GetDesiredBufferEnd(MediaStream* aStream)
{
StreamTime current = mCurrentTime - aStream->mBufferStartTime;
StreamTime desiredEnd = current;
if (!aStream->mAudioOutputs.IsEmpty()) {
desiredEnd = NS_MAX(desiredEnd, current + MillisecondsToMediaTime(AUDIO_TARGET_MS));
}
if (!aStream->mVideoOutputs.IsEmpty()) {
desiredEnd = NS_MAX(desiredEnd, current + MillisecondsToMediaTime(VIDEO_TARGET_MS));
}
return desiredEnd;
}
bool
MediaStreamGraphImpl::IsAlwaysExplicitlyBlocked(MediaStream* aStream)
{
GraphTime t = mCurrentTime;
while (true) {
GraphTime end;
if (aStream->mExplicitBlockerCount.GetAt(t, &end) == 0)
return false;
if (end >= GRAPH_TIME_MAX)
return true;
t = end;
}
}
void
MediaStreamGraphImpl::FinishStream(MediaStream* aStream)
{
if (aStream->mFinished)
return;
LOG(PR_LOG_DEBUG, ("MediaStream %p will finish", aStream));
aStream->mFinished = true;
// Force at least one more iteration of the control loop, since we rely
// on UpdateCurrentTime to notify our listeners once the stream end
// has been reached.
EnsureNextIteration();
}
void
MediaStreamGraphImpl::NoteStreamAffected(MediaStream* aStream, double aTime)
{
NS_ASSERTION(aTime >= 0, "Bad time");
GraphTime t =
NS_MAX(GetEarliestActionTime(),
StreamTimeToGraphTime(aStream, SecondsToMediaTime(aTime),
INCLUDE_TRAILING_BLOCKED_INTERVAL));
aStream->mMessageAffectedTime = NS_MIN(aStream->mMessageAffectedTime, t);
}
void
MediaStreamGraphImpl::NoteStreamAffected(MediaStream* aStream)
{
GraphTime t = GetEarliestActionTime();
aStream->mMessageAffectedTime = NS_MIN(aStream->mMessageAffectedTime, t);
}
void
ControlMessage::UpdateAffectedStream()
{
NS_ASSERTION(mStream, "Must have stream for default UpdateAffectedStream");
mStream->GraphImpl()->NoteStreamAffected(mStream);
}
void
MediaStreamGraphImpl::AddStream(MediaStream* aStream)
{
aStream->mBufferStartTime = mCurrentTime;
aStream->mMessageAffectedTime = GetEarliestActionTime();
*mStreams.AppendElement() = already_AddRefed<MediaStream>(aStream);
LOG(PR_LOG_DEBUG, ("Adding media stream %p to the graph", aStream));
}
void
MediaStreamGraphImpl::RemoveStream(MediaStream* aStream)
{
// Remove references in mStreamUpdates before we allow aStream to die.
// Pending updates are not needed (since the main thread has already given
// up the stream) so we will just drop them.
{
MonitorAutoLock lock(mMonitor);
for (PRUint32 i = 0; i < mStreamUpdates.Length(); ++i) {
if (mStreamUpdates[i].mStream == aStream) {
mStreamUpdates[i].mStream = nsnull;
}
}
}
// This unrefs the stream, probably destroying it
mStreams.RemoveElement(aStream);
LOG(PR_LOG_DEBUG, ("Removing media stream %p from the graph", aStream));
}
void
MediaStreamGraphImpl::ChooseActionTime()
{
mLastActionTime = GetEarliestActionTime();
}
void
MediaStreamGraphImpl::UpdateConsumptionState(SourceMediaStream* aStream)
{
bool isConsumed = !aStream->mAudioOutputs.IsEmpty() ||
!aStream->mVideoOutputs.IsEmpty();
MediaStreamListener::Consumption state = isConsumed ? MediaStreamListener::CONSUMED
: MediaStreamListener::NOT_CONSUMED;
if (state != aStream->mLastConsumptionState) {
aStream->mLastConsumptionState = state;
for (PRUint32 j = 0; j < aStream->mListeners.Length(); ++j) {
MediaStreamListener* l = aStream->mListeners[j];
l->NotifyConsumptionChanged(this, state);
}
}
}
void
MediaStreamGraphImpl::ExtractPendingInput(SourceMediaStream* aStream)
{
bool finished;
{
MutexAutoLock lock(aStream->mMutex);
finished = aStream->mUpdateFinished;
for (PRInt32 i = aStream->mUpdateTracks.Length() - 1; i >= 0; --i) {
SourceMediaStream::TrackData* data = &aStream->mUpdateTracks[i];
for (PRUint32 j = 0; j < aStream->mListeners.Length(); ++j) {
MediaStreamListener* l = aStream->mListeners[j];
TrackTicks offset = (data->mCommands & SourceMediaStream::TRACK_CREATE)
? data->mStart : aStream->mBuffer.FindTrack(data->mID)->GetSegment()->GetDuration();
l->NotifyQueuedTrackChanges(this, data->mID, data->mRate,
offset, data->mCommands, *data->mData);
}
if (data->mCommands & SourceMediaStream::TRACK_CREATE) {
MediaSegment* segment = data->mData.forget();
LOG(PR_LOG_DEBUG, ("SourceMediaStream %p creating track %d, rate %d, start %lld, initial end %lld",
aStream, data->mID, data->mRate, PRInt64(data->mStart),
PRInt64(segment->GetDuration())));
aStream->mBuffer.AddTrack(data->mID, data->mRate, data->mStart, segment);
// The track has taken ownership of data->mData, so let's replace
// data->mData with an empty clone.
data->mData = segment->CreateEmptyClone();
data->mCommands &= ~SourceMediaStream::TRACK_CREATE;
} else if (data->mData->GetDuration() > 0) {
MediaSegment* dest = aStream->mBuffer.FindTrack(data->mID)->GetSegment();
LOG(PR_LOG_DEBUG, ("SourceMediaStream %p track %d, advancing end from %lld to %lld",
aStream, data->mID,
PRInt64(dest->GetDuration()),
PRInt64(dest->GetDuration() + data->mData->GetDuration())));
dest->AppendFrom(data->mData);
}
if (data->mCommands & SourceMediaStream::TRACK_END) {
aStream->mBuffer.FindTrack(data->mID)->SetEnded();
aStream->mUpdateTracks.RemoveElementAt(i);
}
}
aStream->mBuffer.AdvanceKnownTracksTime(aStream->mUpdateKnownTracksTime);
}
if (finished) {
FinishStream(aStream);
}
}
void
MediaStreamGraphImpl::UpdateBufferSufficiencyState(SourceMediaStream* aStream)
{
StreamTime desiredEnd = GetDesiredBufferEnd(aStream);
nsTArray<SourceMediaStream::ThreadAndRunnable> runnables;
{
MutexAutoLock lock(aStream->mMutex);
for (PRUint32 i = 0; i < aStream->mUpdateTracks.Length(); ++i) {
SourceMediaStream::TrackData* data = &aStream->mUpdateTracks[i];
if (data->mCommands & SourceMediaStream::TRACK_CREATE) {
// This track hasn't been created yet, so we have no sufficiency
// data. The track will be created in the next iteration of the
// control loop and then we'll fire insufficiency notifications
// if necessary.
continue;
}
if (data->mCommands & SourceMediaStream::TRACK_END) {
// This track will end, so no point in firing not-enough-data
// callbacks.
continue;
}
StreamBuffer::Track* track = aStream->mBuffer.FindTrack(data->mID);
// Note that track->IsEnded() must be false, otherwise we would have
// removed the track from mUpdateTracks already.
NS_ASSERTION(!track->IsEnded(), "What is this track doing here?");
data->mHaveEnough = track->GetEndTimeRoundDown() >= desiredEnd;
if (!data->mHaveEnough) {
runnables.MoveElementsFrom(data->mDispatchWhenNotEnough);
}
}
}
for (PRUint32 i = 0; i < runnables.Length(); ++i) {
runnables[i].mThread->Dispatch(runnables[i].mRunnable, 0);
}
}
StreamTime
MediaStreamGraphImpl::GraphTimeToStreamTime(MediaStream* aStream,
GraphTime aTime)
{
NS_ASSERTION(aTime <= mBlockingDecisionsMadeUntilTime,
"Don't ask about times where we haven't made blocking decisions yet");
if (aTime <= mCurrentTime) {
return NS_MAX<StreamTime>(0, aTime - aStream->mBufferStartTime);
}
GraphTime t = mCurrentTime;
StreamTime s = t - aStream->mBufferStartTime;
while (t < aTime) {
GraphTime end;
if (!aStream->mBlocked.GetAt(t, &end)) {
s += NS_MIN(aTime, end) - t;
}
t = end;
}
return NS_MAX<StreamTime>(0, s);
}
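// Worked example for GraphTimeToStreamTime: suppose mBufferStartTime is 2s,
// mCurrentTime is 10s, and aStream is blocked from 11s to 12s. Converting the
// graph time 13s walks forward from 10s counting only the unblocked intervals
// 10s-11s and 12s-13s, so the stream time is (10 - 2) + 1 + 1 = 10s.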
GraphTime
MediaStreamGraphImpl::StreamTimeToGraphTime(MediaStream* aStream,
StreamTime aTime, PRUint32 aFlags)
{
if (aTime >= STREAM_TIME_MAX) {
return GRAPH_TIME_MAX;
}
MediaTime bufferElapsedToCurrentTime = mCurrentTime - aStream->mBufferStartTime;
if (aTime < bufferElapsedToCurrentTime ||
(aTime == bufferElapsedToCurrentTime && !(aFlags & INCLUDE_TRAILING_BLOCKED_INTERVAL))) {
return aTime + aStream->mBufferStartTime;
}
MediaTime streamAmount = aTime - bufferElapsedToCurrentTime;
NS_ASSERTION(streamAmount >= 0, "Can't answer queries before current time");
GraphTime t = mCurrentTime;
while (t < GRAPH_TIME_MAX) {
bool blocked;
GraphTime end;
if (t < mBlockingDecisionsMadeUntilTime) {
blocked = aStream->mBlocked.GetAt(t, &end);
end = NS_MIN(end, mBlockingDecisionsMadeUntilTime);
} else {
blocked = false;
end = GRAPH_TIME_MAX;
}
if (blocked) {
t = end;
} else {
if (streamAmount == 0) {
// No more stream time to consume at time t, so we're done.
break;
}
MediaTime consume = NS_MIN(end - t, streamAmount);
streamAmount -= consume;
t += consume;
}
}
return t;
}
GraphTime
MediaStreamGraphImpl::GetAudioPosition(MediaStream* aStream)
{
if (!aStream->mAudioOutput) {
return mCurrentTime;
}
PRInt64 positionInFrames = aStream->mAudioOutput->GetPositionInFrames();
if (positionInFrames < 0) {
return mCurrentTime;
}
return aStream->mAudioPlaybackStartTime +
TicksToTimeRoundDown(aStream->mAudioOutput->GetRate(),
positionInFrames);
}
void
MediaStreamGraphImpl::UpdateCurrentTime()
{
GraphTime prevCurrentTime = mCurrentTime;
TimeStamp now = TimeStamp::Now();
GraphTime nextCurrentTime =
SecondsToMediaTime((now - mCurrentTimeStamp).ToSeconds()) + mCurrentTime;
if (mBlockingDecisionsMadeUntilTime < nextCurrentTime) {
LOG(PR_LOG_WARNING, ("Media graph global underrun detected"));
LOG(PR_LOG_DEBUG, ("Advancing mBlockingDecisionsMadeUntilTime from %f to %f",
MediaTimeToSeconds(mBlockingDecisionsMadeUntilTime),
MediaTimeToSeconds(nextCurrentTime)));
// Advance mBlockingDecisionsMadeUntilTime to nextCurrentTime by
// adding blocked time to all streams starting at mBlockingDecisionsMadeUntilTime
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
mStreams[i]->mBlocked.SetAtAndAfter(mBlockingDecisionsMadeUntilTime, true);
}
mBlockingDecisionsMadeUntilTime = nextCurrentTime;
}
mCurrentTimeStamp = now;
LOG(PR_LOG_DEBUG, ("Updating current time to %f (real %f, mBlockingDecisionsMadeUntilTime %f)",
MediaTimeToSeconds(nextCurrentTime),
(now - mInitialTimeStamp).ToSeconds(),
MediaTimeToSeconds(mBlockingDecisionsMadeUntilTime)));
if (prevCurrentTime >= nextCurrentTime) {
NS_ASSERTION(prevCurrentTime == nextCurrentTime, "Time can't go backwards!");
// This could happen due to low clock resolution, maybe?
LOG(PR_LOG_DEBUG, ("Time did not advance"));
// There's not much left to do here, but the code below that notifies
// listeners that streams have ended still needs to run.
}
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
// Calculate blocked time and fire Blocked/Unblocked events
GraphTime blockedTime = 0;
GraphTime t = prevCurrentTime;
// Save current blocked status
bool wasBlocked = stream->mBlocked.GetAt(prevCurrentTime);
while (t < nextCurrentTime) {
GraphTime end;
bool blocked = stream->mBlocked.GetAt(t, &end);
if (blocked) {
blockedTime += NS_MIN(end, nextCurrentTime) - t;
}
if (blocked != wasBlocked) {
for (PRUint32 j = 0; j < stream->mListeners.Length(); ++j) {
MediaStreamListener* l = stream->mListeners[j];
l->NotifyBlockingChanged(this,
blocked ? MediaStreamListener::BLOCKED : MediaStreamListener::UNBLOCKED);
}
wasBlocked = blocked;
}
t = end;
}
stream->AdvanceTimeVaryingValuesToCurrentTime(nextCurrentTime, blockedTime);
// Advance mBlocked last so that implementations of
// AdvanceTimeVaryingValuesToCurrentTime can rely on the value of mBlocked.
stream->mBlocked.AdvanceCurrentTime(nextCurrentTime);
if (blockedTime < nextCurrentTime - prevCurrentTime) {
for (PRUint32 j = 0; j < stream->mListeners.Length(); ++j) {
MediaStreamListener* l = stream->mListeners[j];
l->NotifyOutput(this);
}
}
if (stream->mFinished && !stream->mNotifiedFinished &&
stream->mBufferStartTime + stream->GetBufferEnd() <= nextCurrentTime) {
stream->mNotifiedFinished = true;
for (PRUint32 j = 0; j < stream->mListeners.Length(); ++j) {
MediaStreamListener* l = stream->mListeners[j];
l->NotifyFinished(this);
}
}
LOG(PR_LOG_DEBUG, ("MediaStream %p bufferStartTime=%f blockedTime=%f",
stream, MediaTimeToSeconds(stream->mBufferStartTime),
MediaTimeToSeconds(blockedTime)));
}
mCurrentTime = nextCurrentTime;
}
void
MediaStreamGraphImpl::MarkStreamBlocked(MediaStream* aStream,
GraphTime aTime, GraphTime* aEnd)
{
NS_ASSERTION(!aStream->mBlocked.GetAt(aTime), "MediaStream already blocked");
aStream->mBlocked.SetAtAndAfter(aTime, true);
}
bool
MediaStreamGraphImpl::WillUnderrun(MediaStream* aStream, GraphTime aTime,
GraphTime aEndBlockingDecisions, GraphTime* aEnd)
{
// Finished streams, or streams that aren't being played back, can't underrun.
if (aStream->mFinished ||
(aStream->mAudioOutputs.IsEmpty() && aStream->mVideoOutputs.IsEmpty())) {
return false;
}
GraphTime bufferEnd =
StreamTimeToGraphTime(aStream, aStream->GetBufferEnd(),
INCLUDE_TRAILING_BLOCKED_INTERVAL);
NS_ASSERTION(bufferEnd >= mCurrentTime, "Buffer underran");
// We should block after bufferEnd.
if (bufferEnd <= aTime) {
LOG(PR_LOG_DEBUG, ("MediaStream %p will block due to data underrun, "
"bufferEnd %f",
aStream, MediaTimeToSeconds(bufferEnd)));
return true;
}
// If we're currently blocked and we don't have data all the way through to
// aEndBlockingDecisions, we should keep blocking. But if we're not currently
// blocked, then even though we'll probably block before aEndBlockingDecisions,
// we might as well remain unblocked and play the data we've got while we can.
if (bufferEnd <= aEndBlockingDecisions && aStream->mBlocked.GetBefore(aTime)) {
LOG(PR_LOG_DEBUG, ("MediaStream %p will block due to speculative data underrun, "
"bufferEnd %f",
aStream, MediaTimeToSeconds(bufferEnd)));
return true;
}
// Reconsider decisions at bufferEnd
*aEnd = NS_MIN(*aEnd, bufferEnd);
return false;
}
void
MediaStreamGraphImpl::RecomputeBlocking()
{
PRInt32 writeAudioUpTo = AUDIO_TARGET_MS;
GraphTime endBlockingDecisions =
mCurrentTime + MillisecondsToMediaTime(writeAudioUpTo);
bool blockingDecisionsWillChange = false;
// mBlockingDecisionsMadeUntilTime has been set in UpdateCurrentTime
while (mBlockingDecisionsMadeUntilTime < endBlockingDecisions) {
LOG(PR_LOG_DEBUG, ("Media graph %p computing blocking for time %f",
this, MediaTimeToSeconds(mBlockingDecisionsMadeUntilTime)));
GraphTime end = GRAPH_TIME_MAX;
RecomputeBlockingAt(mBlockingDecisionsMadeUntilTime, endBlockingDecisions, &end);
LOG(PR_LOG_DEBUG, ("Media graph %p computed blocking for interval %f to %f",
this, MediaTimeToSeconds(mBlockingDecisionsMadeUntilTime),
MediaTimeToSeconds(end)));
mBlockingDecisionsMadeUntilTime = end;
if (end < GRAPH_TIME_MAX) {
blockingDecisionsWillChange = true;
}
}
mBlockingDecisionsMadeUntilTime = endBlockingDecisions;
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
GraphTime end;
stream->mBlocked.GetAt(mCurrentTime, &end);
if (end < GRAPH_TIME_MAX) {
blockingDecisionsWillChange = true;
}
}
if (blockingDecisionsWillChange) {
// Make sure we wake up to notify listeners about these changes.
EnsureNextIteration();
}
}
void
MediaStreamGraphImpl::RecomputeBlockingAt(GraphTime aTime,
GraphTime aEndBlockingDecisions,
GraphTime* aEnd)
{
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
stream->mBlocked.SetAtAndAfter(aTime, false);
}
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
// Stream might be blocked by some other stream (due to processing
// constraints)
if (stream->mBlocked.GetAt(aTime)) {
continue;
}
if (stream->mFinished) {
GraphTime endTime = StreamTimeToGraphTime(stream, stream->GetBufferEnd());
if (endTime <= aTime) {
LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to being finished", stream));
MarkStreamBlocked(stream, aTime, aEnd);
continue;
} else {
LOG(PR_LOG_DEBUG, ("MediaStream %p is finished, but not blocked yet (end at %f, with blocking at %f)",
stream, MediaTimeToSeconds(stream->GetBufferEnd()),
MediaTimeToSeconds(endTime)));
*aEnd = NS_MIN(*aEnd, endTime);
}
}
// We don't need to explicitly check for cycles; streams in a cycle will
// just never be able to produce data, and WillUnderrun will trigger.
GraphTime end;
bool explicitBlock = stream->mExplicitBlockerCount.GetAt(aTime, &end) > 0;
*aEnd = NS_MIN(*aEnd, end);
if (explicitBlock) {
LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to explicit blocker", stream));
MarkStreamBlocked(stream, aTime, aEnd);
continue;
}
bool underrun = WillUnderrun(stream, aTime, aEndBlockingDecisions, aEnd);
if (underrun) {
MarkStreamBlocked(stream, aTime, aEnd);
continue;
}
if (stream->mAudioOutputs.IsEmpty() && stream->mVideoOutputs.IsEmpty()) {
// See if the stream is being consumed anywhere. If not, it should block.
LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to having no consumers", stream));
MarkStreamBlocked(stream, aTime, aEnd);
continue;
}
}
NS_ASSERTION(*aEnd > aTime, "Failed to advance!");
}
void
MediaStreamGraphImpl::UpdateFirstActiveTracks(MediaStream* aStream)
{
StreamBuffer::Track* newTracksByType[MediaSegment::TYPE_COUNT];
for (PRUint32 i = 0; i < ArrayLength(newTracksByType); ++i) {
newTracksByType[i] = nsnull;
}
for (StreamBuffer::TrackIter iter(aStream->mBuffer);
!iter.IsEnded(); iter.Next()) {
MediaSegment::Type type = iter->GetType();
if ((newTracksByType[type] &&
iter->GetStartTimeRoundDown() < newTracksByType[type]->GetStartTimeRoundDown()) ||
aStream->mFirstActiveTracks[type] == TRACK_NONE) {
newTracksByType[type] = &(*iter);
aStream->mFirstActiveTracks[type] = iter->GetID();
}
}
}
void
MediaStreamGraphImpl::CreateOrDestroyAudioStream(GraphTime aAudioOutputStartTime,
MediaStream* aStream)
{
StreamBuffer::Track* track;
if (aStream->mAudioOutputs.IsEmpty() ||
!(track = aStream->mBuffer.FindTrack(aStream->mFirstActiveTracks[MediaSegment::AUDIO]))) {
if (aStream->mAudioOutput) {
aStream->mAudioOutput->Shutdown();
aStream->mAudioOutput = nsnull;
}
return;
}
if (aStream->mAudioOutput)
return;
// No output stream created yet. Check if it's time to create one.
GraphTime startTime =
StreamTimeToGraphTime(aStream, track->GetStartTimeRoundDown(),
INCLUDE_TRAILING_BLOCKED_INTERVAL);
if (startTime >= mBlockingDecisionsMadeUntilTime) {
// The stream wants to play audio, but nothing will play for the foreseeable
// future, so don't create the stream.
return;
}
// Don't bother destroying the nsAudioStream for ended tracks yet.
// XXX allocating a nsAudioStream could be slow so we're going to have to do
// something here ... preallocation, async allocation, multiplexing onto a single
// stream ...
AudioSegment* audio = track->Get<AudioSegment>();
aStream->mAudioPlaybackStartTime = aAudioOutputStartTime;
aStream->mAudioOutput = nsAudioStream::AllocateStream();
aStream->mAudioOutput->Init(audio->GetChannels(),
track->GetRate(),
audio->GetFirstFrameFormat());
}
void
MediaStreamGraphImpl::PlayAudio(MediaStream* aStream,
GraphTime aFrom, GraphTime aTo)
{
if (!aStream->mAudioOutput)
return;
StreamBuffer::Track* track =
aStream->mBuffer.FindTrack(aStream->mFirstActiveTracks[MediaSegment::AUDIO]);
AudioSegment* audio = track->Get<AudioSegment>();
// When we're playing multiple copies of this stream at the same time, they're
// perfectly correlated so adding volumes is the right thing to do.
float volume = 0.0f;
for (PRUint32 i = 0; i < aStream->mAudioOutputs.Length(); ++i) {
volume += aStream->mAudioOutputs[i].mVolume;
}
// We don't update aStream->mBufferStartTime here to account for
// time spent blocked. Instead, we'll update it in UpdateCurrentTime after the
// blocked period has completed. But we do need to make sure we play from the
// right offsets in the stream buffer, even if we've already written silence for
// some amount of blocked time after the current time.
GraphTime t = aFrom;
while (t < aTo) {
GraphTime end;
bool blocked = aStream->mBlocked.GetAt(t, &end);
end = NS_MIN(end, aTo);
AudioSegment output;
if (blocked) {
// Track total blocked time in aStream->mBlockedAudioTime so that
// the amount of silent samples we've inserted for blocking never gets
// more than one sample away from the ideal amount.
TrackTicks startTicks =
TimeToTicksRoundDown(track->GetRate(), aStream->mBlockedAudioTime);
aStream->mBlockedAudioTime += end - t;
TrackTicks endTicks =
TimeToTicksRoundDown(track->GetRate(), aStream->mBlockedAudioTime);
output.InitFrom(*audio);
output.InsertNullDataAtStart(endTicks - startTicks);
LOG(PR_LOG_DEBUG, ("MediaStream %p writing blocking-silence samples for %f to %f",
aStream, MediaTimeToSeconds(t), MediaTimeToSeconds(end)));
} else {
TrackTicks startTicks =
track->TimeToTicksRoundDown(GraphTimeToStreamTime(aStream, t));
TrackTicks endTicks =
track->TimeToTicksRoundDown(GraphTimeToStreamTime(aStream, end));
output.SliceFrom(*audio, startTicks, endTicks);
output.ApplyVolume(volume);
LOG(PR_LOG_DEBUG, ("MediaStream %p writing samples for %f to %f (samples %lld to %lld)",
aStream, MediaTimeToSeconds(t), MediaTimeToSeconds(end),
startTicks, endTicks));
}
output.WriteTo(aStream->mAudioOutput);
t = end;
}
}
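// Worked example for the blocked-silence accounting above: with a 48000Hz
// track, suppose each blocked chunk corresponds to 10.4 samples of silence.
// After the first chunk mBlockedAudioTime maps to 10.4 samples, so we insert
// floor(10.4) - 0 = 10 null samples; after the second it maps to 20.8, so we
// insert floor(20.8) - floor(10.4) = 10 more. The 20 samples written stay
// within one sample of the ideal 20.8.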
void
MediaStreamGraphImpl::PlayVideo(MediaStream* aStream)
{
if (aStream->mVideoOutputs.IsEmpty())
return;
StreamBuffer::Track* track =
aStream->mBuffer.FindTrack(aStream->mFirstActiveTracks[MediaSegment::VIDEO]);
if (!track)
return;
VideoSegment* video = track->Get<VideoSegment>();
// Display the next frame a bit early. This is better than letting the current
// frame be displayed for too long.
GraphTime framePosition = mCurrentTime +
MillisecondsToMediaTime(MEDIA_GRAPH_TARGET_PERIOD_MS);
NS_ASSERTION(framePosition >= aStream->mBufferStartTime, "frame position before buffer?");
StreamTime frameBufferTime = GraphTimeToStreamTime(aStream, framePosition);
TrackTicks start;
const VideoFrame* frame =
video->GetFrameAt(track->TimeToTicksRoundDown(frameBufferTime), &start);
if (!frame) {
frame = video->GetLastFrame(&start);
if (!frame)
return;
}
if (*frame != aStream->mLastPlayedVideoFrame) {
LOG(PR_LOG_DEBUG, ("MediaStream %p writing video frame %p (%dx%d)",
aStream, frame->GetImage(), frame->GetIntrinsicSize().width,
frame->GetIntrinsicSize().height));
GraphTime startTime = StreamTimeToGraphTime(aStream,
track->TicksToTimeRoundDown(start), INCLUDE_TRAILING_BLOCKED_INTERVAL);
TimeStamp targetTime = mCurrentTimeStamp +
TimeDuration::FromSeconds(MediaTimeToSeconds(startTime - mCurrentTime));
for (PRUint32 i = 0; i < aStream->mVideoOutputs.Length(); ++i) {
VideoFrameContainer* output = aStream->mVideoOutputs[i];
output->SetCurrentFrame(frame->GetIntrinsicSize(), frame->GetImage(),
targetTime);
nsCOMPtr<nsIRunnable> event =
NS_NewRunnableMethod(output, &VideoFrameContainer::Invalidate);
NS_DispatchToMainThread(event, NS_DISPATCH_NORMAL);
}
aStream->mLastPlayedVideoFrame = *frame;
}
}
void
MediaStreamGraphImpl::PrepareUpdatesToMainThreadState()
{
mMonitor.AssertCurrentThreadOwns();
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
StreamUpdate* update = mStreamUpdates.AppendElement();
update->mGraphUpdateIndex = stream->mGraphUpdateIndices.GetAt(mCurrentTime);
update->mStream = stream;
update->mNextMainThreadCurrentTime =
GraphTimeToStreamTime(stream, mCurrentTime);
update->mNextMainThreadFinished =
stream->mFinished &&
StreamTimeToGraphTime(stream, stream->GetBufferEnd()) <= mCurrentTime;
}
mUpdateRunnables.MoveElementsFrom(mPendingUpdateRunnables);
EnsureStableStateEventPosted();
}
void
MediaStreamGraphImpl::EnsureImmediateWakeUpLocked(MonitorAutoLock& aLock)
{
if (mWaitState == WAITSTATE_WAITING_FOR_NEXT_ITERATION ||
mWaitState == WAITSTATE_WAITING_INDEFINITELY) {
mWaitState = WAITSTATE_WAKING_UP;
aLock.Notify();
}
}
void
MediaStreamGraphImpl::EnsureNextIteration()
{
MonitorAutoLock lock(mMonitor);
EnsureNextIterationLocked(lock);
}
void
MediaStreamGraphImpl::EnsureNextIterationLocked(MonitorAutoLock& aLock)
{
if (mNeedAnotherIteration)
return;
mNeedAnotherIteration = true;
if (mWaitState == WAITSTATE_WAITING_INDEFINITELY) {
mWaitState = WAITSTATE_WAKING_UP;
aLock.Notify();
}
}
void
MediaStreamGraphImpl::RunThread()
{
nsTArray<MessageBlock> messageQueue;
{
MonitorAutoLock lock(mMonitor);
messageQueue.SwapElements(mMessageQueue);
}
NS_ASSERTION(!messageQueue.IsEmpty(),
"Shouldn't have started a graph with empty message queue!");
for (;;) {
// Update mCurrentTime to the min of the playing audio times, or using the
// wall-clock time change if no audio is playing.
UpdateCurrentTime();
// Calculate independent action times for each batch of messages (each
// batch corresponding to an event loop task). This isolates the performance
// of different scripts to some extent.
for (PRUint32 i = 0; i < messageQueue.Length(); ++i) {
mProcessingGraphUpdateIndex = messageQueue[i].mGraphUpdateIndex;
nsTArray<nsAutoPtr<ControlMessage> >& messages = messageQueue[i].mMessages;
for (PRUint32 j = 0; j < mStreams.Length(); ++j) {
mStreams[j]->mMessageAffectedTime = GRAPH_TIME_MAX;
}
for (PRUint32 j = 0; j < messages.Length(); ++j) {
messages[j]->UpdateAffectedStream();
}
ChooseActionTime();
for (PRUint32 j = 0; j < messages.Length(); ++j) {
messages[j]->Process();
}
}
messageQueue.Clear();
// Update source stream consumption state and pull in any pending input.
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
SourceMediaStream* is = mStreams[i]->AsSourceStream();
if (is) {
UpdateConsumptionState(is);
ExtractPendingInput(is);
}
}
GraphTime prevBlockingDecisionsMadeUntilTime = mBlockingDecisionsMadeUntilTime;
RecomputeBlocking();
PRUint32 audioStreamsActive = 0;
bool allBlockedForever = true;
// Figure out what each stream wants to do
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
UpdateFirstActiveTracks(stream);
CreateOrDestroyAudioStream(prevBlockingDecisionsMadeUntilTime, stream);
PlayAudio(stream, prevBlockingDecisionsMadeUntilTime,
mBlockingDecisionsMadeUntilTime);
if (stream->mAudioOutput) {
++audioStreamsActive;
}
PlayVideo(stream);
SourceMediaStream* is = stream->AsSourceStream();
if (is) {
UpdateBufferSufficiencyState(is);
}
GraphTime end;
if (!stream->mBlocked.GetAt(mCurrentTime, &end) || end < GRAPH_TIME_MAX) {
allBlockedForever = false;
}
}
if (!allBlockedForever || audioStreamsActive > 0) {
EnsureNextIteration();
}
{
MonitorAutoLock lock(mMonitor);
PrepareUpdatesToMainThreadState();
if (mForceShutDown || (IsEmpty() && mMessageQueue.IsEmpty())) {
// Enter shutdown mode. The stable-state handler will detect this
// and complete shutdown. Destroy any streams immediately.
LOG(PR_LOG_DEBUG, ("MediaStreamGraph %p waiting for main thread cleanup", this));
mLifecycleState = LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP;
{
MonitorAutoUnlock unlock(mMonitor);
// Unlock mMonitor while destroying our streams, since
// SourceMediaStream::DestroyImpl needs to take its lock while
// we're not holding mMonitor.
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
mStreams[i]->DestroyImpl();
}
}
return;
}
PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
TimeStamp now = TimeStamp::Now();
if (mNeedAnotherIteration) {
PRInt64 timeoutMS = MEDIA_GRAPH_TARGET_PERIOD_MS -
PRInt64((now - mCurrentTimeStamp).ToMilliseconds());
// Make sure timeoutMS doesn't overflow 32 bits by waking up at
// least once a minute, if we need to wake up at all
timeoutMS = NS_MAX<PRInt64>(0, NS_MIN<PRInt64>(timeoutMS, 60*1000));
timeout = PR_MillisecondsToInterval(PRUint32(timeoutMS));
LOG(PR_LOG_DEBUG, ("Waiting for next iteration; at %f, timeout=%f",
(now - mInitialTimeStamp).ToSeconds(), timeoutMS/1000.0));
mWaitState = WAITSTATE_WAITING_FOR_NEXT_ITERATION;
} else {
mWaitState = WAITSTATE_WAITING_INDEFINITELY;
}
if (timeout > 0) {
lock.Wait(timeout);
LOG(PR_LOG_DEBUG, ("Resuming after timeout; at %f, elapsed=%f",
(TimeStamp::Now() - mInitialTimeStamp).ToSeconds(),
(TimeStamp::Now() - now).ToSeconds()));
}
mWaitState = WAITSTATE_RUNNING;
mNeedAnotherIteration = false;
messageQueue.SwapElements(mMessageQueue);
}
}
}
void
MediaStreamGraphImpl::ApplyStreamUpdate(StreamUpdate* aUpdate)
{
mMonitor.AssertCurrentThreadOwns();
MediaStream* stream = aUpdate->mStream;
if (!stream)
return;
stream->mMainThreadCurrentTime = aUpdate->mNextMainThreadCurrentTime;
stream->mMainThreadFinished = aUpdate->mNextMainThreadFinished;
}
void
MediaStreamGraphImpl::ShutdownThreads()
{
NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread");
// The graph thread is not running, so it's OK to do whatever here
LOG(PR_LOG_DEBUG, ("Stopping threads for MediaStreamGraph %p", this));
if (mThread) {
mThread->Shutdown();
mThread = nsnull;
}
}
void
MediaStreamGraphImpl::ForceShutDown()
{
NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread");
LOG(PR_LOG_DEBUG, ("MediaStreamGraph %p ForceShutdown", this));
{
MonitorAutoLock lock(mMonitor);
mForceShutDown = true;
EnsureImmediateWakeUpLocked(lock);
}
}
namespace {
class MediaStreamGraphThreadRunnable : public nsRunnable {
public:
NS_IMETHOD Run()
{
gGraph->RunThread();
return NS_OK;
}
};
class MediaStreamGraphShutDownRunnable : public nsRunnable {
public:
MediaStreamGraphShutDownRunnable(MediaStreamGraphImpl* aGraph) : mGraph(aGraph) {}
NS_IMETHOD Run()
{
NS_ASSERTION(mGraph->mDetectedNotRunning,
"We should know the graph thread control loop isn't running!");
// mGraph's thread is not running so it's OK to do whatever here
if (mGraph->IsEmpty()) {
// mGraph is no longer needed, so delete it. If the graph is not empty
// then we must be in a forced shutdown and some later AppendMessage will
// detect that the manager has been emptied, and delete it.
delete mGraph;
} else {
NS_ASSERTION(mGraph->mForceShutDown, "Not in forced shutdown?");
mGraph->mLifecycleState =
MediaStreamGraphImpl::LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION;
}
return NS_OK;
}
private:
MediaStreamGraphImpl* mGraph;
};
class MediaStreamGraphStableStateRunnable : public nsRunnable {
public:
NS_IMETHOD Run()
{
if (gGraph) {
gGraph->RunInStableState();
}
return NS_OK;
}
};
/*
* Control messages forwarded from main thread to graph manager thread
*/
class CreateMessage : public ControlMessage {
public:
CreateMessage(MediaStream* aStream) : ControlMessage(aStream) {}
virtual void UpdateAffectedStream()
{
mStream->GraphImpl()->AddStream(mStream);
}
virtual void Process()
{
mStream->Init();
}
};
class MediaStreamGraphShutdownObserver : public nsIObserver
{
public:
NS_DECL_ISUPPORTS
NS_DECL_NSIOBSERVER
};
}
void
MediaStreamGraphImpl::RunInStableState()
{
NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread");
nsTArray<nsCOMPtr<nsIRunnable> > runnables;
{
MonitorAutoLock lock(mMonitor);
mPostedRunInStableStateEvent = false;
runnables.SwapElements(mUpdateRunnables);
for (PRUint32 i = 0; i < mStreamUpdates.Length(); ++i) {
StreamUpdate* update = &mStreamUpdates[i];
if (update->mStream) {
ApplyStreamUpdate(update);
}
}
mStreamUpdates.Clear();
if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP && mForceShutDown) {
for (PRUint32 i = 0; i < mMessageQueue.Length(); ++i) {
MessageBlock& mb = mMessageQueue[i];
for (PRUint32 j = 0; j < mb.mMessages.Length(); ++j) {
mb.mMessages[j]->ProcessDuringShutdown();
}
}
mMessageQueue.Clear();
for (PRUint32 i = 0; i < mCurrentTaskMessageQueue.Length(); ++i) {
mCurrentTaskMessageQueue[i]->ProcessDuringShutdown();
}
mCurrentTaskMessageQueue.Clear();
// Stop MediaStreamGraph threads. Do not clear gGraph since
// we have outstanding DOM objects that may need it.
mLifecycleState = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN;
nsCOMPtr<nsIRunnable> event = new MediaStreamGraphShutDownRunnable(this);
NS_DispatchToMainThread(event);
}
if (mLifecycleState == LIFECYCLE_THREAD_NOT_STARTED) {
mLifecycleState = LIFECYCLE_RUNNING;
// Start the thread now. We couldn't start it earlier because
// the graph might exit immediately on finding it has no streams. The
// first message for a new graph must create a stream.
nsCOMPtr<nsIRunnable> event = new MediaStreamGraphThreadRunnable();
NS_NewThread(getter_AddRefs(mThread), event);
}
if (mCurrentTaskMessageQueue.IsEmpty()) {
if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP && IsEmpty()) {
NS_ASSERTION(gGraph == this, "Not current graph??");
// Complete shutdown. First, ensure that this graph is no longer used.
// A new graph will be created if one is needed.
LOG(PR_LOG_DEBUG, ("Disconnecting MediaStreamGraph %p", gGraph));
gGraph = nsnull;
// Asynchronously clean up old graph. We don't want to do this
// synchronously because it spins the event loop waiting for threads
// to shut down, and we don't want to do that in a stable state handler.
mLifecycleState = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN;
nsCOMPtr<nsIRunnable> event = new MediaStreamGraphShutDownRunnable(this);
NS_DispatchToMainThread(event);
}
} else {
if (mLifecycleState <= LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) {
MessageBlock* block = mMessageQueue.AppendElement();
block->mMessages.SwapElements(mCurrentTaskMessageQueue);
block->mGraphUpdateIndex = mGraphUpdatesSent;
++mGraphUpdatesSent;
EnsureNextIterationLocked(lock);
}
if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) {
mLifecycleState = LIFECYCLE_RUNNING;
// Revive the MediaStreamGraph since we have more messages going to it.
// Note that we need to put messages into its queue before reviving it,
// or it might exit immediately.
nsCOMPtr<nsIRunnable> event = new MediaStreamGraphThreadRunnable();
mThread->Dispatch(event, 0);
}
}
mDetectedNotRunning = mLifecycleState > LIFECYCLE_RUNNING;
}
// Make sure we get a new current time in the next event loop task
mPostedRunInStableState = false;
for (PRUint32 i = 0; i < runnables.Length(); ++i) {
runnables[i]->Run();
}
}
static NS_DEFINE_CID(kAppShellCID, NS_APPSHELL_CID);
void
MediaStreamGraphImpl::EnsureRunInStableState()
{
NS_ASSERTION(NS_IsMainThread(), "main thread only");
if (mPostedRunInStableState)
return;
mPostedRunInStableState = true;
nsCOMPtr<nsIRunnable> event = new MediaStreamGraphStableStateRunnable();
nsCOMPtr<nsIAppShell> appShell = do_GetService(kAppShellCID);
if (appShell) {
appShell->RunInStableState(event);
} else {
NS_ERROR("Appshell already destroyed?");
}
}
void
MediaStreamGraphImpl::EnsureStableStateEventPosted()
{
mMonitor.AssertCurrentThreadOwns();
if (mPostedRunInStableStateEvent)
return;
mPostedRunInStableStateEvent = true;
nsCOMPtr<nsIRunnable> event = new MediaStreamGraphStableStateRunnable();
NS_DispatchToMainThread(event);
}
void
MediaStreamGraphImpl::AppendMessage(ControlMessage* aMessage)
{
NS_ASSERTION(NS_IsMainThread(), "main thread only");
if (mDetectedNotRunning &&
mLifecycleState > LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) {
// The graph control loop is not running and main thread cleanup has
// happened. From now on we can't append messages to mCurrentTaskMessageQueue,
// because that will never be processed again, so just ProcessDuringShutdown
// this message.
// This should only happen during forced shutdown.
aMessage->ProcessDuringShutdown();
delete aMessage;
if (IsEmpty()) {
NS_ASSERTION(gGraph == this, "Switched managers during forced shutdown?");
gGraph = nsnull;
delete this;
}
return;
}
mCurrentTaskMessageQueue.AppendElement(aMessage);
EnsureRunInStableState();
}
void
MediaStream::Init()
{
MediaStreamGraphImpl* graph = GraphImpl();
mBlocked.SetAtAndAfter(graph->mCurrentTime, true);
mExplicitBlockerCount.SetAtAndAfter(graph->mCurrentTime, true);
mExplicitBlockerCount.SetAtAndAfter(graph->mLastActionTime, false);
}
MediaStreamGraphImpl*
MediaStream::GraphImpl()
{
return gGraph;
}
void
MediaStream::DestroyImpl()
{
if (mAudioOutput) {
mAudioOutput->Shutdown();
mAudioOutput = nsnull;
}
}
void
MediaStream::Destroy()
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream) : ControlMessage(aStream) {}
virtual void UpdateAffectedStream()
{
mStream->DestroyImpl();
mStream->GraphImpl()->RemoveStream(mStream);
}
virtual void ProcessDuringShutdown()
{ UpdateAffectedStream(); }
};
mWrapper = nsnull;
GraphImpl()->AppendMessage(new Message(this));
}
void
MediaStream::AddAudioOutput(void* aKey)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, void* aKey) : ControlMessage(aStream), mKey(aKey) {}
virtual void UpdateAffectedStream()
{
mStream->AddAudioOutputImpl(mKey);
}
void* mKey;
};
GraphImpl()->AppendMessage(new Message(this, aKey));
}
void
MediaStream::SetAudioOutputVolumeImpl(void* aKey, float aVolume)
{
for (PRUint32 i = 0; i < mAudioOutputs.Length(); ++i) {
if (mAudioOutputs[i].mKey == aKey) {
mAudioOutputs[i].mVolume = aVolume;
return;
}
}
NS_ERROR("Audio output key not found");
}
void
MediaStream::SetAudioOutputVolume(void* aKey, float aVolume)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, void* aKey, float aVolume) :
ControlMessage(aStream), mKey(aKey), mVolume(aVolume) {}
virtual void UpdateAffectedStream()
{
mStream->SetAudioOutputVolumeImpl(mKey, mVolume);
}
void* mKey;
float mVolume;
};
GraphImpl()->AppendMessage(new Message(this, aKey, aVolume));
}
void
MediaStream::RemoveAudioOutputImpl(void* aKey)
{
for (PRUint32 i = 0; i < mAudioOutputs.Length(); ++i) {
if (mAudioOutputs[i].mKey == aKey) {
mAudioOutputs.RemoveElementAt(i);
return;
}
}
NS_ERROR("Audio output key not found");
}
void
MediaStream::RemoveAudioOutput(void* aKey)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, void* aKey) :
ControlMessage(aStream), mKey(aKey) {}
virtual void UpdateAffectedStream()
{
mStream->RemoveAudioOutputImpl(mKey);
}
void* mKey;
};
GraphImpl()->AppendMessage(new Message(this, aKey));
}
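// Example (sketch): a consumer plays a stream's audio by registering an audio
// output keyed by an arbitrary pointer (here a hypothetical "this" of the
// consumer object), and can adjust its volume independently of other outputs:
//
//   stream->AddAudioOutput(this);
//   stream->SetAudioOutputVolume(this, 0.5f);
//   ...
//   stream->RemoveAudioOutput(this);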
void
MediaStream::AddVideoOutput(VideoFrameContainer* aContainer)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, VideoFrameContainer* aContainer) :
ControlMessage(aStream), mContainer(aContainer) {}
virtual void UpdateAffectedStream()
{
mStream->AddVideoOutputImpl(mContainer.forget());
}
nsRefPtr<VideoFrameContainer> mContainer;
};
GraphImpl()->AppendMessage(new Message(this, aContainer));
}
void
MediaStream::RemoveVideoOutput(VideoFrameContainer* aContainer)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, VideoFrameContainer* aContainer) :
ControlMessage(aStream), mContainer(aContainer) {}
virtual void UpdateAffectedStream()
{
mStream->RemoveVideoOutputImpl(mContainer);
}
nsRefPtr<VideoFrameContainer> mContainer;
};
GraphImpl()->AppendMessage(new Message(this, aContainer));
}
void
MediaStream::ChangeExplicitBlockerCount(PRInt32 aDelta)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, PRInt32 aDelta) :
ControlMessage(aStream), mDelta(aDelta) {}
virtual void UpdateAffectedStream()
{
mStream->ChangeExplicitBlockerCountImpl(
mStream->GraphImpl()->mLastActionTime, mDelta);
}
PRInt32 mDelta;
};
GraphImpl()->AppendMessage(new Message(this, aDelta));
}
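// Example (sketch): a main-thread caller can pause and resume a stream by
// adjusting its explicit blocker count; the stream is blocked whenever the
// count is greater than zero:
//
//   stream->ChangeExplicitBlockerCount(1);   // block (pause) the stream
//   ...
//   stream->ChangeExplicitBlockerCount(-1);  // unblock (resume) the stream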
void
MediaStream::AddListenerImpl(already_AddRefed<MediaStreamListener> aListener)
{
MediaStreamListener* listener = *mListeners.AppendElement() = aListener;
listener->NotifyBlockingChanged(GraphImpl(),
mBlocked.GetAt(GraphImpl()->mCurrentTime) ? MediaStreamListener::BLOCKED : MediaStreamListener::UNBLOCKED);
if (mNotifiedFinished) {
listener->NotifyFinished(GraphImpl());
}
}
void
MediaStream::AddListener(MediaStreamListener* aListener)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, MediaStreamListener* aListener) :
ControlMessage(aStream), mListener(aListener) {}
virtual void UpdateAffectedStream()
{
mStream->AddListenerImpl(mListener.forget());
}
nsRefPtr<MediaStreamListener> mListener;
};
GraphImpl()->AppendMessage(new Message(this, aListener));
}
void
MediaStream::RemoveListener(MediaStreamListener* aListener)
{
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, MediaStreamListener* aListener) :
ControlMessage(aStream), mListener(aListener) {}
virtual void UpdateAffectedStream()
{
mStream->RemoveListenerImpl(mListener);
}
nsRefPtr<MediaStreamListener> mListener;
};
GraphImpl()->AppendMessage(new Message(this, aListener));
}
void
SourceMediaStream::DestroyImpl()
{
{
MutexAutoLock lock(mMutex);
mDestroyed = true;
}
MediaStream::DestroyImpl();
}
void
SourceMediaStream::AddTrack(TrackID aID, TrackRate aRate, TrackTicks aStart,
MediaSegment* aSegment)
{
MutexAutoLock lock(mMutex);
TrackData* data = mUpdateTracks.AppendElement();
data->mID = aID;
data->mRate = aRate;
data->mStart = aStart;
data->mCommands = TRACK_CREATE;
data->mData = aSegment;
data->mHaveEnough = false;
if (!mDestroyed) {
GraphImpl()->EnsureNextIteration();
}
}
void
SourceMediaStream::AppendToTrack(TrackID aID, MediaSegment* aSegment)
{
MutexAutoLock lock(mMutex);
TrackData *track = FindDataForTrack(aID);
if (track) {
track->mData->AppendFrom(aSegment);
} else {
NS_ERROR("Append to non-existent track!");
}
if (!mDestroyed) {
GraphImpl()->EnsureNextIteration();
}
}
bool
SourceMediaStream::HaveEnoughBuffered(TrackID aID)
{
MutexAutoLock lock(mMutex);
TrackData *track = FindDataForTrack(aID);
if (track) {
return track->mHaveEnough;
}
NS_ERROR("No track in HaveEnoughBuffered!");
return true;
}
void
SourceMediaStream::DispatchWhenNotEnoughBuffered(TrackID aID,
nsIThread* aSignalThread, nsIRunnable* aSignalRunnable)
{
MutexAutoLock lock(mMutex);
TrackData* data = FindDataForTrack(aID);
if (!data) {
NS_ERROR("No track in DispatchWhenNotEnoughBuffered");
return;
}
if (data->mHaveEnough) {
data->mDispatchWhenNotEnough.AppendElement()->Init(aSignalThread, aSignalRunnable);
} else {
aSignalThread->Dispatch(aSignalRunnable, 0);
}
}
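// Example (sketch): a producer can use HaveEnoughBuffered and
// DispatchWhenNotEnoughBuffered as backpressure, producing more data only when
// the graph wants it. "MyProducer", "ProduceMoreData", "mTrackID" and
// "mThread" are hypothetical names:
//
//   if (!stream->HaveEnoughBuffered(mTrackID)) {
//     ProduceMoreData();
//   } else {
//     nsCOMPtr<nsIRunnable> r =
//       NS_NewRunnableMethod(this, &MyProducer::ProduceMoreData);
//     stream->DispatchWhenNotEnoughBuffered(mTrackID, mThread, r);
//   }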
void
SourceMediaStream::EndTrack(TrackID aID)
{
MutexAutoLock lock(mMutex);
TrackData *track = FindDataForTrack(aID);
if (track) {
track->mCommands |= TRACK_END;
} else {
NS_ERROR("End of non-existant track");
}
if (!mDestroyed) {
GraphImpl()->EnsureNextIteration();
}
}
void
SourceMediaStream::AdvanceKnownTracksTime(StreamTime aKnownTime)
{
MutexAutoLock lock(mMutex);
mUpdateKnownTracksTime = aKnownTime;
if (!mDestroyed) {
GraphImpl()->EnsureNextIteration();
}
}
void
SourceMediaStream::Finish()
{
MutexAutoLock lock(mMutex);
mUpdateFinished = true;
if (!mDestroyed) {
GraphImpl()->EnsureNextIteration();
}
}
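// Example (sketch): the typical lifecycle of a SourceMediaStream producer.
// "aWrapper", "kAudioTrack" and the segment setup are hypothetical/elided:
//
//   SourceMediaStream* stream =
//     MediaStreamGraph::GetInstance()->CreateInputStream(aWrapper);
//   stream->AddTrack(kAudioTrack, 44100, 0, new AudioSegment());
//   stream->AdvanceKnownTracksTime(STREAM_TIME_MAX); // no more tracks coming
//   // Repeatedly, from the producer's thread:
//   stream->AppendToTrack(kAudioTrack, &segmentWithNewData);
//   // When the input ends:
//   stream->EndTrack(kAudioTrack);
//   stream->Finish();
//   // When the stream is no longer needed at all (main thread):
//   stream->Destroy();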
static const PRUint32 kThreadLimit = 4;
static const PRUint32 kIdleThreadLimit = 4;
static const PRUint32 kIdleThreadTimeoutMs = 2000;
/**
* We make the initial mCurrentTime nonzero so that zero times can have
* special meaning if necessary.
*/
static const PRInt32 INITIAL_CURRENT_TIME = 1;
MediaStreamGraphImpl::MediaStreamGraphImpl()
: mLastActionTime(INITIAL_CURRENT_TIME)
, mCurrentTime(INITIAL_CURRENT_TIME)
, mBlockingDecisionsMadeUntilTime(INITIAL_CURRENT_TIME)
, mProcessingGraphUpdateIndex(0)
, mMonitor("MediaStreamGraphImpl")
, mLifecycleState(LIFECYCLE_THREAD_NOT_STARTED)
, mWaitState(WAITSTATE_RUNNING)
, mNeedAnotherIteration(false)
, mForceShutDown(false)
, mPostedRunInStableStateEvent(false)
, mDetectedNotRunning(false)
, mPostedRunInStableState(false)
{
#ifdef PR_LOGGING
if (!gMediaStreamGraphLog) {
gMediaStreamGraphLog = PR_NewLogModule("MediaStreamGraph");
}
#endif
mCurrentTimeStamp = mInitialTimeStamp = TimeStamp::Now();
}
NS_IMPL_ISUPPORTS1(MediaStreamGraphShutdownObserver, nsIObserver)
static bool gShutdownObserverRegistered = false;
NS_IMETHODIMP
MediaStreamGraphShutdownObserver::Observe(nsISupports *aSubject,
const char *aTopic,
const PRUnichar *aData)
{
if (strcmp(aTopic, NS_XPCOM_SHUTDOWN_OBSERVER_ID) == 0) {
if (gGraph) {
gGraph->ForceShutDown();
}
nsContentUtils::UnregisterShutdownObserver(this);
gShutdownObserverRegistered = false;
}
return NS_OK;
}
MediaStreamGraph*
MediaStreamGraph::GetInstance()
{
NS_ASSERTION(NS_IsMainThread(), "Main thread only");
if (!gGraph) {
if (!gShutdownObserverRegistered) {
gShutdownObserverRegistered = true;
nsContentUtils::RegisterShutdownObserver(new MediaStreamGraphShutdownObserver());
}
gGraph = new MediaStreamGraphImpl();
LOG(PR_LOG_DEBUG, ("Starting up MediaStreamGraph %p", gGraph));
}
return gGraph;
}
SourceMediaStream*
MediaStreamGraph::CreateInputStream(nsDOMMediaStream* aWrapper)
{
SourceMediaStream* stream = new SourceMediaStream(aWrapper);
NS_ADDREF(stream);
static_cast<MediaStreamGraphImpl*>(this)->AppendMessage(new CreateMessage(stream));
return stream;
}
}