Bug 820102 - Clean up MediaPipeline threading (re-land after fix). r=derf,jesup

EKR 2012-12-21 06:03:22 -08:00
parent 8b55dd24fe
commit 3f8b6eafea
11 changed files with 459 additions and 285 deletions

View File

@ -0,0 +1,46 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#ifndef databuffer_h__
#define databuffer_h__
#include <algorithm>
#include <cstring>   // memcpy
#include <cstdint>   // uint8_t
#include <mozilla/Scoped.h>
#include <m_cpp_utils.h>
namespace mozilla {
class DataBuffer {
public:
DataBuffer() : data_(nullptr), len_(0) {}
DataBuffer(const uint8_t *data, size_t len) {
Assign(data, len);
}
void Assign(const uint8_t *data, size_t len) {
data_ = new uint8_t[len ? len : 1]; // Don't depend on new [0].
memcpy(static_cast<void *>(data_.get()),
static_cast<const void *>(data), len);
len_ = len;
}
const uint8_t *data() const { return data_; }
size_t len() const { return len_; }
bool empty() const { return len_ == 0; }
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataBuffer)
private:
ScopedDeleteArray<uint8_t> data_;
size_t len_;
DISALLOW_COPY_ASSIGN(DataBuffer);
};
}
#endif
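
For orientation, here is a minimal usage sketch for the new DataBuffer class above (not part of the patch; QueueBuffer, packet, and len are illustrative names). The point is that the constructor copies the payload, so the copy can safely be handed to another thread, which is how the pipeline send path uses it later in this commit.

// Sketch only; assumes the Mozilla tree headers.
#include "nsAutoPtr.h"    // nsAutoPtr
#include "databuffer.h"   // mozilla::DataBuffer (added by this patch)

static void QueueBuffer(const uint8_t *packet, size_t len) {
  // The constructor copies the payload, so the caller's buffer does not
  // need to outlive this call.
  nsAutoPtr<mozilla::DataBuffer> buf(new mozilla::DataBuffer(packet, len));

  // data()/len() expose the stored copy; ownership of the DataBuffer is
  // typically transferred into a runnable dispatched to another thread.
  const uint8_t *stored = buf->data();
  size_t stored_len = buf->len();
  (void) stored;
  (void) stored_len;
}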

View File

@ -40,6 +40,15 @@ class runnable_args_base : public nsRunnable {
// Temporary hack. Really we want to have a template which will do this
#define RUN_ON_THREAD(t, r, h) ((t && (t != nsRefPtr<nsIThread>(do_GetCurrentThread()))) ? t->Dispatch(r, h) : r->Run())
#define ASSERT_ON_THREAD(t) do { \
if (t) { \
bool on; \
nsresult rv; \
rv = t->IsOnCurrentThread(&on); \
MOZ_ASSERT(NS_SUCCEEDED(rv)); \
MOZ_ASSERT(on); \
} \
} while(0)
}
#endif
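
A rough usage sketch for the two macros above (MyObject, RequestWork, and DoWork_s are hypothetical names, not from this patch): RUN_ON_THREAD dispatches a wrapped call to the target thread, or runs it inline when no target is supplied or we are already on it, and ASSERT_ON_THREAD guards the callee.

// Illustrative only; assumes runnable_utils.h and nsThreadUtils.h.
#include "runnable_utils.h"
#include "nsThreadUtils.h"

class MyObject {
 public:
  explicit MyObject(nsCOMPtr<nsIEventTarget> sts_thread)
      : sts_thread_(sts_thread) {}

  void RequestWork() {
    // Hop to the STS thread (or run inline if we are already there).
    RUN_ON_THREAD(sts_thread_,
                  WrapRunnable(this, &MyObject::DoWork_s),
                  NS_DISPATCH_NORMAL);
  }

 private:
  void DoWork_s() {
    ASSERT_ON_THREAD(sts_thread_);  // MOZ_ASSERTs if called off that thread
    // Work that must only touch state owned by the STS thread goes here.
  }

  nsCOMPtr<nsIEventTarget> sts_thread_;
};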

View File

@ -909,7 +909,7 @@ static short vcmCreateRemoteStream_m(
hints |= nsDOMMediaStream::HINT_CONTENTS_VIDEO;
}
sipcc::RemoteSourceStreamInfo* info;
nsRefPtr<sipcc::RemoteSourceStreamInfo> info;
res = pc.impl()->CreateRemoteSourceStreamInfo(hints, &info);
if (NS_FAILED(res)) {
return VCM_ERROR;
@ -1329,14 +1329,26 @@ static int vcmRxStartICE_m(cc_mcapid_t mcap_id,
if (conduit->ConfigureRecvMediaCodecs(configs))
return VCM_ERROR;
// Now we have all the pieces, create the pipeline
stream->StorePipeline(pc_track_id,
mozilla::RefPtr<mozilla::MediaPipeline> pipeline =
new mozilla::MediaPipelineReceiveAudio(
pc.impl()->GetHandle(),
pc.impl()->GetMainThread().get(),
pc.impl()->GetSTSThread(),
stream->GetMediaStream(),
conduit, rtp_flow, rtcp_flow));
stream->GetMediaStream()->GetStream(),
conduit, rtp_flow, rtcp_flow);
nsresult res = pipeline->Init();
if (NS_FAILED(res)) {
CSFLogError(logTag, "Failure initializing audio pipeline");
return VCM_ERROR;
}
CSFLogDebug(logTag, "Created audio pipeline %p, conduit=%p, pc_stream=%d pc_track=%d",
pipeline.get(), conduit.get(), pc_stream_id, pc_track_id);
stream->StorePipeline(pc_track_id, pipeline);
} else if (CC_IS_VIDEO(mcap_id)) {
std::vector<mozilla::VideoCodecConfig *> configs;
@ -1362,13 +1374,24 @@ static int vcmRxStartICE_m(cc_mcapid_t mcap_id,
return VCM_ERROR;
// Now we have all the pieces, create the pipeline
stream->StorePipeline(pc_track_id,
new mozilla::MediaPipelineReceiveVideo(
pc.impl()->GetMainThread().get(),
pc.impl()->GetSTSThread(),
stream->GetMediaStream(),
conduit, rtp_flow, rtcp_flow));
mozilla::RefPtr<mozilla::MediaPipeline> pipeline =
new mozilla::MediaPipelineReceiveVideo(
pc.impl()->GetHandle(),
pc.impl()->GetMainThread().get(),
pc.impl()->GetSTSThread(),
stream->GetMediaStream()->GetStream(),
conduit, rtp_flow, rtcp_flow);
nsresult res = pipeline->Init();
if (NS_FAILED(res)) {
CSFLogError(logTag, "Failure initializing video pipeline");
return VCM_ERROR;
}
CSFLogDebug(logTag, "Created video pipeline %p, conduit=%p, pc_stream=%d pc_track=%d",
pipeline.get(), conduit.get(), pc_stream_id, pc_track_id);
stream->StorePipeline(pc_track_id, pipeline);
} else {
CSFLogError(logTag, "%s: mcap_id unrecognized", __FUNCTION__);
return VCM_ERROR;
@ -1936,16 +1959,23 @@ static int vcmTxStartICE_m(cc_mcapid_t mcap_id,
if (!conduit || conduit->ConfigureSendMediaCodec(config))
return VCM_ERROR;
mozilla::RefPtr<mozilla::MediaPipelineTransmit> pipeline =
new mozilla::MediaPipelineTransmit(
pc.impl()->GetMainThread().get(),
pc.impl()->GetSTSThread(),
stream->GetMediaStream(),
conduit, rtp_flow, rtcp_flow);
mozilla::RefPtr<mozilla::MediaPipeline> pipeline =
new mozilla::MediaPipelineTransmit(
pc.impl()->GetHandle(),
pc.impl()->GetMainThread().get(),
pc.impl()->GetSTSThread(),
stream->GetMediaStream()->GetStream(),
conduit, rtp_flow, rtcp_flow);
nsresult res = pipeline->Init();
if (NS_FAILED(res)) {
CSFLogError(logTag, "Failure initializing audio pipeline");
return VCM_ERROR;
}
CSFLogDebug(logTag, "Created audio pipeline %p, conduit=%p, pc_stream=%d pc_track=%d",
pipeline.get(), conduit.get(), pc_stream_id, pc_track_id);
// Now we have all the pieces, create the pipeline
stream->StorePipeline(pc_track_id, pipeline);
@ -1968,18 +1998,24 @@ static int vcmTxStartICE_m(cc_mcapid_t mcap_id,
if (!conduit || conduit->ConfigureSendMediaCodec(config))
return VCM_ERROR;
// Create the pipeline
// Now we have all the pieces, create the pipeline
mozilla::RefPtr<mozilla::MediaPipeline> pipeline =
new mozilla::MediaPipelineTransmit(
pc.impl()->GetMainThread().get(),
pc.impl()->GetSTSThread(),
stream->GetMediaStream(),
conduit, rtp_flow, rtcp_flow);
pc.impl()->GetHandle(),
pc.impl()->GetMainThread().get(),
pc.impl()->GetSTSThread(),
stream->GetMediaStream()->GetStream(),
conduit, rtp_flow, rtcp_flow);
nsresult res = pipeline->Init();
if (NS_FAILED(res)) {
CSFLogError(logTag, "Failure initializing video pipeline");
return VCM_ERROR;
}
CSFLogDebug(logTag, "Created video pipeline %p, conduit=%p, pc_stream=%d pc_track=%d",
pipeline.get(), conduit.get(), pc_stream_id, pc_track_id);
// Now we have all the pieces, create the pipeline
stream->StorePipeline(pc_track_id, pipeline);
} else {
CSFLogError(logTag, "%s: mcap_id unrecognized", __FUNCTION__);

View File

@ -23,6 +23,7 @@
#include "nsError.h"
#include "AudioSegment.h"
#include "MediaSegment.h"
#include "databuffer.h"
#include "transportflow.h"
#include "transportlayer.h"
#include "transportlayerdtls.h"
@ -32,6 +33,14 @@
using namespace mozilla;
#ifdef DEBUG
// Dial up pipeline logging in debug mode
#define MP_LOG_INFO PR_LOG_WARN
#else
#define MP_LOG_INFO PR_LOG_INFO
#endif
// Logging context
MOZ_MTLOG_MODULE("mediapipeline");
@ -40,6 +49,7 @@ namespace mozilla {
static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp";
nsresult MediaPipeline::Init() {
ASSERT_ON_THREAD(main_thread_);
conduit_->AttachTransport(transport_);
MOZ_ASSERT(rtp_transport_);
@ -52,7 +62,10 @@ nsresult MediaPipeline::Init() {
if (rtp_transport_->state() == TransportLayer::TS_OPEN) {
res = TransportReady(rtp_transport_);
NS_ENSURE_SUCCESS(res, res);
if (NS_FAILED(res)) {
MOZ_MTLOG(PR_LOG_ERROR, "Error calling TransportReady()");
return res;
}
} else {
if (!muxed_) {
rtcp_transport_->SignalStateChange.connect(this,
@ -60,7 +73,10 @@ nsresult MediaPipeline::Init() {
if (rtcp_transport_->state() == TransportLayer::TS_OPEN) {
res = TransportReady(rtcp_transport_);
NS_ENSURE_SUCCESS(res, res);
if (NS_FAILED(res)) {
MOZ_MTLOG(PR_LOG_ERROR, "Error calling TransportReady()");
return res;
}
}
}
}
@ -70,7 +86,9 @@ nsresult MediaPipeline::Init() {
// Disconnect us from the transport so that we can cleanly destruct
// the pipeline on the main thread.
void MediaPipeline::DetachTransportInt() {
void MediaPipeline::DetachTransport_s() {
ASSERT_ON_THREAD(sts_thread_);
transport_->Detach();
rtp_transport_ = NULL;
rtcp_transport_ = NULL;
@ -78,13 +96,13 @@ void MediaPipeline::DetachTransportInt() {
void MediaPipeline::DetachTransport() {
RUN_ON_THREAD(sts_thread_,
WrapRunnable(this, &MediaPipeline::DetachTransportInt),
WrapRunnable(this, &MediaPipeline::DetachTransport_s),
NS_DISPATCH_SYNC);
}
void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state) {
if (state == TransportLayer::TS_OPEN) {
MOZ_MTLOG(PR_LOG_DEBUG, "Flow is ready");
MOZ_MTLOG(MP_LOG_INFO, "Flow is ready");
TransportReady(flow);
} else if (state == TransportLayer::TS_CLOSED ||
state == TransportLayer::TS_ERROR) {
@ -108,18 +126,21 @@ nsresult MediaPipeline::TransportReady(TransportFlow *flow) {
}
nsresult MediaPipeline::TransportReadyInt(TransportFlow *flow) {
MOZ_ASSERT(!description_.empty());
bool rtcp = !(flow == rtp_transport_);
State *state = rtcp ? &rtcp_state_ : &rtp_state_;
if (*state != MP_CONNECTING) {
MOZ_MTLOG(PR_LOG_ERROR, "Transport ready for flow in wrong state:" <<
(rtcp ? "rtcp" : "rtp"));
description_ << ": " << (rtcp ? "rtcp" : "rtp"));
return NS_ERROR_FAILURE;
}
nsresult res;
MOZ_MTLOG(PR_LOG_DEBUG, "Transport ready for flow " << (rtcp ? "rtcp" : "rtp"));
MOZ_MTLOG(MP_LOG_INFO, "Transport ready for pipeline " <<
static_cast<void *>(this) << " flow " << description_ << ": " <<
(rtcp ? "rtcp" : "rtp"));
// Now instantiate the SRTP objects
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
@ -192,14 +213,14 @@ nsresult MediaPipeline::TransportReadyInt(TransportFlow *flow) {
rtcp_send_srtp_ = rtp_send_srtp_;
rtcp_recv_srtp_ = rtp_recv_srtp_;
MOZ_MTLOG(PR_LOG_DEBUG, "Listening for packets received on " <<
MOZ_MTLOG(MP_LOG_INFO, "Listening for packets received on " <<
static_cast<void *>(dtls->downward()));
dtls->downward()->SignalPacketReceived.connect(this,
&MediaPipeline::
PacketReceived);
} else {
MOZ_MTLOG(PR_LOG_DEBUG, "Listening for RTP packets received on " <<
MOZ_MTLOG(MP_LOG_INFO, "Listening for RTP packets received on " <<
static_cast<void *>(dtls->downward()));
dtls->downward()->SignalPacketReceived.connect(this,
@ -219,7 +240,7 @@ nsresult MediaPipeline::TransportReadyInt(TransportFlow *flow) {
return NS_ERROR_FAILURE;
}
MOZ_MTLOG(PR_LOG_DEBUG, "Listening for RTCP packets received on " <<
MOZ_MTLOG(MP_LOG_INFO, "Listening for RTCP packets received on " <<
static_cast<void *>(dtls->downward()));
// Start listening
@ -239,7 +260,7 @@ nsresult MediaPipeline::TransportFailed(TransportFlow *flow) {
*state = MP_CLOSED;
MOZ_MTLOG(PR_LOG_DEBUG, "Transport closed for flow " << (rtcp ? "rtcp" : "rtp"));
MOZ_MTLOG(MP_LOG_INFO, "Transport closed for flow " << (rtcp ? "rtcp" : "rtp"));
NS_WARNING(
"MediaPipeline Transport failed. This is not properly cleaned up yet");
@ -254,25 +275,10 @@ nsresult MediaPipeline::TransportFailed(TransportFlow *flow) {
}
// Wrapper to send a packet on the STS thread.
nsresult MediaPipeline::SendPacket(TransportFlow *flow, const void *data,
int len) {
nsresult rv;
nsresult res;
ASSERT_ON_THREAD(sts_thread_);
rv = RUN_ON_THREAD(sts_thread_,
WrapRunnableRet(this, &MediaPipeline::SendPacketInt, flow, data, len, &res),
NS_DISPATCH_SYNC);
// res is invalid unless the dispatch succeeded
if (NS_FAILED(rv))
return rv;
return res;
}
nsresult MediaPipeline::SendPacketInt(TransportFlow *flow, const void *data,
int len) {
// Note that we bypass the DTLS layer here
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
flow->GetLayer(TransportLayerDtls::ID()));
@ -295,37 +301,45 @@ nsresult MediaPipeline::SendPacketInt(TransportFlow *flow, const void *data,
void MediaPipeline::increment_rtp_packets_sent() {
++rtp_packets_sent_;
if (!(rtp_packets_sent_ % 1000)) {
MOZ_MTLOG(PR_LOG_DEBUG, "RTP packet count " << static_cast<void *>(this)
if (!(rtp_packets_sent_ % 100)) {
MOZ_MTLOG(MP_LOG_INFO, "RTP sent packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtp_transport_)
<< ": " << rtp_packets_sent_);
}
}
void MediaPipeline::increment_rtcp_packets_sent() {
++rtcp_packets_sent_;
if (!(rtcp_packets_sent_ % 1000)) {
MOZ_MTLOG(PR_LOG_DEBUG, "RTCP packet count " << static_cast<void *>(this)
if (!(rtcp_packets_sent_ % 100)) {
MOZ_MTLOG(MP_LOG_INFO, "RTCP sent packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtcp_transport_)
<< ": " << rtcp_packets_sent_);
}
}
void MediaPipeline::increment_rtp_packets_received() {
++rtp_packets_received_;
if (!(rtp_packets_received_ % 1000)) {
MOZ_MTLOG(PR_LOG_DEBUG, "RTP packet count " << static_cast<void *>(this)
if (!(rtp_packets_received_ % 100)) {
MOZ_MTLOG(MP_LOG_INFO, "RTP received packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtp_transport_)
<< ": " << rtp_packets_received_);
}
}
void MediaPipeline::increment_rtcp_packets_received() {
++rtcp_packets_received_;
if (!(rtcp_packets_received_ % 1000)) {
MOZ_MTLOG(PR_LOG_DEBUG, "RTCP packet count " << static_cast<void *>(this)
if (!(rtcp_packets_received_ % 100)) {
MOZ_MTLOG(MP_LOG_INFO, "RTCP received packet count for " << description_
<< " Pipeline " << static_cast<void *>(this)
<< " Flow : " << static_cast<void *>(rtcp_transport_)
<< ": " << rtcp_packets_received_);
}
}
void MediaPipeline::RtpPacketReceived(TransportLayer *layer,
const unsigned char *data,
size_t len) {
@ -334,9 +348,10 @@ void MediaPipeline::RtpPacketReceived(TransportLayer *layer,
return;
}
// TODO(ekr@rtfm.com): filter for DTLS here and in RtcpPacketReceived
// TODO(ekr@rtfm.com): filter on SSRC for bundle
increment_rtp_packets_received();
if (!conduit_) {
MOZ_MTLOG(PR_LOG_DEBUG, "Discarding incoming packet; media disconnected");
return;
}
MOZ_ASSERT(rtp_recv_srtp_); // This should never happen
@ -346,6 +361,10 @@ void MediaPipeline::RtpPacketReceived(TransportLayer *layer,
return;
}
// TODO(ekr@rtfm.com): filter for DTLS here and in RtcpPacketReceived
// TODO(ekr@rtfm.com): filter on SSRC for bundle
increment_rtp_packets_received();
// Make a copy rather than cast away constness
ScopedDeletePtr<unsigned char> inner_data(
new unsigned char[len]);
@ -367,6 +386,17 @@ void MediaPipeline::RtcpPacketReceived(TransportLayer *layer,
return;
}
if (!conduit_) {
MOZ_MTLOG(PR_LOG_DEBUG, "Discarding incoming packet; media disconnected");
return;
}
if (direction_ == RECEIVE) {
// Discard any RTCP that is being transmitted to us
// This will be unnecessary when we have SSRC filtering.
return;
}
increment_rtcp_packets_received();
MOZ_ASSERT(rtcp_recv_srtp_); // This should never happen
@ -434,25 +464,55 @@ void MediaPipeline::PacketReceived(TransportLayer *layer,
}
nsresult MediaPipelineTransmit::Init() {
ASSERT_ON_THREAD(main_thread_);
description_ = pc_ + "| ";
description_ += conduit_->type() == MediaSessionConduit::AUDIO ?
"Transmit audio" : "Transmit video";
// TODO(ekr@rtfm.com): Check for errors
MOZ_MTLOG(PR_LOG_DEBUG, "Attaching pipeline to stream "
<< static_cast<void *>(stream_) <<
" conduit type=" <<
(conduit_->type() == MediaSessionConduit::AUDIO ?
"audio" : "video") <<
" hints=" << stream_->GetHintContents());
"audio" : "video"));
// Force this to be a refptr so that we are holding a strong reference
// to the media stream.
nsRefPtr<MediaStream> stream (stream_->GetStream());
return RUN_ON_THREAD(main_thread_, WrapRunnable(stream,
&MediaStream::AddListener,
listener_),
NS_DISPATCH_NORMAL);
stream_->AddListener(listener_);
return MediaPipeline::Init();
}
nsresult MediaPipelineTransmit::TransportReady(TransportFlow *flow) {
// Call base ready function.
MediaPipeline::TransportReady(flow);
if (flow == rtp_transport_) {
// TODO(ekr@rtfm.com): Move onto MSG thread.
listener_->SetActive(true);
}
return NS_OK;
}
nsresult MediaPipeline::PipelineTransport::SendRtpPacket(
const void *data, int len) {
nsresult ret;
nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t *>(data),
len));
RUN_ON_THREAD(sts_thread_,
WrapRunnableRet(
RefPtr<MediaPipeline::PipelineTransport>(this),
&MediaPipeline::PipelineTransport::SendRtpPacket_s,
buf, &ret),
NS_DISPATCH_NORMAL);
return NS_OK;
}
nsresult MediaPipeline::PipelineTransport::SendRtpPacket_s(
nsAutoPtr<DataBuffer> data) {
if (!pipeline_)
return NS_OK; // Detached
@ -467,14 +527,15 @@ nsresult MediaPipeline::PipelineTransport::SendRtpPacket(
// libsrtp enciphers in place, so we need a new, big enough
// buffer.
// XXX. allocates and deletes one buffer per packet sent.
int max_len = len + SRTP_MAX_EXPANSION;
// Bug 822129
int max_len = data->len() + SRTP_MAX_EXPANSION;
ScopedDeletePtr<unsigned char> inner_data(
new unsigned char[max_len]);
memcpy(inner_data, data, len);
memcpy(inner_data, data->data(), data->len());
int out_len;
nsresult res = pipeline_->rtp_send_srtp_->ProtectRtp(inner_data,
len,
data->len(),
max_len,
&out_len);
if (!NS_SUCCEEDED(res))
@ -487,6 +548,23 @@ nsresult MediaPipeline::PipelineTransport::SendRtpPacket(
nsresult MediaPipeline::PipelineTransport::SendRtcpPacket(
const void *data, int len) {
nsresult ret;
nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t *>(data),
len));
RUN_ON_THREAD(sts_thread_,
WrapRunnableRet(
RefPtr<MediaPipeline::PipelineTransport>(this),
&MediaPipeline::PipelineTransport::SendRtcpPacket_s,
buf, &ret),
NS_DISPATCH_NORMAL);
return NS_OK;
}
nsresult MediaPipeline::PipelineTransport::SendRtcpPacket_s(
nsAutoPtr<DataBuffer> data) {
if (!pipeline_)
return NS_OK; // Detached
@ -501,14 +579,15 @@ nsresult MediaPipeline::PipelineTransport::SendRtcpPacket(
// libsrtp enciphers in place, so we need a new, big enough
// buffer.
// XXX. allocates and deletes one buffer per packet sent.
int max_len = len + SRTP_MAX_EXPANSION;
// Bug 822129.
int max_len = data->len() + SRTP_MAX_EXPANSION;
ScopedDeletePtr<unsigned char> inner_data(
new unsigned char[max_len]);
memcpy(inner_data, data, len);
memcpy(inner_data, data->data(), data->len());
int out_len;
nsresult res = pipeline_->rtcp_send_srtp_->ProtectRtcp(inner_data,
len,
data->len(),
max_len,
&out_len);
if (!NS_SUCCEEDED(res))
@ -525,22 +604,18 @@ NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
TrackTicks offset,
uint32_t events,
const MediaSegment& queued_media) {
if (!pipeline_)
return; // Detached
MOZ_MTLOG(PR_LOG_DEBUG, "MediaPipeline::NotifyQueuedTrackChanges()");
// Return early if we are not connected to avoid queueing stuff
// up in the conduit
if (pipeline_->rtp_transport_->state() != TransportLayer::TS_OPEN) {
MOZ_MTLOG(PR_LOG_DEBUG, "Transport not ready yet, dropping packets");
if (!active_) {
MOZ_MTLOG(PR_LOG_DEBUG, "Discarding packets because transport not ready");
return;
}
// TODO(ekr@rtfm.com): For now assume that we have only one
// track type and it's destined for us
// See bug 784517
if (queued_media.GetType() == MediaSegment::AUDIO) {
if (pipeline_->conduit_->type() != MediaSessionConduit::AUDIO) {
if (conduit_->type() != MediaSessionConduit::AUDIO) {
// Ignore data in case we have a muxed stream
return;
}
@ -549,14 +624,13 @@ NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
AudioSegment::ChunkIterator iter(*audio);
while(!iter.IsEnded()) {
pipeline_->ProcessAudioChunk(static_cast<AudioSessionConduit *>
(pipeline_->conduit_.get()),
rate, *iter);
ProcessAudioChunk(static_cast<AudioSessionConduit*>(conduit_.get()),
rate, *iter);
iter.Next();
}
} else if (queued_media.GetType() == MediaSegment::VIDEO) {
#ifdef MOZILLA_INTERNAL_API
if (pipeline_->conduit_->type() != MediaSessionConduit::VIDEO) {
if (conduit_->type() != MediaSessionConduit::VIDEO) {
// Ignore data in case we have a muxed stream
return;
}
@ -565,9 +639,8 @@ NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
VideoSegment::ChunkIterator iter(*video);
while(!iter.IsEnded()) {
pipeline_->ProcessVideoChunk(static_cast<VideoSessionConduit *>
(pipeline_->conduit_.get()),
rate, *iter);
ProcessVideoChunk(static_cast<VideoSessionConduit*>(conduit_.get()),
rate, *iter);
iter.Next();
}
#endif
@ -576,9 +649,10 @@ NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
}
}
void MediaPipelineTransmit::ProcessAudioChunk(AudioSessionConduit *conduit,
TrackRate rate,
AudioChunk& chunk) {
void MediaPipelineTransmit::PipelineListener::ProcessAudioChunk(
AudioSessionConduit *conduit,
TrackRate rate,
AudioChunk& chunk) {
// TODO(ekr@rtfm.com): Do more than one channel
nsAutoArrayPtr<int16_t> samples(new int16_t[chunk.mDuration]);
@ -612,9 +686,10 @@ void MediaPipelineTransmit::ProcessAudioChunk(AudioSessionConduit *conduit,
}
#ifdef MOZILLA_INTERNAL_API
void MediaPipelineTransmit::ProcessVideoChunk(VideoSessionConduit *conduit,
TrackRate rate,
VideoChunk& chunk) {
void MediaPipelineTransmit::PipelineListener::ProcessVideoChunk(
VideoSessionConduit* conduit,
TrackRate rate,
VideoChunk& chunk) {
// We now need to send the video frame to the other side
layers::Image *img = chunk.mFrame.GetImage();
if (!img) {
@ -667,27 +742,20 @@ void MediaPipelineTransmit::ProcessVideoChunk(VideoSessionConduit *conduit,
#endif
nsresult MediaPipelineReceiveAudio::Init() {
ASSERT_ON_THREAD(main_thread_);
MOZ_MTLOG(PR_LOG_DEBUG, __FUNCTION__);
// Force this to be a refptr so that we are holding a strong reference
// to the media stream.
nsRefPtr<MediaStream> stream (stream_->GetStream());
return RUN_ON_THREAD(main_thread_, WrapRunnable(stream,
&MediaStream::AddListener,
listener_),
NS_DISPATCH_NORMAL);
description_ = pc_ + "| Receive audio";
stream_->AddListener(listener_);
return MediaPipelineReceive::Init();
}
void MediaPipelineReceiveAudio::PipelineListener::
NotifyPull(MediaStreamGraph* graph, StreamTime total) {
if (!pipeline_)
return; // Detached
SourceMediaStream *source =
pipeline_->stream_->GetStream()->AsSourceStream();
MOZ_ASSERT(source);
if (!source) {
MOZ_ASSERT(source_);
if (!source_) {
MOZ_MTLOG(PR_LOG_ERROR, "NotifyPull() called from a non-SourceMediaStream");
return;
}
@ -715,7 +783,7 @@ NotifyPull(MediaStreamGraph* graph, StreamTime total) {
int samples_length;
MediaConduitErrorCode err =
static_cast<AudioSessionConduit*>(pipeline_->conduit_.get())->GetAudioFrame(
static_cast<AudioSessionConduit*>(conduit_.get())->GetAudioFrame(
static_cast<int16_t *>(samples->Data()),
16000, // Sampling rate fixed at 16 kHz for now
0, // TODO(ekr@rtfm.com): better estimate of capture delay
@ -731,20 +799,21 @@ NotifyPull(MediaStreamGraph* graph, StreamTime total) {
segment.AppendFrames(samples.forget(), samples_length,
0, samples_length, AUDIO_FORMAT_S16);
char buf[32];
PR_snprintf(buf, 32, "%p", source);
source->AppendToTrack(1, // TODO(ekr@rtfm.com): Track ID
&segment);
source_->AppendToTrack(1, // TODO(ekr@rtfm.com): Track ID
&segment);
}
}
nsresult MediaPipelineReceiveVideo::Init() {
ASSERT_ON_THREAD(main_thread_);
MOZ_MTLOG(PR_LOG_DEBUG, __FUNCTION__);
description_ = pc_ + "| Receive video";
static_cast<VideoSessionConduit *>(conduit_.get())->
AttachRenderer(renderer_);
return NS_OK;
return MediaPipelineReceive::Init();
}
MediaPipelineReceiveVideo::PipelineRenderer::PipelineRenderer(
@ -755,7 +824,7 @@ MediaPipelineReceiveVideo::PipelineRenderer::PipelineRenderer(
#ifdef MOZILLA_INTERNAL_API
image_container_ = layers::LayerManager::CreateImageContainer();
SourceMediaStream *source =
pipeline_->stream_->GetStream()->AsSourceStream();
pipeline_->stream_->AsSourceStream();
source->AddTrack(1 /* Track ID */, 30, 0, new VideoSegment());
source->AdvanceKnownTracksTime(STREAM_TIME_MAX);
#endif
@ -768,7 +837,7 @@ void MediaPipelineReceiveVideo::PipelineRenderer::RenderVideoFrame(
int64_t render_time) {
#ifdef MOZILLA_INTERNAL_API
SourceMediaStream *source =
pipeline_->stream_->GetStream()->AsSourceStream();
pipeline_->stream_->AsSourceStream();
// Create a video frame and append it to the track.
ImageFormat format = PLANAR_YCBCR;

View File

@ -17,6 +17,8 @@
#include "MediaConduitInterface.h"
#include "AudioSegment.h"
#include "SrtpFlow.h"
#include "databuffer.h"
#include "runnable_utils.h"
#include "transportflow.h"
#ifdef MOZILLA_INTERNAL_API
@ -35,14 +37,35 @@ namespace mozilla {
// network -> transport -> [us] -> conduit -> [us] -> stream -> Playout
//
// The boxes labeled [us] are just bridge logic implemented in this class
//
// We have to deal with a number of threads:
//
// GSM:
// * Assembles the pipeline
// SocketTransportService
// * Receives notification that ICE and DTLS have completed
// * Processes incoming network data and passes it to the conduit
// * Processes outgoing RTP and RTCP
// MediaStreamGraph
// * Receives outgoing data from the MediaStreamGraph
// * Receives pull requests for more data from the
// MediaStreamGraph
// One or another GIPS threads
// * Receives RTCP messages to send to the other side
// * Processes video frames GIPS wants to render
//
// For a transmitting conduit, "output" is RTP and "input" is RTCP.
// For a receiving conduit, "input" is RTP and "output" is RTCP.
//
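// To make the division of labor concrete, the transmit direction goes
// roughly like this:
//   1. The main (GSM) thread constructs the pipeline and calls Init(),
//      which attaches the transport and the stream listener.
//   2. The MediaStreamGraph thread delivers NotifyQueuedTrackChanges(),
//      and the listener feeds the chunks to the conduit.
//   3. The conduit hands outgoing RTP/RTCP to PipelineTransport::
//      SendRtpPacket()/SendRtcpPacket(), which copy the packet into a
//      DataBuffer and dispatch it to the STS thread.
//   4. The STS thread applies SRTP protection and writes to the
//      transport flow.
//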
class MediaPipeline : public sigslot::has_slots<> {
public:
enum Direction { TRANSMIT, RECEIVE };
enum State { MP_CONNECTING, MP_OPEN, MP_CLOSED };
MediaPipeline(Direction direction,
MediaPipeline(const std::string& pc,
Direction direction,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
nsDOMMediaStream* stream,
MediaStream *stream,
RefPtr<MediaSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport)
@ -64,21 +87,31 @@ class MediaPipeline : public sigslot::has_slots<> {
rtcp_packets_sent_(0),
rtp_packets_received_(0),
rtcp_packets_received_(0),
muxed_((rtcp_transport_ == NULL) || (rtp_transport_ == rtcp_transport_)) {
Init();
muxed_((rtcp_transport_ == NULL) || (rtp_transport_ == rtcp_transport_)),
pc_(pc),
description_() {
}
virtual ~MediaPipeline() {
MOZ_ASSERT(!stream_); // Check that we have shut down already.
}
void Shutdown() {
ASSERT_ON_THREAD(main_thread_);
// First shut down networking and then disconnect from
// the media streams. DetachTransport() is sync so
// we are sure that the transport is shut down before
// we touch stream_ or conduit_.
DetachTransport();
if (stream_) {
DetachMediaStream();
}
}
virtual nsresult Init();
virtual Direction direction() const { return direction_; }
virtual void DetachMediaStream() {}
virtual void DetachTransport();
int rtp_packets_sent() const { return rtp_packets_sent_; }
int rtcp_packets_sent() const { return rtcp_packets_sent_; }
int rtp_packets_received() const { return rtp_packets_received_; }
@ -87,13 +120,17 @@ class MediaPipeline : public sigslot::has_slots<> {
// Thread counting
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaPipeline)
protected:
protected:
virtual void DetachMediaStream() {}
// Separate class to allow ref counting
class PipelineTransport : public TransportInterface {
public:
// Implement the TransportInterface functions
PipelineTransport(MediaPipeline *pipeline)
: pipeline_(pipeline) {}
: pipeline_(pipeline),
sts_thread_(pipeline->sts_thread_) {}
void Detach() { pipeline_ = NULL; }
MediaPipeline *pipeline() const { return pipeline_; }
@ -101,7 +138,11 @@ class MediaPipeline : public sigslot::has_slots<> {
virtual nsresult SendRtcpPacket(const void* data, int len);
private:
virtual nsresult SendRtpPacket_s(nsAutoPtr<DataBuffer> data);
virtual nsresult SendRtcpPacket_s(nsAutoPtr<DataBuffer> data);
MediaPipeline *pipeline_; // Raw pointer to avoid cycles
nsCOMPtr<nsIEventTarget> sts_thread_;
};
friend class PipelineTransport;
@ -124,31 +165,51 @@ class MediaPipeline : public sigslot::has_slots<> {
void PacketReceived(TransportLayer *layer, const unsigned char *data,
size_t len);
Direction direction_;
nsDOMMediaStream* stream_;
RefPtr<MediaSessionConduit> conduit_;
RefPtr<MediaStream> stream_; // A pointer to the stream we are servicing.
// Written on the main thread.
// Used on STS and MediaStreamGraph threads.
RefPtr<MediaSessionConduit> conduit_; // Our conduit. Written on the main
// thread. Read on STS thread.
// The transport objects. Read/written on STS thread.
RefPtr<TransportFlow> rtp_transport_;
State rtp_state_;
RefPtr<TransportFlow> rtcp_transport_;
State rtcp_state_;
// Pointers to the threads we need. Initialized at creation
// and used all over the place.
nsCOMPtr<nsIEventTarget> main_thread_;
nsCOMPtr<nsIEventTarget> sts_thread_;
// Created on Init. Referenced by the conduit and eventually
// destroyed on the STS thread.
RefPtr<PipelineTransport> transport_;
bool transport_connected_;
// Used only on STS thread.
RefPtr<SrtpFlow> rtp_send_srtp_;
RefPtr<SrtpFlow> rtcp_send_srtp_;
RefPtr<SrtpFlow> rtp_recv_srtp_;
RefPtr<SrtpFlow> rtcp_recv_srtp_;
// Written only on STS thread. May be read on other
// threads but since there is no mutex, the values
// will only be approximate.
int rtp_packets_sent_;
int rtcp_packets_sent_;
int rtp_packets_received_;
int rtcp_packets_received_;
// Written on Init. Read on STS thread.
bool muxed_;
std::string pc_;
std::string description_;
private:
virtual void DetachTransportInt();
nsresult SendPacketInt(TransportFlow *flow, const void* data, int len);
void DetachTransport();
void DetachTransport_s();
nsresult TransportReadyInt(TransportFlow *flow);
bool IsRtp(const unsigned char *data, size_t len);
@ -159,47 +220,44 @@ class MediaPipeline : public sigslot::has_slots<> {
// and transmitting to the network.
class MediaPipelineTransmit : public MediaPipeline {
public:
MediaPipelineTransmit(nsCOMPtr<nsIEventTarget> main_thread,
MediaPipelineTransmit(const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
nsDOMMediaStream* stream,
MediaStream *stream,
RefPtr<MediaSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport) :
MediaPipeline(TRANSMIT, main_thread, sts_thread,
MediaPipeline(pc, TRANSMIT, main_thread, sts_thread,
stream, conduit, rtp_transport,
rtcp_transport),
listener_(new PipelineListener(this)) {
Init(); // TODO(ekr@rtfm.com): ignoring error
}
listener_(new PipelineListener(conduit)) {}
// Initialize (stuff here may fail)
nsresult Init();
virtual ~MediaPipelineTransmit() {
if (stream_ && listener_){
stream_->GetStream()->RemoveListener(listener_);
// These shouldn't be necessary, but just to make sure
// that if we have messed up ownership somehow the
// interfaces just abort.
listener_->Detach();
}
}
virtual nsresult Init();
// Called on the main thread.
virtual void DetachMediaStream() {
// TODO(ekr@rtfm.com): Are multiple removes a problem?
stream_->GetStream()->RemoveListener(listener_);
stream_ = NULL;
listener_->Detach();
ASSERT_ON_THREAD(main_thread_);
stream_->RemoveListener(listener_);
// Remove our reference so that when the MediaStreamGraph
// releases the listener, it will be destroyed.
listener_ = nullptr;
stream_ = nullptr;
}
// Override MediaPipeline::TransportReady.
virtual nsresult TransportReady(TransportFlow *flow);
// Separate class to allow ref counting
class PipelineListener : public MediaStreamListener {
public:
PipelineListener(MediaPipelineTransmit *pipeline) :
pipeline_(pipeline) {}
void Detach() { pipeline_ = NULL; }
PipelineListener(const RefPtr<MediaSessionConduit>& conduit)
: conduit_(conduit), active_(false) {}
// XXX. This is not thread-safe but the hazard is just
// that active_ = true takes a while to propagate. Revisit
// when 823600 lands.
void SetActive(bool active) { active_ = active; }
// Implement MediaStreamListener
virtual void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
@ -210,18 +268,19 @@ class MediaPipelineTransmit : public MediaPipeline {
virtual void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime) {}
private:
MediaPipelineTransmit *pipeline_; // Raw pointer to avoid cycles
virtual void ProcessAudioChunk(AudioSessionConduit *conduit,
TrackRate rate, AudioChunk& chunk);
#ifdef MOZILLA_INTERNAL_API
virtual void ProcessVideoChunk(VideoSessionConduit *conduit,
TrackRate rate, VideoChunk& chunk);
#endif
RefPtr<MediaSessionConduit> conduit_;
volatile bool active_;
};
friend class PipelineListener;
private:
virtual void ProcessAudioChunk(AudioSessionConduit *conduit,
TrackRate rate, AudioChunk& chunk);
#ifdef MOZILLA_INTERNAL_API
virtual void ProcessVideoChunk(VideoSessionConduit *conduit,
TrackRate rate, VideoChunk& chunk);
#endif
RefPtr<PipelineListener> listener_;
RefPtr<PipelineListener> listener_;
};
@ -229,13 +288,14 @@ class MediaPipelineTransmit : public MediaPipeline {
// rendering video.
class MediaPipelineReceive : public MediaPipeline {
public:
MediaPipelineReceive(nsCOMPtr<nsIEventTarget> main_thread,
MediaPipelineReceive(const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
nsDOMMediaStream* stream,
MediaStream *stream,
RefPtr<MediaSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport) :
MediaPipeline(RECEIVE, main_thread, sts_thread,
MediaPipeline(pc, RECEIVE, main_thread, sts_thread,
stream, conduit, rtp_transport,
rtcp_transport),
segments_added_(0) {
@ -254,41 +314,40 @@ class MediaPipelineReceive : public MediaPipeline {
// rendering audio.
class MediaPipelineReceiveAudio : public MediaPipelineReceive {
public:
MediaPipelineReceiveAudio(nsCOMPtr<nsIEventTarget> main_thread,
MediaPipelineReceiveAudio(const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
nsDOMMediaStream* stream,
MediaStream *stream,
RefPtr<AudioSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport) :
MediaPipelineReceive(main_thread, sts_thread,
MediaPipelineReceive(pc, main_thread, sts_thread,
stream, conduit, rtp_transport,
rtcp_transport),
listener_(new PipelineListener(this)) {
Init();
}
~MediaPipelineReceiveAudio() {
if (stream_ && listener_) {
stream_->GetStream()->RemoveListener(listener_);
listener_->Detach();
}
listener_(new PipelineListener(stream->AsSourceStream(),
conduit)) {
}
virtual void DetachMediaStream() {
// TODO(ekr@rtfm.com): Are multiple removes a problem?
stream_->GetStream()->RemoveListener(listener_);
stream_ = NULL;
listener_->Detach();
ASSERT_ON_THREAD(main_thread_);
stream_->RemoveListener(listener_);
// Remove our reference so that when the MediaStreamGraph
// releases the listener, it will be destroyed.
listener_ = nullptr;
stream_ = nullptr;
}
virtual nsresult Init();
private:
// Separate class to allow ref counting
class PipelineListener : public MediaStreamListener {
public:
PipelineListener(MediaPipelineReceiveAudio *pipeline)
: pipeline_(pipeline),
played_(0) {}
void Detach() { pipeline_ = NULL; }
PipelineListener(SourceMediaStream * source,
const RefPtr<MediaSessionConduit>& conduit)
: source_(source),
conduit_(conduit),
played_(0) {}
// Implement MediaStreamListener
virtual void NotifyQueuedTrackChanges(MediaStreamGraph* graph, TrackID tid,
@ -299,12 +358,10 @@ class MediaPipelineReceiveAudio : public MediaPipelineReceive {
virtual void NotifyPull(MediaStreamGraph* aGraph, StreamTime aDesiredTime);
private:
MediaPipelineReceiveAudio *pipeline_; // Raw pointer to avoid cycles
SourceMediaStream *source_;
RefPtr<MediaSessionConduit> conduit_;
StreamTime played_;
};
friend class PipelineListener;
nsresult Init();
RefPtr<PipelineListener> listener_;
};
@ -314,22 +371,29 @@ class MediaPipelineReceiveAudio : public MediaPipelineReceive {
// rendering video.
class MediaPipelineReceiveVideo : public MediaPipelineReceive {
public:
MediaPipelineReceiveVideo(nsCOMPtr<nsIEventTarget> main_thread,
MediaPipelineReceiveVideo(const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
nsDOMMediaStream* stream,
MediaStream *stream,
RefPtr<VideoSessionConduit> conduit,
RefPtr<TransportFlow> rtp_transport,
RefPtr<TransportFlow> rtcp_transport) :
MediaPipelineReceive(main_thread, sts_thread,
MediaPipelineReceive(pc, main_thread, sts_thread,
stream, conduit, rtp_transport,
rtcp_transport),
renderer_(new PipelineRenderer(this)) {
Init();
}
~MediaPipelineReceiveVideo() {
// Called on the main thread.
virtual void DetachMediaStream() {
ASSERT_ON_THREAD(main_thread_);
conduit_ = nullptr; // Force synchronous destruction so we
// stop generating video.
stream_ = nullptr;
}
virtual nsresult Init();
private:
class PipelineRenderer : public VideoRenderer {
public:
@ -360,7 +424,6 @@ class MediaPipelineReceiveVideo : public MediaPipelineReceive {
};
friend class PipelineRenderer;
nsresult Init();
RefPtr<PipelineRenderer> renderer_;
};
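
Putting the pieces above together, the expected call pattern from an embedder now looks roughly like the sketch below. It is based on the constructors, Init(), and Shutdown() declared above and on the test changes later in this commit; the parameter names are stand-ins and error handling is trimmed.

// Sketch only; assumes the declarations in this header.
static void PipelineLifecycleSketch(
    const std::string& pc,
    nsCOMPtr<nsIEventTarget> main_thread,
    nsCOMPtr<nsIEventTarget> sts_thread,
    mozilla::MediaStream* stream,
    mozilla::RefPtr<mozilla::MediaSessionConduit> conduit,
    mozilla::RefPtr<mozilla::TransportFlow> rtp_flow,
    mozilla::RefPtr<mozilla::TransportFlow> rtcp_flow) {
  // Construction just stores the arguments; Init() and Shutdown() must be
  // called on the main thread (both ASSERT_ON_THREAD(main_thread_)).
  mozilla::RefPtr<mozilla::MediaPipeline> pipeline =
      new mozilla::MediaPipelineTransmit(pc, main_thread, sts_thread,
                                         stream, conduit,
                                         rtp_flow, rtcp_flow);

  if (NS_FAILED(pipeline->Init())) {
    return;  // The vcm callers above likewise bail out on Init() failure.
  }

  // Media now flows: the MediaStreamGraph thread feeds the listener and
  // the STS thread does SRTP and network I/O.

  // Teardown, still on the main thread: Shutdown() synchronously detaches
  // the transport on the STS thread, then detaches from the MediaStream.
  pipeline->Shutdown();
}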

View File

@ -31,7 +31,9 @@ nsresult PeerConnectionCtx::InitializeGlobal(nsIThread *mainThread) {
gMainThread = mainThread;
CSF::VcmSIPCCBinding::setMainThread(gMainThread);
} else {
#ifdef MOZILLA_INTERNAL_API
MOZ_ASSERT(gMainThread == mainThread);
#endif
}
nsresult res;

View File

@ -260,19 +260,7 @@ PeerConnectionImpl::MakeMediaStream(uint32_t aHint, nsIDOMMediaStream** aRetval)
}
nsresult
PeerConnectionImpl::MakeRemoteSource(nsDOMMediaStream* aStream, RemoteSourceStreamInfo** aInfo)
{
MOZ_ASSERT(aInfo);
MOZ_ASSERT(aStream);
// TODO(ekr@rtfm.com): Add the track info with the first segment
nsRefPtr<RemoteSourceStreamInfo> remote = new RemoteSourceStreamInfo(aStream);
NS_ADDREF(*aInfo = remote);
return NS_OK;
}
nsresult
PeerConnectionImpl::CreateRemoteSourceStreamInfo(uint32_t aHint, RemoteSourceStreamInfo** aInfo)
PeerConnectionImpl::CreateRemoteSourceStreamInfo(uint32_t aHint, nsRefPtr<RemoteSourceStreamInfo>* aInfo)
{
MOZ_ASSERT(aInfo);
PC_AUTO_ENTER_API_CALL_NO_CHECK();
@ -288,19 +276,8 @@ PeerConnectionImpl::CreateRemoteSourceStreamInfo(uint32_t aHint, RemoteSourceStr
static_cast<mozilla::SourceMediaStream*>(comstream->GetStream())->SetPullEnabled(true);
nsRefPtr<RemoteSourceStreamInfo> remote;
if (!mThread || NS_IsMainThread()) {
remote = new RemoteSourceStreamInfo(comstream);
NS_ADDREF(*aInfo = remote);
return NS_OK;
}
mThread->Dispatch(WrapRunnableNMRet(
&PeerConnectionImpl::MakeRemoteSource, comstream, aInfo, &res
), NS_DISPATCH_SYNC);
if (NS_FAILED(res)) {
return res;
}
remote = new RemoteSourceStreamInfo(comstream);
*aInfo = remote;
return NS_OK;
}

View File

@ -119,14 +119,14 @@ public:
static nsresult ConvertConstraints(
const JS::Value& aConstraints, MediaConstraints* aObj, JSContext* aCx);
static nsresult MakeMediaStream(uint32_t aHint, nsIDOMMediaStream** aStream);
static nsresult MakeRemoteSource(nsDOMMediaStream* aStream, RemoteSourceStreamInfo** aInfo);
Role GetRole() const {
PC_AUTO_ENTER_API_CALL_NO_CHECK();
return mRole;
}
nsresult CreateRemoteSourceStreamInfo(uint32_t aHint, RemoteSourceStreamInfo** aInfo);
nsresult CreateRemoteSourceStreamInfo(uint32_t aHint,
nsRefPtr<RemoteSourceStreamInfo>* aInfo);
// Implementation of the only observer we need
virtual void onCallEvent(

View File

@ -28,22 +28,6 @@ static const char* logTag = "PeerConnectionMedia";
static const mozilla::TrackID TRACK_AUDIO = 0;
static const mozilla::TrackID TRACK_VIDEO = 1;
/* We get this callback in order to find out which tracks are audio and which
* are video. We should get this callback right away for existing streams after
* we add this class as a listener.
*/
void
LocalSourceStreamInfo::NotifyQueuedTrackChanges(
mozilla::MediaStreamGraph* aGraph,
mozilla::TrackID aID,
mozilla::TrackRate aTrackRate,
mozilla::TrackTicks aTrackOffset,
uint32_t aTrackEvents,
const mozilla::MediaSegment& aQueuedMedia)
{
/* TODO: use this callback to keep track of changes to the MediaStream */
}
/* If the ExpectAudio hint is on we will add a track at the default first
* audio track ID (0)
* FIX - Do we need to iterate over the tracks instead of taking these hints?
@ -197,13 +181,6 @@ PeerConnectionMedia::AddStream(nsIDOMMediaStream* aMediaStream, uint32_t *stream
localSourceStream->ExpectVideo(TRACK_VIDEO);
}
// Make it the listener for info from the MediaStream and add it to the list
mozilla::MediaStream *plainMediaStream = stream->GetStream();
if (plainMediaStream) {
plainMediaStream->AddListener(localSourceStream);
}
mLocalSourceStreams.AppendElement(localSourceStream);
PR_Unlock(mLocalSourceStreamsLock);

View File

@ -157,7 +157,7 @@ class Fake_VideoGenerator {
};
#endif
class LocalSourceStreamInfo : public mozilla::MediaStreamListener {
class LocalSourceStreamInfo {
public:
LocalSourceStreamInfo(nsDOMMediaStream* aMediaStream)
: mMediaStream(aMediaStream) {}
@ -165,24 +165,6 @@ public:
mMediaStream = NULL;
}
/**
* Notify that changes to one of the stream tracks have been queued.
* aTrackEvents can be any combination of TRACK_EVENT_CREATED and
* TRACK_EVENT_ENDED. aQueuedMedia is the data being added to the track
* at aTrackOffset (relative to the start of the stream).
*/
virtual void NotifyQueuedTrackChanges(
mozilla::MediaStreamGraph* aGraph,
mozilla::TrackID aID,
mozilla::TrackRate aTrackRate,
mozilla::TrackTicks aTrackOffset,
uint32_t aTrackEvents,
const mozilla::MediaSegment& aQueuedMedia
);
virtual void NotifyPull(mozilla::MediaStreamGraph* aGraph,
mozilla::StreamTime aDesiredTime) {}
nsDOMMediaStream* GetMediaStream() {
return mMediaStream;
}
@ -194,18 +176,16 @@ public:
unsigned VideoTrackCount();
void Detach() {
// Disconnect my own listener
GetMediaStream()->GetStream()->RemoveListener(this);
// walk through all the MediaPipelines and disconnect them.
for (std::map<int, mozilla::RefPtr<mozilla::MediaPipeline> >::iterator it =
mPipelines.begin(); it != mPipelines.end();
++it) {
it->second->DetachMediaStream();
it->second->Shutdown();
}
mMediaStream = NULL;
}
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(LocalSourceStreamInfo)
private:
std::map<int, mozilla::RefPtr<mozilla::MediaPipeline> > mPipelines;
nsRefPtr<nsDOMMediaStream> mMediaStream;
@ -229,7 +209,7 @@ class RemoteSourceStreamInfo {
for (std::map<int, mozilla::RefPtr<mozilla::MediaPipeline> >::iterator it =
mPipelines.begin(); it != mPipelines.end();
++it) {
it->second->DetachMediaStream();
it->second->Shutdown();
}
mMediaStream = NULL;
}

View File

@ -96,6 +96,10 @@ class TestAgent {
audio_->GetStream()->Stop();
audio_flow_ = NULL;
video_flow_ = NULL;
if (audio_pipeline_)
audio_pipeline_->Shutdown();
if (video_pipeline_)
video_pipeline_->Shutdown();
audio_pipeline_ = NULL;
video_pipeline_ = NULL;
}
@ -137,9 +141,15 @@ class TestAgentSend : public TestAgent {
ConfigureSendMediaCodec(&audio_config_);
EXPECT_EQ(mozilla::kMediaConduitNoError, err);
audio_pipeline_ = new mozilla::MediaPipelineTransmit(NULL,
test_utils->sts_target(),
audio_, audio_conduit_, audio_flow_, NULL);
std::string test_pc("PC");
audio_pipeline_ = new mozilla::MediaPipelineTransmit(
test_pc,
NULL,
test_utils->sts_target(),
audio_->GetStream(), audio_conduit_, audio_flow_, NULL);
audio_pipeline_->Init();
// video_ = new Fake_nsDOMMediaStream(new Fake_VideoStreamSource());
// video_pipeline_ = new mozilla::MediaPipelineTransmit(video_, video_conduit_, &video_flow_, &video_flow_);
@ -170,11 +180,16 @@ class TestAgentReceive : public TestAgent {
ConfigureRecvMediaCodecs(codecs);
EXPECT_EQ(mozilla::kMediaConduitNoError, err);
audio_pipeline_ = new mozilla::MediaPipelineReceiveAudio(NULL,
test_utils->sts_target(),
audio_,
static_cast<mozilla::AudioSessionConduit *>(audio_conduit_.get()),
audio_flow_, NULL);
std::string test_pc("PC");
audio_pipeline_ = new mozilla::MediaPipelineReceiveAudio(
test_pc,
NULL,
test_utils->sts_target(),
audio_->GetStream(),
static_cast<mozilla::AudioSessionConduit *>(audio_conduit_.get()),
audio_flow_, NULL);
audio_pipeline_->Init();
}
private: