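// This program simulates producers publishing readiness status messages to a
// single Redis pub/sub topic, and consumers that each maintain a short-lived,
// per-cluster view of those messages and periodically decide whether a
// randomly chosen cluster is ready for work.
//
// Example invocation, assuming a local Redis at the default localhost:6379
// and a Go module that requires github.com/redis/go-redis/v9 (the file name
// main.go is illustrative):
//
//	go run main.go -producers 6 -consumers 4 -output-producer -output-consumer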
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"sort"
	"sync"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
)

// Message is the status payload each producer publishes to the shared topic.
type Message struct {
	Identifier string    `json:"identifier"`
	Cluster    string    `json:"cluster"`
	Timestamp  time.Time `json:"timestamp"`
	IsReady    bool      `json:"is_ready"`
}
// Producer periodically publishes a readiness status for its cluster.
type Producer struct {
	ID        int
	ClusterID int
	Client    *redis.Client
}

// Consumer subscribes to the topic and keeps a TTL-bounded view of the most
// recent messages, grouped by cluster.
type Consumer struct {
	ID      int
	Client  *redis.Client
	Views   map[string][]Message // cluster -> messages
	ViewTTL time.Duration
	mu      sync.RWMutex
}

const (
	REDIS_TOPIC = "status_updates"
)

var (
	outputProducer bool
	outputConsumer bool
)
func main() {
	var (
		numProducers       = flag.Int("producers", 240, "Number of producers")
		numConsumers       = flag.Int("consumers", 300, "Number of consumers")
		redisAddr          = flag.String("redis", "localhost:6379", "Redis address")
		viewTTL            = flag.Duration("view-ttl", 15*time.Second, "View TTL duration")
		producerInterval   = flag.Duration("producer-interval", time.Second, "How often producers send messages")
		cleanupInterval    = flag.Duration("cleanup-interval", time.Second, "How often consumers clean up expired messages")
		workEvalInterval   = flag.Duration("work-eval-interval", 2*time.Second, "How often consumers evaluate work")
		outputProducerFlag = flag.Bool("output-producer", false, "Enable producer activity output")
		outputConsumerFlag = flag.Bool("output-consumer", false, "Enable consumer activity output")
	)
	flag.Parse()

	outputProducer = *outputProducerFlag
	outputConsumer = *outputConsumerFlag

	// Setup graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		if outputProducer || outputConsumer {
			log.Println("Shutting down...")
		}
		cancel()
	}()

	var wg sync.WaitGroup

	// Start producers, three per cluster
	// (producers 1-3 -> cluster-1, 4-6 -> cluster-2, and so on).
	for i := 1; i <= *numProducers; i++ {
		clusterID := ((i - 1) / 3) + 1
		producer := &Producer{
			ID:        i,
			ClusterID: clusterID,
			Client: redis.NewClient(&redis.Options{
				Addr: *redisAddr,
			}),
		}

		// Test connection
		if err := producer.Client.Ping(ctx).Err(); err != nil {
			log.Fatalf("Producer-%d: Failed to connect to Redis: %v", i, err)
		}

		wg.Add(1)
		go func(p *Producer, interval time.Duration) {
			defer wg.Done()
			defer p.Client.Close()
			runProducer(ctx, p, interval)
		}(producer, *producerInterval)
	}

	// Start consumers
	for i := 1; i <= *numConsumers; i++ {
		consumer := &Consumer{
			ID: i,
			Client: redis.NewClient(&redis.Options{
				Addr: *redisAddr,
			}),
			Views:   make(map[string][]Message),
			ViewTTL: *viewTTL,
		}

		// Test connection
		if err := consumer.Client.Ping(ctx).Err(); err != nil {
			log.Fatalf("Consumer-%d: Failed to connect to Redis: %v", i, err)
		}

		wg.Add(1)
		go func(c *Consumer, cleanupInt, workEvalInt time.Duration) {
			defer wg.Done()
			defer c.Client.Close()
			runConsumer(ctx, c, cleanupInt, workEvalInt)
		}(consumer, *cleanupInterval, *workEvalInterval)
	}

	wg.Wait()
}
func runProducer(ctx context.Context, p *Producer, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	if outputProducer {
		log.Printf("Started producer-%d in cluster-%d", p.ID, p.ClusterID)
	}

	for {
		select {
		case <-ctx.Done():
			if outputProducer {
				log.Printf("Producer-%d shutting down", p.ID)
			}
			return
		case <-ticker.C:
			msg := Message{
				Identifier: fmt.Sprintf("producer-%d", p.ID),
				Cluster:    fmt.Sprintf("cluster-%d", p.ClusterID),
				Timestamp:  time.Now(),
				IsReady:    rand.Intn(2) == 1, // Random boolean
			}

			data, err := json.Marshal(msg)
			if err != nil {
				log.Printf("Producer-%d: Error marshaling message: %v", p.ID, err)
				continue
			}

			// Publish message to Redis pub/sub
			err = p.Client.Publish(ctx, REDIS_TOPIC, string(data)).Err()
			if err != nil {
				log.Printf("Producer-%d: Error publishing message: %v", p.ID, err)
				continue
			}

			if outputProducer {
				log.Printf("Producer-%d: Published message - Ready: %t", p.ID, msg.IsReady)
			}
		}
	}
}
func runConsumer(ctx context.Context, c *Consumer, cleanupInterval, workEvalInterval time.Duration) {
	if outputConsumer {
		log.Printf("Started consumer-%d", c.ID)
	}

	// Subscribe to Redis pub/sub
	pubsub := c.Client.Subscribe(ctx, REDIS_TOPIC)
	defer pubsub.Close()

	// Start cleanup routine for expired messages
	go c.cleanupExpiredMessages(ctx, cleanupInterval)

	// Start work evaluation routine
	go c.evaluateWork(ctx, workEvalInterval)

	// Process incoming messages
	ch := pubsub.Channel()
	for {
		select {
		case <-ctx.Done():
			if outputConsumer {
				log.Printf("Consumer-%d shutting down", c.ID)
			}
			return
		case msg, ok := <-ch:
			if !ok {
				if outputConsumer {
					log.Printf("Consumer-%d: Channel closed", c.ID)
				}
				return
			}

			var message Message
			if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil {
				log.Printf("Consumer-%d: Error unmarshaling message: %v", c.ID, err)
				continue
			}

			c.addMessageToView(message)

			if outputConsumer {
				log.Printf("Consumer-%d: Received message from %s (Ready: %t)",
					c.ID, message.Identifier, message.IsReady)
			}
		}
	}
}
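// addMessageToView appends the message to its cluster's view and keeps that
// view sorted, so later pruning and evaluation see a deterministic order.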
func (c *Consumer) addMessageToView(msg Message) {
	c.mu.Lock()
	defer c.mu.Unlock()

	clusterKey := msg.Cluster
	c.Views[clusterKey] = append(c.Views[clusterKey], msg)

	// Sort by timestamp, then by identifier for consistent ordering
	sort.Slice(c.Views[clusterKey], func(i, j int) bool {
		messages := c.Views[clusterKey]
		if messages[i].Timestamp.Equal(messages[j].Timestamp) {
			return messages[i].Identifier < messages[j].Identifier
		}
		return messages[i].Timestamp.Before(messages[j].Timestamp)
	})
}
func (c *Consumer) cleanupExpiredMessages(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.mu.Lock()
			now := time.Now()
			for cluster, messages := range c.Views {
				// Remove messages older than ViewTTL
				var validMessages []Message
				for _, msg := range messages {
					if now.Sub(msg.Timestamp) <= c.ViewTTL {
						validMessages = append(validMessages, msg)
					}
				}
				if len(validMessages) == 0 {
					delete(c.Views, cluster)
				} else {
					c.Views[cluster] = validMessages
				}
			}
			c.mu.Unlock()
		}
	}
}
func (c *Consumer) evaluateWork(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.mu.RLock()
			if len(c.Views) == 0 {
				c.mu.RUnlock()
				continue
			}

			// Get all known clusters
			clusters := make([]string, 0, len(c.Views))
			for cluster := range c.Views {
				clusters = append(clusters, cluster)
			}

			// Randomly select a cluster to evaluate
			selectedCluster := clusters[rand.Intn(len(clusters))]
			messages := make([]Message, len(c.Views[selectedCluster]))
			copy(messages, c.Views[selectedCluster])
			c.mu.RUnlock()

			// Evaluate work for the selected cluster
			if len(messages) > 0 {
				c.evaluateClusterWork(selectedCluster, messages)
			}
		}
	}
}
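// evaluateClusterWork treats a cluster as ready only when the latest message
// from every producer seen in its view reports IsReady. For example, if the
// view for cluster-1 holds messages from producer-1 (latest ready) and
// producer-2 (latest not ready), the work is rejected.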
func (c *Consumer) evaluateClusterWork(cluster string, messages []Message) {
	// Group messages by producer identifier to get the most recent status per producer
	latestByProducer := make(map[string]Message)
	for _, msg := range messages {
		if existing, ok := latestByProducer[msg.Identifier]; !ok || msg.Timestamp.After(existing.Timestamp) {
			latestByProducer[msg.Identifier] = msg
		}
	}

	// Check if any producer in the cluster is not ready
	var notReadyProducers []string
	for identifier, msg := range latestByProducer {
		if !msg.IsReady {
			notReadyProducers = append(notReadyProducers, identifier)
		}
	}

	if len(notReadyProducers) > 0 {
		if outputConsumer {
			log.Printf("Consumer-%d: Work is rejected for %s (not ready: %v)",
				c.ID, cluster, notReadyProducers)
		}
	} else {
		if outputConsumer {
			log.Printf("Consumer-%d: %s is ready for work (%d producers ready)",
				c.ID, cluster, len(latestByProducer))
		}
	}
}