Created
September 1, 2024 21:53
-
-
Save davidroman0O/9903d7d73d85e3ae29478890690ea6e9 to your computer and use it in GitHub Desktop.
Experiment with using etcd for a library I'm putting together.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"log" | |
"net" | |
"net/url" | |
"os" | |
"strings" | |
"sync" | |
"time" | |
clientv3 "go.etcd.io/etcd/client/v3" | |
"go.etcd.io/etcd/server/v3/embed" | |
) | |
// actionListKey is the etcd key prefix under which SendAction stores each
// action, keyed by the action's ID.
const actionListKey = "/actions/"

// responseKey is the etcd key prefix that watchResponses observes for
// responses to previously sent actions.
const responseKey = "/responses/"
// EtcdMode selects how the client obtains an etcd cluster to talk to.
type EtcdMode string

// Modes understood by NewDAGClient.
const (
	// ModeStandalone starts an in-process etcd server (handled the same way
	// as ModeEmbedded by NewDAGClient).
	ModeStandalone = EtcdMode("standalone")
	// ModeEmbedded starts an in-process etcd server.
	ModeEmbedded = EtcdMode("embedded")
	// ModeRemote connects to the endpoints given in ClientConfig.Endpoints.
	ModeRemote = EtcdMode("remote")
)

// ClientConfig carries the settings NewDAGClient needs.
type ClientConfig struct {
	// Mode picks between an in-process etcd and a remote cluster.
	Mode EtcdMode
	// Endpoints lists the etcd endpoints; it is only read in ModeRemote.
	Endpoints []string
}
type (
	// Action is a single request sent to the server through etcd. It is
	// serialized as JSON and stored under the action key prefix.
	Action struct {
		// ID identifies the action; responses refer back to it.
		ID string `json:"id"`
		// Type names the operation (the example uses "add", "addChild",
		// "update" and "delete").
		Type string `json:"type"`
		// NodeID is the node the action applies to.
		NodeID string `json:"nodeId"`
		// Data is the operation payload.
		Data string `json:"data"`
	}

	// ActionResponse is the JSON reply matched to an Action by ActionID.
	ActionResponse struct {
		// ActionID is the ID of the Action being answered.
		ActionID string `json:"actionId"`
		// Success reports whether the action was processed without error.
		Success bool `json:"success"`
		// Message is a human-readable result or error description.
		Message string `json:"message"`
	}
)
type DAGClient struct { | |
etcdClient *clientv3.Client | |
etcdServer *embed.Etcd | |
mu sync.Mutex | |
pending map[string]chan ActionResponse | |
ctx context.Context | |
cancel context.CancelFunc | |
} | |
func NewDAGClient(cfg ClientConfig) (*DAGClient, error) { | |
ctx, cancel := context.WithCancel(context.Background()) | |
client := &DAGClient{ | |
pending: make(map[string]chan ActionResponse), | |
ctx: ctx, | |
cancel: cancel, | |
} | |
var err error | |
var endpoints []string | |
switch cfg.Mode { | |
case ModeStandalone, ModeEmbedded: | |
client.etcdServer, endpoints, err = startEmbeddedEtcd() | |
if err != nil { | |
return nil, fmt.Errorf("failed to start embedded etcd: %v", err) | |
} | |
case ModeRemote: | |
endpoints = cfg.Endpoints | |
default: | |
return nil, fmt.Errorf("invalid mode: %s", cfg.Mode) | |
} | |
client.etcdClient, err = connectWithRetry(endpoints, 30*time.Second) | |
if err != nil { | |
if client.etcdServer != nil { | |
client.etcdServer.Close() | |
} | |
return nil, fmt.Errorf("failed to create etcd client: %v", err) | |
} | |
go client.watchResponses() | |
return client, nil | |
} | |
func connectWithRetry(endpoints []string, timeout time.Duration) (*clientv3.Client, error) { | |
ctx, cancel := context.WithTimeout(context.Background(), timeout) | |
defer cancel() | |
var client *clientv3.Client | |
var err error | |
for { | |
select { | |
case <-ctx.Done(): | |
return nil, fmt.Errorf("timeout while connecting to etcd: %v", err) | |
default: | |
client, err = clientv3.New(clientv3.Config{ | |
Endpoints: endpoints, | |
DialTimeout: 5 * time.Second, | |
}) | |
if err == nil { | |
return client, nil | |
} | |
log.Printf("Failed to connect to etcd, retrying: %v", err) | |
time.Sleep(1 * time.Second) | |
} | |
} | |
} | |
func startEmbeddedEtcd() (*embed.Etcd, []string, error) { | |
cfg := embed.NewConfig() | |
cfg.Dir = "client_etcd_data" | |
os.RemoveAll(cfg.Dir) // Clean up any existing data | |
// Use random available ports | |
clientPort, err := getAvailablePort() | |
if err != nil { | |
return nil, nil, fmt.Errorf("failed to get available client port: %v", err) | |
} | |
peerPort, err := getAvailablePort() | |
if err != nil { | |
return nil, nil, fmt.Errorf("failed to get available peer port: %v", err) | |
} | |
clientURL := fmt.Sprintf("http://localhost:%d", clientPort) | |
peerURL := fmt.Sprintf("http://localhost:%d", peerPort) | |
cfg.ListenClientUrls, _ = urlsFromStrings([]string{clientURL}) | |
cfg.AdvertiseClientUrls = cfg.ListenClientUrls | |
cfg.ListenPeerUrls, _ = urlsFromStrings([]string{peerURL}) | |
cfg.InitialCluster = fmt.Sprintf("default=%s", peerURL) | |
e, err := embed.StartEtcd(cfg) | |
if err != nil { | |
return nil, nil, err | |
} | |
select { | |
case <-e.Server.ReadyNotify(): | |
log.Printf("Embedded etcd server is ready!") | |
case <-time.After(10 * time.Second): | |
e.Server.Stop() // trigger a shutdown | |
return nil, nil, fmt.Errorf("embedded etcd server took too long to start") | |
} | |
return e, []string{clientURL}, nil | |
} | |
// urlsFromStrings parses every element of strs with url.Parse and returns the
// results in the same order. The first parse failure aborts the conversion
// and is returned with a nil slice.
func urlsFromStrings(strs []string) ([]url.URL, error) {
	parsed := make([]url.URL, 0, len(strs))
	for _, raw := range strs {
		u, err := url.Parse(raw)
		if err != nil {
			return nil, err
		}
		parsed = append(parsed, *u)
	}
	return parsed, nil
}
// getAvailablePort asks the OS for a free TCP port on localhost by listening
// on port 0, reads back the port that was assigned, and closes the listener
// again before returning.
func getAvailablePort() (int, error) {
	tcpAddr, err := net.ResolveTCPAddr("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		return 0, err
	}

	// Read the assigned port before releasing the socket.
	port := listener.Addr().(*net.TCPAddr).Port
	listener.Close()
	return port, nil
}
func (c *DAGClient) Close() { | |
c.cancel() | |
if c.etcdServer != nil { | |
c.etcdServer.Close() | |
} | |
if c.etcdClient != nil { | |
c.etcdClient.Close() | |
} | |
} | |
func (c *DAGClient) SendAction(action Action) <-chan ActionResponse { | |
respChan := make(chan ActionResponse, 1) | |
c.mu.Lock() | |
c.pending[action.ID] = respChan | |
c.mu.Unlock() | |
go func() { | |
actionJSON, err := json.Marshal(action) | |
if err != nil { | |
c.mu.Lock() | |
delete(c.pending, action.ID) | |
c.mu.Unlock() | |
respChan <- ActionResponse{ | |
ActionID: action.ID, | |
Success: false, | |
Message: fmt.Sprintf("Error marshaling action: %v", err), | |
} | |
close(respChan) | |
return | |
} | |
_, err = c.etcdClient.Put(c.ctx, actionListKey+action.ID, string(actionJSON)) | |
if err != nil { | |
c.mu.Lock() | |
delete(c.pending, action.ID) | |
c.mu.Unlock() | |
respChan <- ActionResponse{ | |
ActionID: action.ID, | |
Success: false, | |
Message: fmt.Sprintf("Error sending action to etcd: %v", err), | |
} | |
close(respChan) | |
return | |
} | |
}() | |
return respChan | |
} | |
func (c *DAGClient) watchResponses() { | |
watchChan := c.etcdClient.Watch(c.ctx, responseKey, clientv3.WithPrefix()) | |
for watchResponse := range watchChan { | |
for _, event := range watchResponse.Events { | |
if event.Type == clientv3.EventTypePut { | |
var response ActionResponse | |
err := json.Unmarshal(event.Kv.Value, &response) | |
if err != nil { | |
log.Printf("Error unmarshaling response: %v", err) | |
continue | |
} | |
c.mu.Lock() | |
if respChan, ok := c.pending[response.ActionID]; ok { | |
respChan <- response | |
close(respChan) | |
delete(c.pending, response.ActionID) | |
} | |
c.mu.Unlock() | |
} | |
} | |
} | |
} | |
func main() { | |
mode := flag.String("mode", "standalone", "etcd mode: standalone, embedded, or remote") | |
endpoints := flag.String("endpoints", "", "comma-separated list of etcd endpoints (for remote mode)") | |
flag.Parse() | |
cfg := ClientConfig{ | |
Mode: EtcdMode(*mode), | |
} | |
if *mode == "remote" { | |
cfg.Endpoints = strings.Split(*endpoints, ",") | |
} | |
client, err := NewDAGClient(cfg) | |
if err != nil { | |
log.Fatalf("Failed to create DAG client: %v", err) | |
} | |
defer client.Close() | |
// Example actions | |
actions := []Action{ | |
{ID: "1", Type: "add", NodeID: "1", Data: "Node 1"}, | |
{ID: "2", Type: "add", NodeID: "2", Data: "Node 2"}, | |
{ID: "3", Type: "addChild", NodeID: "1", Data: "2"}, | |
{ID: "4", Type: "update", NodeID: "2", Data: "Updated Node 2"}, | |
{ID: "5", Type: "delete", NodeID: "2"}, | |
} | |
// Send actions and wait for responses | |
for _, action := range actions { | |
respChan := client.SendAction(action) | |
select { | |
case response := <-respChan: | |
fmt.Printf("Received response for action %s: Success=%v, Message=%s\n", | |
response.ActionID, response.Success, response.Message) | |
case <-time.After(5 * time.Second): | |
fmt.Printf("Timeout waiting for response to action %s\n", action.ID) | |
} | |
} | |
// Print the final status | |
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
defer cancel() | |
resp, err := client.etcdClient.Get(ctx, "/dag") | |
if err != nil { | |
log.Printf("Failed to get DAG status: %v", err) | |
} else if len(resp.Kvs) > 0 { | |
fmt.Printf("Final DAG status: %s\n", resp.Kvs[0].Value) | |
} else { | |
fmt.Println("DAG is empty") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"log" | |
"net/http" | |
"os" | |
"os/signal" | |
"strings" | |
"syscall" | |
"time" | |
clientv3 "go.etcd.io/etcd/client/v3" | |
"go.etcd.io/etcd/server/v3/embed" | |
) | |
// actionListKey is the etcd key prefix watched by watchActions for incoming
// actions.
const actionListKey = "/actions/"

// responseKey is the etcd key prefix under which sendResponse stores each
// response, keyed by action ID.
const responseKey = "/responses/"

// dagKey is the etcd key where saveDag stores the JSON-encoded DAG.
const dagKey = "/dag"
// EtcdMode selects where the server's etcd cluster lives.
type EtcdMode string

// Modes understood by NewServer.
const (
	// ModeEmbedded starts an etcd server inside this process.
	ModeEmbedded = EtcdMode("embedded")
	// ModeRemote connects to the endpoints in ServerConfig.Endpoints.
	ModeRemote = EtcdMode("remote")
)

// ServerConfig carries the settings NewServer needs.
type ServerConfig struct {
	// Mode picks between an embedded etcd and a remote cluster.
	Mode EtcdMode
	// Endpoints lists the etcd endpoints; it is only read in ModeRemote.
	Endpoints []string
}
type (
	// Action is a request read from etcd and applied to the DAG by
	// processAction.
	Action struct {
		// ID identifies the action and is echoed in the response.
		ID string `json:"id"`
		// Type is one of "add", "update", "delete" or "addChild".
		Type string `json:"type"`
		// NodeID is the node the action targets (the parent for "addChild").
		NodeID string `json:"nodeId"`
		// Data is the node payload, or the child node ID for "addChild".
		Data string `json:"data"`
	}

	// ActionResponse is the JSON result written back for an Action.
	ActionResponse struct {
		// ActionID is the ID of the Action this response answers.
		ActionID string `json:"actionId"`
		// Success is true when processAction returned no error.
		Success bool `json:"success"`
		// Message holds a success note or the error text.
		Message string `json:"message"`
	}
)
type (
	// DAGNode is one node of the graph together with the IDs of its children.
	DAGNode struct {
		// ID is the node's identifier and its key in DAG.Nodes.
		ID string `json:"id"`
		// Data is the payload supplied by "add" and "update" actions.
		Data string `json:"data"`
		// Children holds the IDs appended by "addChild" actions.
		Children []string `json:"children"`
	}

	// DAG is the whole graph, indexed by node ID.
	DAG struct {
		Nodes map[string]*DAGNode `json:"nodes"`
	}
)
type Server struct { | |
etcdServer *embed.Etcd | |
etcdClient *clientv3.Client | |
dag DAG | |
} | |
func NewServer(cfg ServerConfig) (*Server, error) { | |
var err error | |
s := &Server{ | |
dag: DAG{Nodes: make(map[string]*DAGNode)}, | |
} | |
switch cfg.Mode { | |
case ModeEmbedded: | |
s.etcdServer, err = startEmbeddedEtcd() | |
if err != nil { | |
return nil, fmt.Errorf("failed to start embedded etcd: %v", err) | |
} | |
s.etcdClient, err = clientv3.New(clientv3.Config{ | |
Endpoints: []string{s.etcdServer.Config().ListenClientUrls[0].String()}, | |
DialTimeout: 5 * time.Second, | |
}) | |
case ModeRemote: | |
s.etcdClient, err = clientv3.New(clientv3.Config{ | |
Endpoints: cfg.Endpoints, | |
DialTimeout: 5 * time.Second, | |
}) | |
default: | |
return nil, fmt.Errorf("invalid mode: %s", cfg.Mode) | |
} | |
if err != nil { | |
return nil, fmt.Errorf("failed to create etcd client: %v", err) | |
} | |
return s, nil | |
} | |
func (s *Server) Close() { | |
if s.etcdClient != nil { | |
s.etcdClient.Close() | |
} | |
if s.etcdServer != nil { | |
s.etcdServer.Close() | |
} | |
} | |
func (s *Server) Run() error { | |
s.saveDag() | |
go s.watchActions() | |
http.HandleFunc("/status", s.handleStatus) | |
go func() { | |
log.Fatal(http.ListenAndServe(":8080", nil)) | |
}() | |
quit := make(chan os.Signal, 1) | |
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) | |
<-quit | |
log.Println("Shutting down server...") | |
return nil | |
} | |
func startEmbeddedEtcd() (*embed.Etcd, error) { | |
cfg := embed.NewConfig() | |
cfg.Dir = "default.etcd" | |
return embed.StartEtcd(cfg) | |
} | |
func (s *Server) watchActions() { | |
watchChan := s.etcdClient.Watch(context.Background(), actionListKey, clientv3.WithPrefix()) | |
for watchResponse := range watchChan { | |
for _, event := range watchResponse.Events { | |
if event.Type == clientv3.EventTypePut { | |
var action Action | |
err := json.Unmarshal(event.Kv.Value, &action) | |
if err != nil { | |
log.Printf("Error unmarshaling action: %v", err) | |
continue | |
} | |
go s.handleAction(action) | |
} | |
} | |
} | |
} | |
func (s *Server) handleAction(action Action) { | |
response := ActionResponse{ActionID: action.ID} | |
err := s.processAction(action) | |
if err != nil { | |
response.Success = false | |
response.Message = err.Error() | |
} else { | |
response.Success = true | |
response.Message = "Action processed successfully" | |
} | |
s.sendResponse(response) | |
} | |
func (s *Server) processAction(action Action) error { | |
switch action.Type { | |
case "add": | |
if _, exists := s.dag.Nodes[action.NodeID]; exists { | |
return fmt.Errorf("node with ID %s already exists", action.NodeID) | |
} | |
s.dag.Nodes[action.NodeID] = &DAGNode{ | |
ID: action.NodeID, | |
Data: action.Data, | |
Children: []string{}, | |
} | |
case "update": | |
if node, ok := s.dag.Nodes[action.NodeID]; ok { | |
node.Data = action.Data | |
} else { | |
return fmt.Errorf("node with ID %s does not exist", action.NodeID) | |
} | |
case "delete": | |
if _, ok := s.dag.Nodes[action.NodeID]; ok { | |
delete(s.dag.Nodes, action.NodeID) | |
} else { | |
return fmt.Errorf("node with ID %s does not exist", action.NodeID) | |
} | |
case "addChild": | |
if node, ok := s.dag.Nodes[action.NodeID]; ok { | |
if _, childExists := s.dag.Nodes[action.Data]; !childExists { | |
return fmt.Errorf("child node with ID %s does not exist", action.Data) | |
} | |
node.Children = append(node.Children, action.Data) | |
} else { | |
return fmt.Errorf("parent node with ID %s does not exist", action.NodeID) | |
} | |
default: | |
return fmt.Errorf("unknown action type: %s", action.Type) | |
} | |
s.saveDag() | |
return nil | |
} | |
func (s *Server) sendResponse(response ActionResponse) { | |
responseJSON, err := json.Marshal(response) | |
if err != nil { | |
log.Printf("Error marshaling response: %v", err) | |
return | |
} | |
_, err = s.etcdClient.Put(context.Background(), responseKey+response.ActionID, string(responseJSON)) | |
if err != nil { | |
log.Printf("Error sending response to etcd: %v", err) | |
} | |
} | |
func (s *Server) saveDag() { | |
dagJSON, err := json.Marshal(s.dag) | |
if err != nil { | |
log.Printf("Error marshaling DAG: %v", err) | |
return | |
} | |
_, err = s.etcdClient.Put(context.Background(), dagKey, string(dagJSON)) | |
if err != nil { | |
log.Printf("Error saving DAG to etcd: %v", err) | |
} | |
} | |
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { | |
dagJSON, err := json.Marshal(s.dag) | |
if err != nil { | |
http.Error(w, err.Error(), http.StatusInternalServerError) | |
return | |
} | |
w.Header().Set("Content-Type", "application/json") | |
w.Write(dagJSON) | |
} | |
func main() { | |
mode := flag.String("mode", "embedded", "etcd mode: embedded or remote") | |
endpoints := flag.String("endpoints", "", "comma-separated list of etcd endpoints (for remote mode)") | |
flag.Parse() | |
cfg := ServerConfig{ | |
Mode: EtcdMode(*mode), | |
} | |
if *mode == "remote" { | |
cfg.Endpoints = strings.Split(*endpoints, ",") | |
} | |
server, err := NewServer(cfg) | |
if err != nil { | |
log.Fatalf("Failed to create server: %v", err) | |
} | |
defer server.Close() | |
if err := server.Run(); err != nil { | |
log.Fatalf("Server error: %v", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment