Bug 1046109: Add |SocketIOBase|, r=kyle

|SocketIOBase| is a base class for Socket I/O classes. It's not a
requirement, but provides a number of helpful methods for common
I/O operations on the I/O thread.
This commit is contained in:
Thomas Zimmermann 2014-07-31 13:55:29 +02:00
parent 63e43f2cc7
commit 5866e150ca
3 changed files with 168 additions and 94 deletions

View File

@ -122,5 +122,34 @@ SocketConsumerBase::SetConnectionStatus(
mConnectionStatus = aConnectionStatus;
}
//
// SocketIOBase
//
SocketIOBase::~SocketIOBase()
{ }
void
SocketIOBase::EnqueueData(UnixSocketRawData* aData)
{
if (!aData->mSize) {
delete aData; // delete empty data immediately
return;
}
mOutgoingQ.AppendElement(aData);
}
bool
SocketIOBase::HasPendingData() const
{
return !mOutgoingQ.IsEmpty();
}
SocketIOBase::SocketIOBase(size_t aMaxReadSize)
: mMaxReadSize(aMaxReadSize)
{
MOZ_ASSERT(mMaxReadSize);
}
}
}

View File

@ -9,9 +9,18 @@
#ifndef mozilla_ipc_SocketBase_h
#define mozilla_ipc_SocketBase_h
#include <errno.h>
#include <unistd.h>
#include "base/message_loop.h"
#include "nsAutoPtr.h"
#include "nsTArray.h"
#include "nsThreadUtils.h"
#ifdef MOZ_TASK_TRACER
#include "GeckoTaskTracer.h"
using namespace mozilla::tasktracer;
#endif
namespace mozilla {
namespace ipc {
@ -297,6 +306,119 @@ private:
nsAutoPtr<T> mInstance;
};
//
// SocketIOBase
//
/* |SocketIOBase| is a base class for Socket I/O classes that
* perform operations on the I/O thread. It provides methds
* for the most common read and write scenarios.
*/
class SocketIOBase
{
public:
virtual ~SocketIOBase();
void EnqueueData(UnixSocketRawData* aData);
bool HasPendingData() const;
template <typename T>
nsresult ReceiveData(int aFd, T* aIO)
{
MOZ_ASSERT(aFd >= 0);
MOZ_ASSERT(aIO);
do {
nsAutoPtr<UnixSocketRawData> incoming(
new UnixSocketRawData(mMaxReadSize));
ssize_t res =
TEMP_FAILURE_RETRY(read(aFd, incoming->mData, incoming->mSize));
if (res < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return NS_OK; /* no more data available */
}
/* an error occored */
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
NS_DispatchToMainThread(r);
return NS_ERROR_FAILURE;
} else if (!res) {
/* EOF or peer shut down sending */
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
NS_DispatchToMainThread(r);
return NS_OK;
}
#ifdef MOZ_TASK_TRACER
// Make unix socket creation events to be the source events of TaskTracer,
// and originate the rest correlation tasks from here.
AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET);
#endif
incoming->mSize = res;
nsRefPtr<nsRunnable> r =
new SocketIOReceiveRunnable<T>(aIO, incoming.forget());
NS_DispatchToMainThread(r);
} while (true);
return NS_OK;
}
template <typename T>
nsresult SendPendingData(int aFd, T* aIO)
{
MOZ_ASSERT(aFd >= 0);
MOZ_ASSERT(aIO);
do {
if (!HasPendingData()) {
return NS_OK;
}
UnixSocketRawData* outgoing = mOutgoingQ.ElementAt(0);
MOZ_ASSERT(outgoing->mSize);
const uint8_t* data = outgoing->mData + outgoing->mCurrentWriteOffset;
size_t size = outgoing->mSize - outgoing->mCurrentWriteOffset;
ssize_t res = TEMP_FAILURE_RETRY(write(aFd, data, size));
if (res < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return NS_OK; /* no more data available */
}
/* an error occored */
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
NS_DispatchToMainThread(r);
return NS_ERROR_FAILURE;
} else if (!res) {
return NS_OK; /* nothing written */
}
outgoing->mCurrentWriteOffset += res;
if (outgoing->mCurrentWriteOffset == outgoing->mSize) {
mOutgoingQ.RemoveElementAt(0);
delete data;
}
} while (true);
return NS_OK;
}
protected:
SocketIOBase(size_t aMaxReadSize);
private:
const size_t mMaxReadSize;
/**
* Raw data queue. Must be pushed/popped from I/O thread only.
*/
nsTArray<UnixSocketRawData*> mOutgoingQ;
};
}
}

View File

@ -9,11 +9,6 @@
#include "nsXULAppAPI.h"
#include <fcntl.h>
#ifdef MOZ_TASK_TRACER
#include "GeckoTaskTracer.h"
using namespace mozilla::tasktracer;
#endif
static const size_t MAX_READ_SIZE = 1 << 16;
namespace mozilla {
@ -24,12 +19,14 @@ namespace ipc {
//
class UnixSocketImpl : public UnixSocketWatcher
, protected SocketIOBase
{
public:
UnixSocketImpl(MessageLoop* mIOLoop,
UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
const nsACString& aAddress)
: UnixSocketWatcher(mIOLoop)
, SocketIOBase(MAX_READ_SIZE)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
@ -44,9 +41,9 @@ public:
MOZ_ASSERT(IsShutdownOnMainThread());
}
void QueueWriteData(UnixSocketRawData* aData)
void Send(UnixSocketRawData* aData)
{
mOutgoingQ.AppendElement(aData);
EnqueueData(aData);
AddWatchers(WRITE_WATCHER, false);
}
@ -145,12 +142,6 @@ private:
void FireSocketError();
/**
* Raw data queue. Must be pushed/popped from IO thread only.
*/
typedef nsTArray<UnixSocketRawData* > UnixSocketRawDataQueue;
UnixSocketRawDataQueue mOutgoingQ;
/**
* Connector object used to create the connection we are currently using.
*/
@ -228,7 +219,7 @@ public:
UnixSocketImpl* impl = GetImpl();
MOZ_ASSERT(!impl->IsShutdownOnIOThread());
impl->QueueWriteData(mData);
impl->Send(mData);
}
private:
nsRefPtr<UnixSocketConsumer> mConsumer;
@ -463,7 +454,7 @@ UnixSocketImpl::OnAccepted(int aFd,
NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true);
if (!mOutgoingQ.IsEmpty()) {
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
@ -492,7 +483,7 @@ UnixSocketImpl::OnConnected()
NS_DispatchToMainThread(r);
AddWatchers(READ_WATCHER, true);
if (!mOutgoingQ.IsEmpty()) {
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
@ -527,51 +518,10 @@ UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
// Read all of the incoming data.
while (true) {
nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize);
if (ret <= 0) {
if (ret == -1) {
if (errno == EINTR) {
continue; // retry system call when interrupted
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return; // no data available: return and re-poll
}
#ifdef DEBUG
NS_WARNING("Cannot read from network");
#endif
// else fall through to error handling on other errno's
}
// We're done with our descriptors. Ensure that spurious events don't
// cause us to end up back here.
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
nsRefPtr<nsRunnable> r =
new SocketIORequestClosingRunnable<UnixSocketImpl>(this);
NS_DispatchToMainThread(r);
return;
}
#ifdef MOZ_TASK_TRACER
// Make unix socket creation events to be the source events of TaskTracer,
// and originate the rest correlation tasks from here.
AutoSourceEvent taskTracerEvent(SourceEventType::UNIXSOCKET);
#endif
incoming->mSize = ret;
nsRefPtr<nsRunnable> r =
new SocketIOReceiveRunnable<UnixSocketImpl>(this, incoming.forget());
NS_DispatchToMainThread(r);
// If ret is less than MAX_READ_SIZE, there's no
// more data in the socket for us to read now.
if (ret < ssize_t(MAX_READ_SIZE)) {
return;
}
nsresult rv = ReceiveData(GetFd(), this);
if (NS_FAILED(rv)) {
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
return;
}
}
@ -581,40 +531,13 @@ UnixSocketImpl::OnSocketCanSendWithoutBlocking()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
// Try to write the bytes of mCurrentRilRawData. If all were written, continue.
//
// Otherwise, save the byte position of the next byte to write
// within mCurrentWriteOffset, and request another write when the
// system won't block.
//
while (true) {
UnixSocketRawData* data;
if (mOutgoingQ.IsEmpty()) {
return;
}
data = mOutgoingQ.ElementAt(0);
const uint8_t *toWrite;
toWrite = data->mData;
nsresult rv = SendPendingData(GetFd(), this);
if (NS_FAILED(rv)) {
return;
}
while (data->mCurrentWriteOffset < data->mSize) {
ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
ssize_t written;
written = write (GetFd(), toWrite + data->mCurrentWriteOffset,
write_amount);
if (written > 0) {
data->mCurrentWriteOffset += written;
}
if (written != write_amount) {
break;
}
}
if (data->mCurrentWriteOffset != data->mSize) {
AddWatchers(WRITE_WATCHER, false);
return;
}
mOutgoingQ.RemoveElementAt(0);
delete data;
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}