From b10a1e20053168b5be59fc82734fbe5d409767c0 Mon Sep 17 00:00:00 2001 From: Suhas Nandakumar Date: Thu, 27 Jun 2013 09:13:09 -0700 Subject: [PATCH] Bug 786307: Implement RTCP MUX in MediaPipeline r=ekr --- .../src/mediapipeline/MediaPipeline.cpp | 54 ++-- .../src/mediapipeline/MediaPipeline.h | 16 +- .../signaling/test/mediapipeline_unittest.cpp | 249 ++++++++++++------ 3 files changed, 223 insertions(+), 96 deletions(-) diff --git a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp index 3d397d68f54..80417657ca3 100644 --- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp +++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.cpp @@ -84,25 +84,28 @@ nsresult MediaPipeline::Init_s() { MOZ_MTLOG(ML_ERROR, "RTP transport is already in error state"); TransportFailed_s(rtp_transport_); return NS_ERROR_FAILURE; - } else { - if (!muxed_) { - rtcp_transport_->SignalStateChange.connect(this, - &MediaPipeline::StateChange); + } - if (rtcp_transport_->state() == TransportLayer::TS_OPEN) { - res = TransportReady_s(rtcp_transport_); - if (NS_FAILED(res)) { - MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res=" - << static_cast(res) << " in " << __FUNCTION__); - return res; - } - } else if (rtcp_transport_->state() == TransportLayer::TS_ERROR) { - MOZ_MTLOG(ML_ERROR, "RTCP transport is already in error state"); - TransportFailed_s(rtcp_transport_); - return NS_ERROR_FAILURE; + // If rtcp_transport_ is the same as rtp_transport_ then we are muxing. + // Otherwise, set it up separately. + if (rtcp_transport_ != rtp_transport_) { + rtcp_transport_->SignalStateChange.connect(this, + &MediaPipeline::StateChange); + + if (rtcp_transport_->state() == TransportLayer::TS_OPEN) { + res = TransportReady_s(rtcp_transport_); + if (NS_FAILED(res)) { + MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res=" + << static_cast(res) << " in " << __FUNCTION__); + return res; } + } else if (rtcp_transport_->state() == TransportLayer::TS_ERROR) { + MOZ_MTLOG(ML_ERROR, "RTCP transport is already in error state"); + TransportFailed_s(rtcp_transport_); + return NS_ERROR_FAILURE; } } + return NS_OK; } @@ -121,6 +124,12 @@ void MediaPipeline::ShutdownTransport_s() { } void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state) { + // If rtcp_transport_ is the same as rtp_transport_ then we are muxing. + // So the only flow should be the RTP flow. + if (rtcp_transport_ == rtp_transport_) { + MOZ_ASSERT(flow == rtp_transport_); + } + if (state == TransportLayer::TS_OPEN) { MOZ_MTLOG(ML_DEBUG, "Flow is ready"); TransportReady_s(flow); @@ -216,7 +225,8 @@ nsresult MediaPipeline::TransportReady_s(TransportFlow *flow) { } // Start listening - if (muxed_) { + // If rtcp_transport_ is the same as rtp_transport_ then we are muxing + if (rtcp_transport_ == rtp_transport_) { MOZ_ASSERT(!rtcp_send_srtp_ && !rtcp_recv_srtp_); rtcp_send_srtp_ = rtp_send_srtp_; rtcp_recv_srtp_ = rtp_recv_srtp_; @@ -227,6 +237,7 @@ nsresult MediaPipeline::TransportReady_s(TransportFlow *flow) { dtls->downward()->SignalPacketReceived.connect(this, &MediaPipeline:: PacketReceived); + rtcp_state_ = MP_OPEN; } else { MOZ_MTLOG(ML_DEBUG, "Listening for RTP packets received on " << static_cast(dtls->downward())); @@ -269,6 +280,13 @@ nsresult MediaPipeline::TransportFailed_s(TransportFlow *flow) { *state = MP_CLOSED; + // If rtcp_transport_ is the same as rtp_transport_ then we are muxing + if(rtcp_transport_ == rtp_transport_) { + MOZ_ASSERT(state != &rtcp_state_); + rtcp_state_ = MP_CLOSED; + } + + MOZ_MTLOG(ML_DEBUG, "Transport closed for flow " << (rtcp ? "rtcp" : "rtp")); NS_WARNING( @@ -353,7 +371,7 @@ void MediaPipeline::RtpPacketReceived(TransportLayer *layer, const unsigned char *data, size_t len) { if (!transport_->pipeline()) { - MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected"); + MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport disconnected"); return; } @@ -363,7 +381,7 @@ void MediaPipeline::RtpPacketReceived(TransportLayer *layer, } if (rtp_state_ != MP_OPEN) { - MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; pipeline not open"); + MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; pipeline not open"); return; } diff --git a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h index 52c3a312a90..639a94cffe6 100644 --- a/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h +++ b/media/webrtc/signaling/src/mediapipeline/MediaPipeline.h @@ -92,9 +92,16 @@ class MediaPipeline : public sigslot::has_slots<> { rtcp_packets_sent_(0), rtp_packets_received_(0), rtcp_packets_received_(0), - muxed_((rtcp_transport_ == NULL) || (rtp_transport_ == rtcp_transport_)), pc_(pc), description_() { + // To indicate rtcp-mux rtcp_transport should be NULL. + // Therefore it's an error to send in the same flow for + // both rtp and rtcp. + MOZ_ASSERT(rtp_transport_ != rtcp_transport_); + + if (!rtcp_transport_) { + rtcp_transport_ = rtp_transport; + } } virtual ~MediaPipeline(); @@ -116,9 +123,9 @@ class MediaPipeline : public sigslot::has_slots<> { virtual Direction direction() const { return direction_; } int rtp_packets_sent() const { return rtp_packets_sent_; } - int rtcp_packets_sent() const { return rtp_packets_sent_; } + int rtcp_packets_sent() const { return rtcp_packets_sent_; } int rtp_packets_received() const { return rtp_packets_received_; } - int rtcp_packets_received() const { return rtp_packets_received_; } + int rtcp_packets_received() const { return rtcp_packets_received_; } MediaSessionConduit *Conduit() { return conduit_; } @@ -209,7 +216,6 @@ class MediaPipeline : public sigslot::has_slots<> { int rtcp_packets_received_; // Written on Init. Read on STS thread. - bool muxed_; std::string pc_; std::string description_; @@ -290,6 +296,7 @@ private: // and transmitting to the network. class MediaPipelineTransmit : public MediaPipeline { public: + // Set rtcp_transport to NULL to use rtcp-mux MediaPipelineTransmit(const std::string& pc, nsCOMPtr main_thread, nsCOMPtr sts_thread, @@ -384,6 +391,7 @@ class MediaPipelineTransmit : public MediaPipeline { // rendering video. class MediaPipelineReceive : public MediaPipeline { public: + // Set rtcp_transport to NULL to use rtcp-mux MediaPipelineReceive(const std::string& pc, nsCOMPtr main_thread, nsCOMPtr sts_thread, diff --git a/media/webrtc/signaling/test/mediapipeline_unittest.cpp b/media/webrtc/signaling/test/mediapipeline_unittest.cpp index a8fa75d9725..e8c8dfc46ce 100644 --- a/media/webrtc/signaling/test/mediapipeline_unittest.cpp +++ b/media/webrtc/signaling/test/mediapipeline_unittest.cpp @@ -42,47 +42,94 @@ MOZ_MTLOG_MODULE("mediapipeline") MtransportTestUtils *test_utils; namespace { -class TestAgent { + +class TransportInfo { public: - TestAgent() : - audio_flow_(new TransportFlow()), - audio_prsock_(new TransportLayerPrsock()), - audio_dtls_(new TransportLayerDtls()), - audio_config_(109, "opus", 48000, 960, 2, 64000), - audio_conduit_(mozilla::AudioSessionConduit::Create(NULL)), - audio_(), - audio_pipeline_(), - video_flow_(new TransportFlow()), - video_prsock_(new TransportLayerPrsock()), - video_config_(120, "VP8"), - video_conduit_(mozilla::VideoSessionConduit::Create()), - video_(), - video_pipeline_() { - } + TransportInfo() : + flow_(NULL), + prsock_(NULL), + dtls_(NULL) {} - void ConnectSocket(PRFileDesc *fd, bool client) { + void Init(bool client) { nsresult res; - res = audio_prsock_->Init(); + + flow_ = new TransportFlow(); + prsock_ = new TransportLayerPrsock(); + dtls_ = new TransportLayerDtls(); + + res = prsock_->Init(); + if (res != NS_OK) { + FreeLayers(); + } ASSERT_EQ((nsresult)NS_OK, res); - mozilla::SyncRunnable::DispatchToThread( - test_utils->sts_target(), - WrapRunnable(audio_prsock_, &TransportLayerPrsock::Import, fd, &res)); - - ASSERT_TRUE(NS_SUCCEEDED(res)); - - ASSERT_EQ((nsresult)NS_OK, audio_flow_->PushLayer(audio_prsock_)); - std::vector ciphers; ciphers.push_back(SRTP_AES128_CM_HMAC_SHA1_80); - audio_dtls_->SetSrtpCiphers(ciphers); - audio_dtls_->SetIdentity(DtlsIdentity::Generate()); - audio_dtls_->SetRole(client ? TransportLayerDtls::CLIENT : - TransportLayerDtls::SERVER); - audio_flow_->PushLayer(audio_dtls_); + dtls_->SetSrtpCiphers(ciphers); + dtls_->SetIdentity(DtlsIdentity::Generate()); + dtls_->SetRole(client ? TransportLayerDtls::CLIENT : + TransportLayerDtls::SERVER); + dtls_->SetVerificationAllowAll(); } - virtual void CreatePipelines_s() = 0; + void PushLayers() { + nsresult res; + + nsAutoPtr > layers( + new std::queue); + layers->push(prsock_); + layers->push(dtls_); + res = flow_->PushLayers(layers); + if (res != NS_OK) { + FreeLayers(); + } + ASSERT_EQ((nsresult)NS_OK, res); + } + + // Free the memory allocated at the beginning of Init + // if failure occurs before layers setup. + void FreeLayers() { + delete prsock_; + delete dtls_; + } + + void Stop() { + flow_ = NULL; + } + + mozilla::RefPtr flow_; + TransportLayerPrsock *prsock_; + TransportLayerDtls *dtls_; +}; + +class TestAgent { + public: + TestAgent() : + audio_config_(109, "opus", 48000, 960, 2, 64000), + audio_conduit_(mozilla::AudioSessionConduit::Create(NULL)), + audio_(), + audio_pipeline_() { + } + + void ConnectSocket(PRFileDesc *fd, bool client, bool isRtcp) { + nsresult res; + TransportInfo *transport = isRtcp ? + &audio_rtcp_transport_ : &audio_rtp_transport_; + + transport->Init(client); + + mozilla::SyncRunnable::DispatchToThread( + test_utils->sts_target(), + WrapRunnable(transport->prsock_, &TransportLayerPrsock::Import, fd, &res)); + if (!NS_SUCCEEDED(res)) { + transport->FreeLayers(); + } + ASSERT_TRUE(NS_SUCCEEDED(res)); + + transport->PushLayers(); + } + + virtual void CreatePipelines_s(bool aIsRtcpMux) = 0; void Start() { nsresult ret; @@ -98,12 +145,10 @@ class TestAgent { void StopInt() { audio_->GetStream()->Stop(); - audio_flow_ = NULL; - video_flow_ = NULL; + audio_rtp_transport_.Stop(); + audio_rtcp_transport_.Stop(); if (audio_pipeline_) audio_pipeline_->ShutdownTransport_s(); - if (video_pipeline_) - video_pipeline_->ShutdownTransport_s(); } void Stop() { @@ -111,39 +156,28 @@ class TestAgent { if (audio_pipeline_) audio_pipeline_->ShutdownMedia_m(); - if (video_pipeline_) - video_pipeline_->ShutdownMedia_m(); mozilla::SyncRunnable::DispatchToThread( test_utils->sts_target(), WrapRunnable(this, &TestAgent::StopInt)); audio_pipeline_ = NULL; - video_pipeline_ = NULL; PR_Sleep(1000); // Deal with race condition } - protected: - mozilla::RefPtr audio_flow_; - TransportLayerPrsock *audio_prsock_; - TransportLayerDtls *audio_dtls_; mozilla::AudioCodecConfig audio_config_; mozilla::RefPtr audio_conduit_; nsRefPtr audio_; mozilla::RefPtr audio_pipeline_; - mozilla::RefPtr video_flow_; - TransportLayerPrsock *video_prsock_; - mozilla::VideoCodecConfig video_config_; - mozilla::RefPtr video_conduit_; - nsRefPtr video_; - mozilla::RefPtr video_pipeline_; + TransportInfo audio_rtp_transport_; + TransportInfo audio_rtcp_transport_; }; class TestAgentSend : public TestAgent { public: - virtual void CreatePipelines_s() { + virtual void CreatePipelines_s(bool aIsRtcpMux) { audio_ = new Fake_DOMMediaStream(new Fake_AudioStreamSource()); mozilla::MediaConduitErrorCode err = @@ -153,16 +187,29 @@ class TestAgentSend : public TestAgent { std::string test_pc("PC"); + if (aIsRtcpMux) { + ASSERT_FALSE(audio_rtcp_transport_.flow_); + } + audio_pipeline_ = new mozilla::MediaPipelineTransmit( test_pc, NULL, test_utils->sts_target(), - audio_->GetStream(), 1, audio_conduit_, audio_flow_, NULL); + audio_->GetStream(), + 1, + audio_conduit_, + audio_rtp_transport_.flow_, + audio_rtcp_transport_.flow_); audio_pipeline_->Init(); + } -// video_ = new Fake_DOMMediaStream(new Fake_VideoStreamSource()); -// video_pipeline_ = new mozilla::MediaPipelineTransmit(video_, video_conduit_, &video_flow_, &video_flow_); + int GetAudioRtpCount() { + return audio_pipeline_->rtp_packets_sent(); + } + + int GetAudioRtcpCount() { + return audio_pipeline_->rtcp_packets_received(); } private: @@ -171,7 +218,7 @@ class TestAgentSend : public TestAgent { class TestAgentReceive : public TestAgent { public: - virtual void CreatePipelines_s() { + virtual void CreatePipelines_s(bool aIsRtcpMux) { mozilla::SourceMediaStream *audio = new Fake_SourceMediaStream(); audio->SetPullEnabled(true); @@ -190,17 +237,30 @@ class TestAgentReceive : public TestAgent { EXPECT_EQ(mozilla::kMediaConduitNoError, err); std::string test_pc("PC"); + + if (aIsRtcpMux) { + ASSERT_FALSE(audio_rtcp_transport_.flow_); + } + audio_pipeline_ = new mozilla::MediaPipelineReceiveAudio( test_pc, NULL, test_utils->sts_target(), audio_->GetStream(), 1, static_cast(audio_conduit_.get()), - audio_flow_, NULL); + audio_rtp_transport_.flow_, audio_rtcp_transport_.flow_); audio_pipeline_->Init(); } + int GetAudioRtpCount() { + return audio_pipeline_->rtp_packets_received(); + } + + int GetAudioRtcpCount() { + return audio_pipeline_->rtcp_packets_sent(); + } + private: }; @@ -208,44 +268,85 @@ class TestAgentReceive : public TestAgent { class MediaPipelineTest : public ::testing::Test { public: MediaPipelineTest() : p1_() { - fds_[0] = fds_[1] = NULL; + rtp_fds_[0] = rtp_fds_[1] = NULL; + rtcp_fds_[0] = rtcp_fds_[1] = NULL; } - void SetUp() { - PRStatus status = PR_NewTCPSocketPair(fds_); + // Setup transport. + void InitTransports(bool aIsRtcpMux) { + // Create RTP related transport. + PRStatus status = PR_NewTCPSocketPair(rtp_fds_); ASSERT_EQ(status, PR_SUCCESS); + // RTP, DTLS server mozilla::SyncRunnable::DispatchToThread( test_utils->sts_target(), - WrapRunnable(&p1_, &TestAgent::ConnectSocket, fds_[0], false)); + WrapRunnable(&p1_, &TestAgent::ConnectSocket, rtp_fds_[0], false, false)); + // RTP, DTLS client mozilla::SyncRunnable::DispatchToThread( test_utils->sts_target(), - WrapRunnable(&p2_, &TestAgent::ConnectSocket, fds_[1], false)); + WrapRunnable(&p2_, &TestAgent::ConnectSocket, rtp_fds_[1], true, false)); - mozilla::SyncRunnable::DispatchToThread( - test_utils->sts_target(), - WrapRunnable(&p1_, &TestAgent::CreatePipelines_s)); + // Create RTCP flows separately if we are not muxing them. + if(!aIsRtcpMux) { + status = PR_NewTCPSocketPair(rtcp_fds_); + ASSERT_EQ(status, PR_SUCCESS); - mozilla::SyncRunnable::DispatchToThread( - test_utils->sts_target(), - WrapRunnable(&p2_, &TestAgent::CreatePipelines_s)); + // RTCP, DTLS server + mozilla::SyncRunnable::DispatchToThread( + test_utils->sts_target(), + WrapRunnable(&p1_, &TestAgent::ConnectSocket, rtcp_fds_[0], false, true)); + + // RTCP, DTLS client + mozilla::SyncRunnable::DispatchToThread( + test_utils->sts_target(), + WrapRunnable(&p2_, &TestAgent::ConnectSocket, rtcp_fds_[1], true, true)); + } } - protected: - PRFileDesc *fds_[2]; + // Verify RTP and RTCP + void TestAudioSend(bool aIsRtcpMux) { + // Setup transport flows + InitTransports(aIsRtcpMux); + + mozilla::SyncRunnable::DispatchToThread( + test_utils->sts_target(), + WrapRunnable(&p1_, &TestAgent::CreatePipelines_s, aIsRtcpMux)); + + mozilla::SyncRunnable::DispatchToThread( + test_utils->sts_target(), + WrapRunnable(&p2_, &TestAgent::CreatePipelines_s, aIsRtcpMux)); + + p2_.Start(); + p1_.Start(); + + // wait for some RTP/RTCP tx and rx to happen + PR_Sleep(10000); + + ASSERT_GE(p1_.GetAudioRtpCount(), 40); + ASSERT_GE(p2_.GetAudioRtpCount(), 40); + ASSERT_GE(p1_.GetAudioRtcpCount(), 1); + ASSERT_GE(p2_.GetAudioRtcpCount(), 1); + + p1_.Stop(); + p2_.Stop(); + } + +protected: + PRFileDesc *rtp_fds_[2]; + PRFileDesc *rtcp_fds_[2]; TestAgentSend p1_; TestAgentReceive p2_; }; -TEST_F(MediaPipelineTest, AudioSend) { - p2_.Start(); - p1_.Start(); - PR_Sleep(1000); - p1_.Stop(); - p2_.Stop(); +TEST_F(MediaPipelineTest, TestAudioSendNoMux) { + TestAudioSend(false); } +TEST_F(MediaPipelineTest, TestAudioSendMux) { + TestAudioSend(true); +} } // end namespace