Skip to content

Instantly share code, notes, and snippets.

@ssttevee
Created March 25, 2019 07:48
Show Gist options
  • Select an option

  • Save ssttevee/108533268688a93444db19ac6fe2d0b6 to your computer and use it in GitHub Desktop.

Select an option

Save ssttevee/108533268688a93444db19ac6fe2d0b6 to your computer and use it in GitHub Desktop.
Tiny server that relays the video stream from a USB camera as an MJPEG stream on Linux.
package main
import (
"context"
"log"
"io"
"math/rand"
"net/http"
"os/exec"
"sync"
)
// Stream fans the standard output of a single ffmpeg capture process out to
// every connected HTTP client. The process is started when a client attaches
// while none is running and is stopped when the last client detaches.
//
// The zero value is ready to use. A Stream must not be copied after first use
// because it contains a sync.Mutex.
type Stream struct {
	mu sync.Mutex // guards every field below

	// kill is closed to ask the running ffmpeg process to stop. It is nil
	// whenever no process is running or the current one has already been
	// told to stop, so it is never closed twice.
	kill chan<- struct{}
	// exited is closed once the most recently started ffmpeg process has
	// been reaped. It is nil until the first process is started.
	exited <-chan struct{}

	streams map[int64]io.Writer // attached clients, keyed by random id
	errors  map[int64]error     // first write error seen for a client
	cancels map[int64]func()    // closes the client's done channel; safe to call repeatedly
}

// startError reports that the ffmpeg process could not be launched, which
// lets ServeHTTP tell "nothing was ever streamed" apart from a failed write.
type startError struct{ err error }

// Error implements the error interface.
func (e *startError) Error() string { return "starting ffmpeg: " + e.err.Error() }

// Unwrap exposes the underlying error returned by exec.Cmd.Start.
func (e *startError) Unwrap() error { return e.err }

// Write implements io.Writer and is installed as ffmpeg's stdout. It copies b
// to every attached client. A client whose writer fails has the error
// recorded, is signalled through its done channel and is skipped on later
// calls. Write itself always reports success so that one bad client never
// stops the capture process.
func (s *Stream) Write(b []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for id, w := range s.streams {
		if _, failed := s.errors[id]; failed {
			continue
		}
		if _, err := w.Write(b); err != nil {
			s.errors[id] = err
			s.cancels[id]()
		}
	}
	return len(b), nil
}

// insert registers w as a client and returns its id together with a channel
// that is closed when the client should stop waiting (its writer failed or
// ffmpeg exited on its own).
func (s *Stream) insert(w io.Writer) (int64, <-chan struct{}) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.streams == nil {
		s.streams = make(map[int64]io.Writer)
	}
	if s.errors == nil {
		s.errors = make(map[int64]error)
	}
	if s.cancels == nil {
		s.cancels = make(map[int64]func())
	}

	var id int64
	for {
		id = rand.Int63()
		if _, taken := s.streams[id]; !taken {
			break
		}
	}

	done := make(chan struct{})
	// Both Write (on a failed write) and the ffmpeg reaper may cancel the
	// same client; closing a closed channel panics, so guard it with a Once.
	var once sync.Once
	s.streams[id] = w
	s.cancels[id] = func() {
		once.Do(func() { close(done) })
	}
	return id, done
}

// remove detaches the client with the given id. When no clients remain it
// asks the running ffmpeg process, if any, to stop. Removing an unknown id is
// a no-op.
func (s *Stream) remove(id int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.streams, id)
	delete(s.errors, id)
	delete(s.cancels, id)

	// kill is nil when no process is running or it was already asked to
	// stop; clearing it here guarantees the channel is closed at most once.
	if len(s.streams) == 0 && s.kill != nil {
		close(s.kill)
		s.kill = nil
	}
}

// init makes sure an ffmpeg process is running. If the previous process has
// been asked to stop but has not been reaped yet, init waits for it to exit
// before starting a new one. It returns a *startError when ffmpeg cannot be
// launched.
func (s *Stream) init() error {
	for {
		s.mu.Lock()
		if s.kill != nil {
			s.mu.Unlock()
			return nil
		}
		if prev := s.exited; prev != nil {
			select {
			case <-prev:
			default:
				// The old process is still shutting down. Wait without
				// holding the lock: its reaper and its last Write calls
				// both need s.mu to make progress.
				s.mu.Unlock()
				<-prev
				continue
			}
		}
		err := s.spawn()
		s.mu.Unlock()
		return err
	}
}

// spawn starts ffmpeg with its stdout wired to s and launches the two
// goroutines that supervise it. The caller must hold s.mu.
func (s *Stream) spawn() error {
	log.Println("starting ffmpeg")
	cmd := exec.Command("ffmpeg", "-f", "v4l2", "-i", "/dev/video0", "-an", "-f", "mjpeg", "-vsync", "2", "-")
	cmd.Stdout = s
	if err := cmd.Start(); err != nil {
		return &startError{err: err}
	}

	stop := make(chan struct{})
	exited := make(chan struct{})
	s.kill = stop
	s.exited = exited

	// Reaper: waits for the process to exit and, if it died on its own,
	// disconnects the clients that were relying on it.
	go func() {
		defer log.Println("ffmpeg exited")
		defer close(exited) // runs after the unlock below
		if err := cmd.Wait(); err != nil {
			log.Println(err)
		}
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.kill != stop {
			// remove already asked this process to stop; any client present
			// now is waiting for a fresh process and must not be cancelled.
			return
		}
		s.kill = nil
		for _, cancel := range s.cancels {
			cancel()
		}
	}()

	// Killer: terminates the process once remove closes the stop channel.
	go func() {
		select {
		case <-exited:
		case <-stop:
			if err := cmd.Process.Kill(); err != nil {
				log.Println("failed to kill process:", err)
			}
		}
	}()
	return nil
}

// start attaches w as a client and blocks until ctx is cancelled, w fails, or
// ffmpeg exits on its own. It returns ctx.Err() on cancellation, the recorded
// write error if w failed, a *startError if ffmpeg could not be launched, and
// nil otherwise.
func (s *Stream) start(ctx context.Context, w io.Writer) error {
	// Register before ensuring the process runs: while this client is in the
	// map, another client leaving cannot be "the last one" and stop ffmpeg
	// underneath us.
	id, done := s.insert(w)
	defer s.remove(id)

	if err := s.init(); err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-done:
		// The errors map is shared with Write and remove, so read it under
		// the lock.
		s.mu.Lock()
		err := s.errors[id]
		s.mu.Unlock()
		return err
	}
}

// ServeHTTP streams the camera output to the client until it disconnects or
// the stream ends. If ffmpeg cannot be started the client receives a
// 503 response (best effort); other failures are logged unless the request
// was already cancelled.
func (s *Stream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	err := s.start(r.Context(), w)
	if err == nil || r.Context().Err() != nil {
		return
	}
	if _, ok := err.(*startError); ok {
		http.Error(w, "camera stream unavailable", http.StatusServiceUnavailable)
	}
	log.Println("stream ended:", err)
}
// main serves the camera stream on every path of port 8080. ListenAndServe
// only returns on failure (for example when the port is already in use), so
// its error is logged and the process exits with a non-zero status instead of
// terminating silently.
func main() {
	log.Fatal(http.ListenAndServe(":8080", &Stream{}))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment