Bug 1161020: Use new socket-connector interface in socket classes, r=kmachulis

This patch converts the socket I/O classes to use the new interface
of the socket-connector classes. All sockets are now created and set
up by a socket connector.
This commit is contained in:
Thomas Zimmermann 2015-05-18 11:28:30 +02:00
parent 7577295617
commit cff86911f3
3 changed files with 124 additions and 295 deletions

View File

@ -30,11 +30,10 @@ class BluetoothSocket::BluetoothSocketIO final
public:
BluetoothSocketIO(MessageLoop* mIOLoop,
BluetoothSocket* aConsumer,
UnixSocketConnector* aConnector,
const nsACString& aAddress);
UnixSocketConnector* aConnector);
~BluetoothSocketIO();
void GetSocketAddr(nsAString& aAddrStr) const;
void GetSocketAddr(nsAString& aAddrStr) const;
BluetoothSocket* GetBluetoothSocket();
DataSocket* GetDataSocket();
@ -94,9 +93,6 @@ private:
void FireSocketError();
// Set up flags on file descriptor.
static bool SetSocketFlags(int aFd);
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from main thread. All non-main-thread accesses should happen with
@ -115,19 +111,14 @@ private:
bool mShuttingDownOnIOThread;
/**
* Address we are connecting to, assuming we are creating a client connection.
* Number of valid bytes in |mAddress|
*/
nsCString mAddress;
socklen_t mAddressLength;
/**
* Size of the socket address struct
* Address structure of the socket currently in use
*/
socklen_t mAddrSize;
/**
* Address struct of the socket currently in use
*/
sockaddr_any mAddr;
struct sockaddr_storage mAddress;
/**
* Task member for delayed connect task. Should only be access on main thread.
@ -143,13 +134,12 @@ private:
BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
MessageLoop* mIOLoop,
BluetoothSocket* aConsumer,
UnixSocketConnector* aConnector,
const nsACString& aAddress)
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop)
, mConsumer(aConsumer)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
, mAddressLength(0)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mConsumer);
@ -170,7 +160,17 @@ BluetoothSocket::BluetoothSocketIO::GetSocketAddr(nsAString& aAddrStr) const
aAddrStr.Truncate();
return;
}
mConnector->GetSocketAddr(mAddr, aAddrStr);
nsCString addressString;
nsresult rv = mConnector->ConvertAddressToString(
*reinterpret_cast<const struct sockaddr*>(&mAddress), mAddressLength,
addressString);
if (NS_FAILED(rv)) {
aAddrStr.Truncate();
return;
}
aAddrStr.Assign(NS_ConvertUTF8toUTF16(addressString));
}
BluetoothSocket*
@ -221,34 +221,20 @@ BluetoothSocket::BluetoothSocketIO::Listen()
MOZ_ASSERT(mConnector);
if (!IsOpen()) {
int fd = mConnector->Create();
if (fd < 0) {
NS_WARNING("Cannot create socket fd!");
FireSocketError();
return;
}
if (!SetSocketFlags(fd)) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
if (!mConnector->SetUpListenSocket(fd)) {
NS_WARNING("Could not set up listen socket!");
FireSocketError();
return;
}
// This will set things we don't particularly care about, but it will hand
// back the correct structure size which is what we do care about.
if (!mConnector->CreateAddr(true, mAddrSize, mAddr, nullptr)) {
NS_WARNING("Cannot create socket address!");
mAddressLength = sizeof(mAddress);
int fd;
nsresult rv = mConnector->CreateListenSocket(
reinterpret_cast<struct sockaddr*>(&mAddress), &mAddressLength, fd);
if (NS_WARN_IF(NS_FAILED(rv))) {
FireSocketError();
return;
}
SetFd(fd);
// calls OnListening on success, or OnError otherwise
nsresult rv = UnixSocketWatcher::Listen(
reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
rv = UnixSocketWatcher::Listen(
reinterpret_cast<struct sockaddr*>(&mAddress), mAddressLength);
NS_WARN_IF(NS_FAILED(rv));
}
}
@ -260,24 +246,12 @@ BluetoothSocket::BluetoothSocketIO::Connect()
MOZ_ASSERT(mConnector);
if (!IsOpen()) {
int fd = mConnector->Create();
if (fd < 0) {
NS_WARNING("Cannot create socket fd!");
FireSocketError();
return;
}
if (!SetSocketFlags(fd)) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
if (!mConnector->SetUp(fd)) {
NS_WARNING("Could not set up socket!");
FireSocketError();
return;
}
if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) {
NS_WARNING("Cannot create socket address!");
mAddressLength = sizeof(mAddress);
int fd;
nsresult rv = mConnector->CreateStreamSocket(
reinterpret_cast<struct sockaddr*>(&mAddress), &mAddressLength, fd);
if (NS_WARN_IF(NS_FAILED(rv))) {
FireSocketError();
return;
}
@ -286,7 +260,7 @@ BluetoothSocket::BluetoothSocketIO::Connect()
// calls OnConnected() on success, or OnError() otherwise
nsresult rv = UnixSocketWatcher::Connect(
reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
reinterpret_cast<struct sockaddr*>(&mAddress), mAddressLength);
NS_WARN_IF(NS_FAILED(rv));
}
@ -338,18 +312,14 @@ BluetoothSocket::BluetoothSocketIO::OnSocketCanAcceptWithoutBlocking()
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
socklen_t mAddrSize = sizeof(mAddr);
int fd = TEMP_FAILURE_RETRY(accept(GetFd(),
reinterpret_cast<struct sockaddr*>(&mAddr), &mAddrSize));
if (fd < 0) {
OnError("accept", errno);
return;
}
if (!SetSocketFlags(fd)) {
return;
}
if (!mConnector->SetUp(fd)) {
NS_WARNING("Could not set up socket!");
mAddressLength = sizeof(mAddress);
int fd;
nsresult rv = mConnector->AcceptStreamSocket(
GetFd(),
reinterpret_cast<struct sockaddr*>(&mAddress), &mAddressLength, fd);
if (NS_WARN_IF(NS_FAILED(rv))) {
FireSocketError();
return;
}
@ -411,38 +381,6 @@ BluetoothSocket::BluetoothSocketIO::FireSocketError()
}
bool
BluetoothSocket::BluetoothSocketIO::SetSocketFlags(int aFd)
{
// Set socket addr to be reused even if kernel is still waiting to close
int n = 1;
if (setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof(n)) < 0) {
return false;
}
// Set close-on-exec bit.
int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD));
if (-1 == flags) {
return false;
}
flags |= FD_CLOEXEC;
if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) {
return false;
}
// Set non-blocking status flag.
flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL));
if (-1 == flags) {
return false;
}
flags |= O_NONBLOCK;
if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) {
return false;
}
return true;
}
// |DataSocketIO|
nsresult
@ -730,9 +668,8 @@ BluetoothSocket::ConnectSocket(BluetoothUnixSocketConnector* aConnector,
return false;
}
nsCString addr(aAddress);
MessageLoop* ioLoop = XRE_GetIOMessageLoop();
mIO = new BluetoothSocketIO(ioLoop, this, connector.forget(), addr);
mIO = new BluetoothSocketIO(ioLoop, this, connector.forget());
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
DelayedConnectTask* connectTask = new DelayedConnectTask(mIO);
@ -757,10 +694,12 @@ BluetoothSocket::ListenSocket(BluetoothUnixSocketConnector* aConnector)
return false;
}
mIO = new BluetoothSocketIO(
XRE_GetIOMessageLoop(), this, connector.forget(), EmptyCString());
MessageLoop* ioLoop = XRE_GetIOMessageLoop();
mIO = new BluetoothSocketIO(ioLoop, this, connector.forget());
SetConnectionStatus(SOCKET_LISTENING);
XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ListenTask(mIO));
ioLoop->PostTask(FROM_HERE, new ListenTask(mIO));
return true;
}

View File

@ -28,8 +28,7 @@ public:
ListenSocketIO(MessageLoop* mIOLoop,
ListenSocket* aListenSocket,
UnixSocketConnector* aConnector,
const nsACString& aAddress);
UnixSocketConnector* aConnector);
~ListenSocketIO();
void GetSocketAddr(nsAString& aAddrStr) const;
@ -64,9 +63,6 @@ public:
private:
void FireSocketError();
// Set up flags on file descriptor.
static bool SetSocketFlags(int aFd);
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from main thread. All non-main-thread accesses should happen with
@ -85,33 +81,27 @@ private:
bool mShuttingDownOnIOThread;
/**
* Address we are connecting to, assuming we are creating a client connection.
* Number of valid bytes in |mAddress|
*/
nsCString mAddress;
socklen_t mAddressLength;
/**
* Size of the socket address struct
* Address structure of the socket currently in use
*/
socklen_t mAddrSize;
/**
* Address struct of the socket currently in use
*/
sockaddr_any mAddr;
struct sockaddr_storage mAddress;
ConnectionOrientedSocketIO* mCOSocketIO;
};
ListenSocketIO::ListenSocketIO(MessageLoop* mIOLoop,
ListenSocket* aListenSocket,
UnixSocketConnector* aConnector,
const nsACString& aAddress)
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop)
, SocketIOBase()
, mListenSocket(aListenSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
, mAddressLength(0)
, mCOSocketIO(nullptr)
{
MOZ_ASSERT(mListenSocket);
@ -132,7 +122,16 @@ ListenSocketIO::GetSocketAddr(nsAString& aAddrStr) const
aAddrStr.Truncate();
return;
}
mConnector->GetSocketAddr(mAddr, aAddrStr);
nsCString addressString;
nsresult rv = mConnector->ConvertAddressToString(
*reinterpret_cast<const struct sockaddr*>(&mAddress), mAddressLength,
addressString);
if (NS_FAILED(rv)) {
return;
}
aAddrStr.Assign(NS_ConvertUTF8toUTF16(addressString));
}
void
@ -142,28 +141,14 @@ ListenSocketIO::Listen(ConnectionOrientedSocketIO* aCOSocketIO)
MOZ_ASSERT(mConnector);
MOZ_ASSERT(aCOSocketIO);
struct sockaddr* address = reinterpret_cast<struct sockaddr*>(&mAddress);
mAddressLength = sizeof(mAddress);
if (!IsOpen()) {
int fd = mConnector->Create();
if (fd < 0) {
NS_WARNING("Cannot create socket fd!");
FireSocketError();
return;
}
if (!SetSocketFlags(fd)) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
if (!mConnector->SetUpListenSocket(GetFd())) {
NS_WARNING("Could not set up listen socket!");
FireSocketError();
return;
}
// This will set things we don't particularly care about, but
// it will hand back the correct structure size which is what
// we do care about.
if (!mConnector->CreateAddr(true, mAddrSize, mAddr, nullptr)) {
NS_WARNING("Cannot create socket address!");
int fd;
nsresult rv = mConnector->CreateListenSocket(address, &mAddressLength,
fd);
if (NS_FAILED(rv)) {
FireSocketError();
return;
}
@ -173,8 +158,7 @@ ListenSocketIO::Listen(ConnectionOrientedSocketIO* aCOSocketIO)
mCOSocketIO = aCOSocketIO;
// calls OnListening on success, or OnError otherwise
nsresult rv = UnixSocketWatcher::Listen(
reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
nsresult rv = UnixSocketWatcher::Listen(address, mAddressLength);
NS_WARN_IF(NS_FAILED(rv));
}
@ -221,41 +205,6 @@ ListenSocketIO::FireSocketError()
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
}
bool
ListenSocketIO::SetSocketFlags(int aFd)
{
static const int reuseaddr = 1;
// Set socket addr to be reused even if kernel is still waiting to close
int res = setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR,
&reuseaddr, sizeof(reuseaddr));
if (res < 0) {
return false;
}
// Set close-on-exec bit.
int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD));
if (-1 == flags) {
return false;
}
flags |= FD_CLOEXEC;
if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) {
return false;
}
// Set non-blocking status flag.
flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL));
if (-1 == flags) {
return false;
}
flags |= O_NONBLOCK;
if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) {
return false;
}
return true;
}
void
ListenSocketIO::OnSocketCanAcceptWithoutBlocking()
{
@ -263,20 +212,24 @@ ListenSocketIO::OnSocketCanAcceptWithoutBlocking()
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING);
MOZ_ASSERT(mCOSocketIO);
struct sockaddr_storage addr;
socklen_t addrLen = sizeof(addr);
int fd = TEMP_FAILURE_RETRY(accept(GetFd(),
reinterpret_cast<struct sockaddr*>(&addr), &addrLen));
if (fd < 0) {
OnError("accept", errno);
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
struct sockaddr_storage storage;
socklen_t addressLength = sizeof(storage);
int fd;
nsresult rv = mConnector->AcceptStreamSocket(
GetFd(),
reinterpret_cast<struct sockaddr*>(&storage), &addressLength,
fd);
if (NS_FAILED(rv)) {
FireSocketError();
return;
}
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
mCOSocketIO->Accept(fd,
reinterpret_cast<union sockaddr_any*>(&addr),
addrLen);
reinterpret_cast<union sockaddr_any*>(&storage),
addressLength);
}
// |SocketIOBase|
@ -397,8 +350,7 @@ ListenSocket::Listen(UnixSocketConnector* aConnector,
return false;
}
mIO = new ListenSocketIO(
XRE_GetIOMessageLoop(), this, connector.forget(), EmptyCString());
mIO = new ListenSocketIO(XRE_GetIOMessageLoop(), this, connector.forget());
// Prepared I/O object, now start listening.
return Listen(aCOSocket);

View File

@ -30,13 +30,11 @@ public:
StreamSocketIO(MessageLoop* mIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector,
const nsACString& aAddress);
UnixSocketConnector* aConnector);
StreamSocketIO(MessageLoop* mIOLoop, int aFd,
ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector,
const nsACString& aAddress);
UnixSocketConnector* aConnector);
~StreamSocketIO();
void GetSocketAddr(nsAString& aAddrStr) const;
@ -98,9 +96,6 @@ public:
private:
void FireSocketError();
// Set up flags on file descriptor.
static bool SetSocketFlags(int aFd);
/**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from main thread. All non-main-thread accesses should happen with
@ -119,19 +114,14 @@ private:
bool mShuttingDownOnIOThread;
/**
* Address we are connecting to, assuming we are creating a client connection.
* Number of valid bytes in |mAddress|
*/
nsCString mAddress;
socklen_t mAddressLength;
/**
* Size of the socket address struct
* Address structure of the socket currently in use
*/
socklen_t mAddrSize;
/**
* Address struct of the socket currently in use
*/
sockaddr_any mAddr;
struct sockaddr_storage mAddress;
/**
* Task member for delayed connect task. Should only be access on main thread.
@ -146,13 +136,12 @@ private:
StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector,
const nsACString& aAddress)
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
, mAddressLength(0)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mStreamSocket);
@ -162,13 +151,12 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd,
ConnectionStatus aConnectionStatus,
StreamSocket* aStreamSocket,
UnixSocketConnector* aConnector,
const nsACString& aAddress)
UnixSocketConnector* aConnector)
: UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus)
, mStreamSocket(aStreamSocket)
, mConnector(aConnector)
, mShuttingDownOnIOThread(false)
, mAddress(aAddress)
, mAddressLength(0)
, mDelayedConnectTask(nullptr)
{
MOZ_ASSERT(mStreamSocket);
@ -189,7 +177,16 @@ StreamSocketIO::GetSocketAddr(nsAString& aAddrStr) const
aAddrStr.Truncate();
return;
}
mConnector->GetSocketAddr(mAddr, aAddrStr);
nsCString addressString;
nsresult rv = mConnector->ConvertAddressToString(
*reinterpret_cast<const struct sockaddr*>(&mAddress), mAddressLength,
addressString);
if (NS_FAILED(rv)) {
return;
}
aAddrStr.Assign(NS_ConvertUTF8toUTF16(addressString));
}
StreamSocket*
@ -239,34 +236,21 @@ StreamSocketIO::Connect()
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(mConnector);
if (!IsOpen()) {
int fd = mConnector->Create();
if (fd < 0) {
NS_WARNING("Cannot create socket fd!");
FireSocketError();
return;
}
if (!SetSocketFlags(fd)) {
NS_WARNING("Cannot set socket flags!");
FireSocketError();
return;
}
if (!mConnector->SetUp(GetFd())) {
NS_WARNING("Could not set up socket!");
FireSocketError();
return;
}
if (!mConnector->CreateAddr(false, mAddrSize, mAddr, mAddress.get())) {
NS_WARNING("Cannot create socket address!");
FireSocketError();
return;
}
SetFd(fd);
MOZ_ASSERT(!IsOpen());
struct sockaddr* address = reinterpret_cast<struct sockaddr*>(&mAddress);
mAddressLength = sizeof(mAddress);
int fd;
nsresult rv = mConnector->CreateStreamSocket(address, &mAddressLength, fd);
if (NS_FAILED(rv)) {
FireSocketError();
return;
}
SetFd(fd);
// calls OnConnected() on success, or OnError() otherwise
nsresult rv = UnixSocketWatcher::Connect(
reinterpret_cast<struct sockaddr*>(&mAddr), mAddrSize);
rv = UnixSocketWatcher::Connect(address, mAddressLength);
NS_WARN_IF(NS_FAILED(rv));
}
@ -354,41 +338,6 @@ StreamSocketIO::FireSocketError()
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR));
}
bool
StreamSocketIO::SetSocketFlags(int aFd)
{
static const int reuseaddr = 1;
// Set socket addr to be reused even if kernel is still waiting to close
int res = setsockopt(aFd, SOL_SOCKET, SO_REUSEADDR,
&reuseaddr, sizeof(reuseaddr));
if (res < 0) {
return false;
}
// Set close-on-exec bit.
int flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFD));
if (-1 == flags) {
return false;
}
flags |= FD_CLOEXEC;
if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFD, flags))) {
return false;
}
// Set non-blocking status flag.
flags = TEMP_FAILURE_RETRY(fcntl(aFd, F_GETFL));
if (-1 == flags) {
return false;
}
flags |= O_NONBLOCK;
if (-1 == TEMP_FAILURE_RETRY(fcntl(aFd, F_SETFL, flags))) {
return false;
}
return true;
}
// |ConnectionOrientedSocketIO|
nsresult
@ -398,21 +347,11 @@ StreamSocketIO::Accept(int aFd,
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTING);
// File-descriptor setup
if (!SetSocketFlags(aFd)) {
return NS_ERROR_FAILURE;
}
if (!mConnector->SetUp(aFd)) {
NS_WARNING("Could not set up socket!");
return NS_ERROR_FAILURE;
}
SetSocket(aFd, SOCKET_IS_CONNECTED);
// Address setup
memcpy(&mAddr, aAddr, aAddrLen);
mAddrSize = aAddrLen;
mAddressLength = aAddrLen;
memcpy(&mAddress, aAddr, mAddressLength);
// Signal success
NS_DispatchToMainThread(
@ -652,9 +591,8 @@ StreamSocket::Connect(UnixSocketConnector* aConnector,
return false;
}
nsCString addr(aAddress);
MessageLoop* ioLoop = XRE_GetIOMessageLoop();
mIO = new StreamSocketIO(ioLoop, this, connector.forget(), addr);
mIO = new StreamSocketIO(ioLoop, this, connector.forget());
SetConnectionStatus(SOCKET_CONNECTING);
if (aDelayMs > 0) {
StreamSocketIO::DelayedConnectTask* connectTask =
@ -681,7 +619,7 @@ StreamSocket::PrepareAccept(UnixSocketConnector* aConnector)
mIO = new StreamSocketIO(XRE_GetIOMessageLoop(),
-1, UnixSocketWatcher::SOCKET_IS_CONNECTING,
this, connector.forget(), EmptyCString());
this, connector.forget());
return mIO;
}