Last active
October 27, 2021 03:12
-
-
Save krsna1729/497fe4c816ed733c043bcecb37c07089 to your computer and use it in GitHub Desktop.
Example organization of a PFCP layer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module krsna1729/udp-test | |
go 1.17 | |
require ( | |
github.com/libp2p/go-reuseport v0.1.0 | |
go.uber.org/goleak v1.1.12 | |
) | |
require golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"log" | |
"net" | |
"os" | |
"os/signal" | |
"time" | |
"go.uber.org/goleak" | |
) | |
// init configures the standard logger so that every entry is prefixed with
// the date and time plus the file name and line number of the call site.
func init() {
	const flags = log.Lshortfile | log.LstdFlags
	log.SetFlags(flags)
}
func main() { | |
ctx, cancel := context.WithCancel(context.Background()) | |
for i := 0; i < 100; i++ { | |
go peer(ctx) | |
} | |
node := NewPFCPNode(ctx, "localhost:9000") | |
go node.Serve() | |
sig := make(chan os.Signal, 1) | |
signal.Notify(sig, os.Interrupt) | |
<-sig | |
cancel() | |
// Or yield to canceled goroutines with runtime.Gosched() allowing cleanup. | |
time.Sleep(1 * time.Second) | |
err := goleak.Find() | |
if err != nil { | |
log.Println("Found leaked goroutines", err) | |
} | |
} | |
// peer simulates a single remote PFCP peer. After a one second start-up
// delay it dials localhost:9000 over UDP and, in a background goroutine,
// repeatedly sends a fixed payload and waits for a reply. It blocks until ctx
// is canceled and then closes the socket, which makes the background
// goroutine observe net.ErrClosed and exit.
//
// Failure to resolve the address or to dial terminates the process.
func peer(ctx context.Context) {
	time.Sleep(1 * time.Second)

	payload := []byte("Hello World!")

	rAddr, err := net.ResolveUDPAddr("udp", "localhost:9000")
	if err != nil {
		log.Fatalln(err)
	}

	rConn, err := net.DialUDP("udp", nil, rAddr)
	if err != nil {
		log.Fatalln("peer: dial socket failed", err)
	}

	go func() {
		// Receive into a dedicated buffer so that a reply never overwrites
		// the payload that is sent on the next iteration.
		reply := make([]byte, len(payload))

		for {
			// A failure to set the deadline (e.g. on a closed socket) also
			// surfaces in the Write/Read that follows, where it is handled.
			rConn.SetWriteDeadline(time.Now().Add(20 * time.Second))

			n, err := rConn.Write(payload)
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return
				}
				log.Println("peer: unable to write", err)
				continue
			}
			log.Println("peer: sent bytes", n)

			rConn.SetReadDeadline(time.Now().Add(20 * time.Second))

			n, err = rConn.Read(reply)
			if err != nil {
				if errors.Is(err, net.ErrClosed) {
					return
				}
				log.Println("peer: unable to read", err)
				continue
			}
			log.Println("peer: received bytes", n)
		}
	}()

	<-ctx.Done()

	if err := rConn.Close(); err != nil {
		log.Println("peer: unable to close socket", err)
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"log" | |
"net" | |
reuse "github.com/libp2p/go-reuseport" | |
) | |
// PFCPNode represents a PFCP endpoint of the UPF. | |
type PFCPNode struct { | |
ctx context.Context | |
// listening socket for new "PFCP connections" | |
net.PacketConn | |
// channel for PFCPConn to signal exit by sending their remote address | |
done chan string | |
// map of existing connections | |
pconns map[string]*PFCPConn | |
} | |
// NewPFCPNode create a new PFCPNode listening on local address. | |
func NewPFCPNode(ctx context.Context, lAddr string) *PFCPNode { | |
conn, err := reuse.ListenPacket("udp", lAddr) | |
if err != nil { | |
log.Fatalln("ListenUDP failed", err) | |
} | |
return &PFCPNode{ | |
ctx: ctx, | |
PacketConn: conn, | |
done: make(chan string, 100), | |
pconns: make(map[string]*PFCPConn), | |
} | |
} | |
// Serve listens for the first packet from a new PFCP peer and creates PFCPConn. | |
func (node *PFCPNode) Serve() { | |
log.Println("listening for new PFCP connections on", node.LocalAddr().String()) | |
go func() { | |
lAddrStr := node.LocalAddr().String() | |
for { | |
buf := make([]byte, 1024) | |
n, rAddr, err := node.ReadFrom(buf) | |
if err != nil { | |
if errors.Is(err, net.ErrClosed) { | |
return | |
} | |
continue | |
} | |
rAddrStr := rAddr.String() | |
_, ok := node.pconns[rAddrStr] | |
if ok { | |
log.Println("Received packet for existing PFCPconn from", rAddrStr) | |
continue | |
} | |
log.Println(lAddrStr, "received new connection from", rAddrStr) | |
p := NewPFCPConn(node.ctx, node.done, lAddrStr, rAddrStr) | |
node.pconns[rAddrStr] = p | |
p.HandlePFCPMsg(buf[:n]) | |
go p.Serve() | |
} | |
}() | |
go func(ctx context.Context) { | |
for { | |
select { | |
case rAddr := <-node.done: | |
delete(node.pconns, rAddr) | |
case <-ctx.Done(): | |
return | |
} | |
} | |
}(node.ctx) | |
<-node.ctx.Done() | |
node.Shutdown() | |
} | |
// Shutdown closes it's connection and issues shutdown to all PFCPConn. | |
func (node *PFCPNode) Shutdown() { | |
err := node.Close() | |
if err != nil { | |
log.Println("Error closing Conn", err) | |
} | |
log.Println("PFCPNode: Shutdown complete") | |
} | |
// PFCPConn is a PFCP connection with one unique PFCP peer.
type PFCPConn struct {
	// ctx, once canceled, makes Serve shut the connection down.
	ctx context.Context

	// Conn is the child socket that carries all subsequent packets of an
	// "established PFCP connection".
	net.Conn

	// done is the send-only channel used to signal the PFCPNode on exit.
	done chan<- string
}
// NewPFCPConn creates a connected UDP socket to the rAddr PFCP peer specified. | |
func NewPFCPConn(ctx context.Context, done chan<- string, lAddr, rAddr string) *PFCPConn { | |
conn, err := reuse.Dial("udp", lAddr, rAddr) | |
if err != nil { | |
log.Fatalln("dial socket failed", err) | |
} | |
log.Println("Created PFCPConn for", conn.RemoteAddr().String()) | |
return &PFCPConn{ | |
ctx: ctx, | |
Conn: conn, | |
done: done, | |
} | |
} | |
// Serve serves forever a single PFCP peer. | |
func (pConn *PFCPConn) Serve() { | |
go func() { | |
for { | |
buf := make([]byte, 1024) | |
n, err := pConn.Read(buf) | |
if err != nil { | |
if errors.Is(err, net.ErrClosed) { | |
return | |
} | |
continue | |
} | |
pConn.HandlePFCPMsg(buf[:n]) | |
} | |
}() | |
<-pConn.ctx.Done() | |
pConn.Shutdown() | |
} | |
// Shutdown stops connection backing PFCPConn. | |
func (pConn *PFCPConn) Shutdown() error { | |
pConn.done <- pConn.LocalAddr().String() | |
err := pConn.Close() | |
if err != nil { | |
return err | |
} | |
log.Println("PFCPConn: Shutdown complete", pConn.RemoteAddr().String()) | |
return nil | |
} | |
// SetupAssociation initiates PFCP Association with peer in PFCPConn. | |
func (pConn *PFCPConn) SetupAssociation() { | |
log.Println("Sending Association Setup to", pConn.RemoteAddr().String()) | |
} | |
// HandlePFCPMsg handles different types of PFCP messages. | |
func (pConn *PFCPConn) HandlePFCPMsg(buf []byte) { | |
log.Println("Handling PFCP message from", pConn.RemoteAddr().String()) | |
pConn.Write(buf) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment