Backed out changeset 25663e7d7f56 a=memory leak

This commit is contained in:
Jason Duell 2012-06-28 16:36:14 -07:00
parent 63599909fe
commit c9fc31d72f
3 changed files with 220 additions and 486 deletions

View File

@ -919,10 +919,6 @@ pref("network.websocket.max-connections", 200);
// (i.e. wss://) websockets.
pref("network.websocket.allowInsecureFromHTTPS", false);
// by default we delay websocket reconnects to same host/port if previous
// connection failed, per RFC 6455 section 7.2.3
pref("network.websocket.delay-failed-reconnects", true);
// </ws>
// Server-Sent Events

View File

@ -34,7 +34,6 @@
#include "nsProxyRelease.h"
#include "nsNetUtil.h"
#include "mozilla/Attributes.h"
#include "TimeStamp.h"
#include "plbase64.h"
#include "prmem.h"
@ -42,14 +41,8 @@
#include "prbit.h"
#include "zlib.h"
// rather than slurp up all of nsIWebSocket.idl, which lives outside necko, just
// dupe one constant we need from it
#define CLOSE_GOING_AWAY 1001
extern PRThread *gSocketThread;
using namespace mozilla;
namespace mozilla {
namespace net {
@ -84,385 +77,6 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocketChannel,
// some helper classes
//-----------------------------------------------------------------------------
// FailDelayManager
//
// Stores entries (searchable by {host, port}) of connections that have recently
// failed, so we can do delay of reconnects per RFC 6455 Section 7.2.3
//-----------------------------------------------------------------------------
// Initial reconnect delay is randomly chosen between 200-400 ms.
// This is a gentler backoff than the 0-5 seconds the spec offhandedly suggests.
const PRUint32 kWSReconnectInitialBaseDelay = 200;
const PRUint32 kWSReconnectInitialRandomDelay = 200;
// Base lifetime (in ms) of a FailDelay: kept longer if more failures occur
const PRUint32 kWSReconnectBaseLifeTime = 60 * 1000;
// Maximum reconnect delay (in ms)
const PRUint32 kWSReconnectMaxDelay = 60 * 1000;
// hold record of failed connections, and calculates needed delay for reconnects
// to same host/port.
class FailDelay
{
public:
FailDelay(nsCString address, PRInt32 port)
: mAddress(address), mPort(port)
{
mLastFailure = TimeStamp::Now();
mNextDelay = kWSReconnectInitialBaseDelay +
(rand() % kWSReconnectInitialRandomDelay);
}
// Called to update settings when connection fails again.
void FailedAgain()
{
mLastFailure = TimeStamp::Now();
// We use a truncated exponential backoff as suggested by RFC 6455,
// but multiply by 1.5 instead of 2 to be more gradual.
mNextDelay = PR_MIN(kWSReconnectMaxDelay, mNextDelay * 1.5);
LOG(("WebSocket: FailedAgain: host=%s, port=%d: incremented delay to %lu",
mAddress.get(), mPort, mNextDelay));
}
// returns 0 if there is no need to delay (i.e. delay interval is over)
PRUint32 RemainingDelay(TimeStamp rightNow)
{
TimeDuration dur = rightNow - mLastFailure;
PRUint32 sinceFail = (PRUint32) dur.ToMilliseconds();
if (sinceFail > mNextDelay)
return 0;
return mNextDelay - sinceFail;
}
bool IsExpired(TimeStamp rightNow)
{
return (mLastFailure +
TimeDuration::FromMilliseconds(kWSReconnectBaseLifeTime + mNextDelay))
<= rightNow;
}
nsCString mAddress; // IP address (or hostname if using proxy)
PRInt32 mPort;
private:
TimeStamp mLastFailure; // Time of last failed attempt
// mLastFailure + mNextDelay is the soonest we'll allow a reconnect
PRUint32 mNextDelay; // milliseconds
};
class FailDelayManager
{
public:
FailDelayManager()
{
mDelaysDisabled = false;
nsCOMPtr<nsIPrefBranch> prefService =
do_GetService(NS_PREFSERVICE_CONTRACTID);
bool boolpref = true;
nsresult rv;
rv = prefService->GetBoolPref("network.websocket.delay-failed-reconnects",
&boolpref);
if (NS_SUCCEEDED(rv) && !boolpref) {
mDelaysDisabled = true;
}
}
void Add(nsCString &address, PRInt32 port)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
if (mDelaysDisabled)
return;
FailDelay *record = new FailDelay(address, port);
mEntries.AppendElement(record);
}
// Element returned may not be valid after next main thread event: don't keep
// pointer to it around
FailDelay* Lookup(nsCString &address, PRInt32 port,
PRUint32 *outIndex = nsnull)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
if (mDelaysDisabled)
return nsnull;
for (PRUint32 i = 0; i < mEntries.Length(); i++) {
FailDelay *fail = mEntries[i];
if (fail->mAddress.Equals(address) && fail->mPort == port) {
if (outIndex)
*outIndex = i;
return fail;
}
}
return nsnull;
}
// returns true if channel connects immediately, or false if it's delayed
bool DelayOrBegin(WebSocketChannel *ws)
{
if (!mDelaysDisabled) {
PRUint32 failIndex = 0;
FailDelay *fail = Lookup(ws->mAddress, ws->mPort, &failIndex);
if (fail) {
TimeStamp rightNow = TimeStamp::Now();
PRUint32 remainingDelay = fail->RemainingDelay(rightNow);
if (remainingDelay) {
// reconnecting within delay interval: delay by remaining time
nsresult rv;
ws->mReconnectDelayTimer =
do_CreateInstance("@mozilla.org/timer;1", &rv);
if (NS_SUCCEEDED(rv)) {
rv = ws->mReconnectDelayTimer->InitWithCallback(
ws, remainingDelay, nsITimer::TYPE_ONE_SHOT);
if (NS_SUCCEEDED(rv)) {
LOG(("WebSocket: delaying websocket [this=%p] by %lu ms",
ws, (unsigned long)remainingDelay));
ws->mConnecting = CONNECTING_DELAYED;
return false;
}
}
// if timer fails (which is very unlikely), drop down to BeginOpen call
} else if (fail->IsExpired(rightNow)) {
mEntries.RemoveElementAt(failIndex);
delete fail;
}
}
}
// Delays disabled, or no previous failure, or we're reconnecting after scheduled
// delay interval has passed: connect.
//
ws->mConnecting = CONNECTING_IN_PROGRESS;
// If BeginOpen fails, it calls AbortSession, which calls OnStopSession,
// which will ensure any remaining queued connection(s) are scheduled
return ws->BeginOpen();
}
// Remove() also deletes all expired entries as it iterates: better for
// battery life than using a periodic timer.
void Remove(nsCString &address, PRInt32 port)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
TimeStamp rightNow = TimeStamp::Now();
// iterate from end, to make deletion indexing easier
for (PRInt32 i = mEntries.Length() - 1; i >= 0; --i) {
FailDelay *entry = mEntries[i];
if ((entry->mAddress.Equals(address) && entry->mPort == port) ||
entry->IsExpired(rightNow)) {
mEntries.RemoveElementAt(i);
delete entry;
}
}
}
private:
nsTArray<FailDelay *> mEntries;
bool mDelaysDisabled;
};
//-----------------------------------------------------------------------------
// nsWSAdmissionManager
//
// 1) Ensures that only one websocket at a time is CONNECTING to a given IP
// address (or hostname, if using proxy), per RFC 6455 Section 4.1.
// 2) Delays reconnects to IP/host after connection failure, per Section 7.2.3
//-----------------------------------------------------------------------------
class nsWSAdmissionManager
{
public:
nsWSAdmissionManager() : mSessionCount(0)
{
MOZ_COUNT_CTOR(nsWSAdmissionManager);
}
class nsOpenConn
{
public:
nsOpenConn(nsCString &addr, WebSocketChannel *channel)
: mAddress(addr), mChannel(channel) { MOZ_COUNT_CTOR(nsOpenConn); }
~nsOpenConn() { MOZ_COUNT_DTOR(nsOpenConn); }
nsCString mAddress;
WebSocketChannel *mChannel;
};
~nsWSAdmissionManager()
{
MOZ_COUNT_DTOR(nsWSAdmissionManager);
for (PRUint32 i = 0; i < mQueue.Length(); i++)
delete mQueue[i];
}
// Determine if we will open connection immediately (returns true), or
// delay/queue the connection (returns false)
bool ConditionallyConnect(WebSocketChannel *ws)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
NS_ABORT_IF_FALSE(ws->mConnecting == NOT_CONNECTING, "opening state");
// If there is already another WS channel connecting to this IP address,
// defer BeginOpen and mark as waiting in queue.
bool found = (IndexOf(ws->mAddress) >= 0);
// Always add ourselves to queue, even if we'll connect immediately
nsOpenConn *newdata = new nsOpenConn(ws->mAddress, ws);
mQueue.AppendElement(newdata);
if (found) {
ws->mConnecting = CONNECTING_QUEUED;
return false;
}
return mFailures.DelayOrBegin(ws);
}
bool OnConnected(WebSocketChannel *aChannel)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
NS_ABORT_IF_FALSE(aChannel->mConnecting == CONNECTING_IN_PROGRESS,
"Channel completed connect, but not connecting?");
aChannel->mConnecting = NOT_CONNECTING;
// Remove from queue
RemoveFromQueue(aChannel);
// Connection succeeded, so stop keeping track of any previous failures
mFailures.Remove(aChannel->mAddress, aChannel->mPort);
// Check for queued connections to same host.
// Note: still need to check for failures, since next websocket with same
// host may have different port
return ConnectNext(aChannel->mAddress);
}
// Called every time a websocket channel ends its session (including going away
// w/o ever successfully creating a connection)
bool OnStopSession(WebSocketChannel *aChannel, nsresult aReason)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
if (NS_FAILED(aReason)) {
// Have we seen this failure before?
FailDelay *knownFailure = mFailures.Lookup(aChannel->mAddress,
aChannel->mPort);
if (knownFailure) {
// repeated failure to connect: increase delay for next connection
knownFailure->FailedAgain();
} else {
// new connection failure: record it.
LOG(("WebSocket: connection to %s, %d failed: [this=%p]",
aChannel->mAddress.get(), (int)aChannel->mPort, aChannel));
mFailures.Add(aChannel->mAddress, aChannel->mPort);
}
}
if (aChannel->mConnecting) {
// Only way a connecting channel may get here w/o failing is if it was
// closed with GOING_AWAY (1001) because of navigation, tab close, etc.
NS_ABORT_IF_FALSE(NS_FAILED(aReason) ||
aChannel->mScriptCloseCode == CLOSE_GOING_AWAY,
"websocket closed while connecting w/o failing?");
RemoveFromQueue(aChannel);
bool wasNotQueued = (aChannel->mConnecting != CONNECTING_QUEUED);
aChannel->mConnecting = NOT_CONNECTING;
if (wasNotQueued)
return ConnectNext(aChannel->mAddress);
}
return false;
}
bool ConnectNext(nsCString &hostName)
{
PRInt32 index = IndexOf(hostName);
if (index >= 0) {
WebSocketChannel *chan = mQueue[index]->mChannel;
NS_ABORT_IF_FALSE(chan->mConnecting == CONNECTING_QUEUED,
"transaction not queued but in queue");
LOG(("WebSocket: ConnectNext: found channel [this=%p] in queue", chan));
return mFailures.DelayOrBegin(chan);
}
return false;
}
void IncrementSessionCount()
{
PR_ATOMIC_INCREMENT(&mSessionCount);
}
void DecrementSessionCount()
{
PR_ATOMIC_DECREMENT(&mSessionCount);
}
PRInt32 SessionCount()
{
return mSessionCount;
}
private:
void RemoveFromQueue(WebSocketChannel *aChannel)
{
PRInt32 index = IndexOf(aChannel);
NS_ABORT_IF_FALSE(index >= 0, "connection to remove not in queue");
if (index >= 0) {
nsOpenConn *olddata = mQueue[index];
mQueue.RemoveElementAt(index);
delete olddata;
}
}
PRInt32 IndexOf(nsCString &aStr)
{
for (PRUint32 i = 0; i < mQueue.Length(); i++)
if (aStr == (mQueue[i])->mAddress)
return i;
return -1;
}
PRInt32 IndexOf(WebSocketChannel *aChannel)
{
for (PRUint32 i = 0; i < mQueue.Length(); i++)
if (aChannel == (mQueue[i])->mChannel)
return i;
return -1;
}
// SessionCount might be decremented from the main or the socket
// thread, so manage it with atomic counters
PRInt32 mSessionCount;
// Queue for websockets that have not completed connecting yet.
// The first nsOpenConn with a given address will be either be
// CONNECTING_IN_PROGRESS or CONNECTING_DELAYED. Later ones with the same
// hostname must be CONNECTING_QUEUED.
//
// We could hash hostnames instead of using a single big vector here, but the
// dataset is expected to be small.
nsTArray<nsOpenConn *> mQueue;
FailDelayManager mFailures;
};
static nsWSAdmissionManager *sWebSocketAdmissions = nsnull;
//-----------------------------------------------------------------------------
// CallOnMessageAvailable
//-----------------------------------------------------------------------------
@ -507,18 +121,14 @@ public:
NS_DECL_ISUPPORTS
CallOnStop(WebSocketChannel *aChannel,
nsresult aReason)
nsresult aData)
: mChannel(aChannel),
mReason(aReason) {}
mData(aData) {}
NS_IMETHOD Run()
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
sWebSocketAdmissions->OnStopSession(mChannel, mReason);
if (mChannel->mListener)
mChannel->mListener->OnStop(mChannel->mContext, mReason);
mChannel->mListener->OnStop(mChannel->mContext, mData);
return NS_OK;
}
@ -526,7 +136,7 @@ private:
~CallOnStop() {}
nsRefPtr<WebSocketChannel> mChannel;
nsresult mReason;
nsresult mData;
};
NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnStop, nsIRunnable)
@ -760,6 +370,176 @@ private:
};
NS_IMPL_THREADSAFE_ISUPPORTS1(OutboundEnqueuer, nsIRunnable)
//-----------------------------------------------------------------------------
// nsWSAdmissionManager
//-----------------------------------------------------------------------------
// Section 4.1 requires that only a single websocket at a time can be CONNECTING
// to any given IP address (or hostname, if proxy doing DNS for us). This class
// ensures that we delay connecting until any pending connection for the same
// IP/addr is complete (i.e. until before the 101 upgrade complete response
// comes back and an 'open' javascript event is created)
class nsWSAdmissionManager
{
public:
nsWSAdmissionManager() : mSessionCount(0)
{
MOZ_COUNT_CTOR(nsWSAdmissionManager);
}
class nsOpenConn
{
public:
nsOpenConn(nsCString &addr, WebSocketChannel *channel)
: mAddress(addr), mChannel(channel) { MOZ_COUNT_CTOR(nsOpenConn); }
~nsOpenConn() { MOZ_COUNT_DTOR(nsOpenConn); }
nsCString mAddress;
WebSocketChannel *mChannel;
};
~nsWSAdmissionManager()
{
MOZ_COUNT_DTOR(nsWSAdmissionManager);
for (PRUint32 i = 0; i < mData.Length(); i++)
delete mData[i];
}
bool ConditionallyConnect(nsCString &aStr, WebSocketChannel *ws)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
// if aStr is not in mData then we return true, else false.
// in either case aStr is then added to mData - meaning
// there will be duplicates when this function has been
// called with the same parameter multiple times.
// we could hash this, but the dataset is expected to be
// small
// There may already be another WS channel connecting to this IP address, in
// which case we'll still create a new nsOpenConn but defer BeginOpen until
// that channel completes connecting.
bool found = (IndexOf(aStr) >= 0);
nsOpenConn *newdata = new nsOpenConn(aStr, ws);
mData.AppendElement(newdata);
NS_ABORT_IF_FALSE (!ws->mOpenRunning && !ws->mOpenBlocked,
"opening state");
if (!found) {
ws->mOpenRunning = 1;
ws->BeginOpen();
} else {
ws->mOpenBlocked = 1;
}
return !found;
}
bool Complete(WebSocketChannel *aChannel)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
NS_ABORT_IF_FALSE(!aChannel->mOpenBlocked,
"blocked, but complete nsOpenConn");
// It is possible this has already been canceled
if (!aChannel->mOpenRunning)
return false;
PRInt32 index = IndexOf(aChannel);
NS_ABORT_IF_FALSE(index >= 0, "completed connection not in open list");
aChannel->mOpenRunning = 0;
nsOpenConn *olddata = mData[index];
mData.RemoveElementAt(index);
delete olddata;
// are there more of the same address pending dispatch?
return ConnectNext(aChannel->mAddress);
}
bool Cancel(WebSocketChannel *aChannel)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
PRInt32 index = IndexOf(aChannel);
NS_ABORT_IF_FALSE(index >= 0, "Cancelled connection not in open list");
NS_ABORT_IF_FALSE(aChannel->mOpenRunning ^ aChannel->mOpenBlocked,
"cancel without running xor blocked");
bool wasRunning = aChannel->mOpenRunning;
aChannel->mOpenRunning = 0;
aChannel->mOpenBlocked = 0;
nsOpenConn *olddata = mData[index];
mData.RemoveElementAt(index);
delete olddata;
// if we are running we can run another one
if (wasRunning)
return ConnectNext(aChannel->mAddress);
return false;
}
bool ConnectNext(nsCString &hostName)
{
PRInt32 index = IndexOf(hostName);
if (index >= 0) {
WebSocketChannel *chan = mData[index]->mChannel;
NS_ABORT_IF_FALSE(chan->mOpenBlocked,
"transaction not blocked but in queue");
NS_ABORT_IF_FALSE(!chan->mOpenRunning, "transaction already running");
chan->mOpenBlocked = 0;
chan->mOpenRunning = 1;
chan->BeginOpen();
return true;
}
return false;
}
void IncrementSessionCount()
{
PR_ATOMIC_INCREMENT(&mSessionCount);
}
void DecrementSessionCount()
{
PR_ATOMIC_DECREMENT(&mSessionCount);
}
PRInt32 SessionCount()
{
return mSessionCount;
}
private:
nsTArray<nsOpenConn *> mData;
PRInt32 IndexOf(nsCString &aStr)
{
for (PRUint32 i = 0; i < mData.Length(); i++)
if (aStr == (mData[i])->mAddress)
return i;
return -1;
}
PRInt32 IndexOf(WebSocketChannel *aChannel)
{
for (PRUint32 i = 0; i < mData.Length(); i++)
if (aChannel == (mData[i])->mChannel)
return i;
return -1;
}
// SessionCount might be decremented from the main or the socket
// thread, so manage it with atomic counters
PRInt32 mSessionCount;
};
//-----------------------------------------------------------------------------
// nsWSCompression
//
@ -887,15 +667,15 @@ private:
PRUint8 mBuffer[kBufferLen];
};
static nsWSAdmissionManager *sWebSocketAdmissions = nsnull;
//-----------------------------------------------------------------------------
// WebSocketChannel
//-----------------------------------------------------------------------------
WebSocketChannel::WebSocketChannel() :
mPort(0),
mCloseTimeout(20000),
mOpenTimeout(20000),
mConnecting(NOT_CONNECTING),
mPingTimeout(0),
mPingResponseTimeout(10000),
mMaxConcurrentConnections(200),
@ -911,6 +691,8 @@ WebSocketChannel::WebSocketChannel() :
mAutoFollowRedirects(0),
mReleaseOnTransmit(0),
mTCPClosed(0),
mOpenBlocked(0),
mOpenRunning(0),
mChannelWasOpened(0),
mDataStarted(0),
mIncrementedSessionCount(0),
@ -945,7 +727,7 @@ WebSocketChannel::~WebSocketChannel()
// this stop is a nop if the normal connect/close is followed
StopSession(NS_ERROR_UNEXPECTED);
NS_ABORT_IF_FALSE(mConnecting == NOT_CONNECTING, "op");
NS_ABORT_IF_FALSE(!mOpenRunning && !mOpenBlocked, "op");
moz_free(mBuffer);
moz_free(mDynamicOutput);
@ -999,7 +781,7 @@ WebSocketChannel::Shutdown()
sWebSocketAdmissions = nsnull;
}
bool
nsresult
WebSocketChannel::BeginOpen()
{
LOG(("WebSocketChannel::BeginOpen() %p\n", this));
@ -1011,40 +793,29 @@ WebSocketChannel::BeginOpen()
LOG(("WebSocketChannel::BeginOpen: Resuming Redirect\n"));
rv = mRedirectCallback->OnRedirectVerifyCallback(NS_OK);
mRedirectCallback = nsnull;
return false;
return rv;
}
nsCOMPtr<nsIChannel> localChannel = do_QueryInterface(mChannel, &rv);
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::BeginOpen: cannot async open\n"));
AbortSession(NS_ERROR_UNEXPECTED);
return false;
return rv;
}
rv = localChannel->AsyncOpen(this, mHttpChannel);
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::BeginOpen: cannot async open\n"));
AbortSession(NS_ERROR_CONNECTION_REFUSED);
return false;
return rv;
}
mChannelWasOpened = 1;
mOpenTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::BeginOpen: cannot create open timer\n"));
AbortSession(NS_ERROR_UNEXPECTED);
return false;
}
if (NS_SUCCEEDED(rv))
mOpenTimer->InitWithCallback(this, mOpenTimeout, nsITimer::TYPE_ONE_SHOT);
rv = mOpenTimer->InitWithCallback(this, mOpenTimeout,
nsITimer::TYPE_ONE_SHOT);
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel::BeginOpen: cannot initialize open timer\n"));
AbortSession(NS_ERROR_UNEXPECTED);
return false;
}
return true;
return rv;
}
bool
@ -1807,6 +1578,9 @@ WebSocketChannel::StopSession(nsresult reason)
mCallbacks = nsnull;
}
if (mOpenRunning || mOpenBlocked)
sWebSocketAdmissions->Cancel(this);
if (mCloseTimer) {
mCloseTimer->Cancel();
mCloseTimer = nsnull;
@ -1817,11 +1591,6 @@ WebSocketChannel::StopSession(nsresult reason)
mOpenTimer = nsnull;
}
if (mReconnectDelayTimer) {
mReconnectDelayTimer->Cancel();
mReconnectDelayTimer = nsnull;
}
if (mPingTimer) {
mPingTimer->Cancel();
mPingTimer = nsnull;
@ -1888,7 +1657,8 @@ WebSocketChannel::StopSession(nsresult reason)
if (!mCalledOnStop) {
mCalledOnStop = 1;
NS_DispatchToMainThread(new CallOnStop(this, reason));
if (mListener)
NS_DispatchToMainThread(new CallOnStop(this, reason));
}
return;
@ -2123,10 +1893,6 @@ WebSocketChannel::ApplyForAdmission()
rv = mURI->GetHost(hostName);
NS_ENSURE_SUCCESS(rv, rv);
mAddress = hostName;
rv = mURI->GetPort(&mPort);
NS_ENSURE_SUCCESS(rv, rv);
if (mPort == -1)
mPort = (mEncrypted ? kDefaultWSSPort : kDefaultWSPort);
// expect the callback in ::OnLookupComplete
LOG(("WebSocketChannel::ApplyForAdmission: checking for concurrent open\n"));
@ -2147,12 +1913,7 @@ WebSocketChannel::StartWebsocketData()
LOG(("WebSocketChannel::StartWebsocketData() %p", this));
NS_ABORT_IF_FALSE(!mDataStarted, "StartWebsocketData twice");
mDataStarted = 1;
// We're now done CONNECTING, which means we can now open another,
// perhaps parallel, connection to the same host if one
// is pending
sWebSocketAdmissions->OnConnected(this);
LOG(("WebSocketChannel::StartWebsocketData Notifying Listener %p\n",
mListener.get()));
@ -2192,7 +1953,7 @@ WebSocketChannel::OnLookupComplete(nsICancelable *aRequest,
LOG(("WebSocketChannel::OnLookupComplete: Failed GetNextAddr\n"));
}
if (sWebSocketAdmissions->ConditionallyConnect(this)) {
if (sWebSocketAdmissions->ConditionallyConnect(mAddress, this)) {
LOG(("WebSocketChannel::OnLookupComplete: Proceeding with Open\n"));
} else {
LOG(("WebSocketChannel::OnLookupComplete: Deferring Open\n"));
@ -2316,19 +2077,17 @@ WebSocketChannel::AsyncOnChannelRedirect(
return rv;
}
// Redirected-to URI may need to be delayed by 1-connecting-per-host and
// delay-after-fail algorithms. So hold off calling OnRedirectVerifyCallback
// until BeginOpen, when we know it's OK to proceed with new channel.
// We cannot just tell the callback OK right now due to the 1 connect at a
// time policy. First we need to complete the old location and then start the
// lookup chain for the new location - once that is complete and we have been
// admitted, OnRedirectVerifyCallback(NS_OK) will be called out of BeginOpen()
sWebSocketAdmissions->Complete(this);
mAddress.Truncate();
mRedirectCallback = callback;
// Mark old channel as successfully connected so we'll clear any FailDelay
// associated with the old URI. Note: no need to also call OnStopSession:
// it's a no-op for successful, already-connected channels.
sWebSocketAdmissions->OnConnected(this);
// ApplyForAdmission as if we were starting from fresh...
mAddress.Truncate();
mChannelWasOpened = 0;
rv = ApplyForAdmission();
if (NS_FAILED(rv)) {
LOG(("WebSocketChannel: Redirect failed due to DNS failure\n"));
@ -2368,16 +2127,6 @@ WebSocketChannel::Notify(nsITimer *timer)
return NS_OK;
AbortSession(NS_ERROR_NET_TIMEOUT);
} else if (timer == mReconnectDelayTimer) {
NS_ABORT_IF_FALSE(mConnecting == CONNECTING_DELAYED,
"woke up from delay w/o being delayed?");
mReconnectDelayTimer = nsnull;
LOG(("WebSocketChannel: connecting [this=%p] after reconnect delay", this));
// - if BeginOpen fails, it calls AbortSession, which calls OnStopSession,
// which will ensure any remaining queued connection(s) are scheduled
mConnecting = CONNECTING_IN_PROGRESS;
this->BeginOpen();
} else if (timer == mPingTimer) {
NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread,
"not socket thread");
@ -2602,6 +2351,12 @@ WebSocketChannel::Close(PRUint16 code, const nsACString & reason)
return NS_OK;
}
if (!mTransport) {
LOG(("WebSocketChannel::Close() without transport - aborting."));
AbortSession(NS_ERROR_NOT_CONNECTED);
return NS_ERROR_NOT_CONNECTED;
}
// The API requires the UTF-8 string to be 123 or less bytes
if (reason.Length() > 123)
return NS_ERROR_ILLEGAL_VALUE;
@ -2610,20 +2365,6 @@ WebSocketChannel::Close(PRUint16 code, const nsACString & reason)
mScriptCloseReason = reason;
mScriptCloseCode = code;
if (!mTransport) {
nsresult rv;
if (code == CLOSE_GOING_AWAY) {
// Not an error: for example, tab has closed or navigated away
LOG(("WebSocketChannel::Close() GOING_AWAY without transport."));
rv = NS_OK;
} else {
LOG(("WebSocketChannel::Close() without transport - error."));
rv = NS_ERROR_NOT_CONNECTED;
}
StopSession(rv);
return rv;
}
return mSocketThread->Dispatch(
new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)),
nsIEventTarget::DISPATCH_NORMAL);
@ -2729,6 +2470,16 @@ WebSocketChannel::OnStartRequest(nsIRequest *aRequest,
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
NS_ABORT_IF_FALSE(!mRecvdHttpOnStartRequest, "OTA duplicated");
// Generating the onStart event will take us out of the
// CONNECTING state which means we can now open another,
// perhaps parallel, connection to the same host if one
// is pending
if (sWebSocketAdmissions->Complete(this))
LOG(("WebSocketChannel::OnStartRequest: Starting Pending Open\n"));
else
LOG(("WebSocketChannel::OnStartRequest: No More Pending Opens\n"));
if (mOpenTimer) {
mOpenTimer->Cancel();
mOpenTimer = nsnull;

View File

@ -42,14 +42,6 @@ class CallOnStop;
class CallOnServerClose;
class CallAcknowledge;
// Used to enforce "1 connecting websocket per host" rule, and reconnect delays
enum wsConnectingState {
NOT_CONNECTING = 0, // Not yet (or no longer) trying to open connection
CONNECTING_QUEUED, // Waiting for other ws to same host to finish opening
CONNECTING_DELAYED, // Delayed by "reconnect after failure" algorithm
CONNECTING_IN_PROGRESS // Started connection: waiting for result
};
class WebSocketChannel : public BaseWebSocketChannel,
public nsIHttpUpgradeListener,
public nsIStreamListener,
@ -109,7 +101,6 @@ protected:
private:
friend class OutboundEnqueuer;
friend class nsWSAdmissionManager;
friend class FailDelayManager;
friend class CallOnMessageAvailable;
friend class CallOnStop;
friend class CallOnServerClose;
@ -126,7 +117,7 @@ private:
void GeneratePong(PRUint8 *payload, PRUint32 len);
void GeneratePing();
bool BeginOpen();
nsresult BeginOpen();
nsresult HandleExtensions();
nsresult SetupRequest();
nsresult ApplyForAdmission();
@ -158,11 +149,7 @@ private:
nsCOMPtr<nsIRandomGenerator> mRandomGenerator;
nsCString mHashedSecret;
// Used as key for connection managment: Initially set to hostname from URI,
// then to IP address (unless we're leaving DNS resolution to a proxy server)
nsCString mAddress;
PRInt32 mPort; // WS server port
nsCOMPtr<nsISocketTransport> mTransport;
nsCOMPtr<nsIAsyncInputStream> mSocketIn;
@ -173,8 +160,6 @@ private:
nsCOMPtr<nsITimer> mOpenTimer;
PRUint32 mOpenTimeout; /* milliseconds */
wsConnectingState mConnecting; /* 0 if not connecting */
nsCOMPtr<nsITimer> mReconnectDelayTimer;
nsCOMPtr<nsITimer> mPingTimer;
PRUint32 mPingTimeout; /* milliseconds */
@ -198,6 +183,8 @@ private:
PRUint32 mAutoFollowRedirects : 1;
PRUint32 mReleaseOnTransmit : 1;
PRUint32 mTCPClosed : 1;
PRUint32 mOpenBlocked : 1;
PRUint32 mOpenRunning : 1;
PRUint32 mChannelWasOpened : 1;
PRUint32 mDataStarted : 1;
PRUint32 mIncrementedSessionCount : 1;