Bug 1158876: Move management of socket I/O buffers into socket I/O classes, r=kmachulis

This patch moves management of received socket I/O buffers from
|DataSocketIO| into the I/O classes. Each I/O class is responsible
for (de-)allocating buffers, and consuming them once data has been
received.

All current I/O classes forward their buffers to the main thread,
but other operations are possible. For example, received data can
be parsed and processed directly in the I/O thread.
This commit is contained in:
Thomas Zimmermann 2015-04-28 10:18:13 +02:00
parent f1427ebbb7
commit 358002fcbd
5 changed files with 160 additions and 14 deletions

View File

@ -75,7 +75,6 @@ public:
DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer) DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer)
: ipc::UnixFdWatcher(aIOLoop) : ipc::UnixFdWatcher(aIOLoop)
, DataSocketIO(MAX_READ_SIZE)
, mConsumer(aConsumer) , mConsumer(aConsumer)
, mShuttingDownOnIOThread(false) , mShuttingDownOnIOThread(false)
, mConnectionStatus(SOCKET_IS_DISCONNECTED) , mConnectionStatus(SOCKET_IS_DISCONNECTED)
@ -145,6 +144,13 @@ public:
return GetDataSocket(); return GetDataSocket();
} }
// Methods for |DataSocket|
//
nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer);
void ConsumeBuffer();
void DiscardBuffer();
/** /**
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
* directly from main thread. All non-main-thread accesses should happen with * directly from main thread. All non-main-thread accesses should happen with
@ -180,6 +186,11 @@ private:
bool mShuttingDownOnIOThread; bool mShuttingDownOnIOThread;
ConnectionStatus mConnectionStatus; ConnectionStatus mConnectionStatus;
/**
* I/O buffer for received data
*/
nsAutoPtr<UnixSocketRawData> mBuffer;
}; };
class SocketConnectTask final : public SocketIOTask<DroidSocketImpl> class SocketConnectTask final : public SocketIOTask<DroidSocketImpl>
@ -492,6 +503,33 @@ DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd)
} }
} }
nsresult
DroidSocketImpl::QueryReceiveBuffer(
UnixSocketIOBuffer** aBuffer)
{
MOZ_ASSERT(aBuffer);
if (!mBuffer) {
mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
}
*aBuffer = mBuffer.get();
return NS_OK;
}
void
DroidSocketImpl::ConsumeBuffer()
{
NS_DispatchToMainThread(
new SocketIOReceiveRunnable<DroidSocketImpl>(this, mBuffer.forget()));
}
void
DroidSocketImpl::DiscardBuffer()
{
// Nothing to do.
}
BluetoothSocket::BluetoothSocket(BluetoothSocketObserver* aObserver, BluetoothSocket::BluetoothSocket(BluetoothSocketObserver* aObserver,
BluetoothSocketType aType, BluetoothSocketType aType,
bool aAuth, bool aAuth,

View File

@ -80,6 +80,13 @@ public:
void OnSocketCanReceiveWithoutBlocking() override; void OnSocketCanReceiveWithoutBlocking() override;
void OnSocketCanSendWithoutBlocking() override; void OnSocketCanSendWithoutBlocking() override;
// Methods for |DataSocket|
//
nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer);
void ConsumeBuffer();
void DiscardBuffer();
private: private:
void FireSocketError(); void FireSocketError();
@ -122,6 +129,11 @@ private:
* Task member for delayed connect task. Should only be access on main thread. * Task member for delayed connect task. Should only be access on main thread.
*/ */
CancelableTask* mDelayedConnectTask; CancelableTask* mDelayedConnectTask;
/**
* I/O buffer for received data
*/
nsAutoPtr<UnixSocketRawData> mBuffer;
}; };
BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO( BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
@ -130,7 +142,6 @@ BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
UnixSocketConnector* aConnector, UnixSocketConnector* aConnector,
const nsACString& aAddress) const nsACString& aAddress)
: UnixSocketWatcher(mIOLoop) : UnixSocketWatcher(mIOLoop)
, DataSocketIO(MAX_READ_SIZE)
, mConsumer(aConsumer) , mConsumer(aConsumer)
, mConnector(aConnector) , mConnector(aConnector)
, mShuttingDownOnIOThread(false) , mShuttingDownOnIOThread(false)
@ -475,6 +486,33 @@ BluetoothSocket::BluetoothSocketIO::SetSocketFlags(int aFd)
return true; return true;
} }
nsresult
BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer(
UnixSocketIOBuffer** aBuffer)
{
MOZ_ASSERT(aBuffer);
if (!mBuffer) {
mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
}
*aBuffer = mBuffer.get();
return NS_OK;
}
void
BluetoothSocket::BluetoothSocketIO::ConsumeBuffer()
{
NS_DispatchToMainThread(
new SocketIOReceiveRunnable<BluetoothSocketIO>(this, mBuffer.forget()));
}
void
BluetoothSocket::BluetoothSocketIO::DiscardBuffer()
{
// Nothing to do.
}
// //
// Socket tasks // Socket tasks
// //

View File

@ -34,8 +34,7 @@ DataSocketIO::HasPendingData() const
return !mOutgoingQ.IsEmpty(); return !mOutgoingQ.IsEmpty();
} }
DataSocketIO::DataSocketIO(size_t aMaxReadSize) DataSocketIO::DataSocketIO()
: mMaxReadSize(aMaxReadSize)
{ } { }
// //

View File

@ -101,6 +101,37 @@ class DataSocketIO : public SocketIOBase
public: public:
virtual ~DataSocketIO(); virtual ~DataSocketIO();
/**
* Allocates a buffer for receiving data from the socket. The method
* shall return the buffer in the arguments. The buffer is owned by the
* I/O class. |DataSocketIO| will never ask for more than one buffer
* at a time, so I/O classes can handout the same buffer on each invokation
* of this method. I/O-thread only.
*
* @param[out] aBuffer returns a pointer to the I/O buffer
* @return NS_OK on success, or an error code otherwise
*/
virtual nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) = 0;
/**
* Marks the current socket buffer to by consumed by the I/O class. The
* class is resonsible for releasing the buffer afterwards. I/O-thread
* only.
*
* @param aIndex the socket's index
* @param[out] aBuffer the receive buffer
* @param[out] aSize the receive buffer's size
*/
virtual void ConsumeBuffer() = 0;
/**
* Marks the current socket buffer to be discarded. The I/O class is
* resonsible for releasing the buffer's memory. I/O-thread only.
*
* @param aIndex the socket's index
*/
virtual void DiscardBuffer() = 0;
void EnqueueData(UnixSocketIOBuffer* aBuffer); void EnqueueData(UnixSocketIOBuffer* aBuffer);
bool HasPendingData() const; bool HasPendingData() const;
@ -110,17 +141,25 @@ public:
MOZ_ASSERT(aFd >= 0); MOZ_ASSERT(aFd >= 0);
MOZ_ASSERT(aIO); MOZ_ASSERT(aIO);
nsAutoPtr<UnixSocketRawData> incoming( UnixSocketIOBuffer* incoming;
new UnixSocketRawData(mMaxReadSize)); nsresult rv = QueryReceiveBuffer(&incoming);
if (NS_FAILED(rv)) {
/* an error occured */
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
NS_DispatchToMainThread(r);
return -1;
}
ssize_t res = incoming->Receive(aFd); ssize_t res = incoming->Receive(aFd);
if (res < 0) { if (res < 0) {
/* an I/O error occured */ /* an I/O error occured */
DiscardBuffer();
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO); nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
NS_DispatchToMainThread(r); NS_DispatchToMainThread(r);
return -1; return -1;
} else if (!res) { } else if (!res) {
/* EOF or peer shut down sending */ /* EOF or peer shut down sending */
DiscardBuffer();
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO); nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
NS_DispatchToMainThread(r); NS_DispatchToMainThread(r);
return 0; return 0;
@ -132,9 +171,7 @@ public:
AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket); AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket);
#endif #endif
nsRefPtr<nsRunnable> r = ConsumeBuffer();
new SocketIOReceiveRunnable<T>(aIO, incoming.forget());
NS_DispatchToMainThread(r);
return res; return res;
} }
@ -168,11 +205,9 @@ public:
} }
protected: protected:
DataSocketIO(size_t aMaxReadSize); DataSocketIO();
private: private:
const size_t mMaxReadSize;
/** /**
* Raw data queue. Must be pushed/popped from I/O thread only. * Raw data queue. Must be pushed/popped from I/O thread only.
*/ */

View File

@ -86,6 +86,13 @@ public:
void OnSocketCanReceiveWithoutBlocking() override; void OnSocketCanReceiveWithoutBlocking() override;
void OnSocketCanSendWithoutBlocking() override; void OnSocketCanSendWithoutBlocking() override;
// Methods for |DataSocket|
//
nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer);
void ConsumeBuffer();
void DiscardBuffer();
private: private:
void FireSocketError(); void FireSocketError();
@ -128,6 +135,11 @@ private:
* Task member for delayed connect task. Should only be access on main thread. * Task member for delayed connect task. Should only be access on main thread.
*/ */
CancelableTask* mDelayedConnectTask; CancelableTask* mDelayedConnectTask;
/**
* I/O buffer for received data
*/
nsAutoPtr<UnixSocketRawData> mBuffer;
}; };
StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
@ -135,7 +147,6 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
UnixSocketConnector* aConnector, UnixSocketConnector* aConnector,
const nsACString& aAddress) const nsACString& aAddress)
: UnixSocketWatcher(mIOLoop) : UnixSocketWatcher(mIOLoop)
, DataSocketIO(MAX_READ_SIZE)
, mStreamSocket(aStreamSocket) , mStreamSocket(aStreamSocket)
, mConnector(aConnector) , mConnector(aConnector)
, mShuttingDownOnIOThread(false) , mShuttingDownOnIOThread(false)
@ -152,7 +163,6 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd,
UnixSocketConnector* aConnector, UnixSocketConnector* aConnector,
const nsACString& aAddress) const nsACString& aAddress)
: UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus) : UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus)
, DataSocketIO(MAX_READ_SIZE)
, mStreamSocket(aStreamSocket) , mStreamSocket(aStreamSocket)
, mConnector(aConnector) , mConnector(aConnector)
, mShuttingDownOnIOThread(false) , mShuttingDownOnIOThread(false)
@ -498,6 +508,32 @@ StreamSocketIO::SetSocketFlags(int aFd)
return true; return true;
} }
nsresult
StreamSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer)
{
MOZ_ASSERT(aBuffer);
if (!mBuffer) {
mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
}
*aBuffer = mBuffer.get();
return NS_OK;
}
void
StreamSocketIO::ConsumeBuffer()
{
NS_DispatchToMainThread(
new SocketIOReceiveRunnable<StreamSocketIO>(this, mBuffer.forget()));
}
void
StreamSocketIO::DiscardBuffer()
{
// Nothing to do.
}
// //
// Socket tasks // Socket tasks
// //