Files
ice/agent.go
T
Sean DuBois 217c656222 Assert MessageIntegrity/Username for inbound
When we get an inbound message assert these values, also discard any
other packet types besides binding. In the future we should extend to
handle inbound error messages

Resolves #19
Resolves #21
2019-04-14 01:22:18 -07:00

911 lines
25 KiB
Go

// Package ice implements the Interactive Connectivity Establishment (ICE)
// protocol defined in rfc5245.
package ice
import (
"bytes"
"encoding/binary"
"fmt"
"math/rand"
"net"
"sort"
"sync"
"time"
"errors"
"github.com/pion/logging"
"github.com/pion/stun"
"github.com/pion/transport/packetio"
)
const (
// taskLoopInterval is the interval at which the agent performs checks
defaultTaskLoopInterval = 2 * time.Second
// keepaliveInterval used to keep candidates alive
defaultKeepaliveInterval = 10 * time.Second
// defaultConnectionTimeout used to declare a connection dead
defaultConnectionTimeout = 30 * time.Second
// the number of bytes that can be buffered before we start to error
maxBufferSize = 1000 * 1000 // 1MB
stunAttrHeaderLength = 4
)
// Agent represents the ICE agent
type Agent struct {
onConnectionStateChangeHdlr func(ConnectionState)
onSelectedCandidatePairChangeHdlr func(*Candidate, *Candidate)
// Used to block double Dial/Accept
opened bool
// State owned by the taskLoop
taskChan chan task
onConnected chan struct{}
onConnectedOnce sync.Once
connectivityChan <-chan time.Time
// force candidate to be contacted immediately (instead of waiting for connectivityChan)
forceCandidateContact chan bool
tieBreaker uint64
connectionState ConnectionState
gatheringState GatheringState
haveStarted bool
isControlling bool
portmin uint16
portmax uint16
//How long should a pair stay quiet before we declare it dead?
//0 means never timeout
connectionTimeout time.Duration
//How often should we send keepalive packets?
//0 means never
keepaliveInterval time.Duration
// How after should we run our internal taskLoop
taskLoopInterval time.Duration
localUfrag string
localPwd string
localCandidates map[NetworkType][]*Candidate
remoteUfrag string
remotePwd string
remoteCandidates map[NetworkType][]*Candidate
selectedPair *candidatePair
validPairs []*candidatePair
buffer *packetio.Buffer
// State for closing
done chan struct{}
err atomicError
log logging.LeveledLogger
}
func (a *Agent) ok() error {
select {
case <-a.done:
return a.getErr()
default:
}
return nil
}
func (a *Agent) getErr() error {
err := a.err.Load()
if err != nil {
return err
}
return ErrClosed
}
// AgentConfig collects the arguments to ice.Agent construction into
// a single structure, for future-proofness of the interface
type AgentConfig struct {
Urls []*URL
// PortMin and PortMax are optional. Leave them 0 for the default UDP port allocation strategy.
PortMin uint16
PortMax uint16
// ConnectionTimeout defaults to 30 seconds when this property is nil.
// If the duration is 0, we will never timeout this connection.
ConnectionTimeout *time.Duration
// KeepaliveInterval determines how often should we send ICE
// keepalives (should be less then connectiontimeout above)
// when this is nil, it defaults to 10 seconds.
// A keepalive interval of 0 means we never send keepalive packets
KeepaliveInterval *time.Duration
// NetworkTypes is an optional configuration for disabling or enablding
// support for specific network types.
NetworkTypes []NetworkType
LoggerFactory logging.LoggerFactory
// taskLoopInterval controls how often our internal task loop runs, this
// task loop handles things like sending keepAlives. This is only value for testing
// keepAlive behavior should be modified with KeepaliveInterval and ConnectionTimeout
taskLoopInterval time.Duration
}
// NewAgent creates a new Agent
func NewAgent(config *AgentConfig) (*Agent, error) {
if config.PortMax < config.PortMin {
return nil, ErrPort
}
loggerFactory := config.LoggerFactory
if loggerFactory == nil {
loggerFactory = logging.NewDefaultLoggerFactory()
}
a := &Agent{
tieBreaker: rand.New(rand.NewSource(time.Now().UnixNano())).Uint64(),
gatheringState: GatheringStateComplete, // TODO trickle-ice
connectionState: ConnectionStateNew,
localCandidates: make(map[NetworkType][]*Candidate),
remoteCandidates: make(map[NetworkType][]*Candidate),
localUfrag: randSeq(16),
localPwd: randSeq(32),
taskChan: make(chan task),
onConnected: make(chan struct{}),
buffer: packetio.NewBuffer(),
done: make(chan struct{}),
portmin: config.PortMin,
portmax: config.PortMax,
log: loggerFactory.NewLogger("ice"),
forceCandidateContact: make(chan bool, 1),
}
// Make sure the buffer doesn't grow indefinitely.
// NOTE: We actually won't get anywhere close to this limit.
// SRTP will constantly read from the endpoint and drop packets if it's full.
a.buffer.SetLimitSize(maxBufferSize)
// connectionTimeout used to declare a connection dead
if config.ConnectionTimeout == nil {
a.connectionTimeout = defaultConnectionTimeout
} else {
a.connectionTimeout = *config.ConnectionTimeout
}
if config.KeepaliveInterval == nil {
a.keepaliveInterval = defaultKeepaliveInterval
} else {
a.keepaliveInterval = *config.KeepaliveInterval
}
if config.taskLoopInterval == 0 {
a.taskLoopInterval = defaultTaskLoopInterval
} else {
a.taskLoopInterval = config.taskLoopInterval
}
// Initialize local candidates
a.gatherCandidatesLocal(config.NetworkTypes)
a.gatherCandidatesReflective(config.Urls, config.NetworkTypes)
go a.taskLoop()
return a, nil
}
// OnConnectionStateChange sets a handler that is fired when the connection state changes
func (a *Agent) OnConnectionStateChange(f func(ConnectionState)) error {
return a.run(func(agent *Agent) {
agent.onConnectionStateChangeHdlr = f
})
}
// OnSelectedCandidatePairChange sets a handler that is fired when the final candidate
// pair is selected
func (a *Agent) OnSelectedCandidatePairChange(f func(*Candidate, *Candidate)) error {
return a.run(func(agent *Agent) {
agent.onSelectedCandidatePairChangeHdlr = f
})
}
func (a *Agent) onSelectedCandidatePairChange(p *candidatePair) {
if p != nil {
if a.onSelectedCandidatePairChangeHdlr != nil {
a.onSelectedCandidatePairChangeHdlr(p.local, p.remote)
}
}
}
func (a *Agent) listenUDP(network string, laddr *net.UDPAddr) (*net.UDPConn, error) {
if (laddr.Port != 0) || ((a.portmin == 0) && (a.portmax == 0)) {
return net.ListenUDP(network, laddr)
}
var i, j int
i = int(a.portmin)
if i == 0 {
i = 1
}
j = int(a.portmax)
if j == 0 {
j = 0xFFFF
}
for i <= j {
c, e := net.ListenUDP(network, &net.UDPAddr{IP: laddr.IP, Port: i})
if e == nil {
return c, e
}
i++
}
return nil, ErrPort
}
func (a *Agent) gatherCandidatesLocal(networkTypes []NetworkType) {
localIPs := localInterfaces(networkTypes)
for _, ip := range localIPs {
for _, network := range supportedNetworks {
conn, err := a.listenUDP(network, &net.UDPAddr{IP: ip, Port: 0})
if err != nil {
a.log.Warnf("could not listen %s %s\n", network, ip)
continue
}
port := conn.LocalAddr().(*net.UDPAddr).Port
c, err := NewCandidateHost(network, ip, port, ComponentRTP)
if err != nil {
a.log.Warnf("Failed to create host candidate: %s %s %d: %v\n", network, ip, port, err)
continue
}
networkType := c.NetworkType
set := a.localCandidates[networkType]
set = append(set, c)
a.localCandidates[networkType] = set
c.start(a, conn)
}
}
}
func (a *Agent) gatherCandidatesReflective(urls []*URL, networkTypes []NetworkType) {
for _, networkType := range networkTypes {
network := networkType.String()
for _, url := range urls {
switch url.Scheme {
case SchemeTypeSTUN:
laddr, xoraddr, err := allocateUDP(network, url)
if err != nil {
a.log.Warnf("could not allocate %s %s: %v\n", network, url, err)
continue
}
conn, err := net.ListenUDP(network, laddr)
if err != nil {
a.log.Warnf("could not listen %s %s: %v\n", network, laddr, err)
}
ip := xoraddr.IP
port := xoraddr.Port
relIP := laddr.IP.String()
relPort := laddr.Port
c, err := NewCandidateServerReflexive(network, ip, port, ComponentRTP, relIP, relPort)
if err != nil {
a.log.Warnf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err)
continue
}
networkType := c.NetworkType
set := a.localCandidates[networkType]
set = append(set, c)
a.localCandidates[networkType] = set
c.start(a, conn)
default:
a.log.Warnf("scheme %s is not implemented\n", url.Scheme)
continue
}
}
}
}
func allocateUDP(network string, url *URL) (*net.UDPAddr, *stun.XorAddress, error) {
// TODO Do we want the timeout to be configurable?
client, err := stun.NewClient(network, fmt.Sprintf("%s:%d", url.Host, url.Port), time.Second*5)
if err != nil {
return nil, nil, flattenErrs([]error{errors.New("failed to create STUN client"), err})
}
localAddr, ok := client.LocalAddr().(*net.UDPAddr)
if !ok {
return nil, nil, fmt.Errorf("failed to cast STUN client to UDPAddr")
}
resp, err := client.Request()
if err != nil {
return nil, nil, flattenErrs([]error{errors.New("failed to make STUN request"), err})
}
if err = client.Close(); err != nil {
return nil, nil, flattenErrs([]error{errors.New("failed to close STUN client"), err})
}
attr, ok := resp.GetOneAttribute(stun.AttrXORMappedAddress)
if !ok {
return nil, nil, fmt.Errorf("got response from STUN server that did not contain XORAddress")
}
var addr stun.XorAddress
if err = addr.Unpack(resp, attr); err != nil {
return nil, nil, flattenErrs([]error{errors.New("failed to unpack STUN XorAddress response"), err})
}
return localAddr, &addr, nil
}
func (a *Agent) startConnectivityChecks(isControlling bool, remoteUfrag, remotePwd string) error {
switch {
case a.haveStarted:
return fmt.Errorf("attempted to start agent twice")
case remoteUfrag == "":
return fmt.Errorf("remoteUfrag is empty")
case remotePwd == "":
return fmt.Errorf("remotePwd is empty")
}
a.log.Debugf("Started agent: isControlling? %t, remoteUfrag: %q, remotePwd: %q", isControlling, remoteUfrag, remotePwd)
return a.run(func(agent *Agent) {
agent.isControlling = isControlling
agent.remoteUfrag = remoteUfrag
agent.remotePwd = remotePwd
agent.updateConnectionState(ConnectionStateChecking)
// TODO this should be dynamic, and grow when the connection is stable
agent.forceCandidateContact <- true
t := time.NewTicker(a.taskLoopInterval)
agent.connectivityChan = t.C
})
}
func (a *Agent) pingCandidate(local, remote *Candidate) {
var msg *stun.Message
var err error
// The controlling agent MUST include the USE-CANDIDATE attribute in
// order to nominate a candidate pair (Section 8.1.1). The controlled
// agent MUST NOT include the USE-CANDIDATE attribute in a Binding
// request.
if a.isControlling {
msg, err = stun.Build(stun.ClassRequest, stun.MethodBinding, stun.GenerateTransactionID(),
&stun.Username{Username: a.remoteUfrag + ":" + a.localUfrag},
&stun.UseCandidate{},
&stun.IceControlling{TieBreaker: a.tieBreaker},
&stun.Priority{Priority: local.Priority()},
&stun.MessageIntegrity{
Key: []byte(a.remotePwd),
},
&stun.Fingerprint{},
)
} else {
msg, err = stun.Build(stun.ClassRequest, stun.MethodBinding, stun.GenerateTransactionID(),
&stun.Username{Username: a.remoteUfrag + ":" + a.localUfrag},
&stun.IceControlled{TieBreaker: a.tieBreaker},
&stun.Priority{Priority: local.Priority()},
&stun.MessageIntegrity{
Key: []byte(a.remotePwd),
},
&stun.Fingerprint{},
)
}
if err != nil {
a.log.Debug(err.Error())
return
}
a.log.Tracef("ping STUN from %s to %s\n", local.String(), remote.String())
a.sendSTUN(msg, local, remote)
}
func (a *Agent) updateConnectionState(newState ConnectionState) {
if a.connectionState != newState {
a.log.Infof("Setting new connection state: %s", newState)
a.connectionState = newState
hdlr := a.onConnectionStateChangeHdlr
if hdlr != nil {
// Call handler async since we may be holding the agent lock
// and the handler may also require it
go hdlr(newState)
}
}
}
type candidatePairs []*candidatePair
func (cp candidatePairs) Len() int { return len(cp) }
func (cp candidatePairs) Swap(i, j int) { cp[i], cp[j] = cp[j], cp[i] }
type byPairPriority struct{ candidatePairs }
// NB: Reverse sort so our candidates start at highest priority
func (bp byPairPriority) Less(i, j int) bool {
return bp.candidatePairs[i].Priority() > bp.candidatePairs[j].Priority()
}
func (a *Agent) setValidPair(local, remote *Candidate, selected, controlling bool) {
// TODO: avoid duplicates
p := newCandidatePair(local, remote, controlling)
a.log.Tracef("Found valid candidate pair: %s (selected? %t)", p, selected)
if selected {
// Notify when the selected pair changes
if !a.selectedPair.Equal(p) {
a.onSelectedCandidatePairChange(p)
}
a.selectedPair = p
a.validPairs = nil
// TODO: only set state to connected on selecting final pair?
a.updateConnectionState(ConnectionStateConnected)
} else {
// keep track of pairs with succesfull bindings since any of them
// can be used for communication until the final pair is selected:
// https://tools.ietf.org/html/draft-ietf-ice-rfc5245bis-20#section-12
a.validPairs = append(a.validPairs, p)
// Sort the candidate pairs by priority of the remotes
sort.Sort(byPairPriority{a.validPairs})
}
// Signal connected
a.onConnectedOnce.Do(func() { close(a.onConnected) })
}
// A task is a
type task func(*Agent)
func (a *Agent) run(t task) error {
err := a.ok()
if err != nil {
return err
}
select {
case <-a.done:
return a.getErr()
case a.taskChan <- t:
}
return nil
}
func (a *Agent) taskLoop() {
contactCandidates := func() {
if a.validateSelectedPair() {
a.log.Trace("checking keepalive")
a.checkKeepalive()
} else {
a.log.Trace("pinging all candidates")
a.pingAllCandidates()
}
}
for {
select {
case <-a.forceCandidateContact:
contactCandidates()
case <-a.connectivityChan:
contactCandidates()
case t := <-a.taskChan:
// Run the task
t(a)
case <-a.done:
return
}
}
}
// validateSelectedPair checks if the selected pair is (still) valid
// Note: the caller should hold the agent lock.
func (a *Agent) validateSelectedPair() bool {
if a.selectedPair == nil {
// Not valid since not selected
return false
}
if (a.connectionTimeout != 0) &&
(time.Since(a.selectedPair.remote.LastReceived()) > a.connectionTimeout) {
a.selectedPair = nil
a.updateConnectionState(ConnectionStateDisconnected)
return false
}
return true
}
// checkKeepalive sends STUN Binding Indications to the selected pair
// if no packet has been sent on that pair in the last keepaliveInterval
// Note: the caller should hold the agent lock.
func (a *Agent) checkKeepalive() {
if a.selectedPair == nil {
return
}
if (a.keepaliveInterval != 0) &&
(time.Since(a.selectedPair.local.LastSent()) > a.keepaliveInterval) {
a.keepaliveCandidate(a.selectedPair.local, a.selectedPair.remote)
}
}
// pingAllCandidates sends STUN Binding Requests to all candidates
// Note: the caller should hold the agent lock.
func (a *Agent) pingAllCandidates() {
for networkType, localCandidates := range a.localCandidates {
if remoteCandidates, ok := a.remoteCandidates[networkType]; ok {
for _, localCandidate := range localCandidates {
for _, remoteCandidate := range remoteCandidates {
a.pingCandidate(localCandidate, remoteCandidate)
}
}
}
}
}
// AddRemoteCandidate adds a new remote candidate
func (a *Agent) AddRemoteCandidate(c *Candidate) error {
return a.run(func(agent *Agent) {
agent.addRemoteCandidate(c)
})
}
// addRemoteCandidate assumes you are holding the lock (must be execute using a.run)
func (a *Agent) addRemoteCandidate(c *Candidate) {
networkType := c.NetworkType
set := a.remoteCandidates[networkType]
for _, candidate := range set {
if candidate.Equal(c) {
return
}
}
set = append(set, c)
a.remoteCandidates[networkType] = set
}
// GetLocalCandidates returns the local candidates
func (a *Agent) GetLocalCandidates() ([]*Candidate, error) {
res := make(chan []*Candidate)
err := a.run(func(agent *Agent) {
var candidates []*Candidate
for _, set := range agent.localCandidates {
candidates = append(candidates, set...)
}
res <- candidates
})
if err != nil {
return nil, err
}
return <-res, nil
}
// GetLocalUserCredentials returns the local user credentials
func (a *Agent) GetLocalUserCredentials() (frag string, pwd string) {
return a.localUfrag, a.localPwd
}
// Close cleans up the Agent
func (a *Agent) Close() error {
done := make(chan struct{})
err := a.run(func(agent *Agent) {
defer func() {
close(done)
}()
agent.err.Store(ErrClosed)
close(agent.done)
// Cleanup all candidates
for net, cs := range agent.localCandidates {
for _, c := range cs {
err := c.close()
if err != nil {
a.log.Warnf("Failed to close candidate %s: %v", c, err)
}
}
delete(agent.localCandidates, net)
}
for net, cs := range agent.remoteCandidates {
for _, c := range cs {
err := c.close()
if err != nil {
a.log.Warnf("Failed to close candidate %s: %v", c, err)
}
}
delete(agent.remoteCandidates, net)
}
err := a.buffer.Close()
if err != nil {
a.log.Warnf("failed to close buffer: %v", err)
}
})
if err != nil {
return err
}
<-done
return nil
}
func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) *Candidate {
var ip net.IP
var port int
switch casted := addr.(type) {
case *net.UDPAddr:
ip = casted.IP
port = casted.Port
case *net.TCPAddr:
ip = casted.IP
port = casted.Port
default:
a.log.Warnf("unsupported address type %T", a)
return nil
}
set := a.remoteCandidates[networkType]
for _, c := range set {
base := c
if base.IP.Equal(ip) &&
base.Port == port {
return c
}
}
return nil
}
func (a *Agent) sendBindingSuccess(m *stun.Message, local, remote *Candidate) {
base := remote
if out, err := stun.Build(stun.ClassSuccessResponse, stun.MethodBinding, m.TransactionID,
&stun.XorMappedAddress{
XorAddress: stun.XorAddress{
IP: base.IP,
Port: base.Port,
},
},
&stun.MessageIntegrity{
Key: []byte(a.localPwd),
},
&stun.Fingerprint{},
); err != nil {
a.log.Warnf("Failed to handle inbound ICE from: %s to: %s error: %s", local, remote, err)
} else {
a.sendSTUN(out, local, remote)
}
}
func (a *Agent) handleInboundControlled(m *stun.Message, localCandidate, remoteCandidate *Candidate) {
if _, isControlled := m.GetOneAttribute(stun.AttrIceControlled); isControlled && !a.isControlling {
a.log.Debug("inbound isControlled && a.isControlling == false")
return
}
successResponse := m.Method == stun.MethodBinding && m.Class == stun.ClassSuccessResponse
_, usepair := m.GetOneAttribute(stun.AttrUseCandidate)
a.log.Tracef("got controlled message (success? %t, usepair? %t)", successResponse, usepair)
// Remember the working pair and select it when marked with usepair
a.setValidPair(localCandidate, remoteCandidate, usepair, false)
if !successResponse {
// Send success response
a.sendBindingSuccess(m, localCandidate, remoteCandidate)
}
}
func (a *Agent) handleInboundControlling(m *stun.Message, localCandidate, remoteCandidate *Candidate) {
if _, isControlling := m.GetOneAttribute(stun.AttrIceControlling); isControlling && a.isControlling {
a.log.Debug("inbound isControlling && a.isControlling == true")
return
} else if _, useCandidate := m.GetOneAttribute(stun.AttrUseCandidate); useCandidate && a.isControlling {
a.log.Debug("useCandidate && a.isControlling == true")
return
}
a.log.Tracef("got controlling message: %#v", m)
successResponse := m.Method == stun.MethodBinding && m.Class == stun.ClassSuccessResponse
// Remember the working pair and select it when receiving a success response
a.setValidPair(localCandidate, remoteCandidate, successResponse, true)
if !successResponse {
// Send success response
a.sendBindingSuccess(m, localCandidate, remoteCandidate)
// We received a ping from the controlled agent. We know the pair works so now we ping with use-candidate set:
a.pingCandidate(localCandidate, remoteCandidate)
}
}
// handleNewPeerReflexiveCandidate adds an unseen remote transport address
// to the remote candidate list as a peer-reflexive candidate.
func (a *Agent) handleNewPeerReflexiveCandidate(local *Candidate, remote net.Addr) error {
var ip net.IP
var port int
switch addr := remote.(type) {
case *net.UDPAddr:
ip = addr.IP
port = addr.Port
case *net.TCPAddr:
ip = addr.IP
port = addr.Port
default:
return fmt.Errorf("unsupported address type %T", addr)
}
pflxCandidate, err := NewCandidatePeerReflexive(
local.NetworkType.String(), // assume, same as that of local
ip,
port,
local.Component,
"", // unknown at this moment. TODO: need a review
0, // unknown at this moment. TODO: need a review
)
if err != nil {
return flattenErrs([]error{fmt.Errorf("failed to create peer-reflexive candidate: %v", remote), err})
}
// Add pflxCandidate to the remote candidate list
a.addRemoteCandidate(pflxCandidate)
return nil
}
func (a *Agent) assertInboundUsername(m *stun.Message) error {
usernameAttr := &stun.Username{}
usernameRawAttr, usernameFound := m.GetOneAttribute(stun.AttrUsername)
if !usernameFound {
return fmt.Errorf("inbound packet missing Username")
} else if err := usernameAttr.Unpack(m, usernameRawAttr); err != nil {
return err
}
expectedUsername := a.localUfrag + ":" + a.remoteUfrag
if usernameAttr.Username != expectedUsername {
return fmt.Errorf("username mismatch expected(%x) actual(%x)", expectedUsername, usernameAttr.Username)
}
return nil
}
func (a *Agent) assertInboundMessageIntegrity(m *stun.Message, key []byte) error {
messageIntegrityAttr := &stun.MessageIntegrity{}
messageIntegrityRawAttr, messageIntegrityAttrFound := m.GetOneAttribute(stun.AttrMessageIntegrity)
if !messageIntegrityAttrFound {
return fmt.Errorf("inbound packet missing MessageIntegrity")
} else if err := messageIntegrityAttr.Unpack(m, messageIntegrityRawAttr); err != nil {
return err
}
tailLength := messageIntegrityRawAttr.Length + stunAttrHeaderLength
rawCopy := make([]byte, len(m.Raw))
copy(rawCopy, m.Raw)
// If we have a fingerprint we need to exclude it from the MessageIntegrity computation
if rawFingerprint, hasFingerprint := m.GetOneAttribute(stun.AttrFingerprint); hasFingerprint {
fingerprintLength := rawFingerprint.Length + stunAttrHeaderLength
tailLength += fingerprintLength
// Rewrite the packet header to be new length (excluding values we don't care about)
currLength := binary.BigEndian.Uint16(rawCopy[2:4])
binary.BigEndian.PutUint16(rawCopy[2:], currLength-fingerprintLength)
}
lengthToHash := len(rawCopy) - int(tailLength)
if lengthToHash < 1 {
return fmt.Errorf("unable to assert MessageIntegrity, length calculation failed (%d)", lengthToHash)
}
computedMessageIntegrity, err := stun.MessageIntegrityCalculateHMAC(key, rawCopy[:lengthToHash])
if err != nil {
return err
} else if !bytes.Equal(computedMessageIntegrity, messageIntegrityRawAttr.Value) {
return fmt.Errorf("messageIntegrity mismatch expected(%x) actual(%x)", computedMessageIntegrity, messageIntegrityRawAttr.Value)
}
return nil
}
// handleInbound processes STUN traffic from a remote candidate
func (a *Agent) handleInbound(m *stun.Message, local *Candidate, remote net.Addr) {
if m == nil || local == nil {
return
}
a.log.Tracef("inbound STUN from %s to %s", remote.String(), local.String())
switch {
case m.Method == stun.MethodBinding && m.Class == stun.ClassSuccessResponse:
if err := a.assertInboundMessageIntegrity(m, []byte(a.remotePwd)); err != nil {
a.log.Warnf("discard message from (%s), %v", remote, err)
return
}
case m.Method == stun.MethodBinding && m.Class == stun.ClassRequest:
if err := a.assertInboundUsername(m); err != nil {
a.log.Warnf("discard message from (%s), %v", remote, err)
return
} else if err := a.assertInboundMessageIntegrity(m, []byte(a.localPwd)); err != nil {
a.log.Warnf("discard message from (%s), %v", remote, err)
return
}
default:
return
}
remoteCandidate := a.findRemoteCandidate(local.NetworkType, remote)
if remoteCandidate == nil {
a.log.Debugf("detected a new peer-reflexive candiate: %s ", remote)
err := a.handleNewPeerReflexiveCandidate(local, remote)
if err != nil {
// Log warning, then move on..
a.log.Warn(err.Error())
}
return
}
remoteCandidate.seen(false)
if a.isControlling {
a.handleInboundControlling(m, local, remoteCandidate)
} else {
a.handleInboundControlled(m, local, remoteCandidate)
}
}
// noSTUNSeen processes non STUN traffic from a remote candidate
func (a *Agent) noSTUNSeen(local *Candidate, remote net.Addr) {
remoteCandidate := a.findRemoteCandidate(local.NetworkType, remote)
if remoteCandidate != nil {
remoteCandidate.seen(false)
}
}
func (a *Agent) getBestPair() (*candidatePair, error) {
res := make(chan *candidatePair)
err := a.run(func(agent *Agent) {
if agent.selectedPair != nil {
res <- agent.selectedPair
return
}
for _, p := range agent.validPairs {
res <- p
return
}
res <- nil
})
if err != nil {
return nil, err
}
out := <-res
if out == nil {
return nil, ErrNoCandidatePairs
}
return out, nil
}