Skip to content

Instantly share code, notes, and snippets.

@thrawn01
Created August 2, 2022 19:58
Show Gist options
  • Save thrawn01/97c38687790ea5d4469fe22afb3d45cf to your computer and use it in GitHub Desktop.
Save thrawn01/97c38687790ea5d4469fe22afb3d45cf to your computer and use it in GitHub Desktop.
commit b1fbfd306a573e327cc2aac9176e7d43ec87e17d
Author: Derrick Wippler <[email protected]>
Date: Thu Sep 23 16:46:02 2021 -0500
WIP: added steve job library
diff --git a/steve/README.md b/steve/README.md
new file mode 100644
index 0000000..e75ce66
--- /dev/null
+++ b/steve/README.md
@@ -0,0 +1,42 @@
+## Steve Jobs
+A tmux/screen-like job running system
+
+The idea here is that this library would be used in a service to facilitate starting
+a local job remotely via HTTP or Websockets, allowing the client to disconnect, then
+reconnect later to see the previous output and stream any further output (much like
+a tmux or screen session over SSH).
+
+Users create a job by implementing the `Job` interface, which has `Start` and `Stop` methods:
+```go
+type Job interface {
+ // Start the job, returns an error if the job failed to start or context was cancelled
+ Start(context.Context, io.Writer) error
+
+ // Stop the job, returns an error if the context was cancelled before job was stopped
+ Stop(context.Context) error
+}
+```
+
+The job writes any output to the provided `io.Writer`; that output is saved into the job
+buffer and broadcast to any clients connected to the buffer via `io.ReadCloser`s.
+
+Once the job is started, remote clients (via HTTP or Websockets) can read the job buffer
+by calling:
+```go
+reader, err := jobRunner.NewReader("job-id")
+```
+Clients should read from the returned `reader` until `io.EOF`, which indicates the job
+is complete and no more output can be read. Reads from this reader block until new
+data in the buffer is available to read. This makes it simple to stream data back
+to clients via whatever transport the implementor has chosen: gRPC, HTTP, or Websockets.
+
+The library is designed to allow multiple clients to read from the same buffer
+simultaneously; in this way many clients can monitor the progress of a job in real time.
+
+
+
+
+
+
+
diff --git a/steve/job.go b/steve/job.go
new file mode 100644
index 0000000..21d2712
--- /dev/null
+++ b/steve/job.go
@@ -0,0 +1,54 @@
+package steve
+
+import (
+ "context"
+ "io"
+ "time"
+)
+
+type Status struct {
+ ID ID `json:"id"`
+ Running bool `json:"running"`
+ Started time.Time `json:"created"`
+ Stopped time.Time `json:"stopped"`
+}
+
// Job is a unit of work that a Runner can start and stop.
type Job interface {
	// Start launches the job. Any output the job produces is written to w.
	// Start returns an error if the job failed to start or ctx was cancelled.
	Start(ctx context.Context, w io.Writer) error

	// Stop halts the job. It returns an error if ctx was cancelled before
	// the job stopped.
	Stop(ctx context.Context) error
}
+
// ID uniquely identifies a job tracked by a Runner.
type ID string
+
+// Runner provides a job running service which runs a single job. The job is provided a writer which
+// is buffered and stored for live monitoring or later retrieval. A client interested in a job may
+// request a reader, then close it, then request a new reader and resume monitoring the output
+// of the job. In this way long running jobs can be monitored for output, disconnect and resume
+// monitoring later.
+type Runner interface {
+ // Run the provided job, returning a ID which can be used to track the status of a job.
+ // Returns an error if the job failed to start of context was cancelled.
+ Run(context.Context, Job) (ID, error)
+
+ // NewReader returns an io.Reader which can be read to get the most current output from a running job.
+ // Job runner supports multiple readers for the same job. In this way multiple remote clients may monitor
+ // the output of the job simultaneously. Reader will return io.EOF when the job is no longer running and all
+ // output has been read. Caller should called Close() on the reader when it is done reading, this will
+ // free up resources.
+ NewReader(ID) (io.ReadCloser, error)
+
+ // Stop a currently running job, returns an error if the context was cancelled before the job stopped.
+ Stop(context.Context, ID) error
+
+ // Close all currently running jobs
+ Close(context.Context) error
+
+ // Status returns the status of the job, returns false if the job doesn't exist
+ Status(ID) (Status, bool)
+
+ // List all jobs
+ List() []Status
+}
diff --git a/steve/job_test.go b/steve/job_test.go
new file mode 100644
index 0000000..94c8762
--- /dev/null
+++ b/steve/job_test.go
@@ -0,0 +1,101 @@
+package steve_test
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "io"
+ "testing"
+ "time"
+
+ "github.com/mailgun/holster/v4/steve"
+ "github.com/mailgun/holster/v4/syncutil"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+type testJob struct {
+ wg syncutil.WaitGroup
+}
+
+func (t *testJob) Start(ctx context.Context, writer io.Writer) error {
+ fmt.Fprintf(writer, "Job Start\n")
+ var count int
+
+ t.wg.Until(func(done chan struct{}) bool {
+ fmt.Fprintf(writer, "line: %d\n", count)
+ count++
+ select {
+ case <-done:
+ fmt.Fprintf(writer, "Job Stop\n")
+ return false
+ case <-time.After(time.Millisecond * 300):
+ return true
+ }
+ })
+ return nil
+}
+
+func (t *testJob) Stop(ctx context.Context) error {
+ t.wg.Stop()
+ return nil
+}
+
+func TestRunner(t *testing.T) {
+ runner := steve.NewJobRunner(20)
+ require.NotNil(t, runner)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+ defer cancel()
+
+ id, err := runner.Run(ctx, &testJob{})
+ require.NoError(t, err)
+ assert.NotEmpty(t, id)
+
+ // Supports Multiple Readers for the same job
+ go func() {
+ r, err := runner.NewReader(id)
+ require.NoError(t, err)
+
+ buf := bufio.NewReader(r)
+ for {
+ line, err := buf.ReadBytes('\n')
+ if err != nil {
+ return
+ }
+ fmt.Printf("+ GOT: %s", string(line))
+ }
+ }()
+
+ go func() {
+ r, err := runner.NewReader(id)
+ require.NoError(t, err)
+
+ buf := bufio.NewReader(r)
+ for {
+ line, err := buf.ReadBytes('\n')
+ if err != nil {
+ return
+ }
+ fmt.Printf("- GOT: %s", string(line))
+ }
+ }()
+
+ time.Sleep(time.Second)
+
+ s, ok := runner.Status(id)
+ require.True(t, ok)
+ assert.Equal(t, id, s.ID)
+ assert.Equal(t, true, s.Running)
+ assert.False(t, s.Started.IsZero())
+ assert.True(t, s.Stopped.IsZero())
+
+ err = runner.Stop(ctx, id)
+ require.NoError(t, err)
+
+ // List should show the job as not running
+ l := runner.List()
+ assert.Equal(t, id, l[0].ID)
+ assert.Equal(t, false, l[0].Running)
+
+}
diff --git a/steve/runner.go b/steve/runner.go
new file mode 100644
index 0000000..d43dbbf
--- /dev/null
+++ b/steve/runner.go
@@ -0,0 +1,249 @@
+package steve
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/mailgun/holster/v4/collections"
+ "github.com/mailgun/holster/v4/syncutil"
+)
+
// ErrJobNotFound is returned when the requested job ID is not (or is no
// longer) known to the runner.
var ErrJobNotFound = errors.New("no such job found")

// ErrJobNotRunning is returned by Stop when the job has already stopped.
var ErrJobNotRunning = errors.New("job not running")
+
+type jobIO struct {
+ br syncutil.Broadcaster
+ writer io.WriteCloser
+ mutex sync.Mutex
+ buffer bytes.Buffer
+ id ID
+ running int64
+ job Job
+}
+
+type runner struct {
+ jobs *collections.LRUCache
+ //jobs map[ID]*jobIO
+ mutex sync.Mutex
+ wg syncutil.WaitGroup
+}
+
+func NewJobRunner(capacity int) Runner {
+ return &runner{
+ //jobs: make(map[ID]*jobIO),
+ jobs: collections.NewLRUCache(capacity),
+ }
+}
+
+func (r *runner) Run(ctx context.Context, job Job) (ID, error) {
+ reader, writer := io.Pipe()
+
+ j := jobIO{
+ id: ID(uuid.New().String()),
+ br: syncutil.NewBroadcaster(),
+ writer: writer,
+ job: job,
+ }
+
+ // Spawn a go routine to monitor job output, storing the output into the j.buffer
+ r.wg.Go(func() {
+ ch := make(chan []byte)
+ atomic.StoreInt64(&j.running, 1)
+
+ // Spawn a separate go routine as the read could block forever
+ go func() {
+ buf := make([]byte, 2024)
+ for {
+ n, err := reader.Read(buf)
+ if err != nil {
+ close(ch)
+ return
+ }
+ out := make([]byte, n)
+ copy(out, buf[:n])
+ ch <- out
+ }
+ }()
+
+ for {
+ select {
+ case line, ok := <-ch:
+ if !ok {
+ atomic.StoreInt64(&j.running, 0)
+ j.br.Broadcast()
+ return
+ }
+ j.mutex.Lock()
+ j.buffer.Write(line)
+ j.br.Broadcast()
+ j.mutex.Unlock()
+ }
+ }
+ })
+ r.jobs.Add(j.id, &j)
+
+ if err := job.Start(ctx, writer); err != nil {
+ return "", err
+ }
+
+ for {
+ if atomic.LoadInt64(&j.running) == 1 {
+ break
+ }
+ select {
+ case <-ctx.Done():
+ return "", ctx.Err()
+ }
+ }
+
+ return j.id, nil
+}
+
+func (r *runner) NewReader(id ID) (io.ReadCloser, error) {
+ defer r.mutex.Unlock()
+ r.mutex.Lock()
+
+ obj, ok := r.jobs.Get(id)
+ if !ok {
+ return nil, ErrJobNotFound
+ }
+ j := obj.(*jobIO)
+
+ // If the job isn't running, then copy the current buffer
+ // into a read closer and return that to the caller.
+ if atomic.LoadInt64(&j.running) == 0 {
+ j.mutex.Lock()
+ defer j.mutex.Unlock()
+ buf := bytes.Buffer{}
+ buf.Write(j.buffer.Bytes())
+ return ioutil.NopCloser(&buf), nil
+ }
+
+ // Create a go routine that sends all unread bytes to the reader then
+ // waits for new bytes to be written to the j.buffer via the broadcaster.
+ reader, writer := io.Pipe()
+ r.wg.Go(func() {
+ var idx = 0
+ for {
+ // Grab any bytes from the buffer we haven't sent to our reader
+ j.mutex.Lock()
+ src := j.buffer.Bytes()
+ dst := make([]byte, j.buffer.Len()-idx)
+ copy(dst, src[idx:j.buffer.Len()])
+ j.mutex.Unlock()
+
+ // Preform the write outside the mutex as it could block and we don't
+ // want to hold on to the mutex for long
+ n, err := writer.Write(dst)
+ if err != nil {
+ // If the reader called Close() on the pipe
+ return
+ }
+ idx += n
+
+ // The job routine will broadcast when it stops the job and no
+ // more bytes are available to read.
+ if atomic.LoadInt64(&j.running) == 0 {
+ writer.Close()
+ return
+ }
+
+ // Wait for broadcaster to tell us there are new bytes to read.
+ j.br.Wait(string(j.id))
+
+ }
+ })
+
+ return reader, nil
+}
+
+func (r *runner) Stop(ctx context.Context, id ID) error {
+ defer r.mutex.Unlock()
+ r.mutex.Lock()
+
+ obj, ok := r.jobs.Get(id)
+ if !ok {
+ return ErrJobNotFound
+ }
+ j := obj.(*jobIO)
+
+ // Ignore if already stopped
+ if atomic.LoadInt64(&j.running) == 0 {
+ return ErrJobNotRunning
+ }
+
+ return r.stop(ctx, j)
+}
+
+func (r *runner) stop(ctx context.Context, j *jobIO) error {
+ // Stop the job
+ if err := j.job.Stop(ctx); err != nil {
+ return err
+ }
+
+ // Close the writer, this should tell the reading go routine to shutdown
+ j.writer.Close()
+ return nil
+}
+
+func (r *runner) Status(id ID) (Status, bool) {
+ obj, ok := r.jobs.Get(id)
+ if !ok {
+ return Status{}, false
+ }
+ j := obj.(*jobIO)
+ // TODO: Add Status to the jobIO
+ return Status{
+ ID: "",
+ Running: false,
+ Started: time.Time{},
+ Stopped: time.Time{},
+ }
+}
+
+func (r *runner) List() []Status {
+ defer r.mutex.Unlock()
+ r.mutex.Lock()
+
+ var result []Status
+ r.jobs.Each(1, func(key interface{}, value interface{}) error {
+ j := value.(*jobIO)
+ result = append(result, Status{
+ ID: j.id,
+ Running: atomic.LoadInt64(&j.running) == 1,
+ })
+ return nil
+ })
+ return result
+}
+
+func (r *runner) Close(ctx context.Context) error {
+ defer r.mutex.Unlock()
+ r.mutex.Lock()
+
+ for _, s := range r.List() {
+ obj, ok := r.jobs.Get(s.ID)
+ if !ok {
+ continue
+ }
+ j := obj.(*jobIO)
+ // Skip if not running
+ if atomic.LoadInt64(&j.running) == 0 {
+ continue
+ }
+ if err := r.stop(ctx, j); err != nil {
+ return fmt.Errorf("while stopping '%s': %w", j.id, err)
+ }
+ }
+ return nil
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment