Created
June 6, 2018 19:36
-
-
Save jhartman86/9fe89a4ee0ece98b27d3274f171024f6 to your computer and use it in GitHub Desktop.
optimistic fan-out concurrency job queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package queue | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
"github.com/jhartman86/conduit/pkg/system/database" | |
) | |
// Worker is the consumer callback a queue invokes once per job: it receives
// the job's payload and reports a human-readable result message and an error
// describing the outcome.
type Worker func(p interface{}) (msg string, err error)
// Config carries the user-supplied settings for a single queue. It is
// validated by New; a Config that did not pass through New cannot be started.
type Config struct {
	// Key names this queue; it appears in the per-iteration log output.
	// Presumably it also matches WorkUnit.Key when loading jobs — TODO
	// confirm once batchLoader queries the database.
	Key string
	// Concurrency is the number of consumer goroutines fanned out per
	// iteration; must be >= 1 (enforced by New).
	Concurrency int
	// BatchLoadSize bounds both the pipeline channel's buffer and the
	// number of jobs loaded per iteration.
	BatchLoadSize int
	// Interval is the pause between iterations, in seconds.
	Interval int
	// Consumer is the Worker func invoked for each job's payload.
	Consumer Worker
	// notice this is *unexported*; its set internally to true only
	// if the configuration is valid (and defaults to false)
	valid bool
}
/*
kue is an encompassing struct that contains all the settings and
handlers for running a queue: the validated Config, the run flag that
the iteration loop polls, an iteration counter, and the lock guarding
that shared state.
*/
type kue struct {
	config Config
	// running is read/written under locker; the next() loop polls it
	// between iterations to decide whether to continue.
	running bool
	// iterations counts next() passes. NOTE(review): next() increments
	// it without holding locker — see the @todo there.
	iterations int
	// locker guards running (and should guard iterations). Because the
	// struct embeds a mutex, kue values must not be copied once in use.
	locker sync.RWMutex
}
/*
WorkUnit would otherwise be an unexported type, if we didn't need to use it
during database migrations for the models setup; it should be considered a
representation for *internal* payloads used by the Queue.
*/
type WorkUnit struct {
	// ID is assigned by the database (uuid_v4 via the column default).
	ID string `gorm:"primary_key;default:from_uuid(uuid_v4())"`
	// Status of the job; indexed. The set of valid values is not visible
	// in this file — TODO confirm against the loader/updater queries.
	Status string `gorm:"not null;index"`
	// Key associates a row with a queue — presumably matches Config.Key;
	// verify once batchLoader selects from the database.
	Key string `gorm:"not null;index"`
	// Payload is the opaque job body handed to the Worker func.
	Payload string `gorm:"not null"`
	// ResponseMessage / ResponseError look intended to capture the
	// Worker's (msg, err) results; nullable since unprocessed rows have
	// neither. Nothing in this file writes them yet (see @todo in next).
	ResponseMessage database.NullString `gorm:"default:null"`
	ResponseError   database.NullString `gorm:"default:null"`
	CreatedAt       time.Time
	UpdatedAt       time.Time
}
/* | |
New returns a configured kue, but does error checking first to ensure | |
the configs are valid. | |
*/ | |
func New(c Config) (k kue, err error) { | |
if c.Concurrency < 1 { | |
err = fmt.Errorf(`Queues cannot be created with concurrency less than 1`) | |
return | |
} | |
c.valid = true | |
k.config = c | |
return | |
} | |
/* | |
Start is the public entry to begin the long running blocking loop | |
*/ | |
func (k *kue) Start() error { | |
if !k.config.valid { | |
return fmt.Errorf(`Queue with invalid configuration will not be started`) | |
} | |
if k.IsRunning() { | |
return fmt.Errorf(`Queue is already running`) | |
} | |
k.setRunning(true) | |
return k.next() | |
} | |
/* | |
Stop will set the running property to set, which has the positive | |
consequence of *letting the currently in-flight next iterations* | |
complete before trying to stop the next() loop. | |
*/ | |
func (k *kue) Stop() { | |
k.setRunning(false) | |
} | |
/* | |
IsRunning atomically checks whether the queue should continue | |
running; it *does not* check whether there is a currently in | |
flight - but as of yet unfinished - set of workers running in | |
the current next() iteration. | |
*/ | |
func (k *kue) IsRunning() bool { | |
k.locker.Lock() | |
defer k.locker.Unlock() | |
return k.running | |
} | |
/* | |
Only used internally and put into a method so the locking | |
mechanics can be normalized. | |
*/ | |
func (k *kue) setRunning(to bool) { | |
k.locker.Lock() | |
defer k.locker.Unlock() | |
k.running = to | |
} | |
/* | |
Next is a recursive call which handles: | |
- Fanning out to the number of goroutines set by the concurrency level | |
- Each routine listens for sends on the pipeline channel, and handling | |
the results by invoking the Consumer func passed to the Kue config | |
- Waiting until all goroutines have completed | |
- ... pausing ... then invoking next() again | |
*/ | |
func (k *kue) next() error { | |
// Increment and log the iteration we're on | |
k.iterations = k.iterations + 1 // @todo: definite race type condition; use atomic counter | |
log.Printf("\n\n--- Queue: %s [iteration: %d] ---\n\n", k.config.Key, k.iterations) | |
// Initialize a wait group instance to ensure concurrent routines | |
// complete before moving to the next iteration | |
var wg sync.WaitGroup | |
// Create a pipeline of *up to* the batch size to send jobs into | |
pipeline := make(chan WorkUnit, k.config.BatchLoadSize) | |
// Create config.Concurrency number of go routines, each of which | |
// will range over the pipeline and accept requests to send to | |
// the Consumers. In each routine, when its no longer possible to | |
// range over the pipeline, we invoke wg.Done() and let the goroutine | |
// go to GC | |
for i := 1; i <= k.config.Concurrency; i++ { | |
// Add to the waitgroup | |
wg.Add(1) | |
// Invoke goroutine and pass requisite arguments in to ensure scoping! | |
go func(w Worker, pl chan WorkUnit) { | |
defer wg.Done() | |
for job := range pl { | |
// @todo capture results of syncronously invoked func and send back out to a pipeline | |
// --OR--, syncronously call a success/error handler on the config | |
// which should also be blocking... | |
w(job.Payload) | |
} | |
}(k.config.Consumer, pipeline) | |
} | |
// The for loop above create a bunch of non-blocking routines, so | |
// by the time we get to here, we can pass the pipeline to the | |
// loader which will actually send jobs | |
k.batchLoader(pipeline) | |
// Wait for all routines to complete | |
wg.Wait() // <-pipeline @todo: block on the pipeline instead of waitgroups??? | |
// NOW we check to see if the queue should still be running, | |
// and IF NOT, we don't invoke the next() loop again (also | |
// don't need to do a sleep interval) | |
if !k.IsRunning() { | |
return nil | |
} | |
// Take a break for a set interval | |
time.Sleep(time.Duration(k.config.Interval) * time.Second) | |
// Start the next iteration, incrementing the value + 1 | |
return k.next() | |
} | |
/* | |
Batch loader, in charge or querying for the next set of things to | |
work on. | |
*/ | |
func (k *kue) batchLoader(c chan WorkUnit) { | |
for i := 1; i <= k.config.BatchLoadSize; i++ { | |
c <- WorkUnit{Payload: fmt.Sprintf(`id-%d`, i)} | |
} | |
close(c) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment