Bug 779715. Part 1: Add basic support for ProcessedMediaStreams. r=jesup

The main thing this patch does is to add ProcessedMediaStream objects and
MediaInputPorts connecting streams. ProcessedMediaStreams are an abstract
class that doesn't constrain what processing is actually performed, except
that for now we assume a stream's processing depends only on its inputs'
window of data between mCurrentTime and mStateComputedTime.
This patch reorganizes the way the blocking status of each stream is computed.
The streams are partitioned into groups so that every stream which can affect
the blocking status of another stream is in the same group as that other stream.
We also add a pass that sorts the streams by dependency so they can be
processed in dependency order; this pass also identifies the streams that
form part of a cycle.

--HG--
extra : rebase_source : c45c931a264e73f295642a934500bbeaa6448774
This commit is contained in:
Robert O'Callahan 2012-08-01 00:17:21 +12:00
parent aefb4e4803
commit d3b9aab0ce
7 changed files with 670 additions and 145 deletions

View File

@ -123,23 +123,15 @@ public:
*/
void WriteTo(nsAudioStream* aOutput);
using MediaSegmentBase<AudioSegment, AudioChunk>::AppendFrom;
void AppendFrom(AudioSegment* aSource)
{
NS_ASSERTION(aSource->mChannels == mChannels, "Non-matching channels");
MediaSegmentBase<AudioSegment, AudioChunk>::AppendFrom(aSource);
}
// Segment-generic methods not in MediaSegmentBase
void InitFrom(const AudioSegment& aOther)
{
NS_ASSERTION(mChannels == 0, "Channels already set");
mChannels = aOther.mChannels;
}
void SliceFrom(const AudioSegment& aOther, TrackTicks aStart, TrackTicks aEnd)
void CheckCompatible(const AudioSegment& aOther) const
{
InitFrom(aOther);
BaseSliceFrom(aOther, aStart, aEnd);
NS_ASSERTION(aOther.mChannels == mChannels, "Non-matching channels");
}
static Type StaticType() { return AUDIO; }

View File

