Bug 1129263 - Part 5. Add intial remote PeerConnection tracks atomically to SourceMediaStream. r=jesup

This commit is contained in:
Andreas Pehrson 2015-02-11 16:21:11 +08:00
parent 497c8d2219
commit d3e1679cb5
8 changed files with 113 additions and 33 deletions

View File

@ -1231,7 +1231,8 @@ nsresult MediaPipelineReceiveAudio::Init() {
static void AddTrackAndListener(MediaStream* source,
TrackID track_id, TrackRate track_rate,
MediaStreamListener* listener, MediaSegment* segment,
const RefPtr<TrackAddedCallback>& completed) {
const RefPtr<TrackAddedCallback>& completed,
bool queue_track) {
// This both adds the listener and the track
#ifdef MOZILLA_INTERNAL_API
class Message : public ControlMessage {
@ -1273,12 +1274,6 @@ static void AddTrackAndListener(MediaStream* source,
mStream->AsSourceStream()->AddTrack(track_id_,
current_ticks, segment_.forget());
}
// AdvanceKnownTracksTicksTime(HEAT_DEATH_OF_UNIVERSE) means that in
// theory per the API, we can't add more tracks before that
// time. However, the impl actually allows it, and it avoids a whole
// bunch of locking that would be required (and potential blocking)
// if we used smaller values and updated them on each NotifyPull.
mStream->AsSourceStream()->AdvanceKnownTracksTime(STREAM_TIME_MAX);
// We need to know how much has been "inserted" because we're given absolute
// times in NotifyPull.
@ -1294,27 +1289,40 @@ static void AddTrackAndListener(MediaStream* source,
MOZ_ASSERT(listener);
if (!queue_track) {
// We're only queueing the initial set of tracks since they are added
// atomically and have start time 0. When not queueing we have to add
// the track on the MediaStreamGraph thread so it can be added with the
// appropriate start time.
source->GraphImpl()->AppendMessage(new Message(source, track_id, track_rate, segment, listener, completed));
#else
MOZ_MTLOG(ML_INFO, "Dispatched track-add for track id " << track_id <<
" on stream " << source);
return;
}
#endif
source->AddListener(listener);
if (segment->GetType() == MediaSegment::AUDIO) {
source->AsSourceStream()->AddAudioTrack(track_id, track_rate, 0,
static_cast<AudioSegment*>(segment));
static_cast<AudioSegment*>(segment),
SourceMediaStream::ADDTRACK_QUEUED);
} else {
source->AsSourceStream()->AddTrack(track_id, 0, segment);
source->AsSourceStream()->AddTrack(track_id, 0, segment,
SourceMediaStream::ADDTRACK_QUEUED);
}
#endif
MOZ_MTLOG(ML_INFO, "Queued track-add for track id " << track_id <<
" on MediaStream " << source);
}
void GenericReceiveListener::AddSelf(MediaSegment* segment) {
RefPtr<TrackAddedCallback> callback = new GenericReceiveCallback(this);
AddTrackAndListener(source_, track_id_, track_rate_, this, segment, callback);
AddTrackAndListener(source_, track_id_, track_rate_, this, segment, callback,
queue_track_);
}
MediaPipelineReceiveAudio::PipelineListener::PipelineListener(
SourceMediaStream * source, TrackID track_id,
const RefPtr<MediaSessionConduit>& conduit)
: GenericReceiveListener(source, track_id, 16000), // XXX rate assumption
const RefPtr<MediaSessionConduit>& conduit, bool queue_track)
: GenericReceiveListener(source, track_id, 16000, queue_track), // XXX rate assumption
conduit_(conduit)
{
MOZ_ASSERT(track_rate_%100 == 0);
@ -1400,8 +1408,8 @@ nsresult MediaPipelineReceiveVideo::Init() {
}
MediaPipelineReceiveVideo::PipelineListener::PipelineListener(
SourceMediaStream* source, TrackID track_id)
: GenericReceiveListener(source, track_id, source->GraphRate()),
SourceMediaStream* source, TrackID track_id, bool queue_track)
: GenericReceiveListener(source, track_id, source->GraphRate(), queue_track),
width_(640),
height_(480),
#ifdef MOZILLA_INTERNAL_API

View File

@ -298,11 +298,12 @@ class GenericReceiveListener : public MediaStreamListener
{
public:
GenericReceiveListener(SourceMediaStream *source, TrackID track_id,
TrackRate track_rate)
TrackRate track_rate, bool queue_track)
: source_(source),
track_id_(track_id),
track_rate_(track_rate),
played_ticks_(0) {}
played_ticks_(0),
queue_track_(queue_track) {}
virtual ~GenericReceiveListener() {}
@ -321,6 +322,7 @@ class GenericReceiveListener : public MediaStreamListener
TrackID track_id_;
TrackRate track_rate_;
TrackTicks played_ticks_;
bool queue_track_;
};
class TrackAddedCallback {
@ -577,12 +579,13 @@ class MediaPipelineReceiveAudio : public MediaPipelineReceive {
RefPtr<AudioSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter) :
nsAutoPtr<MediaPipelineFilter> filter,
bool queue_track) :
MediaPipelineReceive(pc, main_thread, sts_thread,
stream, media_stream_track_id, level, conduit,
rtp_transport, rtcp_transport, filter),
listener_(new PipelineListener(stream->AsSourceStream(),
numeric_track_id, conduit)) {
numeric_track_id, conduit, queue_track)) {
}
virtual void DetachMediaStream() MOZ_OVERRIDE {
@ -600,7 +603,8 @@ class MediaPipelineReceiveAudio : public MediaPipelineReceive {
class PipelineListener : public GenericReceiveListener {
public:
PipelineListener(SourceMediaStream * source, TrackID track_id,
const RefPtr<MediaSessionConduit>& conduit);
const RefPtr<MediaSessionConduit>& conduit,
bool queue_track);
~PipelineListener()
{
@ -647,13 +651,14 @@ class MediaPipelineReceiveVideo : public MediaPipelineReceive {
RefPtr<VideoSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport,
nsAutoPtr<MediaPipelineFilter> filter) :
nsAutoPtr<MediaPipelineFilter> filter,
bool queue_track) :
MediaPipelineReceive(pc, main_thread, sts_thread,
stream, media_stream_track_id, level, conduit,
rtp_transport, rtcp_transport, filter),
renderer_(new PipelineRenderer(this)),
listener_(new PipelineListener(stream->AsSourceStream(),
numeric_track_id)) {
numeric_track_id, queue_track)) {
}
// Called on the main thread.
@ -705,7 +710,8 @@ class MediaPipelineReceiveVideo : public MediaPipelineReceive {
// Separate class to allow ref counting
class PipelineListener : public GenericReceiveListener {
public:
PipelineListener(SourceMediaStream * source, TrackID track_id);
PipelineListener(SourceMediaStream * source, TrackID track_id,
bool queue_track);
// Implement MediaStreamListener
virtual void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,

View File

@ -427,6 +427,8 @@ MediaPipelineFactory::CreateMediaPipelineReceiving(
TrackID numericTrackId = stream->GetNumericTrackId(aTrack.GetTrackId());
MOZ_ASSERT(numericTrackId != TRACK_INVALID);
bool queue_track = stream->QueueTracks();
MOZ_MTLOG(ML_DEBUG, __FUNCTION__ << ": Creating pipeline for "
<< numericTrackId << " -> " << aTrack.GetTrackId());
@ -442,7 +444,8 @@ MediaPipelineFactory::CreateMediaPipelineReceiving(
static_cast<AudioSessionConduit*>(aConduit.get()), // Ugly downcast.
aRtpFlow,
aRtcpFlow,
aFilter);
aFilter,
queue_track);
} else if (aTrack.GetMediaType() == SdpMediaSection::kVideo) {
pipeline = new MediaPipelineReceiveVideo(
mPC->GetHandle(),
@ -455,7 +458,8 @@ MediaPipelineFactory::CreateMediaPipelineReceiving(
static_cast<VideoSessionConduit*>(aConduit.get()), // Ugly downcast.
aRtpFlow,
aRtcpFlow,
aFilter);
aFilter,
queue_track);
} else {
MOZ_ASSERT(false);
MOZ_MTLOG(ML_ERROR, "Invalid media type in CreateMediaPipelineReceiving");
@ -477,6 +481,10 @@ MediaPipelineFactory::CreateMediaPipelineReceiving(
}
stream->SyncPipeline(pipeline);
if (queue_track) {
stream->TrackQueued(aTrack.GetTrackId());
}
return NS_OK;
}

View File

@ -495,8 +495,6 @@ PeerConnectionImpl::CreateRemoteSourceStreamInfo(nsRefPtr<RemoteSourceStreamInfo
return NS_ERROR_FAILURE;
}
static_cast<SourceMediaStream*>(stream->GetStream())->SetPullEnabled(true);
nsRefPtr<RemoteSourceStreamInfo> remote;
remote = new RemoteSourceStreamInfo(stream.forget(), mMedia, aStreamID);
*aInfo = remote;

View File

@ -1110,6 +1110,36 @@ RemoteSourceStreamInfo::SyncPipeline(
}
}
void
RemoteSourceStreamInfo::TrackQueued(const std::string& trackId)
{
// When tracks start being queued we know the pipelines have been created.
mPipelinesCreated = true;
MOZ_ASSERT(mTracksToQueue.count(trackId) > 0);
mTracksToQueue.erase(trackId);
CSFLogDebug(logTag, "Queued adding of track id %d to MediaStream %p. "
"%zu more tracks to queue.",
GetNumericTrackId(trackId),
GetMediaStream()->GetStream(),
mTracksToQueue.size());
// If all tracks have been queued for this stream, finish adding them.
if (mTracksToQueue.empty()) {
SourceMediaStream* source = GetMediaStream()->GetStream()->AsSourceStream();
source->FinishAddTracks();
source->SetPullEnabled(true);
// AdvanceKnownTracksTicksTime(HEAT_DEATH_OF_UNIVERSE) means that in
// theory per the API, we can't add more tracks before that
// time. However, the impl actually allows it, and it avoids a whole
// bunch of locking that would be required (and potential blocking)
// if we used smaller values and updated them on each NotifyPull.
source->AdvanceKnownTracksTime(STREAM_TIME_MAX);
CSFLogDebug(logTag, "Finished adding tracks to MediaStream %p", source);
}
}
RefPtr<MediaPipeline> SourceStreamInfo::GetPipelineByTrackId_m(
const std::string& trackId) {
ASSERT_ON_THREAD(mParent->GetMainThread());

View File

@ -147,7 +147,8 @@ class RemoteSourceStreamInfo : public SourceStreamInfo {
RemoteSourceStreamInfo(already_AddRefed<DOMMediaStream> aMediaStream,
PeerConnectionMedia *aParent,
const std::string& aId)
: SourceStreamInfo(aMediaStream, aParent, aId)
: SourceStreamInfo(aMediaStream, aParent, aId),
mPipelinesCreated(false)
{
}
@ -162,6 +163,11 @@ class RemoteSourceStreamInfo : public SourceStreamInfo {
virtual void AddTrack(const std::string& track) MOZ_OVERRIDE
{
mTrackIdMap.push_back(track);
MOZ_ASSERT(!mPipelinesCreated || mTracksToQueue.empty(),
"Track added while waiting for existing tracks to be queued.");
if (!mPipelinesCreated) {
mTracksToQueue.insert(track);
}
SourceStreamInfo::AddTrack(track);
}
@ -186,6 +192,17 @@ class RemoteSourceStreamInfo : public SourceStreamInfo {
return NS_OK;
}
/**
* Returns true if a |MediaPipeline| should be queueing its track instead of
* adding it to the |SourceMediaStream| directly.
*/
bool QueueTracks() const
{
return !mPipelinesCreated || !mTracksToQueue.empty();
}
void TrackQueued(const std::string& trackId);
private:
// For remote streams, the MediaStreamGraph API forces us to select a
// numeric track id before creation of the MediaStreamTrack, and does not
@ -195,6 +212,14 @@ class RemoteSourceStreamInfo : public SourceStreamInfo {
// and have the numeric track id selected for us, in which case this variable
// and its dependencies can go away.
std::vector<std::string> mTrackIdMap;
// When a remote stream gets created we need to add its initial set of tracks
// atomically. Here we track which tracks we have created Pipelines for and
// that will be queued later on.
std::set<std::string> mTracksToQueue;
// True if we have finished creating the initial set of pipelines
bool mPipelinesCreated;
};
class PeerConnectionMedia : public sigslot::has_slots<> {

View File

@ -131,14 +131,18 @@ class Fake_SourceMediaStream : public Fake_MediaStream {
mStop(false),
mPeriodic(new Fake_MediaPeriodic(this)) {}
enum {
ADDTRACK_QUEUED = 0x01 // Queue track add until FinishAddTracks()
};
void AddTrack(mozilla::TrackID aID, mozilla::StreamTime aStart,
mozilla::MediaSegment* aSegment) {
mozilla::MediaSegment* aSegment, uint32_t aFlags = 0) {
delete aSegment;
}
void AddAudioTrack(mozilla::TrackID aID, mozilla::TrackRate aRate, mozilla::StreamTime aStart,
mozilla::AudioSegment* aSegment) {
mozilla::AudioSegment* aSegment, uint32_t aFlags = 0) {
delete aSegment;
}
void FinishAddTracks() {}
void EndTrack(mozilla::TrackID aID) {}
bool AppendToTrack(mozilla::TrackID aID, mozilla::MediaSegment* aSegment,

View File

@ -324,7 +324,8 @@ class TestAgentReceive : public TestAgent {
static_cast<mozilla::AudioSessionConduit *>(audio_conduit_.get()),
audio_rtp_transport_.flow_,
audio_rtcp_transport_.flow_,
bundle_filter_);
bundle_filter_,
false);
audio_pipeline_->Init();
}