Files
2020-01-31 04:06:23 -05:00

299 lines
6.6 KiB
Go

//
// Copyright (c) 2018- yutopp (yutopp@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
//
package rtmp
import (
"bytes"
"context"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"time"
"github.com/encounter/go-rtmp/message"
)
// Stream represents a logical message stream bound to a Conn.
type Stream struct {
	// Identity and encoding of the stream.
	streamID uint32               // message stream ID; also copied into cmsg.StreamID by newStream
	encTy    message.EncodingType // AMF encoding used when building command messages

	// Request/response bookkeeping and inbound message dispatch.
	transactions *transactions  // transactions created per transaction ID by Connect/CreateStream
	handler      *streamHandler // receives inbound messages via handle and tracks stream state

	// Outbound plumbing.
	cmsg ChunkMessage // reusable envelope; Write overwrites its Message on every call
	conn *Conn        // owning connection; source of the streamer, streams, handler and logger
}
// newStream builds a Stream with the given message stream ID on top of conn.
//
// The stream starts with AMF0 encoding, a fresh transaction table, a chunk
// message envelope carrying streamID, and a stream handler created for it.
func newStream(streamID uint32, conn *Conn) *Stream {
	s := new(Stream)

	s.streamID = streamID
	s.encTy = message.EncodingTypeAMF0 // Default AMF encoding type
	s.transactions = newTransactions()
	s.cmsg.StreamID = streamID
	s.conn = conn

	// The handler needs the populated stream, so it is attached last.
	s.handler = newStreamHandler(s)

	return s
}
// WriteWinAckSize writes a WinAckSize message on the given chunk stream with
// the given timestamp. It is a typed convenience wrapper around Write.
func (s *Stream) WriteWinAckSize(chunkStreamID int, timestamp uint32, msg *message.WinAckSize) error {
	var out message.Message = msg
	return s.Write(chunkStreamID, timestamp, out)
}
// WriteSetPeerBandwidth writes a SetPeerBandwidth message on the given chunk
// stream with the given timestamp. It is a typed convenience wrapper around
// Write.
func (s *Stream) WriteSetPeerBandwidth(chunkStreamID int, timestamp uint32, msg *message.SetPeerBandwidth) error {
	var out message.Message = msg
	return s.Write(chunkStreamID, timestamp, out)
}
// WriteUserCtrl writes a UserCtrl message on the given chunk stream with the
// given timestamp. It is a typed convenience wrapper around Write.
func (s *Stream) WriteUserCtrl(chunkStreamID int, timestamp uint32, msg *message.UserCtrl) error {
	var out message.Message = msg
	return s.Write(chunkStreamID, timestamp, out)
}
// Connect sends a "connect" command on this stream and blocks until the
// matching transaction's done channel becomes readable.
//
// A nil body is replaced with an empty message.NetConnectionConnect. The
// command is sent with transaction ID 1, on chunk stream 3, with timestamp 0.
//
// If the response command name is "_error", a *ConnectRejectedError carrying
// the decoded result is returned. Otherwise the stream handler is switched to
// streamStateClientConnected and the decoded result is returned. Errors from
// registering the transaction, writing the command, or decoding the response
// body (including a body of an unexpected type) are returned as well.
//
// NOTE: no timeout is applied yet; the context used here is context.TODO, so
// the call waits for the response indefinitely.
func (s *Stream) Connect(
	body *message.NetConnectionConnect,
) (*message.NetConnectionConnectResult, error) {
	transactionID := int64(1) // Always 1 (7.2.1.1)
	t, err := s.transactions.Create(transactionID)
	if err != nil {
		return nil, err
	}

	if body == nil {
		body = &message.NetConnectionConnect{}
	}

	chunkStreamID := 3 // TODO: fix
	err = s.WriteCommandMessage(
		chunkStreamID, 0, // Timestamp is 0
		"connect",
		transactionID,
		body,
	)
	if err != nil {
		return nil, err
	}

	// TODO: support timeout
	timeoutCtx := context.TODO()
	select {
	case <-timeoutCtx.Done():
		return nil, timeoutCtx.Err()

	case <-t.doneCh:
		amfDec := message.NewAMFDecoder(t.body, t.encoding)

		var value message.AMFConvertible
		if err := message.DecodeBodyConnectResult(t.body, amfDec, &value); err != nil {
			return nil, errors.Wrap(err, "Failed to decode result")
		}

		// Use a checked assertion: a nil or differently typed value must be
		// reported as an error instead of panicking inside library code.
		result, ok := value.(*message.NetConnectionConnectResult)
		if !ok {
			return nil, errors.New("unexpected type of decoded connect result")
		}

		if t.commandName == "_error" {
			return nil, &ConnectRejectedError{
				TransactionID: transactionID,
				Result:        result,
			}
		}

		s.handler.ChangeState(streamStateClientConnected)

		return result, nil
	}
}
// ReplyConnect answers a "connect" command with the given result.
//
// The command name is derived from body.Information.Code: Success and Closed
// are sent as "_result", Failed is sent as "_error". The reply always uses
// transaction ID 1 (7.2.1.2, flow.6).
//
// An error is returned, and nothing is written, when body is nil or when its
// code maps to neither command name. Otherwise the error from
// WriteCommandMessage is returned.
func (s *Stream) ReplyConnect(
	chunkStreamID int,
	timestamp uint32,
	body *message.NetConnectionConnectResult,
) error {
	if body == nil {
		return errors.New("connect result must not be nil")
	}

	var commandName string
	switch body.Information.Code {
	case message.NetConnectionConnectCodeSuccess, message.NetConnectionConnectCodeClosed:
		commandName = "_result"
	case message.NetConnectionConnectCodeFailed:
		commandName = "_error"
	default:
		// Without this branch a command with an empty name would be written.
		return errors.New("unsupported connect result code")
	}

	return s.WriteCommandMessage(
		chunkStreamID, timestamp,
		commandName,
		1, // 7.2.1.2, flow.6
		body,
	)
}
// CreateStream sends a "createStream" command on this stream and blocks until
// the matching transaction's done channel becomes readable.
//
// A nil body is replaced with an empty message.NetConnectionConnect. The
// command is currently sent with transaction ID 2, on chunk stream 3, with
// timestamp 0 (all marked TODO).
//
// If the response command name is "_error", a *CreateStreamRejectedError
// carrying the decoded result is returned; otherwise the decoded result is
// returned. Errors from registering the transaction, writing the command, or
// decoding the response body (including a body of an unexpected type) are
// returned as well.
//
// NOTE: no timeout is applied yet; the context used here is context.TODO, so
// the call waits for the response indefinitely.
// NOTE(review): the body parameter uses the connect message type rather than a
// createStream-specific one — confirm this is intended.
func (s *Stream) CreateStream(
	body *message.NetConnectionConnect,
) (*message.NetConnectionCreateStreamResult, error) {
	transactionID := int64(2) // TODO: fix
	t, err := s.transactions.Create(transactionID)
	if err != nil {
		return nil, err
	}

	if body == nil {
		body = &message.NetConnectionConnect{}
	}

	chunkStreamID := 3 // TODO: fix
	err = s.WriteCommandMessage(
		chunkStreamID, 0, // TODO: fix, Timestamp is 0
		"createStream",
		transactionID,
		body,
	)
	if err != nil {
		return nil, err
	}

	// TODO: support timeout
	// TODO: check result
	timeoutCtx := context.TODO()
	select {
	case <-timeoutCtx.Done():
		return nil, timeoutCtx.Err()

	case <-t.doneCh:
		amfDec := message.NewAMFDecoder(t.body, t.encoding)

		var value message.AMFConvertible
		if err := message.DecodeBodyCreateStreamResult(t.body, amfDec, &value); err != nil {
			return nil, errors.Wrap(err, "Failed to decode result")
		}

		// Use a checked assertion: a nil or differently typed value must be
		// reported as an error instead of panicking inside library code.
		result, ok := value.(*message.NetConnectionCreateStreamResult)
		if !ok {
			return nil, errors.New("unexpected type of decoded createStream result")
		}

		if t.commandName == "_error" {
			return nil, &CreateStreamRejectedError{
				TransactionID: transactionID,
				Result:        result,
			}
		}

		return result, nil
	}
}
// ReplyCreateStream answers a "createStream" command for the given
// transaction ID.
//
// A non-nil body is sent as "_result". A nil body is treated as a failure: an
// "_error" command is sent carrying a result whose StreamID is 0.
func (s *Stream) ReplyCreateStream(
	chunkStreamID int,
	timestamp uint32,
	transactionID int64,
	body *message.NetConnectionCreateStreamResult,
) error {
	if body != nil {
		return s.WriteCommandMessage(chunkStreamID, timestamp, "_result", transactionID, body)
	}

	failure := &message.NetConnectionCreateStreamResult{
		StreamID: 0, // TODO: Change to error information object...
	}
	return s.WriteCommandMessage(chunkStreamID, timestamp, "_error", transactionID, failure)
}
// Publish sends a "publish" command on this stream.
//
// A nil body is replaced with an empty message.NetStreamPublish. The command
// is sent on chunk stream 3 with timestamp 0 and transaction ID 0.
func (s *Stream) Publish(
	body *message.NetStreamPublish,
) error {
	const chunkStreamID = 3 // TODO: fix

	payload := body
	if payload == nil {
		payload = new(message.NetStreamPublish)
	}

	return s.WriteCommandMessage(
		chunkStreamID, 0, // TODO: fix, Timestamp is 0
		"publish",
		int64(0), // Always 0, 7.2.2.6
		payload,
	)
}
// NotifyStatus sends an "onStatus" command carrying body on the given chunk
// stream with the given timestamp. The transaction ID is always 0.
func (s *Stream) NotifyStatus(
	chunkStreamID int,
	timestamp uint32,
	body *message.NetStreamOnStatus,
) error {
	const (
		commandName   = "onStatus"
		transactionID = 0 // 7.2.2
	)

	return s.WriteCommandMessage(chunkStreamID, timestamp, commandName, transactionID, body)
}
// Play switches the stream handler to streamStateClientPlay and then sends a
// "play" command carrying body with transaction ID 0.
//
// The state change happens before the command is written, so it is in effect
// even when the write returns an error.
func (s *Stream) Play(
	chunkStreamID int,
	timestamp uint32,
	body *message.NetStreamClientPlayMessage,
) error {
	const commandName = "play"

	s.handler.ChangeState(streamStateClientPlay)

	return s.WriteCommandMessage(chunkStreamID, timestamp, commandName, 0, body)
}
// Close is currently a placeholder: it performs no work and always returns nil.
func (s *Stream) Close() error {
	// TODO: implement
	var err error
	return err
}
// WriteCommandMessage encodes body with the stream's AMF encoding and writes
// it as a message.CommandMessage with the given command name and transaction
// ID.
//
// An encoding failure is returned as-is and nothing is written; otherwise the
// error from Write is returned.
func (s *Stream) WriteCommandMessage(
	chunkStreamID int,
	timestamp uint32,
	commandName string,
	transactionID int64,
	body message.AMFConvertible,
) error {
	var payload bytes.Buffer

	encoder := message.NewAMFEncoder(&payload, s.encTy)
	if err := message.EncodeBodyAnyValues(encoder, body); err != nil {
		return err
	}

	cmd := &message.CommandMessage{
		CommandName:   commandName,
		TransactionID: transactionID,
		Encoding:      s.encTy,
		Body:          &payload,
	}
	return s.Write(chunkStreamID, timestamp, cmd)
}
// Write sends msg on the given chunk stream with the given timestamp through
// the connection's chunk streamer, bounded by a 5 second timeout.
//
// The message is placed into the stream's shared chunk message envelope before
// it is handed to the streamer; this method performs no locking around that
// assignment.
func (s *Stream) Write(chunkStreamID int, timestamp uint32, msg message.Message) error {
	const writeTimeout = 5 * time.Second // TODO: Fix 5s

	ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
	defer cancel()

	s.cmsg.Message = msg
	envelope := &s.cmsg

	return s.streamer().Write(ctx, chunkStreamID, timestamp, envelope)
}
// handle forwards an incoming message to the stream handler and returns its
// result.
func (s *Stream) handle(chunkStreamID int, timestamp uint32, msg message.Message) error {
	h := s.handler
	return h.Handle(chunkStreamID, timestamp, msg)
}
// streams returns the streams value held by the owning connection.
func (s *Stream) streams() *streams {
	owner := s.conn
	return owner.streams
}
// streamer returns the chunk streamer held by the owning connection.
func (s *Stream) streamer() *ChunkStreamer {
	owner := s.conn
	return owner.streamer
}
// userHandler returns the Handler held by the owning connection.
func (s *Stream) userHandler() Handler {
	owner := s.conn
	return owner.handler
}
// logger returns the logger held by the owning connection.
func (s *Stream) logger() logrus.FieldLogger {
	owner := s.conn
	return owner.logger
}