Bug 974410: Inherit UnixSocketImpl from UnixSocketWatcher, r=kyle

The base class UnixSocketWatcher handles the connection state of
the socket. UnixSocketImpl overrides UnixSocketWatcher's callback
methods for implementing it's functionality.
This commit is contained in:
Thomas Zimmermann 2014-02-26 17:52:09 +01:00
parent 0801b527de
commit d3b12775b1

View File

@ -39,20 +39,18 @@ static const int SOCKET_RETRY_TIME_MS = 1000;
namespace mozilla {
namespace ipc {
class UnixSocketImpl : public UnixFdWatcher
class UnixSocketImpl : public UnixSocketWatcher
{
public:
UnixSocketImpl(MessageLoop* mIOLoop,
UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector,
const nsACString& aAddress,
SocketConnectionStatus aConnectionStatus)
: UnixFdWatcher(mIOLoop)
const nsACString& aAddress)
: UnixSocketWatcher(mIOLoop)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
, mDelayedConnectTask(nullptr)
, mConnectionStatus(aConnectionStatus)
{
}
@ -65,7 +63,7 @@ public:
void QueueWriteData(UnixSocketRawData* aData)
{
mOutgoingQ.AppendElement(aData);
OnFileCanWriteWithoutBlocking(GetFd());
AddWatchers(WRITE_WATCHER, false);
}
bool IsShutdownOnMainThread()
@ -100,6 +98,9 @@ public:
{
MOZ_ASSERT(IsOpen());
AddWatchers(READ_WATCHER, true);
if (!mOutgoingQ.IsEmpty()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void SetDelayedConnectTask(CancelableTask* aTask)
@ -134,17 +135,12 @@ public:
*/
void Listen();
/**
* Accept an incoming connection
*/
void Accept();
/**
* Set up flags on whatever our current file descriptor is.
*
* @return true if successful, false otherwise
*/
bool SetSocketFlags();
bool SetSocketFlags(int aFd);
void GetSocketAddr(nsAString& aAddrStr)
{
@ -163,26 +159,17 @@ public:
*/
RefPtr<UnixSocketConsumer> mConsumer;
void OnAccepted(int aFd) MOZ_OVERRIDE;
void OnConnected() MOZ_OVERRIDE;
void OnError(const char* aFunction, int aErrno) MOZ_OVERRIDE;
void OnListening() MOZ_OVERRIDE;
void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE;
void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE;
private:
void FireSocketError();
/**
* libevent triggered functions that reads data from socket when available and
* guarenteed non-blocking. Only to be called on IO thread.
*
* @param aFd File descriptor to read from
*/
virtual void OnFileCanReadWithoutBlocking(int aFd);
/**
* libevent or developer triggered functions that writes data to socket when
* available and guarenteed non-blocking. Only to be called on IO thread.
*
* @param aFd File descriptor to read from
*/
virtual void OnFileCanWriteWithoutBlocking(int aFd);
/**
* Raw data queue. Must be pushed/popped from IO thread only.
*/
@ -218,12 +205,6 @@ private:
* Task member for delayed connect task. Should only be access on main thread.
*/
CancelableTask* mDelayedConnectTask;
/**
* Socket connection status. Duplicate from UnixSocketConsumer. Should only
* be accessed on I/O thread.
*/
SocketConnectionStatus mConnectionStatus;
};
template<class T>
@ -372,12 +353,13 @@ private:
UnixSocketImpl* mImpl;
};
class SocketAcceptTask : public CancelableTask {
class SocketListenTask : public CancelableTask
{
virtual void Run();
UnixSocketImpl* mImpl;
public:
SocketAcceptTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { }
SocketListenTask(UnixSocketImpl* aImpl) : mImpl(aImpl) { }
virtual void Cancel()
{
@ -386,12 +368,12 @@ public:
}
};
void SocketAcceptTask::Run()
void SocketListenTask::Run()
{
MOZ_ASSERT(!NS_IsMainThread());
if (mImpl) {
mImpl->Accept();
mImpl->Listen();
}
}
@ -447,7 +429,7 @@ void ShutdownSocketTask::Run()
MOZ_ASSERT(!NS_IsMainThread());
// At this point, there should be no new events on the IO thread after this
// one with the possible exception of a SocketAcceptTask that
// one with the possible exception of a SocketListenTask that
// ShutdownOnIOThread will cancel for us. We are now fully shut down, so we
// can send a message to the main thread that will delete mImpl safely knowing
// that no more tasks reference it.
@ -461,11 +443,10 @@ void ShutdownSocketTask::Run()
void
UnixSocketImpl::FireSocketError()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
// Clean up watchers, statuses, fds
Close();
mConnectionStatus = SOCKET_DISCONNECTED;
// Tell the main thread we've errored
nsRefPtr<OnSocketEventTask> t =
@ -474,9 +455,9 @@ UnixSocketImpl::FireSocketError()
}
void
UnixSocketImpl::Accept()
UnixSocketImpl::Listen()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(mConnector);
// This will set things we don't particularly care about, but it will hand
@ -496,43 +477,23 @@ UnixSocketImpl::Accept()
}
SetFd(fd);
if (!SetSocketFlags()) {
if (!SetSocketFlags(GetFd())) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
if (bind(GetFd(), (struct sockaddr*)&mAddr, mAddrSize)) {
#ifdef DEBUG
CHROMIUM_LOG("...bind(%d) gave errno %d", GetFd(), errno);
#endif
FireSocketError();
return;
}
if (listen(GetFd(), 1)) {
#ifdef DEBUG
CHROMIUM_LOG("...listen(%d) gave errno %d", GetFd(), errno);
#endif
FireSocketError();
return;
}
if (!mConnector->SetUpListenSocket(GetFd())) {
NS_WARNING("Could not set up listen socket!");
FireSocketError();
return;
}
// calls OnListening on success, or OnError otherwise
nsresult rv = UnixSocketWatcher::Listen(
reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
NS_WARN_IF(NS_FAILED(rv));
}
SetUpIO();
}
void
UnixSocketImpl::Connect()
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(mConnector);
if (!IsOpen()) {
@ -545,54 +506,71 @@ UnixSocketImpl::Connect()
SetFd(fd);
}
int ret;
if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) {
NS_WARNING("Cannot create socket address!");
FireSocketError();
return;
}
// Select non-blocking IO.
if (-1 == fcntl(GetFd(), F_SETFL, O_NONBLOCK)) {
NS_WARNING("Cannot set nonblock!");
FireSocketError();
// calls OnConnected() on success, or OnError() otherwise
nsresult rv = UnixSocketWatcher::Connect(
reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
NS_WARN_IF(NS_FAILED(rv));
}
bool
UnixSocketImpl::SetSocketFlags(int aFd)
{
// Set socket addr to be reused even if kernel is still waiting to close
int n = 1;
setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
// Set close-on-exec bit.
int flags = fcntl(aFd, F_GETFD);
if (-1 == flags) {
return false;
}
flags |= FD_CLOEXEC;
if (-1 == fcntl(aFd, F_SETFD, flags)) {
return false;
}
return true;
}
void
UnixSocketImpl::OnAccepted(int aFd)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
if (!mConnector->SetUp(aFd)) {
NS_WARNING("Could not set up socket!");
return;
}
ret = connect(GetFd(), (struct sockaddr*)&mAddr, mAddrSize);
if (ret) {
if (errno == EINPROGRESS) {
// Select blocking IO again, since we've now at least queue'd the connect
// as nonblock.
int current_opts = fcntl(GetFd(), F_GETFL, 0);
if (-1 == current_opts) {
NS_WARNING("Cannot get socket opts!");
FireSocketError();
return;
}
if (-1 == fcntl(GetFd(), F_SETFL, current_opts & ~O_NONBLOCK)) {
NS_WARNING("Cannot set socket opts to blocking!");
FireSocketError();
return;
}
AddWatchers(WRITE_WATCHER, false);
#ifdef DEBUG
CHROMIUM_LOG("UnixSocket Connection delayed!");
#endif
return;
}
#if DEBUG
CHROMIUM_LOG("Socket connect errno=%d\n", errno);
#endif
FireSocketError();
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
Close();
SetSocket(aFd, SOCKET_IS_CONNECTED);
if (!SetSocketFlags(GetFd())) {
return;
}
if (!SetSocketFlags()) {
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
SetUpIO();
}
void
UnixSocketImpl::OnConnected()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
if (!SetSocketFlags(GetFd())) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
@ -607,30 +585,122 @@ UnixSocketImpl::Connect()
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
mConnectionStatus = SOCKET_CONNECTED;
SetUpIO();
}
bool
UnixSocketImpl::SetSocketFlags()
void
UnixSocketImpl::OnListening()
{
// Set socket addr to be reused even if kernel is still waiting to close
int n = 1;
setsockopt(GetFd(), SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n));
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
// Set close-on-exec bit.
int flags = fcntl(GetFd(), F_GETFD);
if (-1 == flags) {
return false;
if (!mConnector->SetUpListenSocket(GetFd())) {
NS_WARNING("Could not set up listen socket!");
FireSocketError();
return;
}
flags |= FD_CLOEXEC;
if (-1 == fcntl(GetFd(), F_SETFD, flags)) {
return false;
}
SetUpIO();
}
return true;
void
UnixSocketImpl::OnError(const char* aFunction, int aErrno)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
UnixFdWatcher::OnError(aFunction, aErrno);
FireSocketError();
}
void
UnixSocketImpl::OnSocketCanReceiveWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
// Read all of the incoming data.
while (true) {
nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
ssize_t ret = read(GetFd(), incoming->mData, incoming->mSize);
if (ret <= 0) {
if (ret == -1) {
if (errno == EINTR) {
continue; // retry system call when interrupted
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return; // no data available: return and re-poll
}
#ifdef DEBUG
NS_WARNING("Cannot read from network");
#endif
// else fall through to error handling on other errno's
}
// We're done with our descriptors. Ensure that spurious events don't
// cause us to end up back here.
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
nsRefPtr<RequestClosingSocketTask> t = new RequestClosingSocketTask(this);
NS_DispatchToMainThread(t);
return;
}
incoming->mSize = ret;
nsRefPtr<SocketReceiveTask> t =
new SocketReceiveTask(this, incoming.forget());
NS_DispatchToMainThread(t);
// If ret is less than MAX_READ_SIZE, there's no
// more data in the socket for us to read now.
if (ret < ssize_t(MAX_READ_SIZE)) {
return;
}
}
}
void
UnixSocketImpl::OnSocketCanSendWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
// Try to write the bytes of mCurrentRilRawData. If all were written, continue.
//
// Otherwise, save the byte position of the next byte to write
// within mCurrentWriteOffset, and request another write when the
// system won't block.
//
while (true) {
UnixSocketRawData* data;
if (mOutgoingQ.IsEmpty()) {
return;
}
data = mOutgoingQ.ElementAt(0);
const uint8_t *toWrite;
toWrite = data->mData;
while (data->mCurrentWriteOffset < data->mSize) {
ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
ssize_t written;
written = write (GetFd(), toWrite + data->mCurrentWriteOffset,
write_amount);
if (written > 0) {
data->mCurrentWriteOffset += written;
}
if (written != write_amount) {
break;
}
}
if (data->mCurrentWriteOffset != data->mSize) {
AddWatchers(WRITE_WATCHER, false);
return;
}
mOutgoingQ.RemoveElementAt(0);
delete data;
}
}
UnixSocketConsumer::UnixSocketConsumer() : mImpl(nullptr)
@ -700,157 +770,6 @@ UnixSocketConsumer::CloseSocket()
NotifyDisconnect();
}
void
UnixSocketImpl::OnFileCanReadWithoutBlocking(int aFd)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
if (mConnectionStatus == SOCKET_CONNECTED) {
// Read all of the incoming data.
while (true) {
nsAutoPtr<UnixSocketRawData> incoming(new UnixSocketRawData(MAX_READ_SIZE));
ssize_t ret = read(aFd, incoming->mData, incoming->mSize);
if (ret <= 0) {
if (ret == -1) {
if (errno == EINTR) {
continue; // retry system call when interrupted
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return; // no data available: return and re-poll
}
#ifdef DEBUG
NS_WARNING("Cannot read from network");
#endif
// else fall through to error handling on other errno's
}
// We're done with our descriptors. Ensure that spurious events don't
// cause us to end up back here.
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
nsRefPtr<RequestClosingSocketTask> t = new RequestClosingSocketTask(this);
NS_DispatchToMainThread(t);
return;
}
incoming->mSize = ret;
nsRefPtr<SocketReceiveTask> t =
new SocketReceiveTask(this, incoming.forget());
NS_DispatchToMainThread(t);
// If ret is less than MAX_READ_SIZE, there's no
// more data in the socket for us to read now.
if (ret < ssize_t(MAX_READ_SIZE)) {
return;
}
}
MOZ_CRASH("We returned early");
} else if (mConnectionStatus == SOCKET_LISTENING) {
int client_fd = accept(GetFd(), (struct sockaddr*)&mAddr, &mAddrSize);
if (client_fd < 0) {
return;
}
if (!mConnector->SetUp(client_fd)) {
NS_WARNING("Could not set up socket!");
return;
}
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
Close();
SetFd(client_fd);
if (!SetSocketFlags()) {
return;
}
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
mConnectionStatus = SOCKET_CONNECTED;
SetUpIO();
}
}
void
UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
{
MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mShuttingDownOnIOThread);
MOZ_ASSERT(aFd >= 0);
if (mConnectionStatus == SOCKET_CONNECTED) {
// Try to write the bytes of mCurrentRilRawData. If all were written, continue.
//
// Otherwise, save the byte position of the next byte to write
// within mCurrentWriteOffset, and request another write when the
// system won't block.
//
while (true) {
UnixSocketRawData* data;
if (mOutgoingQ.IsEmpty()) {
return;
}
data = mOutgoingQ.ElementAt(0);
const uint8_t *toWrite;
toWrite = data->mData;
while (data->mCurrentWriteOffset < data->mSize) {
ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
ssize_t written;
written = write (aFd, toWrite + data->mCurrentWriteOffset,
write_amount);
if (written > 0) {
data->mCurrentWriteOffset += written;
}
if (written != write_amount) {
break;
}
}
if (data->mCurrentWriteOffset != data->mSize) {
AddWatchers(WRITE_WATCHER, false);
return;
}
mOutgoingQ.RemoveElementAt(0);
delete data;
}
} else if (mConnectionStatus == SOCKET_CONNECTING) {
int error, ret;
socklen_t len = sizeof(error);
ret = getsockopt(GetFd(), SOL_SOCKET, SO_ERROR, &error, &len);
if (ret || error) {
NS_WARNING("getsockopt failure on async socket connect!");
FireSocketError();
return;
}
if (!SetSocketFlags()) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
if (!mConnector->SetUp(GetFd())) {
NS_WARNING("Could not set up socket!");
FireSocketError();
return;
}
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
mConnectionStatus = SOCKET_CONNECTED;
SetUpIO();
}
}
void
UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr)
{
@ -906,7 +825,7 @@ UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector,
nsCString addr(aAddress);
MessageLoop* ioLoop = XRE_GetIOMessageLoop();
mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr, SOCKET_CONNECTING);
mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr);
mConnectionStatus = SOCKET_CONNECTING;
if (aDelayMs > 0) {
SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl);
@ -932,10 +851,10 @@ UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector)
}
mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(),
EmptyCString(), SOCKET_LISTENING);
EmptyCString());
mConnectionStatus = SOCKET_LISTENING;
XRE_GetIOMessageLoop()->PostTask(FROM_HERE,
new SocketAcceptTask(mImpl));
new SocketListenTask(mImpl));
return true;
}