Bug 977672: Cleanup runnables and tasks in UnixSocket.cpp, r=kyle

This patch cleans up runnables and tasks in UnixSocket.cpp. Every
runnable's name now ends in Runnable. There are a base classes for
runnables and tasks that hold reference to UnixSocketImpl and from
which all runnables and tasks inherit.
This commit is contained in:
Thomas Zimmermann 2014-02-28 10:16:53 +01:00
parent a3e0c85af9
commit aeb9f13274

View File

@ -210,7 +210,26 @@ private:
T* mInstance; T* mInstance;
}; };
class OnSocketEventTask : public nsRunnable class UnixSocketImplRunnable : public nsRunnable
{
public:
UnixSocketImpl* GetImpl() const
{
return mImpl;
}
protected:
UnixSocketImplRunnable(UnixSocketImpl* aImpl)
: mImpl(aImpl)
{
MOZ_ASSERT(aImpl);
}
virtual ~UnixSocketImplRunnable()
{ }
private:
UnixSocketImpl* mImpl;
};
class OnSocketEventRunnable : public UnixSocketImplRunnable
{ {
public: public:
enum SocketEvent { enum SocketEvent {
@ -219,109 +238,82 @@ public:
DISCONNECT DISCONNECT
}; };
OnSocketEventTask(UnixSocketImpl* aImpl, SocketEvent e) : OnSocketEventRunnable(UnixSocketImpl* aImpl, SocketEvent e)
mImpl(aImpl), : UnixSocketImplRunnable(aImpl)
mEvent(e) , mEvent(e)
{ {
MOZ_ASSERT(aImpl);
MOZ_ASSERT(!NS_IsMainThread()); MOZ_ASSERT(!NS_IsMainThread());
} }
NS_IMETHOD Run() NS_IMETHOD Run() MOZ_OVERRIDE
{ {
MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(NS_IsMainThread());
if (mImpl->IsShutdownOnMainThread()) {
UnixSocketImpl* impl = GetImpl();
if (impl->IsShutdownOnMainThread()) {
NS_WARNING("CloseSocket has already been called!"); NS_WARNING("CloseSocket has already been called!");
// Since we've already explicitly closed and the close happened before // Since we've already explicitly closed and the close happened before
// this, this isn't really an error. Since we've warned, return OK. // this, this isn't really an error. Since we've warned, return OK.
return NS_OK; return NS_OK;
} }
if (mEvent == CONNECT_SUCCESS) { if (mEvent == CONNECT_SUCCESS) {
mImpl->mConsumer->NotifySuccess(); impl->mConsumer->NotifySuccess();
} else if (mEvent == CONNECT_ERROR) { } else if (mEvent == CONNECT_ERROR) {
mImpl->mConsumer->NotifyError(); impl->mConsumer->NotifyError();
} else if (mEvent == DISCONNECT) { } else if (mEvent == DISCONNECT) {
mImpl->mConsumer->NotifyDisconnect(); impl->mConsumer->NotifyDisconnect();
} }
return NS_OK; return NS_OK;
} }
private: private:
UnixSocketImpl* mImpl;
SocketEvent mEvent; SocketEvent mEvent;
}; };
class SocketReceiveTask : public nsRunnable class SocketReceiveRunnable : public UnixSocketImplRunnable
{ {
public: public:
SocketReceiveTask(UnixSocketImpl* aImpl, UnixSocketRawData* aData) : SocketReceiveRunnable(UnixSocketImpl* aImpl, UnixSocketRawData* aData)
mImpl(aImpl), : UnixSocketImplRunnable(aImpl)
mRawData(aData) , mRawData(aData)
{ {
MOZ_ASSERT(aImpl);
MOZ_ASSERT(aData); MOZ_ASSERT(aData);
} }
NS_IMETHOD Run() NS_IMETHOD Run() MOZ_OVERRIDE
{ {
MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(NS_IsMainThread());
if (mImpl->IsShutdownOnMainThread()) {
UnixSocketImpl* impl = GetImpl();
if (impl->IsShutdownOnMainThread()) {
NS_WARNING("mConsumer is null, aborting receive!"); NS_WARNING("mConsumer is null, aborting receive!");
// Since we've already explicitly closed and the close happened before // Since we've already explicitly closed and the close happened before
// this, this isn't really an error. Since we've warned, return OK. // this, this isn't really an error. Since we've warned, return OK.
return NS_OK; return NS_OK;
} }
MOZ_ASSERT(mImpl->mConsumer); MOZ_ASSERT(impl->mConsumer);
mImpl->mConsumer->ReceiveSocketData(mRawData); impl->mConsumer->ReceiveSocketData(mRawData);
return NS_OK; return NS_OK;
} }
private: private:
UnixSocketImpl* mImpl;
nsAutoPtr<UnixSocketRawData> mRawData; nsAutoPtr<UnixSocketRawData> mRawData;
}; };
class SocketSendTask : public Task class RequestClosingSocketRunnable : public UnixSocketImplRunnable
{ {
public: public:
SocketSendTask(UnixSocketConsumer* aConsumer, UnixSocketImpl* aImpl, RequestClosingSocketRunnable(UnixSocketImpl* aImpl)
UnixSocketRawData* aData) : UnixSocketImplRunnable(aImpl)
: mConsumer(aConsumer), { }
mImpl(aImpl),
mData(aData)
{
MOZ_ASSERT(aConsumer);
MOZ_ASSERT(aImpl);
MOZ_ASSERT(aData);
}
void NS_IMETHOD Run() MOZ_OVERRIDE
Run()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mImpl->IsShutdownOnIOThread());
mImpl->QueueWriteData(mData);
}
private:
nsRefPtr<UnixSocketConsumer> mConsumer;
UnixSocketImpl* mImpl;
UnixSocketRawData* mData;
};
class RequestClosingSocketTask : public nsRunnable
{
public:
RequestClosingSocketTask(UnixSocketImpl* aImpl) : mImpl(aImpl)
{
MOZ_ASSERT(aImpl);
}
NS_IMETHOD Run()
{ {
MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(NS_IsMainThread());
if (mImpl->IsShutdownOnMainThread()) { UnixSocketImpl* impl = GetImpl();
if (impl->IsShutdownOnMainThread()) {
NS_WARNING("CloseSocket has already been called!"); NS_WARNING("CloseSocket has already been called!");
// Since we've already explicitly closed and the close happened before // Since we've already explicitly closed and the close happened before
// this, this isn't really an error. Since we've warned, return OK. // this, this isn't really an error. Since we've warned, return OK.
@ -330,100 +322,144 @@ public:
// Start from here, same handling flow as calling CloseSocket() from // Start from here, same handling flow as calling CloseSocket() from
// upper layer // upper layer
mImpl->mConsumer->CloseSocket(); impl->mConsumer->CloseSocket();
return NS_OK; return NS_OK;
} }
};
class UnixSocketImplTask : public CancelableTask
{
public:
UnixSocketImpl* GetImpl() const
{
return mImpl;
}
void Cancel() MOZ_OVERRIDE
{
mImpl = nullptr;
}
bool IsCanceled() const
{
return !mImpl;
}
protected:
UnixSocketImplTask(UnixSocketImpl* aImpl)
: mImpl(aImpl)
{
MOZ_ASSERT(mImpl);
}
private: private:
UnixSocketImpl* mImpl; UnixSocketImpl* mImpl;
}; };
class SocketListenTask : public CancelableTask class SocketSendTask : public UnixSocketImplTask
{ {
virtual void Run();
UnixSocketImpl* mImpl;
public: public:
SocketListenTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { } SocketSendTask(UnixSocketImpl* aImpl,
UnixSocketConsumer* aConsumer,
virtual void Cancel() UnixSocketRawData* aData)
: UnixSocketImplTask(aImpl)
, mConsumer(aConsumer)
, mData(aData)
{
MOZ_ASSERT(aConsumer);
MOZ_ASSERT(aData);
}
void Run() MOZ_OVERRIDE
{ {
MOZ_ASSERT(!NS_IsMainThread()); MOZ_ASSERT(!NS_IsMainThread());
mImpl = nullptr; MOZ_ASSERT(!IsCanceled());
UnixSocketImpl* impl = GetImpl();
MOZ_ASSERT(!impl->IsShutdownOnIOThread());
impl->QueueWriteData(mData);
}
private:
nsRefPtr<UnixSocketConsumer> mConsumer;
UnixSocketRawData* mData;
};
class SocketListenTask : public UnixSocketImplTask
{
public:
SocketListenTask(UnixSocketImpl* aImpl)
: UnixSocketImplTask(aImpl)
{ }
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(!NS_IsMainThread());
if (!IsCanceled()) {
GetImpl()->Listen();
}
} }
}; };
void SocketListenTask::Run() class SocketConnectTask : public UnixSocketImplTask
{ {
MOZ_ASSERT(!NS_IsMainThread());
if (mImpl) {
mImpl->Listen();
}
}
class SocketConnectTask : public Task {
virtual void Run();
UnixSocketImpl* mImpl;
public: public:
SocketConnectTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { } SocketConnectTask(UnixSocketImpl* aImpl)
: UnixSocketImplTask(aImpl)
{ }
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsCanceled());
GetImpl()->Connect();
}
}; };
void SocketConnectTask::Run() class SocketDelayedConnectTask : public UnixSocketImplTask
{ {
MOZ_ASSERT(!NS_IsMainThread());
mImpl->Connect();
}
class SocketDelayedConnectTask : public CancelableTask {
virtual void Run();
UnixSocketImpl* mImpl;
public: public:
SocketDelayedConnectTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { } SocketDelayedConnectTask(UnixSocketImpl* aImpl)
: UnixSocketImplTask(aImpl)
{ }
virtual void Cancel() void Run() MOZ_OVERRIDE
{ {
MOZ_ASSERT(NS_IsMainThread()); MOZ_ASSERT(NS_IsMainThread());
mImpl = nullptr; if (IsCanceled()) {
return;
}
UnixSocketImpl* impl = GetImpl();
if (impl->IsShutdownOnMainThread()) {
return;
}
impl->ClearDelayedConnectTask();
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(impl));
} }
}; };
void SocketDelayedConnectTask::Run() class ShutdownSocketTask : public UnixSocketImplTask
{ {
MOZ_ASSERT(NS_IsMainThread());
if (!mImpl || mImpl->IsShutdownOnMainThread()) {
return;
}
mImpl->ClearDelayedConnectTask();
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(mImpl));
}
class ShutdownSocketTask : public Task {
virtual void Run();
UnixSocketImpl* mImpl;
public: public:
ShutdownSocketTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { } ShutdownSocketTask(UnixSocketImpl* aImpl)
: UnixSocketImplTask(aImpl)
{ }
void Run() MOZ_OVERRIDE
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!IsCanceled());
UnixSocketImpl* impl = GetImpl();
// At this point, there should be no new events on the IO thread after this
// one with the possible exception of a SocketListenTask that
// ShutdownOnIOThread will cancel for us. We are now fully shut down, so we
// can send a message to the main thread that will delete impl safely knowing
// that no more tasks reference it.
impl->ShutdownOnIOThread();
nsRefPtr<nsIRunnable> r(new DeleteInstanceRunnable<UnixSocketImpl>(impl));
nsresult rv = NS_DispatchToMainThread(r);
NS_ENSURE_SUCCESS_VOID(rv);
}
}; };
void ShutdownSocketTask::Run()
{
MOZ_ASSERT(!NS_IsMainThread());
// At this point, there should be no new events on the IO thread after this
// one with the possible exception of a SocketListenTask that
// ShutdownOnIOThread will cancel for us. We are now fully shut down, so we
// can send a message to the main thread that will delete mImpl safely knowing
// that no more tasks reference it.
mImpl->ShutdownOnIOThread();
nsRefPtr<nsIRunnable> t(new DeleteInstanceRunnable<UnixSocketImpl>(mImpl));
nsresult rv = NS_DispatchToMainThread(t);
NS_ENSURE_SUCCESS_VOID(rv);
}
void void
UnixSocketImpl::FireSocketError() UnixSocketImpl::FireSocketError()
{ {
@ -433,9 +469,9 @@ UnixSocketImpl::FireSocketError()
Close(); Close();
// Tell the main thread we've errored // Tell the main thread we've errored
nsRefPtr<OnSocketEventTask> t = nsRefPtr<OnSocketEventRunnable> r =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR); new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_ERROR);
NS_DispatchToMainThread(t); NS_DispatchToMainThread(r);
} }
void void
@ -556,9 +592,9 @@ UnixSocketImpl::OnAccepted(int aFd)
} }
SetSocket(aFd, SOCKET_IS_CONNECTED); SetSocket(aFd, SOCKET_IS_CONNECTED);
nsRefPtr<OnSocketEventTask> t = nsRefPtr<OnSocketEventRunnable> r =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS); new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS);
NS_DispatchToMainThread(t); NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true); AddWatchers(READ_WATCHER, true);
if (!mOutgoingQ.IsEmpty()) { if (!mOutgoingQ.IsEmpty()) {
@ -584,9 +620,9 @@ UnixSocketImpl::OnConnected()
return; return;
} }
nsRefPtr<OnSocketEventTask> t = nsRefPtr<OnSocketEventRunnable> r =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS); new OnSocketEventRunnable(this, OnSocketEventRunnable::CONNECT_SUCCESS);
NS_DispatchToMainThread(t); NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true); AddWatchers(READ_WATCHER, true);
if (!mOutgoingQ.IsEmpty()) { if (!mOutgoingQ.IsEmpty()) {
@ -647,15 +683,16 @@ UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
// We're done with our descriptors. Ensure that spurious events don't // We're done with our descriptors. Ensure that spurious events don't
// cause us to end up back here. // cause us to end up back here.
RemoveWatchers(READ_WATCHER|WRITE_WATCHER); RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
nsRefPtr<RequestClosingSocketTask> t = new RequestClosingSocketTask(this); nsRefPtr<RequestClosingSocketRunnable> r =
NS_DispatchToMainThread(t); new RequestClosingSocketRunnable(this);
NS_DispatchToMainThread(r);
return; return;
} }
incoming->mSize = ret; incoming->mSize = ret;
nsRefPtr<SocketReceiveTask> t = nsRefPtr<SocketReceiveRunnable> r =
new SocketReceiveTask(this, incoming.forget()); new SocketReceiveRunnable(this, incoming.forget());
NS_DispatchToMainThread(t); NS_DispatchToMainThread(r);
// If ret is less than MAX_READ_SIZE, there's no // If ret is less than MAX_READ_SIZE, there's no
// more data in the socket for us to read now. // more data in the socket for us to read now.
@ -729,7 +766,7 @@ UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData)
MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); MOZ_ASSERT(!mImpl->IsShutdownOnMainThread());
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketSendTask(this, mImpl, aData)); new SocketSendTask(mImpl, this, aData));
return true; return true;
} }
@ -748,7 +785,7 @@ UnixSocketConsumer::SendSocketData(const nsACString& aStr)
UnixSocketRawData* d = new UnixSocketRawData(aStr.BeginReading(), UnixSocketRawData* d = new UnixSocketRawData(aStr.BeginReading(),
aStr.Length()); aStr.Length());
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketSendTask(this, mImpl, d)); new SocketSendTask(mImpl, this, d));
return true; return true;
} }