@ -71,17 +71,22 @@ public:
/**
* Gets the total duration of the segment.
*/
TrackTicks GetDuration() { return mDuration; }
Type GetType() { return mType; }
TrackTicks GetDuration() const { return mDuration; }
Type GetType() const { return mType; }
/**
* Create a MediaSegment of the same type.
*/
virtual MediaSegment* CreateEmptyClone() = 0;
virtual MediaSegment* CreateEmptyClone() const = 0;
/**
* Moves contents of aSource to the end of this segment.
*/
virtual void AppendFrom(MediaSegment* aSource) = 0;
/**
* Append a slice of aSource to this segment.
*/
virtual void AppendSlice(const MediaSegment& aSource,
TrackTicks aStart, TrackTicks aEnd) = 0;
/**
* Replace all contents up to aDuration with null data.
*/
@ -90,6 +95,10 @@ public:
* Insert aDuration of null data at the start of the segment.
*/
virtual void InsertNullDataAtStart(TrackTicks aDuration) = 0;
/**
* Insert aDuration of null data at the end of the segment.
*/
virtual void AppendNullData(TrackTicks aDuration) = 0;
protected:
MediaSegment(Type aType) : mDuration(0), mType(aType)
@ -107,35 +116,35 @@ protected:
*/
template <class C, class Chunk> class MediaSegmentBase : public MediaSegment {
public:
virtual MediaSegment* CreateEmptyClone()
virtual MediaSegment* CreateEmptyClone() const
{
C* s = new C();
s->InitFrom(*static_cast<C*>(this));
s->InitFrom(*static_cast<const C*>(this));
return s;
}
/**
* Appends the contents of aSource to this segment, clearing aSource.
*/
virtual void AppendFrom(MediaSegmentBase<C, Chunk>* aSource)
{
mDuration += aSource->mDuration;
aSource->mDuration = 0;
if (!mChunks.IsEmpty() && !aSource->mChunks.IsEmpty() &&
mChunks[mChunks.Length() - 1].CanCombineWithFollowing(aSource->mChunks[0])) {
mChunks[mChunks.Length() - 1].mDuration += aSource->mChunks[0].mDuration;
aSource->mChunks.RemoveElementAt(0);
}
mChunks.MoveElementsFrom(aSource->mChunks);
}
void RemoveLeading(TrackTicks aDuration)
{
RemoveLeadingInternal(aDuration, 0);
}
virtual void AppendFrom(MediaSegment* aSource)
{
NS_ASSERTION(aSource->GetType() == C::StaticType(), "Wrong type");
AppendFrom(static_cast<C*>(aSource));
AppendFromInternal(static_cast<C*>(aSource));
}
void AppendFrom(C* aSource)
{
AppendFromInternal(aSource);
}
virtual void AppendSlice(const MediaSegment& aSource,
TrackTicks aStart, TrackTicks aEnd)
{
NS_ASSERTION(aSource.GetType() == C::StaticType(), "Wrong type");
AppendSliceInternal(static_cast<const C&>(aSource), aStart, aEnd);
}
void AppendSlice(const C& aOther, TrackTicks aStart, TrackTicks aEnd)
{
AppendSliceInternal(aOther, aStart, aEnd);
}
void InitToSlice(const C& aOther, TrackTicks aStart, TrackTicks aEnd)
{
static_cast<C*>(this)->InitFrom(aOther);
AppendSliceInternal(aOther, aStart, aEnd);
}
/**
* Replace the first aDuration ticks with null media data, because the data
@ -149,13 +158,13 @@ public:
if (mChunks[0].IsNull()) {
TrackTicks extraToForget = NS_MIN(aDuration, mDuration) - mChunks[0].GetDuration();
if (extraToForget > 0) {
RemoveLeadingInternal(extraToForget, 1);
RemoveLeading(extraToForget, 1);
mChunks[0].mDuration += extraToForget;
mDuration += extraToForget;
}
return;
}
RemoveLeading(aDuration);
RemoveLeading(aDuration, 0);
mChunks.InsertElementAt(0)->SetNull(aDuration);
mDuration += aDuration;
}
@ -171,18 +180,49 @@ public:
}
mDuration += aDuration;
}
virtual void AppendNullData(TrackTicks aDuration)
{
if (aDuration <= 0) {
return;
}
if (!mChunks.IsEmpty() && mChunks[mChunks.Length() - 1].IsNull()) {
mChunks[mChunks.Length() - 1].mDuration += aDuration;
} else {
mChunks.AppendElement()->SetNull(aDuration);
}
mDuration += aDuration;
}
protected:
MediaSegmentBase(Type aType) : MediaSegment(aType) {}
void BaseSliceFrom(const MediaSegmentBase<C, Chunk>& aOther,
TrackTicks aStart, TrackTicks aEnd)
/**
* Appends the contents of aSource to this segment, clearing aSource.
*/
void AppendFromInternal(MediaSegmentBase<C, Chunk>* aSource)
{
NS_ASSERTION(aStart >= 0 && aEnd <= aOther.mDuration,
static_cast<C*>(this)->CheckCompatible(*static_cast<C*>(aSource));
mDuration += aSource->mDuration;
aSource->mDuration = 0;
if (!mChunks.IsEmpty() && !aSource->mChunks.IsEmpty() &&
mChunks[mChunks.Length() - 1].CanCombineWithFollowing(aSource->mChunks[0])) {
mChunks[mChunks.Length() - 1].mDuration += aSource->mChunks[0].mDuration;
aSource->mChunks.RemoveElementAt(0);
}
mChunks.MoveElementsFrom(aSource->mChunks);
}
void AppendSliceInternal(const MediaSegmentBase<C, Chunk>& aSource,
TrackTicks aStart, TrackTicks aEnd)
{
static_cast<C*>(this)->CheckCompatible(static_cast<const C&>(aSource));
NS_ASSERTION(aStart <= aEnd, "Endpoints inverted");
NS_ASSERTION(aStart >= 0 && aEnd <= aSource.mDuration,
"Slice out of range");
mDuration += aEnd - aStart;
TrackTicks offset = 0;
for (PRUint32 i = 0; i < aOther.mChunks.Length() && offset < aEnd; ++i) {
const Chunk& c = aOther.mChunks[i];
for (PRUint32 i = 0; i < aSource.mChunks.Length() && offset < aEnd; ++i) {
const Chunk& c = aSource.mChunks[i];
TrackTicks start = NS_MAX(aStart, offset);
TrackTicks nextOffset = offset + c.GetDuration();
TrackTicks end = NS_MIN(aEnd, nextOffset);
@ -242,8 +282,7 @@ protected:
PRUint32 mIndex;
};
protected:
void RemoveLeadingInternal(TrackTicks aDuration, PRUint32 aStartIndex)
void RemoveLeading(TrackTicks aDuration, PRUint32 aStartIndex)
{
NS_ASSERTION(aDuration >= 0, "Can't remove negative duration");
TrackTicks t = aDuration;

View File

@ -215,6 +215,22 @@ public:
* Update "have enough data" flags in aStream.
*/
void UpdateBufferSufficiencyState(SourceMediaStream* aStream);
/**
* If aStream hasn't already been ordered, push it onto aStack and order
* its children.
*/
void UpdateStreamOrderForStream(nsTArray<MediaStream*>* aStack,
already_AddRefed<MediaStream> aStream);
/**
* Compute aStream's mIsConsumed.
*/
static void DetermineWhetherStreamIsConsumed(MediaStream* aStream);
/**
* Sort mStreams so that every stream not in a cycle is after any streams
* it depends on, and every stream in a cycle is marked as being in a cycle.
* Also sets mIsConsumed on every stream.
*/
void UpdateStreamOrder();
/**
* Compute the blocking states of streams from mStateComputedTime
* until the desired future time aEndBlockingDecisions.
@ -223,19 +239,27 @@ public:
*/
void RecomputeBlocking(GraphTime aEndBlockingDecisions);
// The following methods are used to help RecomputeBlocking.
/**
* If aStream isn't already in aStreams, add it and recursively call
* AddBlockingRelatedStreamsToSet on all the streams whose blocking
* status could depend on or affect the state of aStream.
*/
void AddBlockingRelatedStreamsToSet(nsTArray<MediaStream*>* aStreams,
MediaStream* aStream);
/**
* Mark a stream blocked at time aTime. If this results in decisions that need
* to be revisited at some point in the future, *aEnd will be reduced to the
* first time in the future to recompute those decisions.
*/
void MarkStreamBlocked(MediaStream* aStream, GraphTime aTime, GraphTime* aEnd);
void MarkStreamBlocking(MediaStream* aStream);
/**
* Recompute blocking for all streams for the interval starting at aTime.
* Recompute blocking for the streams in aStreams for the interval starting at aTime.
* If this results in decisions that need to be revisited at some point
* in the future, *aEnd will be reduced to the first time in the future to
* recompute those decisions.
*/
void RecomputeBlockingAt(GraphTime aTime, GraphTime aEndBlockingDecisions,
void RecomputeBlockingAt(const nsTArray<MediaStream*>& aStreams,
GraphTime aTime, GraphTime aEndBlockingDecisions,
GraphTime* aEnd);
/**
* Returns true if aStream will underrun at aTime for its own playback.
@ -245,11 +269,6 @@ public:
*/
bool WillUnderrun(MediaStream* aStream, GraphTime aTime,
GraphTime aEndBlockingDecisions, GraphTime* aEnd);
/**
* Return true if there is an explicit blocker set from the current time
* indefinitely far into the future.
*/
bool IsAlwaysExplicitlyBlocked(MediaStream* aStream);
/**
* Given a graph time aTime, convert it to a stream time taking into
* account the time during which aStream is scheduled to be blocked.
@ -305,7 +324,7 @@ public:
/**
* Returns true when there are no active streams.
*/
bool IsEmpty() { return mStreams.IsEmpty(); }
bool IsEmpty() { return mStreams.IsEmpty() && mPortCount == 0; }
// For use by control messages
/**
@ -321,6 +340,10 @@ public:
* stream back to the main thread are flushed.
*/
void RemoveStream(MediaStream* aStream);
/**
* Remove aPort from the graph and release it.
*/
void DestroyPort(MediaInputPort* aPort);
// Data members
@ -358,6 +381,10 @@ public:
* Which update batch we are currently processing.
*/
PRInt64 mProcessingGraphUpdateIndex;
/**
* Number of active MediaInputPorts
*/
PRInt32 mPortCount;
// mMonitor guards the data below.
// MediaStreamGraph normally does its work without holding mMonitor, so it is
@ -487,28 +514,8 @@ StreamTime
MediaStreamGraphImpl::GetDesiredBufferEnd(MediaStream* aStream)
{
StreamTime current = mCurrentTime - aStream->mBufferStartTime;
StreamTime desiredEnd = current;
if (!aStream->mAudioOutputs.IsEmpty()) {
desiredEnd = NS_MAX(desiredEnd, current + MillisecondsToMediaTime(AUDIO_TARGET_MS));
}
if (!aStream->mVideoOutputs.IsEmpty()) {
desiredEnd = NS_MAX(desiredEnd, current + MillisecondsToMediaTime(VIDEO_TARGET_MS));
}
return desiredEnd;
}
bool
MediaStreamGraphImpl::IsAlwaysExplicitlyBlocked(MediaStream* aStream)
{
GraphTime t = mCurrentTime;
while (true) {
GraphTime end;
if (aStream->mExplicitBlockerCount.GetAt(t, &end) == 0)
return false;
if (end >= GRAPH_TIME_MAX)
return true;
t = end;
}
return current +
MillisecondsToMediaTime(NS_MAX(AUDIO_TARGET_MS, VIDEO_TARGET_MS));
}
void
@ -557,10 +564,9 @@ MediaStreamGraphImpl::RemoveStream(MediaStream* aStream)
void
MediaStreamGraphImpl::UpdateConsumptionState(SourceMediaStream* aStream)
{
bool isConsumed = !aStream->mAudioOutputs.IsEmpty() ||
!aStream->mVideoOutputs.IsEmpty();
MediaStreamListener::Consumption state = isConsumed ? MediaStreamListener::CONSUMED
: MediaStreamListener::NOT_CONSUMED;
MediaStreamListener::Consumption state =
aStream->mIsConsumed ? MediaStreamListener::CONSUMED
: MediaStreamListener::NOT_CONSUMED;
if (state != aStream->mLastConsumptionState) {
aStream->mLastConsumptionState = state;
for (PRUint32 j = 0; j < aStream->mListeners.Length(); ++j) {
@ -672,7 +678,6 @@ MediaStreamGraphImpl::UpdateBufferSufficiencyState(SourceMediaStream* aStream)
}
}
StreamTime
MediaStreamGraphImpl::GraphTimeToStreamTime(MediaStream* aStream,
GraphTime aTime)
@ -839,22 +844,14 @@ MediaStreamGraphImpl::UpdateCurrentTime()
mCurrentTime = nextCurrentTime;
}
void
MediaStreamGraphImpl::MarkStreamBlocked(MediaStream* aStream,
GraphTime aTime, GraphTime* aEnd)
{
NS_ASSERTION(!aStream->mBlocked.GetAt(aTime), "MediaStream already blocked");
aStream->mBlocked.SetAtAndAfter(aTime, true);
}
bool
MediaStreamGraphImpl::WillUnderrun(MediaStream* aStream, GraphTime aTime,
GraphTime aEndBlockingDecisions, GraphTime* aEnd)
{
// Finished streams, or streams that aren't being played back, can't underrun.
if (aStream->mFinished ||
(aStream->mAudioOutputs.IsEmpty() && aStream->mVideoOutputs.IsEmpty())) {
// Finished streams can't underrun. ProcessedMediaStreams also can't cause
// underrun currently, since we'll always be able to produce data for them
// unless they block on some other stream.
if (aStream->mFinished || aStream->AsProcessedStream()) {
return false;
}
GraphTime bufferEnd =
@ -884,33 +881,126 @@ MediaStreamGraphImpl::WillUnderrun(MediaStream* aStream, GraphTime aTime,
return false;
}
void
MediaStreamGraphImpl::DetermineWhetherStreamIsConsumed(MediaStream* aStream)
{
if (aStream->mKnowIsConsumed)
return;
aStream->mKnowIsConsumed = true;
if (!aStream->mAudioOutputs.IsEmpty() ||
!aStream->mVideoOutputs.IsEmpty()) {
aStream->mIsConsumed = true;
return;
}
for (PRUint32 i = 0; i < aStream->mConsumers.Length(); ++i) {
MediaStream* dest = aStream->mConsumers[i]->mDest;
DetermineWhetherStreamIsConsumed(dest);
if (dest->mIsConsumed) {
aStream->mIsConsumed = true;
return;
}
}
}
void
MediaStreamGraphImpl::UpdateStreamOrderForStream(nsTArray<MediaStream*>* aStack,
already_AddRefed<MediaStream> aStream)
{
nsRefPtr<MediaStream> stream = aStream;
NS_ASSERTION(!stream->mHasBeenOrdered, "stream should not have already been ordered");
if (stream->mIsOnOrderingStack) {
for (PRInt32 i = aStack->Length() - 1; ; --i) {
aStack->ElementAt(i)->AsProcessedStream()->mInCycle = true;
if (aStack->ElementAt(i) == stream)
break;
}
return;
}
SourceMediaStream* s = stream->AsSourceStream();
if (s) {
DetermineWhetherStreamIsConsumed(stream);
}
ProcessedMediaStream* ps = stream->AsProcessedStream();
if (ps) {
aStack->AppendElement(stream);
stream->mIsOnOrderingStack = true;
for (PRUint32 i = 0; i < ps->mInputs.Length(); ++i) {
MediaStream* source = ps->mInputs[i]->mSource;
if (!source->mHasBeenOrdered) {
nsRefPtr<MediaStream> s = source;
UpdateStreamOrderForStream(aStack, s.forget());
}
}
aStack->RemoveElementAt(aStack->Length() - 1);
stream->mIsOnOrderingStack = false;
}
stream->mHasBeenOrdered = true;
*mStreams.AppendElement() = stream.forget();
}
void
MediaStreamGraphImpl::UpdateStreamOrder()
{
nsTArray<nsRefPtr<MediaStream> > oldStreams;
oldStreams.SwapElements(mStreams);
for (PRUint32 i = 0; i < oldStreams.Length(); ++i) {
MediaStream* stream = oldStreams[i];
stream->mHasBeenOrdered = false;
stream->mKnowIsConsumed = false;
stream->mIsOnOrderingStack = false;
stream->mInBlockingSet = false;
ProcessedMediaStream* ps = stream->AsProcessedStream();
if (ps) {
ps->mInCycle = false;
}
}
nsAutoTArray<MediaStream*,10> stack;
for (PRUint32 i = 0; i < oldStreams.Length(); ++i) {
if (!oldStreams[i]->mHasBeenOrdered) {
UpdateStreamOrderForStream(&stack, oldStreams[i].forget());
}
}
}
void
MediaStreamGraphImpl::RecomputeBlocking(GraphTime aEndBlockingDecisions)
{
bool blockingDecisionsWillChange = false;
while (mStateComputedTime < aEndBlockingDecisions) {
LOG(PR_LOG_DEBUG, ("Media graph %p computing blocking for time %f",
this, MediaTimeToSeconds(mStateComputedTime)));
GraphTime end = GRAPH_TIME_MAX;
RecomputeBlockingAt(mStateComputedTime, aEndBlockingDecisions, &end);
LOG(PR_LOG_DEBUG, ("Media graph %p computed blocking for interval %f to %f",
this, MediaTimeToSeconds(mStateComputedTime),
MediaTimeToSeconds(end)));
mStateComputedTime = end;
if (end < GRAPH_TIME_MAX) {
blockingDecisionsWillChange = true;
}
}
mStateComputedTime = aEndBlockingDecisions;
LOG(PR_LOG_DEBUG, ("Media graph %p computing blocking for time %f",
this, MediaTimeToSeconds(mStateComputedTime)));
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
if (!stream->mInBlockingSet) {
// Compute a partition of the streams containing 'stream' such that we can
// compute the blocking status of each subset independently.
nsAutoTArray<MediaStream*,10> streamSet;
AddBlockingRelatedStreamsToSet(&streamSet, stream);
GraphTime end;
for (GraphTime t = mStateComputedTime;
t < aEndBlockingDecisions; t = end) {
end = GRAPH_TIME_MAX;
RecomputeBlockingAt(streamSet, t, aEndBlockingDecisions, &end);
if (end < GRAPH_TIME_MAX) {
blockingDecisionsWillChange = true;
}
}
}
GraphTime end;
stream->mBlocked.GetAt(mCurrentTime, &end);
if (end < GRAPH_TIME_MAX) {
blockingDecisionsWillChange = true;
}
}
LOG(PR_LOG_DEBUG, ("Media graph %p computed blocking for interval %f to %f",
this, MediaTimeToSeconds(mStateComputedTime),
MediaTimeToSeconds(aEndBlockingDecisions)));
mStateComputedTime = aEndBlockingDecisions;
if (blockingDecisionsWillChange) {
// Make sure we wake up to notify listeners about these changes.
EnsureNextIteration();
@ -918,28 +1008,74 @@ MediaStreamGraphImpl::RecomputeBlocking(GraphTime aEndBlockingDecisions)
}
void
MediaStreamGraphImpl::RecomputeBlockingAt(GraphTime aTime,
MediaStreamGraphImpl::AddBlockingRelatedStreamsToSet(nsTArray<MediaStream*>* aStreams,
MediaStream* aStream)
{
if (aStream->mInBlockingSet)
return;
aStream->mInBlockingSet = true;
aStreams->AppendElement(aStream);
for (PRUint32 i = 0; i < aStream->mConsumers.Length(); ++i) {
MediaInputPort* port = aStream->mConsumers[i];
if (port->mFlags & (MediaInputPort::FLAG_BLOCK_INPUT | MediaInputPort::FLAG_BLOCK_OUTPUT)) {
AddBlockingRelatedStreamsToSet(aStreams, port->mDest);
}
}
ProcessedMediaStream* ps = aStream->AsProcessedStream();
if (ps) {
for (PRUint32 i = 0; i < ps->mInputs.Length(); ++i) {
MediaInputPort* port = ps->mInputs[i];
if (port->mFlags & (MediaInputPort::FLAG_BLOCK_INPUT | MediaInputPort::FLAG_BLOCK_OUTPUT)) {
AddBlockingRelatedStreamsToSet(aStreams, port->mSource);
}
}
}
}
void
MediaStreamGraphImpl::MarkStreamBlocking(MediaStream* aStream)
{
if (aStream->mBlockInThisPhase)
return;
aStream->mBlockInThisPhase = true;
for (PRUint32 i = 0; i < aStream->mConsumers.Length(); ++i) {
MediaInputPort* port = aStream->mConsumers[i];
if (port->mFlags & MediaInputPort::FLAG_BLOCK_OUTPUT) {
MarkStreamBlocking(port->mDest);
}
}
ProcessedMediaStream* ps = aStream->AsProcessedStream();
if (ps) {
for (PRUint32 i = 0; i < ps->mInputs.Length(); ++i) {
MediaInputPort* port = ps->mInputs[i];
if (port->mFlags & MediaInputPort::FLAG_BLOCK_INPUT) {
MarkStreamBlocking(port->mSource);
}
}
}
}
void
MediaStreamGraphImpl::RecomputeBlockingAt(const nsTArray<MediaStream*>& aStreams,
GraphTime aTime,
GraphTime aEndBlockingDecisions,
GraphTime* aEnd)
{
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
stream->mBlocked.SetAtAndAfter(aTime, false);
for (PRUint32 i = 0; i < aStreams.Length(); ++i) {
MediaStream* stream = aStreams[i];
stream->mBlockInThisPhase = false;
}
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
// Stream might be blocked by some other stream (due to processing
// constraints)
if (stream->mBlocked.GetAt(aTime)) {
continue;
}
for (PRUint32 i = 0; i < aStreams.Length(); ++i) {
MediaStream* stream = aStreams[i];
if (stream->mFinished) {
GraphTime endTime = StreamTimeToGraphTime(stream, stream->GetBufferEnd());
if (endTime <= aTime) {
LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to being finished", stream));
MarkStreamBlocked(stream, aTime, aEnd);
// We'll block indefinitely
MarkStreamBlocking(stream);
*aEnd = aEndBlockingDecisions;
continue;
} else {
LOG(PR_LOG_DEBUG, ("MediaStream %p is finished, but not blocked yet (end at %f, with blocking at %f)",
@ -949,25 +1085,29 @@ MediaStreamGraphImpl::RecomputeBlockingAt(GraphTime aTime,
}
}
// We don't need to explicitly check for cycles; streams in a cycle will
// just never be able to produce data, and WillUnderrun will trigger.
GraphTime end;
bool explicitBlock = stream->mExplicitBlockerCount.GetAt(aTime, &end) > 0;
*aEnd = NS_MIN(*aEnd, end);
if (explicitBlock) {
LOG(PR_LOG_DEBUG, ("MediaStream %p is blocked due to explicit blocker", stream));
MarkStreamBlocked(stream, aTime, aEnd);
MarkStreamBlocking(stream);
continue;
}
bool underrun = WillUnderrun(stream, aTime, aEndBlockingDecisions, aEnd);
if (underrun) {
MarkStreamBlocked(stream, aTime, aEnd);
// We'll block indefinitely
MarkStreamBlocking(stream);
*aEnd = aEndBlockingDecisions;
continue;
}
}
NS_ASSERTION(*aEnd > aTime, "Failed to advance!");
for (PRUint32 i = 0; i < aStreams.Length(); ++i) {
MediaStream* stream = aStreams[i];
stream->mBlocked.SetAtAndAfter(aTime, stream->mBlockInThisPhase);
}
}
void
@ -1082,7 +1222,7 @@ MediaStreamGraphImpl::PlayAudio(MediaStream* aStream,
TrackTicks endTicks =
track->TimeToTicksRoundDown(GraphTimeToStreamTime(aStream, end));
output.SliceFrom(*audio, startTicks, endTicks);
output.AppendSlice(*audio, startTicks, endTicks);
output.ApplyVolume(volume);
LOG(PR_LOG_DEBUG, ("MediaStream %p writing samples for %f to %f (samples %lld to %lld)",
aStream, MediaTimeToSeconds(t), MediaTimeToSeconds(end),
@ -1218,12 +1358,14 @@ MediaStreamGraphImpl::RunThread()
}
messageQueue.Clear();
UpdateStreamOrder();
PRInt32 writeAudioUpTo = AUDIO_TARGET_MS;
GraphTime endBlockingDecisions =
mCurrentTime + MillisecondsToMediaTime(writeAudioUpTo);
// Grab pending ProcessingEngine results.
bool ensureNextIteration = false;
// Grab pending stream input.
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
SourceMediaStream* is = mStreams[i]->AsSourceStream();
if (is) {
@ -1232,15 +1374,24 @@ MediaStreamGraphImpl::RunThread()
}
}
// Figure out which streams are blocked and when.
GraphTime prevComputedTime = mStateComputedTime;
RecomputeBlocking(endBlockingDecisions);
// Play stream contents.
PRUint32 audioStreamsActive = 0;
bool allBlockedForever = true;
// Figure out what each stream wants to do
for (PRUint32 i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
UpdateFirstActiveTracks(stream);
ProcessedMediaStream* ps = stream->AsProcessedStream();
if (ps && !ps->mFinished) {
ps->ProduceOutput(prevComputedTime, mStateComputedTime);
NS_ASSERTION(stream->mBuffer.GetEnd() >=
GraphTimeToStreamTime(stream, mStateComputedTime),
"Stream did not produce enough data");
}
CreateOrDestroyAudioStream(prevComputedTime, stream);
PlayAudio(stream, prevComputedTime, mStateComputedTime);
if (stream->mAudioOutput) {
@ -1260,6 +1411,8 @@ MediaStreamGraphImpl::RunThread()
EnsureNextIteration();
}
// Send updates to the main thread and wait for the next control loop
// iteration.
{
// Not using MonitorAutoLock since we need to unlock in a way
// that doesn't match lexical scopes.
@ -1579,9 +1732,30 @@ MediaStream::GraphImpl()
return gGraph;
}
MediaStreamGraph*
MediaStream::Graph()
{
return gGraph;
}
StreamTime
MediaStream::GraphTimeToStreamTime(GraphTime aTime)
{
return GraphImpl()->GraphTimeToStreamTime(this, aTime);
}
void
MediaStream::FinishOnGraphThread()
{
GraphImpl()->FinishStream(this);
}
void
MediaStream::DestroyImpl()
{
for (PRInt32 i = mConsumers.Length() - 1; i >= 0; --i) {
mConsumers[i]->Disconnect();
}
if (mAudioOutput) {
mAudioOutput->Shutdown();
mAudioOutput = nullptr;
@ -1888,9 +2062,135 @@ SourceMediaStream::Finish()
}
}
static const PRUint32 kThreadLimit = 4;
static const PRUint32 kIdleThreadLimit = 4;
static const PRUint32 kIdleThreadTimeoutMs = 2000;
void
MediaInputPort::Init()
{
LOG(PR_LOG_DEBUG, ("Adding MediaInputPort %p (from %p to %p) to the graph",
this, mSource, mDest));
mSource->AddConsumer(this);
mDest->AddInput(this);
// mPortCount decremented in Disconnect()
++mDest->GraphImpl()->mPortCount;
}
void
MediaInputPort::Disconnect()
{
NS_ASSERTION(!mSource == !mDest,
"mSource must either both be null or both non-null");
if (!mSource)
return;
--mDest->GraphImpl()->mPortCount;
mSource->RemoveConsumer(this);
mSource = nullptr;
mDest->RemoveInput(this);
mDest = nullptr;
}
MediaInputPort::InputInterval
MediaInputPort::GetNextInputInterval(GraphTime aTime)
{
InputInterval result = { GRAPH_TIME_MAX, GRAPH_TIME_MAX, false };
GraphTime t = aTime;
GraphTime end;
for (;;) {
if (!mDest->mBlocked.GetAt(t, &end))
break;
if (end == GRAPH_TIME_MAX)
return result;
t = end;
}
result.mStart = t;
GraphTime sourceEnd;
result.mInputIsBlocked = mSource->mBlocked.GetAt(t, &sourceEnd);
result.mEnd = NS_MIN(end, sourceEnd);
return result;
}
void
MediaInputPort::Destroy()
{
class Message : public ControlMessage {
public:
Message(MediaInputPort* aPort)
: ControlMessage(aPort->GetDestination()), mPort(aPort) {}
virtual void Run()
{
mPort->Disconnect();
NS_RELEASE(mPort);
}
virtual void RunDuringShutdown()
{
Run();
}
// This does not need to be strongly referenced; the graph is holding
// a strong reference to the port, which we will remove. This will be the
// last message for the port.
MediaInputPort* mPort;
};
mSource->GraphImpl()->AppendMessage(new Message(this));
}
MediaInputPort*
ProcessedMediaStream::AllocateInputPort(MediaStream* aStream, PRUint32 aFlags)
{
class Message : public ControlMessage {
public:
Message(MediaInputPort* aPort)
: ControlMessage(aPort->GetDestination()),
mPort(aPort) {}
virtual void Run()
{
mPort->Init();
}
MediaInputPort* mPort;
};
MediaInputPort* port = new MediaInputPort(aStream, this, aFlags);
NS_ADDREF(port);
GraphImpl()->AppendMessage(new Message(port));
return port;
}
void
ProcessedMediaStream::Finish()
{
class Message : public ControlMessage {
public:
Message(ProcessedMediaStream* aStream)
: ControlMessage(aStream) {}
virtual void Run()
{
mStream->GraphImpl()->FinishStream(mStream);
}
};
GraphImpl()->AppendMessage(new Message(this));
}
void
ProcessedMediaStream::SetAutofinish(bool aAutofinish)
{
class Message : public ControlMessage {
public:
Message(ProcessedMediaStream* aStream, bool aAutofinish)
: ControlMessage(aStream), mAutofinish(aAutofinish) {}
virtual void Run()
{
mStream->AsProcessedStream()->SetAutofinishImpl(mAutofinish);
}
bool mAutofinish;
};
GraphImpl()->AppendMessage(new Message(this, aAutofinish));
}
void
ProcessedMediaStream::DestroyImpl()
{
for (PRInt32 i = mInputs.Length() - 1; i >= 0; --i) {
mInputs[i]->Disconnect();
}
MediaStream::DestroyImpl();
}
/**
* We make the initial mCurrentTime nonzero so that zero times can have
@ -1902,6 +2202,7 @@ MediaStreamGraphImpl::MediaStreamGraphImpl()
: mCurrentTime(INITIAL_CURRENT_TIME)
, mStateComputedTime(INITIAL_CURRENT_TIME)
, mProcessingGraphUpdateIndex(0)
, mPortCount(0)
, mMonitor("MediaStreamGraphImpl")
, mLifecycleState(LIFECYCLE_THREAD_NOT_STARTED)
, mWaitState(WAITSTATE_RUNNING)

View File

@ -57,15 +57,6 @@ const GraphTime GRAPH_TIME_MAX = MEDIA_TIME_MAX;
*
* When the graph is changed, we may need to throw out buffered data and
* reprocess it. This is triggered automatically by the MediaStreamGraph.
*
* Streams that use different sampling rates complicate things a lot. We
* considered forcing all streams to have the same audio sample rate, resampling
* at inputs and outputs only, but that would create situations where a stream
* is resampled from X to Y and then back to X unnecessarily. It seems easier
* to just live with streams having different sample rates. We do require that
* the sample rate for a stream be constant for the life of a stream.
*
* XXX does not yet support blockInput/blockOutput functionality.
*/
class MediaStreamGraph;
@ -148,6 +139,7 @@ public:
* aTrackEvents can be any combination of TRACK_EVENT_CREATED and
* TRACK_EVENT_ENDED. aQueuedMedia is the data being added to the track
* at aTrackOffset (relative to the start of the stream).
* aQueuedMedia can be null if there is no output.
*/
virtual void NotifyQueuedTrackChanges(MediaStreamGraph* aGraph, TrackID aID,
TrackRate aTrackRate,
@ -158,6 +150,8 @@ public:
class MediaStreamGraphImpl;
class SourceMediaStream;
class ProcessedMediaStream;
class MediaInputPort;
/**
* A stream of synchronized audio and video data. All (not blocked) streams
@ -250,6 +244,7 @@ public:
* Returns the graph that owns this stream.
*/
MediaStreamGraphImpl* GraphImpl();
MediaStreamGraph* Graph();
// Control API.
// Since a stream can be played multiple ways, we need to combine independent
@ -294,8 +289,10 @@ public:
}
friend class MediaStreamGraphImpl;
friend class MediaInputPort;
virtual SourceMediaStream* AsSourceStream() { return nullptr; }
virtual ProcessedMediaStream* AsProcessedStream() { return nullptr; }
// media graph thread only
void Init();
@ -330,10 +327,19 @@ public:
{
mListeners.RemoveElement(aListener);
}
#ifdef DEBUG
void AddConsumer(MediaInputPort* aPort)
{
mConsumers.AppendElement(aPort);
}
void RemoveConsumer(MediaInputPort* aPort)
{
mConsumers.RemoveElement(aPort);
}
const StreamBuffer& GetStreamBuffer() { return mBuffer; }
#endif
GraphTime GetStreamBufferStartTime() { return mBufferStartTime; }
StreamTime GraphTimeToStreamTime(GraphTime aTime);
bool IsFinishedOnGraphThread() { return mFinished; }
void FinishOnGraphThread();
protected:
virtual void AdvanceTimeVaryingValuesToCurrentTime(GraphTime aCurrentTime, GraphTime aBlockedTime)
@ -384,6 +390,9 @@ protected:
// Maps graph time to the graph update that affected this stream at that time
TimeVarying<GraphTime,PRInt64> mGraphUpdateIndices;
// MediaInputPorts to which this is connected
nsTArray<MediaInputPort*> mConsumers;
/**
* When true, this means the stream will be finished once all
* buffered data has been consumed.
@ -409,6 +418,22 @@ protected:
// tracks start at the same time, the one with the lowest ID.
TrackID mFirstActiveTracks[MediaSegment::TYPE_COUNT];
// Temporary data for ordering streams by dependency graph
bool mHasBeenOrdered;
bool mIsOnOrderingStack;
// Temporary data to record if the stream is being consumed
// (i.e. has track data being played, or is feeding into some stream
// that is being consumed).
bool mIsConsumed;
// True if the above value is accurate.
bool mKnowIsConsumed;
// Temporary data for computing blocking status of streams
// True if we've added this stream to the set of streams we're computing
// blocking for.
bool mInBlockingSet;
// True if this stream should be blocked in this phase.
bool mBlockInThisPhase;
// This state is only used on the main thread.
nsDOMMediaStream* mWrapper;
// Main-thread views of state
@ -492,7 +517,6 @@ public:
// XXX need a Reset API
friend class MediaStreamGraph;
friend class MediaStreamGraphImpl;
struct ThreadAndRunnable {
@ -552,6 +576,159 @@ protected:
bool mDestroyed;
};
/**
 * Represents a connection between a ProcessedMediaStream and one of its
 * input streams.
 * We make these refcounted so that stream-related messages with MediaInputPort*
 * pointers can be sent to the main thread safely.
 *
 * When a port's source or destination stream dies, the stream's DestroyImpl
 * calls MediaInputPort::Disconnect to disconnect the port from
 * the source and destination streams.
 *
 * The lifetimes of MediaInputPort are controlled from the main thread.
 * The media graph adds a reference to the port. When a MediaInputPort is no
 * longer needed, main-thread code sends a Destroy message for the port and
 * clears its reference (the last main-thread reference to the object). When
 * the Destroy message is processed on the graph manager thread we disconnect
 * the port and drop the graph's reference, destroying the object.
 */
class MediaInputPort {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaInputPort)
/**
 * The FLAG_BLOCK_INPUT and FLAG_BLOCK_OUTPUT flags can be used to control
 * exactly how the blocking statuses of the input and output streams affect
 * each other.
 */
enum {
// When set, blocking on the input stream forces blocking on the output
// stream.
FLAG_BLOCK_INPUT = 0x01,
// When set, blocking on the output stream forces blocking on the input
// stream.
FLAG_BLOCK_OUTPUT = 0x02
};
// Do not call this constructor directly. Instead call aDest->AllocateInputPort.
// The port does not own aSource or aDest; lifetime and teardown are managed
// by the message protocol described in the class comment above.
MediaInputPort(MediaStream* aSource, ProcessedMediaStream* aDest,
PRUint32 aFlags)
: mSource(aSource)
, mDest(aDest)
, mFlags(aFlags)
{
MOZ_COUNT_CTOR(MediaInputPort);
}
~MediaInputPort()
{
MOZ_COUNT_DTOR(MediaInputPort);
}
// Called on graph manager thread
// Do not call these from outside MediaStreamGraph.cpp!
void Init();
// Called during message processing to trigger removal of this stream.
// After this returns the port no longer links mSource and mDest.
void Disconnect();
// Control API
/**
 * Disconnects and destroys the port. The caller must not reference this
 * object again. Per the lifetime contract above, this is invoked from the
 * main thread and completes asynchronously on the graph manager thread.
 */
void Destroy();
// Any thread
// Safe to call from any thread because mSource/mDest are never modified
// after Init().
MediaStream* GetSource() { return mSource; }
ProcessedMediaStream* GetDestination() { return mDest; }
// Call on graph manager thread
// A half-open window of graph time over which blocking state is constant.
struct InputInterval {
GraphTime mStart;
GraphTime mEnd;
// Whether mSource is blocked for the duration of this interval.
bool mInputIsBlocked;
};
// Find the next time interval starting at or after aTime during which
// mDest is not blocked and mSource's blocking status does not change.
InputInterval GetNextInputInterval(GraphTime aTime);
protected:
friend class MediaStreamGraphImpl;
friend class MediaStream;
friend class ProcessedMediaStream;
// Never modified after Init()
MediaStream* mSource;
ProcessedMediaStream* mDest;
// Combination of the FLAG_BLOCK_* values above.
PRUint32 mFlags;
};
/**
 * This stream processes zero or more input streams in parallel to produce
 * its output. The details of how the output is produced are handled by
 * subclasses overriding the ProduceOutput method.
 */
class ProcessedMediaStream : public MediaStream {
public:
ProcessedMediaStream(nsDOMMediaStream* aWrapper)
: MediaStream(aWrapper), mAutofinish(false), mInCycle(false)
{}
// Control API.
/**
 * Allocates a new input port attached to source aStream.
 * aFlags is a combination of the MediaInputPort FLAG_BLOCK_* values.
 * The port can be removed by calling MediaInputPort::Destroy().
 */
MediaInputPort* AllocateInputPort(MediaStream* aStream, PRUint32 aFlags = 0);
/**
 * Force this stream into the finished state.
 */
void Finish();
/**
 * Set the autofinish flag on this stream (defaults to false). When this flag
 * is set, and all input streams are in the finished state (including if there
 * are no input streams), this stream automatically enters the finished state.
 */
void SetAutofinish(bool aAutofinish);
virtual ProcessedMediaStream* AsProcessedStream() { return this; }
friend class MediaStreamGraphImpl;
// Do not call these from outside MediaStreamGraph.cpp!
// Registers aPort as an input; does not take ownership (ports are
// refcounted and owned as described on MediaInputPort).
virtual void AddInput(MediaInputPort* aPort)
{
mInputs.AppendElement(aPort);
}
virtual void RemoveInput(MediaInputPort* aPort)
{
mInputs.RemoveElement(aPort);
}
// Returns true if aPort is currently registered as an input of this stream.
bool HasInputPort(MediaInputPort* aPort)
{
return mInputs.Contains(aPort);
}
virtual void DestroyImpl();
/**
 * This gets called after we've computed the blocking states for all
 * streams (mBlocked is up to date up to mStateComputedTime).
 * Also, we've produced output for all streams up to this one. If this stream
 * is not in a cycle, then all its source streams have produced data.
 * Generate output up to mStateComputedTime.
 * This is called only on streams that have not finished.
 */
virtual void ProduceOutput(GraphTime aFrom, GraphTime aTo) = 0;
// Graph-thread setter backing the SetAutofinish control-API call.
void SetAutofinishImpl(bool aAutofinish) { mAutofinish = aAutofinish; }
protected:
// This state is all accessed only on the media graph thread.
// The list of all inputs that are currently enabled or waiting to be enabled.
nsTArray<MediaInputPort*> mInputs;
// When true, the stream finishes automatically once all inputs have finished.
bool mAutofinish;
// True if and only if this stream is in a cycle.
// Updated by MediaStreamGraphImpl::UpdateStreamOrder.
bool mInCycle;
};
/**
* Initially, at least, we will have a singleton MediaStreamGraph per
* process.

View File

@ -38,7 +38,7 @@ void
StreamBuffer::ForgetUpTo(StreamTime aTime)
{
// Round to nearest 50ms so we don't spend too much time pruning segments.
const int roundTo = MillisecondsToMediaTime(50);
const MediaTime roundTo = MillisecondsToMediaTime(50);
StreamTime forget = (aTime/roundTo)*roundTo;
if (forget <= mForgottenTime) {
return;

View File

@ -91,6 +91,7 @@ public:
* two tracks with the same ID (even if they don't overlap in time).
* TODO Tracks can also be enabled and disabled over time.
* TODO Add TimeVarying<TrackTicks,bool> mEnabled.
* Takes ownership of aSegment.
*/
class Track {
public:
@ -161,7 +162,13 @@ public:
void ForgetUpTo(TrackTicks aTime)
{
mSegment->ForgetUpTo(aTime);
#ifdef DEBUG
mForgottenUpTo = NS_MAX<TrackTicks>(mForgottenUpTo, aTime);
#endif
}
#ifdef DEBUG
TrackTicks GetForgottenUpTo() { return mForgottenUpTo; }
#endif
protected:
friend class StreamBuffer;
@ -176,6 +183,7 @@ public:
TrackID mID;
// True when the track ends with the data in mSegment
bool mEnded;
DebugOnly<TrackTicks> mForgottenUpTo;
};
class CompareTracksByID {
@ -244,6 +252,7 @@ public:
++mIndex;
FindMatch();
}
Track* get() { return mBuffer->ElementAt(mIndex); }
Track& operator*() { return *mBuffer->ElementAt(mIndex); }
Track* operator->() { return mBuffer->ElementAt(mIndex); }
private:
@ -270,6 +279,13 @@ public:
* Can't be used to forget beyond GetEnd().
*/
void ForgetUpTo(StreamTime aTime);
/**
* Returns the latest time passed to ForgetUpTo.
*/
StreamTime GetForgottenDuration()
{
return mForgottenTime;
}
protected:
// Any new tracks added will start at or after this time. In other words, the track

View File

@ -106,8 +106,8 @@ public:
void InitFrom(const VideoSegment& aOther)
{
}
void SliceFrom(const VideoSegment& aOther, TrackTicks aStart, TrackTicks aEnd) {
BaseSliceFrom(aOther, aStart, aEnd);
void CheckCompatible(const VideoSegment& aOther) const
{
}
static Type StaticType() { return VIDEO; }
};