@joshdurbin
Last active August 28, 2025 23:54
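A small Go simulation of coordination over Redis pub/sub: producers, grouped three to a cluster, publish periodic readiness updates to the status_updates channel, while consumers maintain a TTL-bounded per-cluster view of those updates and periodically decide whether a randomly chosen cluster is ready for work.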
package main

import (
    "context"
    "encoding/json"
    "flag"
    "fmt"
    "log"
    "math/rand"
    "os"
    "os/signal"
    "sort"
    "sync"
    "syscall"
    "time"

    "github.com/redis/go-redis/v9"
)
// Message is the status update each producer publishes to Redis.
type Message struct {
    Identifier string    `json:"identifier"`
    Cluster    string    `json:"cluster"`
    Timestamp  time.Time `json:"timestamp"`
    IsReady    bool      `json:"is_ready"`
}

// Producer publishes periodic readiness updates for its cluster.
type Producer struct {
    ID        int
    ClusterID int
    Client    *redis.Client
}

// Consumer subscribes to status updates and keeps a short-lived,
// per-cluster view of the most recent messages.
type Consumer struct {
    ID      int
    Client  *redis.Client
    Views   map[string][]Message // cluster -> messages
    ViewTTL time.Duration
    mu      sync.RWMutex
}

const (
    REDIS_TOPIC = "status_updates"
)

var (
    outputProducer bool
    outputConsumer bool
)
func main() {
    var (
        numProducers       = flag.Int("producers", 240, "Number of producers")
        numConsumers       = flag.Int("consumers", 300, "Number of consumers")
        redisAddr          = flag.String("redis", "localhost:6379", "Redis address")
        viewTTL            = flag.Duration("view-ttl", 15*time.Second, "View TTL duration")
        producerInterval   = flag.Duration("producer-interval", time.Second, "How often producers send messages")
        cleanupInterval    = flag.Duration("cleanup-interval", time.Second, "How often consumers clean up expired messages")
        workEvalInterval   = flag.Duration("work-eval-interval", 2*time.Second, "How often consumers evaluate work")
        outputProducerFlag = flag.Bool("output-producer", false, "Enable producer activity output")
        outputConsumerFlag = flag.Bool("output-consumer", false, "Enable consumer activity output")
    )
    flag.Parse()

    outputProducer = *outputProducerFlag
    outputConsumer = *outputConsumerFlag

    // Setup graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        if outputProducer || outputConsumer {
            log.Println("Shutting down...")
        }
        cancel()
    }()

    var wg sync.WaitGroup

    // Start producers
    for i := 1; i <= *numProducers; i++ {
        clusterID := ((i - 1) / 3) + 1
        producer := &Producer{
            ID:        i,
            ClusterID: clusterID,
            Client: redis.NewClient(&redis.Options{
                Addr: *redisAddr,
            }),
        }
        // Test connection
        if err := producer.Client.Ping(ctx).Err(); err != nil {
            log.Fatalf("Producer-%d: Failed to connect to Redis: %v", i, err)
        }
        wg.Add(1)
        go func(p *Producer, interval time.Duration) {
            defer wg.Done()
            defer p.Client.Close()
            runProducer(ctx, p, interval)
        }(producer, *producerInterval)
    }

    // Start consumers
    for i := 1; i <= *numConsumers; i++ {
        consumer := &Consumer{
            ID: i,
            Client: redis.NewClient(&redis.Options{
                Addr: *redisAddr,
            }),
            Views:   make(map[string][]Message),
            ViewTTL: *viewTTL,
        }
        // Test connection
        if err := consumer.Client.Ping(ctx).Err(); err != nil {
            log.Fatalf("Consumer-%d: Failed to connect to Redis: %v", i, err)
        }
        wg.Add(1)
        go func(c *Consumer, cleanupInt, workEvalInt time.Duration) {
            defer wg.Done()
            defer c.Client.Close()
            runConsumer(ctx, c, cleanupInt, workEvalInt)
        }(consumer, *cleanupInterval, *workEvalInterval)
    }

    wg.Wait()
}
// runProducer publishes a status message for the producer's cluster on every tick.
func runProducer(ctx context.Context, p *Producer, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    if outputProducer {
        log.Printf("Started producer-%d in cluster-%d", p.ID, p.ClusterID)
    }

    for {
        select {
        case <-ctx.Done():
            if outputProducer {
                log.Printf("Producer-%d shutting down", p.ID)
            }
            return
        case <-ticker.C:
            msg := Message{
                Identifier: fmt.Sprintf("producer-%d", p.ID),
                Cluster:    fmt.Sprintf("cluster-%d", p.ClusterID),
                Timestamp:  time.Now(),
                IsReady:    rand.Intn(2) == 1, // Random boolean
            }
            data, err := json.Marshal(msg)
            if err != nil {
                log.Printf("Producer-%d: Error marshaling message: %v", p.ID, err)
                continue
            }
            // Publish message to Redis pub/sub
            err = p.Client.Publish(ctx, REDIS_TOPIC, string(data)).Err()
            if err != nil {
                log.Printf("Producer-%d: Error publishing message: %v", p.ID, err)
                continue
            }
            if outputProducer {
                log.Printf("Producer-%d: Published message - Ready: %t", p.ID, msg.IsReady)
            }
        }
    }
}
// runConsumer subscribes to the status topic, records each incoming message in
// the consumer's view, and runs the cleanup and work-evaluation loops.
func runConsumer(ctx context.Context, c *Consumer, cleanupInterval, workEvalInterval time.Duration) {
    if outputConsumer {
        log.Printf("Started consumer-%d", c.ID)
    }

    // Subscribe to Redis pub/sub
    pubsub := c.Client.Subscribe(ctx, REDIS_TOPIC)
    defer pubsub.Close()

    // Start cleanup routine for expired messages
    go c.cleanupExpiredMessages(ctx, cleanupInterval)

    // Start work evaluation routine
    go c.evaluateWork(ctx, workEvalInterval)

    // Process incoming messages
    ch := pubsub.Channel()
    for {
        select {
        case <-ctx.Done():
            if outputConsumer {
                log.Printf("Consumer-%d shutting down", c.ID)
            }
            return
        case msg, ok := <-ch:
            if !ok {
                if outputConsumer {
                    log.Printf("Consumer-%d: Channel closed", c.ID)
                }
                return
            }
            var message Message
            if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil {
                log.Printf("Consumer-%d: Error unmarshaling message: %v", c.ID, err)
                continue
            }
            c.addMessageToView(message)
            if outputConsumer {
                log.Printf("Consumer-%d: Received message from %s (Ready: %t)",
                    c.ID, message.Identifier, message.IsReady)
            }
        }
    }
}
func (c *Consumer) addMessageToView(msg Message) {
    c.mu.Lock()
    defer c.mu.Unlock()

    clusterKey := msg.Cluster
    c.Views[clusterKey] = append(c.Views[clusterKey], msg)

    // Sort by timestamp, then by identifier for consistent ordering
    sort.Slice(c.Views[clusterKey], func(i, j int) bool {
        messages := c.Views[clusterKey]
        if messages[i].Timestamp.Equal(messages[j].Timestamp) {
            return messages[i].Identifier < messages[j].Identifier
        }
        return messages[i].Timestamp.Before(messages[j].Timestamp)
    })
}
func (c *Consumer) cleanupExpiredMessages(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            c.mu.Lock()
            now := time.Now()
            for cluster, messages := range c.Views {
                // Remove messages older than ViewTTL
                var validMessages []Message
                for _, msg := range messages {
                    if now.Sub(msg.Timestamp) <= c.ViewTTL {
                        validMessages = append(validMessages, msg)
                    }
                }
                if len(validMessages) == 0 {
                    delete(c.Views, cluster)
                } else {
                    c.Views[cluster] = validMessages
                }
            }
            c.mu.Unlock()
        }
    }
}
func (c *Consumer) evaluateWork(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            c.mu.RLock()
            if len(c.Views) == 0 {
                c.mu.RUnlock()
                continue
            }
            // Get all known clusters
            clusters := make([]string, 0, len(c.Views))
            for cluster := range c.Views {
                clusters = append(clusters, cluster)
            }
            // Randomly select a cluster to evaluate
            selectedCluster := clusters[rand.Intn(len(clusters))]
            messages := make([]Message, len(c.Views[selectedCluster]))
            copy(messages, c.Views[selectedCluster])
            c.mu.RUnlock()

            // Evaluate work for the selected cluster
            if len(messages) > 0 {
                c.evaluateClusterWork(selectedCluster, messages)
            }
        }
    }
}
// evaluateClusterWork reduces the messages to the latest status per producer
// and accepts work only when every producer in the cluster reports ready.
func (c *Consumer) evaluateClusterWork(cluster string, messages []Message) {
    // Group messages by producer identifier to get the most recent status per producer
    latestByProducer := make(map[string]Message)
    for _, msg := range messages {
        if existing, ok := latestByProducer[msg.Identifier]; !ok || msg.Timestamp.After(existing.Timestamp) {
            latestByProducer[msg.Identifier] = msg
        }
    }

    // Check if any producer in the cluster is not ready
    var notReadyProducers []string
    for identifier, msg := range latestByProducer {
        if !msg.IsReady {
            notReadyProducers = append(notReadyProducers, identifier)
        }
    }

    if len(notReadyProducers) > 0 {
        if outputConsumer {
            log.Printf("Consumer-%d: Work is rejected for %s (not ready: %v)",
                c.ID, cluster, notReadyProducers)
        }
    } else {
        if outputConsumer {
            log.Printf("Consumer-%d: %s is ready for work (%d producers ready)",
                c.ID, cluster, len(latestByProducer))
        }
    }
}
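
To try it locally, one possible setup (assuming the gist is saved as main.go, a placeholder module name of statusdemo, and a Redis server reachable on localhost:6379, e.g. via the official Docker image) looks like this; the flags shown are the ones defined above:

    docker run --rm -p 6379:6379 redis
    go mod init statusdemo && go mod tidy
    go run main.go -producers 12 -consumers 4 -view-ttl 15s -output-consumer

With -output-consumer set, each consumer logs the messages it receives and whether the randomly selected cluster is accepted or rejected for work.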