Skip to content

Instantly share code, notes, and snippets.

@davidroman0O
Created September 1, 2024 21:53
Show Gist options
  • Save davidroman0O/9903d7d73d85e3ae29478890690ea6e9 to your computer and use it in GitHub Desktop.
Save davidroman0O/9903d7d73d85e3ae29478890690ea6e9 to your computer and use it in GitHub Desktop.
Experiment with using etcd for a library I'm putting together
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/url"
"os"
"strings"
"sync"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
)
// etcd key prefixes used by this client.
const (
// actionListKey is the prefix under which SendAction stores each action,
// with the action ID appended.
actionListKey = "/actions/"
// responseKey is the prefix watchResponses watches for ActionResponse values.
responseKey = "/responses/"
)
// EtcdMode selects how the client obtains its etcd backend.
type EtcdMode string
const (
// ModeStandalone starts an in-process etcd server; NewDAGClient treats it
// exactly like ModeEmbedded.
ModeStandalone EtcdMode = "standalone"
// ModeEmbedded starts an in-process etcd server.
ModeEmbedded EtcdMode = "embedded"
// ModeRemote connects to the endpoints given in ClientConfig.Endpoints.
ModeRemote EtcdMode = "remote"
)
// ClientConfig holds the settings NewDAGClient needs to reach etcd.
type ClientConfig struct {
// Mode chooses between an in-process etcd server and a remote cluster.
Mode EtcdMode
// Endpoints lists the etcd endpoints to dial; only read when Mode is ModeRemote.
Endpoints []string
}
// Action is a single request, serialized as JSON and written to etcd under
// actionListKey + ID.
type Action struct {
// ID identifies the action; responses are matched back to it via ActionResponse.ActionID.
ID string `json:"id"`
// Type names the operation; the examples in main use "add", "addChild",
// "update" and "delete".
Type string `json:"type"`
// NodeID is the node the action targets.
NodeID string `json:"nodeId"`
// Data is the action payload. NOTE(review): for "addChild" the example in
// main passes another node's ID here — confirm against the consumer.
Data string `json:"data"`
}
// ActionResponse is the outcome of one Action, decoded from JSON values
// found under responseKey.
type ActionResponse struct {
// ActionID is the ID of the Action this response answers.
ActionID string `json:"actionId"`
// Success reports whether the action was applied.
Success bool `json:"success"`
// Message carries a human-readable result or error description.
Message string `json:"message"`
}
// DAGClient submits actions through etcd and routes the matching responses
// back to the callers waiting on them.
type DAGClient struct {
// etcdClient is the connection used for Put and Watch calls.
etcdClient *clientv3.Client
// etcdServer is the in-process etcd server; it stays nil in ModeRemote.
etcdServer *embed.Etcd
// mu guards pending.
mu sync.Mutex
// pending maps an action ID to the channel its response is delivered on.
pending map[string]chan ActionResponse
// ctx is passed to the response watch and to every action Put; cancel
// (invoked by Close) cancels it.
ctx context.Context
cancel context.CancelFunc
}
// NewDAGClient builds a client for the mode in cfg, connects to etcd and
// starts the background goroutine that watches for action responses.
//
// ModeStandalone and ModeEmbedded start an in-process etcd server;
// ModeRemote dials cfg.Endpoints. Any other mode is rejected. On failure,
// everything acquired so far (the cancellable context and the embedded
// server, if any) is released before the error is returned.
func NewDAGClient(cfg ClientConfig) (*DAGClient, error) {
	ctx, cancel := context.WithCancel(context.Background())
	client := &DAGClient{
		pending: make(map[string]chan ActionResponse),
		ctx:     ctx,
		cancel:  cancel,
	}

	var (
		endpoints []string
		err       error
	)
	switch cfg.Mode {
	case ModeStandalone, ModeEmbedded:
		client.etcdServer, endpoints, err = startEmbeddedEtcd()
		if err != nil {
			cancel() // nothing will ever use ctx; release it
			return nil, fmt.Errorf("failed to start embedded etcd: %w", err)
		}
	case ModeRemote:
		endpoints = cfg.Endpoints
	default:
		cancel()
		return nil, fmt.Errorf("invalid mode: %s", cfg.Mode)
	}

	client.etcdClient, err = connectWithRetry(endpoints, 30*time.Second)
	if err != nil {
		cancel()
		if client.etcdServer != nil {
			client.etcdServer.Close()
		}
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	// The watcher runs until ctx is cancelled by Close.
	go client.watchResponses()
	return client, nil
}
// connectWithRetry creates an etcd client for endpoints, retrying once per
// second until it succeeds or timeout elapses.
//
// The wait between attempts is interruptible, so the function returns as
// soon as the deadline passes instead of finishing a full sleep first. The
// returned error wraps the last client-creation error, or the context error
// when the deadline expired before any attempt was made.
func connectWithRetry(endpoints []string, timeout time.Duration) (*clientv3.Client, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var lastErr error
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			cause := lastErr
			if cause == nil {
				cause = ctxErr
			}
			return nil, fmt.Errorf("timeout while connecting to etcd: %w", cause)
		}

		client, err := clientv3.New(clientv3.Config{
			Endpoints:   endpoints,
			DialTimeout: 5 * time.Second,
		})
		if err == nil {
			return client, nil
		}
		lastErr = err
		log.Printf("Failed to connect to etcd, retrying: %v", err)

		// Back off, but wake up early if the overall deadline expires; the
		// check at the top of the loop then produces the timeout error.
		select {
		case <-ctx.Done():
		case <-time.After(1 * time.Second):
		}
	}
}
// startEmbeddedEtcd launches a single-member etcd server in-process on two
// freshly picked local ports and waits up to 10 seconds for it to become
// ready.
//
// It returns the running server and the client endpoint to dial. The data
// directory "client_etcd_data" is wiped first, so every start begins empty.
func startEmbeddedEtcd() (*embed.Etcd, []string, error) {
	cfg := embed.NewConfig()
	cfg.Dir = "client_etcd_data"
	// Clean up any existing data so the member always bootstraps fresh.
	if err := os.RemoveAll(cfg.Dir); err != nil {
		return nil, nil, fmt.Errorf("failed to clean etcd data dir %q: %w", cfg.Dir, err)
	}

	// Use random available ports. The ports are released before etcd binds
	// them, so another process could in principle grab one in between.
	clientPort, err := getAvailablePort()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get available client port: %w", err)
	}
	peerPort, err := getAvailablePort()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to get available peer port: %w", err)
	}
	clientURL := fmt.Sprintf("http://localhost:%d", clientPort)
	peerURL := fmt.Sprintf("http://localhost:%d", peerPort)

	clientURLs, err := urlsFromStrings([]string{clientURL})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to parse client URL %q: %w", clientURL, err)
	}
	peerURLs, err := urlsFromStrings([]string{peerURL})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to parse peer URL %q: %w", peerURL, err)
	}
	cfg.ListenClientUrls = clientURLs
	cfg.AdvertiseClientUrls = clientURLs
	cfg.ListenPeerUrls = peerURLs
	// The advertised peer URL must match this member's entry in
	// InitialCluster; left at its default it would not match the random port.
	cfg.AdvertisePeerUrls = peerURLs
	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, peerURL)

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, nil, err
	}
	select {
	case <-e.Server.ReadyNotify():
		log.Printf("Embedded etcd server is ready!")
	case <-time.After(10 * time.Second):
		e.Server.Stop() // trigger a shutdown
		e.Close()       // release listeners and other server resources
		return nil, nil, fmt.Errorf("embedded etcd server took too long to start")
	}
	return e, []string{clientURL}, nil
}
// urlsFromStrings parses every element of strs with url.Parse and returns
// the results in the same order. The first parse failure aborts the
// conversion and is returned with a nil slice.
func urlsFromStrings(strs []string) ([]url.URL, error) {
	parsed := make([]url.URL, 0, len(strs))
	for _, raw := range strs {
		u, err := url.Parse(raw)
		if err != nil {
			return nil, err
		}
		parsed = append(parsed, *u)
	}
	return parsed, nil
}
// getAvailablePort asks the OS for a free TCP port on localhost by binding
// to port 0, reads back the assigned port number and closes the listener
// again before returning.
func getAvailablePort() (int, error) {
	loopback, err := net.ResolveTCPAddr("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	listener, err := net.ListenTCP("tcp", loopback)
	if err != nil {
		return 0, err
	}
	port := listener.Addr().(*net.TCPAddr).Port
	listener.Close()
	return port, nil
}
// Close stops the client: it cancels the shared context (ending the response
// watch and any in-flight Put), closes the etcd client connection and then
// shuts down the embedded etcd server if one was started.
//
// The client is closed before the server so the connection is torn down
// while its peer is still running.
func (c *DAGClient) Close() {
	c.cancel()
	if c.etcdClient != nil {
		c.etcdClient.Close()
	}
	if c.etcdServer != nil {
		c.etcdServer.Close()
	}
}
// SendAction publishes action to etcd asynchronously and returns a channel
// that receives exactly one ActionResponse and is then closed.
//
// The response is either the one observed by the response watcher, or a
// locally generated failure (Success == false) when the action cannot be
// marshaled, cannot be written to etcd, or reuses the ID of an action that
// is still awaiting its response.
func (c *DAGClient) SendAction(action Action) <-chan ActionResponse {
	respChan := make(chan ActionResponse, 1)

	c.mu.Lock()
	_, inFlight := c.pending[action.ID]
	if !inFlight {
		c.pending[action.ID] = respChan
	}
	c.mu.Unlock()

	if inFlight {
		// Overwriting the pending entry would leave the earlier caller
		// waiting forever, so the duplicate is rejected instead.
		respChan <- ActionResponse{
			ActionID: action.ID,
			Success:  false,
			Message:  fmt.Sprintf("Action %s is already pending", action.ID),
		}
		close(respChan)
		return respChan
	}

	// fail resolves the action locally. It only touches respChan while the
	// channel is still registered: if the watcher already delivered a
	// response it has closed the channel, and sending again would panic.
	fail := func(msg string) {
		c.mu.Lock()
		registered, ok := c.pending[action.ID]
		owned := ok && registered == respChan
		if owned {
			delete(c.pending, action.ID)
		}
		c.mu.Unlock()
		if !owned {
			return
		}
		respChan <- ActionResponse{
			ActionID: action.ID,
			Success:  false,
			Message:  msg,
		}
		close(respChan)
	}

	go func() {
		actionJSON, err := json.Marshal(action)
		if err != nil {
			fail(fmt.Sprintf("Error marshaling action: %v", err))
			return
		}
		if _, err := c.etcdClient.Put(c.ctx, actionListKey+action.ID, string(actionJSON)); err != nil {
			fail(fmt.Sprintf("Error sending action to etcd: %v", err))
		}
	}()
	return respChan
}
// watchResponses consumes put events under responseKey until the watch
// channel closes. Each value is decoded as an ActionResponse and handed to
// the caller registered in pending for that action ID; responses nobody is
// waiting for are dropped, and undecodable values are logged and skipped.
func (c *DAGClient) watchResponses() {
	// deliver hands resp to its waiter (if any) and retires the entry, all
	// under the pending lock.
	deliver := func(resp ActionResponse) {
		c.mu.Lock()
		defer c.mu.Unlock()
		waiter, found := c.pending[resp.ActionID]
		if !found {
			return
		}
		waiter <- resp
		close(waiter)
		delete(c.pending, resp.ActionID)
	}

	updates := c.etcdClient.Watch(c.ctx, responseKey, clientv3.WithPrefix())
	for batch := range updates {
		for _, ev := range batch.Events {
			if ev.Type != clientv3.EventTypePut {
				continue
			}
			var resp ActionResponse
			if err := json.Unmarshal(ev.Kv.Value, &resp); err != nil {
				log.Printf("Error unmarshaling response: %v", err)
				continue
			}
			deliver(resp)
		}
	}
}
// main is the client demo: it parses the -mode and -endpoints flags, creates
// a DAGClient, sends a fixed sequence of example actions (waiting up to five
// seconds for each response) and finally prints the value stored at "/dag".
func main() {
	mode := flag.String("mode", "standalone", "etcd mode: standalone, embedded, or remote")
	endpoints := flag.String("endpoints", "", "comma-separated list of etcd endpoints (for remote mode)")
	flag.Parse()

	cfg := ClientConfig{
		Mode: EtcdMode(*mode),
	}
	if *mode == "remote" {
		// strings.Split returns [""] for an empty flag value, so blank
		// entries are dropped and an empty list is rejected up front.
		for _, ep := range strings.Split(*endpoints, ",") {
			if ep = strings.TrimSpace(ep); ep != "" {
				cfg.Endpoints = append(cfg.Endpoints, ep)
			}
		}
		if len(cfg.Endpoints) == 0 {
			log.Fatal("remote mode requires at least one endpoint (-endpoints)")
		}
	}

	client, err := NewDAGClient(cfg)
	if err != nil {
		log.Fatalf("Failed to create DAG client: %v", err)
	}
	defer client.Close()

	// Example actions
	actions := []Action{
		{ID: "1", Type: "add", NodeID: "1", Data: "Node 1"},
		{ID: "2", Type: "add", NodeID: "2", Data: "Node 2"},
		{ID: "3", Type: "addChild", NodeID: "1", Data: "2"},
		{ID: "4", Type: "update", NodeID: "2", Data: "Updated Node 2"},
		{ID: "5", Type: "delete", NodeID: "2"},
	}

	// Send actions and wait for responses
	for _, action := range actions {
		respChan := client.SendAction(action)
		select {
		case response := <-respChan:
			fmt.Printf("Received response for action %s: Success=%v, Message=%s\n",
				response.ActionID, response.Success, response.Message)
		case <-time.After(5 * time.Second):
			fmt.Printf("Timeout waiting for response to action %s\n", action.ID)
		}
	}

	// Print the final status
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	resp, err := client.etcdClient.Get(ctx, "/dag")
	switch {
	case err != nil:
		log.Printf("Failed to get DAG status: %v", err)
	case len(resp.Kvs) > 0:
		fmt.Printf("Final DAG status: %s\n", resp.Kvs[0].Value)
	default:
		fmt.Println("DAG is empty")
	}
}
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
)
// etcd keys used by the server.
const (
// actionListKey is the prefix watchActions watches for incoming actions.
actionListKey = "/actions/"
// responseKey is the prefix under which sendResponse stores each response,
// with the action ID appended.
responseKey = "/responses/"
// dagKey is the single key where saveDag stores the JSON-encoded DAG.
dagKey = "/dag"
)
// EtcdMode selects how the server obtains its etcd backend.
type EtcdMode string
const (
// ModeEmbedded starts an etcd server inside this process.
ModeEmbedded EtcdMode = "embedded"
// ModeRemote connects to the endpoints given in ServerConfig.Endpoints.
ModeRemote EtcdMode = "remote"
)
// ServerConfig holds the settings NewServer needs to reach etcd.
type ServerConfig struct {
// Mode chooses between an in-process etcd server and a remote cluster.
Mode EtcdMode
// Endpoints lists the etcd endpoints to dial; only read when Mode is ModeRemote.
Endpoints []string
}
// Action is one mutation request, decoded from the JSON values written under
// actionListKey.
type Action struct {
// ID identifies the action and is echoed back in ActionResponse.ActionID.
ID string `json:"id"`
// Type names the operation; processAction accepts "add", "update",
// "delete" and "addChild".
Type string `json:"type"`
// NodeID is the node the action targets (the parent for "addChild").
NodeID string `json:"nodeId"`
// Data is the node payload for "add" and "update", and the child node ID
// for "addChild".
Data string `json:"data"`
}
// ActionResponse reports the outcome of one Action; it is JSON-encoded and
// stored under responseKey + ActionID.
type ActionResponse struct {
// ActionID is the ID of the Action this response answers.
ActionID string `json:"actionId"`
// Success is true when processAction returned no error.
Success bool `json:"success"`
// Message is a fixed success text or the error returned by processAction.
Message string `json:"message"`
}
// DAGNode is a single node of the graph.
type DAGNode struct {
// ID is the node's identifier, the same string used as its key in DAG.Nodes.
ID string `json:"id"`
// Data is the payload set by "add" and replaced by "update".
Data string `json:"data"`
// Children holds the IDs of the nodes attached via "addChild".
Children []string `json:"children"`
}
// DAG is the whole graph as kept in memory and persisted as JSON.
type DAG struct {
// Nodes maps a node ID to its node.
Nodes map[string]*DAGNode `json:"nodes"`
}
// Server applies actions read from etcd to an in-memory DAG, persists the
// result back to etcd and serves the current DAG over HTTP.
type Server struct {
// etcdServer is the in-process etcd server; it stays nil in ModeRemote.
etcdServer *embed.Etcd
// etcdClient is the connection used for Watch and Put calls.
etcdClient *clientv3.Client
// dag is the in-memory graph state. NOTE(review): no lock protects it,
// although it is touched from the action-handling goroutines and from the
// HTTP status handler.
dag DAG
}
// NewServer creates a Server with an empty DAG and an etcd client for the
// mode in cfg.
//
// ModeEmbedded starts an in-process etcd server, waits up to 60 seconds for
// it to become ready and dials its first client listen URL. ModeRemote dials
// cfg.Endpoints. Any other mode is rejected. If a later step fails, an
// already started embedded server is shut down before the error is returned.
func NewServer(cfg ServerConfig) (*Server, error) {
	s := &Server{
		dag: DAG{Nodes: make(map[string]*DAGNode)},
	}

	endpoints := cfg.Endpoints
	switch cfg.Mode {
	case ModeEmbedded:
		etcd, err := startEmbeddedEtcd()
		if err != nil {
			return nil, fmt.Errorf("failed to start embedded etcd: %w", err)
		}
		// StartEtcd returns before the member can serve requests; block
		// until it is ready so the first Put/Watch does not race startup.
		select {
		case <-etcd.Server.ReadyNotify():
		case <-time.After(60 * time.Second):
			etcd.Server.Stop() // trigger a shutdown
			etcd.Close()
			return nil, fmt.Errorf("embedded etcd server took too long to start")
		}
		s.etcdServer = etcd
		endpoints = []string{etcd.Config().ListenClientUrls[0].String()}
	case ModeRemote:
		// endpoints already holds cfg.Endpoints.
	default:
		return nil, fmt.Errorf("invalid mode: %s", cfg.Mode)
	}

	client, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		if s.etcdServer != nil {
			s.etcdServer.Close()
		}
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}
	s.etcdClient = client
	return s, nil
}
// Close releases the server's etcd resources: first the client connection,
// then the embedded etcd server. Either may be absent (nil) and is skipped.
func (s *Server) Close() {
	if client := s.etcdClient; client != nil {
		client.Close()
	}
	if etcd := s.etcdServer; etcd != nil {
		etcd.Close()
	}
}
// Run persists the initial DAG, starts the action watcher, serves the DAG on
// http://:8080/status and blocks until SIGINT/SIGTERM arrives or the HTTP
// listener fails.
//
// It returns nil after a clean, signal-triggered shutdown and a non-nil
// error when the HTTP server could not run or could not shut down in time.
// The handler lives on a private mux, so Run does not touch
// http.DefaultServeMux.
func (s *Server) Run() error {
	s.saveDag()
	go s.watchActions()

	mux := http.NewServeMux()
	mux.HandleFunc("/status", s.handleStatus)
	srv := &http.Server{
		Addr:              ":8080",
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Report listener failures to the caller instead of exiting the process
	// from inside the goroutine, which would skip the caller's cleanup.
	errCh := make(chan error, 1)
	go func() {
		errCh <- srv.ListenAndServe()
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(quit)

	select {
	case err := <-errCh:
		return fmt.Errorf("status server: %w", err)
	case <-quit:
	}

	log.Println("Shutting down server...")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		return fmt.Errorf("status server shutdown: %w", err)
	}
	return nil
}
// startEmbeddedEtcd starts an in-process etcd server using the default embed
// configuration with its data directory set to "default.etcd". It returns as
// soon as embed.StartEtcd does and performs no readiness wait of its own.
func startEmbeddedEtcd() (*embed.Etcd, error) {
	etcdCfg := embed.NewConfig()
	etcdCfg.Dir = "default.etcd"

	etcd, err := embed.StartEtcd(etcdCfg)
	return etcd, err
}
// watchActions consumes put events under actionListKey until the watch
// channel closes, decoding each value as an Action and applying it.
//
// Actions are handled one at a time, in the order the watch delivers them.
// Handling each one in its own goroutine would mutate the DAG map
// concurrently and could apply dependent actions (for example "add"
// followed by "addChild") out of order.
func (s *Server) watchActions() {
	watchChan := s.etcdClient.Watch(context.Background(), actionListKey, clientv3.WithPrefix())
	for watchResponse := range watchChan {
		if err := watchResponse.Err(); err != nil {
			log.Printf("Error watching actions: %v", err)
			continue
		}
		for _, event := range watchResponse.Events {
			if event.Type != clientv3.EventTypePut {
				continue
			}
			var action Action
			if err := json.Unmarshal(event.Kv.Value, &action); err != nil {
				log.Printf("Error unmarshaling action: %v", err)
				continue
			}
			s.handleAction(action)
		}
	}
}
// handleAction applies action through processAction and publishes the
// outcome: a success response with a fixed message, or a failure response
// carrying the error text.
func (s *Server) handleAction(action Action) {
	reply := ActionResponse{
		ActionID: action.ID,
		Success:  true,
		Message:  "Action processed successfully",
	}
	if err := s.processAction(action); err != nil {
		reply.Success = false
		reply.Message = err.Error()
	}
	s.sendResponse(reply)
}
// processAction applies one action to the in-memory DAG and, on success,
// persists the DAG via saveDag.
//
// Supported types:
//   - "add":      create node NodeID with payload Data; fails if it exists.
//   - "update":   replace the payload of node NodeID; fails if it is missing.
//   - "delete":   remove node NodeID and every child reference to it; fails
//     if it is missing.
//   - "addChild": attach node Data as a child of node NodeID; fails if
//     either node is missing, if the edge already exists, or if the edge
//     would create a cycle (including a self-edge).
//
// Any other type is an error. When an error is returned the DAG is left
// unchanged and is not saved.
func (s *Server) processAction(action Action) error {
	nodes := s.dag.Nodes
	switch action.Type {
	case "add":
		if _, exists := nodes[action.NodeID]; exists {
			return fmt.Errorf("node with ID %s already exists", action.NodeID)
		}
		nodes[action.NodeID] = &DAGNode{
			ID:       action.NodeID,
			Data:     action.Data,
			Children: []string{},
		}
	case "update":
		node, ok := nodes[action.NodeID]
		if !ok {
			return fmt.Errorf("node with ID %s does not exist", action.NodeID)
		}
		node.Data = action.Data
	case "delete":
		if _, ok := nodes[action.NodeID]; !ok {
			return fmt.Errorf("node with ID %s does not exist", action.NodeID)
		}
		delete(nodes, action.NodeID)
		// Drop edges pointing at the removed node so no parent keeps a
		// dangling child ID.
		for _, node := range nodes {
			node.Children = dagRemoveChild(node.Children, action.NodeID)
		}
	case "addChild":
		parent, ok := nodes[action.NodeID]
		if !ok {
			return fmt.Errorf("parent node with ID %s does not exist", action.NodeID)
		}
		childID := action.Data
		if _, childExists := nodes[childID]; !childExists {
			return fmt.Errorf("child node with ID %s does not exist", childID)
		}
		if childID == action.NodeID {
			return fmt.Errorf("node with ID %s cannot be its own child", childID)
		}
		for _, existing := range parent.Children {
			if existing == childID {
				return fmt.Errorf("node with ID %s is already a child of %s", childID, action.NodeID)
			}
		}
		// If the parent is already reachable from the child, the new edge
		// would close a loop and the graph would stop being acyclic.
		if dagHasPath(nodes, childID, action.NodeID) {
			return fmt.Errorf("adding %s as a child of %s would create a cycle", childID, action.NodeID)
		}
		parent.Children = append(parent.Children, childID)
	default:
		return fmt.Errorf("unknown action type: %s", action.Type)
	}
	s.saveDag()
	return nil
}

// dagRemoveChild returns children with every occurrence of id removed,
// reusing the slice's backing array.
func dagRemoveChild(children []string, id string) []string {
	kept := children[:0]
	for _, child := range children {
		if child != id {
			kept = append(kept, child)
		}
	}
	return kept
}

// dagHasPath reports whether to can be reached from from by following
// Children edges. IDs without a node in nodes are treated as leaves.
func dagHasPath(nodes map[string]*DAGNode, from, to string) bool {
	visited := map[string]bool{from: true}
	stack := []string{from}
	for len(stack) > 0 {
		current := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if current == to {
			return true
		}
		node, ok := nodes[current]
		if !ok {
			continue
		}
		for _, child := range node.Children {
			if !visited[child] {
				visited[child] = true
				stack = append(stack, child)
			}
		}
	}
	return false
}
// sendResponse JSON-encodes response and stores it in etcd under
// responseKey followed by the action ID. Encoding and write failures are
// logged; nothing is returned to the caller.
func (s *Server) sendResponse(response ActionResponse) {
	payload, err := json.Marshal(response)
	if err != nil {
		log.Printf("Error marshaling response: %v", err)
		return
	}

	key := responseKey + response.ActionID
	if _, err := s.etcdClient.Put(context.Background(), key, string(payload)); err != nil {
		log.Printf("Error sending response to etcd: %v", err)
	}
}
// saveDag JSON-encodes the in-memory DAG and writes it to etcd at dagKey.
// Encoding and write failures are logged; nothing is returned to the caller.
func (s *Server) saveDag() {
	snapshot, err := json.Marshal(s.dag)
	if err != nil {
		log.Printf("Error marshaling DAG: %v", err)
		return
	}

	if _, err := s.etcdClient.Put(context.Background(), dagKey, string(snapshot)); err != nil {
		log.Printf("Error saving DAG to etcd: %v", err)
	}
}
// handleStatus is the HTTP handler for the status endpoint: it responds with
// the in-memory DAG encoded as JSON, or with a 500 and the error text if
// encoding fails.
//
// NOTE(review): s.dag is read here without any lock while the action
// handling code mutates it from another goroutine — confirm whether this
// needs synchronization.
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
	body, encodeErr := json.Marshal(s.dag)
	if encodeErr != nil {
		http.Error(w, encodeErr.Error(), http.StatusInternalServerError)
		return
	}

	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.Write(body)
}
// main is the server entry point: it parses the -mode and -endpoints flags,
// creates the Server and runs it until it is signalled to stop or fails.
func main() {
	mode := flag.String("mode", "embedded", "etcd mode: embedded or remote")
	endpoints := flag.String("endpoints", "", "comma-separated list of etcd endpoints (for remote mode)")
	flag.Parse()

	cfg := ServerConfig{
		Mode: EtcdMode(*mode),
	}
	if *mode == "remote" {
		// strings.Split returns [""] for an empty flag value, so blank
		// entries are dropped and an empty list is rejected up front.
		for _, ep := range strings.Split(*endpoints, ",") {
			if ep = strings.TrimSpace(ep); ep != "" {
				cfg.Endpoints = append(cfg.Endpoints, ep)
			}
		}
		if len(cfg.Endpoints) == 0 {
			log.Fatal("remote mode requires at least one endpoint (-endpoints)")
		}
	}

	server, err := NewServer(cfg)
	if err != nil {
		log.Fatalf("Failed to create server: %v", err)
	}

	// Close explicitly rather than with defer: log.Fatalf exits the process
	// without running deferred calls, which would skip the etcd shutdown.
	runErr := server.Run()
	server.Close()
	if runErr != nil {
		log.Fatalf("Server error: %v", runErr)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment