Last active
September 11, 2020 15:13
-
-
Save jvanveen/26026e78010889a26bc85c01e61b0294 to your computer and use it in GitHub Desktop.
ion-sfu grpc/json-rpc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package main contains an entrypoint for running an ion-sfu instance.
package main | |
import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"net"
	http "net/http"
	"os"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	pb "github.com/pion/ion-sfu/cmd/server/grpc/proto"
	sfu "github.com/pion/ion-sfu/pkg"
	"github.com/pion/ion-sfu/pkg/log"
	"github.com/pion/webrtc/v3"
	"github.com/sourcegraph/jsonrpc2"
	websocketjsonrpc2 "github.com/sourcegraph/jsonrpc2/websocket"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// contextKey is an unexported key type for values stored in a
// context.Context. Keys are used by pointer, so name only serves as a label.
type contextKey struct{ name string }

// peerCtxKey is the context key under which a connection's *peerContext is
// stored.
var peerCtxKey = &contextKey{name: "peer"}
// grpcConfig holds the gRPC-related settings. Port is decoded from the
// "port" key.
type grpcConfig struct {
	Port string `mapstructure:"port"`
}
// Config defines parameters for configuring the sfu instance | |
type Config struct { | |
sfu.Config `mapstructure:",squash"` | |
GRPC grpcConfig `mapstructure:"grpc"` | |
} | |
type peerContext struct { | |
peer *sfu.WebRTCTransport | |
} | |
func forContext(ctx context.Context) *peerContext { | |
raw, _ := ctx.Value(peerCtxKey).(*peerContext) | |
return raw | |
} | |
var ( | |
conf = Config{} | |
file string | |
gRPCAddr string | |
jsonRPCAddr string | |
errNoPeer = errors.New("no peer exists") | |
) | |
type server struct { | |
pb.UnimplementedSFUServer | |
sfu *sfu.SFU | |
} | |
// Join message sent when initializing a peer connection | |
type Join struct { | |
Sid string `json:"sid"` | |
Offer webrtc.SessionDescription `json:"offer"` | |
} | |
// Negotiation message sent when renegotiating | |
type Negotiation struct { | |
Desc webrtc.SessionDescription `json:"desc"` | |
} | |
// Trickle message sent when renegotiating | |
type Trickle struct { | |
Candidate webrtc.ICECandidateInit `json:"candidate"` | |
} | |
func NewRPC() *server { | |
return &server{ | |
sfu: sfu.NewSFU(conf.Config), | |
} | |
} | |
// portRangeLimit is the minimum accepted width (max - min) of the configured
// ICE port range.
const portRangeLimit = 100
// showHelp prints the command-line usage to standard output.
//
// The option letters listed here must match the flags registered in parse;
// the gRPC listen address is "-g".
func showHelp() {
	fmt.Printf("Usage:%s {params}\n", os.Args[0])
	fmt.Println(" -c {config file}")
	fmt.Println(" -g {grpc listen addr}")
	fmt.Println(" -j {json-rpc listen addr}")
	fmt.Println(" -h (show help info)")
}
func load() bool { | |
_, err := os.Stat(file) | |
if err != nil { | |
return false | |
} | |
viper.SetConfigFile(file) | |
viper.SetConfigType("toml") | |
err = viper.ReadInConfig() | |
if err != nil { | |
fmt.Printf("config file %s read failed. %v\n", file, err) | |
return false | |
} | |
err = viper.GetViper().Unmarshal(&conf) | |
fmt.Printf("log level: %s\n", conf.Log.Level) | |
if err != nil { | |
fmt.Printf("sfu config file %s loaded failed. %v\n", file, err) | |
return false | |
} | |
if len(conf.WebRTC.ICEPortRange) > 2 { | |
fmt.Printf("config file %s loaded failed. range port must be [min,max]\n", file) | |
return false | |
} | |
if len(conf.WebRTC.ICEPortRange) != 0 && conf.WebRTC.ICEPortRange[1]-conf.WebRTC.ICEPortRange[0] < portRangeLimit { | |
fmt.Printf("config file %s loaded failed. range port must be [min, max] and max - min >= %d\n", file, portRangeLimit) | |
return false | |
} | |
fmt.Printf("config file: %s\n", file) | |
return true | |
} | |
func parse() bool { | |
flag.StringVar(&file, "c", "config.toml", "config file") | |
flag.StringVar(&gRPCAddr, "g", ":50051", "grpc address to use") | |
flag.StringVar(&jsonRPCAddr, "j", ":7000", "json-rpc address to use") | |
help := flag.Bool("h", false, "help info") | |
flag.Parse() | |
if !load() { | |
return false | |
} | |
if *help { | |
showHelp() | |
return false | |
} | |
return true | |
} | |
// JSON-RPC SFU Handler | |
func (r *server) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) { | |
p := forContext(ctx) | |
switch req.Method { | |
case "join": | |
if p.peer != nil { | |
log.Errorf("connect: peer already exists for connection") | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", errors.New("peer already exists")), | |
}) | |
break | |
} | |
var join Join | |
err := json.Unmarshal(*req.Params, &join) | |
if err != nil { | |
log.Errorf("connect: error parsing offer: %v", err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
peer, err := r.sfu.NewWebRTCTransport(join.Sid, join.Offer) | |
if err != nil { | |
log.Errorf("connect: error creating peer: %v", err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
log.Infof("peer %s join session %s", peer.ID(), join.Sid) | |
err = peer.SetRemoteDescription(join.Offer) | |
if err != nil { | |
log.Errorf("Offer error: %v", err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
answer, err := peer.CreateAnswer() | |
if err != nil { | |
log.Errorf("Offer error: answer=%v err=%v", answer, err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
err = peer.SetLocalDescription(answer) | |
if err != nil { | |
log.Errorf("Offer error: answer=%v err=%v", answer, err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
// Notify user of trickle candidates | |
peer.OnICECandidate(func(c *webrtc.ICECandidate) { | |
log.Debugf("Sending ICE candidate") | |
if c == nil { | |
// Gathering done | |
return | |
} | |
if err := conn.Notify(ctx, "trickle", c.ToJSON()); err != nil { | |
log.Errorf("error sending trickle %s", err) | |
} | |
}) | |
peer.OnNegotiationNeeded(func() { | |
log.Debugf("on negotiation needed called") | |
offer, err := p.peer.CreateOffer() | |
if err != nil { | |
log.Errorf("CreateOffer error: %v", err) | |
return | |
} | |
err = p.peer.SetLocalDescription(offer) | |
if err != nil { | |
log.Errorf("SetLocalDescription error: %v", err) | |
return | |
} | |
if err := conn.Notify(ctx, "offer", offer); err != nil { | |
log.Errorf("error sending offer %s", err) | |
} | |
}) | |
p.peer = peer | |
_ = conn.Reply(ctx, req.ID, answer) | |
// Hack until renegotation is supported in pion. Force renegotation incase there are unmatched | |
// receviers (i.e. sfu has more than one sender). We just naively create server offer. It is | |
// noop if things are already matched. We can remove once https://github.com/pion/webrtc/pull/1322 | |
// is merged | |
time.Sleep(1000 * time.Millisecond) | |
log.Debugf("on negotiation needed called") | |
offer, err := p.peer.CreateOffer() | |
if err != nil { | |
log.Errorf("CreateOffer error: %v", err) | |
return | |
} | |
err = p.peer.SetLocalDescription(offer) | |
if err != nil { | |
log.Errorf("SetLocalDescription error: %v", err) | |
return | |
} | |
if err := conn.Notify(ctx, "offer", offer); err != nil { | |
log.Errorf("error sending offer %s", err) | |
} | |
case "offer": | |
if p.peer == nil { | |
log.Errorf("connect: no peer exists for connection") | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", errors.New("no peer exists")), | |
}) | |
break | |
} | |
log.Infof("peer %s offer", p.peer.ID()) | |
var negotiation Negotiation | |
err := json.Unmarshal(*req.Params, &negotiation) | |
if err != nil { | |
log.Errorf("connect: error parsing offer: %v", err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
// Peer exists, renegotiating existing peer | |
err = p.peer.SetRemoteDescription(negotiation.Desc) | |
if err != nil { | |
log.Errorf("Offer error: %v", err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
answer, err := p.peer.CreateAnswer() | |
if err != nil { | |
log.Errorf("Offer error: answer=%v err=%v", answer, err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
err = p.peer.SetLocalDescription(answer) | |
if err != nil { | |
log.Errorf("Offer error: answer=%v err=%v", answer, err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
_ = conn.Reply(ctx, req.ID, answer) | |
case "answer": | |
if p.peer == nil { | |
log.Errorf("connect: no peer exists for connection") | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", errors.New("no peer exists")), | |
}) | |
break | |
} | |
log.Infof("peer %s answer", p.peer.ID()) | |
var negotiation Negotiation | |
err := json.Unmarshal(*req.Params, &negotiation) | |
if err != nil { | |
log.Errorf("connect: error parsing answer: %v", err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
err = p.peer.SetRemoteDescription(negotiation.Desc) | |
if err != nil { | |
log.Errorf("error setting remote description %s", err) | |
} | |
case "trickle": | |
log.Debugf("trickle") | |
if p.peer == nil { | |
log.Errorf("connect: no peer exists for connection") | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", errors.New("no peer exists")), | |
}) | |
break | |
} | |
log.Infof("peer %s trickle", p.peer.ID()) | |
var trickle Trickle | |
err := json.Unmarshal(*req.Params, &trickle) | |
if err != nil { | |
log.Errorf("connect: error parsing candidate: %v", err) | |
_ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{ | |
Code: 500, | |
Message: fmt.Sprintf("%s", err), | |
}) | |
break | |
} | |
err = p.peer.AddICECandidate(trickle.Candidate) | |
if err != nil { | |
log.Errorf("error setting ice candidate %s", err) | |
} | |
} | |
} | |
// Publish a stream to the sfu. Publish creates a bidirectional | |
// streaming rpc connection between the client and sfu. | |
// | |
// The sfu will respond with a message containing the stream pid | |
// and one of two different payload types: | |
// 1. `Connect` containing the session answer description. This | |
// message is *always* returned first. | |
// 2. `Trickle` containg candidate information for Trickle ICE. | |
// | |
// If the webrtc connection is closed, the server will close this stream. | |
// | |
// The client should send a message containg the session id | |
// and one of two different payload types: | |
// 1. `Connect` containing the session offer description. This | |
// message must *always* be sent first. | |
// 2. `Trickle` containing candidate information for Trickle ICE. | |
// | |
// If the client closes this stream, the webrtc stream will be closed. | |
// GRPC SFU Handler | |
func (s *server) Signal(stream pb.SFU_SignalServer) error { | |
var pid string | |
var peer *sfu.WebRTCTransport | |
for { | |
in, err := stream.Recv() | |
if err != nil { | |
if peer != nil { | |
peer.Close() | |
} | |
if err == io.EOF { | |
return nil | |
} | |
errStatus, _ := status.FromError(err) | |
if errStatus.Code() == codes.Canceled { | |
return nil | |
} | |
log.Errorf("signal error %v %v", errStatus.Message(), errStatus.Code()) | |
return err | |
} | |
switch payload := in.Payload.(type) { | |
case *pb.SignalRequest_Join: | |
var answer webrtc.SessionDescription | |
log.Infof("signal->join called:\n%v", string(payload.Join.Offer.Sdp)) | |
if peer != nil { | |
// already joined | |
log.Errorf("peer already exists") | |
return status.Errorf(codes.FailedPrecondition, "peer already exists") | |
} | |
offer := webrtc.SessionDescription{ | |
Type: webrtc.SDPTypeOffer, | |
SDP: string(payload.Join.Offer.Sdp), | |
} | |
peer, err = s.sfu.NewWebRTCTransport(payload.Join.Sid, offer) | |
if err != nil { | |
log.Errorf("join error: %v", err) | |
return status.Errorf(codes.InvalidArgument, "join error %s", err) | |
} | |
log.Infof("peer %s join session %s", peer.ID(), payload.Join.Sid) | |
err = peer.SetRemoteDescription(offer) | |
if err != nil { | |
log.Errorf("join error: %v", err) | |
return status.Errorf(codes.Internal, "join error %s", err) | |
} | |
answer, err := peer.CreateAnswer() | |
if err != nil { | |
log.Errorf("join error: %v", err) | |
return status.Errorf(codes.Internal, "join error %s", err) | |
} | |
err = peer.SetLocalDescription(answer) | |
if err != nil { | |
log.Errorf("join error: %v", err) | |
return status.Errorf(codes.Internal, "join error %s", err) | |
} | |
// Notify user of trickle candidates | |
peer.OnICECandidate(func(c *webrtc.ICECandidate) { | |
if c == nil { | |
// Gathering done | |
return | |
} | |
bytes, err := json.Marshal(c.ToJSON()) | |
if err != nil { | |
log.Errorf("OnIceCandidate error %s", err) | |
} | |
err = stream.Send(&pb.SignalReply{ | |
Payload: &pb.SignalReply_Trickle{ | |
Trickle: &pb.Trickle{ | |
Init: string(bytes), | |
}, | |
}, | |
}) | |
if err != nil { | |
log.Errorf("OnIceCandidate error %s", err) | |
} | |
}) | |
peer.OnNegotiationNeeded(func() { | |
log.Debugf("on negotiation needed called") | |
offer, err := peer.CreateOffer() | |
if err != nil { | |
log.Errorf("CreateOffer error: %v", err) | |
return | |
} | |
err = peer.SetLocalDescription(offer) | |
if err != nil { | |
log.Errorf("SetLocalDescription error: %v", err) | |
return | |
} | |
err = stream.Send(&pb.SignalReply{ | |
Payload: &pb.SignalReply_Negotiate{ | |
Negotiate: &pb.SessionDescription{ | |
Type: offer.Type.String(), | |
Sdp: []byte(offer.SDP), | |
}, | |
}, | |
}) | |
if err != nil { | |
log.Errorf("negotiation error %s", err) | |
} | |
}) | |
err = stream.Send(&pb.SignalReply{ | |
Payload: &pb.SignalReply_Join{ | |
Join: &pb.JoinReply{ | |
Pid: pid, | |
Answer: &pb.SessionDescription{ | |
Type: answer.Type.String(), | |
Sdp: []byte(answer.SDP), | |
}, | |
}, | |
}, | |
}) | |
if err != nil { | |
log.Errorf("error sending join response %s", err) | |
return status.Errorf(codes.Internal, "join error %s", err) | |
} | |
// Hack until renegotation is supported in pion. Force renegotation incase there are unmatched | |
// receviers (i.e. sfu has more than one sender). We just naively create server offer. It is | |
// noop if things are already matched. We can remove once https://github.com/pion/webrtc/pull/1322 | |
// is merged | |
time.Sleep(1000 * time.Millisecond) | |
offer, err = peer.CreateOffer() | |
if err != nil { | |
log.Errorf("CreateOffer error: %v", err) | |
return status.Errorf(codes.Internal, "join error %s", err) | |
} | |
err = peer.SetLocalDescription(offer) | |
if err != nil { | |
log.Errorf("SetLocalDescription error: %v", err) | |
return status.Errorf(codes.Internal, "join error %s", err) | |
} | |
err = stream.Send(&pb.SignalReply{ | |
Payload: &pb.SignalReply_Negotiate{ | |
Negotiate: &pb.SessionDescription{ | |
Type: offer.Type.String(), | |
Sdp: []byte(offer.SDP), | |
}, | |
}, | |
}) | |
if err != nil { | |
return status.Errorf(codes.Internal, "%s", err) | |
} | |
case *pb.SignalRequest_Negotiate: | |
if peer == nil { | |
return status.Errorf(codes.FailedPrecondition, "%s", errNoPeer) | |
} | |
if payload.Negotiate.Type == webrtc.SDPTypeOffer.String() { | |
offer := webrtc.SessionDescription{ | |
Type: webrtc.SDPTypeOffer, | |
SDP: string(payload.Negotiate.Sdp), | |
} | |
// Peer exists, renegotiating existing peer | |
err = peer.SetRemoteDescription(offer) | |
if err != nil { | |
return status.Errorf(codes.Internal, "%s", err) | |
} | |
answer, err := peer.CreateAnswer() | |
if err != nil { | |
return status.Errorf(codes.Internal, "%s", err) | |
} | |
err = peer.SetLocalDescription(answer) | |
if err != nil { | |
return status.Errorf(codes.Internal, "%s", err) | |
} | |
err = stream.Send(&pb.SignalReply{ | |
Payload: &pb.SignalReply_Negotiate{ | |
Negotiate: &pb.SessionDescription{ | |
Type: answer.Type.String(), | |
Sdp: []byte(answer.SDP), | |
}, | |
}, | |
}) | |
if err != nil { | |
return status.Errorf(codes.Internal, "%s", err) | |
} | |
} else if payload.Negotiate.Type == webrtc.SDPTypeAnswer.String() { | |
err = peer.SetRemoteDescription(webrtc.SessionDescription{ | |
Type: webrtc.SDPTypeAnswer, | |
SDP: string(payload.Negotiate.Sdp), | |
}) | |
if err != nil { | |
return status.Errorf(codes.Internal, "%s", err) | |
} | |
} | |
case *pb.SignalRequest_Trickle: | |
if peer == nil { | |
return status.Errorf(codes.FailedPrecondition, "%s", errNoPeer) | |
} | |
var candidate webrtc.ICECandidateInit | |
err := json.Unmarshal([]byte(payload.Trickle.Init), &candidate) | |
if err != nil { | |
log.Errorf("error parsing ice candidate: %v", err) | |
} | |
if err := peer.AddICECandidate(candidate); err != nil { | |
return status.Errorf(codes.Internal, "error adding ice candidate") | |
} | |
} | |
} | |
} | |
func main() { | |
if !parse() { | |
showHelp() | |
os.Exit(-1) | |
} | |
log.Infof("--- Starting SFU Node ---") | |
rpc := NewRPC() | |
upgrader := websocket.Upgrader{ | |
CheckOrigin: func(r *http.Request) bool { | |
return true | |
}, | |
ReadBufferSize: 1024, | |
WriteBufferSize: 1024, | |
} | |
http.Handle("/ws", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
c, err := upgrader.Upgrade(w, r, nil) | |
if err != nil { | |
panic(err) | |
} | |
defer c.Close() | |
p := &peerContext{} | |
ctx := context.WithValue(r.Context(), peerCtxKey, p) | |
jc := jsonrpc2.NewConn(ctx, websocketjsonrpc2.NewObjectStream(c), rpc) | |
<-jc.DisconnectNotify() | |
if p.peer != nil { | |
log.Infof("Closing peer") | |
p.peer.Close() | |
} | |
})) | |
var err error | |
log.Infof("GPRC listening at http://[%s]", gRPCAddr) | |
log.Infof("JSON-RPC listening at http://[%s]", jsonRPCAddr) | |
if err != nil { | |
panic(err) | |
} | |
// GRPC Listener | |
go func() { | |
lis, err := net.Listen("tcp", gRPCAddr) | |
s := grpc.NewServer() | |
if err != nil { | |
log.Panicf("%s", err) | |
} | |
pb.RegisterSFUServer(s, rpc) | |
if err := s.Serve(lis); err != nil { | |
log.Panicf("failed to serve: %v", err) | |
} | |
}() | |
go func() { | |
// JSON-RPC Listener | |
err = http.ListenAndServe(jsonRPCAddr, nil) | |
}() | |
select {} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment