From 5866e150ca571056a557235c90af4680115f8d88 Mon Sep 17 00:00:00 2001 From: Thomas Zimmermann Date: Thu, 31 Jul 2014 13:55:29 +0200 Subject: [PATCH] Bug 1046109: Add |SocketIOBase|, r=kyle |SocketIOBase| is a base class for Socket I/O classes. It's not a requirement, but provides a number of helpful methods for common I/O operations on the I/O thread. --- ipc/unixsocket/SocketBase.cpp | 29 ++++++++ ipc/unixsocket/SocketBase.h | 122 ++++++++++++++++++++++++++++++++++ ipc/unixsocket/UnixSocket.cpp | 111 +++++-------------------------- 3 files changed, 168 insertions(+), 94 deletions(-) diff --git a/ipc/unixsocket/SocketBase.cpp b/ipc/unixsocket/SocketBase.cpp index f0509ad07ed..46d0de9955a 100644 --- a/ipc/unixsocket/SocketBase.cpp +++ b/ipc/unixsocket/SocketBase.cpp @@ -122,5 +122,34 @@ SocketConsumerBase::SetConnectionStatus( mConnectionStatus = aConnectionStatus; } +// +// SocketIOBase +// + +SocketIOBase::~SocketIOBase() +{ } + +void +SocketIOBase::EnqueueData(UnixSocketRawData* aData) +{ + if (!aData->mSize) { + delete aData; // delete empty data immediately + return; + } + mOutgoingQ.AppendElement(aData); +} + +bool +SocketIOBase::HasPendingData() const +{ + return !mOutgoingQ.IsEmpty(); +} + +SocketIOBase::SocketIOBase(size_t aMaxReadSize) + : mMaxReadSize(aMaxReadSize) +{ + MOZ_ASSERT(mMaxReadSize); +} + } } diff --git a/ipc/unixsocket/SocketBase.h b/ipc/unixsocket/SocketBase.h index 45a9ec6986a..dfa97204cfa 100644 --- a/ipc/unixsocket/SocketBase.h +++ b/ipc/unixsocket/SocketBase.h @@ -9,9 +9,18 @@ #ifndef mozilla_ipc_SocketBase_h #define mozilla_ipc_SocketBase_h +#include +#include +#include "base/message_loop.h" #include "nsAutoPtr.h" +#include "nsTArray.h" #include "nsThreadUtils.h" +#ifdef MOZ_TASK_TRACER +#include "GeckoTaskTracer.h" +using namespace mozilla::tasktracer; +#endif + namespace mozilla { namespace ipc { @@ -297,6 +306,119 @@ private: nsAutoPtr mInstance; }; +// +// SocketIOBase +// + +/* |SocketIOBase| is a base class for Socket I/O classes that + * perform operations on the I/O thread. It provides methds + * for the most common read and write scenarios. + */ +class SocketIOBase +{ +public: + virtual ~SocketIOBase(); + + void EnqueueData(UnixSocketRawData* aData); + bool HasPendingData() const; + + template + nsresult ReceiveData(int aFd, T* aIO) + { + MOZ_ASSERT(aFd >= 0); + MOZ_ASSERT(aIO); + + do { + nsAutoPtr incoming( + new UnixSocketRawData(mMaxReadSize)); + + ssize_t res = + TEMP_FAILURE_RETRY(read(aFd, incoming->mData, incoming->mSize)); + + if (res < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return NS_OK; /* no more data available */ + } + /* an error occored */ + nsRefPtr r = new SocketIORequestClosingRunnable(aIO); + NS_DispatchToMainThread(r); + return NS_ERROR_FAILURE; + } else if (!res) { + /* EOF or peer shut down sending */ + nsRefPtr r = new SocketIORequestClosingRunnable(aIO); + NS_DispatchToMainThread(r); + return NS_OK; + } + +#ifdef MOZ_TASK_TRACER + // Make unix socket creation events to be the source events of TaskTracer, + // and originate the rest correlation tasks from here. + AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET); +#endif + + incoming->mSize = res; + nsRefPtr r = + new SocketIOReceiveRunnable(aIO, incoming.forget()); + NS_DispatchToMainThread(r); + } while (true); + + return NS_OK; + } + + template + nsresult SendPendingData(int aFd, T* aIO) + { + MOZ_ASSERT(aFd >= 0); + MOZ_ASSERT(aIO); + + do { + if (!HasPendingData()) { + return NS_OK; + } + + UnixSocketRawData* outgoing = mOutgoingQ.ElementAt(0); + MOZ_ASSERT(outgoing->mSize); + + const uint8_t* data = outgoing->mData + outgoing->mCurrentWriteOffset; + size_t size = outgoing->mSize - outgoing->mCurrentWriteOffset; + + ssize_t res = TEMP_FAILURE_RETRY(write(aFd, data, size)); + + if (res < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return NS_OK; /* no more data available */ + } + /* an error occored */ + nsRefPtr r = new SocketIORequestClosingRunnable(aIO); + NS_DispatchToMainThread(r); + return NS_ERROR_FAILURE; + } else if (!res) { + return NS_OK; /* nothing written */ + } + + outgoing->mCurrentWriteOffset += res; + + if (outgoing->mCurrentWriteOffset == outgoing->mSize) { + mOutgoingQ.RemoveElementAt(0); + delete data; + } + } while (true); + + return NS_OK; + } + +protected: + SocketIOBase(size_t aMaxReadSize); + +private: + const size_t mMaxReadSize; + + /** + * Raw data queue. Must be pushed/popped from I/O thread only. + */ + nsTArray mOutgoingQ; +}; + } } diff --git a/ipc/unixsocket/UnixSocket.cpp b/ipc/unixsocket/UnixSocket.cpp index fef86336026..b1c99ccc9d7 100644 --- a/ipc/unixsocket/UnixSocket.cpp +++ b/ipc/unixsocket/UnixSocket.cpp @@ -9,11 +9,6 @@ #include "nsXULAppAPI.h" #include -#ifdef MOZ_TASK_TRACER -#include "GeckoTaskTracer.h" -using namespace mozilla::tasktracer; -#endif - static const size_t MAX_READ_SIZE = 1 << 16; namespace mozilla { @@ -24,12 +19,14 @@ namespace ipc { // class UnixSocketImpl : public UnixSocketWatcher + , protected SocketIOBase { public: UnixSocketImpl(MessageLoop* mIOLoop, UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector, const nsACString& aAddress) : UnixSocketWatcher(mIOLoop) + , SocketIOBase(MAX_READ_SIZE) , mConsumer(aConsumer) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -44,9 +41,9 @@ public: MOZ_ASSERT(IsShutdownOnMainThread()); } - void QueueWriteData(UnixSocketRawData* aData) + void Send(UnixSocketRawData* aData) { - mOutgoingQ.AppendElement(aData); + EnqueueData(aData); AddWatchers(WRITE_WATCHER, false); } @@ -145,12 +142,6 @@ private: void FireSocketError(); - /** - * Raw data queue. Must be pushed/popped from IO thread only. - */ - typedef nsTArray UnixSocketRawDataQueue; - UnixSocketRawDataQueue mOutgoingQ; - /** * Connector object used to create the connection we are currently using. */ @@ -228,7 +219,7 @@ public: UnixSocketImpl* impl = GetImpl(); MOZ_ASSERT(!impl->IsShutdownOnIOThread()); - impl->QueueWriteData(mData); + impl->Send(mData); } private: nsRefPtr mConsumer; @@ -463,7 +454,7 @@ UnixSocketImpl::OnAccepted(int aFd, NS_DispatchToMainThread(r); AddWatchers(READ_WATCHER, true); - if (!mOutgoingQ.IsEmpty()) { + if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } @@ -492,7 +483,7 @@ UnixSocketImpl::OnConnected() NS_DispatchToMainThread(r); AddWatchers(READ_WATCHER, true); - if (!mOutgoingQ.IsEmpty()) { + if (HasPendingData()) { AddWatchers(WRITE_WATCHER, false); } } @@ -527,51 +518,10 @@ UnixSocketImpl::OnSocketCanReceiveWithoutBlocking() MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 - // Read all of the incoming data. - while (true) { - nsAutoPtr incoming(new UnixSocketRawData(MAX_READ_SIZE)); - - ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize); - if (ret <= 0) { - if (ret == -1) { - if (errno == EINTR) { - continue; // retry system call when interrupted - } - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return; // no data available: return and re-poll - } - -#ifdef DEBUG - NS_WARNING("Cannot read from network"); -#endif - // else fall through to error handling on other errno's - } - - // We're done with our descriptors. Ensure that spurious events don't - // cause us to end up back here. - RemoveWatchers(READ_WATCHER|WRITE_WATCHER); - nsRefPtr r = - new SocketIORequestClosingRunnable(this); - NS_DispatchToMainThread(r); - return; - } - -#ifdef MOZ_TASK_TRACER - // Make unix socket creation events to be the source events of TaskTracer, - // and originate the rest correlation tasks from here. - AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET); -#endif - - incoming->mSize = ret; - nsRefPtr r = - new SocketIOReceiveRunnable(this, incoming.forget()); - NS_DispatchToMainThread(r); - - // If ret is less than MAX_READ_SIZE, there's no - // more data in the socket for us to read now. - if (ret < ssize_t(MAX_READ_SIZE)) { - return; - } + nsresult rv = ReceiveData(GetFd(), this); + if (NS_FAILED(rv)) { + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); + return; } } @@ -581,40 +531,13 @@ UnixSocketImpl::OnSocketCanSendWithoutBlocking() MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 - // Try to write the bytes of mCurrentRilRawData. If all were written, continue. - // - // Otherwise, save the byte position of the next byte to write - // within mCurrentWriteOffset, and request another write when the - // system won't block. - // - while (true) { - UnixSocketRawData* data; - if (mOutgoingQ.IsEmpty()) { - return; - } - data = mOutgoingQ.ElementAt(0); - const uint8_t *toWrite; - toWrite = data->mData; + nsresult rv = SendPendingData(GetFd(), this); + if (NS_FAILED(rv)) { + return; + } - while (data->mCurrentWriteOffset < data->mSize) { - ssize_t write_amount = data->mSize - data->mCurrentWriteOffset; - ssize_t written; - written = write (GetFd(), toWrite + data->mCurrentWriteOffset, - write_amount); - if (written > 0) { - data->mCurrentWriteOffset += written; - } - if (written != write_amount) { - break; - } - } - - if (data->mCurrentWriteOffset != data->mSize) { - AddWatchers(WRITE_WATCHER, false); - return; - } - mOutgoingQ.RemoveElementAt(0); - delete data; + if (HasPendingData()) { + AddWatchers(WRITE_WATCHER, false); } }