mirror of
https://github.com/encounter/go-rtmp.git
synced 2026-03-30 11:12:49 -07:00
229 lines
5.8 KiB
Go
229 lines
5.8 KiB
Go
//
|
|
// Copyright (c) 2018- yutopp (yutopp@gmail.com)
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
|
|
package rtmp
|
|
|
|
import (
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"log"
|
|
"runtime/debug"
|
|
"sync"
|
|
|
|
"github.com/encounter/go-rtmp/internal"
|
|
"github.com/encounter/go-rtmp/message"
|
|
)
|
|
|
|
type streamState int
|
|
|
|
const (
|
|
streamStateUnknown streamState = iota
|
|
streamStateServerNotConnected
|
|
streamStateServerConnected
|
|
streamStateServerInactive
|
|
streamStateServerPublish
|
|
streamStateServerPlay
|
|
streamStateClientNotConnected
|
|
streamStateClientConnected
|
|
streamStateClientPlay
|
|
)
|
|
|
|
func (s streamState) String() string {
|
|
switch s {
|
|
case streamStateServerNotConnected:
|
|
return "NotConnected(Server)"
|
|
case streamStateServerConnected:
|
|
return "Connected(Server)"
|
|
case streamStateServerInactive:
|
|
return "Inactive(Server)"
|
|
case streamStateServerPublish:
|
|
return "Publish(Server)"
|
|
case streamStateServerPlay:
|
|
return "Play(Server)"
|
|
case streamStateClientNotConnected:
|
|
return "NotConnected(Client)"
|
|
case streamStateClientConnected:
|
|
return "Connected(Client)"
|
|
default:
|
|
return "<Unknown>"
|
|
}
|
|
}
|
|
|
|
// streamHandler A handler per streams.
|
|
// It holds a handler for each states and processes messages sent to the stream
|
|
type streamHandler struct {
|
|
stream *Stream
|
|
handler stateHandler // A handler for each states
|
|
state streamState
|
|
|
|
loggerEntry *logrus.Entry
|
|
m sync.Mutex
|
|
}
|
|
|
|
// newEntryHandler Create an incomplete new instance of entryHandler.
|
|
// msgHandler fields must be assigned by a caller of this function
|
|
func newStreamHandler(s *Stream) *streamHandler {
|
|
return &streamHandler{
|
|
stream: s,
|
|
}
|
|
}
|
|
|
|
func (h *streamHandler) Handle(chunkStreamID int, timestamp uint32, msg message.Message) error {
|
|
//l := h.Logger()
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Println(h, h.stream, h.handler)
|
|
log.Println(string(debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
switch msg := msg.(type) {
|
|
case *message.DataMessage:
|
|
return h.handleData(chunkStreamID, timestamp, msg)
|
|
|
|
case *message.CommandMessage:
|
|
return h.handleCommand(chunkStreamID, timestamp, msg)
|
|
|
|
case *message.SetChunkSize:
|
|
//l.Infof("Handle SetChunkSize: Msg = %#v", msg)
|
|
return h.stream.streamer().PeerState().SetChunkSize(msg.ChunkSize)
|
|
|
|
case *message.WinAckSize:
|
|
//l.Infof("Handle WinAckSize: Msg = %#v", msg)
|
|
return h.handleWinAckSize(chunkStreamID, timestamp, msg)
|
|
|
|
default:
|
|
err := h.handler.onMessage(chunkStreamID, timestamp, msg)
|
|
if err == internal.ErrPassThroughMsg {
|
|
return h.stream.userHandler().OnUnknownMessage(timestamp, msg)
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
|
|
func (h *streamHandler) ChangeState(state streamState) {
|
|
h.m.Lock()
|
|
defer h.m.Unlock()
|
|
|
|
//prevState := h.State()
|
|
|
|
switch state {
|
|
case streamStateUnknown:
|
|
return // DO NOTHING
|
|
case streamStateServerNotConnected:
|
|
h.handler = &serverControlNotConnectedHandler{sh: h}
|
|
case streamStateServerConnected:
|
|
h.handler = &serverControlConnectedHandler{sh: h}
|
|
case streamStateServerInactive:
|
|
h.handler = &serverDataInactiveHandler{sh: h}
|
|
case streamStateServerPublish:
|
|
h.handler = &serverDataPublishHandler{sh: h}
|
|
case streamStateServerPlay:
|
|
h.handler = &serverDataPlayHandler{sh: h}
|
|
case streamStateClientNotConnected:
|
|
h.handler = &clientControlNotConnectedHandler{sh: h}
|
|
case streamStateClientConnected:
|
|
h.handler = &clientControlConnectedHandler{sh: h}
|
|
case streamStateClientPlay:
|
|
h.handler = &clientDataPlayHandler{sh: h}
|
|
default:
|
|
panic("Unexpected")
|
|
}
|
|
h.state = state
|
|
|
|
//l := h.Logger()
|
|
//l.Infof("Change state: From = %s, To = %s", prevState, h.State())
|
|
}
|
|
|
|
func (h *streamHandler) State() streamState {
|
|
return h.state
|
|
}
|
|
|
|
func (h *streamHandler) Logger() *logrus.Entry {
|
|
if h.loggerEntry == nil {
|
|
h.loggerEntry = h.stream.logger().WithFields(logrus.Fields{
|
|
"stream_id": h.stream.streamID,
|
|
})
|
|
}
|
|
|
|
h.loggerEntry.Data["state"] = h.State()
|
|
|
|
return h.loggerEntry
|
|
}
|
|
|
|
func (h *streamHandler) handleData(
|
|
chunkStreamID int,
|
|
timestamp uint32,
|
|
dataMsg *message.DataMessage,
|
|
) error {
|
|
bodyDecoder := message.DataBodyDecoderFor(dataMsg.Name)
|
|
|
|
amfDec := message.NewAMFDecoder(dataMsg.Body, dataMsg.Encoding)
|
|
var value message.AMFConvertible
|
|
if err := bodyDecoder(dataMsg.Body, amfDec, &value); err != nil {
|
|
return err
|
|
}
|
|
|
|
err := h.handler.onData(chunkStreamID, timestamp, dataMsg, value)
|
|
if err == internal.ErrPassThroughMsg {
|
|
return h.stream.userHandler().OnUnknownDataMessage(chunkStreamID, timestamp, dataMsg, value)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (h *streamHandler) handleCommand(
|
|
chunkStreamID int,
|
|
timestamp uint32,
|
|
cmdMsg *message.CommandMessage,
|
|
) error {
|
|
switch cmdMsg.CommandName {
|
|
case "_result", "_error":
|
|
t, err := h.stream.transactions.At(cmdMsg.TransactionID)
|
|
if err == nil {
|
|
// Set result (NOTE: should use a mutex for it?)
|
|
t.Reply(cmdMsg.CommandName, cmdMsg.Encoding, cmdMsg.Body)
|
|
|
|
// Remove transacaction because this transaction is resolved
|
|
if err := h.stream.transactions.Delete(cmdMsg.TransactionID); err != nil {
|
|
return errors.Wrap(err, "Unexpected behavior: transaction is not found")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// TODO: Support onStatus
|
|
}
|
|
|
|
amfDec := message.NewAMFDecoder(cmdMsg.Body, cmdMsg.Encoding)
|
|
bodyDecoder := message.CmdBodyDecoderFor(cmdMsg.CommandName, cmdMsg.TransactionID)
|
|
|
|
var value message.AMFConvertible
|
|
if err := bodyDecoder(cmdMsg.Body, amfDec, &value); err != nil {
|
|
return err
|
|
}
|
|
|
|
err := h.handler.onCommand(chunkStreamID, timestamp, cmdMsg, value)
|
|
if err == internal.ErrPassThroughMsg {
|
|
return h.stream.userHandler().OnUnknownCommandMessage(chunkStreamID, timestamp, cmdMsg, value)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (h *streamHandler) handleWinAckSize(
|
|
chunkStreamID int,
|
|
timestamp uint32,
|
|
ackMsg *message.WinAckSize,
|
|
) error {
|
|
err := h.stream.streamer().PeerState().SetAckWindowSize(ackMsg.Size)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return h.handler.onWinAckSize(chunkStreamID, timestamp, ackMsg)
|
|
}
|