/*-*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "MediaStreamGraph.h" #include "mozilla/Monitor.h" #include "mozilla/TimeStamp.h" #include "AudioSegment.h" #include "VideoSegment.h" #include "nsContentUtils.h" #include "nsIAppShell.h" #include "nsIObserver.h" #include "nsServiceManagerUtils.h" #include "nsWidgetsCID.h" #include "nsXPCOMCIDInternal.h" #include "prlog.h" #include "VideoUtils.h" using namespace mozilla::layers; namespace mozilla { namespace { #ifdef PR_LOGGING PRLogModuleInfo* gMediaStreamGraphLog; #define LOG(type, msg) PR_LOG(gMediaStreamGraphLog, type, msg) #else #define LOG(type, msg) #endif /** * Assume we can run an iteration of the MediaStreamGraph loop in this much time * or less. * We try to run the control loop at this rate. */ const int MEDIA_GRAPH_TARGET_PERIOD_MS = 10; /** * Assume that we might miss our scheduled wakeup of the MediaStreamGraph by * this much. */ const int SCHEDULE_SAFETY_MARGIN_MS = 10; /** * Try have this much audio buffered in streams and queued to the hardware. * The maximum delay to the end of the next control loop * is 2*MEDIA_GRAPH_TARGET_PERIOD_MS + SCHEDULE_SAFETY_MARGIN_MS. * There is no point in buffering more audio than this in a stream at any * given time (until we add processing). * This is not optimal yet. */ const int AUDIO_TARGET_MS = 2*MEDIA_GRAPH_TARGET_PERIOD_MS + SCHEDULE_SAFETY_MARGIN_MS; /** * Try have this much video buffered. Video frames are set * near the end of the iteration of the control loop. The maximum delay * to the setting of the next video frame is 2*MEDIA_GRAPH_TARGET_PERIOD_MS + * SCHEDULE_SAFETY_MARGIN_MS. This is not optimal yet. */ const int VIDEO_TARGET_MS = 2*MEDIA_GRAPH_TARGET_PERIOD_MS + SCHEDULE_SAFETY_MARGIN_MS; /** * A per-stream update message passed from the media graph thread to the * main thread. */ struct StreamUpdate { PRInt64 mGraphUpdateIndex; nsRefPtr mStream; StreamTime mNextMainThreadCurrentTime; bool mNextMainThreadFinished; }; /** * This represents a message passed from the main thread to the graph thread. * A ControlMessage always references a particular affected stream. */ class ControlMessage { public: ControlMessage(MediaStream* aStream) : mStream(aStream) { MOZ_COUNT_CTOR(ControlMessage); } // All these run on the graph thread virtual ~ControlMessage() { MOZ_COUNT_DTOR(ControlMessage); } // Executed before we know what the action time for this message will be. // Call NoteStreamAffected on the stream whose output will be // modified by this message. Default implementation calls // NoteStreamAffected(mStream). virtual void UpdateAffectedStream(); // Executed after we know what the action time for this message will be. virtual void Process() {} // When we're shutting down the application, most messages are ignored but // some cleanup messages should still be processed (on the main thread). virtual void ProcessDuringShutdown() {} protected: // We do not hold a reference to mStream. The main thread will be holding // a reference to the stream while this message is in flight. The last message // referencing a stream is the Destroy message for that stream. MediaStream* mStream; }; } /** * The implementation of a media stream graph. This class is private to this * file. It's not in the anonymous namespace because MediaStream needs to * be able to friend it. * * Currently we only have one per process. */ class MediaStreamGraphImpl : public MediaStreamGraph { public: MediaStreamGraphImpl(); ~MediaStreamGraphImpl() { NS_ASSERTION(IsEmpty(), "All streams should have been destroyed by messages from the main thread"); LOG(PR_LOG_DEBUG, ("MediaStreamGraph %p destroyed", this)); } // Main thread only. /** * This runs every time we need to sync state from the media graph thread * to the main thread while the main thread is not in the middle * of a script. It runs during a "stable state" (per HTML5) or during * an event posted to the main thread. */ void RunInStableState(); /** * Ensure a runnable to run RunInStableState is posted to the appshell to * run at the next stable state (per HTML5). * See EnsureStableStateEventPosted. */ void EnsureRunInStableState(); /** * Called to apply a StreamUpdate to its stream. */ void ApplyStreamUpdate(StreamUpdate* aUpdate); /** * Append a ControlMessage to the message queue. This queue is drained * during RunInStableState; the messages will run on the graph thread. */ void AppendMessage(ControlMessage* aMessage); /** * Make this MediaStreamGraph enter forced-shutdown state. This state * will be noticed by the media graph thread, which will shut down all streams * and other state controlled by the media graph thread. * This is called during application shutdown. */ void ForceShutDown(); /** * Shutdown() this MediaStreamGraph's threads and return when they've shut down. */ void ShutdownThreads(); // The following methods run on the graph thread (or possibly the main thread if // mLifecycleState > LIFECYCLE_RUNNING) /** * Runs main control loop on the graph thread. Normally a single invocation * of this runs for the entire lifetime of the graph thread. */ void RunThread(); /** * Call this to indicate that another iteration of the control loop is * required on its regular schedule. The monitor must not be held. */ void EnsureNextIteration(); /** * As above, but with the monitor already held. */ void EnsureNextIterationLocked(MonitorAutoLock& aLock); /** * Call this to indicate that another iteration of the control loop is * required immediately. The monitor must already be held. */ void EnsureImmediateWakeUpLocked(MonitorAutoLock& aLock); /** * Ensure there is an event posted to the main thread to run RunInStableState. * mMonitor must be held. * See EnsureRunInStableState */ void EnsureStableStateEventPosted(); /** * Generate messages to the main thread to update it for all state changes. * mMonitor must be held. */ void PrepareUpdatesToMainThreadState(); // The following methods are the various stages of RunThread processing. /** * Compute a new current time for the graph and advance all on-graph-thread * state to the new current time. */ void UpdateCurrentTime(); /** * Update mLastActionTime to the time at which the current set of messages * will take effect. */ void ChooseActionTime(); /** * Extract any state updates pending in aStream, and apply them. */ void ExtractPendingInput(SourceMediaStream* aStream); /** * Update "have enough data" flags in aStream. */ void UpdateBufferSufficiencyState(SourceMediaStream* aStream); /** * Compute the blocking states of streams from mBlockingDecisionsMadeUntilTime * until the desired future time (determined by heuristic). * Updates mBlockingDecisionsMadeUntilTime and sets MediaStream::mBlocked * for all streams. */ void RecomputeBlocking(); // The following methods are used to help RecomputeBlocking. /** * Mark a stream blocked at time aTime. If this results in decisions that need * to be revisited at some point in the future, *aEnd will be reduced to the * first time in the future to recompute those decisions. */ void MarkStreamBlocked(MediaStream* aStream, GraphTime aTime, GraphTime* aEnd); /** * Recompute blocking for all streams for the interval starting at aTime. * If this results in decisions that need to be revisited at some point * in the future, *aEnd will be reduced to the first time in the future to * recompute those decisions. */ void RecomputeBlockingAt(GraphTime aTime, GraphTime aEndBlockingDecisions, GraphTime* aEnd); /** * Returns true if aStream will underrun at aTime for its own playback. * aEndBlockingDecisions is when we plan to stop making blocking decisions. * *aEnd will be reduced to the first time in the future to recompute these * decisions. */ bool WillUnderrun(MediaStream* aStream, GraphTime aTime, GraphTime aEndBlockingDecisions, GraphTime* aEnd); /** * Return true if there is an explicit blocker set from the current time * indefinitely far into the future. */ bool IsAlwaysExplicitlyBlocked(MediaStream* aStream); /** * Given a graph time aTime, convert it to a stream time taking into * account the time during which aStream is scheduled to be blocked. */ StreamTime GraphTimeToStreamTime(MediaStream* aStream, StreamTime aTime); enum { INCLUDE_TRAILING_BLOCKED_INTERVAL = 0x01 }; /** * Given a stream time aTime, convert it to a graph time taking into * account the time during which aStream is scheduled to be blocked. * aTime must be <= mBlockingDecisionsMadeUntilTime since blocking decisions * are only known up to that point. * If aTime is exactly at the start of a blocked interval, then the blocked * interval is included in the time returned if and only if * aFlags includes INCLUDE_TRAILING_BLOCKED_INTERVAL. */ GraphTime StreamTimeToGraphTime(MediaStream* aStream, StreamTime aTime, PRUint32 aFlags = 0); /** * Get the current audio position of the stream's audio output. */ GraphTime GetAudioPosition(MediaStream* aStream); /** * If aStream needs an audio stream but doesn't have one, create it. * If aStream doesn't need an audio stream but has one, destroy it. */ void CreateOrDestroyAudioStream(GraphTime aAudioOutputStartTime, MediaStream* aStream); /** * Update aStream->mFirstActiveTracks. */ void UpdateFirstActiveTracks(MediaStream* aStream); /** * Queue audio (mix of stream audio and silence for blocked intervals) * to the audio output stream. */ void PlayAudio(MediaStream* aStream, GraphTime aFrom, GraphTime aTo); /** * Set the correct current video frame for stream aStream. */ void PlayVideo(MediaStream* aStream); /** * No more data will be forthcoming for aStream. The stream will end * at the current buffer end point. The StreamBuffer's tracks must be * explicitly set to finished by the caller. */ void FinishStream(MediaStream* aStream); /** * Compute how much stream data we would like to buffer for aStream. */ StreamTime GetDesiredBufferEnd(MediaStream* aStream); /** * Returns true when there are no active streams. */ bool IsEmpty() { return mStreams.IsEmpty(); } // For use by control messages /** * Identify which graph update index we are currently processing. */ PRInt64 GetProcessingGraphUpdateIndex() { return mProcessingGraphUpdateIndex; } /** * Marks aStream as affected by a change in its output at desired time aTime * (in the timeline of aStream). The change may not actually happen at this time, * it may be delayed until later if there is buffered data we can't change. */ void NoteStreamAffected(MediaStream* aStream, double aTime); /** * Marks aStream as affected by a change in its output at the earliest * possible time. */ void NoteStreamAffected(MediaStream* aStream); /** * Add aStream to the graph and initializes its graph-specific state. */ void AddStream(MediaStream* aStream); /** * Remove aStream from the graph. Ensures that pending messages about the * stream back to the main thread are flushed. */ void RemoveStream(MediaStream* aStream); /** * Compute the earliest time at which an action be allowed to occur on any * stream. Actions cannot be earlier than the previous action time, and * cannot affect already-committed blocking decisions (and associated * buffered audio). */ GraphTime GetEarliestActionTime() { return NS_MAX(mCurrentTime, NS_MAX(mLastActionTime, mBlockingDecisionsMadeUntilTime)); } // Data members /** * Media graph thread. * Readonly after initialization on the main thread. */ nsCOMPtr mThread; // The following state is managed on the graph thread only, unless // mLifecycleState > LIFECYCLE_RUNNING in which case the graph thread // is not running and this state can be used from the main thread. nsTArray > mStreams; /** * The time the last action was deemed to have occurred. This could be * later than mCurrentTime if actions have to be delayed during data * buffering, or before mCurrentTime if mCurrentTime has advanced since * the last action happened. In ControlMessage::Process calls, * mLastActionTime has always been updated to be >= mCurrentTime. */ GraphTime mLastActionTime; /** * The current graph time for the current iteration of the RunThread control * loop. */ GraphTime mCurrentTime; /** * Blocking decisions have been made up to this time. We also buffer audio * up to this time. */ GraphTime mBlockingDecisionsMadeUntilTime; /** * This is only used for logging. */ TimeStamp mInitialTimeStamp; /** * The real timestamp of the latest run of UpdateCurrentTime. */ TimeStamp mCurrentTimeStamp; /** * Which update batch we are currently processing. */ PRInt64 mProcessingGraphUpdateIndex; // mMonitor guards the data below. // MediaStreamGraph normally does its work without holding mMonitor, so it is // not safe to just grab mMonitor from some thread and start monkeying with // the graph. Instead, communicate with the graph thread using provided // mechanisms such as the ControlMessage queue. Monitor mMonitor; // Data guarded by mMonitor (must always be accessed with mMonitor held, // regardless of the value of mLifecycleState. /** * State to copy to main thread */ nsTArray mStreamUpdates; /** * Runnables to run after the next update to main thread state. */ nsTArray > mUpdateRunnables; struct MessageBlock { PRInt64 mGraphUpdateIndex; nsTArray > mMessages; }; /** * A list of batches of messages to process. Each batch is processed * as an atomic unit. */ nsTArray mMessageQueue; /** * This enum specifies where this graph is in its lifecycle. This is used * to control shutdown. * Shutdown is tricky because it can happen in two different ways: * 1) Shutdown due to inactivity. RunThread() detects that it has no * pending messages and no streams, and exits. The next RunInStableState() * checks if there are new pending messages from the main thread (true only * if new stream creation raced with shutdown); if there are, it revives * RunThread(), otherwise it commits to shutting down the graph. New stream * creation after this point will create a new graph. An async event is * dispatched to Shutdown() the graph's threads and then delete the graph * object. * 2) Forced shutdown at application shutdown. A flag is set, RunThread() * detects the flag and exits, the next RunInStableState() detects the flag, * and dispatches the async event to Shutdown() the graph's threads. However * the graph object is not deleted. New messages for the graph are processed * synchronously on the main thread if necessary. When the last stream is * destroyed, the graph object is deleted. */ enum LifecycleState { // The graph thread hasn't started yet. LIFECYCLE_THREAD_NOT_STARTED, // RunThread() is running normally. LIFECYCLE_RUNNING, // In the following states, the graph thread is not running so // all "graph thread only" state in this class can be used safely // on the main thread. // RunThread() has exited and we're waiting for the next // RunInStableState(), at which point we can clean up the main-thread // side of the graph. LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP, // RunInStableState() posted a ShutdownRunnable, and we're waiting for it // to shut down the graph thread(s). LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN, // Graph threads have shut down but we're waiting for remaining streams // to be destroyed. Only happens during application shutdown since normally // we'd only shut down a graph when it has no streams. LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION }; LifecycleState mLifecycleState; /** * This enum specifies the wait state of the graph thread. */ enum WaitState { // RunThread() is running normally WAITSTATE_RUNNING, // RunThread() is paused waiting for its next iteration, which will // happen soon WAITSTATE_WAITING_FOR_NEXT_ITERATION, // RunThread() is paused indefinitely waiting for something to change WAITSTATE_WAITING_INDEFINITELY, // Something has signaled RunThread() to wake up immediately, // but it hasn't done so yet WAITSTATE_WAKING_UP }; WaitState mWaitState; /** * True when another iteration of the control loop is required. */ bool mNeedAnotherIteration; /** * True when we need to do a forced shutdown during application shutdown. */ bool mForceShutDown; /** * True when we have posted an event to the main thread to run * RunInStableState() and the event hasn't run yet. */ bool mPostedRunInStableStateEvent; // Main thread only /** * Messages posted by the current event loop task. These are forwarded to * the media graph thread during RunInStableState. We can't forward them * immediately because we want all messages between stable states to be * processed as an atomic batch. */ nsTArray > mCurrentTaskMessageQueue; /** * True when RunInStableState has determined that mLifecycleState is > * LIFECYCLE_RUNNING. Since only the main thread can reset mLifecycleState to * LIFECYCLE_RUNNING, this can be relied on to not change unexpectedly. */ bool mDetectedNotRunning; /** * True when a stable state runner has been posted to the appshell to run * RunInStableState at the next stable state. */ bool mPostedRunInStableState; }; /** * The singleton graph instance. */ static MediaStreamGraphImpl* gGraph; StreamTime MediaStreamGraphImpl::GetDesiredBufferEnd(MediaStream* aStream) { StreamTime current = mCurrentTime - aStream->mBufferStartTime; StreamTime desiredEnd = current; if (!aStream->mAudioOutputs.IsEmpty()) { desiredEnd = NS_MAX(desiredEnd, current + MillisecondsToMediaTime(AUDIO_TARGET_MS)); } if (!aStream->mVideoOutputs.IsEmpty()) { desiredEnd = NS_MAX(desiredEnd, current + MillisecondsToMediaTime(VIDEO_TARGET_MS)); } return desiredEnd; } bool MediaStreamGraphImpl::IsAlwaysExplicitlyBlocked(MediaStream* aStream) { GraphTime t = mCurrentTime; while (true) { GraphTime end; if (aStream->mExplicitBlockerCount.GetAt(t, &end) == 0) return false; if (end >= GRAPH_TIME_MAX) return true; t = end; } } void MediaStreamGraphImpl::FinishStream(MediaStream* aStream) { if (aStream->mFinished) return; LOG(PR_LOG_DEBUG, ("MediaStream %p will finish", aStream)); aStream->mFinished = true; // Force at least one more iteration of the control loop, since we rely // on UpdateCurrentTime to notify our listeners once the stream end // has been reached. EnsureNextIteration(); } void MediaStreamGraphImpl::NoteStreamAffected(MediaStream* aStream, double aTime) { NS_ASSERTION(aTime >= 0, "Bad time"); GraphTime t = NS_MAX(GetEarliestActionTime(), StreamTimeToGraphTime(aStream, SecondsToMediaTime(aTime), INCLUDE_TRAILING_BLOCKED_INTERVAL)); aStream->mMessageAffectedTime = NS_MIN(aStream->mMessageAffectedTime, t); } void MediaStreamGraphImpl::NoteStreamAffected(MediaStream* aStream) { GraphTime t = GetEarliestActionTime(); aStream->mMessageAffectedTime = NS_MIN(aStream->mMessageAffectedTime, t); } void ControlMessage::UpdateAffectedStream() { NS_ASSERTION(mStream, "Must have stream for default UpdateAffectedStream"); mStream->GraphImpl()->NoteStreamAffected(mStream); } void MediaStreamGraphImpl::AddStream(MediaStream* aStream) { aStream->mBufferStartTime = mCurrentTime; aStream->mMessageAffectedTime = GetEarliestActionTime(); *mStreams.AppendElement() = already_AddRefed(aStream); LOG(PR_LOG_DEBUG, ("Adding media stream %p to the graph", aStream)); } void MediaStreamGraphImpl::RemoveStream(MediaStream* aStream) { // Remove references in mStreamUpdates before we allow aStream to die. // Pending updates are not needed (since the main thread has already given // up the stream) so we will just drop them. { MonitorAutoLock lock(mMonitor); for (PRUint32 i = 0; i < mStreamUpdates.Length(); ++i) { if (mStreamUpdates[i].mStream == aStream) { mStreamUpdates[i].mStream = nsnull; } } } // This unrefs the stream, probably destroying it mStreams.RemoveElement(aStream); LOG(PR_LOG_DEBUG, ("Removing media stream %p from the graph", aStream)); } void MediaStreamGraphImpl::ChooseActionTime() { mLastActionTime = GetEarliestActionTime(); } void MediaStreamGraphImpl::ExtractPendingInput(SourceMediaStream* aStream) { bool finished; { MutexAutoLock lock(aStream->mMutex); finished = aStream->mUpdateFinished; for (PRInt32 i = aStream->mUpdateTracks.Length() - 1; i >= 0; --i) { SourceMediaStream::TrackData* data = &aStream->mUpdateTracks[i]; for (PRUint32 j = 0; j < aStream->mListeners.Length(); ++j) { MediaStreamListener* l = aStream->mListeners[j]; TrackTicks offset = (data->mCommands & SourceMediaStream::TRACK_CREATE) ? data->mStart : aStream->mBuffer.FindTrack(data->mID)->GetSegment()->GetDuration(); l->NotifyQueuedTrackChanges(this, data->mID, data->mRate, offset, data->mCommands, *data->mData); } if (data->mCommands & SourceMediaStream::TRACK_CREATE) { MediaSegment* segment = data->mData.forget(); LOG(PR_LOG_DEBUG, ("SourceMediaStream %p creating track %d, rate %d, start %lld, initial end %lld", aStream, data->mID, data->mRate, PRInt64(data->mStart), PRInt64(segment->GetDuration()))); aStream->mBuffer.AddTrack(data->mID, data->mRate, data->mStart, segment); // The track has taken ownership of data->mData, so let's replace // data->mData with an empty clone. data->mData = segment->CreateEmptyClone(); data->mCommands &= ~SourceMediaStream::TRACK_CREATE; } else if (data->mData->GetDuration() > 0) { MediaSegment* dest = aStream->mBuffer.FindTrack(data->mID)->GetSegment(); LOG(PR_LOG_DEBUG, ("SourceMediaStream %p track %d, advancing end from %lld to %lld", aStream, data->mID, PRInt64(dest->GetDuration()), PRInt64(dest->GetDuration() + data->mData->GetDuration()))); dest->AppendFrom(data->mData); } if (data->mCommands & SourceMediaStream::TRACK_END) { aStream->mBuffer.FindTrack(data->mID)->SetEnded(); aStream->mUpdateTracks.RemoveElementAt(i); } } aStream->mBuffer.AdvanceKnownTracksTime(aStream->mUpdateKnownTracksTime); } if (finished) { FinishStream(aStream); } } void MediaStreamGraphImpl::UpdateBufferSufficiencyState(SourceMediaStream* aStream) { StreamTime desiredEnd = GetDesiredBufferEnd(aStream); nsTArray runnables; { MutexAutoLock lock(aStream->mMutex); for (PRUint32 i = 0; i < aStream->mUpdateTracks.Length(); ++i) { SourceMediaStream::TrackData* data = &aStream->mUpdateTracks[i]; if (data->mCommands & SourceMediaStream::TRACK_CREATE) { continue; } StreamBuffer::Track* track = aStream->mBuffer.FindTrack(data->mID); data->mHaveEnough = track->GetEndTimeRoundDown() >= desiredEnd; if (!data->mHaveEnough) { runnables.MoveElementsFrom(data->mDispatchWhenNotEnough); } } } for (PRUint32 i = 0; i < runnables.Length(); ++i) { runnables[i].mThread->Dispatch(runnables[i].mRunnable, 0); } } StreamTime MediaStreamGraphImpl::GraphTimeToStreamTime(MediaStream* aStream, GraphTime aTime) { NS_ASSERTION(aTime <= mBlockingDecisionsMadeUntilTime, "Don't ask about times where we haven't made blocking decisions yet"); if (aTime <= mCurrentTime) { return NS_MAX(0, aTime - aStream->mBufferStartTime); } GraphTime t = mCurrentTime; StreamTime s = t - aStream->mBufferStartTime; while (t < aTime) { GraphTime end; if (!aStream->mBlocked.GetAt(t, &end)) { s += NS_MIN(aTime, end) - t; } t = end; } return NS_MAX(0, s); } GraphTime MediaStreamGraphImpl::StreamTimeToGraphTime(MediaStream* aStream, StreamTime aTime, PRUint32 aFlags) { if (aTime >= STREAM_TIME_MAX) { return GRAPH_TIME_MAX; } MediaTime bufferElapsedToCurrentTime = mCurrentTime - aStream->mBufferStartTime; if (aTime < bufferElapsedToCurrentTime || (aTime == bufferElapsedToCurrentTime && !(aFlags & INCLUDE_TRAILING_BLOCKED_INTERVAL))) { return aTime + aStream->mBufferStartTime; } MediaTime streamAmount = aTime - bufferElapsedToCurrentTime; NS_ASSERTION(streamAmount >= 0, "Can't answer queries before current time"); GraphTime t = mCurrentTime; while (t < GRAPH_TIME_MAX) { bool blocked; GraphTime end; if (t < mBlockingDecisionsMadeUntilTime) { blocked = aStream->mBlocked.GetAt(t, &end); end = NS_MIN(end, mBlockingDecisionsMadeUntilTime); } else { blocked = false; end = GRAPH_TIME_MAX; } if (blocked) { t = end; } else { if (streamAmount == 0) { // No more stream time to consume at time t, so we're done. break; } MediaTime consume = NS_MIN(end - t, streamAmount); streamAmount -= consume; t += consume; } } return t; } GraphTime MediaStreamGraphImpl::GetAudioPosition(MediaStream* aStream) { if (!aStream->mAudioOutput) { return mCurrentTime; } return aStream->mAudioPlaybackStartTime + TicksToTimeRoundDown(aStream->mAudioOutput->GetRate(), aStream->mAudioOutput->GetPositionInFrames()); } void MediaStreamGraphImpl::UpdateCurrentTime() { GraphTime prevCurrentTime = mCurrentTime; TimeStamp now = TimeStamp::Now(); // The earliest buffer end time for streams that haven't finished. We can't // advance the current time past this point. GraphTime minBufferEndTime = GRAPH_TIME_MAX; for (PRUint32 i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; GraphTime blockedBufferEndTime = StreamTimeToGraphTime(stream, stream->GetBufferEnd(), INCLUDE_TRAILING_BLOCKED_INTERVAL); if (stream->mAudioOutput && (!stream->mFinished || mBlockingDecisionsMadeUntilTime <= blockedBufferEndTime)) { // XXX We should take audio positions into account when determining how // far to advance the current time. Basically the current time should // track the average or minimum of the audio positions. We don't do this // currently since the audio positions aren't accurate enough. This // logging code is helpful to track the accuracy of audio positions. GraphTime audioPosition = GetAudioPosition(stream); LOG(PR_LOG_DEBUG, ("Audio position for stream %p is %f", stream, MediaTimeToSeconds(audioPosition))); } if (!stream->mFinished) { minBufferEndTime = NS_MIN(minBufferEndTime, blockedBufferEndTime); } } NS_ASSERTION(mCurrentTime <= minBufferEndTime, "We shouldn't have already advanced beyond buffer end!"); GraphTime nextCurrentTime = SecondsToMediaTime((now - mCurrentTimeStamp).ToSeconds()) + mCurrentTime; if (minBufferEndTime < nextCurrentTime) { LOG(PR_LOG_WARNING, ("Reducing current time to minimum buffer end")); nextCurrentTime = minBufferEndTime; } mCurrentTimeStamp = now; mBlockingDecisionsMadeUntilTime = NS_MAX(nextCurrentTime, mBlockingDecisionsMadeUntilTime); LOG(PR_LOG_DEBUG, ("Updating current time to %f (minBufferEndTime %f, real %f, mBlockingDecisionsMadeUntilTime %f)", MediaTimeToSeconds(nextCurrentTime), MediaTimeToSeconds(minBufferEndTime), (now - mInitialTimeStamp).ToSeconds(), MediaTimeToSeconds(mBlockingDecisionsMadeUntilTime))); if (prevCurrentTime >= nextCurrentTime) { NS_ASSERTION(prevCurrentTime == nextCurrentTime, "Time can't go backwards!"); // This could happen due to low clock resolution, maybe? LOG(PR_LOG_DEBUG, ("Time did not advance")); return; } for (PRUint32 i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; // Calculate blocked time and fire Blocked/Unblocked events GraphTime blockedTime = 0; GraphTime t = prevCurrentTime; // Save current blocked status bool wasBlocked = stream->mBlocked.GetAt(prevCurrentTime); while (t < nextCurrentTime) { GraphTime end; bool blocked = stream->mBlocked.GetAt(t, &end); if (blocked) { blockedTime += NS_MIN(end, nextCurrentTime) - t; } if (blocked != wasBlocked) { for (PRUint32 j = 0; j < stream->mListeners.Length(); ++j) { MediaStreamListener* l = stream->mListeners[j]; l->NotifyBlockingChanged(this, blocked ? MediaStreamListener::BLOCKED : MediaStreamListener::UNBLOCKED); } wasBlocked = blocked; } t = end; } stream->AdvanceTimeVaryingValuesToCurrentTime(nextCurrentTime, blockedTime); // Advance mBlocked last so that implementations of // AdvanceTimeVaryingValuesToCurrentTime can rely on the value of mBlocked. stream->mBlocked.AdvanceCurrentTime(nextCurrentTime); if (blockedTime < nextCurrentTime - mCurrentTime) { for (PRUint32 i = 0; i < stream->mListeners.Length(); ++i) { MediaStreamListener* l = stream->mListeners[i]; l->NotifyOutput(this); } } if (stream->mFinished && !stream->mNotifiedFinished && stream->mBufferStartTime + stream->GetBufferEnd() <= nextCurrentTime) { stream->mNotifiedFinished = true; for (PRUint32 j = 0; j < stream->mListeners.Length(); ++j) { MediaStreamListener* l = stream->mListeners[j]; l->NotifyFinished(this); } } LOG(PR_LOG_DEBUG, ("MediaStream %p bufferStartTime=%f blockedTime=%f", stream, MediaTimeToSeconds(stream->mBufferStartTime), MediaTimeToSeconds(blockedTime))); } mCurrentTime = nextCurrentTime; } void MediaStreamGraphImpl::MarkStreamBlocked(MediaStream* aStream, GraphTime aTime, GraphTime* aEnd) { NS_ASSERTION(!aStream->mBlocked.GetAt(aTime), "MediaStream already blocked"); aStream->mBlocked.SetAtAndAfter(aTime, true); } bool MediaStreamGraphImpl::WillUnderrun(MediaStream* aStream, GraphTime aTime, GraphTime aEndBlockingDecisions, GraphTime* aEnd) { // Finished streams, or streams that aren't being played back, can't underrun. if (aStream->mFinished || (aStream->mAudioOutputs.IsEmpty() && aStream->mVideoOutputs.IsEmpty())) { return false; } GraphTime bufferEnd = StreamTimeToGraphTime(aStream, aStream->GetBufferEnd(), INCLUDE_TRAILING_BLOCKED_INTERVAL); NS_ASSERTION(bufferEnd >= mCurrentTime, "Buffer underran"); // We should block after bufferEnd. if (bufferEnd <= aTime) { LOG(PR_LOG_DEBUG, ("MediaStream %p will block due to data underrun, " "bufferEnd %f", aStream, MediaTimeToSeconds(bufferEnd))); return true; } // We should keep blocking if we're currently blocked and we don't have // data all the way through to aEndBlockingDecisions. If we don't have // data all the way through to aEndBlockingDecisions, we'll block soon, // but we might as well remain unblocked and play the data we've got while // we can. if (bufferEnd <= aEndBlockingDecisions && aStream->mBlocked.GetBefore(aTime)) { LOG(PR_LOG_DEBUG, ("MediaStream %p will block due to speculative data underrun, " "bufferEnd %f", aStream, MediaTimeToSeconds(bufferEnd))); return true; } // Reconsider decisions at bufferEnd *aEnd = NS_MIN(*aEnd, bufferEnd); return false; } void MediaStreamGraphImpl::RecomputeBlocking() { PRInt32 writeAudioUpTo = AUDIO_TARGET_MS; GraphTime endBlockingDecisions = mCurrentTime + MillisecondsToMediaTime(writeAudioUpTo); bool blockingDecisionsWillChange = false; // mBlockingDecisionsMadeUntilTime has been set in UpdateCurrentTime while (mBlockingDecisionsMadeUntilTime < endBlockingDecisions) { LOG(PR_LOG_DEBUG, ("Media graph %p computing blocking for time %f", this, MediaTimeToSeconds(mBlockingDecisionsMadeUntilTime))); GraphTime end = GRAPH_TIME_MAX; RecomputeBlockingAt(mBlockingDecisionsMadeUntilTime, endBlockingDecisions, &end); LOG(PR_LOG_DEBUG, ("Media graph %p computed blocking for interval %f to %f", this, MediaTimeToSeconds(mBlockingDecisionsMadeUntilTime), MediaTimeToSeconds(end))); mBlockingDecisionsMadeUntilTime = end; if (end < GRAPH_TIME_MAX) { blockingDecisionsWillChange = true; } } mBlockingDecisionsMadeUntilTime = endBlockingDecisions; for (PRUint32 i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; GraphTime end; stream->mBlocked.GetAt(mCurrentTime, &end); if (end < GRAPH_TIME_MAX) { blockingDecisionsWillChange = true; } } if (blockingDecisionsWillChange) { // Make sure we wake up to notify listeners about these changes. EnsureNextIteration(); } } void MediaStreamGraphImpl::RecomputeBlockingAt(GraphTime aTime, GraphTime aEndBlockingDecisions, GraphTime* aEnd) { for (PRUint32 i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; stream->mBlocked.SetAtAndAfter(aTime, false); } for (PRUint32 i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; // Stream might be blocked by some other stream (due to processing // constraints) if (stream->mBlocked.GetAt(aTime)) { continue; } if (stream->mFinished) { GraphTime endTime = StreamTimeToGraphTime(stream, stream->GetBufferEnd()); if (endTime <= aTime) { LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to being finished", stream)); MarkStreamBlocked(stream, aTime, aEnd); continue; } else { LOG(PR_LOG_DEBUG, ("MediaStream %p is finished, but not blocked yet (end at %f, with blocking at %f)", stream, MediaTimeToSeconds(stream->GetBufferEnd()), MediaTimeToSeconds(endTime))); *aEnd = NS_MIN(*aEnd, endTime); } } // We don't need to explicitly check for cycles; streams in a cycle will // just never be able to produce data, and WillUnderrun will trigger. GraphTime end; bool explicitBlock = stream->mExplicitBlockerCount.GetAt(aTime, &end) > 0; *aEnd = NS_MIN(*aEnd, end); if (explicitBlock) { LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to explicit blocker", stream)); MarkStreamBlocked(stream, aTime, aEnd); continue; } bool underrun = WillUnderrun(stream, aTime, aEndBlockingDecisions, aEnd); if (underrun) { MarkStreamBlocked(stream, aTime, aEnd); continue; } if (stream->mAudioOutputs.IsEmpty() && stream->mVideoOutputs.IsEmpty()) { // See if the stream is being consumed anywhere. If not, it should block. LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to having no consumers", stream)); MarkStreamBlocked(stream, aTime, aEnd); continue; } } NS_ASSERTION(*aEnd > aTime, "Failed to advance!"); } void MediaStreamGraphImpl::UpdateFirstActiveTracks(MediaStream* aStream) { StreamBuffer::Track* newTracksByType[MediaSegment::TYPE_COUNT]; for (PRUint32 i = 0; i < ArrayLength(newTracksByType); ++i) { newTracksByType[i] = nsnull; } for (StreamBuffer::TrackIter iter(aStream->mBuffer); !iter.IsEnded(); iter.Next()) { MediaSegment::Type type = iter->GetType(); if ((newTracksByType[type] && iter->GetStartTimeRoundDown() < newTracksByType[type]->GetStartTimeRoundDown()) || aStream->mFirstActiveTracks[type] == TRACK_NONE) { newTracksByType[type] = &(*iter); aStream->mFirstActiveTracks[type] = iter->GetID(); } } } void MediaStreamGraphImpl::CreateOrDestroyAudioStream(GraphTime aAudioOutputStartTime, MediaStream* aStream) { StreamBuffer::Track* track; if (aStream->mAudioOutputs.IsEmpty() || !(track = aStream->mBuffer.FindTrack(aStream->mFirstActiveTracks[MediaSegment::AUDIO]))) { if (aStream->mAudioOutput) { aStream->mAudioOutput->Shutdown(); aStream->mAudioOutput = nsnull; } return; } if (aStream->mAudioOutput) return; // No output stream created yet. Check if it's time to create one. GraphTime startTime = StreamTimeToGraphTime(aStream, track->GetStartTimeRoundDown(), INCLUDE_TRAILING_BLOCKED_INTERVAL); if (startTime >= mBlockingDecisionsMadeUntilTime) { // The stream wants to play audio, but nothing will play for the forseeable // future, so don't create the stream. return; } // Don't bother destroying the nsAudioStream for ended tracks yet. // XXX allocating a nsAudioStream could be slow so we're going to have to do // something here ... preallocation, async allocation, multiplexing onto a single // stream ... AudioSegment* audio = track->Get(); aStream->mAudioPlaybackStartTime = aAudioOutputStartTime; aStream->mAudioOutput = nsAudioStream::AllocateStream(); aStream->mAudioOutput->Init(audio->GetChannels(), track->GetRate(), audio->GetFirstFrameFormat()); } void MediaStreamGraphImpl::PlayAudio(MediaStream* aStream, GraphTime aFrom, GraphTime aTo) { if (!aStream->mAudioOutput) return; StreamBuffer::Track* track = aStream->mBuffer.FindTrack(aStream->mFirstActiveTracks[MediaSegment::AUDIO]); AudioSegment* audio = track->Get(); // When we're playing multiple copies of this stream at the same time, they're // perfectly correlated so adding volumes is the right thing to do. float volume = 0.0f; for (PRUint32 i = 0; i < aStream->mAudioOutputs.Length(); ++i) { volume += aStream->mAudioOutputs[i].mVolume; } // We don't update aStream->mBufferStartTime here to account for // time spent blocked. Instead, we'll update it in UpdateCurrentTime after the // blocked period has completed. But we do need to make sure we play from the // right offsets in the stream buffer, even if we've already written silence for // some amount of blocked time after the current time. GraphTime t = aFrom; while (t < aTo) { GraphTime end; bool blocked = aStream->mBlocked.GetAt(t, &end); end = NS_MIN(end, aTo); AudioSegment output; if (blocked) { // Track total blocked time in aStream->mBlockedAudioTime so that // the amount of silent samples we've inserted for blocking never gets // more than one sample away from the ideal amount. TrackTicks startTicks = TimeToTicksRoundDown(track->GetRate(), aStream->mBlockedAudioTime); aStream->mBlockedAudioTime += end - t; TrackTicks endTicks = TimeToTicksRoundDown(track->GetRate(), aStream->mBlockedAudioTime); output.InitFrom(*audio); output.InsertNullDataAtStart(endTicks - startTicks); LOG(PR_LOG_DEBUG, ("MediaStream %p writing blocking-silence samples for %f to %f", aStream, MediaTimeToSeconds(t), MediaTimeToSeconds(end))); } else { TrackTicks startTicks = track->TimeToTicksRoundDown(GraphTimeToStreamTime(aStream, t)); TrackTicks endTicks = track->TimeToTicksRoundDown(GraphTimeToStreamTime(aStream, end)); output.SliceFrom(*audio, startTicks, endTicks); output.ApplyVolume(volume); LOG(PR_LOG_DEBUG, ("MediaStream %p writing samples for %f to %f (samples %lld to %lld)", aStream, MediaTimeToSeconds(t), MediaTimeToSeconds(end), startTicks, endTicks)); } output.WriteTo(aStream->mAudioOutput); t = end; } } void MediaStreamGraphImpl::PlayVideo(MediaStream* aStream) { if (aStream->mVideoOutputs.IsEmpty()) return; StreamBuffer::Track* track = aStream->mBuffer.FindTrack(aStream->mFirstActiveTracks[MediaSegment::VIDEO]); if (!track) return; VideoSegment* video = track->Get(); // Display the next frame a bit early. This is better than letting the current // frame be displayed for too long. GraphTime framePosition = mCurrentTime + MEDIA_GRAPH_TARGET_PERIOD_MS; NS_ASSERTION(framePosition >= aStream->mBufferStartTime, "frame position before buffer?"); StreamTime frameBufferTime = GraphTimeToStreamTime(aStream, framePosition); TrackTicks start; const VideoFrame* frame = video->GetFrameAt(track->TimeToTicksRoundDown(frameBufferTime), &start); if (!frame) { frame = video->GetLastFrame(&start); if (!frame) return; } if (*frame != aStream->mLastPlayedVideoFrame) { LOG(PR_LOG_DEBUG, ("MediaStream %p writing video frame %p (%dx%d)", aStream, frame->GetImage(), frame->GetIntrinsicSize().width, frame->GetIntrinsicSize().height)); GraphTime startTime = StreamTimeToGraphTime(aStream, track->TicksToTimeRoundDown(start), INCLUDE_TRAILING_BLOCKED_INTERVAL); TimeStamp targetTime = mCurrentTimeStamp + TimeDuration::FromMilliseconds(double(startTime - mCurrentTime)); for (PRUint32 i = 0; i < aStream->mVideoOutputs.Length(); ++i) { VideoFrameContainer* output = aStream->mVideoOutputs[i]; output->SetCurrentFrame(frame->GetIntrinsicSize(), frame->GetImage(), targetTime); nsCOMPtr event = NS_NewRunnableMethod(output, &VideoFrameContainer::Invalidate); NS_DispatchToMainThread(event, NS_DISPATCH_NORMAL); } aStream->mLastPlayedVideoFrame = *frame; } } void MediaStreamGraphImpl::PrepareUpdatesToMainThreadState() { mMonitor.AssertCurrentThreadOwns(); for (PRUint32 i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; StreamUpdate* update = mStreamUpdates.AppendElement(); update->mGraphUpdateIndex = stream->mGraphUpdateIndices.GetAt(mCurrentTime); update->mStream = stream; update->mNextMainThreadCurrentTime = GraphTimeToStreamTime(stream, mCurrentTime); update->mNextMainThreadFinished = stream->mFinished && StreamTimeToGraphTime(stream, stream->GetBufferEnd()) <= mCurrentTime; } mUpdateRunnables.MoveElementsFrom(mPendingUpdateRunnables); EnsureStableStateEventPosted(); } void MediaStreamGraphImpl::EnsureImmediateWakeUpLocked(MonitorAutoLock& aLock) { if (mWaitState == WAITSTATE_WAITING_FOR_NEXT_ITERATION || mWaitState == WAITSTATE_WAITING_INDEFINITELY) { mWaitState = WAITSTATE_WAKING_UP; aLock.Notify(); } } void MediaStreamGraphImpl::EnsureNextIteration() { MonitorAutoLock lock(mMonitor); EnsureNextIterationLocked(lock); } void MediaStreamGraphImpl::EnsureNextIterationLocked(MonitorAutoLock& aLock) { if (mNeedAnotherIteration) return; mNeedAnotherIteration = true; if (mWaitState == WAITSTATE_WAITING_INDEFINITELY) { mWaitState = WAITSTATE_WAKING_UP; aLock.Notify(); } } void MediaStreamGraphImpl::RunThread() { nsTArray messageQueue; { MonitorAutoLock lock(mMonitor); messageQueue.SwapElements(mMessageQueue); } NS_ASSERTION(!messageQueue.IsEmpty(), "Shouldn't have started a graph with empty message queue!"); for (;;) { // Update mCurrentTime to the min of the playing audio times, or using the // wall-clock time change if no audio is playing. UpdateCurrentTime(); // Calculate independent action times for each batch of messages (each // batch corresponding to an event loop task). This isolates the performance // of different scripts to some extent. for (PRUint32 i = 0; i < messageQueue.Length(); ++i) { mProcessingGraphUpdateIndex = messageQueue[i].mGraphUpdateIndex; nsTArray >& messages = messageQueue[i].mMessages; for (PRUint32 j = 0; j < mStreams.Length(); ++j) { mStreams[j]->mMessageAffectedTime = GRAPH_TIME_MAX; } for (PRUint32 j = 0; j < messages.Length(); ++j) { messages[j]->UpdateAffectedStream(); } ChooseActionTime(); for (PRUint32 j = 0; j < messages.Length(); ++j) { messages[j]->Process(); } } messageQueue.Clear(); // Grab pending ProcessingEngine results. for (PRUint32 i = 0; i < mStreams.Length(); ++i) { SourceMediaStream* is = mStreams[i]->AsSourceStream(); if (is) { ExtractPendingInput(is); } } GraphTime prevBlockingDecisionsMadeUntilTime = mBlockingDecisionsMadeUntilTime; RecomputeBlocking(); PRUint32 audioStreamsActive = 0; bool allBlockedForever = true; // Figure out what each stream wants to do for (PRUint32 i = 0; i < mStreams.Length(); ++i) { MediaStream* stream = mStreams[i]; UpdateFirstActiveTracks(stream); CreateOrDestroyAudioStream(prevBlockingDecisionsMadeUntilTime, stream); PlayAudio(stream, prevBlockingDecisionsMadeUntilTime, mBlockingDecisionsMadeUntilTime); if (stream->mAudioOutput) { ++audioStreamsActive; } PlayVideo(stream); SourceMediaStream* is = stream->AsSourceStream(); if (is) { UpdateBufferSufficiencyState(is); } GraphTime end; if (!stream->mBlocked.GetAt(mCurrentTime, &end) || end < GRAPH_TIME_MAX) { allBlockedForever = false; } } if (!allBlockedForever || audioStreamsActive > 0) { EnsureNextIteration(); } { MonitorAutoLock lock(mMonitor); PrepareUpdatesToMainThreadState(); if (mForceShutDown || (IsEmpty() && mMessageQueue.IsEmpty())) { // Enter shutdown mode. The stable-state handler will detect this // and complete shutdown. Destroy any streams immediately. for (PRUint32 i = 0; i < mStreams.Length(); ++i) { mStreams[i]->DestroyImpl(); } LOG(PR_LOG_DEBUG, ("MediaStreamGraph %p waiting for main thread cleanup", this)); mLifecycleState = LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP; return; } PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT; TimeStamp now = TimeStamp::Now(); if (mNeedAnotherIteration) { PRInt64 timeoutMS = MEDIA_GRAPH_TARGET_PERIOD_MS - PRInt64((now - mCurrentTimeStamp).ToMilliseconds()); // Make sure timeoutMS doesn't overflow 32 bits by waking up at // least once a minute, if we need to wake up at all timeoutMS = NS_MAX(0, NS_MIN(timeoutMS, 60*1000)); timeout = PR_MillisecondsToInterval(PRUint32(timeoutMS)); LOG(PR_LOG_DEBUG, ("Waiting for next iteration; at %f, timeout=%f", (now - mInitialTimeStamp).ToSeconds(), timeoutMS/1000.0)); mWaitState = WAITSTATE_WAITING_FOR_NEXT_ITERATION; } else { mWaitState = WAITSTATE_WAITING_INDEFINITELY; } if (timeout > 0) { lock.Wait(timeout); LOG(PR_LOG_DEBUG, ("Resuming after timeout; at %f, elapsed=%f", (TimeStamp::Now() - mInitialTimeStamp).ToSeconds(), (TimeStamp::Now() - now).ToSeconds())); } mWaitState = WAITSTATE_RUNNING; mNeedAnotherIteration = false; messageQueue.SwapElements(mMessageQueue); } } } void MediaStreamGraphImpl::ApplyStreamUpdate(StreamUpdate* aUpdate) { mMonitor.AssertCurrentThreadOwns(); MediaStream* stream = aUpdate->mStream; if (!stream) return; stream->mMainThreadCurrentTime = aUpdate->mNextMainThreadCurrentTime; stream->mMainThreadFinished = aUpdate->mNextMainThreadFinished; } void MediaStreamGraphImpl::ShutdownThreads() { NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread"); // mGraph's thread is not running so it's OK to do whatever here LOG(PR_LOG_DEBUG, ("Stopping threads for MediaStreamGraph %p", this)); if (mThread) { mThread->Shutdown(); mThread = nsnull; } } void MediaStreamGraphImpl::ForceShutDown() { NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread"); LOG(PR_LOG_DEBUG, ("MediaStreamGraph %p ForceShutdown", this)); { MonitorAutoLock lock(mMonitor); mForceShutDown = true; EnsureImmediateWakeUpLocked(lock); } } namespace { class MediaStreamGraphThreadRunnable : public nsRunnable { public: NS_IMETHOD Run() { gGraph->RunThread(); return NS_OK; } }; class MediaStreamGraphShutDownRunnable : public nsRunnable { public: MediaStreamGraphShutDownRunnable(MediaStreamGraphImpl* aGraph) : mGraph(aGraph) {} NS_IMETHOD Run() { NS_ASSERTION(mGraph->mDetectedNotRunning, "We should know the graph thread control loop isn't running!"); // mGraph's thread is not running so it's OK to do whatever here if (mGraph->IsEmpty()) { // mGraph is no longer needed, so delete it. If the graph is not empty // then we must be in a forced shutdown and some later AppendMessage will // detect that the manager has been emptied, and delete it. delete mGraph; } else { NS_ASSERTION(mGraph->mForceShutDown, "Not in forced shutdown?"); mGraph->mLifecycleState = MediaStreamGraphImpl::LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION; } return NS_OK; } private: MediaStreamGraphImpl* mGraph; }; class MediaStreamGraphStableStateRunnable : public nsRunnable { public: NS_IMETHOD Run() { if (gGraph) { gGraph->RunInStableState(); } return NS_OK; } }; /* * Control messages forwarded from main thread to graph manager thread */ class CreateMessage : public ControlMessage { public: CreateMessage(MediaStream* aStream) : ControlMessage(aStream) {} virtual void UpdateAffectedStream() { mStream->GraphImpl()->AddStream(mStream); } virtual void Process() { mStream->Init(); } }; class MediaStreamGraphShutdownObserver : public nsIObserver { public: NS_DECL_ISUPPORTS NS_DECL_NSIOBSERVER }; } void MediaStreamGraphImpl::RunInStableState() { NS_ASSERTION(NS_IsMainThread(), "Must be called on main thread"); nsTArray > runnables; { MonitorAutoLock lock(mMonitor); mPostedRunInStableStateEvent = false; runnables.SwapElements(mUpdateRunnables); for (PRUint32 i = 0; i < mStreamUpdates.Length(); ++i) { StreamUpdate* update = &mStreamUpdates[i]; if (update->mStream) { ApplyStreamUpdate(update); } } mStreamUpdates.Clear(); if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP && mForceShutDown) { for (PRUint32 i = 0; i < mMessageQueue.Length(); ++i) { MessageBlock& mb = mMessageQueue[i]; for (PRUint32 j = 0; j < mb.mMessages.Length(); ++j) { mb.mMessages[j]->ProcessDuringShutdown(); } } mMessageQueue.Clear(); for (PRUint32 i = 0; i < mCurrentTaskMessageQueue.Length(); ++i) { mCurrentTaskMessageQueue[i]->ProcessDuringShutdown(); } mCurrentTaskMessageQueue.Clear(); // Stop MediaStreamGraph threads. Do not clear gGraph since // we have outstanding DOM objects that may need it. mLifecycleState = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN; nsCOMPtr event = new MediaStreamGraphShutDownRunnable(this); NS_DispatchToMainThread(event); } if (mLifecycleState == LIFECYCLE_THREAD_NOT_STARTED) { mLifecycleState = LIFECYCLE_RUNNING; // Start the thread now. We couldn't start it earlier because // the graph might exit immediately on finding it has no streams. The // first message for a new graph must create a stream. nsCOMPtr event = new MediaStreamGraphThreadRunnable(); NS_NewThread(getter_AddRefs(mThread), event); } if (mCurrentTaskMessageQueue.IsEmpty()) { if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP && IsEmpty()) { NS_ASSERTION(gGraph == this, "Not current graph??"); // Complete shutdown. First, ensure that this graph is no longer used. // A new graph graph will be created if one is needed. LOG(PR_LOG_DEBUG, ("Disconnecting MediaStreamGraph %p", gGraph)); gGraph = nsnull; // Asynchronously clean up old graph. We don't want to do this // synchronously because it spins the event loop waiting for threads // to shut down, and we don't want to do that in a stable state handler. mLifecycleState = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN; nsCOMPtr event = new MediaStreamGraphShutDownRunnable(this); NS_DispatchToMainThread(event); } } else { if (mLifecycleState <= LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) { MessageBlock* block = mMessageQueue.AppendElement(); block->mMessages.SwapElements(mCurrentTaskMessageQueue); block->mGraphUpdateIndex = mGraphUpdatesSent; ++mGraphUpdatesSent; EnsureNextIterationLocked(lock); } if (mLifecycleState == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) { mLifecycleState = LIFECYCLE_RUNNING; // Revive the MediaStreamGraph since we have more messages going to it. // Note that we need to put messages into its queue before reviving it, // or it might exit immediately. nsCOMPtr event = new MediaStreamGraphThreadRunnable(); mThread->Dispatch(event, 0); } } mDetectedNotRunning = mLifecycleState > LIFECYCLE_RUNNING; } // Make sure we get a new current time in the next event loop task mPostedRunInStableState = false; for (PRUint32 i = 0; i < runnables.Length(); ++i) { runnables[i]->Run(); } } static NS_DEFINE_CID(kAppShellCID, NS_APPSHELL_CID); void MediaStreamGraphImpl::EnsureRunInStableState() { NS_ASSERTION(NS_IsMainThread(), "main thread only"); if (mPostedRunInStableState) return; mPostedRunInStableState = true; nsCOMPtr event = new MediaStreamGraphStableStateRunnable(); nsCOMPtr appShell = do_GetService(kAppShellCID); if (appShell) { appShell->RunInStableState(event); } else { NS_ERROR("Appshell already destroyed?"); } } void MediaStreamGraphImpl::EnsureStableStateEventPosted() { mMonitor.AssertCurrentThreadOwns(); if (mPostedRunInStableStateEvent) return; mPostedRunInStableStateEvent = true; nsCOMPtr event = new MediaStreamGraphStableStateRunnable(); NS_DispatchToMainThread(event); } void MediaStreamGraphImpl::AppendMessage(ControlMessage* aMessage) { NS_ASSERTION(NS_IsMainThread(), "main thread only"); if (mDetectedNotRunning && mLifecycleState > LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) { // The graph control loop is not running and main thread cleanup has // happened. From now on we can't append messages to mCurrentTaskMessageQueue, // because that will never be processed again, so just ProcessDuringShutdown // this message. // This should only happen during forced shutdown. aMessage->ProcessDuringShutdown(); delete aMessage; if (IsEmpty()) { NS_ASSERTION(gGraph == this, "Switched managers during forced shutdown?"); gGraph = nsnull; delete this; } return; } mCurrentTaskMessageQueue.AppendElement(aMessage); EnsureRunInStableState(); } void MediaStream::Init() { MediaStreamGraphImpl* graph = GraphImpl(); mBlocked.SetAtAndAfter(graph->mCurrentTime, true); mExplicitBlockerCount.SetAtAndAfter(graph->mCurrentTime, true); mExplicitBlockerCount.SetAtAndAfter(graph->mLastActionTime, false); } MediaStreamGraphImpl* MediaStream::GraphImpl() { return gGraph; } void MediaStream::DestroyImpl() { if (mAudioOutput) { mAudioOutput->Shutdown(); mAudioOutput = nsnull; } } void MediaStream::Destroy() { class Message : public ControlMessage { public: Message(MediaStream* aStream) : ControlMessage(aStream) {} virtual void UpdateAffectedStream() { mStream->DestroyImpl(); mStream->GraphImpl()->RemoveStream(mStream); } virtual void ProcessDuringShutdown() { UpdateAffectedStream(); } }; mWrapper = nsnull; GraphImpl()->AppendMessage(new Message(this)); } void MediaStream::AddAudioOutput(void* aKey) { class Message : public ControlMessage { public: Message(MediaStream* aStream, void* aKey) : ControlMessage(aStream), mKey(aKey) {} virtual void UpdateAffectedStream() { mStream->AddAudioOutputImpl(mKey); } void* mKey; }; GraphImpl()->AppendMessage(new Message(this, aKey)); } void MediaStream::SetAudioOutputVolumeImpl(void* aKey, float aVolume) { for (PRUint32 i = 0; i < mAudioOutputs.Length(); ++i) { if (mAudioOutputs[i].mKey == aKey) { mAudioOutputs[i].mVolume = aVolume; return; } } NS_ERROR("Audio output key not found"); } void MediaStream::SetAudioOutputVolume(void* aKey, float aVolume) { class Message : public ControlMessage { public: Message(MediaStream* aStream, void* aKey, float aVolume) : ControlMessage(aStream), mKey(aKey), mVolume(aVolume) {} virtual void UpdateAffectedStream() { mStream->SetAudioOutputVolumeImpl(mKey, mVolume); } void* mKey; float mVolume; }; GraphImpl()->AppendMessage(new Message(this, aKey, aVolume)); } void MediaStream::RemoveAudioOutputImpl(void* aKey) { for (PRUint32 i = 0; i < mAudioOutputs.Length(); ++i) { if (mAudioOutputs[i].mKey == aKey) { mAudioOutputs.RemoveElementAt(i); return; } } NS_ERROR("Audio output key not found"); } void MediaStream::RemoveAudioOutput(void* aKey) { class Message : public ControlMessage { public: Message(MediaStream* aStream, void* aKey) : ControlMessage(aStream), mKey(aKey) {} virtual void UpdateAffectedStream() { mStream->RemoveAudioOutputImpl(mKey); } void* mKey; }; GraphImpl()->AppendMessage(new Message(this, aKey)); } void MediaStream::AddVideoOutput(VideoFrameContainer* aContainer) { class Message : public ControlMessage { public: Message(MediaStream* aStream, VideoFrameContainer* aContainer) : ControlMessage(aStream), mContainer(aContainer) {} virtual void UpdateAffectedStream() { mStream->AddVideoOutputImpl(mContainer.forget()); } nsRefPtr mContainer; }; GraphImpl()->AppendMessage(new Message(this, aContainer)); } void MediaStream::RemoveVideoOutput(VideoFrameContainer* aContainer) { class Message : public ControlMessage { public: Message(MediaStream* aStream, VideoFrameContainer* aContainer) : ControlMessage(aStream), mContainer(aContainer) {} virtual void UpdateAffectedStream() { mStream->RemoveVideoOutputImpl(mContainer); } nsRefPtr mContainer; }; GraphImpl()->AppendMessage(new Message(this, aContainer)); } void MediaStream::ChangeExplicitBlockerCount(PRInt32 aDelta) { class Message : public ControlMessage { public: Message(MediaStream* aStream, PRInt32 aDelta) : ControlMessage(aStream), mDelta(aDelta) {} virtual void UpdateAffectedStream() { mStream->ChangeExplicitBlockerCountImpl( mStream->GraphImpl()->mLastActionTime, mDelta); } PRInt32 mDelta; }; GraphImpl()->AppendMessage(new Message(this, aDelta)); } void MediaStream::AddListener(MediaStreamListener* aListener) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamListener* aListener) : ControlMessage(aStream), mListener(aListener) {} virtual void UpdateAffectedStream() { mStream->AddListenerImpl(mListener.forget()); } nsRefPtr mListener; }; GraphImpl()->AppendMessage(new Message(this, aListener)); } void MediaStream::RemoveListener(MediaStreamListener* aListener) { class Message : public ControlMessage { public: Message(MediaStream* aStream, MediaStreamListener* aListener) : ControlMessage(aStream), mListener(aListener) {} virtual void UpdateAffectedStream() { mStream->RemoveListenerImpl(mListener); } nsRefPtr mListener; }; GraphImpl()->AppendMessage(new Message(this, aListener)); } void SourceMediaStream::AddTrack(TrackID aID, TrackRate aRate, TrackTicks aStart, MediaSegment* aSegment) { { MutexAutoLock lock(mMutex); TrackData* data = mUpdateTracks.AppendElement(); data->mID = aID; data->mRate = aRate; data->mStart = aStart; data->mCommands = TRACK_CREATE; data->mData = aSegment; data->mHaveEnough = false; } GraphImpl()->EnsureNextIteration(); } void SourceMediaStream::AppendToTrack(TrackID aID, MediaSegment* aSegment) { { MutexAutoLock lock(mMutex); FindDataForTrack(aID)->mData->AppendFrom(aSegment); } GraphImpl()->EnsureNextIteration(); } bool SourceMediaStream::HaveEnoughBuffered(TrackID aID) { MutexAutoLock lock(mMutex); return FindDataForTrack(aID)->mHaveEnough; } void SourceMediaStream::DispatchWhenNotEnoughBuffered(TrackID aID, nsIThread* aSignalThread, nsIRunnable* aSignalRunnable) { MutexAutoLock lock(mMutex); TrackData* data = FindDataForTrack(aID); if (data->mHaveEnough) { data->mDispatchWhenNotEnough.AppendElement()->Init(aSignalThread, aSignalRunnable); } else { aSignalThread->Dispatch(aSignalRunnable, 0); } } void SourceMediaStream::EndTrack(TrackID aID) { { MutexAutoLock lock(mMutex); FindDataForTrack(aID)->mCommands |= TRACK_END; } GraphImpl()->EnsureNextIteration(); } void SourceMediaStream::AdvanceKnownTracksTime(StreamTime aKnownTime) { { MutexAutoLock lock(mMutex); mUpdateKnownTracksTime = aKnownTime; } GraphImpl()->EnsureNextIteration(); } void SourceMediaStream::Finish() { { MutexAutoLock lock(mMutex); mUpdateFinished = true; } GraphImpl()->EnsureNextIteration(); } static const PRUint32 kThreadLimit = 4; static const PRUint32 kIdleThreadLimit = 4; static const PRUint32 kIdleThreadTimeoutMs = 2000; MediaStreamGraphImpl::MediaStreamGraphImpl() : mLastActionTime(1) , mCurrentTime(1) , mBlockingDecisionsMadeUntilTime(1) , mProcessingGraphUpdateIndex(0) , mMonitor("MediaStreamGraphImpl") , mLifecycleState(LIFECYCLE_THREAD_NOT_STARTED) , mWaitState(WAITSTATE_RUNNING) , mNeedAnotherIteration(false) , mForceShutDown(false) , mPostedRunInStableStateEvent(false) , mDetectedNotRunning(false) , mPostedRunInStableState(false) { #ifdef PR_LOGGING if (!gMediaStreamGraphLog) { gMediaStreamGraphLog = PR_NewLogModule("MediaStreamGraph"); } #endif mCurrentTimeStamp = mInitialTimeStamp = TimeStamp::Now(); } NS_IMPL_ISUPPORTS1(MediaStreamGraphShutdownObserver, nsIObserver) static bool gShutdownObserverRegistered = false; NS_IMETHODIMP MediaStreamGraphShutdownObserver::Observe(nsISupports *aSubject, const char *aTopic, const PRUnichar *aData) { if (strcmp(aTopic, NS_XPCOM_SHUTDOWN_OBSERVER_ID) == 0) { if (gGraph) { gGraph->ForceShutDown(); } nsContentUtils::UnregisterShutdownObserver(this); gShutdownObserverRegistered = false; } return NS_OK; } MediaStreamGraph* MediaStreamGraph::GetInstance() { NS_ASSERTION(NS_IsMainThread(), "Main thread only"); if (!gGraph) { if (!gShutdownObserverRegistered) { gShutdownObserverRegistered = true; nsContentUtils::RegisterShutdownObserver(new MediaStreamGraphShutdownObserver()); } gGraph = new MediaStreamGraphImpl(); LOG(PR_LOG_DEBUG, ("Starting up MediaStreamGraph %p", gGraph)); } return gGraph; } SourceMediaStream* MediaStreamGraph::CreateInputStream(nsDOMMediaStream* aWrapper) { SourceMediaStream* stream = new SourceMediaStream(aWrapper); NS_ADDREF(stream); static_cast(this)->AppendMessage(new CreateMessage(stream)); return stream; } }