Created
May 23, 2020 14:24
-
-
Save josharian/e1b2a6aaef1940001d17e50712801caf to your computer and use it in GitHub Desktop.
play opus audio to any browser that connects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demo of playing opus audio into a webrtc session on macOS.
// To install required packages/commands:
// brew install ffmpeg pkg-config opus opusfile
// Then run this server, and visit http://localhost:2021/ in a browser.
package main | |
import ( | |
"encoding/binary" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"log" | |
"math/rand" | |
"net/http" | |
"net/url" | |
"os" | |
"os/exec" | |
"sync" | |
"time" | |
"github.com/hraban/opus" | |
"github.com/pion/webrtc/v2" | |
"github.com/pion/webrtc/v2/pkg/media" | |
) | |
const port = "2021" | |
func main() { | |
rand.Seed(time.Now().UnixNano()) | |
http.HandleFunc("/", handleHome) | |
http.HandleFunc("/signal", handleSignal) | |
packets := generateOpusPackets() | |
go play(packets) | |
log.Printf("starting http server on :%s", port) | |
err := http.ListenAndServe(":"+port, nil) | |
check(err) | |
} | |
// ------------------- Basic webrtc audio SFU server --------------------
// handleHome serves the single-page browser client.
//
// The page captures the microphone, creates a WebRTC offer, posts it to
// /signal once ICE candidate gathering has finished, applies the returned
// answer, and plays whatever track the server sends back.
//
// The handler is registered on "/", which matches every path that has no more
// specific handler. Requests for anything other than "/" itself (for example
// /favicon.ico) are answered with 404 rather than with the page.
func handleHome(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	_, err := io.WriteString(w, `
<html>
<head>
<script src="https://code.jquery.com/jquery-3.5.1.min.js"></script>
<script>
let pc = new RTCPeerConnection({
iceServers: [
{
"urls": "stun:stun.l.google.com:19302"
}
]
})
var log = msg => {
console.log(msg)
}
navigator.mediaDevices.getUserMedia({ audio: true })
.then(stream => {
stream.getTracks().forEach(track => pc.addTrack(track, stream));
pc.createOffer().then(d => pc.setLocalDescription(d)).catch(log)
}).catch(log)
pc.oniceconnectionstatechange = e => log(pc.iceConnectionState)
pc.onicecandidate = event => {
if (event.candidate === null) {
let d = JSON.stringify(pc.localDescription);
$.post("/signal", { "data": d, "name": $('#name').attr('val') }, function (sd) {
try {
pc.setRemoteDescription(new RTCSessionDescription(sd))
} catch (e) {
log(e)
}
}, "json");
}
}
pc.ontrack = function (event) {
var el = document.createElement(event.track.kind)
el.srcObject = event.streams[0]
el.autoplay = true
el.controls = false
document.getElementById('audio').replaceWith(el)
}
</script>
</head>
<body>
<div id="audio"></div>
</body>
</html>
`)
	if err != nil {
		// The client most likely went away mid-response; nothing to recover.
		log.Printf("home: writing page: %v", err)
	}
}
var ( | |
tracksmu sync.Mutex | |
tracks []*webrtc.Track | |
) | |
func handleSignal(w http.ResponseWriter, r *http.Request) { | |
data := r.FormValue("data") | |
var offer webrtc.SessionDescription | |
err := json.Unmarshal([]byte(data), &offer) | |
check(err) | |
var mediaEngine webrtc.MediaEngine | |
err = mediaEngine.PopulateFromSDP(offer) | |
check(err) | |
// TODO: can I create one of these, globally, and use it everywhere, | |
// instead of re-creating it each time? | |
var opusOnly webrtc.MediaEngine | |
opusOnly.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) | |
api := webrtc.NewAPI(webrtc.WithMediaEngine(opusOnly)) | |
// TODO: same question as with opusOnly media engine | |
config := webrtc.Configuration{ | |
ICEServers: []webrtc.ICEServer{ | |
{ | |
URLs: []string{"stun:stun.l.google.com:19302"}, | |
}, | |
}, | |
} | |
conn, err := api.NewPeerConnection(config) | |
check(err) | |
audio, err := conn.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion") | |
check(err) | |
conn.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) { | |
tracksmu.Lock() | |
tracks = append(tracks, audio) | |
tracksmu.Unlock() | |
buf := make([]byte, 1400) | |
for { | |
// TODO: Does this call to Read block forever on client disconnection? | |
// How does this loop ever complete? How do we avoid leaks? | |
n, err := track.Read(buf) | |
check(err) | |
// Make a copy of the list of outbound tracks, | |
// so we don't hold the mutex while writing data. | |
tracksmu.Lock() | |
out := make([]*webrtc.Track, len(tracks)) | |
copy(out, tracks) | |
tracksmu.Unlock() | |
for _, w := range out { | |
// Don't send audio back to ourselves. | |
if w == audio { | |
continue | |
} | |
_, err := w.Write(buf[:n]) | |
// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet | |
if err != nil && !errors.Is(err, io.ErrClosedPipe) { | |
panic(err) | |
} | |
} | |
} | |
}) | |
_, err = conn.AddTrack(audio) | |
check(err) | |
err = conn.SetRemoteDescription(offer) | |
check(err) | |
conn.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { | |
log.Printf("server: connection state has changed: %v", state) | |
}) | |
answer, err := conn.CreateAnswer(nil) | |
check(err) | |
err = conn.SetLocalDescription(answer) | |
check(err) | |
enc := json.NewEncoder(w) | |
err = enc.Encode(answer) | |
check(err) | |
} | |
// check terminates the program via log.Fatal when err is non-nil and does
// nothing otherwise.
func check(err error) {
	if err == nil {
		return
	}
	log.Fatal(err)
}
// ------------- Opus streamer, used to generate packets below ---------------
type Streamer struct { | |
r io.Reader | |
pcmbuf []byte // reusable buffer to read pcm data into | |
pcm []int16 // reusable buffer to write decoded pcm data into | |
packet []byte // reusable buffer to write opus packets into | |
packetLen int // how much of the packet contains data | |
enc *opus.Encoder | |
err error | |
} | |
// NewStreamer is like bufio.Scanner. | |
// Usage: | |
// | |
// s := NewStreamer(r) | |
// for s.Stream() { | |
// p := s.Packet() | |
// // use p, but don't store it, it will get overwritten on the next loop iteration | |
// } | |
// if s.Err() != nil { | |
// panic(err) | |
// } | |
// | |
// r should contain little-endian encoded stero int16 pcm data. | |
// Note that if the final pcm data doesn't fit nicely into | |
// the required frame size (20ms), it will be padded with trailing zeros. | |
func NewStreamer(r io.Reader) *Streamer { | |
const bufferSize = 1000 // choose any buffer size you like. 1k is plenty. | |
const samplerate = 48000 | |
const channels = 2 // 2=stereo, required for RTP | |
const frameMS = 20 // options: 2.5, 5, 10, 20, 40, 60 | |
const nsamples = frameMS * samplerate / 1000 * channels // number of pcm samples required to fill a frame | |
enc, err := opus.NewEncoder(samplerate, channels, opus.AppVoIP) | |
check(err) | |
err = enc.SetInBandFEC(true) | |
check(err) | |
return &Streamer{ | |
r: r, | |
pcmbuf: make([]byte, nsamples*2), // 2 bytes per sample | |
pcm: make([]int16, nsamples), | |
packet: make([]byte, bufferSize), | |
enc: enc, | |
} | |
} | |
// Stream reads from r, and generates a packet. | |
// It reports whether it succeeded. | |
func (s *Streamer) Stream() bool { | |
if s.err != nil { | |
return false | |
} | |
n, err := io.ReadFull(s.r, s.pcmbuf) | |
switch { | |
case err == nil: | |
case errors.Is(err, io.ErrUnexpectedEOF): | |
// Zero out the parts not read into. | |
tail := s.pcmbuf[n:] | |
for i := range tail { | |
tail[i] = 0 | |
} | |
default: // unhandled error | |
s.err = err | |
return false | |
} | |
// Decode pcmbuf into pcm. | |
// TODO: optimize by doing more than 2 bytes at a time | |
for i := range s.pcm { | |
s.pcm[i] = int16(binary.LittleEndian.Uint16(s.pcmbuf[i*2:])) | |
} | |
s.packetLen, s.err = s.enc.Encode(s.pcm, s.packet) | |
if s.err != nil { | |
return false | |
} | |
return true | |
} | |
func (s *Streamer) Packet() []byte { | |
return s.packet[:s.packetLen] | |
} | |
func (s *Streamer) Err() error { | |
if s.err == nil || errors.Is(s.err, io.EOF) { | |
return nil | |
} | |
return s.err | |
} | |
// ------------ Helpers to write audio into a webrtc connection ----------------
// generateOpusPackets generates opus packets to play. | |
func generateOpusPackets() (packets [][]byte) { | |
// Generate some audio data. | |
aiff, err := ioutil.TempFile("", "*.aiff") | |
check(err) | |
defer os.Remove(aiff.Name()) | |
aiff.Close() | |
cmd := exec.Command("say", "i can eat glass, it doesn't hurt me", "-o", aiff.Name()) | |
out, err := cmd.CombinedOutput() | |
if err != nil { | |
fmt.Println(string(out)) | |
log.Fatal(err) | |
} | |
// Convert that audio to raw pcm. | |
pcmfile, err := ioutil.TempFile("", "*.pcm") | |
check(err) | |
defer os.Remove(pcmfile.Name()) | |
pcmfile.Close() | |
cmd = exec.Command("ffmpeg", | |
"-y", // overwrite output file without prompting | |
"-i", aiff.Name(), // input filename | |
"-f", "s16le", // write output format signed 16 bits little-endian | |
"-ac", "2", // 2 output audio channels | |
"-ar", "48000", // output audio sample rate 48000 | |
"-acodec", "pcm_s16le", // output pcm | |
pcmfile.Name(), // output file | |
) | |
out, err = cmd.CombinedOutput() | |
if err != nil { | |
fmt.Println(string(out)) | |
log.Fatal(err) | |
} | |
// Convert that raw pcm to opus packets. | |
in, err := os.Open(pcmfile.Name()) | |
check(err) | |
s := NewStreamer(in) | |
for s.Stream() { | |
packet := make([]byte, len(s.Packet())) | |
copy(packet, s.Packet()) | |
packets = append(packets, packet) | |
} | |
check(s.Err()) | |
return packets | |
} | |
func play(packets [][]byte) { | |
// Hack: Give the server a moment to start up. | |
time.Sleep(time.Second) | |
var opusOnly webrtc.MediaEngine | |
opusOnly.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) | |
api := webrtc.NewAPI(webrtc.WithMediaEngine(opusOnly)) | |
config := webrtc.Configuration{ | |
ICEServers: []webrtc.ICEServer{ | |
{ | |
URLs: []string{"stun:stun.l.google.com:19302"}, | |
}, | |
}, | |
} | |
conn, err := api.NewPeerConnection(config) | |
if err != nil { | |
panic(err) | |
} | |
ready := make(chan bool) | |
done := make(chan bool) | |
conn.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { | |
log.Printf("player: connection state has changed %s\n", state) | |
switch state { | |
case webrtc.ICEConnectionStateConnected: | |
ready <- true | |
case webrtc.ICEConnectionStateClosed, | |
webrtc.ICEConnectionStateDisconnected, | |
webrtc.ICEConnectionStateFailed: | |
close(done) | |
} | |
}) | |
audioTrack, err := conn.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "i can eat glass") | |
check(err) | |
_, err = conn.AddTrack(audioTrack) | |
check(err) | |
offer, err := conn.CreateOffer(nil) | |
check(err) | |
err = conn.SetLocalDescription(offer) | |
check(err) | |
jd, err := json.Marshal(offer) | |
check(err) | |
resp, err := http.PostForm("http://localhost:"+port+"/signal", | |
url.Values{"data": {string(jd)}}) | |
check(err) | |
dec := json.NewDecoder(resp.Body) | |
var answer webrtc.SessionDescription | |
err = dec.Decode(&answer) | |
check(err) | |
resp.Body.Close() | |
err = conn.SetRemoteDescription(answer) | |
check(err) | |
<-ready | |
Loop: | |
for { | |
for _, data := range packets { | |
sample := media.Sample{ | |
Data: data, | |
Samples: media.NSamples(20*time.Millisecond, 48000), | |
} | |
audioTrack.WriteSample(sample) | |
time.Sleep(20 * time.Millisecond) | |
} | |
select { | |
case <-done: | |
break Loop | |
default: | |
} | |
} | |
conn.Close() | |
<-done | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment