Skip to content

Instantly share code, notes, and snippets.

@krsna1729
Last active October 27, 2021 03:12
Show Gist options
  • Save krsna1729/497fe4c816ed733c043bcecb37c07089 to your computer and use it in GitHub Desktop.
Save krsna1729/497fe4c816ed733c043bcecb37c07089 to your computer and use it in GitHub Desktop.
Example organization of a PFCP layer
module krsna1729/udp-test
go 1.17
require (
github.com/libp2p/go-reuseport v0.1.0
go.uber.org/goleak v1.1.12
)
require golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect
package main
import (
"context"
"errors"
"log"
"net"
"os"
"os/signal"
"time"
"go.uber.org/goleak"
)
// init configures the standard logger so that every line is prefixed with the
// standard date/time stamp and the short file:line of the call site.
func init() {
	const flags = log.Lshortfile | log.LstdFlags
	log.SetFlags(flags)
}
// main starts 100 simulated peers and one PFCP node on localhost:9000, then
// blocks until the process receives os.Interrupt. After the signal it cancels
// the shared context, sleeps one second so goroutines can unwind, and logs
// whatever goroutines goleak still reports.
func main() {
	const (
		peerCount = 100
		nodeAddr  = "localhost:9000"
		grace     = 1 * time.Second
	)

	ctx, cancel := context.WithCancel(context.Background())

	for started := 0; started < peerCount; started++ {
		go peer(ctx)
	}

	node := NewPFCPNode(ctx, nodeAddr)
	go node.Serve()

	// Park here until an interrupt is delivered.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)
	<-interrupts

	cancel()

	// Or yield to canceled goroutines with runtime.Gosched() allowing cleanup.
	time.Sleep(grace)

	if leaked := goleak.Find(); leaked != nil {
		log.Println("Found leaked goroutines", leaked)
	}
}
// peer simulates one remote PFCP peer talking to localhost:9000 over a
// connected UDP socket.
//
// After an initial one-second sleep (presumably to let the node start
// listening; main launches the peers first) it dials the node and starts a
// goroutine that alternates between sending a fixed payload and reading one
// reply, each with a 20-second deadline. peer itself blocks until ctx is
// canceled and then closes the socket, which makes the goroutine's pending
// I/O fail with net.ErrClosed so it can exit.
//
// Resolve or dial failures terminate the whole process via log.Fatalln.
func peer(ctx context.Context) {
	const (
		ioTimeout  = 20 * time.Second
		retryDelay = 1 * time.Second
	)

	time.Sleep(1 * time.Second)

	payload := []byte("Hello World!")
	// Replies get their own buffer: reading into payload would overwrite the
	// bytes sent on the next iteration and truncate replies to its length.
	reply := make([]byte, 1024)

	rAddr, err := net.ResolveUDPAddr("udp", "localhost:9000")
	if err != nil {
		log.Fatalln(err)
	}
	rConn, err := net.DialUDP("udp", nil, rAddr)
	if err != nil {
		log.Fatalln("peer: dial socket failed", err)
	}

	go func() {
		// pause waits retryDelay before the next attempt so that an error
		// returned immediately (rather than after the deadline) cannot turn
		// the loop into a busy spin. It reports false once ctx is canceled.
		pause := func() bool {
			select {
			case <-ctx.Done():
				return false
			case <-time.After(retryDelay):
				return true
			}
		}

		for {
			// A deadline error here means the socket is unusable; the Write
			// or Read that follows reports the same condition, so it is
			// handled there.
			_ = rConn.SetWriteDeadline(time.Now().Add(ioTimeout))
			n, err := rConn.Write(payload)
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return
				}
				log.Println("peer: unable to write", err)
				if !pause() {
					return
				}
				continue
			}
			log.Println("peer: sent bytes", n)

			_ = rConn.SetReadDeadline(time.Now().Add(ioTimeout))
			n, err = rConn.Read(reply)
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return
				}
				log.Println("peer: unable to read", err)
				if !pause() {
					return
				}
				continue
			}
			log.Println("peer: received bytes", n)
		}
	}()

	<-ctx.Done()
	rConn.Close()
}
package main
import (
"context"
"errors"
"log"
"net"
reuse "github.com/libp2p/go-reuseport"
)
// PFCPNode represents a PFCP endpoint of the UPF.
//
// It bundles the listening packet socket with a table of per-peer PFCPConn
// values and the channel those connections use to report that they exited.
type PFCPNode struct {
// ctx is passed on to every PFCPConn the node creates; Serve blocks on
// ctx.Done() and calls Shutdown once it fires.
ctx context.Context
// listening socket for new "PFCP connections"
net.PacketConn
// channel for PFCPConn to signal exit by sending their remote address;
// Serve deletes each received string from pconns
done chan string
// map of existing connections, keyed by the String() form of the peer
// address the first packet was received from
pconns map[string]*PFCPConn
}
// NewPFCPNode opens a UDP packet socket on lAddr through the reuseport
// package and returns a PFCPNode wrapping it, with an empty connection table
// and a done channel buffered for 100 entries.
//
// If the socket cannot be opened the process is terminated via log.Fatalln.
func NewPFCPNode(ctx context.Context, lAddr string) *PFCPNode {
	sock, listenErr := reuse.ListenPacket("udp", lAddr)
	if listenErr != nil {
		log.Fatalln("ListenUDP failed", listenErr)
	}

	node := new(PFCPNode)
	node.ctx = ctx
	node.PacketConn = sock
	node.pconns = map[string]*PFCPConn{}
	node.done = make(chan string, 100)
	return node
}
// Serve listens for the first packet from a new PFCP peer and creates a
// PFCPConn for it. It blocks until the node's context is canceled, then calls
// Shutdown and returns.
//
// A helper goroutine does nothing but read datagrams from the listening
// socket and forward them over a channel; it exits when the socket is closed
// (net.ErrClosed) or the context is canceled. All accesses to node.pconns
// (lookup, insert and delete) happen in the Serve goroutine itself, so the
// map is never touched by two goroutines at once.
//
// For a datagram from an address that is not yet in the table, Serve creates
// a PFCPConn, records it, lets it handle that first datagram, and starts its
// Serve loop in a new goroutine. Datagrams from addresses already in the
// table are only logged. Addresses received on node.done are removed from the
// table.
func (node *PFCPNode) Serve() {
	lAddrStr := node.LocalAddr().String()
	log.Println("listening for new PFCP connections on", lAddrStr)

	// datagram is one packet read from the listening socket.
	type datagram struct {
		from    string
		payload []byte
	}
	incoming := make(chan datagram)

	go func() {
		for {
			// A fresh buffer per read: the slice is handed off to the
			// owner loop and may still be in use on the next iteration.
			buf := make([]byte, 1024)
			n, rAddr, err := node.ReadFrom(buf)
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return
				}
				continue
			}
			select {
			case incoming <- datagram{from: rAddr.String(), payload: buf[:n]}:
			case <-node.ctx.Done():
				// Nobody will receive anymore; do not block forever.
				return
			}
		}
	}()

	for {
		select {
		case dg := <-incoming:
			if _, ok := node.pconns[dg.from]; ok {
				log.Println("Received packet for existing PFCPconn from", dg.from)
				continue
			}
			log.Println(lAddrStr, "received new connection from", dg.from)
			p := NewPFCPConn(node.ctx, node.done, lAddrStr, dg.from)
			node.pconns[dg.from] = p
			p.HandlePFCPMsg(dg.payload)
			go p.Serve()
		case rAddr := <-node.done:
			delete(node.pconns, rAddr)
		case <-node.ctx.Done():
			node.Shutdown()
			return
		}
	}
}
// Shutdown closes the node's listening socket. A Close error is logged rather
// than returned, and a completion message is logged in either case.
func (node *PFCPNode) Shutdown() {
	if closeErr := node.Close(); closeErr != nil {
		log.Println("Error closing Conn", closeErr)
	}
	log.Println("PFCPNode: Shutdown complete")
}
// PFCPConn represents a PFCP connection with a unique PFCP peer.
type PFCPConn struct {
// ctx is the cancellation signal: Serve blocks on ctx.Done() and then calls
// Shutdown.
ctx context.Context
// child socket for all subsequent packets from an "established PFCP connection"
net.Conn
// channel to signal PFCPNode on exit; send-only from this side, and Shutdown
// writes one address string to it before closing the socket
done chan<- string
}
// NewPFCPConn dials a UDP socket from lAddr to the PFCP peer at rAddr through
// the reuseport package and returns a PFCPConn wrapping it. ctx and done are
// stored on the connection unchanged.
//
// If the dial fails the process is terminated via log.Fatalln.
func NewPFCPConn(ctx context.Context, done chan<- string, lAddr, rAddr string) *PFCPConn {
	sock, dialErr := reuse.Dial("udp", lAddr, rAddr)
	if dialErr != nil {
		log.Fatalln("dial socket failed", dialErr)
	}

	remote := sock.RemoteAddr().String()
	log.Println("Created PFCPConn for", remote)

	pConn := &PFCPConn{Conn: sock}
	pConn.ctx, pConn.done = ctx, done
	return pConn
}
// Serve serves a single PFCP peer until the connection's context is canceled.
//
// A goroutine reads datagrams of up to 1024 bytes from the socket and passes
// each one to HandlePFCPMsg; it exits when a read fails with net.ErrClosed
// and retries on any other read error. Serve itself blocks on the context,
// then calls Shutdown and logs the error if Shutdown reports one.
func (pConn *PFCPConn) Serve() {
	go func() {
		for {
			buf := make([]byte, 1024)
			n, err := pConn.Read(buf)
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return
				}
				continue
			}
			pConn.HandlePFCPMsg(buf[:n])
		}
	}()

	<-pConn.ctx.Done()
	// Serve has no caller to hand the error to, so surface it in the log
	// instead of dropping it.
	if err := pConn.Shutdown(); err != nil {
		log.Println("PFCPConn: Shutdown failed", err)
	}
}
// Shutdown reports this connection's exit on the done channel and then closes
// the underlying socket.
//
// The value sent on done is the peer's remote address string. The node keys
// its connection table by remote address, so sending the local address (which
// is identical for every PFCPConn of a node) would never match an entry.
//
// It returns the error from Close, if any; the completion message is logged
// only when Close succeeds.
func (pConn *PFCPConn) Shutdown() error {
	rAddrStr := pConn.RemoteAddr().String()
	pConn.done <- rAddrStr
	if err := pConn.Close(); err != nil {
		return err
	}
	log.Println("PFCPConn: Shutdown complete", rAddrStr)
	return nil
}
// SetupAssociation is the entry point for starting a PFCP Association with
// the peer of this PFCPConn. At present it only logs the peer's remote
// address; no message is sent.
func (pConn *PFCPConn) SetupAssociation() {
	peerAddr := pConn.RemoteAddr()
	log.Println("Sending Association Setup to", peerAddr.String())
}
// HandlePFCPMsg handles a PFCP message received from the peer. At present
// every message is treated the same way: the sender is logged and buf is
// written back to the peer unchanged. A failed write is logged rather than
// silently dropped.
func (pConn *PFCPConn) HandlePFCPMsg(buf []byte) {
	log.Println("Handling PFCP message from", pConn.RemoteAddr().String())
	if _, err := pConn.Write(buf); err != nil {
		log.Println("PFCPConn: unable to write", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment