From 358002fcbd6a510a191ae5bdc4ce9c3c3e3079b3 Mon Sep 17 00:00:00 2001 From: Thomas Zimmermann Date: Tue, 28 Apr 2015 10:18:13 +0200 Subject: [PATCH] Bug 1158876: Move management of socket I/O buffers into socket I/O classes, r=kmachulis This patch moves management of received socket I/O buffers from |DataSocketIO| into the I/O classes. Each I/O class is responsible for (de-)allocating buffers, and consuming them once data has been received. All current I/O classes forward their buffers to the main thread, but other operations are possible. For example, received data can be parsed and processed directly in the I/O thread. --- dom/bluetooth/bluedroid/BluetoothSocket.cpp | 40 +++++++++++++++- dom/bluetooth/bluez/BluetoothSocket.cpp | 40 +++++++++++++++- ipc/unixsocket/DataSocket.cpp | 3 +- ipc/unixsocket/DataSocket.h | 51 +++++++++++++++++---- ipc/unixsocket/StreamSocket.cpp | 40 +++++++++++++++- 5 files changed, 160 insertions(+), 14 deletions(-) diff --git a/dom/bluetooth/bluedroid/BluetoothSocket.cpp b/dom/bluetooth/bluedroid/BluetoothSocket.cpp index df3a9f61901..c411297c635 100644 --- a/dom/bluetooth/bluedroid/BluetoothSocket.cpp +++ b/dom/bluetooth/bluedroid/BluetoothSocket.cpp @@ -75,7 +75,6 @@ public: DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer) : ipc::UnixFdWatcher(aIOLoop) - , DataSocketIO(MAX_READ_SIZE) , mConsumer(aConsumer) , mShuttingDownOnIOThread(false) , mConnectionStatus(SOCKET_IS_DISCONNECTED) @@ -145,6 +144,13 @@ public: return GetDataSocket(); } + // Methods for |DataSocket| + // + + nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer); + void ConsumeBuffer(); + void DiscardBuffer(); + /** * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated * directly from main thread. All non-main-thread accesses should happen with @@ -180,6 +186,11 @@ private: bool mShuttingDownOnIOThread; ConnectionStatus mConnectionStatus; + + /** + * I/O buffer for received data + */ + nsAutoPtr mBuffer; }; class SocketConnectTask final : public SocketIOTask @@ -492,6 +503,33 @@ DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd) } } +nsresult +DroidSocketImpl::QueryReceiveBuffer( + UnixSocketIOBuffer** aBuffer) +{ + MOZ_ASSERT(aBuffer); + + if (!mBuffer) { + mBuffer = new UnixSocketRawData(MAX_READ_SIZE); + } + *aBuffer = mBuffer.get(); + + return NS_OK; +} + +void +DroidSocketImpl::ConsumeBuffer() +{ + NS_DispatchToMainThread( + new SocketIOReceiveRunnable(this, mBuffer.forget())); +} + +void +DroidSocketImpl::DiscardBuffer() +{ + // Nothing to do. +} + BluetoothSocket::BluetoothSocket(BluetoothSocketObserver* aObserver, BluetoothSocketType aType, bool aAuth, diff --git a/dom/bluetooth/bluez/BluetoothSocket.cpp b/dom/bluetooth/bluez/BluetoothSocket.cpp index 54c02a8261e..1a295bbb74e 100644 --- a/dom/bluetooth/bluez/BluetoothSocket.cpp +++ b/dom/bluetooth/bluez/BluetoothSocket.cpp @@ -80,6 +80,13 @@ public: void OnSocketCanReceiveWithoutBlocking() override; void OnSocketCanSendWithoutBlocking() override; + // Methods for |DataSocket| + // + + nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer); + void ConsumeBuffer(); + void DiscardBuffer(); + private: void FireSocketError(); @@ -122,6 +129,11 @@ private: * Task member for delayed connect task. Should only be access on main thread. */ CancelableTask* mDelayedConnectTask; + + /** + * I/O buffer for received data + */ + nsAutoPtr mBuffer; }; BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO( @@ -130,7 +142,6 @@ BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO( UnixSocketConnector* aConnector, const nsACString& aAddress) : UnixSocketWatcher(mIOLoop) - , DataSocketIO(MAX_READ_SIZE) , mConsumer(aConsumer) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -475,6 +486,33 @@ BluetoothSocket::BluetoothSocketIO::SetSocketFlags(int aFd) return true; } +nsresult +BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer( + UnixSocketIOBuffer** aBuffer) +{ + MOZ_ASSERT(aBuffer); + + if (!mBuffer) { + mBuffer = new UnixSocketRawData(MAX_READ_SIZE); + } + *aBuffer = mBuffer.get(); + + return NS_OK; +} + +void +BluetoothSocket::BluetoothSocketIO::ConsumeBuffer() +{ + NS_DispatchToMainThread( + new SocketIOReceiveRunnable(this, mBuffer.forget())); +} + +void +BluetoothSocket::BluetoothSocketIO::DiscardBuffer() +{ + // Nothing to do. +} + // // Socket tasks // diff --git a/ipc/unixsocket/DataSocket.cpp b/ipc/unixsocket/DataSocket.cpp index d9c48ed55c0..f153f8419c2 100644 --- a/ipc/unixsocket/DataSocket.cpp +++ b/ipc/unixsocket/DataSocket.cpp @@ -34,8 +34,7 @@ DataSocketIO::HasPendingData() const return !mOutgoingQ.IsEmpty(); } -DataSocketIO::DataSocketIO(size_t aMaxReadSize) - : mMaxReadSize(aMaxReadSize) +DataSocketIO::DataSocketIO() { } // diff --git a/ipc/unixsocket/DataSocket.h b/ipc/unixsocket/DataSocket.h index 864a5c2d62c..c35bb108f1a 100644 --- a/ipc/unixsocket/DataSocket.h +++ b/ipc/unixsocket/DataSocket.h @@ -101,6 +101,37 @@ class DataSocketIO : public SocketIOBase public: virtual ~DataSocketIO(); + /** + * Allocates a buffer for receiving data from the socket. The method + * shall return the buffer in the arguments. The buffer is owned by the + * I/O class. |DataSocketIO| will never ask for more than one buffer + * at a time, so I/O classes can handout the same buffer on each invokation + * of this method. I/O-thread only. + * + * @param[out] aBuffer returns a pointer to the I/O buffer + * @return NS_OK on success, or an error code otherwise + */ + virtual nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) = 0; + + /** + * Marks the current socket buffer to by consumed by the I/O class. The + * class is resonsible for releasing the buffer afterwards. I/O-thread + * only. + * + * @param aIndex the socket's index + * @param[out] aBuffer the receive buffer + * @param[out] aSize the receive buffer's size + */ + virtual void ConsumeBuffer() = 0; + + /** + * Marks the current socket buffer to be discarded. The I/O class is + * resonsible for releasing the buffer's memory. I/O-thread only. + * + * @param aIndex the socket's index + */ + virtual void DiscardBuffer() = 0; + void EnqueueData(UnixSocketIOBuffer* aBuffer); bool HasPendingData() const; @@ -110,17 +141,25 @@ public: MOZ_ASSERT(aFd >= 0); MOZ_ASSERT(aIO); - nsAutoPtr incoming( - new UnixSocketRawData(mMaxReadSize)); + UnixSocketIOBuffer* incoming; + nsresult rv = QueryReceiveBuffer(&incoming); + if (NS_FAILED(rv)) { + /* an error occured */ + nsRefPtr r = new SocketIORequestClosingRunnable(aIO); + NS_DispatchToMainThread(r); + return -1; + } ssize_t res = incoming->Receive(aFd); if (res < 0) { /* an I/O error occured */ + DiscardBuffer(); nsRefPtr r = new SocketIORequestClosingRunnable(aIO); NS_DispatchToMainThread(r); return -1; } else if (!res) { /* EOF or peer shut down sending */ + DiscardBuffer(); nsRefPtr r = new SocketIORequestClosingRunnable(aIO); NS_DispatchToMainThread(r); return 0; @@ -132,9 +171,7 @@ public: AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket); #endif - nsRefPtr r = - new SocketIOReceiveRunnable(aIO, incoming.forget()); - NS_DispatchToMainThread(r); + ConsumeBuffer(); return res; } @@ -168,11 +205,9 @@ public: } protected: - DataSocketIO(size_t aMaxReadSize); + DataSocketIO(); private: - const size_t mMaxReadSize; - /** * Raw data queue. Must be pushed/popped from I/O thread only. */ diff --git a/ipc/unixsocket/StreamSocket.cpp b/ipc/unixsocket/StreamSocket.cpp index 8429ff96919..2769952ad55 100644 --- a/ipc/unixsocket/StreamSocket.cpp +++ b/ipc/unixsocket/StreamSocket.cpp @@ -86,6 +86,13 @@ public: void OnSocketCanReceiveWithoutBlocking() override; void OnSocketCanSendWithoutBlocking() override; + // Methods for |DataSocket| + // + + nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer); + void ConsumeBuffer(); + void DiscardBuffer(); + private: void FireSocketError(); @@ -128,6 +135,11 @@ private: * Task member for delayed connect task. Should only be access on main thread. */ CancelableTask* mDelayedConnectTask; + + /** + * I/O buffer for received data + */ + nsAutoPtr mBuffer; }; StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, @@ -135,7 +147,6 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, UnixSocketConnector* aConnector, const nsACString& aAddress) : UnixSocketWatcher(mIOLoop) - , DataSocketIO(MAX_READ_SIZE) , mStreamSocket(aStreamSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -152,7 +163,6 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd, UnixSocketConnector* aConnector, const nsACString& aAddress) : UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus) - , DataSocketIO(MAX_READ_SIZE) , mStreamSocket(aStreamSocket) , mConnector(aConnector) , mShuttingDownOnIOThread(false) @@ -498,6 +508,32 @@ StreamSocketIO::SetSocketFlags(int aFd) return true; } +nsresult +StreamSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) +{ + MOZ_ASSERT(aBuffer); + + if (!mBuffer) { + mBuffer = new UnixSocketRawData(MAX_READ_SIZE); + } + *aBuffer = mBuffer.get(); + + return NS_OK; +} + +void +StreamSocketIO::ConsumeBuffer() +{ + NS_DispatchToMainThread( + new SocketIOReceiveRunnable(this, mBuffer.forget())); +} + +void +StreamSocketIO::DiscardBuffer() +{ + // Nothing to do. +} + // // Socket tasks //