Backout changeset 5abbf6fd5466 (Bug 830100) for sctp_unittest failure ON A CLOSED TREE r=ryanvm

This commit is contained in:
Randell Jesup 2013-04-09 17:35:20 -04:00
parent 79c4674bc5
commit 79cb6bdccf
7 changed files with 44 additions and 306 deletions

View File

@ -124,19 +124,13 @@ class TransportTestPeer : public sigslot::has_slots<> {
usrsctp_deregister_address(static_cast<void *>(this));
test_utils->sts_target()->Dispatch(WrapRunnable(this,
&TransportTestPeer::Disconnect_s),
&TransportTestPeer::DisconnectInt),
NS_DISPATCH_SYNC);
std::cerr << "~TransportTestPeer() completed" << std::endl;
}
void ConnectSocket(TransportTestPeer *peer) {
test_utils->sts_target()->Dispatch(WrapRunnable(
this, &TransportTestPeer::ConnectSocket_s, peer),
NS_DISPATCH_SYNC);
}
void ConnectSocket_s(TransportTestPeer *peer) {
loopback_->Connect(peer->loopback_);
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(loopback_));
@ -156,7 +150,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
ASSERT_GE(0, r);
}
void Disconnect_s() {
void DisconnectInt() {
if (flow_) {
flow_ = nullptr;
}
@ -197,18 +191,8 @@ class TransportTestPeer : public sigslot::has_slots<> {
int received() const { return received_; }
bool connected() const { return connected_; }
TransportResult SendPacket_s(const unsigned char* data, size_t len) {
return flow_->SendPacket(data, len);
}
TransportResult SendPacket(const unsigned char* data, size_t len) {
TransportResult res;
test_utils->sts_target()->Dispatch(WrapRunnableRet(
this, &TransportTestPeer::SendPacket_s, data, len, &res),
NS_DISPATCH_SYNC);
return res;
return flow_->SendPacket(data, len);
}
void PacketReceived(TransportFlow * flow, const unsigned char* data,

View File

@ -42,40 +42,10 @@ MOZ_MTLOG_MODULE("mtransport")
MtransportTestUtils *test_utils;
// Layer class which can't be initialized.
class TransportLayerDummy : public TransportLayer {
public:
TransportLayerDummy(bool allow_init, bool *destroyed)
: allow_init_(allow_init),
destroyed_(destroyed) {
*destroyed_ = false;
}
virtual ~TransportLayerDummy() {
*destroyed_ = true;
}
virtual nsresult InitInternal() {
return allow_init_ ? NS_OK : NS_ERROR_FAILURE;
}
virtual TransportResult SendPacket(const unsigned char *data, size_t len) {
MOZ_CRASH(); // Should never be called.
return 0;
}
TRANSPORT_LAYER_ID("lossy")
private:
bool allow_init_;
bool *destroyed_;
};
// Class to simulate various kinds of network lossage
class TransportLayerLossy : public TransportLayer {
public:
TransportLayerLossy() : loss_mask_(0), packet_(0) {}
~TransportLayerLossy () {}
virtual TransportResult SendPacket(const unsigned char *data, size_t len) {
MOZ_MTLOG(PR_LOG_NOTICE, LAYER_INFO << "SendPacket(" << len << ")");
@ -167,26 +137,16 @@ class TransportTestPeer : public sigslot::has_slots<> {
NS_DISPATCH_SYNC);
}
void DestroyFlow() {
if (flow_) {
loopback_->Disconnect();
flow_ = nullptr;
}
ice_ctx_ = nullptr;
}
void DisconnectDestroyFlow() {
loopback_->Disconnect();
disconnect_all(); // Disconnect from the signals;
flow_ = nullptr;
flow_ = nullptr;
ice_ctx_ = nullptr;
}
void SetDtlsAllowAll() {
nsresult res = dtls_->SetVerificationAllowAll();
ASSERT_TRUE(NS_SUCCEEDED(res));
}
void SetDtlsPeer(TransportTestPeer *peer, int digests, unsigned int damage) {
unsigned int mask = 1;
@ -211,7 +171,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
}
void ConnectSocket_s(TransportTestPeer *peer) {
void ConnectSocket(TransportTestPeer *peer) {
nsresult res;
res = loopback_->Init();
ASSERT_EQ((nsresult)NS_OK, res);
@ -226,13 +186,6 @@ class TransportTestPeer : public sigslot::has_slots<> {
flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
}
void ConnectSocket(TransportTestPeer *peer) {
RUN_ON_THREAD(test_utils->sts_target(),
WrapRunnable(this, & TransportTestPeer::ConnectSocket_s,
peer),
NS_DISPATCH_SYNC);
}
void InitIce() {
nsresult res;
@ -330,6 +283,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
TransportResult SendPacket(const unsigned char* data, size_t len) {
TransportResult ret;
test_utils->sts_target()->Dispatch(
WrapRunnableRet(flow_, &TransportFlow::SendPacket, data, len, &ret),
NS_DISPATCH_SYNC);
@ -354,22 +308,12 @@ class TransportTestPeer : public sigslot::has_slots<> {
lossy_->SetLoss(loss);
}
TransportLayer::State state() {
TransportLayer::State tstate;
RUN_ON_THREAD(test_utils->sts_target(),
WrapRunnableRet(flow_, &TransportFlow::state, &tstate),
NS_DISPATCH_SYNC);
return tstate;
}
bool connected() {
return state() == TransportLayer::TS_OPEN;
return flow_->state() == TransportLayer::TS_OPEN;
}
bool failed() {
return state() == TransportLayer::TS_ERROR;
return flow_->state() == TransportLayer::TS_ERROR;
}
size_t received() { return received_; }
@ -411,11 +355,6 @@ class TransportTest : public ::testing::Test {
// PR_Close(fds_[1]);
}
void DestroyPeerFlows() {
p1_->DisconnectDestroyFlow();
p2_->DisconnectDestroyFlow();
}
void SetUp() {
nsresult rv;
target_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
@ -502,12 +441,6 @@ TEST_F(TransportTest, TestConnect) {
ConnectSocket();
}
TEST_F(TransportTest, TestConnectDestroyFlowsMainThread) {
SetDtlsPeer();
ConnectSocket();
DestroyPeerFlows();
}
TEST_F(TransportTest, TestConnectAllowAll) {
SetDtlsAllowAll();
ConnectSocket();
@ -567,59 +500,6 @@ TEST_F(TransportTest, TestTransferIce) {
TransferTest(1);
}
TEST(PushTests, LayerFail) {
TransportFlow flow;
nsresult rv;
bool destroyed1, destroyed2;
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = flow.PushLayer(new TransportLayerDummy(false, &destroyed2));
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(TransportLayer::TS_ERROR, flow.state());
ASSERT_EQ(true, destroyed1);
ASSERT_EQ(true, destroyed2);
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(true, destroyed1);
}
TEST(PushTests, LayersFail) {
TransportFlow flow;
nsresult rv;
bool destroyed1, destroyed2, destroyed3;
rv = flow.PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsAutoPtr<std::queue<TransportLayer *> > layers(
new std::queue<TransportLayer *>());
layers->push(new TransportLayerDummy(true, &destroyed2));
layers->push(new TransportLayerDummy(false, &destroyed3));
rv = flow.PushLayers(layers);
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(TransportLayer::TS_ERROR, flow.state());
ASSERT_EQ(true, destroyed1);
ASSERT_EQ(true, destroyed2);
ASSERT_EQ(true, destroyed3);
layers = new std::queue<TransportLayer *>();
layers->push(new TransportLayerDummy(true, &destroyed2));
layers->push(new TransportLayerDummy(true, &destroyed3));
rv = flow.PushLayers(layers);
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(true, destroyed2);
ASSERT_EQ(true, destroyed3);
}
} // end namespace
int main(int argc, char **argv)

View File

@ -10,7 +10,6 @@
#include <prlog.h>
#include "logging.h"
#include "runnable_utils.h"
#include "transportflow.h"
#include "transportlayer.h"
@ -18,51 +17,14 @@ namespace mozilla {
MOZ_MTLOG_MODULE("mtransport")
// There are some hacks here to allow destruction off of
// the main thread.
TransportFlow::~TransportFlow() {
// Make sure that if we are off the right thread, we have
// no more attached signals.
if (!CheckThreadInt()) {
MOZ_ASSERT(SignalStateChange.is_empty());
MOZ_ASSERT(SignalPacketReceived.is_empty());
}
// Push the destruction onto the STS thread. Note that there
// is still some possibility that someone is accessing this
// object simultaneously, but as long as smart pointer discipline
// is maintained, it shouldn't be possible to access and
// destroy it simultaneously. The conversion to an nsAutoPtr
// ensures automatic destruction of the queue at exit of
// DestroyFinal.
nsAutoPtr<std::deque<TransportLayer*> > layers_tmp(layers_.forget());
RUN_ON_THREAD(target_,
WrapRunnableNM(&TransportFlow::DestroyFinal, layers_tmp),
NS_DISPATCH_NORMAL);
}
void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers) {
ClearLayers(layers);
}
void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
while (!layers->empty()) {
delete layers->front();
layers->pop();
}
}
void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
while (!layers->empty()) {
delete layers->front();
layers->pop_front();
for (std::deque<TransportLayer *>::iterator it = layers_.begin();
it != layers_.end(); ++it) {
delete *it;
}
}
nsresult TransportFlow::PushLayer(TransportLayer *layer) {
CheckThread();
ScopedDeletePtr<TransportLayer> layer_tmp(layer); // Destroy on failure.
// Don't allow pushes once we are in error state.
if (state_ == TransportLayer::TS_ERROR) {
MOZ_MTLOG(PR_LOG_ERROR, id_ + ": Can't call PushLayer in error state for flow ");
@ -71,26 +33,20 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
nsresult rv = layer->Init();
if (!NS_SUCCEEDED(rv)) {
// Destroy the rest of the flow, because it's no longer in an acceptable
// state.
ClearLayers(layers_.get());
// Set ourselves to have failed.
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Layer initialization failed; invalidating");
StateChangeInt(TransportLayer::TS_ERROR);
return rv;
}
EnsureSameThread(layer);
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
TransportLayer *old_layer = layers_.empty() ? nullptr : layers_.front();
// Re-target my signals to the new layer
if (old_layer) {
old_layer->SignalStateChange.disconnect(this);
old_layer->SignalPacketReceived.disconnect(this);
}
layers_->push_front(layer_tmp.forget());
layers_.push_front(layer);
layer->Inserted(this, old_layer);
layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
@ -102,8 +58,6 @@ nsresult TransportFlow::PushLayer(TransportLayer *layer) {
// This is all-or-nothing.
nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
CheckThread();
MOZ_ASSERT(!layers->empty());
if (layers->empty()) {
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Can't call PushLayers with empty layers");
@ -113,7 +67,6 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
// Don't allow pushes once we are in error state.
if (state_ == TransportLayer::TS_ERROR) {
MOZ_MTLOG(PR_LOG_ERROR, id_ << ": Can't call PushLayers in error state for flow ");
ClearLayers(layers.get());
return NS_ERROR_FAILURE;
}
@ -125,7 +78,7 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
TransportLayer *layer;
while (!layers->empty()) {
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
TransportLayer *old_layer = layers_.empty() ? nullptr : layers_.front();
layer = layers->front();
rv = layer->Init();
@ -134,21 +87,25 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
break;
}
EnsureSameThread(layer);
// Push the layer onto the queue.
layers_->push_front(layer);
layers_.push_front(layer);
layers->pop();
layer->Inserted(this, old_layer);
}
if (NS_FAILED(rv)) {
// Destroy any layers we could not push.
ClearLayers(layers);
while (!layers->empty()) {
delete layers->front();
layers->pop();
}
// Now destroy the rest of the flow, because it's no longer
// in an acceptable state.
ClearLayers(layers_);
while (!layers_.empty()) {
delete layers_.front();
layers_.pop_front();
}
// Set ourselves to have failed.
StateChangeInt(TransportLayer::TS_ERROR);
@ -166,16 +123,12 @@ nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > laye
}
TransportLayer *TransportFlow::top() const {
CheckThread();
return layers_->empty() ? nullptr : layers_->front();
return layers_.empty() ? nullptr : layers_.front();
}
TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
CheckThread();
for (std::deque<TransportLayer *>::const_iterator it = layers_->begin();
it != layers_->end(); ++it) {
for (std::deque<TransportLayer *>::const_iterator it = layers_.begin();
it != layers_.end(); ++it) {
if ((*it)->id() == id)
return *it;
}
@ -184,38 +137,18 @@ TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
}
TransportLayer::State TransportFlow::state() {
CheckThread();
return state_;
}
TransportResult TransportFlow::SendPacket(const unsigned char *data,
size_t len) {
CheckThread();
if (state_ != TransportLayer::TS_OPEN) {
return TE_ERROR;
}
return top() ? top()->SendPacket(data, len) : TE_ERROR;
}
void TransportFlow::EnsureSameThread(TransportLayer *layer) {
// Enforce that if any of the layers have a thread binding,
// they all have the same binding.
if (target_) {
const nsCOMPtr<nsIEventTarget>& lthread = layer->GetThread();
if (lthread && (lthread != target_))
MOZ_CRASH();
}
else {
target_ = layer->GetThread();
}
}
void TransportFlow::StateChangeInt(TransportLayer::State state) {
CheckThread();
if (state == state_) {
return;
}
@ -226,16 +159,12 @@ void TransportFlow::StateChangeInt(TransportLayer::State state) {
void TransportFlow::StateChange(TransportLayer *layer,
TransportLayer::State state) {
CheckThread();
StateChangeInt(state);
}
void TransportFlow::PacketReceived(TransportLayer* layer,
const unsigned char *data,
size_t len) {
CheckThread();
SignalPacketReceived(this, data, len);
}

View File

@ -15,49 +15,21 @@
#include "nscore.h"
#include "nsISupportsImpl.h"
#include "mozilla/Scoped.h"
#include "transportlayer.h"
#include "m_cpp_utils.h"
// A stack of transport layers acts as a flow.
// Generally, one reads and writes to the top layer.
// This code has a confusing hybrid threading model which
// probably needs some eventual refactoring.
// TODO(ekr@rtfm.com): Bug 844891
//
// TransportFlows are not inherently bound to a thread *but*
// TransportLayers can be. If any layer in a flow is bound
// to a given thread, then all layers in the flow MUST be
// bound to that thread and you can only manipulate the
// flow (push layers, write, etc.) on that thread.
//
// The sole official exception to this is that you are
// allowed to *destroy* a flow off the bound thread provided
// that there are no listeners on its signals. This exception
// is designed to allow idioms where you create the flow
// and then something goes wrong and you destroy it and
// you don't want to bother with a thread dispatch.
//
// Eventually we hope to relax the "no listeners"
// restriction by thread-locking the signals, but previous
// attempts have caused deadlocks.
//
// Most of these invariants are enforced by hard asserts
// (i.e., those which fire even in production builds).
namespace mozilla {
class TransportFlow : public sigslot::has_slots<> {
public:
TransportFlow()
: id_("(anonymous)"),
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
state_(TransportLayer::TS_NONE) {}
TransportFlow(const std::string id)
: id_(id),
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
state_(TransportLayer::TS_NONE) {}
~TransportFlow();
@ -100,39 +72,14 @@ class TransportFlow : public sigslot::has_slots<> {
private:
DISALLOW_COPY_ASSIGN(TransportFlow);
// Check if we are on the right thread
void CheckThread() const {
if (!CheckThreadInt())
MOZ_CRASH();
}
bool CheckThreadInt() const {
bool on;
if (!target_) // OK if no thread set.
return true;
if (NS_FAILED(target_->IsOnCurrentThread(&on)))
return false;
return on;
}
void EnsureSameThread(TransportLayer *layer);
void StateChange(TransportLayer *layer, TransportLayer::State state);
void StateChangeInt(TransportLayer::State state);
void PacketReceived(TransportLayer* layer, const unsigned char *data,
size_t len);
static void DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers);
// Overload needed because we use deque internally and queue externally.
static void ClearLayers(std::deque<TransportLayer *>* layers);
static void ClearLayers(std::queue<TransportLayer *>* layers);
std::string id_;
std::deque<TransportLayer *> layers_;
TransportLayer::State state_;
ScopedDeletePtr<std::deque<TransportLayer *> > layers_;
nsCOMPtr<nsIEventTarget> target_;
};
} // close namespace

View File

@ -32,8 +32,9 @@ nsresult TransportLayer::Init() {
}
void TransportLayer::Inserted(TransportFlow *flow, TransportLayer *downward) {
flow_ = flow;
downward_ = downward;
flow_id_ = flow->id();
MOZ_MTLOG(PR_LOG_DEBUG, LAYER_INFO << "Inserted: downward='" <<
(downward ? downward->id(): "none") << "'");
@ -48,4 +49,9 @@ void TransportLayer::SetState(State state) {
}
}
const std::string& TransportLayer::flow_id() {
static const std::string empty;
return flow_ ? flow_->id() : empty;
}
} // close namespace

View File

@ -45,7 +45,7 @@ class TransportLayer : public sigslot::has_slots<> {
TransportLayer(Mode mode = STREAM) :
mode_(mode),
state_(TS_NONE),
flow_id_(),
flow_(nullptr),
downward_(nullptr) {}
virtual ~TransportLayer() {}
@ -82,11 +82,6 @@ class TransportLayer : public sigslot::has_slots<> {
// Must be implemented by derived classes
virtual TransportResult SendPacket(const unsigned char *data, size_t len) = 0;
// Get the thread.
const nsCOMPtr<nsIEventTarget> GetThread() const {
return target_;
}
// Event definitions that one can register for
// State has changed
sigslot::signal2<TransportLayer*, State> SignalStateChange;
@ -98,21 +93,19 @@ class TransportLayer : public sigslot::has_slots<> {
virtual const std::string id() = 0;
// The id of the flow
const std::string& flow_id() {
return flow_id_;
}
virtual const std::string& flow_id();
protected:
virtual void WasInserted() {}
virtual void SetState(State state);
// Check if we are on the right thread
void CheckThread() {
NS_ABORT_IF_FALSE(CheckThreadInt(), "Wrong thread");
}
Mode mode_;
State state_;
std::string flow_id_;
TransportFlow *flow_; // The flow this is part of
TransportLayer *downward_; // The next layer in the stack
nsCOMPtr<nsIEventTarget> target_;
@ -121,10 +114,7 @@ class TransportLayer : public sigslot::has_slots<> {
bool CheckThreadInt() {
bool on;
if (!target_) // OK if no thread set.
return true;
NS_ENSURE_TRUE(target_, false);
NS_ENSURE_SUCCESS(target_->IsOnCurrentThread(&on), false);
NS_ENSURE_TRUE(on, false);

View File

@ -34,6 +34,7 @@ class TransportLayerLoopback : public TransportLayer {
TransportLayerLoopback() :
peer_(nullptr),
timer_(nullptr),
target_(nullptr),
packets_(),
packets_lock_(nullptr),
deliverer_(nullptr) {}
@ -129,6 +130,7 @@ class TransportLayerLoopback : public TransportLayer {
TransportLayerLoopback* peer_;
nsCOMPtr<nsITimer> timer_;
nsCOMPtr<nsIEventTarget> target_;
std::queue<QueuedPacket *> packets_;
PRLock *packets_lock_;
nsRefPtr<Deliverer> deliverer_;