From 3864eb3c22b46bdffe603d7bd80fd77a7fa5cebc Mon Sep 17 00:00:00 2001 From: Randell Jesup Date: Wed, 3 Oct 2012 19:51:23 -0400 Subject: [PATCH] Bug 729512: DataChannel protocol rollup r=mcmanus --- netwerk/Makefile.in | 1 + netwerk/build/Makefile.in | 1 + netwerk/sctp/datachannel/DataChannel.cpp | 1973 +++++++++++++++++ netwerk/sctp/datachannel/DataChannel.h | 456 ++++ netwerk/sctp/datachannel/DataChannelLog.h | 28 + .../sctp/datachannel/DataChannelProtocol.h | 81 + netwerk/sctp/datachannel/Makefile.in | 69 + 7 files changed, 2609 insertions(+) create mode 100644 netwerk/sctp/datachannel/DataChannel.cpp create mode 100644 netwerk/sctp/datachannel/DataChannel.h create mode 100644 netwerk/sctp/datachannel/DataChannelLog.h create mode 100644 netwerk/sctp/datachannel/DataChannelProtocol.h create mode 100644 netwerk/sctp/datachannel/Makefile.in diff --git a/netwerk/Makefile.in b/netwerk/Makefile.in index df67cdc79a4..a79c1ebb4b9 100644 --- a/netwerk/Makefile.in +++ b/netwerk/Makefile.in @@ -34,6 +34,7 @@ endif ifdef MOZ_SCTP PARALLEL_DIRS += \ sctp/src \ + sctp/datachannel \ $(NULL) endif diff --git a/netwerk/build/Makefile.in b/netwerk/build/Makefile.in index 163c9274d5d..0b7f8385359 100644 --- a/netwerk/build/Makefile.in +++ b/netwerk/build/Makefile.in @@ -45,6 +45,7 @@ endif ifdef MOZ_SCTP SHARED_LIBRARY_LIBS += \ ../sctp/src/$(LIB_PREFIX)nksctp_s.$(LIB_SUFFIX) \ + ../sctp/datachannel/$(LIB_PREFIX)nkdatachan_s.$(LIB_SUFFIX) \ $(NULL) endif diff --git a/netwerk/sctp/datachannel/DataChannel.cpp b/netwerk/sctp/datachannel/DataChannel.cpp new file mode 100644 index 00000000000..88d50e3188d --- /dev/null +++ b/netwerk/sctp/datachannel/DataChannel.cpp @@ -0,0 +1,1973 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include +#include +#include +#include +#if !defined(__Userspace_os_Windows) +#include +#endif + +#define SCTP_DEBUG 1 +#define SCTP_STDINT_INCLUDE "mozilla/StandardInteger.h" +#include "usrsctp.h" + +#include "DataChannelLog.h" + +#include "nsServiceManagerUtils.h" +#include "nsIObserverService.h" +#include "nsIObserver.h" +#include "mozilla/Services.h" +#include "nsThreadUtils.h" +#include "nsAutoPtr.h" +#include "nsNetUtil.h" +#ifdef MOZ_PEERCONNECTION +#include "mtransport/runnable_utils.h" +#endif +#include "DataChannel.h" +#include "DataChannelProtocol.h" + +#ifdef PR_LOGGING +PRLogModuleInfo* dataChannelLog = PR_NewLogModule("DataChannel"); +#endif + +static bool sctp_initialized; + +namespace mozilla { + +class DataChannelShutdown; +nsCOMPtr gDataChannelShutdown; + +class DataChannelShutdown : public nsIObserver +{ +public: + // This needs to be tied to some form object that is guaranteed to be + // around (singleton likely) unless we want to shutdown sctp whenever + // we're not using it (and in which case we'd keep a refcnt'd object + // ref'd by each DataChannelConnection to release the SCTP usrlib via + // sctp_finish) + + NS_DECL_ISUPPORTS + + DataChannelShutdown() {} + + void Init() + { + nsCOMPtr observerService = + mozilla::services::GetObserverService(); + if (!observerService) + return; + + nsresult rv = observerService->AddObserver(this, + "profile-change-net-teardown", + false); + MOZ_ASSERT(rv == NS_OK); + (void) rv; + } + + virtual ~DataChannelShutdown() + { + nsCOMPtr observerService = + mozilla::services::GetObserverService(); + if (observerService) + observerService->RemoveObserver(this, "profile-change-net-teardown"); + } + + NS_IMETHODIMP Observe(nsISupports* aSubject, const char* aTopic, + const PRUnichar* aData) { + if (strcmp(aTopic, "profile-change-net-teardown") == 0) { + LOG(("Shutting down SCTP")); + if (sctp_initialized) { + usrsctp_finish(); + sctp_initialized = false; + } + } + return NS_OK; + } +}; + +NS_IMPL_ISUPPORTS1(DataChannelShutdown, nsIObserver); + + +BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data, + uint32_t length) : mLength(length) +{ + mSpa = new sctp_sendv_spa; + *mSpa = spa; + char *tmp = new char[length]; // infallible malloc! + memcpy(tmp, data, length); + mData = tmp; +} + +BufferedMsg::~BufferedMsg() +{ + delete mSpa; + delete mData; +} + +static int +receive_cb(struct socket* sock, union sctp_sockstore addr, + void *data, size_t datalen, + struct sctp_rcvinfo rcv, int flags, void *ulp_info) +{ + DataChannelConnection *connection = static_cast(ulp_info); + return connection->ReceiveCallback(sock, data, datalen, rcv, flags); +} + +DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) : + mLock("netwerk::sctp::DataChannel") +{ + mState = CLOSED; + mSocket = nullptr; + mMasterSocket = nullptr; + mListener = listener; + mLocalPort = 0; + mRemotePort = 0; + mDeferTimeout = 10; + mTimerRunning = false; + LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener)); +} + +DataChannelConnection::~DataChannelConnection() +{ + // XXX Move CloseAll() to a Destroy() call + // Though it's probably ok to do this and close the sockets; + // if we really want it to do true clean shutdowns it can + // create a dependant Internal object that would remain around + // until the network shut down the association or timed out. + CloseAll(); + if (mSocket && mSocket != mMasterSocket) + usrsctp_close(mSocket); + if (mMasterSocket) + usrsctp_close(mMasterSocket); +} + +NS_IMPL_THREADSAFE_ISUPPORTS1(DataChannelConnection, + nsITimerCallback) + +bool +DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls) +{ + struct sctp_initmsg initmsg; + struct sctp_udpencaps encaps; + struct sctp_assoc_value av; + struct sctp_event event; + socklen_t len; + + uint16_t event_types[] = {SCTP_ASSOC_CHANGE, + SCTP_PEER_ADDR_CHANGE, + SCTP_REMOTE_ERROR, + SCTP_SHUTDOWN_EVENT, + SCTP_ADAPTATION_INDICATION, + SCTP_SEND_FAILED_EVENT, + SCTP_STREAM_RESET_EVENT, + SCTP_STREAM_CHANGE_EVENT}; + { + MOZ_ASSERT(NS_IsMainThread()); + + // MutexAutoLock lock(mLock); Not needed since we're on mainthread always + if (!sctp_initialized) { + if (aUsingDtls) { + LOG(("sctp_init(DTLS)")); +#ifdef MOZ_PEERCONNECTION + usrsctp_init(0, DataChannelConnection::SctpDtlsOutput); +#else + NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport"); +#endif + } else { + LOG(("sctp_init(%d)", aPort)); + usrsctp_init(aPort, nullptr); + } + + usrsctp_sysctl_set_sctp_debug_on(0 /* SCTP_DEBUG_ALL */); + usrsctp_sysctl_set_sctp_blackhole(2); + sctp_initialized = true; + + gDataChannelShutdown = new DataChannelShutdown(); + gDataChannelShutdown->Init(); + } + } + + // Open sctp association across tunnel + if ((mMasterSocket = usrsctp_socket( + aUsingDtls ? AF_CONN : AF_INET, + SOCK_STREAM, IPPROTO_SCTP, receive_cb, nullptr, 0, this)) == nullptr) { + return false; + } + + if (!aUsingDtls) { + memset(&encaps, 0, sizeof(encaps)); + encaps.sue_address.ss_family = AF_INET; + encaps.sue_port = htons(aPort); + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT, + (const void*)&encaps, + (socklen_t)sizeof(struct sctp_udpencaps)) < 0) { + LOG(("*** failed encaps errno %d", errno)); + goto error_cleanup; + } + LOG(("SCTP encapsulation local port %d", aPort)); + } + + av.assoc_id = SCTP_ALL_ASSOC; + av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, + (socklen_t)sizeof(struct sctp_assoc_value)) < 0) { + LOG(("*** failed enable stream reset errno %d", errno)); + goto error_cleanup; + } + + /* Enable the events of interest. */ + memset(&event, 0, sizeof(event)); + event.se_assoc_id = SCTP_ALL_ASSOC; + event.se_on = 1; + for (uint32_t i = 0; i < sizeof(event_types)/sizeof(event_types[0]); ++i) { + event.se_type = event_types[i]; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) { + LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno)); + goto error_cleanup; + } + } + + // Update number of streams + mStreamsOut.AppendElements(aNumStreams); + mStreamsIn.AppendElements(aNumStreams); // make sure both are the same length + for (uint32_t i = 0; i < aNumStreams; ++i) { + mStreamsOut[i] = nullptr; + mStreamsIn[i] = nullptr; + } + memset(&initmsg, 0, sizeof(initmsg)); + len = sizeof(initmsg); + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) { + LOG(("*** failed getsockopt SCTP_INITMSG")); + goto error_cleanup; + } + LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams, + initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams)); + initmsg.sinit_num_ostreams = aNumStreams; + initmsg.sinit_max_instreams = MAX_NUM_STREAMS; + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, + (socklen_t)sizeof(initmsg)) < 0) { + LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno)); + goto error_cleanup; + } + + mSocket = nullptr; + return true; + +error_cleanup: + usrsctp_close(mMasterSocket); + mMasterSocket = nullptr; + return false; +} + +void +DataChannelConnection::StartDefer() +{ + nsresult rv; + if (!NS_IsMainThread()) { + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::START_DEFER, + this, nullptr)); + return; + } + + MOZ_ASSERT(NS_IsMainThread()); + if (!mDeferredTimer) { + mDeferredTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); + MOZ_ASSERT(mDeferredTimer); + } + + if (!mTimerRunning) { + rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, + nsITimer::TYPE_ONE_SHOT); + NS_ENSURE_TRUE(rv == NS_OK, /* */); + + mTimerRunning = true; + } +} + +// nsITimerCallback + +NS_IMETHODIMP +DataChannelConnection::Notify(nsITimer *timer) +{ + MOZ_ASSERT(NS_IsMainThread()); + LOG(("%s: %p [%p] (%dms), sending deferred messages", __FUNCTION__, this, timer, mDeferTimeout)); + + if (timer == mDeferredTimer) { + if (SendDeferredMessages()) { + // Still blocked + // we don't need a lock, since this must be main thread... + nsresult rv = mDeferredTimer->InitWithCallback(this, mDeferTimeout, + nsITimer::TYPE_ONE_SHOT); + if (NS_FAILED(rv)) { + LOG(("%s: cannot initialize open timer", __FUNCTION__)); + // XXX and do....? + return rv; + } + mTimerRunning = true; + } else { + LOG(("Turned off deferred send timer")); + mTimerRunning = false; + } + } + return NS_OK; +} + +#ifdef MOZ_PEERCONNECTION +bool +DataChannelConnection::ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport) +{ + LOG(("Connect DTLS local %d, remote %d", localport, remoteport)); + + NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectDTLS!"); + NS_ENSURE_TRUE(aFlow, false); + + mTransportFlow = aFlow; + mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::PacketReceived); + mLocalPort = localport; + mRemotePort = remoteport; + + PR_CreateThread( + PR_SYSTEM_THREAD, + DataChannelConnection::DTLSConnectThread, this, + PR_PRIORITY_NORMAL, + PR_GLOBAL_THREAD, + PR_JOINABLE_THREAD, 0 + ); + + return true; // not finished yet +} + +/* static */ +void +DataChannelConnection::DTLSConnectThread(void *data) +{ + DataChannelConnection *_this = static_cast(data); + struct sockaddr_conn addr; + + memset(&addr, 0, sizeof(addr)); + addr.sconn_family = AF_CONN; +#if !defined(__Userspace_os_Linux) && !defined(__Userspace_os_Windows) + addr.sconn_len = sizeof(addr); +#endif + addr.sconn_port = htons(_this->mLocalPort); + + int r = usrsctp_bind(_this->mMasterSocket, reinterpret_cast(&addr), + sizeof(addr)); + if (r < 0) { + LOG(("usrsctp_bind failed: %d", r)); + return; + } + + // This is the remote addr + addr.sconn_port = htons(_this->mRemotePort); + addr.sconn_addr = static_cast(_this); + r = usrsctp_connect(_this->mMasterSocket, reinterpret_cast(&addr), + sizeof(addr)); + if (r < 0) { + LOG(("usrsctp_connect failed: %d", r)); + return; + } + + // Notify Connection open + LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, _this)); + _this->mSocket = _this->mMasterSocket; + _this->mState = OPEN; + LOG(("DTLS connect() succeeded! Entering connected mode")); + + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CONNECTION, + _this, nullptr)); +} + +void +DataChannelConnection::PacketReceived(TransportFlow *flow, + const unsigned char *data, size_t len) +{ + //LOG(("%p: SCTP/DTLS received %ld bytes", this, len)); + + // Pass the data to SCTP + usrsctp_conninput(static_cast(this), data, len, 0); +} + +// XXX Merge with SctpDtlsOutput? +int +DataChannelConnection::SendPacket(const unsigned char *data, size_t len) +{ + //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len)); + return mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0; +} + +/* static */ +int +DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length, + uint8_t tos, uint8_t set_df) +{ + DataChannelConnection *peer = static_cast(addr); + + return peer->SendPacket(static_cast(buffer), length); +} +#endif + +// listen for incoming associations +// Blocks! - Don't call this from main thread! +bool +DataChannelConnection::Listen(unsigned short port) +{ + struct sockaddr_in addr; + socklen_t addr_len; + + NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); + + /* Acting as the 'server' */ + memset((void *)&addr, 0, sizeof(addr)); +#ifdef HAVE_SIN_LEN + addr.sin_len = sizeof(struct sockaddr_in); +#endif + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + LOG(("Waiting for connections on port %d", ntohs(addr.sin_port))); + mState = CONNECTING; + if (usrsctp_bind(mMasterSocket, reinterpret_cast(&addr), sizeof(struct sockaddr_in)) < 0) { + LOG(("***Failed userspace_bind")); + return false; + } + if (usrsctp_listen(mMasterSocket, 1) < 0) { + LOG(("***Failed userspace_listen")); + return false; + } + + LOG(("Accepting connection")); + addr_len = 0; + if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) { + LOG(("***Failed accept")); + return false; + } + mState = OPEN; + + // Notify Connection open + // XXX We need to make sure connection sticks around until the message is delivered + LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CONNECTION, + this, nullptr)); + return true; +} + +// Blocks! - Don't call this from main thread! +bool +DataChannelConnection::Connect(const char *addr, unsigned short port) +{ + struct sockaddr_in addr4; + struct sockaddr_in6 addr6; + + NS_WARN_IF_FALSE(!NS_IsMainThread(), "Blocks, do not call from main thread!!!"); + + /* Acting as the connector */ + LOG(("Connecting to %s, port %u", addr, port)); + memset((void *)&addr4, 0, sizeof(struct sockaddr_in)); + memset((void *)&addr6, 0, sizeof(struct sockaddr_in6)); +#ifdef HAVE_SIN_LEN + addr4.sin_len = sizeof(struct sockaddr_in); +#endif +#ifdef HAVE_SIN6_LEN + addr6.sin6_len = sizeof(struct sockaddr_in6); +#endif + addr4.sin_family = AF_INET; + addr6.sin6_family = AF_INET6; + addr4.sin_port = htons(port); + addr6.sin6_port = htons(port); + mState = CONNECTING; + +#if !defined(__Userspace_os_Windows) + if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) { + if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr6), sizeof(struct sockaddr_in6)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) { + if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr4), sizeof(struct sockaddr_in)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else { + LOG(("*** Illegal destination address.")); + } +#else + { + struct sockaddr_storage ss; + int sslen = sizeof(ss); + + if (!WSAStringToAddressA(const_cast(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) { + addr6.sin6_addr = (reinterpret_cast(&ss))->sin6_addr; + if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr6), sizeof(struct sockaddr_in6)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else if (!WSAStringToAddressA(const_cast(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) { + addr4.sin_addr = (reinterpret_cast(&ss))->sin_addr; + if (usrsctp_connect(mMasterSocket, reinterpret_cast(&addr4), sizeof(struct sockaddr_in)) < 0) { + LOG(("*** Failed userspace_connect")); + return false; + } + } else { + LOG(("*** Illegal destination address.")); + } + } +#endif + + mSocket = mMasterSocket; + + LOG(("connect() succeeded! Entering connected mode")); + mState = OPEN; + + // Notify Connection open + // XXX We need to make sure connection sticks around until the message is delivered + LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this)); + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CONNECTION, + this, nullptr)); + return true; +} + +DataChannel * +DataChannelConnection::FindChannelByStreamIn(uint16_t streamIn) +{ + // Auto-extend mStreamsIn as needed + if (((uint32_t) streamIn) + 1 > mStreamsIn.Length()) { + uint32_t old_len = mStreamsIn.Length(); + LOG(("Extending mStreamsIn[] to %d elements", ((int32_t) streamIn)+1)); + mStreamsIn.AppendElements((streamIn+1) - mStreamsIn.Length()); + for (uint32_t i = old_len; i < mStreamsIn.Length(); ++i) + mStreamsIn[i] = nullptr; + } + // Should always be safe in practice + return mStreamsIn.SafeElementAt(streamIn); +} + +DataChannel * +DataChannelConnection::FindChannelByStreamOut(uint16_t streamOut) +{ + return mStreamsOut.SafeElementAt(streamOut); +} + +uint16_t +DataChannelConnection::FindFreeStreamOut() +{ + uint32_t i, limit; + + limit = mStreamsOut.Length(); + if (limit > MAX_NUM_STREAMS) + limit = MAX_NUM_STREAMS; + for (i = 0; i < limit; ++i) { + if (!mStreamsOut[i]) { + break; + } + } + if (i == limit) { + return INVALID_STREAM; + } + return i; +} + +bool +DataChannelConnection::RequestMoreStreamsOut(int32_t aNeeded) +{ + struct sctp_status status; + struct sctp_add_streams sas; + uint32_t outStreamsNeeded; + socklen_t len; + + if (aNeeded + mStreamsOut.Length() > MAX_NUM_STREAMS) + aNeeded = MAX_NUM_STREAMS - mStreamsOut.Length(); + if (aNeeded <= 0) + return false; + + len = (socklen_t)sizeof(struct sctp_status); + if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) { + LOG(("***failed: getsockopt SCTP_STATUS")); + return false; + } + outStreamsNeeded = aNeeded; // number to add + + memset(&sas, 0, sizeof(struct sctp_add_streams)); + sas.sas_instrms = 0; + sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */ + // Doesn't block, we get an event when it succeeds or fails + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, + (socklen_t) sizeof(struct sctp_add_streams)) < 0) { + if (errno == EALREADY) + return true; + + LOG(("***failed: setsockopt ADD errno=%d", errno)); + return false; + } + LOG(("Requested %u more streams", outStreamsNeeded)); + return true; +} + +int32_t +DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t streamOut) +{ + struct sctp_sndinfo sndinfo; + + // Note: Main-thread IO, but doesn't block + memset(&sndinfo, 0, sizeof(struct sctp_sndinfo)); + sndinfo.snd_sid = streamOut; + sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL); + if (usrsctp_sendv(mSocket, msg, len, nullptr, 0, + &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo), + SCTP_SENDV_SNDINFO, 0) < 0) { + //LOG(("***failed: sctp_sendv")); don't log because errno is a return! + return (0); + } + return (1); +} + +int32_t +DataChannelConnection::SendOpenResponseMessage(uint16_t streamOut, uint16_t streamIn) +{ + struct rtcweb_datachannel_open_response rsp; + + memset(&rsp, 0, sizeof(struct rtcweb_datachannel_open_response)); + rsp.msg_type = DATA_CHANNEL_OPEN_RESPONSE; + rsp.reverse_stream = htons(streamIn); + + return SendControlMessage(&rsp, sizeof(rsp), streamOut); +} + + +int32_t +DataChannelConnection::SendOpenAckMessage(uint16_t streamOut) +{ + struct rtcweb_datachannel_ack ack; + + memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack)); + ack.msg_type = DATA_CHANNEL_ACK; + + return SendControlMessage(&ack, sizeof(ack), streamOut); +} + +int32_t +DataChannelConnection::SendOpenRequestMessage(const nsACString& label, + uint16_t streamOut, bool unordered, + uint16_t prPolicy, uint32_t prValue) +{ + int len = label.Length(); // not including nul + struct rtcweb_datachannel_open_request *req = + (struct rtcweb_datachannel_open_request*) moz_xmalloc(sizeof(*req)+len); + // careful - ok because request includes 1 char label + + memset(req, 0, sizeof(struct rtcweb_datachannel_open_request)); + req->msg_type = DATA_CHANNEL_OPEN_REQUEST; + switch (prPolicy) { + case SCTP_PR_SCTP_NONE: + req->channel_type = DATA_CHANNEL_RELIABLE; + break; + case SCTP_PR_SCTP_TTL: + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED; + break; + case SCTP_PR_SCTP_RTX: + req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT; + break; + default: + // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno! + moz_free(req); + return (0); + } + req->flags = htons(0); + if (unordered) { + req->flags |= htons(DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED); + } + req->reliability_params = htons((uint16_t)prValue); /* XXX Why 16-bit */ + req->priority = htons(0); /* XXX: add support */ + strcpy(&req->label[0], PromiseFlatCString(label).get()); + + int32_t result = SendControlMessage(req, sizeof(*req)+len, streamOut); + + moz_free(req); + return result; +} + +// XXX This should use a separate thread (outbound queue) which should +// select() to know when to *try* to send data to the socket again. +// Alternatively, it can use a timeout, but that's guaranteed to be wrong +// (just not sure in what direction). We could re-implement NSPR's +// PR_POLL_WRITE/etc handling... with a lot of work. + +// returns if we're still blocked or not +bool +DataChannelConnection::SendDeferredMessages() +{ + uint32_t i; + DataChannel *channel; + bool still_blocked = false; + bool sent = false; + + // This may block while something is modifying channels, but should not block for IO + MutexAutoLock lock(mLock); + + // XXX For total fairness, on a still_blocked we'd start next time at the + // same index. Sorry, not going to bother for now. + for (i = 0; i < mStreamsOut.Length(); ++i) { + channel = mStreamsOut[i]; + if (!channel) + continue; + + // Only one of these should be set.... + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) { + if (SendOpenRequestMessage(channel->mLabel, channel->mStreamOut, + channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED, + channel->mPrPolicy, channel->mPrValue)) { + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ; + sent = true; + } else { + if (errno == EAGAIN) { + still_blocked = true; + } else { + // Close the channel, inform the user + mStreamsOut[channel->mStreamOut] = nullptr; + channel->mState = CLOSED; + // Don't need to reset; we didn't open it + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, + channel)); + } + } + } + if (still_blocked) + break; + + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_RSP) { + if (SendOpenResponseMessage(channel->mStreamOut, channel->mStreamIn)) { + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_RSP; + sent = true; + } else { + if (errno == EAGAIN) { + still_blocked = true; + } else { + // Close the channel + // Don't need to reset; we didn't open it + // The other side may be left with a hanging Open. Our inability to + // send the open response means we can't easily tell them about it + // We haven't informed the user/DOM of the creation yet, so just + // delete the channel. + mStreamsIn[channel->mStreamIn] = nullptr; + mStreamsOut[channel->mStreamOut] = nullptr; + delete channel; + } + } + } + if (still_blocked) + break; + + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) { + if (SendOpenAckMessage(channel->mStreamOut)) { + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK; + sent = true; + } else { + if (errno == EAGAIN) { + still_blocked = true; + } else { + // Close the channel, inform the user + Close(channel->mStreamOut); + } + } + } + if (still_blocked) + break; + + if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) { + bool failed_send = false; + int32_t result; + + if (channel->mState == CLOSED || channel->mState == CLOSING) { + channel->mBufferedData.Clear(); + } + while (!channel->mBufferedData.IsEmpty() && + !failed_send) { + struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa; + const char *data = channel->mBufferedData[0]->mData; + uint32_t len = channel->mBufferedData[0]->mLength; + + // SCTP will return EMSGSIZE if the message is bigger than the buffer + // size (or EAGAIN if there isn't space) + if ((result = usrsctp_sendv(mSocket, data, len, + nullptr, 0, + (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa), + SCTP_SENDV_SPA, + spa->sendv_sndinfo.snd_flags) < 0)) { + if (errno == EAGAIN) { + // leave queued for resend + failed_send = true; + LOG(("queue full again when resending %d bytes (%d)", len, result)); + } else { + LOG(("error %d re-sending string", errno)); + failed_send = true; + } + } else { + LOG(("Resent buffer of %d bytes (%d)", len, result)); + sent = true; + channel->mBufferedData.RemoveElementAt(0); + } + } + if (channel->mBufferedData.IsEmpty()) + channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA; + else + still_blocked = true; + } + if (still_blocked) + break; + } + + if (!still_blocked) { + // mDeferTimeout becomes an estimate of how long we need to wait next time we block + return false; + } + // adjust time? More time for next wait if we didn't send anything, less if did + // Pretty crude, but better than nothing; just to keep CPU use down + if (!sent && mDeferTimeout < 50) + mDeferTimeout++; + else if (sent && mDeferTimeout > 10) + mDeferTimeout--; + + return true; +} + +void +DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, + size_t length, + uint16_t streamIn) +{ + DataChannel *channel; + uint32_t prValue; + uint16_t prPolicy; + uint32_t flags; + nsCString label(nsDependentCString(req->label)); + + mLock.AssertCurrentThreadOwns(); + + if ((channel = FindChannelByStreamIn(streamIn))) { + LOG(("ERROR: HandleOpenRequestMessage: channel for stream %d is in state %d instead of CLOSED.", + streamIn, channel->mState)); + /* XXX: some error handling */ + return; + } + switch (req->channel_type) { + case DATA_CHANNEL_RELIABLE: + prPolicy = SCTP_PR_SCTP_NONE; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: + prPolicy = SCTP_PR_SCTP_RTX; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: + prPolicy = SCTP_PR_SCTP_TTL; + break; + default: + /* XXX error handling */ + return; + } + prValue = ntohs(req->reliability_params); + flags = ntohs(req->flags) & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED; + channel = new DataChannel(this, INVALID_STREAM, streamIn, + DataChannel::CONNECTING, + label, + prPolicy, prValue, + flags, + nullptr, nullptr); + mStreamsIn[streamIn] = channel; + + OpenResponseFinish(channel); +} + +void +DataChannelConnection::OpenResponseFinish(DataChannel *channel) +{ + uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM! + + mLock.AssertCurrentThreadOwns(); + + LOG(("Finished response: channel %p, streamOut = %u", channel, streamOut)); + + if (streamOut == INVALID_STREAM) { + if (!RequestMoreStreamsOut()) { + /* XXX: Signal error to the other end. */ + mStreamsIn[channel->mStreamIn] = nullptr; + // we can do this with the lock held because mStreamOut is INVALID_STREAM, + // so there's no outbound channel to reset + delete channel; + return; + } + LOG(("Queuing channel %p to finish response", channel)); + channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_RSP; + mPending.Push(channel); + // can't notify the user until we can send an OpenResponse + } else { + channel->mStreamOut = streamOut; + mStreamsOut[streamOut] = channel; + if (SendOpenResponseMessage(streamOut, channel->mStreamIn)) { + LOG(("successful incoming open of '%s' in: %u, out: %u\n", + channel->mLabel.get(), channel->mStreamIn, streamOut)); + + /* Notify ondatachannel */ + // XXX We need to make sure connection sticks around until the message is delivered + LOG(("%s: sending ON_CHANNEL_CREATED for %p", __FUNCTION__, channel)); + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, + this, channel)); + } else { + if (errno == EAGAIN) { + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_RSP; + StartDefer(); + } else { + /* XXX: Signal error to the other end. */ + mStreamsIn[channel->mStreamIn] = nullptr; + mStreamsOut[streamOut] = nullptr; + channel->mStreamOut = INVALID_STREAM; + // we can do this with the lock held because mStreamOut is INVALID_STREAM, + // so there's no outbound channel to reset (we failed to send on it) + delete channel; + return; // paranoia against future changes since we unlocked + } + } + } +} + + +void +DataChannelConnection::HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp, + size_t length, uint16_t streamIn) +{ + uint16_t streamOut; + DataChannel *channel; + + mLock.AssertCurrentThreadOwns(); + + streamOut = ntohs(rsp->reverse_stream); + channel = FindChannelByStreamOut(streamOut); + + NS_ENSURE_TRUE(channel != nullptr, /* */); + NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */); + + if (rsp->error) { + LOG(("%s: error in response to open of channel %d (%s)", + __FUNCTION__, streamOut, channel->mLabel.get())); + + } else { + NS_ENSURE_TRUE(!FindChannelByStreamIn(streamIn), /* */); + + channel->mStreamIn = streamIn; + channel->mState = OPEN; + channel->mReady = true; + mStreamsIn[streamIn] = channel; + if (SendOpenAckMessage(streamOut)) { + channel->mFlags = 0; + } else { + // XXX Only on EAGAIN!? And if not, then close the channel?? + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK; + StartDefer(); + } + LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel)); + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, + channel)); + } +} + +void +DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, + size_t length, uint16_t streamIn) +{ + DataChannel *channel; + + mLock.AssertCurrentThreadOwns(); + + channel = FindChannelByStreamIn(streamIn); + + NS_ENSURE_TRUE(channel != nullptr, /* */); + NS_ENSURE_TRUE(channel->mState == CONNECTING, /* */); + + channel->mState = channel->mReady ? DataChannel::OPEN : DataChannel::WAITING_TO_OPEN; + if (channel->mState == OPEN) { + LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel)); + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, + channel)); + } else { + LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel)); + } +} + +void +DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn) +{ + /* XXX: Send an error message? */ + LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, streamIn)); + // XXX Log to JS error console if possible +} + +void +DataChannelConnection::HandleDataMessage(uint32_t ppid, + const void *data, size_t length, + uint16_t streamIn) +{ + DataChannel *channel; + const char *buffer = (const char *) data; + + mLock.AssertCurrentThreadOwns(); + + channel = FindChannelByStreamIn(streamIn); + + // XXX A closed channel may trip this... check + NS_ENSURE_TRUE(channel != nullptr, /* */); + NS_ENSURE_TRUE(channel->mState != CONNECTING, /* */); + + // XXX should this be a simple if, no warnings/debugbreaks? + NS_ENSURE_TRUE(channel->mState != CLOSED, /* */); + + { + nsAutoCString recvData(buffer, length); + + switch (ppid) { + case DATA_CHANNEL_PPID_DOMSTRING: + LOG(("DataChannel: String message received of length %lu on channel %d: %.*s", + length, channel->mStreamOut, (int)PR_MIN(length, 80), buffer)); + length = -1; // Flag for DOMString + + // WebSockets checks IsUTF8() here; we can try to deliver it + + NS_WARN_IF_FALSE(channel->mBinaryBuffer.IsEmpty(), "Binary message aborted by text message!"); + if (!channel->mBinaryBuffer.IsEmpty()) + channel->mBinaryBuffer.Truncate(0); + break; + + case DATA_CHANNEL_PPID_BINARY: + channel->mBinaryBuffer += recvData; + LOG(("DataChannel: Received binary message of length %lu (total %u) on channel id %d", + length, channel->mBinaryBuffer.Length(), channel->mStreamOut)); + return; // Not ready to notify application + + case DATA_CHANNEL_PPID_BINARY_LAST: + LOG(("DataChannel: Received binary message of length %lu on channel id %d", + length, channel->mStreamOut)); + if (!channel->mBinaryBuffer.IsEmpty()) { + channel->mBinaryBuffer += recvData; + LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel)); + SendOrQueue(channel, + new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DATA, this, + channel, channel->mBinaryBuffer, + channel->mBinaryBuffer.Length())); + channel->mBinaryBuffer.Truncate(0); + return; + } + // else send using recvData normally + break; + + default: + NS_ERROR("Unknown data PPID"); + return; + } + /* Notify onmessage */ + LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel)); + SendOrQueue(channel, + new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_DATA, this, + channel, recvData, length)); + } +} + +// Called with mLock locked! +void +DataChannelConnection::SendOrQueue(DataChannel *aChannel, + DataChannelOnMessageAvailable *aMessage) +{ + mLock.AssertCurrentThreadOwns(); + + if (!aChannel->mReady && + (aChannel->mState == DataChannel::CONNECTING || + aChannel->mState == DataChannel::WAITING_TO_OPEN)) { + aChannel->mQueuedMessages.AppendElement(aMessage); + } else { + NS_DispatchToMainThread(aMessage); + } +} + +// Called with mLock locked! +void +DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn) +{ + const struct rtcweb_datachannel_open_request *req; + const struct rtcweb_datachannel_open_response *rsp; + const struct rtcweb_datachannel_ack *ack, *msg; + + mLock.AssertCurrentThreadOwns(); + + switch (ppid) { + case DATA_CHANNEL_PPID_CONTROL: + NS_ENSURE_TRUE(length >= sizeof(*ack), /* */); // Ack is the smallest + + msg = static_cast(buffer); + switch (msg->msg_type) { + case DATA_CHANNEL_OPEN_REQUEST: + LOG(("length %u, sizeof(*req) = %u", length, sizeof(*req))); + NS_ENSURE_TRUE(length >= sizeof(*req), /* */); + + req = static_cast(buffer); + HandleOpenRequestMessage(req, length, streamIn); + break; + case DATA_CHANNEL_OPEN_RESPONSE: + NS_ENSURE_TRUE(length >= sizeof(*rsp), /* */); + + rsp = static_cast(buffer); + HandleOpenResponseMessage(rsp, length, streamIn); + break; + case DATA_CHANNEL_ACK: + // >= sizeof(*ack) checked above + + ack = static_cast(buffer); + HandleOpenAckMessage(ack, length, streamIn); + break; + default: + HandleUnknownMessage(ppid, length, streamIn); + break; + } + break; + case DATA_CHANNEL_PPID_DOMSTRING: + case DATA_CHANNEL_PPID_BINARY: + case DATA_CHANNEL_PPID_BINARY_LAST: + HandleDataMessage(ppid, buffer, length, streamIn); + break; + default: + LOG(("Message of length %lu, PPID %u on stream %u received.", + length, ppid, streamIn)); + break; + } +} + +void +DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac) +{ + uint32_t i, n; + + switch (sac->sac_state) { + case SCTP_COMM_UP: + LOG(("Association change: SCTP_COMM_UP")); + break; + case SCTP_COMM_LOST: + LOG(("Association change: SCTP_COMM_LOST")); + break; + case SCTP_RESTART: + LOG(("Association change: SCTP_RESTART")); + break; + case SCTP_SHUTDOWN_COMP: + LOG(("Association change: SCTP_SHUTDOWN_COMP")); + break; + case SCTP_CANT_STR_ASSOC: + LOG(("Association change: SCTP_CANT_STR_ASSOC")); + break; + default: + LOG(("Association change: UNKNOWN")); + break; + } + LOG(("Association change: streams (in/out) = (%u/%u)", + sac->sac_inbound_streams, sac->sac_outbound_streams)); + + NS_ENSURE_TRUE(sizeof(*sac) >= sac->sac_length, /* */); + n = sac->sac_length - sizeof(*sac); + if (((sac->sac_state == SCTP_COMM_UP) || + (sac->sac_state == SCTP_RESTART)) && (n > 0)) { + for (i = 0; i < n; ++i) { + switch (sac->sac_info[i]) { + case SCTP_ASSOC_SUPPORTS_PR: + LOG(("Supports: PR")); + break; + case SCTP_ASSOC_SUPPORTS_AUTH: + LOG(("Supports: AUTH")); + break; + case SCTP_ASSOC_SUPPORTS_ASCONF: + LOG(("Supports: ASCONF")); + break; + case SCTP_ASSOC_SUPPORTS_MULTIBUF: + LOG(("Supports: MULTIBUF")); + break; + case SCTP_ASSOC_SUPPORTS_RE_CONFIG: + LOG(("Supports: RE-CONFIG")); + break; + default: + LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i])); + break; + } + } + } else if (((sac->sac_state == SCTP_COMM_LOST) || + (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) { + LOG(("Association: ABORT =")); + for (i = 0; i < n; ++i) { + LOG((" 0x%02x", sac->sac_info[i])); + } + } + if ((sac->sac_state == SCTP_CANT_STR_ASSOC) || + (sac->sac_state == SCTP_SHUTDOWN_COMP) || + (sac->sac_state == SCTP_COMM_LOST)) { + return; + } +} + +void +DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc) +{ + char addr_buf[INET6_ADDRSTRLEN]; + const char *addr = ""; + struct sockaddr_in *sin; + struct sockaddr_in6 *sin6; +#if defined(__Userspace_os_Windows) + DWORD addr_len = INET6_ADDRSTRLEN; +#endif + + switch (spc->spc_aaddr.ss_family) { + case AF_INET: + sin = (struct sockaddr_in *)&spc->spc_aaddr; +#if !defined(__Userspace_os_Windows) + addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN); +#else + if (WSAAddressToStringA((LPSOCKADDR)sin, sizeof(sin->sin_addr), nullptr, + addr_buf, &addr_len)) { + return; + } +#endif + break; + case AF_INET6: + sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr; +#if !defined(__Userspace_os_Windows) + addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN); +#else + if (WSAAddressToStringA((LPSOCKADDR)sin6, sizeof(sin6), nullptr, + addr_buf, &addr_len)) { + return; + } +#endif + case AF_CONN: + addr = "DTLS connection"; + break; + default: + break; + } + LOG(("Peer address %s is now ", addr)); + switch (spc->spc_state) { + case SCTP_ADDR_AVAILABLE: + LOG(("SCTP_ADDR_AVAILABLE")); + break; + case SCTP_ADDR_UNREACHABLE: + LOG(("SCTP_ADDR_UNREACHABLE")); + break; + case SCTP_ADDR_REMOVED: + LOG(("SCTP_ADDR_REMOVED")); + break; + case SCTP_ADDR_ADDED: + LOG(("SCTP_ADDR_ADDED")); + break; + case SCTP_ADDR_MADE_PRIM: + LOG(("SCTP_ADDR_MADE_PRIM")); + break; + case SCTP_ADDR_CONFIRMED: + LOG(("SCTP_ADDR_CONFIRMED")); + break; + default: + LOG(("UNKNOWN")); + break; + } + LOG((" (error = 0x%08x).\n", spc->spc_error)); +} + +void +DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre) +{ + size_t i, n; + + n = sre->sre_length - sizeof(struct sctp_remote_error); + LOG(("Remote Error (error = 0x%04x): ", sre->sre_error)); + for (i = 0; i < n; ++i) { + LOG((" 0x%02x", sre-> sre_data[i])); + } +} + +void +DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse) +{ + LOG(("Shutdown event.")); + /* XXX: notify all channels. */ + // Attempts to actually send anything will fail +} + +void +DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai) +{ + LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind)); +} + +void +DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe) +{ + size_t i, n; + + if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) { + LOG(("Unsent ")); + } + if (ssfe->ssfe_flags & SCTP_DATA_SENT) { + LOG(("Sent ")); + } + if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) { + LOG(("(flags = %x) ", ssfe->ssfe_flags)); + } + LOG(("message with PPID = %d, SID = %d, flags: 0x%04x due to error = 0x%08x", + ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid, + ssfe->ssfe_info.snd_flags, ssfe->ssfe_error)); + n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event); + for (i = 0; i < n; ++i) { + LOG((" 0x%02x", ssfe->ssfe_data[i])); + } +} + +void +DataChannelConnection::ResetOutgoingStream(uint16_t streamOut) +{ + uint32_t i; + + mLock.AssertCurrentThreadOwns(); + // Rarely has more than a couple items and only for a short time + for (i = 0; i < mStreamsResetting.Length(); ++i) { + if (mStreamsResetting[i] == streamOut) { + return; + } + } + mStreamsResetting.AppendElement(streamOut); +} + +void +DataChannelConnection::SendOutgoingStreamReset() +{ + struct sctp_reset_streams *srs; + uint32_t i; + size_t len; + + mLock.AssertCurrentThreadOwns(); + if (mStreamsResetting.IsEmpty() == 0) { + return; + } + len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t); + srs = static_cast (moz_xmalloc(len)); // infallible malloc + memset(srs, 0, len); + srs->srs_flags = SCTP_STREAM_RESET_OUTGOING; + srs->srs_number_streams = mStreamsResetting.Length(); + for (i = 0; i < mStreamsResetting.Length(); ++i) { + srs->srs_stream_list[i] = mStreamsResetting[i]; + } + if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) { + LOG(("***failed: setsockopt RESET, errno %d", errno)); + } else { + mStreamsResetting.Clear(); + } + moz_free(srs); +} + +void +DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst) +{ + uint32_t n, i; + DataChannel *channel; + + if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) && + !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) { + n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t); + for (i = 0; i < n; ++i) { + if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { + channel = FindChannelByStreamIn(strrst->strreset_stream_list[i]); + if (channel != nullptr) { + mStreamsIn[channel->mStreamIn] = nullptr; + channel->mStreamIn = INVALID_STREAM; + if (channel->mStreamOut == INVALID_STREAM) { + channel->mPrPolicy = SCTP_PR_SCTP_NONE; + channel->mPrValue = 0; + channel->mFlags = 0; + channel->mState = CLOSED; + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, + channel)); + } else { + ResetOutgoingStream(channel->mStreamOut); + channel->mState = CLOSING; + } + } + } + if (strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) { + channel = FindChannelByStreamOut(strrst->strreset_stream_list[i]); + if (channel != nullptr && channel->mStreamOut != INVALID_STREAM) { + mStreamsOut[channel->mStreamOut] = nullptr; + channel->mStreamOut = INVALID_STREAM; + if (channel->mStreamIn == INVALID_STREAM) { + channel->mPrPolicy = SCTP_PR_SCTP_NONE; + channel->mPrValue = 0; + channel->mFlags = 0; + channel->mState = CLOSED; + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, + channel)); + } + } + } + } + } +} + +void +DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg) +{ + uint16_t streamOut; + uint32_t i; + DataChannel *channel; + + if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) { + LOG(("*** Failed increasing number of streams from %u (%u/%u)", + mStreamsOut.Length(), + strchg->strchange_instrms, + strchg->strchange_outstrms)); + // XXX FIX! notify pending opens of failure + return; + } else { + if (strchg->strchange_instrms > mStreamsIn.Length()) { + LOG(("Other side increased streamds from %u to %u", + mStreamsIn.Length(), strchg->strchange_instrms)); + } + if (strchg->strchange_outstrms > mStreamsOut.Length()) { + uint16_t old_len = mStreamsOut.Length(); + LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)", + old_len, + strchg->strchange_outstrms, + strchg->strchange_outstrms - old_len, + strchg->strchange_instrms)); + // make sure both are the same length + mStreamsOut.AppendElements(strchg->strchange_outstrms - old_len); + LOG(("New length = %d (was %d)", mStreamsOut.Length(), old_len)); + for (uint32_t i = old_len; i < mStreamsOut.Length(); ++i) { + mStreamsOut[i] = nullptr; + } + // Re-process any channels waiting for streams. + // Linear search, but we don't increase channels often and + // the array would only get long in case of an app error normally + + // Make sure we request enough streams if there's a big jump in streams + // Could make a more complex API for OpenXxxFinish() and avoid this loop + int32_t num_needed = mPending.GetSize(); + LOG(("%d of %d new streams already needed", num_needed, + strchg->strchange_outstrms - old_len)); + num_needed -= (strchg->strchange_outstrms - old_len); // number we added + if (num_needed > 0) { + if (num_needed < 16) + num_needed = 16; + LOG(("Not enough new streams, asking for %d more", num_needed)); + RequestMoreStreamsOut(num_needed); + } + + // Can't copy nsDeque's. Move into temp array since any that fail will + // go back to mPending + nsDeque temp; + while (nullptr != (channel = static_cast(mPending.PopFront()))) { + temp.Push(channel); + } + + // Now assign our new streams + while (nullptr != (channel = static_cast(temp.PopFront()))) { + if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_RSP) { + channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_RSP; + OpenResponseFinish(channel); // may reset the flag and re-push + } else if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) { + channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN; + OpenFinish(channel); // may reset the flag and re-push + } + } + } + // else probably not a change in # of streams + } + + for (i = 0; i < mStreamsOut.Length(); ++i) { + channel = mStreamsOut[i]; + if (!channel) + continue; + + if ((channel->mState == CONNECTING) && + (channel->mStreamOut == INVALID_STREAM)) { + if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) || + (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) { + /* XXX: Signal to the other end. */ + if (channel->mStreamIn != INVALID_STREAM) { + mStreamsIn[channel->mStreamIn] = nullptr; + } + channel->mState = CLOSED; + // inform user! + // XXX delete channel; + } else { + streamOut = FindFreeStreamOut(); + if (streamOut != INVALID_STREAM) { + channel->mStreamOut = streamOut; + mStreamsOut[streamOut] = channel; + if (channel->mStreamIn == INVALID_STREAM) { + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; + } else { + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_RSP; + } + StartDefer(); + } else { + /* We will not find more ... */ + break; + } + } + } + } +} + + +// Called with mLock locked! +void +DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n) +{ + mLock.AssertCurrentThreadOwns(); + if (notif->sn_header.sn_length != (uint32_t)n) { + return; + } + switch (notif->sn_header.sn_type) { + case SCTP_ASSOC_CHANGE: + HandleAssociationChangeEvent(&(notif->sn_assoc_change)); + break; + case SCTP_PEER_ADDR_CHANGE: + HandlePeerAddressChangeEvent(&(notif->sn_paddr_change)); + break; + case SCTP_REMOTE_ERROR: + HandleRemoteErrorEvent(&(notif->sn_remote_error)); + break; + case SCTP_SHUTDOWN_EVENT: + HandleShutdownEvent(&(notif->sn_shutdown_event)); + break; + case SCTP_ADAPTATION_INDICATION: + HandleAdaptationIndication(&(notif->sn_adaptation_event)); + break; + case SCTP_PARTIAL_DELIVERY_EVENT: + LOG(("SCTP_PARTIAL_DELIVERY_EVENT")); + break; + case SCTP_AUTHENTICATION_EVENT: + LOG(("SCTP_AUTHENTICATION_EVENT")); + break; + case SCTP_SENDER_DRY_EVENT: + //LOG(("SCTP_SENDER_DRY_EVENT")); + break; + case SCTP_NOTIFICATIONS_STOPPED_EVENT: + LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT")); + break; + case SCTP_SEND_FAILED_EVENT: + HandleSendFailedEvent(&(notif->sn_send_failed_event)); + break; + case SCTP_STREAM_RESET_EVENT: + HandleStreamResetEvent(&(notif->sn_strreset_event)); + break; + case SCTP_ASSOC_RESET_EVENT: + LOG(("SCTP_ASSOC_RESET_EVENT")); + break; + case SCTP_STREAM_CHANGE_EVENT: + HandleStreamChangeEvent(&(notif->sn_strchange_event)); + break; + default: + LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type)); + break; + } + } + +int +DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen, + struct sctp_rcvinfo rcv, int32_t flags) +{ + MOZ_ASSERT(!NS_IsMainThread()); + + if (!data) { + usrsctp_close(sock); // SCTP has finished shutting down + } else { + MutexAutoLock lock(mLock); + if (flags & MSG_NOTIFICATION) { + HandleNotification(static_cast(data), datalen); + } else { + HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid); + } + } + // usrsctp defines the callback as returning an int, but doesn't use it + return 1; +} + +DataChannel * +DataChannelConnection::Open(const nsACString& label, Type type, bool inOrder, + uint32_t prValue, DataChannelListener *aListener, + nsISupports *aContext) +{ + DataChannel *channel; + uint16_t prPolicy = SCTP_PR_SCTP_NONE; + uint32_t flags; + + LOG(("DC Open: label %s, type %u, inorder %d, prValue %u, listener %p, context %p", + PromiseFlatCString(label).get(), type, inOrder, prValue, aListener, aContext)); + switch (type) { + case DATA_CHANNEL_RELIABLE: + prPolicy = SCTP_PR_SCTP_NONE; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT: + prPolicy = SCTP_PR_SCTP_RTX; + break; + case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED: + prPolicy = SCTP_PR_SCTP_TTL; + break; + } + if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) { + return nullptr; + } + + flags = !inOrder ? DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED : 0; + channel = new DataChannel(this, INVALID_STREAM, INVALID_STREAM, + DataChannel::CONNECTING, + label, type, prValue, + flags, + aListener, aContext); // infallible malloc + + MutexAutoLock lock(mLock); // OpenFinish assumes this + return OpenFinish(channel); +} + +// Separate routine so we can also call it to finish up from pending opens +DataChannel * +DataChannelConnection::OpenFinish(DataChannel *channel) +{ + uint16_t streamOut = FindFreeStreamOut(); // may be INVALID_STREAM! + + mLock.AssertCurrentThreadOwns(); + + LOG(("Finishing open: channel %p, streamOut = %u", channel, streamOut)); + + if (streamOut == INVALID_STREAM) { + if (!RequestMoreStreamsOut()) { + if (channel->mFlags &= DATA_CHANNEL_FLAGS_FINISH_OPEN) { + // We already returned the channel to the app. Mark it closed + channel->mState = CLOSED; + NS_ERROR("Failed to request more streams"); + return channel; + } + // we can do this with the lock held because mStreamOut is INVALID_STREAM, + // so there's no outbound channel to reset + delete channel; + return nullptr; + } + LOG(("Queuing channel %p to finish open", channel)); + // Also serves to mark we told the app + channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN; + mPending.Push(channel); + return channel; + } + mStreamsOut[streamOut] = channel; + channel->mStreamOut = streamOut; + + if (!SendOpenRequestMessage(channel->mLabel, streamOut, + !!(channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED), + channel->mPrPolicy, channel->mPrValue)) { + LOG(("SendOpenRequest failed, errno = %d", errno)); + if (errno == EAGAIN) { + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ; + StartDefer(); + } else { + // XXX FIX! can't do this if we previously returned it! Need to internally mark it dead + // and file onerror + mStreamsOut[streamOut] = nullptr; + channel->mStreamOut = INVALID_STREAM; + // we can do this with the lock held because mStreamOut is INVALID_STREAM, + // so there's no outbound channel to reset (we didn't sent anything) + delete channel; + return nullptr; + } + } + return channel; +} + +int32_t +DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data, + uint32_t length, uint32_t ppid) +{ + uint16_t flags; + struct sctp_sendv_spa spa; + int32_t result; + + NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0); + NS_WARN_IF_FALSE(length > 0, "Length is 0?!"); + + flags = (channel->mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED) ? SCTP_UNORDERED : 0; + + // To avoid problems where an in-order OPEN_RESPONSE is lost and an + // out-of-order data message "beats" it, require data to be in-order + // until we get an ACK. + if (channel->mState == CONNECTING) { + flags &= ~SCTP_UNORDERED; + } + spa.sendv_sndinfo.snd_ppid = htonl(ppid); + spa.sendv_sndinfo.snd_sid = channel->mStreamOut; + spa.sendv_sndinfo.snd_flags = flags; + spa.sendv_sndinfo.snd_context = 0; + spa.sendv_sndinfo.snd_assoc_id = 0; + + spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL; + spa.sendv_prinfo.pr_value = channel->mPrValue; + + spa.sendv_flags = SCTP_SEND_SNDINFO_VALID | SCTP_SEND_PRINFO_VALID; + + // Note: Main-thread IO, but doesn't block! + // XXX FIX! to deal with heavy overruns of JS trying to pass data in + // (more than the buffersize) queue data onto another thread to do the + // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp + + // SCTP will return EMSGSIZE if the message is bigger than the buffer + // size (or EAGAIN if there isn't space) + if (channel->mBufferedData.IsEmpty()) { + result = usrsctp_sendv(mSocket, data, length, + nullptr, 0, + (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa), + SCTP_SENDV_SPA, flags); + LOG(("Sent buffer (len=%u), result=%d", length, result)); + } else { + // Fake EAGAIN if we're already buffering data + result = -1; + errno = EAGAIN; + } + if (result < 0) { + if (errno == EAGAIN) { + // queue data for resend! And queue any further data for the stream until it is... + BufferedMsg *buffered = new BufferedMsg(spa, data, length); // infallible malloc + channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array + channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA; + LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length)); + StartDefer(); + return 0; + } + LOG(("error %d sending string", errno)); + } + return result; +} + +// Handles fragmenting binary messages +int32_t +DataChannelConnection::SendBinary(DataChannel *channel, const char *data, + uint32_t len) +{ + // Since there's a limit on network buffer size and no limits on message + // size, and we don't want to use EOR mode (multiple writes for a + // message, but all other streams are blocked until you finish sending + // this message), we need to add application-level fragmentation of large + // messages. On a reliable channel, these can be simply rebuilt into a + // large message. On an unreliable channel, we can't and don't know how + // long to wait, and there are no retransmissions, and no easy way to + // tell the user "this part is missing", so on unreliable channels we + // need to return an error if sending more bytes than the network buffers + // can hold, and perhaps a lower number. + + // We *really* don't want to do this from main thread! - and SendMsgInternal + // avoids blocking. + if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT && + channel->mPrPolicy == DATA_CHANNEL_RELIABLE) { + int32_t sent=0; + uint32_t origlen = len; + LOG(("Sending binary message length %u in chunks", len)); + // XXX check flags for out-of-order, or force in-order for large binary messages + while (len > 0) { + uint32_t sendlen = PR_MIN(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT); + uint32_t ppid; + len -= sendlen; + ppid = len > 0 ? DATA_CHANNEL_PPID_BINARY : DATA_CHANNEL_PPID_BINARY_LAST; + LOG(("Send chunk of %d bytes, ppid %d", sendlen, ppid)); + // Note that these might end up being deferred and queued. + sent += SendMsgInternal(channel, data, sendlen, ppid); + data += sendlen; + } + LOG(("Sent %d buffers for %u bytes, %d sent immediately, % buffers queued", + (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT, + origlen, sent, + channel->mBufferedData.Length())); + return sent; + } + NS_WARN_IF_FALSE(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT, + "Sending too-large data on unreliable channel!"); + + // This will fail if the message is too large + return SendMsgInternal(channel, data, len, DATA_CHANNEL_PPID_BINARY_LAST); +} + +int32_t +DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) +{ + DataChannel *channel = mStreamsOut[stream]; + NS_ENSURE_TRUE(channel, 0); + // Spawn a thread to send the data + + LOG(("Sending blob to stream %u", stream)); + + // XXX to do this safely, we must enqueue these atomically onto the + // output socket. We need a sender thread(s?) to enque data into the + // socket and to avoid main-thread IO that might block. Even on a + // background thread, we may not want to block on one stream's data. + // I.e. run non-blocking and service multiple channels. + + // For now as a hack, send as a single blast of queued packets which may + // be deferred until buffer space is available. + nsAutoPtr temp(new nsCString()); + uint64_t len; + aBlob->Available(&len); + nsresult rv = NS_ReadInputStreamToString(aBlob, *temp, len); + + NS_ENSURE_SUCCESS(rv, rv); + + aBlob->Close(); + //aBlob->Release(); We didn't AddRef() the way WebSocket does in OutboundMessage (yet) + + // Consider if it makes sense to split the message ourselves for + // transmission, at least on RELIABLE channels. Sending large blobs via + // unreliable channels requires some level of application involvement, OR + // sending them at big, single messages, which if large will probably not + // get through. + + // XXX For now, send as one large binary message. We should also signal + // (via PPID) that it's a blob. + const char *data = temp.get()->BeginReading(); + len = temp.get()->Length(); + + return SendBinary(channel, data, len); +} + +int32_t +DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg, + bool isBinary) +{ + MOZ_ASSERT(NS_IsMainThread()); + // We really could allow this from other threads, so long as we deal with + // asynchronosity issues with channels closing, in particular access to + // mStreamsOut, and issues with the association closing (access to mSocket). + + const char *data = aMsg.BeginReading(); + uint32_t len = aMsg.Length(); + DataChannel *channel; + + if (isBinary) + LOG(("Sending to stream %u: %u bytes", stream, len)); + else + LOG(("Sending to stream %u: %s", stream, data)); + // XXX if we want more efficiency, translate flags once at open time + channel = mStreamsOut[stream]; + NS_ENSURE_TRUE(channel, 0); + + if (isBinary) + return SendBinary(channel, data, len); + return SendMsgInternal(channel, data, len, DATA_CHANNEL_PPID_DOMSTRING); +} + +void +DataChannelConnection::Close(uint16_t streamOut) +{ + DataChannel *channel; + + MutexAutoLock lock(mLock); + LOG(("Closing stream %d",streamOut)); + channel = FindChannelByStreamOut(streamOut); + if (channel) { + channel->mBufferedData.Clear(); + ResetOutgoingStream(channel->mStreamOut); + SendOutgoingStreamReset(); + channel->mState = CLOSING; + } +} + +void DataChannelConnection::CloseAll() +{ + LOG(("Closing all channels")); + // Don't need to lock here + + // Make sure no more channels will be opened + mState = CLOSED; + + // Close current channels + // FIX! if there are runnables, they must use weakrefs or hold a strong + // ref and keep the channel and/or connection alive + for (uint32_t i = 0; i < mStreamsOut.Length(); ++i) { + if (mStreamsOut[i]) { + mStreamsOut[i]->Close(); + } + } + + // Clean up any pending opens for channels + DataChannel *channel; + while (nullptr != (channel = static_cast(mPending.PopFront()))) + channel->Close(); +} + +void +DataChannel::Close() +{ + if (mState == CLOSING || mState == CLOSED || + mStreamOut == INVALID_STREAM) { + return; + } + mState = CLOSING; + mConnection->Close(mStreamOut); + mStreamOut = INVALID_STREAM; + mStreamIn = INVALID_STREAM; +} + +void +DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext) +{ + MOZ_ASSERT(!mListener); // only should be set once, avoids races w/o locking + mContext = aContext; + mListener = aListener; +} + +// May be called from another (i.e. Main) thread! +void +DataChannel::AppReady() +{ + MutexAutoLock lock(mConnection->mLock); + + mReady = true; + if (mState == WAITING_TO_OPEN) { + mState = OPEN; + NS_DispatchToMainThread(new DataChannelOnMessageAvailable( + DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection, + this)); + for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) { + nsCOMPtr runnable = mQueuedMessages[i]; + MOZ_ASSERT(runnable); + NS_DispatchToMainThread(runnable); + } + } else { + NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN"); + } + mQueuedMessages.Clear(); + mQueuedMessages.Compact(); + // We never use it again... We could even allocate the array in the odd + // cases we need it. +} + +uint32_t +DataChannel::GetBufferedAmount() +{ + uint32_t buffered = 0; + for (uint32_t i = 0; i < mBufferedData.Length(); ++i) { + buffered += mBufferedData[i]->mLength; + } + return buffered; +} + +} // namespace mozilla + diff --git a/netwerk/sctp/datachannel/DataChannel.h b/netwerk/sctp/datachannel/DataChannel.h new file mode 100644 index 00000000000..583ecc8df28 --- /dev/null +++ b/netwerk/sctp/datachannel/DataChannel.h @@ -0,0 +1,456 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_ +#define NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_ + +#include +#include "nsISupports.h" +#include "nsCOMPtr.h" +#include "nsString.h" +#include "nsThreadUtils.h" +#include "nsTArray.h" +#include "nsDeque.h" +#include "nsIInputStream.h" +#include "nsITimer.h" +#include "mozilla/Mutex.h" +#include "DataChannelProtocol.h" +#ifdef SCTP_DTLS_SUPPORTED +#include "talk/base/sigslot.h" +#include "mtransport/transportflow.h" +#include "mtransport/transportlayer.h" +#include "mtransport/transportlayerprsock.h" +#endif + +extern "C" { + struct socket; + struct sctp_rcvinfo; +} + +namespace mozilla { + +class DTLSConnection; +class DataChannelConnection; +class DataChannel; +class DataChannelOnMessageAvailable; + +class BufferedMsg +{ +public: + BufferedMsg(struct sctp_sendv_spa &spa,const char *data, + uint32_t length); + ~BufferedMsg(); + + struct sctp_sendv_spa *mSpa; + const char *mData; + uint32_t mLength; +}; + +// Implemented by consumers of a Channel to receive messages. +// Can't nest it in DataChannelConnection because C++ doesn't allow forward +// refs to embedded classes +class DataChannelListener { +public: + virtual ~DataChannelListener() {} + + // Called when a DOMString message is received. + virtual nsresult OnMessageAvailable(nsISupports *aContext, + const nsACString& message) = 0; + + // Called when a binary message is received. + virtual nsresult OnBinaryMessageAvailable(nsISupports *aContext, + const nsACString& message) = 0; + + // Called when the channel is connected + virtual nsresult OnChannelConnected(nsISupports *aContext) = 0; + + // Called when the channel is closed + virtual nsresult OnChannelClosed(nsISupports *aContext) = 0; +}; + + +// One per PeerConnection +class DataChannelConnection: public nsITimerCallback +#ifdef SCTP_DTLS_SUPPORTED + , public sigslot::has_slots<> +#endif +{ +public: + NS_DECL_ISUPPORTS + NS_DECL_NSITIMERCALLBACK + + class DataConnectionListener { + public: + virtual ~DataConnectionListener() {} + + // Called when a the connection is open + virtual void NotifyConnection() = 0; + + // Called when a the connection is lost/closed + virtual void NotifyClosedConnection() = 0; + + // Called when a new DataChannel has been opened by the other side. + virtual void NotifyDataChannel(DataChannel *channel) = 0; + }; + + DataChannelConnection(DataConnectionListener *listener); + virtual ~DataChannelConnection(); + + bool Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls); + + // These block; they require something to decide on listener/connector + // (though you can do simultaneous Connect()). Do not call these from + // the main thread! + bool Listen(unsigned short port); + bool Connect(const char *addr, unsigned short port); + +#ifdef SCTP_DTLS_SUPPORTED + // Connect using a TransportFlow (DTLS) channel + bool ConnectDTLS(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport); +#endif + + typedef enum { + RELIABLE=0, + PARTIAL_RELIABLE_REXMIT = 1, + PARTIAL_RELIABLE_TIMED = 2 + } Type; + + DataChannel *Open(const nsACString& label, + Type type, bool inOrder, + uint32_t prValue, DataChannelListener *aListener, + nsISupports *aContext); + + void Close(uint16_t stream); + void CloseAll(); + + int32_t SendMsg(uint16_t stream, const nsACString &aMsg) + { + return SendMsgCommon(stream, aMsg, false); + } + int32_t SendBinaryMsg(uint16_t stream, const nsACString &aMsg) + { + return SendMsgCommon(stream, aMsg, true); + } + int32_t SendBlob(uint16_t stream, nsIInputStream *aBlob); + + // Called on data reception from the SCTP library + // must(?) be public so my c->c++ tramploine can call it + int ReceiveCallback(struct socket* sock, void *data, size_t datalen, + struct sctp_rcvinfo rcv, int32_t flags); + + // Find out state + enum { + CONNECTING = 0U, + OPEN = 1U, + CLOSING = 2U, + CLOSED = 3U + }; + uint16_t GetReadyState() { return mState; } + + friend class DataChannel; + Mutex mLock; + +protected: + friend class DataChannelOnMessageAvailable; + DataConnectionListener *mListener; + +private: +#ifdef SCTP_DTLS_SUPPORTED + static void DTLSConnectThread(void *data); + int SendPacket(const unsigned char* data, size_t len); + void PacketReceived(TransportFlow *flow, const unsigned char *data, size_t len); + static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df); +#endif + DataChannel* FindChannelByStreamIn(uint16_t streamIn); + DataChannel* FindChannelByStreamOut(uint16_t streamOut); + uint16_t FindFreeStreamOut(); + bool RequestMoreStreamsOut(int32_t aNeeded = 16); + int32_t SendControlMessage(void *msg, uint32_t len, uint16_t streamOut); + int32_t SendOpenRequestMessage(const nsACString& label,uint16_t streamOut, + bool unordered, uint16_t prPolicy, uint32_t prValue); + int32_t SendOpenResponseMessage(uint16_t streamOut, uint16_t streamIn); + int32_t SendOpenAckMessage(uint16_t streamOut); + int32_t SendMsgInternal(DataChannel *channel, const char *data, + uint32_t length, uint32_t ppid); + int32_t SendBinary(DataChannel *channel, const char *data, + uint32_t len); + int32_t SendMsgCommon(uint16_t stream, const nsACString &aMsg, bool isBinary); + + DataChannel *OpenFinish(DataChannel *channel); + + void SendOrQueue(DataChannel *aChannel, DataChannelOnMessageAvailable *aMessage); + void StartDefer(); + bool SendDeferredMessages(); + void SendOutgoingStreamReset(); + void ResetOutgoingStream(uint16_t streamOut); + void HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req, + size_t length, + uint16_t streamIn); + void OpenResponseFinish(DataChannel *channel); + void HandleOpenResponseMessage(const struct rtcweb_datachannel_open_response *rsp, + size_t length, uint16_t streamIn); + void HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack, + size_t length, uint16_t streamIn); + void HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t streamIn); + void HandleDataMessage(uint32_t ppid, const void *buffer, size_t length, uint16_t streamIn); + void HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t streamIn); + void HandleAssociationChangeEvent(const struct sctp_assoc_change *sac); + void HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc); + void HandleRemoteErrorEvent(const struct sctp_remote_error *sre); + void HandleShutdownEvent(const struct sctp_shutdown_event *sse); + void HandleAdaptationIndication(const struct sctp_adaptation_event *sai); + void HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe); + void HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst); + void HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg); + void HandleNotification(const union sctp_notification *notif, size_t n); + + // NOTE: while these arrays will auto-expand, increases in the number of + // channels available from the stack must be negotiated! + nsAutoTArray mStreamsOut; + nsAutoTArray mStreamsIn; + nsDeque mPending; // Holds DataChannels + + // Streams pending reset + nsAutoTArray mStreamsResetting; + + struct socket *mMasterSocket; + struct socket *mSocket; + uint16_t mState; + +#ifdef SCTP_DTLS_SUPPORTED + nsRefPtr mTransportFlow; +#endif + uint16_t mLocalPort; + uint16_t mRemotePort; + + // Timer to control when we try to resend blocked messages + nsCOMPtr mDeferredTimer; + uint32_t mDeferTimeout; // in ms + bool mTimerRunning; +}; + +class DataChannel { +public: + enum { + CONNECTING = 0U, + OPEN = 1U, + CLOSING = 2U, + CLOSED = 3U, + WAITING_TO_OPEN = 4U + }; + + DataChannel(DataChannelConnection *connection, + uint16_t streamOut, uint16_t streamIn, + uint16_t state, + const nsACString& label, + uint16_t policy, uint32_t value, + uint32_t flags, + DataChannelListener *aListener, + nsISupports *aContext) + : mListener(aListener) + , mConnection(connection) + , mLabel(label) + , mState(state) + , mReady(false) + , mStreamOut(streamOut) + , mStreamIn(streamIn) + , mPrPolicy(policy) + , mPrValue(value) + , mFlags(0) + , mContext(aContext) + { + NS_ASSERTION(mConnection,"NULL connection"); + } + + ~DataChannel() + { + Close(); + } + + // Close this DataChannel. Can be called multiple times. + void Close(); + + // Set the listener (especially for channels created from the other side) + // Note: The Listener and Context should only be set once + void SetListener(DataChannelListener *aListener, nsISupports *aContext); + + // Send a string + bool SendMsg(const nsACString &aMsg) + { + if (mStreamOut != INVALID_STREAM) + return (mConnection->SendMsg(mStreamOut, aMsg) > 0); + else + return false; + } + + // Send a binary message (TypedArray) + bool SendBinaryMsg(const nsACString &aMsg) + { + if (mStreamOut != INVALID_STREAM) + return (mConnection->SendBinaryMsg(mStreamOut, aMsg) > 0); + else + return false; + } + + // Send a binary blob + bool SendBinaryStream(nsIInputStream *aBlob, uint32_t msgLen) + { + if (mStreamOut != INVALID_STREAM) + return (mConnection->SendBlob(mStreamOut, aBlob) > 0); + else + return false; + } + + uint16_t GetType() { return mPrPolicy; } + + bool GetOrdered() { return !(mFlags & DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED); } + + // Amount of data buffered to send + uint32_t GetBufferedAmount(); + + // Find out state + uint16_t GetReadyState() + { + if (mState == WAITING_TO_OPEN) + return CONNECTING; + return mState; + } + + void SetReadyState(uint16_t aState) { mState = aState; } + + void GetLabel(nsAString& aLabel) { CopyUTF8toUTF16(mLabel, aLabel); } + + void AppReady(); + +protected: + DataChannelListener *mListener; + +private: + friend class DataChannelOnMessageAvailable; + friend class DataChannelConnection; + + nsresult AddDataToBinaryMsg(const char *data, uint32_t size); + + nsRefPtr mConnection; + nsCString mLabel; + uint16_t mState; + bool mReady; + uint16_t mStreamOut; + uint16_t mStreamIn; + uint16_t mPrPolicy; + uint32_t mPrValue; + uint32_t mFlags; + uint32_t mId; + nsCOMPtr mContext; + nsCString mBinaryBuffer; + nsTArray > mBufferedData; + nsTArray > mQueuedMessages; +}; + +// used to dispatch notifications of incoming data to the main thread +// Patterned on CallOnMessageAvailable in WebSockets +// Also used to proxy other items to MainThread +class DataChannelOnMessageAvailable : public nsRunnable +{ +public: + enum { + ON_CONNECTION, + ON_DISCONNECTED, + ON_CHANNEL_CREATED, + ON_CHANNEL_OPEN, + ON_CHANNEL_CLOSED, + ON_DATA, + START_DEFER, + }; /* types */ + + DataChannelOnMessageAvailable(int32_t aType, + DataChannelConnection *aConnection, + DataChannel *aChannel, + nsCString &aData, // XXX this causes inefficiency + int32_t aLen) + : mType(aType), + mChannel(aChannel), + mConnection(aConnection), + mData(aData), + mLen(aLen) {} + + DataChannelOnMessageAvailable(int32_t aType, + DataChannel *aChannel) + : mType(aType), + mChannel(aChannel) {} + // XXX is it safe to leave mData/mLen uninitialized? This should only be + // used for notifications that don't use them, but I'd like more + // bulletproof compile-time checking. + + DataChannelOnMessageAvailable(int32_t aType, + DataChannelConnection *aConnection, + DataChannel *aChannel) + : mType(aType), + mChannel(aChannel), + mConnection(aConnection) {} + + NS_IMETHOD Run() + { + switch (mType) { + case ON_DATA: + case ON_CHANNEL_OPEN: + case ON_CHANNEL_CLOSED: + if (!mChannel->mListener) + return NS_OK; + break; + case ON_CHANNEL_CREATED: + case ON_CONNECTION: + case ON_DISCONNECTED: + if (!mConnection->mListener) + return NS_OK; + break; + case START_DEFER: + break; + } + switch (mType) { + case ON_DATA: + if (mLen < 0) { + mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData); + } else { + mChannel->mListener->OnBinaryMessageAvailable(mChannel->mContext, mData); + } + break; + case ON_CHANNEL_OPEN: + mChannel->mListener->OnChannelConnected(mChannel->mContext); + break; + case ON_CHANNEL_CLOSED: + mChannel->mListener->OnChannelClosed(mChannel->mContext); + break; + case ON_CHANNEL_CREATED: + mConnection->mListener->NotifyDataChannel(mChannel); + break; + case ON_CONNECTION: + mConnection->mListener->NotifyConnection(); + break; + case ON_DISCONNECTED: + mConnection->mListener->NotifyClosedConnection(); + break; + case START_DEFER: + mConnection->StartDefer(); + break; + } + return NS_OK; + } + +private: + ~DataChannelOnMessageAvailable() {} + + int32_t mType; + // XXX should use union + DataChannel *mChannel; // XXX careful of ownership! + nsRefPtr mConnection; + nsCString mData; + int32_t mLen; +}; + +} + +#endif // NETWERK_SCTP_DATACHANNEL_DATACHANNEL_H_ diff --git a/netwerk/sctp/datachannel/DataChannelLog.h b/netwerk/sctp/datachannel/DataChannelLog.h new file mode 100644 index 00000000000..be0ee4b9da6 --- /dev/null +++ b/netwerk/sctp/datachannel/DataChannelLog.h @@ -0,0 +1,28 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef DataChannelLog_h +#define DataChannelLog_h + +#ifdef MOZ_LOGGING +#define FORCE_PR_LOG +#endif + +#if defined(PR_LOG) +#error "This file must be #included before any IPDL-generated files or other files that #include prlog.h" +#endif + +#include "base/basictypes.h" +#include "prlog.h" + +#ifdef PR_LOGGING +extern PRLogModuleInfo* dataChannelLog; +#endif + +#undef LOG +#define LOG(args) PR_LOG(dataChannelLog, PR_LOG_DEBUG, args) + +#endif diff --git a/netwerk/sctp/datachannel/DataChannelProtocol.h b/netwerk/sctp/datachannel/DataChannelProtocol.h new file mode 100644 index 00000000000..523d89151b4 --- /dev/null +++ b/netwerk/sctp/datachannel/DataChannelProtocol.h @@ -0,0 +1,81 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this file, + * You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#ifndef NETWERK_SCTP_DATACHANNEL_DATACHANNELPROTOCOL_H_ +#define NETWERK_SCTP_DATACHANNEL_DATACHANNELPROTOCOL_H_ + +#if defined(__GNUC__) +#define SCTP_PACKED __attribute__((packed)) +#elif defined(_MSC_VER) +#pragma pack (push, 1) +#define SCTP_PACKED +#else +#error "Unsupported compiler" +#endif + +#define DATA_CHANNEL_PPID_CONTROL 50 +#define DATA_CHANNEL_PPID_DOMSTRING 51 +#define DATA_CHANNEL_PPID_BINARY 52 +#define DATA_CHANNEL_PPID_BINARY_LAST 53 + +#define DATA_CHANNEL_MAX_BINARY_FRAGMENT 0x4000 + +#define DATA_CHANNEL_FLAGS_SEND_REQ 0x00000001 +#define DATA_CHANNEL_FLAGS_SEND_RSP 0x00000002 +#define DATA_CHANNEL_FLAGS_SEND_ACK 0x00000004 +#define DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED 0x00000008 +#define DATA_CHANNEL_FLAGS_SEND_DATA 0x00000010 +#define DATA_CHANNEL_FLAGS_FINISH_OPEN 0x00000020 +#define DATA_CHANNEL_FLAGS_FINISH_RSP 0x00000040 + +#define INVALID_STREAM (0xFFFF) +// max is 0xFFFF: Streams 0 to 0xFFFE = 0xFFFF streams +#define MAX_NUM_STREAMS (2048) + +struct rtcweb_datachannel_open_request { + uint8_t msg_type; // DATA_CHANNEL_OPEN + uint8_t channel_type; + uint16_t flags; + uint16_t reliability_params; + int16_t priority; + char label[1]; // keep VC++ happy... UTF8 null-terminated string +} SCTP_PACKED; + +struct rtcweb_datachannel_open_response { + uint8_t msg_type; // DATA_CHANNEL_OPEN_RESPONSE + uint8_t error; // 0 == no error + uint16_t flags; + uint16_t reverse_stream; +} SCTP_PACKED; + +struct rtcweb_datachannel_ack { + uint8_t msg_type; // DATA_CHANNEL_ACK +} SCTP_PACKED; + +/* msg_type values: */ +#define DATA_CHANNEL_OPEN_REQUEST 0 +#define DATA_CHANNEL_OPEN_RESPONSE 1 +#define DATA_CHANNEL_ACK 2 + +/* channel_type values: */ +#define DATA_CHANNEL_RELIABLE 0 +#define DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT 1 +#define DATA_CHANNEL_PARTIAL_RELIABLE_TIMED 2 + +/* flags values: */ +#define DATA_CHANNEL_FLAG_OUT_OF_ORDER_ALLOWED 0x0001 +/* all other bits reserved and should be set to 0 */ + + +#define ERR_DATA_CHANNEL_ALREADY_OPEN 1 +#define ERR_DATA_CHANNEL_NONE_AVAILABLE 2 + +#if defined(_MSC_VER) +#pragma pack (pop) +#undef SCTP_PACKED +#endif + +#endif diff --git a/netwerk/sctp/datachannel/Makefile.in b/netwerk/sctp/datachannel/Makefile.in new file mode 100644 index 00000000000..311c9997892 --- /dev/null +++ b/netwerk/sctp/datachannel/Makefile.in @@ -0,0 +1,69 @@ +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +DEPTH = @DEPTH@ +topsrcdir = @top_srcdir@ +srcdir = @srcdir@ +VPATH = @srcdir@ + +include $(DEPTH)/config/autoconf.mk + +MODULE = necko +LIBRARY_NAME = nkdatachan_s +LIBXUL_LIBRARY = 1 +XPIDL_MODULE = necko_datachan +GRE_MODULE = 1 +FORCE_STATIC_LIB = 1 + +EXPORTS_NAMESPACES = mozilla/net + +CPPSRCS = \ + DataChannel.cpp \ + $(NULL) + +EXPORTS_mozilla/net = \ + DataChannel.h \ + DataChannelProtocol.h \ + $(NULL) + +LOCAL_INCLUDES = \ + -I$(topsrcdir)/xpcom/ds \ + -I$(srcdir)/../src \ + -I$(DEPTH)/dist/include/mozilla/net \ + -I$(topsrcdir)/media/webrtc/trunk/third_party/libjingle/source \ + -I$(topsrcdir)/media/mtransport \ + $(NULL) + +DEFINES = \ + -DINET=1 \ + -DINET6=1 \ + -DSCTP_DEBUG=1 \ + $(NULL) + +ifeq ($(OS_TARGET),WINNT) +DEFINES += -D__Userspace_os_Windows=1 +else +ifeq ($(OS_TARGET),Darwin) +DEFINES += -D__Userspace_os_Darwin=1 +else +ifeq ($(OS_TARGET),Linux) +DEFINES += -D__Userspace_os_Linux=1 +else +ifeq ($(OS_TARGET),FreeBSD) +DEFINES += -D__Userspace_os_FreeBSD=1 +else +#default_fallback; probably doesn't work +DEFINES += -D__Userspace_os_$(OS_TARGET)=1 +endif +endif +endif +endif + + +include $(topsrcdir)/config/config.mk +include $(topsrcdir)/ipc/chromium/chromium-config.mk +include $(topsrcdir)/config/rules.mk + +DEFINES += -DIMPL_NS_NET