Created
June 1, 2021 04:52
-
-
Save mmitou/fbc95752316b3a75510d7b4e8fb9702e to your computer and use it in GitHub Desktop.
websocketを使うwebrtc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>signaling</title> | |
</head> | |
<body> | |
<div class="video"> | |
<video id="localVideo" width="320" height="240" style="border: 1px solid black;" autoplay></video> | |
<video id="remoteVideo" width="320" height="240" style="border: 1px solid black;" autoplay></video> | |
</div> | |
<div class="sdp"> | |
<button id="sdpStartButton">open</button> | |
</div> | |
<script src="main.js"></script> | |
</body> | |
</html> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"strings" | |
"time" | |
"github.com/go-redis/redis/v8" | |
"github.com/gorilla/websocket" | |
"github.com/labstack/echo/v4" | |
"github.com/rs/zerolog" | |
"github.com/rs/zerolog/log" | |
"github.com/rs/zerolog/pkgerrors" | |
) | |
var upgrader = websocket.Upgrader{ | |
ReadBufferSize: 1024, | |
WriteBufferSize: 1024, | |
} | |
type wsclient struct { | |
roomID string | |
id string | |
conn *websocket.Conn | |
} | |
func (c wsclient) sender(snd <-chan wsMessage, unregister chan<- wsclient) { | |
defer func() { | |
c.conn.Close() | |
unregister<- c | |
}() | |
for msg := range snd { | |
if err := c.conn.WriteMessage(msg.messageType, msg.payload); err != nil { | |
log.Debug().Err(err).Msg("c.conn.WriteMessage") | |
return | |
} | |
} | |
} | |
func (c wsclient) receiver(ctx context.Context, msg chan<- message, unregister chan<- wsclient) { | |
defer func() { | |
c.conn.Close() | |
unregister<- c | |
}() | |
for { | |
mt, p, err := c.conn.ReadMessage() | |
if err != nil { | |
log.Debug().Err(err).Msg("c.conn.ReadMessage") | |
return | |
} | |
msg <- message{roomID: c.roomID, clientID: c.id, wsMessage: wsMessage{messageType: mt, payload: p}} | |
} | |
} | |
type wsMessage struct { | |
messageType int | |
payload []byte | |
} | |
type message struct { | |
roomID string | |
clientID string | |
wsMessage | |
err error | |
} | |
type registrar chan<- wsclient | |
var i = 0 | |
func (r registrar) registerWebsocket(c echo.Context) error { | |
i++ | |
w := c.Response() | |
req := c.Request() | |
roomID := c.Param("id") | |
conn, err := upgrader.Upgrade(w, req, nil) | |
if err != nil { | |
return err | |
} | |
r <- wsclient{roomID: roomID, id: fmt.Sprintf("hello%d", i), conn: conn} | |
return nil | |
} | |
func runPubSub(ctx context.Context) registrar { | |
register := make(chan wsclient) | |
go func() { | |
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379", Password: "", DB: 0}) | |
sub := rdb.Subscribe(ctx, "sdp") | |
rooms := make(map[string]map[string]chan<- wsMessage) | |
unregister := make(chan wsclient) | |
msg := make(chan message) | |
for { | |
select { | |
case m := <-msg: | |
payload := append([]byte(m.roomID + "\n" + m.clientID + "\n"), m.payload ...) | |
if err := rdb.Publish(ctx, "sdp", payload).Err(); err != nil { | |
log.Debug().Err(err).Msg("rdb.Publish") | |
return | |
} | |
case client := <-unregister: | |
if _, ok := rooms[client.roomID]; !ok { | |
continue | |
} | |
if _, ok := rooms[client.roomID][client.id]; !ok { | |
continue | |
} | |
close(rooms[client.roomID][client.id]) | |
delete(rooms[client.roomID], client.id) | |
if len(rooms[client.roomID]) == 0 { | |
delete(rooms, client.roomID) | |
} | |
if err := rdb.Decr(ctx, client.roomID).Err(); err != nil { | |
log.Error().Err(err).Msg("rdb.Decr") | |
return | |
} | |
case client := <-register: | |
if err := rdb.Incr(ctx, client.roomID).Err(); err != nil { | |
log.Error().Err(err).Msg("rdb.Incr") | |
return | |
} | |
if _, ok := rooms[client.roomID]; !ok { | |
rooms[client.roomID] = make(map[string]chan<- wsMessage) | |
} | |
snd := make(chan wsMessage) | |
rooms[client.roomID][client.id] = snd | |
go client.sender(snd, unregister) | |
go client.receiver(ctx, msg, unregister) | |
case rmsg := <-sub.Channel(): | |
ss := strings.Split(rmsg.Payload, "\n") | |
if len(ss) != 4 { | |
log.Error().Str("payload", rmsg.Payload).Msg("parseHeader") | |
return | |
} | |
roomID, from, to := ss[0], ss[1], ss[2] | |
if to == "*" { | |
for id, snd := range rooms[roomID] { | |
if id != from { | |
snd <- wsMessage{messageType: websocket.TextMessage, payload: []byte(rmsg.Payload)} | |
} | |
} | |
continue | |
} | |
if snd, ok := rooms[roomID][to]; ok { | |
snd <- wsMessage{messageType: websocket.TextMessage, payload: []byte(rmsg.Payload)} | |
} | |
} | |
} | |
} () | |
return registrar(register) | |
} | |
func main() { | |
zerolog.TimeFieldFormat = time.RFC3339Nano | |
zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack | |
zerolog.SetGlobalLevel(zerolog.DebugLevel) | |
e := echo.New() | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
r := runPubSub(ctx) | |
e.HTTPErrorHandler = func(err error, c echo.Context) { | |
log.Debug().Err(err).Msg(fmt.Sprintf("%+v", err)) | |
e.DefaultHTTPErrorHandler(err, c) | |
} | |
e.GET("/rooms/:id/ws", r.registerWebsocket) | |
e.Static("/", "./web") | |
e.Logger.Fatal(e.Start(":8080")) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
document | |
.getElementById("sdpStartButton") | |
.addEventListener("click", async (event) => { | |
event.target.disabled = true; | |
try { | |
const ws = new WebSocket("ws://localhost:8080/rooms/hello/ws"); | |
ws.addEventListener("open", () => console.log("open")); | |
ws.addEventListener("close", () => console.log("close")); | |
ws.addEventListener("error", (event) => { | |
console.log("wserror:", event); | |
}); | |
const stream = await navigator.mediaDevices.getUserMedia({ | |
audio: false, | |
video: true, | |
}); | |
const localVideo = document.getElementById("localVideo"); | |
localVideo.srcObject = stream; | |
const conn = new RTCPeerConnection({ | |
iceServers: [ | |
{ | |
urls: ["stun:stun.l.google.com:19302"], | |
}, | |
], | |
}); | |
conn.addEventListener("connectionstatechange", console.log); | |
const candidates = []; | |
conn.addEventListener("icecandidate", (event) => { | |
if (event.candidate) { | |
console.log("icecandidate", event.candidate); | |
candidates.push(event.candidate); | |
} | |
}); | |
conn.addEventListener("icecandidateerror", ({ errorCode, errorText }) => { | |
console.log("icecandidateerror", { errorCode, errorText }); | |
}); | |
conn.addEventListener("iceconnectionstatechange", (event) => { | |
console.log("iceconnectionstatechange", { | |
iceConnectionState: event.currentTarget.iceConnectionState, | |
iceGatheringState: event.currentTarget.iceGatheringState, | |
}); | |
}); | |
conn.addEventListener("icegatheringstatechange", (event) => { | |
console.log("icegatheringstatechange", { | |
iceConnectionState: event.currentTarget.iceConnectionState, | |
iceGatheringState: event.currentTarget.iceGatheringState, | |
}); | |
}); | |
conn.addEventListener("negotiationneeded", console.log); | |
conn.addEventListener("statsended", console.log); | |
conn.addEventListener("track", (event) => { | |
console.log("ontrack", "remote video start"); | |
const stream = event.streams[0]; | |
document.getElementById("remoteVideo").srcObject = stream; | |
}); | |
stream.getTracks().forEach((track) => { | |
conn.addTrack(track, stream); | |
}); | |
const offer = await conn.createOffer(); | |
await conn.setLocalDescription(offer); | |
console.log("offer", offer); | |
ws.addEventListener("message", async (event) => { | |
console.log("wsmessage", event); | |
if (typeof event.data !== "string") { | |
console.log("invalid data type"); | |
return; | |
} | |
const [roomId, from, to, payload] = event.data.split("\n"); | |
console.log("onmessage", roomId, from, to); | |
const obj = JSON.parse(payload); | |
console.log(obj); | |
if (obj.type === "offer") { | |
await conn.setRemoteDescription(obj); | |
const answer = await conn.createAnswer(); | |
await conn.setLocalDescription(answer); | |
console.log("recieved offer, send answer", answer); | |
ws.send(from + "\n" + JSON.stringify(answer)); | |
} else if (obj.type === "answer") { | |
await conn.setRemoteDescription(obj); | |
console.log("recieved answer and send icecandidates", candidates); | |
ws.send( | |
from + "\n" + JSON.stringify({ type: "icecandidate", candidates }) | |
); | |
} else if (obj.type === "icecandidate") { | |
console.log("recieved icecandidates", candidates); | |
for (const candidate of obj.candidates) { | |
await conn.addIceCandidate(candidate); | |
} | |
} | |
}); | |
ws.send("*\n" + JSON.stringify(offer)); | |
} catch (err) { | |
console.log(err); | |
} | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment