mirror of
https://gitlab.winehq.org/wine/wine-gecko.git
synced 2024-09-13 09:24:08 -07:00
Bug 830100: Refactor transport flow to allow destruction on any thread r=derf
This commit is contained in:
parent
2163169c68
commit
6ae9e48673
@ -124,13 +124,19 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
usrsctp_deregister_address(static_cast<void *>(this));
|
||||
|
||||
test_utils->sts_target()->Dispatch(WrapRunnable(this,
|
||||
&TransportTestPeer::DisconnectInt),
|
||||
&TransportTestPeer::Disconnect_s),
|
||||
NS_DISPATCH_SYNC);
|
||||
|
||||
std::cerr << "~TransportTestPeer() completed" << std::endl;
|
||||
}
|
||||
|
||||
void ConnectSocket(TransportTestPeer *peer) {
|
||||
test_utils->sts_target()->Dispatch(WrapRunnable(
|
||||
this, &TransportTestPeer::ConnectSocket_s, peer),
|
||||
NS_DISPATCH_SYNC);
|
||||
}
|
||||
|
||||
void ConnectSocket_s(TransportTestPeer *peer) {
|
||||
loopback_->Connect(peer->loopback_);
|
||||
|
||||
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(loopback_));
|
||||
@ -150,7 +156,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
ASSERT_GE(0, r);
|
||||
}
|
||||
|
||||
void DisconnectInt() {
|
||||
void Disconnect_s() {
|
||||
if (flow_) {
|
||||
flow_ = nullptr;
|
||||
}
|
||||
@ -191,10 +197,20 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
int received() const { return received_; }
|
||||
bool connected() const { return connected_; }
|
||||
|
||||
TransportResult SendPacket(const unsigned char* data, size_t len) {
|
||||
TransportResult SendPacket_s(const unsigned char* data, size_t len) {
|
||||
return flow_->SendPacket(data, len);
|
||||
}
|
||||
|
||||
TransportResult SendPacket(const unsigned char* data, size_t len) {
|
||||
TransportResult res;
|
||||
|
||||
test_utils->sts_target()->Dispatch(WrapRunnableRet(
|
||||
this, &TransportTestPeer::SendPacket_s, data, len, &res),
|
||||
NS_DISPATCH_SYNC);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void PacketReceived(TransportFlow * flow, const unsigned char* data,
|
||||
size_t len) {
|
||||
std::cerr << "Received " << len << " bytes" << std::endl;
|
||||
|
@ -42,10 +42,40 @@ MOZ_MTLOG_MODULE("mtransport")
|
||||
|
||||
MtransportTestUtils *test_utils;
|
||||
|
||||
// Layer class which can't be initialized.
|
||||
class TransportLayerDummy : public TransportLayer {
|
||||
public:
|
||||
TransportLayerDummy(bool allow_init, bool *destroyed)
|
||||
: allow_init_(allow_init),
|
||||
destroyed_(destroyed) {
|
||||
*destroyed_ = false;
|
||||
}
|
||||
|
||||
virtual ~TransportLayerDummy() {
|
||||
*destroyed_ = true;
|
||||
}
|
||||
|
||||
virtual nsresult InitInternal() {
|
||||
return allow_init_ ? NS_OK : NS_ERROR_FAILURE;
|
||||
}
|
||||
|
||||
virtual TransportResult SendPacket(const unsigned char *data, size_t len) {
|
||||
MOZ_CRASH(); // Should never be called.
|
||||
return 0;
|
||||
}
|
||||
|
||||
TRANSPORT_LAYER_ID("lossy")
|
||||
|
||||
private:
|
||||
bool allow_init_;
|
||||
bool *destroyed_;
|
||||
};
|
||||
|
||||
// Class to simulate various kinds of network lossage
|
||||
class TransportLayerLossy : public TransportLayer {
|
||||
public:
|
||||
TransportLayerLossy() : loss_mask_(0), packet_(0) {}
|
||||
~TransportLayerLossy () {}
|
||||
|
||||
virtual TransportResult SendPacket(const unsigned char *data, size_t len) {
|
||||
MOZ_MTLOG(PR_LOG_NOTICE, LAYER_INFO << "SendPacket(" << len << ")");
|
||||
@ -137,16 +167,26 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
NS_DISPATCH_SYNC);
|
||||
}
|
||||
|
||||
|
||||
void DestroyFlow() {
|
||||
loopback_->Disconnect();
|
||||
flow_ = nullptr;
|
||||
if (flow_) {
|
||||
loopback_->Disconnect();
|
||||
flow_ = nullptr;
|
||||
}
|
||||
ice_ctx_ = nullptr;
|
||||
}
|
||||
|
||||
void DisconnectDestroyFlow() {
|
||||
loopback_->Disconnect();
|
||||
disconnect_all(); // Disconnect from the signals;
|
||||
flow_ = nullptr;
|
||||
}
|
||||
|
||||
void SetDtlsAllowAll() {
|
||||
nsresult res = dtls_->SetVerificationAllowAll();
|
||||
ASSERT_TRUE(NS_SUCCEEDED(res));
|
||||
}
|
||||
|
||||
void SetDtlsPeer(TransportTestPeer *peer, int digests, unsigned int damage) {
|
||||
unsigned int mask = 1;
|
||||
|
||||
@ -171,7 +211,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
}
|
||||
|
||||
|
||||
void ConnectSocket(TransportTestPeer *peer) {
|
||||
void ConnectSocket_s(TransportTestPeer *peer) {
|
||||
nsresult res;
|
||||
res = loopback_->Init();
|
||||
ASSERT_EQ((nsresult)NS_OK, res);
|
||||
@ -186,6 +226,13 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
|
||||
}
|
||||
|
||||
void ConnectSocket(TransportTestPeer *peer) {
|
||||
RUN_ON_THREAD(test_utils->sts_target(),
|
||||
WrapRunnable(this, & TransportTestPeer::ConnectSocket_s,
|
||||
peer),
|
||||
NS_DISPATCH_SYNC);
|
||||
}
|
||||
|
||||
void InitIce() {
|
||||
nsresult res;
|
||||
|
||||
@ -283,7 +330,6 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
|
||||
TransportResult SendPacket(const unsigned char* data, size_t len) {
|
||||
TransportResult ret;
|
||||
|
||||
test_utils->sts_target()->Dispatch(
|
||||
WrapRunnableRet(flow_, &TransportFlow::SendPacket, data, len, &ret),
|
||||
NS_DISPATCH_SYNC);
|
||||
@ -308,12 +354,22 @@ class TransportTestPeer : public sigslot::has_slots<> {
|
||||
lossy_->SetLoss(loss);
|
||||
}
|
||||
|
||||
TransportLayer::State state() {
|
||||
TransportLayer::State tstate;
|
||||
|
||||
RUN_ON_THREAD(test_utils->sts_target(),
|
||||
WrapRunnableRet(flow_, &TransportFlow::state, &tstate),
|
||||
NS_DISPATCH_SYNC);
|
||||
|
||||
return tstate;
|
||||
}
|
||||
|
||||
bool connected() {
|
||||
return flow_->state() == TransportLayer::TS_OPEN;
|
||||
return state() == TransportLayer::TS_OPEN;
|
||||
}
|
||||
|
||||
bool failed() {
|
||||
return flow_->state() == TransportLayer::TS_ERROR;
|
||||
return state() == TransportLayer::TS_ERROR;
|
||||
}
|
||||
|
||||
size_t received() { return received_; }
|
||||
@ -355,6 +411,11 @@ class TransportTest : public ::testing::Test {
|
||||
// PR_Close(fds_[1]);
|
||||
}
|
||||
|
||||
void DestroyPeerFlows() {
|
||||
p1_->DisconnectDestroyFlow();
|
||||
p2_->DisconnectDestroyFlow();
|
||||
}
|
||||
|
||||
void SetUp() {
|
||||
nsresult rv;
|
||||
target_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
|
||||
@ -441,6 +502,12 @@ TEST_F(TransportTest, TestConnect) {
|
||||
ConnectSocket();
|
||||
}
|
||||
|
||||
TEST_F(TransportTest, TestConnectDestroyFlowsMainThread) {
|
||||
SetDtlsPeer();
|
||||
ConnectSocket();
|
||||
DestroyPeerFlows();
|
||||
}
|
||||
|
||||
TEST_F(TransportTest, TestConnectAllowAll) {
|
||||
SetDtlsAllowAll();
|
||||
ConnectSocket();
|
||||
@ -500,6 +567,59 @@ TEST_F(TransportTest, TestTransferIce) {
|
||||
TransferTest(1);
|
||||
}
|
||||
|
||||
TEST(PushTests, LayerFail) {
|
||||
TransportFlow flow;
|
||||
nsresult rv;
|
||||
bool destroyed1, destroyed2;
|
||||
|
||||
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
|
||||
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
||||
|
||||
rv = flow.PushLayer(new TransportLayerDummy(false, &destroyed2));
|
||||
ASSERT_TRUE(NS_FAILED(rv));
|
||||
|
||||
ASSERT_EQ(TransportLayer::TS_ERROR, flow.state());
|
||||
ASSERT_EQ(true, destroyed1);
|
||||
ASSERT_EQ(true, destroyed2);
|
||||
|
||||
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
|
||||
ASSERT_TRUE(NS_FAILED(rv));
|
||||
ASSERT_EQ(true, destroyed1);
|
||||
}
|
||||
|
||||
|
||||
TEST(PushTests, LayersFail) {
|
||||
TransportFlow flow;
|
||||
nsresult rv;
|
||||
bool destroyed1, destroyed2, destroyed3;
|
||||
|
||||
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
|
||||
ASSERT_TRUE(NS_SUCCEEDED(rv));
|
||||
|
||||
nsAutoPtr<std::queue<TransportLayer *> > layers(
|
||||
new std::queue<TransportLayer *>());
|
||||
|
||||
layers->push(new TransportLayerDummy(true, &destroyed2));
|
||||
layers->push(new TransportLayerDummy(false, &destroyed3));
|
||||
|
||||
rv = flow.PushLayers(layers);
|
||||
ASSERT_TRUE(NS_FAILED(rv));
|
||||
|
||||
ASSERT_EQ(TransportLayer::TS_ERROR, flow.state());
|
||||
ASSERT_EQ(true, destroyed1);
|
||||
ASSERT_EQ(true, destroyed2);
|
||||
ASSERT_EQ(true, destroyed3);
|
||||
|
||||
layers = new std::queue<TransportLayer *>();
|
||||
layers->push(new TransportLayerDummy(true, &destroyed2));
|
||||
layers->push(new TransportLayerDummy(true, &destroyed3));
|
||||
rv = flow.PushLayers(layers);
|
||||
|
||||
ASSERT_TRUE(NS_FAILED(rv));
|
||||
ASSERT_EQ(true, destroyed2);
|
||||
ASSERT_EQ(true, destroyed3);
|
||||
}
|
||||
|
||||
} // end namespace
|
||||
|
||||
int main(int argc, char **argv)
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <prlog.h>
|
||||
|
||||
#include "logging.h"
|
||||
#include "runnable_utils.h"
|
||||
#include "transportflow.h"
|
||||
#include "transportlayer.h"
|
||||
|
||||
@ -17,14 +18,51 @@ namespace mozilla {
|
||||
|
||||
MOZ_MTLOG_MODULE("mtransport")
|
||||
|
||||
// There are some hacks here to allow destruction off of
|
||||
// the main thread.
|
||||
TransportFlow::~TransportFlow() {
|
||||
for (std::deque<TransportLayer *>::iterator it = layers_.begin();
|
||||
it != layers_.end(); ++it) {
|
||||
delete *it;
|
||||
// Make sure that if we are off the right thread, we have
|
||||
// no more attached signals.
|
||||
if (!CheckThreadInt()) {
|
||||
MOZ_ASSERT(SignalStateChange.is_empty());
|
||||
MOZ_ASSERT(SignalPacketReceived.is_empty());
|
||||
}
|
||||
|
||||
// Push the destruction onto the STS thread. Note that there
|
||||
// is still some possibility that someone is accessing this
|
||||
// object simultaneously, but as long as smart pointer discipline
|
||||
// is maintained, it shouldn't be possible to access and
|
||||
// destroy it simultaneously. The conversion to an nsAutoPtr
|
||||
// ensures automatic destruction of the queue at exit of
|
||||
// DestroyFinal.
|
||||
nsAutoPtr<std::deque<TransportLayer*> > layers_tmp(layers_.forget());
|
||||
RUN_ON_THREAD(target_,
|
||||
WrapRunnableNM(&TransportFlow::DestroyFinal, layers_tmp),
|
||||
NS_DISPATCH_NORMAL);
|
||||
}
|
||||
|
||||
void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) {
|
||||
ClearLayers(layers);
|
||||
}
|
||||
|
||||
void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
|
||||
while (!layers->empty()) {
|
||||
delete layers->front();
|
||||
layers->pop();
|
||||
}
|
||||
}
|
||||
|
||||
void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
|
||||
while (!layers->empty()) {
|
||||
delete layers->front();
|
||||
layers->pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
nsresult TransportFlow::PushLayer(TransportLayer *layer) {
|
||||
CheckThread();
|
||||
ScopedDeletePtr<TransportLayer> layer_tmp(layer); // Destroy on failure.
|
||||
|
||||
// Don't allow pushes once we are in error state.
|
||||
if (state_ == TransportLayer::TS_ERROR) {
|
||||
MOZ_MTLOG(PR_LOG_ERROR, id_ + ": Can't call PushLayer in error state for flow ");
|
||||
@ -33,20 +71,26 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
|
||||
|
||||
nsresult rv = layer->Init();
|
||||
if (!NS_SUCCEEDED(rv)) {
|
||||
// Destroy the rest of the flow, because it's no longer in an acceptable
|
||||
// state.
|
||||
ClearLayers(layers_.get());
|
||||
|
||||
// Set ourselves to have failed.
|
||||
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Layer initialization failed; invalidating");
|
||||
StateChangeInt(TransportLayer::TS_ERROR);
|
||||
|
||||
return rv;
|
||||
}
|
||||
EnsureSameThread(layer);
|
||||
|
||||
TransportLayer *old_layer = layers_.empty() ? nullptr : layers_.front();
|
||||
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
|
||||
|
||||
// Re-target my signals to the new layer
|
||||
if (old_layer) {
|
||||
old_layer->SignalStateChange.disconnect(this);
|
||||
old_layer->SignalPacketReceived.disconnect(this);
|
||||
}
|
||||
layers_.push_front(layer);
|
||||
layers_->push_front(layer_tmp.forget());
|
||||
layer->Inserted(this, old_layer);
|
||||
|
||||
layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
|
||||
@ -58,6 +102,8 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
|
||||
|
||||
// This is all-or-nothing.
|
||||
nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
|
||||
CheckThread();
|
||||
|
||||
MOZ_ASSERT(!layers->empty());
|
||||
if (layers->empty()) {
|
||||
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Can't call PushLayers with empty layers");
|
||||
@ -67,6 +113,7 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
|
||||
// Don't allow pushes once we are in error state.
|
||||
if (state_ == TransportLayer::TS_ERROR) {
|
||||
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Can't call PushLayers in error state for flow ");
|
||||
ClearLayers(layers.get());
|
||||
return NS_ERROR_FAILURE;
|
||||
}
|
||||
|
||||
@ -78,7 +125,7 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
|
||||
TransportLayer *layer;
|
||||
|
||||
while (!layers->empty()) {
|
||||
TransportLayer *old_layer = layers_.empty() ? nullptr : layers_.front();
|
||||
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
|
||||
layer = layers->front();
|
||||
|
||||
rv = layer->Init();
|
||||
@ -87,25 +134,21 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
|
||||
break;
|
||||
}
|
||||
|
||||
EnsureSameThread(layer);
|
||||
|
||||
// Push the layer onto the queue.
|
||||
layers_.push_front(layer);
|
||||
layers_->push_front(layer);
|
||||
layers->pop();
|
||||
layer->Inserted(this, old_layer);
|
||||
}
|
||||
|
||||
if (NS_FAILED(rv)) {
|
||||
// Destroy any layers we could not push.
|
||||
while (!layers->empty()) {
|
||||
delete layers->front();
|
||||
layers->pop();
|
||||
}
|
||||
ClearLayers(layers);
|
||||
|
||||
// Now destroy the rest of the flow, because it's no longer
|
||||
// in an acceptable state.
|
||||
while (!layers_.empty()) {
|
||||
delete layers_.front();
|
||||
layers_.pop_front();
|
||||
}
|
||||
ClearLayers(layers_);
|
||||
|
||||
// Set ourselves to have failed.
|
||||
StateChangeInt(TransportLayer::TS_ERROR);
|
||||
@ -123,12 +166,16 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
|
||||
}
|
||||
|
||||
TransportLayer *TransportFlow::top() const {
|
||||
return layers_.empty() ? nullptr : layers_.front();
|
||||
CheckThread();
|
||||
|
||||
return layers_->empty() ? nullptr : layers_->front();
|
||||
}
|
||||
|
||||
TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
|
||||
for (std::deque<TransportLayer *>::const_iterator it = layers_.begin();
|
||||
it != layers_.end(); ++it) {
|
||||
CheckThread();
|
||||
|
||||
for (std::deque<TransportLayer *>::const_iterator it = layers_->begin();
|
||||
it != layers_->end(); ++it) {
|
||||
if ((*it)->id() == id)
|
||||
return *it;
|
||||
}
|
||||
@ -137,18 +184,38 @@ TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
|
||||
}
|
||||
|
||||
TransportLayer::State TransportFlow::state() {
|
||||
CheckThread();
|
||||
|
||||
return state_;
|
||||
}
|
||||
|
||||
TransportResult TransportFlow::SendPacket(const unsigned char *data,
|
||||
size_t len) {
|
||||
CheckThread();
|
||||
|
||||
if (state_ != TransportLayer::TS_OPEN) {
|
||||
return TE_ERROR;
|
||||
}
|
||||
return top() ? top()->SendPacket(data, len) : TE_ERROR;
|
||||
}
|
||||
|
||||
void TransportFlow::EnsureSameThread(TransportLayer *layer) {
|
||||
// Enforce that if any of the layers have a thread binding,
|
||||
// they all have the same binding.
|
||||
if (target_) {
|
||||
const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread();
|
||||
|
||||
if (lthread && (lthread != target_))
|
||||
MOZ_CRASH();
|
||||
}
|
||||
else {
|
||||
target_ = layer->GetThread();
|
||||
}
|
||||
}
|
||||
|
||||
void TransportFlow::StateChangeInt(TransportLayer::State state) {
|
||||
CheckThread();
|
||||
|
||||
if (state == state_) {
|
||||
return;
|
||||
}
|
||||
@ -159,12 +226,16 @@ void TransportFlow::StateChangeInt(TransportLayer::State state) {
|
||||
|
||||
void TransportFlow::StateChange(TransportLayer *layer,
|
||||
TransportLayer::State state) {
|
||||
CheckThread();
|
||||
|
||||
StateChangeInt(state);
|
||||
}
|
||||
|
||||
void TransportFlow::PacketReceived(TransportLayer* layer,
|
||||
const unsigned char *data,
|
||||
size_t len) {
|
||||
CheckThread();
|
||||
|
||||
SignalPacketReceived(this, data, len);
|
||||
}
|
||||
|
||||
|
@ -15,21 +15,49 @@
|
||||
|
||||
#include "nscore.h"
|
||||
#include "nsISupportsImpl.h"
|
||||
#include "mozilla/Scoped.h"
|
||||
#include "transportlayer.h"
|
||||
#include "m_cpp_utils.h"
|
||||
|
||||
// A stack of transport layers acts as a flow.
|
||||
// Generally, one reads and writes to the top layer.
|
||||
|
||||
// This code has a confusing hybrid threading model which
|
||||
// probably needs some eventual refactoring.
|
||||
// TODO(ekr@rtfm.com): Bug 844891
|
||||
//
|
||||
// TransportFlows are not inherently bound to a thread *but*
|
||||
// TransportLayers can be. If any layer in a flow is bound
|
||||
// to a given thread, then all layers in the flow MUST be
|
||||
// bound to that thread and you can only manipulate the
|
||||
// flow (push layers, write, etc.) on that thread.
|
||||
//
|
||||
// The sole official exception to this is that you are
|
||||
// allowed to *destroy* a flow off the bound thread provided
|
||||
// that there are no listeners on its signals. This exception
|
||||
// is designed to allow idioms where you create the flow
|
||||
// and then something goes wrong and you destroy it and
|
||||
// you don't want to bother with a thread dispatch.
|
||||
//
|
||||
// Eventually we hope to relax the "no listeners"
|
||||
// restriction by thread-locking the signals, but previous
|
||||
// attempts have caused deadlocks.
|
||||
//
|
||||
// Most of these invariants are enforced by hard asserts
|
||||
// (i.e., those which fire even in production builds).
|
||||
|
||||
namespace mozilla {
|
||||
|
||||
class TransportFlow : public sigslot::has_slots<> {
|
||||
public:
|
||||
TransportFlow()
|
||||
: id_("(anonymous)"),
|
||||
state_(TransportLayer::TS_NONE) {}
|
||||
state_(TransportLayer::TS_NONE),
|
||||
layers_(new std::deque<TransportLayer *>) {}
|
||||
TransportFlow(const std::string id)
|
||||
: id_(id),
|
||||
state_(TransportLayer::TS_NONE) {}
|
||||
state_(TransportLayer::TS_NONE),
|
||||
layers_(new std::deque<TransportLayer *>) {}
|
||||
|
||||
~TransportFlow();
|
||||
|
||||
@ -72,14 +100,39 @@ class TransportFlow : public sigslot::has_slots<> {
|
||||
private:
|
||||
DISALLOW_COPY_ASSIGN(TransportFlow);
|
||||
|
||||
// Check if we are on the right thread
|
||||
void CheckThread() const {
|
||||
if (!CheckThreadInt())
|
||||
MOZ_CRASH();
|
||||
}
|
||||
|
||||
bool CheckThreadInt() const {
|
||||
bool on;
|
||||
|
||||
if (!target_) // OK if no thread set.
|
||||
return true;
|
||||
if (NS_FAILED(target_->IsOnCurrentThread(&on)))
|
||||
return false;
|
||||
|
||||
return on;
|
||||
}
|
||||
|
||||
void EnsureSameThread(TransportLayer *layer);
|
||||
|
||||
void StateChange(TransportLayer *layer, TransportLayer::State state);
|
||||
void StateChangeInt(TransportLayer::State state);
|
||||
void PacketReceived(TransportLayer* layer, const unsigned char *data,
|
||||
size_t len);
|
||||
static void DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers);
|
||||
|
||||
// Overload needed because we use deque internally and queue externally.
|
||||
static void ClearLayers(std::deque<TransportLayer *>* layers);
|
||||
static void ClearLayers(std::queue<TransportLayer *>* layers);
|
||||
|
||||
std::string id_;
|
||||
std::deque<TransportLayer *> layers_;
|
||||
TransportLayer::State state_;
|
||||
ScopedDeletePtr<std::deque<TransportLayer *> > layers_;
|
||||
nsCOMPtr<nsIEventTarget> target_;
|
||||
};
|
||||
|
||||
} // close namespace
|
||||
|
@ -32,9 +32,8 @@ nsresult TransportLayer::Init() {
|
||||
}
|
||||
|
||||
void TransportLayer::Inserted(TransportFlow *flow, TransportLayer *downward) {
|
||||
flow_ = flow;
|
||||
downward_ = downward;
|
||||
|
||||
flow_id_ = flow->id();
|
||||
MOZ_MTLOG(PR_LOG_DEBUG, LAYER_INFO << "Inserted: downward='" <<
|
||||
(downward ? downward->id(): "none") << "'");
|
||||
|
||||
@ -49,9 +48,4 @@ void TransportLayer::SetState(State state) {
|
||||
}
|
||||
}
|
||||
|
||||
const std::string& TransportLayer::flow_id() {
|
||||
static const std::string empty;
|
||||
|
||||
return flow_ ? flow_->id() : empty;
|
||||
}
|
||||
} // close namespace
|
||||
|
@ -45,7 +45,7 @@ class TransportLayer : public sigslot::has_slots<> {
|
||||
TransportLayer(Mode mode = STREAM) :
|
||||
mode_(mode),
|
||||
state_(TS_NONE),
|
||||
flow_(nullptr),
|
||||
flow_id_(),
|
||||
downward_(nullptr) {}
|
||||
|
||||
virtual ~TransportLayer() {}
|
||||
@ -82,6 +82,11 @@ class TransportLayer : public sigslot::has_slots<> {
|
||||
// Must be implemented by derived classes
|
||||
virtual TransportResult SendPacket(const unsigned char *data, size_t len) = 0;
|
||||
|
||||
// Get the thread.
|
||||
const nsCOMPtr<nsIEventTarget> GetThread() const {
|
||||
return target_;
|
||||
}
|
||||
|
||||
// Event definitions that one can register for
|
||||
// State has changed
|
||||
sigslot::signal2<TransportLayer*, State> SignalStateChange;
|
||||
@ -93,19 +98,21 @@ class TransportLayer : public sigslot::has_slots<> {
|
||||
virtual const std::string id() = 0;
|
||||
|
||||
// The id of the flow
|
||||
virtual const std::string& flow_id();
|
||||
const std::string& flow_id() {
|
||||
return flow_id_;
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void WasInserted() {}
|
||||
virtual void SetState(State state);
|
||||
|
||||
// Check if we are on the right thread
|
||||
void CheckThread() {
|
||||
NS_ABORT_IF_FALSE(CheckThreadInt(), "Wrong thread");
|
||||
}
|
||||
|
||||
Mode mode_;
|
||||
State state_;
|
||||
TransportFlow *flow_; // The flow this is part of
|
||||
std::string flow_id_;
|
||||
TransportLayer *downward_; // The next layer in the stack
|
||||
nsCOMPtr<nsIEventTarget> target_;
|
||||
|
||||
@ -114,7 +121,10 @@ class TransportLayer : public sigslot::has_slots<> {
|
||||
|
||||
bool CheckThreadInt() {
|
||||
bool on;
|
||||
NS_ENSURE_TRUE(target_, false);
|
||||
|
||||
if (!target_) // OK if no thread set.
|
||||
return true;
|
||||
|
||||
NS_ENSURE_SUCCESS(target_->IsOnCurrentThread(&on), false);
|
||||
NS_ENSURE_TRUE(on, false);
|
||||
|
||||
|
@ -34,7 +34,6 @@ class TransportLayerLoopback : public TransportLayer {
|
||||
TransportLayerLoopback() :
|
||||
peer_(nullptr),
|
||||
timer_(nullptr),
|
||||
target_(nullptr),
|
||||
packets_(),
|
||||
packets_lock_(nullptr),
|
||||
deliverer_(nullptr) {}
|
||||
@ -130,7 +129,6 @@ class TransportLayerLoopback : public TransportLayer {
|
||||
|
||||
TransportLayerLoopback* peer_;
|
||||
nsCOMPtr<nsITimer> timer_;
|
||||
nsCOMPtr<nsIEventTarget> target_;
|
||||
std::queue<QueuedPacket *> packets_;
|
||||
PRLock *packets_lock_;
|
||||
nsRefPtr<Deliverer> deliverer_;
|
||||
|
Loading…
Reference in New Issue
Block a user