commit b1fbfd306a573e327cc2aac9176e7d43ec87e17d
Author: Derrick Wippler <[email protected]>
Date:   Thu Sep 23 16:46:02 2021 -0500

    WIP: added steve job library

diff --git a/steve/README.md b/steve/README.md
new file mode 100644
index 0000000..e75ce66
--- /dev/null
+++ b/steve/README.md
@@ -0,0 +1,42 @@
+## Steve Jobs
+A tmux/screen-like job running system
+
+The idea is that this library would be used in a service to facilitate starting
+a local job remotely via HTTP or Websockets, and to allow the client to disconnect,
+then reconnect later to see the previous output and stream any further output
+(like tmux or screen sessions over ssh).
+
+Users create a job with a Start and Stop method:
+```go
+type Job interface {
+    // Start the job, returns an error if the job failed to start or context was cancelled
+    Start(context.Context, io.Writer) error
+
+    // Stop the job, returns an error if the context was cancelled before job was stopped
+    Stop(context.Context) error
+}
+```
+
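+For example, a minimal job might emit a line of output every second until it is
+stopped (a hypothetical sketch; `tickJob` is not part of this library). `Start`
+launches the work in the background and returns immediately; `Stop` cancels it:
+
+```go
+type tickJob struct {
+    cancel context.CancelFunc
+}
+
+func (j *tickJob) Start(ctx context.Context, w io.Writer) error {
+    // Use a cancel context to control the job's lifetime; the ctx passed to
+    // Start only governs the start operation itself.
+    runCtx, cancel := context.WithCancel(context.Background())
+    j.cancel = cancel
+
+    go func() {
+        for i := 0; ; i++ {
+            select {
+            case <-runCtx.Done():
+                return
+            case <-time.After(time.Second):
+                fmt.Fprintf(w, "tick: %d\n", i)
+            }
+        }
+    }()
+    return nil
+}
+
+func (j *tickJob) Stop(ctx context.Context) error {
+    j.cancel()
+    return nil
+}
+```
+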
+The job writes any output to the provided `io.Writer`. That output is saved into
+the job buffer and broadcast to any clients connected to the buffer via
+`io.ReadCloser`s.
+
+Once the job is started, remote clients connected via HTTP or Websockets can read
+the job buffer by calling:
+```go
+reader, err := jobRunner.NewReader("job-id")
+```
+Clients should read from the returned `reader` until `io.EOF`, which indicates the
+job is complete and no more output can be read. Reads from this reader will block
+until new data is available in the buffer. This makes it simple to stream data back
+to clients via whatever transport the implementor has chosen: gRPC, HTTP, or Websockets.
+
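+For example, a handler might stream the buffer to an HTTP client like this (a
+hypothetical sketch; `jobRunner` and the `id` query parameter are assumptions):
+
+```go
+func streamJob(w http.ResponseWriter, req *http.Request) {
+    reader, err := jobRunner.NewReader(steve.ID(req.URL.Query().Get("id")))
+    if err != nil {
+        http.Error(w, err.Error(), http.StatusNotFound)
+        return
+    }
+    defer reader.Close()
+
+    buf := make([]byte, 4096)
+    for {
+        // Read blocks until new output is available; io.EOF means the job
+        // is complete and the buffer has been fully drained.
+        n, err := reader.Read(buf)
+        if n > 0 {
+            w.Write(buf[:n])
+            if f, ok := w.(http.Flusher); ok {
+                f.Flush() // push partial output to the client immediately
+            }
+        }
+        if err != nil {
+            return
+        }
+    }
+}
+```
+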
+The library is designed to allow multiple clients to read from the same buffer
+simultaneously; in this way many clients can monitor the progress of a job in real time.
diff --git a/steve/job.go b/steve/job.go
new file mode 100644
index 0000000..21d2712
--- /dev/null
+++ b/steve/job.go
@@ -0,0 +1,54 @@
+package steve
+
+import (
+    "context"
+    "io"
+    "time"
+)
+
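+// Status describes the lifecycle of a job. Marshalled to JSON it looks roughly
+// like this (values are illustrative only):
+//
+//    {"id":"6fa459ea-...","running":true,"started":"2021-09-23T16:46:02-05:00","stopped":"0001-01-01T00:00:00Z"}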
+type Status struct {
+    ID      ID        `json:"id"`
+    Running bool      `json:"running"`
+    Started time.Time `json:"started"`
+    Stopped time.Time `json:"stopped"`
+}
+
+type Job interface {
+    // Start the job, returns an error if the job failed to start or context was cancelled
+    Start(context.Context, io.Writer) error
+
+    // Stop the job, returns an error if the context was cancelled before job was stopped
+    Stop(context.Context) error
+}
+
+type ID string
+
+// Runner provides a job running service which runs a single job per Run() call. Each job
+// is provided a writer whose output is buffered and stored for live monitoring or later
+// retrieval. A client interested in a job may request a reader, close it, then request a
+// new reader later and resume monitoring the output of the job. In this way long running
+// jobs can be monitored, disconnected from, and resumed later.
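+//
+// A hypothetical usage sketch (the runner and job variables and the use of
+// os.Stdout are assumptions, not part of this interface):
+//
+//    id, _ := runner.Run(ctx, job)
+//    r, _ := runner.NewReader(id)
+//    io.Copy(os.Stdout, r) // blocks until the job completes or r is closed
+//    r.Close()
+//
+//    // Reconnect later; previous output is replayed, then new output streams
+//    r, _ = runner.NewReader(id)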
+type Runner interface {
+    // Run the provided job, returning an ID which can be used to track the status of the job.
+    // Returns an error if the job failed to start or the context was cancelled.
+    Run(context.Context, Job) (ID, error)
+
+    // NewReader returns an io.Reader which can be read to get the most current output from a
+    // running job. The job runner supports multiple readers for the same job; in this way
+    // multiple remote clients may monitor the output of the job simultaneously. The reader
+    // returns io.EOF when the job is no longer running and all output has been read.
+    // Callers should call Close() on the reader when done reading; this frees up resources.
+    NewReader(ID) (io.ReadCloser, error)
+
+    // Stop a currently running job, returns an error if the context was cancelled before the job stopped.
+    Stop(context.Context, ID) error
+
+    // Close stops all currently running jobs
+    Close(context.Context) error
+
+    // Status returns the status of the job; returns false if the job doesn't exist
+    Status(ID) (Status, bool)
+
+    // List the status of all jobs
+    List() []Status
+}
diff --git a/steve/job_test.go b/steve/job_test.go
new file mode 100644
index 0000000..94c8762
--- /dev/null
+++ b/steve/job_test.go
@@ -0,0 +1,101 @@
+package steve_test
+
+import (
+    "bufio"
+    "context"
+    "fmt"
+    "io"
+    "testing"
+    "time"
+
+    "github.com/mailgun/holster/v4/steve"
+    "github.com/mailgun/holster/v4/syncutil"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+type testJob struct {
+    wg syncutil.WaitGroup
+}
+
+func (t *testJob) Start(ctx context.Context, writer io.Writer) error {
+    fmt.Fprintf(writer, "Job Start\n")
+    var count int
+
+    t.wg.Until(func(done chan struct{}) bool {
+        fmt.Fprintf(writer, "line: %d\n", count)
+        count++
+        select {
+        case <-done:
+            fmt.Fprintf(writer, "Job Stop\n")
+            return false
+        case <-time.After(time.Millisecond * 300):
+            return true
+        }
+    })
+    return nil
+}
+
+func (t *testJob) Stop(ctx context.Context) error {
+    t.wg.Stop()
+    return nil
+}
+
+func TestRunner(t *testing.T) {
+    runner := steve.NewJobRunner(20)
+    require.NotNil(t, runner)
+
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+    defer cancel()
+
+    id, err := runner.Run(ctx, &testJob{})
+    require.NoError(t, err)
+    assert.NotEmpty(t, id)
+
+    // Supports multiple readers for the same job
+    go func() {
+        r, err := runner.NewReader(id)
+        require.NoError(t, err)
+
+        buf := bufio.NewReader(r)
+        for {
+            line, err := buf.ReadBytes('\n')
+            if err != nil {
+                return
+            }
+            fmt.Printf("+ GOT: %s", string(line))
+        }
+    }()
+
+    go func() {
+        r, err := runner.NewReader(id)
+        require.NoError(t, err)
+
+        buf := bufio.NewReader(r)
+        for {
+            line, err := buf.ReadBytes('\n')
+            if err != nil {
+                return
+            }
+            fmt.Printf("- GOT: %s", string(line))
+        }
+    }()
+
+    time.Sleep(time.Second)
+
+    s, ok := runner.Status(id)
+    require.True(t, ok)
+    assert.Equal(t, id, s.ID)
+    assert.True(t, s.Running)
+    assert.False(t, s.Started.IsZero())
+    assert.True(t, s.Stopped.IsZero())
+
+    err = runner.Stop(ctx, id)
+    require.NoError(t, err)
+
+    // List should show the job as not running
+    l := runner.List()
+    assert.Equal(t, id, l[0].ID)
+    assert.False(t, l[0].Running)
+}
diff --git a/steve/runner.go b/steve/runner.go
new file mode 100644
index 0000000..d43dbbf
--- /dev/null
+++ b/steve/runner.go
@@ -0,0 +1,249 @@
+package steve
+
+import (
+    "bytes"
+    "context"
+    "errors"
+    "fmt"
+    "io"
+    "io/ioutil"
+    "sync"
+    "sync/atomic"
+    "time"
+
+    "github.com/google/uuid"
+    "github.com/mailgun/holster/v4/collections"
+    "github.com/mailgun/holster/v4/syncutil"
+)
+
+var (
+    ErrJobNotFound   = errors.New("no such job found")
+    ErrJobNotRunning = errors.New("job not running")
+)
+
+type jobIO struct {
+    br      syncutil.Broadcaster
+    writer  io.WriteCloser
+    mutex   sync.Mutex // guards buffer, started and stopped
+    buffer  bytes.Buffer
+    id      ID
+    running int64
+    job     Job
+    started time.Time
+    stopped time.Time
+}
+
+type runner struct {
+    jobs  *collections.LRUCache
+    mutex sync.Mutex
+    wg    syncutil.WaitGroup
+}
+
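+// NewJobRunner returns a Runner that keeps at most `capacity` jobs in an LRU
+// cache; once the capacity is exceeded, the least recently used jobs are
+// evicted and their buffered output is no longer retrievable.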
+func NewJobRunner(capacity int) Runner {
+    return &runner{
+        jobs: collections.NewLRUCache(capacity),
+    }
+}
+
+func (r *runner) Run(ctx context.Context, job Job) (ID, error) {
+    reader, writer := io.Pipe()
+
+    j := jobIO{
+        id:      ID(uuid.New().String()),
+        br:      syncutil.NewBroadcaster(),
+        writer:  writer,
+        job:     job,
+        started: time.Now(),
+    }
+
+    // Spawn a goroutine to monitor job output, storing the output into the j.buffer
+    r.wg.Go(func() {
+        ch := make(chan []byte)
+        atomic.StoreInt64(&j.running, 1)
+
+        // Spawn a separate goroutine as the read could block forever
+        go func() {
+            buf := make([]byte, 2024)
+            for {
+                n, err := reader.Read(buf)
+                if err != nil {
+                    close(ch)
+                    return
+                }
+                out := make([]byte, n)
+                copy(out, buf[:n])
+                ch <- out
+            }
+        }()
+
+        // Append output to the buffer until the writer is closed and the
+        // channel shuts down, then record the stop time and broadcast one
+        // last time so readers notice the job is done.
+        for line := range ch {
+            j.mutex.Lock()
+            j.buffer.Write(line)
+            j.br.Broadcast()
+            j.mutex.Unlock()
+        }
+        j.mutex.Lock()
+        j.stopped = time.Now()
+        j.mutex.Unlock()
+        atomic.StoreInt64(&j.running, 0)
+        j.br.Broadcast()
+    })
+    r.jobs.Add(j.id, &j)
+
+    if err := job.Start(ctx, writer); err != nil {
+        return "", err
+    }
+
+    // Wait for the monitor goroutine to mark the job as running, while
+    // honoring context cancellation.
+    for atomic.LoadInt64(&j.running) != 1 {
+        select {
+        case <-ctx.Done():
+            return "", ctx.Err()
+        case <-time.After(time.Millisecond):
+        }
+    }
+
+    return j.id, nil
+}
+
+func (r *runner) NewReader(id ID) (io.ReadCloser, error) {
+    r.mutex.Lock()
+    defer r.mutex.Unlock()
+
+    obj, ok := r.jobs.Get(id)
+    if !ok {
+        return nil, ErrJobNotFound
+    }
+    j := obj.(*jobIO)
+
+    // If the job isn't running, then copy the current buffer
+    // into a read closer and return that to the caller.
+    if atomic.LoadInt64(&j.running) == 0 {
+        j.mutex.Lock()
+        defer j.mutex.Unlock()
+        buf := bytes.Buffer{}
+        buf.Write(j.buffer.Bytes())
+        return ioutil.NopCloser(&buf), nil
+    }
+
+    // Create a goroutine that sends all unread bytes to the reader then
+    // waits for new bytes to be written to the j.buffer via the broadcaster.
+    reader, writer := io.Pipe()
+    r.wg.Go(func() {
+        idx := 0
+        for {
+            // Grab any bytes from the buffer we haven't sent to our reader
+            j.mutex.Lock()
+            src := j.buffer.Bytes()
+            dst := make([]byte, j.buffer.Len()-idx)
+            copy(dst, src[idx:j.buffer.Len()])
+            j.mutex.Unlock()
+
+            // Perform the write outside the mutex as it could block and we don't
+            // want to hold on to the mutex for long
+            n, err := writer.Write(dst)
+            if err != nil {
+                // The reader called Close() on the pipe
+                return
+            }
+            idx += n
+
+            // The job routine will broadcast when it stops the job and no
+            // more bytes are available to read.
+            if atomic.LoadInt64(&j.running) == 0 {
+                writer.Close()
+                return
+            }
+
+            // Wait for the broadcaster to tell us there are new bytes to read.
+            j.br.Wait(string(j.id))
+        }
+    })
+
+    return reader, nil
+}
+
+func (r *runner) Stop(ctx context.Context, id ID) error {
+    r.mutex.Lock()
+    defer r.mutex.Unlock()
+
+    obj, ok := r.jobs.Get(id)
+    if !ok {
+        return ErrJobNotFound
+    }
+    j := obj.(*jobIO)
+
+    // Ignore if already stopped
+    if atomic.LoadInt64(&j.running) == 0 {
+        return ErrJobNotRunning
+    }
+
+    return r.stop(ctx, j)
+}
+
+func (r *runner) stop(ctx context.Context, j *jobIO) error {
+    // Stop the job
+    if err := j.job.Stop(ctx); err != nil {
+        return err
+    }
+
+    // Close the writer; this tells the monitoring goroutine to shut down
+    j.writer.Close()
+    return nil
+}
+
+func (r *runner) Status(id ID) (Status, bool) {
+    obj, ok := r.jobs.Get(id)
+    if !ok {
+        return Status{}, false
+    }
+    j := obj.(*jobIO)
+
+    j.mutex.Lock()
+    defer j.mutex.Unlock()
+    return Status{
+        ID:      j.id,
+        Running: atomic.LoadInt64(&j.running) == 1,
+        Started: j.started,
+        Stopped: j.stopped,
+    }, true
+}
+
+func (r *runner) List() []Status {
+    r.mutex.Lock()
+    defer r.mutex.Unlock()
+
+    var result []Status
+    r.jobs.Each(1, func(key interface{}, value interface{}) error {
+        j := value.(*jobIO)
+        j.mutex.Lock()
+        result = append(result, Status{
+            ID:      j.id,
+            Running: atomic.LoadInt64(&j.running) == 1,
+            Started: j.started,
+            Stopped: j.stopped,
+        })
+        j.mutex.Unlock()
+        return nil
+    })
+    return result
+}
+
+func (r *runner) Close(ctx context.Context) error {
+    // Collect the job list before taking the mutex, as List() locks it too
+    statuses := r.List()
+
+    r.mutex.Lock()
+    defer r.mutex.Unlock()
+
+    for _, s := range statuses {
+        obj, ok := r.jobs.Get(s.ID)
+        if !ok {
+            continue
+        }
+        j := obj.(*jobIO)
+        // Skip if not running
+        if atomic.LoadInt64(&j.running) == 0 {
+            continue
+        }
+        if err := r.stop(ctx, j); err != nil {
+            return fmt.Errorf("while stopping '%s': %w", j.id, err)
+        }
+    }
+    return nil
+}