Skip to content

Instantly share code, notes, and snippets.

@illiafox
Created August 10, 2025 18:38
Show Gist options
  • Save illiafox/224f3fd1f6b4f290fa9cce501a42af72 to your computer and use it in GitHub Desktop.
Save illiafox/224f3fd1f6b4f290fa9cce501a42af72 to your computer and use it in GitHub Desktop.
Pure Go RTMP -> HLS streamer example
package main
import (
gohls "github.com/bluenviron/gohlslib/v2"
"io"
"net"
"net/http"
"github.com/rs/cors"
log "github.com/sirupsen/logrus"
"github.com/yutopp/go-rtmp"
)
// globalMuxer is the HLS muxer that the /hls/ HTTP handler forwards requests
// to. While it is nil the handler answers 503 Service Unavailable.
// NOTE(review): it is read from the HTTP handler without synchronization —
// confirm which goroutine assigns it and whether a mutex/atomic is needed.
var globalMuxer *gohls.Muxer
// serveHTTP runs the HLS HTTP endpoint on :8080 and blocks until the server
// stops.
//
// Requests under /hls/ are forwarded to globalMuxer; while no muxer has been
// published the handler answers 503 Service Unavailable. OPTIONS requests are
// answered with 204 No Content. The mux is wrapped in the default CORS
// middleware, and the handler additionally sets a wildcard
// Access-Control-Allow-Origin header to allow quick testing.
func serveHTTP() {
	mux := http.NewServeMux()
	mux.HandleFunc("/hls/", func(w http.ResponseWriter, r *http.Request) {
		// allow quick testing
		w.Header().Set("Access-Control-Allow-Origin", "*")
		if r.Method == http.MethodOptions {
			w.WriteHeader(http.StatusNoContent)
			return
		}

		// Take a single snapshot so the nil check and the call use the same value.
		m := globalMuxer
		if m == nil {
			http.Error(w, "muxer not ready", http.StatusServiceUnavailable)
			return
		}
		m.Handle(w, r)
	})

	log.Printf("[HTTP] HLS on :8080 (GET /hls/stream.m3u8)")

	// ListenAndServe only returns on failure (e.g. the port is already in
	// use); report it instead of silently dropping it.
	if err := http.ListenAndServe(":8080", cors.Default().Handler(mux)); err != nil {
		log.Printf("[HTTP] server stopped: %v", err)
	}
}
// main starts the HLS HTTP server in a background goroutine and then serves
// RTMP on TCP port 1935, creating a fresh Handler for every accepted
// connection. Address resolution, listen and serve failures panic through the
// logger.
func main() {
	go serveHTTP()

	addr, err := net.ResolveTCPAddr("tcp", ":1935")
	if err != nil {
		log.Panicf("Failed: %+v", err)
	}

	ln, err := net.ListenTCP("tcp", addr)
	if err != nil {
		log.Panicf("Failed: %+v", err)
	}

	// Per-connection setup: each connection gets its own Handler and shares
	// the standard logger.
	onConnect := func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) {
		cfg := &rtmp.ConnConfig{
			Handler: &Handler{},
			ControlState: rtmp.StreamControlStateConfig{
				DefaultBandwidthWindowSize: 6 * 1024 * 1024 / 8,
			},
			Logger: log.StandardLogger(),
		}
		return conn, cfg
	}

	server := rtmp.NewServer(&rtmp.ServerConfig{OnConnect: onConnect})
	if serveErr := server.Serve(ln); serveErr != nil {
		log.Panicf("Failed: %+v", serveErr)
	}
}
package main
import (
"bytes"
"fmt"
mp4avc "github.com/Eyevinn/mp4ff/avc"
gohls "github.com/bluenviron/gohlslib/v2"
"github.com/bluenviron/gohlslib/v2/pkg/codecs"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio"
flvtag "github.com/yutopp/go-flv/tag"
"github.com/yutopp/go-rtmp"
rtmpmsg "github.com/yutopp/go-rtmp/message"
"io"
"log"
"time"
)
// Compile-time check that *Handler satisfies the rtmp.Handler interface.
var _ rtmp.Handler = (*Handler)(nil)
// OnServe is a method of Handler, the RTMP connection handler. It receives
// the connection and intentionally does nothing with it.
func (h *Handler) OnServe(_ *rtmp.Conn) {}
// OnConnect logs the received connect command and accepts it by returning
// nil. The timestamp argument is ignored.
func (h *Handler) OnConnect(_ uint32, cmd *rtmpmsg.NetConnectionConnect) error {
	const format = "OnConnect: %#v"
	log.Printf(format, cmd)
	return nil
}
// OnCreateStream logs the received createStream command and accepts it by
// returning nil. The timestamp argument is ignored.
func (h *Handler) OnCreateStream(_ uint32, cmd *rtmpmsg.NetConnectionCreateStream) error {
	const format = "OnCreateStream: %#v"
	log.Printf(format, cmd)
	return nil
}
// Handler is an RTMP connection handler that feeds published audio and video
// into an HLS muxer. It embeds rtmp.DefaultHandler.
type Handler struct {
	rtmp.DefaultHandler

	// muxer is the HLS muxer created in OnPublish.
	muxer *gohls.Muxer

	// videoTrack and audioTrack are the muxer's tracks, created in OnPublish;
	// their codecs are filled in from the sequence headers.
	videoTrack, audioTrack *gohls.Track

	// isRunning is set by startMuxerIfNeeded after it calls muxer.Start.
	isRunning bool

	// aacSR is the AAC sample rate taken from the audio sequence header
	// (0 until one has been parsed).
	aacSR int
}
// OnPublish prepares a fresh HLS muxer (MPEG-TS variant) together with its
// video and audio tracks for a newly published stream. The muxer is not
// started here: the codecs are only known once the sequence headers arrive.
//
// All three arguments are ignored. It always returns nil.
func (h *Handler) OnPublish(_ *rtmp.StreamContext, _ uint32, _ *rtmpmsg.NetStreamPublish) error {
	// Reset per-stream state. Without clearing isRunning, a second publish on
	// the same handler would leave the new muxer unstarted while frames are
	// still written to it.
	h.isRunning = false
	h.aacSR = 0

	// don't start muxer yet; wait for seq headers
	h.videoTrack = &gohls.Track{ClockRate: 90000}
	h.audioTrack = &gohls.Track{} // set ClockRate after parsing AAC ASC
	h.muxer = &gohls.Muxer{
		Tracks:             []*gohls.Track{h.videoTrack, h.audioTrack},
		Variant:            gohls.MuxerVariantMPEGTS,
		SegmentCount:       6,
		SegmentMinDuration: 2 * time.Second,
		PartMinDuration:    200 * time.Millisecond,
	}
	return nil
}
// startMuxerIfNeeded starts the HLS muxer once both the video and the audio
// track have a codec, then publishes it through globalMuxer.
//
// It is a no-op when no muxer exists, when the muxer is already running, or
// while either codec is still missing. If Start fails the error is logged and
// the muxer is neither published nor marked as running, so no frames are
// written to a muxer that never started.
func (h *Handler) startMuxerIfNeeded() {
	if h.muxer == nil || h.isRunning {
		return
	}
	// Both tracks were registered with the muxer, so wait for both codecs.
	if h.videoTrack.Codec == nil || h.audioTrack.Codec == nil {
		return
	}

	fmt.Println("starting")
	if err := h.muxer.Start(); err != nil {
		log.Printf("muxer start error: %v", err)
		return
	}

	globalMuxer = h.muxer
	h.isRunning = true
}
// OnAudio handles one RTMP audio message.
//
// Only AAC is processed; any other sound format is ignored. An AAC sequence
// header is parsed as an AudioSpecificConfig, which sets the audio track's
// codec and clock rate and may trigger the muxer start. Raw AAC frames are
// written to the muxer once it is running; frames that arrive earlier are
// dropped.
//
// timestamp is the message timestamp, treated as milliseconds. payload is the
// FLV audio tag body. An error is returned only when the tag cannot be
// decoded or its payload cannot be read; a bad sequence header or a failed
// muxer write is logged and the message is skipped.
func (h *Handler) OnAudio(timestamp uint32, payload io.Reader) error {
	var ad flvtag.AudioData
	if err := flvtag.DecodeAudioData(payload, &ad); err != nil {
		return fmt.Errorf("decoding FLV audio tag: %w", err)
	}

	// only AAC handled
	if ad.SoundFormat != flvtag.SoundFormatAAC {
		return nil
	}

	// The track is created in OnPublish; audio that shows up before that
	// has nowhere to go and must not dereference a nil track.
	if h.audioTrack == nil {
		return nil
	}

	var body bytes.Buffer
	if _, err := io.Copy(&body, ad.Data); err != nil {
		return fmt.Errorf("reading audio payload: %w", err)
	}
	b := body.Bytes()

	switch ad.AACPacketType {
	case flvtag.AACPacketTypeSequenceHeader:
		var asc mpeg4audio.AudioSpecificConfig
		if err := asc.Unmarshal(b); err != nil {
			log.Printf("bad AAC ASC: %v", err)
			return nil
		}
		h.aacSR = asc.SampleRate
		h.audioTrack.ClockRate = asc.SampleRate
		h.audioTrack.Codec = &codecs.MPEG4Audio{Config: asc}
		h.startMuxerIfNeeded()

	case flvtag.AACPacketTypeRaw:
		if h.audioTrack.Codec == nil || h.aacSR == 0 || !h.isRunning {
			return nil
		}
		// pts in "samples" timebase: milliseconds scaled by the sample rate
		pts := int64(timestamp) * int64(h.aacSR) / 1000
		if err := h.muxer.WriteMPEG4Audio(h.audioTrack, time.Now(), pts, [][]byte{b}); err != nil {
			// Keep the connection alive, but do not hide the failure.
			log.Printf("write AAC frame: %v", err)
		}
	}
	return nil
}
// OnVideo handles one RTMP video message.
//
// Only AVC (H.264) is processed; any other codec is ignored. A sequence
// header is parsed as an AVC decoder configuration record, whose first SPS
// and PPS set the video track's codec and may trigger the muxer start. NALU
// packets are parsed as AVCC and written to the muxer once it is running;
// packets that arrive earlier are dropped.
//
// timestamp is the message timestamp, treated as DTS in milliseconds. payload
// is the FLV video tag body. An error is returned when the tag, the
// configuration record or the AVCC payload cannot be decoded, or when the
// configuration record carries no SPS or no PPS. A failed muxer write is
// logged and the message is skipped.
func (h *Handler) OnVideo(timestamp uint32, payload io.Reader) error {
	var vd flvtag.VideoData
	if err := flvtag.DecodeVideoData(payload, &vd); err != nil {
		return fmt.Errorf("decoding FLV video tag: %w", err)
	}

	if vd.CodecID != flvtag.CodecIDAVC {
		return nil
	}

	// The track is created in OnPublish; video that shows up before that
	// has nowhere to go and must not dereference a nil track.
	if h.videoTrack == nil {
		return nil
	}

	var body bytes.Buffer
	if _, err := io.Copy(&body, vd.Data); err != nil {
		return fmt.Errorf("reading video payload: %w", err)
	}
	b := body.Bytes()

	switch vd.AVCPacketType {
	case flvtag.AVCPacketTypeSequenceHeader:
		dcr, err := mp4avc.DecodeAVCDecConfRec(b) // the avcC bytes
		if err != nil {
			return fmt.Errorf("decode AVC conf rec error: %w", err)
		}
		// A record without parameter sets would otherwise panic on [0].
		if len(dcr.SPSnalus) == 0 || len(dcr.PPSnalus) == 0 {
			return fmt.Errorf("AVC conf rec has %d SPS and %d PPS, need at least one of each",
				len(dcr.SPSnalus), len(dcr.PPSnalus))
		}
		h.videoTrack.Codec = &codecs.H264{SPS: dcr.SPSnalus[0], PPS: dcr.PPSnalus[0]}
		h.videoTrack.ClockRate = 90000
		fmt.Println("detected pps: ")
		h.startMuxerIfNeeded()

	case flvtag.AVCPacketTypeNALU:
		if h.videoTrack.Codec == nil || !h.isRunning {
			return nil
		}

		var au h264.AVCC
		if err := au.Unmarshal(b); err != nil {
			// Returned to the caller; not logged here as well.
			return fmt.Errorf("AVC decoder error: %w", err)
		}

		// RTMP: timestamp is DTS (ms); CompositionTime is PTS-DTS (ms, signed)
		dts90k := int64(timestamp) * 90000 / 1000
		ct90k := int64(vd.CompositionTime) * 90000 / 1000
		pts := dts90k + ct90k
		if err := h.muxer.WriteH264(h.videoTrack, time.Now(), pts, au); err != nil {
			// Keep the connection alive, but do not hide the failure.
			log.Printf("write H264 access unit: %v", err)
		}
	}
	return nil
}
// OnClose logs that the handler is being closed.
// NOTE(review): the muxer is not closed here (the call below is commented
// out), so a muxer published through globalMuxer appears to stay reachable
// after the connection ends — confirm whether that is intended.
func (h *Handler) OnClose() {
log.Printf("OnClose")
//if h.muxer != nil {
// h.muxer.Close()
//}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment