Bug 855623: Queue createDataChannel() calls made before onconnection and process later r=tuexen,ehugg

This commit is contained in:
Randell Jesup 2013-03-31 21:09:26 -04:00
parent 79ca89ab34
commit 0f2e621c7a
5 changed files with 105 additions and 61 deletions

View File

@ -639,6 +639,28 @@ PeerConnectionImpl::ConnectDataConnection(uint16_t aLocalport,
// Data channels won't work without a window, so in order for the C++ unit
// tests to work (it doesn't have a window available) we ifdef the following
// two implementations.
NS_IMETHODIMP
PeerConnectionImpl::EnsureDataConnection(uint16_t aNumstreams)
{
PC_AUTO_ENTER_API_CALL_NO_CHECK();
#ifdef MOZILLA_INTERNAL_API
if (mDataConnection) {
CSFLogDebug(logTag,"%s DataConnection already connected",__FUNCTION__);
// Ignore the request to connect when already connected. This entire
// implementation is temporary. Ignore aNumstreams as it's merely advisory
// and we increase the number of streams dynamically as needed.
return NS_OK;
}
mDataConnection = new mozilla::DataChannelConnection(this);
if (!mDataConnection->Init(5000, aNumstreams, true)) {
CSFLogError(logTag,"%s DataConnection Init Failed",__FUNCTION__);
return NS_ERROR_FAILURE;
}
#endif
return NS_OK;
}
nsresult
PeerConnectionImpl::InitializeDataChannel(uint16_t aLocalport,
uint16_t aRemoteport,
@ -647,25 +669,14 @@ PeerConnectionImpl::InitializeDataChannel(uint16_t aLocalport,
PC_AUTO_ENTER_API_CALL_NO_CHECK();
#ifdef MOZILLA_INTERNAL_API
if (mDataConnection) {
CSFLogError(logTag,"%s DataConnection already connected",__FUNCTION__);
// Ignore the request to connect when already connected. This entire
// implementation is temporary. Ignore aNumstreams as it's merely advisory
// and we increase the number of streams dynamically as needed.
return NS_OK;
}
// FIX! Temporary cheat to decide on even/odd
mDataConnection = new mozilla::DataChannelConnection(this, aLocalport > aRemoteport);
if (!mDataConnection->Init(aLocalport, aNumstreams, true)) {
CSFLogError(logTag,"%s DataConnection Init Failed",__FUNCTION__);
return NS_ERROR_FAILURE;
}
nsresult rv = EnsureDataConnection(aNumstreams);
// XXX Fix! Get the correct flow for DataChannel. Also error handling.
for (int i = 2; i >= 0; i--) {
nsRefPtr<TransportFlow> flow = mMedia->GetTransportFlow(i,false).get();
CSFLogDebug(logTag, "Transportflow[%d] = %p", i, flow.get());
if (flow) {
if (!mDataConnection->ConnectDTLS(flow, aLocalport, aRemoteport)) {
if (!mDataConnection->ConnectDTLS(flow, aLocalport, aRemoteport, aLocalport > aRemoteport)) {
return NS_ERROR_FAILURE;
}
break;
@ -673,7 +684,7 @@ PeerConnectionImpl::InitializeDataChannel(uint16_t aLocalport,
}
return NS_OK;
#else
return NS_ERROR_FAILURE;
return NS_ERROR_FAILURE;
#endif
}
@ -696,8 +707,9 @@ PeerConnectionImpl::CreateDataChannel(const nsACString& aLabel,
mozilla::DataChannelConnection::Type theType =
static_cast<mozilla::DataChannelConnection::Type>(aType);
if (!mDataConnection) {
return NS_ERROR_FAILURE;
nsresult rv = EnsureDataConnection(WEBRTC_DATACHANNEL_STREAMS_DEFAULT);
if (NS_FAILED(rv)) {
return rv;
}
dataChannel = mDataConnection->Open(
aLabel, aProtocol, theType, !outOfOrderAllowed,

View File

@ -259,6 +259,7 @@ private:
JSContext* aCx);
NS_IMETHODIMP CreateOfferInt(MediaConstraints& constraints);
NS_IMETHODIMP CreateAnswerInt(MediaConstraints& constraints);
NS_IMETHODIMP EnsureDataConnection(uint16_t aNumstreams);
nsresult CloseInt(bool aIsSynchronous);
void ChangeReadyState(ReadyState aReadyState);

View File

@ -175,11 +175,9 @@ debug_printf(const char *format, ...)
}
#endif
DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
bool aIsEven) :
DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
mLock("netwerk::sctp::DataChannelConnection")
{
mAllocateEven = aIsEven;
mState = CLOSED;
mSocket = nullptr;
mMasterSocket = nullptr;
@ -188,8 +186,7 @@ DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
mRemotePort = 0;
mDeferTimeout = 10;
mTimerRunning = false;
LOG(("Constructor DataChannelConnection=%p, listener=%p, %s", this, mListener.get(),
aIsEven ? "Even" : "Odd"));
LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
}
DataChannelConnection::~DataChannelConnection()
@ -488,9 +485,10 @@ DataChannelConnection::Notify(nsITimer *timer)
#ifdef MOZ_PEERCONNECTION
bool
DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport,
bool even)
{
LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
LOG(("Connect DTLS local %d, remote %d, %s", localport, remoteport, even ? "Even" : "Odd"));
NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectDTLS!");
NS_ENSURE_TRUE(aFlow, false);
@ -499,6 +497,7 @@ DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uin
mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
mLocalPort = localport;
mRemotePort = remoteport;
mAllocateEven = even;
mState = CONNECTING;
struct sockaddr_conn addr;
@ -539,6 +538,10 @@ DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uin
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CONNECTION,
this, true));
// Open any streams pending...
MutexAutoLock lock(mLock); // OpenFinish assumes this
ProcessQueuedOpens();
return true;
}
}
@ -549,6 +552,28 @@ DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uin
return false;
}
// Process any pending Opens
void
DataChannelConnection::ProcessQueuedOpens()
{
// Can't copy nsDeque's. Move into temp array since any that fail will
// go back to mPending
nsDeque temp;
DataChannel *temp_channel; // really already_AddRefed<>
while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
temp.Push(static_cast<void *>(temp_channel));
}
nsRefPtr<DataChannel> channel;
while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
OpenFinish(channel.forget()); // may reset the flag and re-push
}
}
}
void
DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
const unsigned char *data, size_t len)
@ -1235,6 +1260,10 @@ DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_chan
DataChannelOnMessageAvailable::ON_CONNECTION,
this, true));
LOG(("DTLS connect() succeeded! Entering connected mode"));
// Open any streams pending...
ProcessQueuedOpens();
} else if (mState == OPEN) {
LOG(("DataConnection Already OPEN"));
} else {
@ -1588,21 +1617,7 @@ DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_e
RequestMoreStreams(num_needed);
}
// Can't copy nsDeque's. Move into temp array since any that fail will
// go back to mPending
nsDeque temp;
DataChannel *temp_channel; // really already_AddRefed<>
while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
temp.Push(static_cast<void *>(temp_channel));
}
// Now assign our new streams
while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
OpenFinish(channel.forget()); // may reset the flag and re-push
}
}
ProcessQueuedOpens();
}
// else probably not a change in # of streams
}
@ -1774,30 +1789,37 @@ DataChannelConnection::OpenFinish(already_AddRefed<DataChannel> aChannel)
nsRefPtr<DataChannel> channel(aChannel);
uint16_t stream = channel->mStream;
if (stream == INVALID_STREAM) {
stream = FindFreeStream(); // may be INVALID_STREAM!
mLock.AssertCurrentThreadOwns();
mLock.AssertCurrentThreadOwns();
if (stream == INVALID_STREAM || mState != OPEN) {
if (stream == INVALID_STREAM) {
stream = FindFreeStream(); // may be INVALID_STREAM!
}
LOG(("Finishing open: channel %p, stream = %u", channel.get(), stream));
if (stream == INVALID_STREAM) {
if (!RequestMoreStreams()) {
channel->mState = CLOSED;
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
// We already returned the channel to the app.
NS_ERROR("Failed to request more streams");
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
return channel.forget();
if (stream == INVALID_STREAM || mState != OPEN) {
if (stream == INVALID_STREAM) {
if (!RequestMoreStreams()) {
channel->mState = CLOSED;
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
// We already returned the channel to the app.
NS_ERROR("Failed to request more streams");
NS_DispatchToMainThread(new DataChannelOnMessageAvailable(
DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
channel));
return channel.forget();
}
// we'll be destroying the channel, but it never really got set up
// Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
// Dispatch it to ourselves
return nullptr;
}
// we'll be destroying the channel, but it never really got set up
// Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
// Dispatch it to ourselves
return nullptr;
} else if (mState != OPEN) {
mStreams[stream] = channel;
}
LOG(("Queuing channel %p to finish open", channel.get()));
LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
// Also serves to mark we told the app
channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
channel->AddRef(); // we need a ref for the nsDeQue and one to return
@ -1806,7 +1828,12 @@ DataChannelConnection::OpenFinish(already_AddRefed<DataChannel> aChannel)
}
channel->mStream = stream;
}
mStreams[stream] = channel;
if (!mStreams[stream]) {
mStreams[stream] = channel;
} else {
// externally negotiated before connection
MOZ_ASSERT(mStreams[stream] == channel);
}
#ifdef TEST_QUEUED_DATA
// It's painful to write a test for this...

View File

@ -138,8 +138,7 @@ public:
virtual void NotifyDataChannel(already_AddRefed<DataChannel> channel) = 0;
};
DataChannelConnection(DataConnectionListener *listener,
bool aIsEven);
DataChannelConnection(DataConnectionListener *listener);
virtual ~DataChannelConnection();
bool Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls);
@ -153,7 +152,8 @@ public:
#ifdef SCTP_DTLS_SUPPORTED
// Connect using a TransportFlow (DTLS) channel
bool ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport);
bool ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport,
bool even);
#endif
typedef enum {
@ -237,6 +237,7 @@ private:
void StartDefer();
bool SendDeferredMessages();
void ProcessQueuedOpens();
void SendOutgoingStreamReset();
void ResetOutgoingStream(uint16_t streamOut);
void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,

View File

@ -16,6 +16,9 @@
#error "Unsupported compiler"
#endif
// Duplicated in fsm.def
#define WEBRTC_DATACHANNEL_STREAMS_DEFAULT 16
#define DATA_CHANNEL_PPID_CONTROL 50
#define DATA_CHANNEL_PPID_DOMSTRING 51
#define DATA_CHANNEL_PPID_BINARY 52