Skip to content

Instantly share code, notes, and snippets.

@zonque
Last active September 11, 2024 07:17
Show Gist options
  • Save zonque/2eb70398d3a007b49f00c53856504d51 to your computer and use it in GitHub Desktop.
Save zonque/2eb70398d3a007b49f00c53856504d51 to your computer and use it in GitHub Desktop.
Multi multicast receiver in Go
package main
import (
"fmt"
"net"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
"golang.org/x/net/ipv4"
)
// receiver counts the datagrams that arrive for one multicast group on one
// network interface.
type receiver struct {
	// udpAddr is the group address and port the socket is bound to; ifi is
	// the interface on which the group is joined.
	udpAddr *net.UDPAddr
	ifi     *net.Interface

	// netPacketConn is the UDP socket returned by net.ListenPacket and
	// ipv4PacketConn is the ipv4 wrapper around it that run reads from.
	netPacketConn  net.PacketConn
	ipv4PacketConn *ipv4.PacketConn

	// counter is incremented by run for every datagram read and swapped back
	// to zero by the reporting loop in main.
	counter atomic.Uint64
}
// newReceiver opens a UDP/IPv4 socket bound to addr, enables delivery of the
// destination address as a control message, and joins the multicast group
// addr.IP on interface ifi.
//
// It returns the ready-to-run receiver, or a wrapped error describing which
// step failed. If a step fails after the socket has been opened, the socket
// is closed again before returning so that a failed call does not leak a
// file descriptor.
func newReceiver(addr *net.UDPAddr, ifi *net.Interface) (*receiver, error) {
	udpConn, err := net.ListenPacket("udp4", fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port))
	if err != nil {
		return nil, fmt.Errorf("failed to listen: %w", err)
	}

	pc := ipv4.NewPacketConn(udpConn)

	if err := pc.SetControlMessage(ipv4.FlagDst, true); err != nil {
		// Best-effort cleanup; the setup error is the one worth reporting.
		_ = udpConn.Close()
		return nil, fmt.Errorf("failed to set control message: %w", err)
	}

	if err := pc.JoinGroup(ifi, addr); err != nil {
		// Best-effort cleanup; the setup error is the one worth reporting.
		_ = udpConn.Close()
		return nil, fmt.Errorf("failed to join group: %w", err)
	}

	r := &receiver{
		udpAddr:        addr,
		netPacketConn:  udpConn,
		ipv4PacketConn: pc,
		ifi:            ifi,
	}
	return r, nil
}
// String implements fmt.Stringer. It renders the receiver as its group
// address, its port and the name of the interface it uses.
func (r *receiver) String() string {
	const layout = "addr %s, port %d, interface %s"

	group := r.udpAddr
	return fmt.Sprintf(layout, group.IP, group.Port, r.ifi.Name)
}
// run reads datagrams from the receiver's socket in an endless loop and
// increments counter once per datagram. It never returns; a read error
// panics.
func (r *receiver) run() {
	// The payload is never inspected, so a single scratch buffer sized to the
	// interface MTU is allocated once and reused for every read instead of
	// producing one garbage slice per packet.
	buf := make([]byte, r.ifi.MTU)

	for {
		if _, _, _, err := r.ipv4PacketConn.ReadFrom(buf); err != nil {
			panic(err)
		}
		r.counter.Add(1)
	}
}
// parseArg parses one command-line argument of the form
//
//	<addr>:<port>@<interface>
//
// and returns the UDP address and the network interface it names.
//
// The argument is split at the first '@'. The left part must be a host:port
// pair whose host is a literal IP address and whose port is a decimal number
// in the range 0-65535. The right part must name an existing network
// interface. On any error both returned pointers are nil.
func parseArg(s string) (*net.UDPAddr, *net.Interface, error) {
	hostPort, ifName, found := strings.Cut(s, "@")
	if !found {
		return nil, nil, fmt.Errorf("invalid argument %s", s)
	}

	addrString, portString, err := net.SplitHostPort(hostPort)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to split host port: %w", err)
	}

	// A 16-bit unsigned parse rejects negative and out-of-range ports that a
	// plain Atoi would let through.
	port, err := strconv.ParseUint(portString, 10, 16)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to convert port to int: %w", err)
	}

	// net.ParseIP signals failure with a nil IP rather than an error, so it
	// has to be checked explicitly.
	ip := net.ParseIP(addrString)
	if ip == nil {
		return nil, nil, fmt.Errorf("invalid IP address %q", addrString)
	}

	ifi, err := net.InterfaceByName(ifName)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get interface by name: %w", err)
	}

	addr := &net.UDPAddr{
		IP:   ip,
		Port: int(port),
	}
	return addr, ifi, nil
}
// main creates one receiver per command-line argument (each of the form
// <addr>:<port>@<interface>), starts a reader goroutine for every receiver,
// and then prints and resets each receiver's packet counter once per second,
// forever. Any parse or setup error panics. With no arguments it prints a
// usage line and returns.
func main() {
	args := os.Args[1:]
	receivers := make([]*receiver, 0, len(args))

	for _, arg := range args {
		addr, ifi, err := parseArg(arg)
		if err != nil {
			panic(err)
		}

		r, err := newReceiver(addr, ifi)
		if err != nil {
			panic(err)
		}

		receivers = append(receivers, r)
		fmt.Printf("receiver: %s\n", r)
	}

	if len(receivers) == 0 {
		// Printf, not Print: the message contains a %s verb for the program
		// name, which Print would emit literally.
		fmt.Printf("usage: %s <addr>:<port>@<interface> ...\n", os.Args[0])
		return
	}

	for _, r := range receivers {
		go r.run()
	}

	for {
		time.Sleep(time.Second)

		for _, r := range receivers {
			// Swap(0) reads and resets in one atomic step, so packets counted
			// between a read and a reset cannot be lost.
			fmt.Printf("%s: %d packets\n", r, r.counter.Swap(0))
		}
		fmt.Println("-----")
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment