Skip to content

Instantly share code, notes, and snippets.

@josharian
Created May 23, 2020 14:24
Show Gist options
  • Save josharian/e1b2a6aaef1940001d17e50712801caf to your computer and use it in GitHub Desktop.
Save josharian/e1b2a6aaef1940001d17e50712801caf to your computer and use it in GitHub Desktop.
play opus audio to any browser that connects
// Demo of playing opus audio into a webrtc session on macOS.
// To install required packages/commands:
// brew install ffmpeg pkg-config opus opusfile
// Then run this server, and visit http://localhost:2021/ in a browser.
package main
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"os/exec"
"sync"
"time"
"github.com/hraban/opus"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
)
// port is the TCP port the demo HTTP server listens on. The built-in player
// also uses it to reach the /signal endpoint on localhost.
const (
	port = "2021"
)
// main synthesizes the demo audio, starts a background player that streams it
// into the WebRTC session, and then serves the browser client ("/") and the
// signaling endpoint ("/signal") on port until the server fails.
func main() {
	rand.Seed(time.Now().UnixNano())
	http.HandleFunc("/", handleHome)
	http.HandleFunc("/signal", handleSignal)

	packets := generateOpusPackets()
	go play(packets)

	srv := &http.Server{
		Addr: ":" + port,
		// Bound how long a client may take to send its request headers, so
		// stalled or slow connections cannot pin server goroutines forever
		// (http.ListenAndServe sets no timeouts at all).
		ReadHeaderTimeout: 10 * time.Second,
	}
	log.Printf("starting http server on :%s", port)
	// ListenAndServe only ever returns a non-nil error.
	check(srv.ListenAndServe())
}
// ------------------- Basic webrtc audio SFU server --------------------
func handleHome(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, `
<html>
<head>
<script src="https://code.jquery.com/jquery-3.5.1.min.js"></script>
<script>
let pc = new RTCPeerConnection({
iceServers: [
{
"urls": "stun:stun.l.google.com:19302"
}
]
})
var log = msg => {
console.log(msg)
}
navigator.mediaDevices.getUserMedia({ audio: true })
.then(stream => {
stream.getTracks().forEach(track => pc.addTrack(track, stream));
pc.createOffer().then(d => pc.setLocalDescription(d)).catch(log)
}).catch(log)
pc.oniceconnectionstatechange = e => log(pc.iceConnectionState)
pc.onicecandidate = event => {
if (event.candidate === null) {
let d = JSON.stringify(pc.localDescription);
$.post("/signal", { "data": d, "name": $('#name').attr('val') }, function (sd) {
try {
pc.setRemoteDescription(new RTCSessionDescription(sd))
} catch (e) {
log(e)
}
}, "json");
}
}
pc.ontrack = function (event) {
var el = document.createElement(event.track.kind)
el.srcObject = event.streams[0]
el.autoplay = true
el.controls = false
document.getElementById('audio').replaceWith(el)
}
</script>
</head>
<body>
<div id="audio"></div>
</body>
</html>
`)
}
// tracksmu guards tracks.
var tracksmu sync.Mutex

// tracks lists the outbound audio track of each peer that handleSignal has
// registered. Audio received from one peer is written to every other entry.
var tracks []*webrtc.Track
// handleSignal performs the server side of a non-trickle WebRTC offer/answer
// exchange. It expects a form field "data" containing a JSON-encoded
// webrtc.SessionDescription offer and responds with the JSON-encoded answer.
//
// Each peer gets an outbound Opus track. When the peer starts sending audio,
// that outbound track is added to tracks and every RTP packet the peer sends is
// forwarded to all the other registered tracks. When the peer's inbound track
// stops producing packets (Read returns an error), its outbound track is
// removed from tracks again.
//
// Bad requests and setup failures are reported to the client as HTTP errors;
// they no longer terminate the whole server.
func handleSignal(w http.ResponseWriter, r *http.Request) {
	var offer webrtc.SessionDescription
	if err := json.Unmarshal([]byte(r.FormValue("data")), &offer); err != nil {
		http.Error(w, "invalid offer: "+err.Error(), http.StatusBadRequest)
		return
	}

	// TODO: can I create one of these, globally, and use it everywhere,
	// instead of re-creating it each time?
	var opusOnly webrtc.MediaEngine
	opusOnly.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000))
	api := webrtc.NewAPI(webrtc.WithMediaEngine(opusOnly))
	// TODO: same question as with opusOnly media engine
	config := webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302"},
			},
		},
	}
	conn, err := api.NewPeerConnection(config)
	if err != nil {
		log.Printf("server: creating peer connection: %v", err)
		http.Error(w, "creating peer connection: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// fail reports a setup error to the client and tears down conn, so a
	// half-configured peer connection is not leaked.
	fail := func(status int, what string, err error) {
		log.Printf("server: %s: %v", what, err)
		http.Error(w, what+": "+err.Error(), status)
		if cerr := conn.Close(); cerr != nil {
			log.Printf("server: closing peer connection: %v", cerr)
		}
	}

	audio, err := conn.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "pion")
	if err != nil {
		fail(http.StatusInternalServerError, "creating track", err)
		return
	}

	conn.OnTrack(func(track *webrtc.Track, _ *webrtc.RTPReceiver) {
		tracksmu.Lock()
		tracks = append(tracks, audio)
		tracksmu.Unlock()

		// Unregister this peer's outbound track once its inbound audio ends.
		defer func() {
			tracksmu.Lock()
			defer tracksmu.Unlock()
			for i, t := range tracks {
				if t == audio {
					// Order is irrelevant for broadcasting: swap-remove, and
					// clear the vacated slot so the track can be collected.
					last := len(tracks) - 1
					tracks[i] = tracks[last]
					tracks[last] = nil
					tracks = tracks[:last]
					return
				}
			}
		}()

		buf := make([]byte, 1400)
		var out []*webrtc.Track // reused snapshot of tracks
		for {
			n, err := track.Read(buf)
			if err != nil {
				// The inbound track is finished (e.g. conn was closed below).
				log.Printf("server: inbound track ended: %v", err)
				return
			}

			// Snapshot the outbound tracks so the mutex is not held while
			// writing; reuse the snapshot's storage across packets.
			tracksmu.Lock()
			out = append(out[:0], tracks...)
			tracksmu.Unlock()

			for _, dst := range out {
				// Don't send audio back to ourselves.
				if dst == audio {
					continue
				}
				// ErrClosedPipe means we don't have any subscribers, this is ok if no peers have connected yet
				if _, err := dst.Write(buf[:n]); err != nil && !errors.Is(err, io.ErrClosedPipe) {
					log.Printf("server: forwarding packet: %v", err)
				}
			}
		}
	})

	if _, err := conn.AddTrack(audio); err != nil {
		fail(http.StatusInternalServerError, "adding track", err)
		return
	}

	// Registered before the descriptions are applied so no transition is missed.
	conn.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
		log.Printf("server: connection state has changed: %v", state)
		if state == webrtc.ICEConnectionStateFailed {
			// Close the dead connection so its resources are released. The
			// expectation is that this makes track.Read above return an error,
			// ending the relay loop and unregistering the peer — NOTE(review):
			// relies on pion's close semantics, confirm. Closed from a new
			// goroutine to avoid re-entering conn from its own callback.
			go func() {
				if err := conn.Close(); err != nil {
					log.Printf("server: closing peer connection: %v", err)
				}
			}()
		}
	})

	if err := conn.SetRemoteDescription(offer); err != nil {
		fail(http.StatusBadRequest, "applying offer", err)
		return
	}
	answer, err := conn.CreateAnswer(nil)
	if err != nil {
		fail(http.StatusInternalServerError, "creating answer", err)
		return
	}
	if err := conn.SetLocalDescription(answer); err != nil {
		fail(http.StatusInternalServerError, "setting local description", err)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(answer); err != nil {
		// The client most likely went away; nothing more can be sent to it.
		log.Printf("server: writing answer: %v", err)
	}
}
// check logs err and terminates the process (via log.Fatal) when err is
// non-nil; a nil err is a no-op. Only use it where failure is unrecoverable.
func check(err error) {
	if err == nil {
		return
	}
	log.Fatal(err)
}
// ------------- Opus streamer, used to generate packets below ---------------
// Streamer turns a stream of raw PCM audio into Opus packets, one packet per
// successful call to Stream. Create one with NewStreamer.
type Streamer struct {
	r   io.Reader     // source of little-endian int16 PCM bytes
	enc *opus.Encoder // created and configured by NewStreamer
	err error         // sticky: once set, Stream keeps returning false

	// Scratch space reused by every call to Stream.
	pcmbuf    []byte  // raw PCM bytes for one frame
	pcm       []int16 // pcmbuf decoded into int16 samples
	packet    []byte  // backing storage for the encoded Opus packet
	packetLen int     // number of valid bytes at the start of packet
}
// NewStreamer returns a Streamer that encodes the PCM read from r into Opus
// packets. It is used like bufio.Scanner:
//
//	s := NewStreamer(r)
//	for s.Stream() {
//		p := s.Packet()
//		// Use p, but don't store it: it is overwritten by the next call to Stream.
//	}
//	if err := s.Err(); err != nil {
//		panic(err)
//	}
//
// r must supply little-endian, interleaved stereo int16 PCM sampled at 48 kHz.
// If the final PCM data doesn't fill a whole 20ms frame, it is padded with
// trailing zeros.
//
// NewStreamer terminates the program (via check) if the Opus encoder cannot
// be created or configured.
func NewStreamer(r io.Reader) *Streamer {
	const bufferSize = 1000 // capacity of the reusable Opus output buffer; ample for a 20ms frame
	const samplerate = 48000
	const channels = 2 // stereo input. (RFC 7587 signals Opus as 2 channels in SDP regardless.)
	const frameMS = 20 // Opus also allows 2.5, 5, 10, 40, 60 (2.5 won't fit this integer arithmetic)
	// Total int16 samples (across both channels) in one frame: 20 * 48000 / 1000 * 2 = 1920.
	const nsamples = frameMS * samplerate / 1000 * channels

	enc, err := opus.NewEncoder(samplerate, channels, opus.AppVoIP)
	check(err)
	// NOTE(review): libopus only emits in-band FEC when the expected packet
	// loss percentage is non-zero; consider also setting it on enc — confirm.
	err = enc.SetInBandFEC(true)
	check(err)

	return &Streamer{
		r:      r,
		pcmbuf: make([]byte, nsamples*2), // 2 bytes per int16 sample
		pcm:    make([]int16, nsamples),
		packet: make([]byte, bufferSize),
		enc:    enc,
	}
}
// Stream reads the next frame of PCM from the underlying reader and encodes
// it into an Opus packet, available afterwards via Packet. It returns false
// when the input is exhausted or an error occurs; use Err to distinguish them.
// A short final read is padded with silence rather than dropped.
func (s *Streamer) Stream() bool {
	if s.err != nil {
		return false
	}

	n, err := io.ReadFull(s.r, s.pcmbuf)
	if err != nil {
		if !errors.Is(err, io.ErrUnexpectedEOF) {
			// Includes io.EOF (no bytes left), which Err reports as nil.
			s.err = err
			return false
		}
		// Partial frame: zero whatever the reader did not fill.
		for i := n; i < len(s.pcmbuf); i++ {
			s.pcmbuf[i] = 0
		}
	}

	// Unpack consecutive little-endian byte pairs into int16 samples.
	// TODO: optimize by doing more than 2 bytes at a time
	for i, off := 0, 0; i < len(s.pcm); i, off = i+1, off+2 {
		s.pcm[i] = int16(binary.LittleEndian.Uint16(s.pcmbuf[off:]))
	}

	s.packetLen, s.err = s.enc.Encode(s.pcm, s.packet)
	return s.err == nil
}
// Packet returns the Opus packet produced by the most recent successful call
// to Stream. The slice aliases the Streamer's internal buffer and is
// overwritten by the next call to Stream, so copy it if it must be kept.
func (s *Streamer) Packet() []byte {
	n := s.packetLen
	return s.packet[:n]
}
// Err returns the error that stopped Stream, or nil if streaming ended only
// because the input reached io.EOF (or has not ended yet).
func (s *Streamer) Err() error {
	if errors.Is(s.err, io.EOF) {
		return nil
	}
	return s.err
}
// ------------ Helpers to write audio into a webrtc connection ----------------
// generateOpusPackets synthesizes a short spoken phrase and returns it as a
// slice of Opus packets (each an independent copy). It shells out to the macOS
// `say` command and to ffmpeg, and terminates the program if any step fails.
func generateOpusPackets() (packets [][]byte) {
	// run executes cmd; on failure it prints the command's combined output
	// (useful for diagnosing ffmpeg/say problems) and exits.
	run := func(cmd *exec.Cmd) {
		out, err := cmd.CombinedOutput()
		if err != nil {
			fmt.Println(string(out))
			log.Fatal(err)
		}
	}

	// Generate some audio data.
	aiff, err := ioutil.TempFile("", "*.aiff")
	check(err)
	defer os.Remove(aiff.Name())
	aiff.Close() // only the unique name is needed; `say` writes the file itself
	run(exec.Command("say", "i can eat glass, it doesn't hurt me", "-o", aiff.Name()))

	// Convert that audio to raw pcm.
	pcmfile, err := ioutil.TempFile("", "*.pcm")
	check(err)
	defer os.Remove(pcmfile.Name())
	pcmfile.Close()
	run(exec.Command("ffmpeg",
		"-y",                    // overwrite output file without prompting
		"-i", aiff.Name(),       // input filename
		"-f", "s16le",           // write output format signed 16 bits little-endian
		"-ac", "2",              // 2 output audio channels
		"-ar", "48000",          // output audio sample rate 48000
		"-acodec", "pcm_s16le", // output pcm
		pcmfile.Name(),          // output file
	))

	// Convert that raw pcm to opus packets.
	in, err := os.Open(pcmfile.Name())
	check(err)
	defer in.Close() // previously leaked; runs before the deferred Removes
	s := NewStreamer(in)
	for s.Stream() {
		// Packet's slice is reused by the next Stream call, so copy it.
		packet := make([]byte, len(s.Packet()))
		copy(packet, s.Packet())
		packets = append(packets, packet)
	}
	check(s.Err())
	return packets
}
// play joins this process's own /signal endpoint as a WebRTC peer and loops
// packets into the session, one 20ms Opus frame per tick. It returns once the
// ICE connection is reported Closed, Disconnected, or Failed, closing the
// peer connection on the way out. Fatal setup errors terminate the program.
func play(packets [][]byte) {
	const frameDuration = 20 * time.Millisecond
	samplesPerFrame := media.NSamples(frameDuration, 48000)

	var opusOnly webrtc.MediaEngine
	opusOnly.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000))
	api := webrtc.NewAPI(webrtc.WithMediaEngine(opusOnly))
	config := webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{"stun:stun.l.google.com:19302"},
			},
		},
	}
	conn, err := api.NewPeerConnection(config)
	check(err)
	defer func() {
		if err := conn.Close(); err != nil {
			log.Printf("player: closing connection: %v", err)
		}
	}()

	// The state callback can report a terminal state more than once (e.g.
	// Disconnected, then Closed when conn.Close runs) and can return to
	// Connected after a reconnect. Closing via sync.Once makes each signal
	// idempotent: no double-close panic and no callback blocked on a send.
	ready := make(chan struct{})
	done := make(chan struct{})
	var readyOnce, doneOnce sync.Once
	conn.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
		log.Printf("player: connection state has changed %s\n", state)
		switch state {
		case webrtc.ICEConnectionStateConnected:
			readyOnce.Do(func() { close(ready) })
		case webrtc.ICEConnectionStateClosed,
			webrtc.ICEConnectionStateDisconnected,
			webrtc.ICEConnectionStateFailed:
			doneOnce.Do(func() { close(done) })
		}
	})

	audioTrack, err := conn.NewTrack(webrtc.DefaultPayloadTypeOpus, rand.Uint32(), "audio", "i can eat glass")
	check(err)
	_, err = conn.AddTrack(audioTrack)
	check(err)
	offer, err := conn.CreateOffer(nil)
	check(err)
	err = conn.SetLocalDescription(offer)
	check(err)

	jd, err := json.Marshal(offer)
	check(err)
	form := url.Values{"data": {string(jd)}}

	// main starts the HTTP server concurrently with play, so it may not be
	// accepting connections yet. Retry for up to ~5s instead of relying on
	// a fixed startup sleep.
	var resp *http.Response
	for attempt := 1; ; attempt++ {
		resp, err = http.PostForm("http://localhost:"+port+"/signal", form)
		if err == nil || attempt == 50 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	check(err)
	if resp.StatusCode != http.StatusOK {
		msg, _ := ioutil.ReadAll(resp.Body) // best effort: only used for the message
		resp.Body.Close()
		log.Fatalf("player: signaling failed: %s: %s", resp.Status, msg)
	}
	var answer webrtc.SessionDescription
	err = json.NewDecoder(resp.Body).Decode(&answer)
	resp.Body.Close()
	check(err)
	err = conn.SetRemoteDescription(answer)
	check(err)

	// Don't send until ICE connects; give up if it ends before connecting.
	select {
	case <-ready:
	case <-done:
		return
	}

	// A ticker keeps a steady 20ms cadence; Sleep after each write drifts.
	ticker := time.NewTicker(frameDuration)
	defer ticker.Stop()
	for i := 0; ; {
		if len(packets) > 0 {
			sample := media.Sample{Data: packets[i], Samples: samplesPerFrame}
			// ErrClosedPipe means nobody is subscribed yet, which is expected.
			if werr := audioTrack.WriteSample(sample); werr != nil && !errors.Is(werr, io.ErrClosedPipe) {
				log.Printf("player: writing sample: %v", werr)
			}
			i = (i + 1) % len(packets)
		}
		select {
		case <-done:
			return
		case <-ticker.C:
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment