Last active
October 24, 2017 10:13
-
-
Save cfchou/c2ac4060aaf0fcada38a3d85b3c07a71 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"gopkg.in/cfchou/go-gentle.v3/gentle" | |
"context" | |
"math/rand" | |
"database/sql" | |
_ "github.com/mattn/go-sqlite3" | |
"fmt" | |
"time" | |
) | |
// GameScore implements gentle.Message interface | |
type GameScore struct { | |
id string // better to be unique for tracing it in log | |
Score int | |
} | |
// ID is the only method that a gentle.Message must have | |
func (s GameScore) ID() string { | |
return s.id | |
} | |
// scoreStream is a gentle.Stream that wraps an API call to an external service for | |
// getting game scores. | |
// For simple cases that the logic can be defined entirely in a function, we can | |
// to just declare it to be a gentle.SimpleStream. | |
var scoreStream gentle.SimpleStream = func(_ context.Context) (gentle.Message, error) { | |
// simulate a result from an external service | |
return &GameScore{ | |
id: "", | |
Score: rand.Intn(100), | |
}, nil | |
} | |
// DbWriter is a gentle.Handler which writes scores to the database. | |
// Instead of using gentle.SimpleHandler, we define a struct explicitly | |
// implementing gentle.Handler interface. | |
type DbWriter struct { | |
db *sql.DB | |
table string | |
} | |
func (h *DbWriter) Handle(_ context.Context, msg gentle.Message) (gentle.Message, error) { | |
gameScore := msg.(*GameScore) | |
statement := fmt.Sprintf("INSERT INTO %s (score, date) VALUES (?, DATETIME());", h.table) | |
_, err := h.db.Exec(statement, gameScore.Score) | |
if err != nil { | |
return nil, err | |
} | |
return msg, nil | |
} | |
func main() { | |
db, _ := sql.Open("sqlite3", "scores.sqlite") | |
defer db.Close() | |
db.Exec("DROP TABLE IF EXISTS game;") | |
db.Exec("CREATE TABLE game (score INTEGER, date DATETIME);") | |
dbWriter := &DbWriter{ | |
db: db, | |
table: "game", | |
} | |
// Rate-limit the queries while allowing burst of some | |
gentleScoreStream := gentle.NewRateLimitedStream( | |
gentle.NewRateLimitedStreamOpts("myApp", "rlQuery", | |
gentle.NewTokenBucketRateLimit(500*time.Millisecond, 5)), | |
scoreStream) | |
// Limit concurrent writes to Db | |
limitedDbWriter := gentle.NewBulkheadHandler( | |
gentle.NewBulkheadHandlerOpts("myApp", "bkWrite", 16), | |
dbWriter) | |
// Constantly backing off when limitedDbWriter returns an error | |
backoffFactory := gentle.NewConstBackOffFactory( | |
gentle.NewConstBackOffFactoryOpts(500*time.Millisecond, 5*time.Minute)) | |
gentleDbWriter := gentle.NewRetryHandler( | |
gentle.NewRetryHandlerOpts("myApp", "rtWrite", backoffFactory), | |
limitedDbWriter) | |
// Compose the final Stream | |
stream := gentle.AppendHandlersStream(gentleScoreStream, gentleDbWriter) | |
// Keep fetching scores from the remote service to our database. | |
// The amount of simultaneous go-routines are capped by the size of ticketPool. | |
ticketPool := make(chan struct{}, 1000) | |
for { | |
ticketPool <- struct{}{} | |
go func() { | |
defer func(){ <-ticketPool }() | |
stream.Get(context.Background()) | |
}() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment