Bug 786307: Implement RTCP MUX in MediaPipeline r=ekr

This commit is contained in:
Suhas Nandakumar 2013-06-27 09:13:09 -07:00
parent fd3dc3173f
commit a3f331a8fe
3 changed files with 223 additions and 96 deletions

View File

@ -84,25 +84,28 @@ nsresult MediaPipeline::Init_s() {
MOZ_MTLOG(ML_ERROR, "RTP transport is already in error state");
TransportFailed_s(rtp_transport_);
return NS_ERROR_FAILURE;
} else {
if (!muxed_) {
rtcp_transport_->SignalStateChange.connect(this,
&MediaPipeline::StateChange);
}
if (rtcp_transport_->state() == TransportLayer::TS_OPEN) {
res = TransportReady_s(rtcp_transport_);
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res="
<< static_cast<uint32_t>(res) << " in " << __FUNCTION__);
return res;
}
} else if (rtcp_transport_->state() == TransportLayer::TS_ERROR) {
MOZ_MTLOG(ML_ERROR, "RTCP transport is already in error state");
TransportFailed_s(rtcp_transport_);
return NS_ERROR_FAILURE;
// If rtcp_transport_ is the same as rtp_transport_ then we are muxing.
// Otherwise, set it up separately.
if (rtcp_transport_ != rtp_transport_) {
rtcp_transport_->SignalStateChange.connect(this,
&MediaPipeline::StateChange);
if (rtcp_transport_->state() == TransportLayer::TS_OPEN) {
res = TransportReady_s(rtcp_transport_);
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR, "Error calling TransportReady(); res="
<< static_cast<uint32_t>(res) << " in " << __FUNCTION__);
return res;
}
} else if (rtcp_transport_->state() == TransportLayer::TS_ERROR) {
MOZ_MTLOG(ML_ERROR, "RTCP transport is already in error state");
TransportFailed_s(rtcp_transport_);
return NS_ERROR_FAILURE;
}
}
return NS_OK;
}
@ -121,6 +124,12 @@ void MediaPipeline::ShutdownTransport_s() {
}
void MediaPipeline::StateChange(TransportFlow *flow, TransportLayer::State state) {
// If rtcp_transport_ is the same as rtp_transport_ then we are muxing.
// So the only flow should be the RTP flow.
if (rtcp_transport_ == rtp_transport_) {
MOZ_ASSERT(flow == rtp_transport_);
}
if (state == TransportLayer::TS_OPEN) {
MOZ_MTLOG(ML_DEBUG, "Flow is ready");
TransportReady_s(flow);
@ -216,7 +225,8 @@ nsresult MediaPipeline::TransportReady_s(TransportFlow *flow) {
}
// Start listening
if (muxed_) {
// If rtcp_transport_ is the same as rtp_transport_ then we are muxing
if (rtcp_transport_ == rtp_transport_) {
MOZ_ASSERT(!rtcp_send_srtp_ && !rtcp_recv_srtp_);
rtcp_send_srtp_ = rtp_send_srtp_;
rtcp_recv_srtp_ = rtp_recv_srtp_;
@ -227,6 +237,7 @@ nsresult MediaPipeline::TransportReady_s(TransportFlow *flow) {
dtls->downward()->SignalPacketReceived.connect(this,
&MediaPipeline::
PacketReceived);
rtcp_state_ = MP_OPEN;
} else {
MOZ_MTLOG(ML_DEBUG, "Listening for RTP packets received on " <<
static_cast<void *>(dtls->downward()));
@ -269,6 +280,13 @@ nsresult MediaPipeline::TransportFailed_s(TransportFlow *flow) {
*state = MP_CLOSED;
// If rtcp_transport_ is the same as rtp_transport_ then we are muxing
if(rtcp_transport_ == rtp_transport_) {
MOZ_ASSERT(state != &rtcp_state_);
rtcp_state_ = MP_CLOSED;
}
MOZ_MTLOG(ML_DEBUG, "Transport closed for flow " << (rtcp ? "rtcp" : "rtp"));
NS_WARNING(
@ -353,7 +371,7 @@ void MediaPipeline::RtpPacketReceived(TransportLayer *layer,
const unsigned char *data,
size_t len) {
if (!transport_->pipeline()) {
MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; transport disconnected");
MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; transport disconnected");
return;
}
@ -363,7 +381,7 @@ void MediaPipeline::RtpPacketReceived(TransportLayer *layer,
}
if (rtp_state_ != MP_OPEN) {
MOZ_MTLOG(ML_DEBUG, "Discarding incoming packet; pipeline not open");
MOZ_MTLOG(ML_ERROR, "Discarding incoming packet; pipeline not open");
return;
}

View File

@ -92,9 +92,16 @@ class MediaPipeline : public sigslot::has_slots<> {
rtcp_packets_sent_(0),
rtp_packets_received_(0),
rtcp_packets_received_(0),
muxed_((rtcp_transport_ == NULL) || (rtp_transport_ == rtcp_transport_)),
pc_(pc),
description_() {
// To indicate rtcp-mux rtcp_transport should be NULL.
// Therefore it's an error to send in the same flow for
// both rtp and rtcp.
MOZ_ASSERT(rtp_transport_ != rtcp_transport_);
if (!rtcp_transport_) {
rtcp_transport_ = rtp_transport;
}
}
virtual ~MediaPipeline();
@ -116,9 +123,9 @@ class MediaPipeline : public sigslot::has_slots<> {
virtual Direction direction() const { return direction_; }
int rtp_packets_sent() const { return rtp_packets_sent_; }
int rtcp_packets_sent() const { return rtp_packets_sent_; }
int rtcp_packets_sent() const { return rtcp_packets_sent_; }
int rtp_packets_received() const { return rtp_packets_received_; }
int rtcp_packets_received() const { return rtp_packets_received_; }
int rtcp_packets_received() const { return rtcp_packets_received_; }
MediaSessionConduit *Conduit() { return conduit_; }
@ -209,7 +216,6 @@ class MediaPipeline : public sigslot::has_slots<> {
int rtcp_packets_received_;
// Written on Init. Read on STS thread.
bool muxed_;
std::string pc_;
std::string description_;
@ -290,6 +296,7 @@ private:
// and transmitting to the network.
class MediaPipelineTransmit : public MediaPipeline {
public:
// Set rtcp_transport to NULL to use rtcp-mux
MediaPipelineTransmit(const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,
@ -384,6 +391,7 @@ class MediaPipelineTransmit : public MediaPipeline {
// rendering video.
class MediaPipelineReceive : public MediaPipeline {
public:
// Set rtcp_transport to NULL to use rtcp-mux
MediaPipelineReceive(const std::string& pc,
nsCOMPtr<nsIEventTarget> main_thread,
nsCOMPtr<nsIEventTarget> sts_thread,

View File

@ -42,47 +42,94 @@ MOZ_MTLOG_MODULE("mediapipeline")
MtransportTestUtils *test_utils;
namespace {
class TestAgent {
class TransportInfo {
public:
TestAgent() :
audio_flow_(new TransportFlow()),
audio_prsock_(new TransportLayerPrsock()),
audio_dtls_(new TransportLayerDtls()),
audio_config_(109, "opus", 48000, 960, 2, 64000),
audio_conduit_(mozilla::AudioSessionConduit::Create(NULL)),
audio_(),
audio_pipeline_(),
video_flow_(new TransportFlow()),
video_prsock_(new TransportLayerPrsock()),
video_config_(120, "VP8"),
video_conduit_(mozilla::VideoSessionConduit::Create()),
video_(),
video_pipeline_() {
}
TransportInfo() :
flow_(NULL),
prsock_(NULL),
dtls_(NULL) {}
void ConnectSocket(PRFileDesc *fd, bool client) {
void Init(bool client) {
nsresult res;
res = audio_prsock_->Init();
flow_ = new TransportFlow();
prsock_ = new TransportLayerPrsock();
dtls_ = new TransportLayerDtls();
res = prsock_->Init();
if (res != NS_OK) {
FreeLayers();
}
ASSERT_EQ((nsresult)NS_OK, res);
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(audio_prsock_, &TransportLayerPrsock::Import, fd, &res));
ASSERT_TRUE(NS_SUCCEEDED(res));
ASSERT_EQ((nsresult)NS_OK, audio_flow_->PushLayer(audio_prsock_));
std::vector<uint16_t> ciphers;
ciphers.push_back(SRTP_AES128_CM_HMAC_SHA1_80);
audio_dtls_->SetSrtpCiphers(ciphers);
audio_dtls_->SetIdentity(DtlsIdentity::Generate());
audio_dtls_->SetRole(client ? TransportLayerDtls::CLIENT :
TransportLayerDtls::SERVER);
audio_flow_->PushLayer(audio_dtls_);
dtls_->SetSrtpCiphers(ciphers);
dtls_->SetIdentity(DtlsIdentity::Generate());
dtls_->SetRole(client ? TransportLayerDtls::CLIENT :
TransportLayerDtls::SERVER);
dtls_->SetVerificationAllowAll();
}
virtual void CreatePipelines_s() = 0;
void PushLayers() {
nsresult res;
nsAutoPtr<std::queue<TransportLayer *> > layers(
new std::queue<TransportLayer *>);
layers->push(prsock_);
layers->push(dtls_);
res = flow_->PushLayers(layers);
if (res != NS_OK) {
FreeLayers();
}
ASSERT_EQ((nsresult)NS_OK, res);
}
// Free the memory allocated at the beginning of Init
// if failure occurs before layers setup.
void FreeLayers() {
delete prsock_;
delete dtls_;
}
void Stop() {
flow_ = NULL;
}
mozilla::RefPtr<TransportFlow> flow_;
TransportLayerPrsock *prsock_;
TransportLayerDtls *dtls_;
};
class TestAgent {
public:
TestAgent() :
audio_config_(109, "opus", 48000, 960, 2, 64000),
audio_conduit_(mozilla::AudioSessionConduit::Create(NULL)),
audio_(),
audio_pipeline_() {
}
void ConnectSocket(PRFileDesc *fd, bool client, bool isRtcp) {
nsresult res;
TransportInfo *transport = isRtcp ?
&audio_rtcp_transport_ : &audio_rtp_transport_;
transport->Init(client);
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(transport->prsock_, &TransportLayerPrsock::Import, fd, &res));
if (!NS_SUCCEEDED(res)) {
transport->FreeLayers();
}
ASSERT_TRUE(NS_SUCCEEDED(res));
transport->PushLayers();
}
virtual void CreatePipelines_s(bool aIsRtcpMux) = 0;
void Start() {
nsresult ret;
@ -98,12 +145,10 @@ class TestAgent {
void StopInt() {
audio_->GetStream()->Stop();
audio_flow_ = NULL;
video_flow_ = NULL;
audio_rtp_transport_.Stop();
audio_rtcp_transport_.Stop();
if (audio_pipeline_)
audio_pipeline_->ShutdownTransport_s();
if (video_pipeline_)
video_pipeline_->ShutdownTransport_s();
}
void Stop() {
@ -111,39 +156,28 @@ class TestAgent {
if (audio_pipeline_)
audio_pipeline_->ShutdownMedia_m();
if (video_pipeline_)
video_pipeline_->ShutdownMedia_m();
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(this, &TestAgent::StopInt));
audio_pipeline_ = NULL;
video_pipeline_ = NULL;
PR_Sleep(1000); // Deal with race condition
}
protected:
mozilla::RefPtr<TransportFlow> audio_flow_;
TransportLayerPrsock *audio_prsock_;
TransportLayerDtls *audio_dtls_;
mozilla::AudioCodecConfig audio_config_;
mozilla::RefPtr<mozilla::MediaSessionConduit> audio_conduit_;
nsRefPtr<DOMMediaStream> audio_;
mozilla::RefPtr<mozilla::MediaPipeline> audio_pipeline_;
mozilla::RefPtr<TransportFlow> video_flow_;
TransportLayerPrsock *video_prsock_;
mozilla::VideoCodecConfig video_config_;
mozilla::RefPtr<mozilla::MediaSessionConduit> video_conduit_;
nsRefPtr<DOMMediaStream> video_;
mozilla::RefPtr<mozilla::MediaPipeline> video_pipeline_;
TransportInfo audio_rtp_transport_;
TransportInfo audio_rtcp_transport_;
};
class TestAgentSend : public TestAgent {
public:
virtual void CreatePipelines_s() {
virtual void CreatePipelines_s(bool aIsRtcpMux) {
audio_ = new Fake_DOMMediaStream(new Fake_AudioStreamSource());
mozilla::MediaConduitErrorCode err =
@ -153,16 +187,29 @@ class TestAgentSend : public TestAgent {
std::string test_pc("PC");
if (aIsRtcpMux) {
ASSERT_FALSE(audio_rtcp_transport_.flow_);
}
audio_pipeline_ = new mozilla::MediaPipelineTransmit(
test_pc,
NULL,
test_utils->sts_target(),
audio_->GetStream(), 1, audio_conduit_, audio_flow_, NULL);
audio_->GetStream(),
1,
audio_conduit_,
audio_rtp_transport_.flow_,
audio_rtcp_transport_.flow_);
audio_pipeline_->Init();
}
// video_ = new Fake_DOMMediaStream(new Fake_VideoStreamSource());
// video_pipeline_ = new mozilla::MediaPipelineTransmit(video_, video_conduit_, &video_flow_, &video_flow_);
int GetAudioRtpCount() {
return audio_pipeline_->rtp_packets_sent();
}
int GetAudioRtcpCount() {
return audio_pipeline_->rtcp_packets_received();
}
private:
@ -171,7 +218,7 @@ class TestAgentSend : public TestAgent {
class TestAgentReceive : public TestAgent {
public:
virtual void CreatePipelines_s() {
virtual void CreatePipelines_s(bool aIsRtcpMux) {
mozilla::SourceMediaStream *audio = new Fake_SourceMediaStream();
audio->SetPullEnabled(true);
@ -190,17 +237,30 @@ class TestAgentReceive : public TestAgent {
EXPECT_EQ(mozilla::kMediaConduitNoError, err);
std::string test_pc("PC");
if (aIsRtcpMux) {
ASSERT_FALSE(audio_rtcp_transport_.flow_);
}
audio_pipeline_ = new mozilla::MediaPipelineReceiveAudio(
test_pc,
NULL,
test_utils->sts_target(),
audio_->GetStream(), 1,
static_cast<mozilla::AudioSessionConduit *>(audio_conduit_.get()),
audio_flow_, NULL);
audio_rtp_transport_.flow_, audio_rtcp_transport_.flow_);
audio_pipeline_->Init();
}
int GetAudioRtpCount() {
return audio_pipeline_->rtp_packets_received();
}
int GetAudioRtcpCount() {
return audio_pipeline_->rtcp_packets_sent();
}
private:
};
@ -208,44 +268,85 @@ class TestAgentReceive : public TestAgent {
class MediaPipelineTest : public ::testing::Test {
public:
MediaPipelineTest() : p1_() {
fds_[0] = fds_[1] = NULL;
rtp_fds_[0] = rtp_fds_[1] = NULL;
rtcp_fds_[0] = rtcp_fds_[1] = NULL;
}
void SetUp() {
PRStatus status = PR_NewTCPSocketPair(fds_);
// Setup transport.
void InitTransports(bool aIsRtcpMux) {
// Create RTP related transport.
PRStatus status = PR_NewTCPSocketPair(rtp_fds_);
ASSERT_EQ(status, PR_SUCCESS);
// RTP, DTLS server
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p1_, &TestAgent::ConnectSocket, fds_[0], false));
WrapRunnable(&p1_, &TestAgent::ConnectSocket, rtp_fds_[0], false, false));
// RTP, DTLS client
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p2_, &TestAgent::ConnectSocket, fds_[1], false));
WrapRunnable(&p2_, &TestAgent::ConnectSocket, rtp_fds_[1], true, false));
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p1_, &TestAgent::CreatePipelines_s));
// Create RTCP flows separately if we are not muxing them.
if(!aIsRtcpMux) {
status = PR_NewTCPSocketPair(rtcp_fds_);
ASSERT_EQ(status, PR_SUCCESS);
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p2_, &TestAgent::CreatePipelines_s));
// RTCP, DTLS server
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p1_, &TestAgent::ConnectSocket, rtcp_fds_[0], false, true));
// RTCP, DTLS client
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p2_, &TestAgent::ConnectSocket, rtcp_fds_[1], true, true));
}
}
protected:
PRFileDesc *fds_[2];
// Verify RTP and RTCP
void TestAudioSend(bool aIsRtcpMux) {
// Setup transport flows
InitTransports(aIsRtcpMux);
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p1_, &TestAgent::CreatePipelines_s, aIsRtcpMux));
mozilla::SyncRunnable::DispatchToThread(
test_utils->sts_target(),
WrapRunnable(&p2_, &TestAgent::CreatePipelines_s, aIsRtcpMux));
p2_.Start();
p1_.Start();
// wait for some RTP/RTCP tx and rx to happen
PR_Sleep(10000);
ASSERT_GE(p1_.GetAudioRtpCount(), 40);
ASSERT_GE(p2_.GetAudioRtpCount(), 40);
ASSERT_GE(p1_.GetAudioRtcpCount(), 1);
ASSERT_GE(p2_.GetAudioRtcpCount(), 1);
p1_.Stop();
p2_.Stop();
}
protected:
PRFileDesc *rtp_fds_[2];
PRFileDesc *rtcp_fds_[2];
TestAgentSend p1_;
TestAgentReceive p2_;
};
TEST_F(MediaPipelineTest, AudioSend) {
p2_.Start();
p1_.Start();
PR_Sleep(1000);
p1_.Stop();
p2_.Stop();
TEST_F(MediaPipelineTest, TestAudioSendNoMux) {
TestAudioSend(false);
}
TEST_F(MediaPipelineTest, TestAudioSendMux) {
TestAudioSend(true);
}
} // end namespace