Maple workflow engine: design notes
===================================
    +------------------------------------------------+
    |                      Cli                       |
    +------------------------------------------------+
                         |  ^
                         V  |
+---+------------------------------------------------+
|   |                      Http                      |
|   +------------------------------------------------+
| L |                     Kernel                     |
| o +-----------------------+------------------------+   $ maple server
| g |        Database       | Backend (+ Filesystem) |
|   +-----------------------+------------------------+
|   |                       |  UNIX proc / AWS Job   |
+---+                       +------------------------+
Kernel (kernel.go)
==================

The Kernel API will be a Go-only API that allows submission, manipulation,
and querying of workflows.

The Kernel will be the only component with access to the Database layer.
The Kernel should expose only non-destructive pass-through functions, like
GetWorkflowStatus() but not SetWorkflowStatus(), plus perhaps some minor
repackaging of data (e.g. SnapshotOf() returning a *WorkflowContext).

NewKernel(settings...) *Kernel
kernel.Run(wdl, inputs, options string) *WorkflowContext // returns when workflow finishes
kernel.Submit(...) uuid                                  // same as above, but returns quickly once it's scheduled
kernel.SnapshotOf(uuid) *WorkflowContext
kernel.Abort(uuid)
kernel.AbortCall(uuid, fqn string)
kernel.List() []uuid

States: Submitted -> Running -> [Aborted|Done|Failed]

On Startup (always):

* Check database state and fix anything weird (maybe the server was shut down prematurely)
* Handle workflow FIFOs... maybe just reconnect to them, maybe remove them
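
A minimal usage sketch of this API (signatures from the list above; wdl,
inputs, and options are placeholder strings, and error handling is omitted):

    kernel := NewKernel()
    // Blocking: returns when the workflow finishes.
    ctx := kernel.Run(wdl, inputs, options)
    fmt.Println(ctx)
    // Non-blocking: returns a uuid once the workflow is scheduled.
    id := kernel.Submit(wdl, inputs, options)
    fmt.Println(kernel.SnapshotOf(id))
    // Abort everything the kernel knows about.
    for _, u := range kernel.List() {
        kernel.Abort(u)
    }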
Database (db.go)
================

Only called by the Kernel to persist data.

NewMapleDb(settings...) *MapleDb
db.GetJobStatus()
db.NewWorkflow()
db.NewJob()
db.SetWorkflowStatus()
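
Statuses are stored append-only (see db.go below): each Set*Status() inserts
a new row into a status table, and Get*Status() reads back the most recent
row. A sketch of the intended call flow against the db.go prototype below,
given some WorkflowSources value `sources` (a placeholder) and with error
handling omitted:

    db := NewMapleDb("sqlite3", "DB", log)
    wfCtx, _ := db.NewWorkflow(uuid.NewV4(), &sources, log) // starts as 'NotStarted'
    db.SetWorkflowStatus(wfCtx, "Started", log)             // appends a workflow_status row
    status, _ := db.GetWorkflowStatus(wfCtx, log)           // reads the newest row back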
Backend
=======

ONE backend per workflow. Backend and filesystem are married. Backends are
identified by strings like "SGE+unixfs", "GCE", "AWS", "local". Backends
need to know how to return Readers and Writers for all files.

Optionally, some tasks can be run inline (locally), but only if certain
criteria are met:

1) No File inputs/outputs
type handle interface{} // opaque, backend-specific job handle

type MapleBackend interface {
    Run(cmd, fqn string) handle
    Abort(h handle)
}

// Matches io.Reader / io.Writer, so WdlFiles compose with the standard library.
type WdlFile interface {
    Read(p []byte) (n int, err error)
    Write(p []byte) (n int, err error)
}
NewLocalBackend(wf *WorkflowContext, settings...) *MapleBackend
backend.DbInit() error
backend.Run(job *JobContext) handle
backend.Abort(h handle)
backend.Results(h handle) []*WdlFile
backend.Status(h handle) string
backend.Wait(h handle)
backend.JobFile(job *JobContext, relpath string) *WdlFile
backend.File(abspath string) *WdlFile
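
A sketch of a local backend under these interfaces. Everything here is
illustrative: wrapping an *exec.Cmd as the handle is an assumption, not part
of the design notes.

    import "os/exec"

    // localHandle is one hypothetical choice of handle for a local backend.
    type localHandle struct {
        cmd *exec.Cmd
    }

    type LocalBackend struct{}

    // Run launches the command as a UNIX process and returns its handle.
    func (b *LocalBackend) Run(cmd, fqn string) handle {
        c := exec.Command("/bin/sh", "-c", cmd)
        c.Start() // error handling omitted in this sketch
        return &localHandle{c}
    }

    func (b *LocalBackend) Abort(h handle) {
        h.(*localHandle).cmd.Process.Kill()
    }

    func (b *LocalBackend) Wait(h handle) {
        h.(*localHandle).cmd.Wait()
    }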
Filesystem
==========

NewUnixFs(path) *FileSystem
NewGcsFs(path) *FileSystem
fs.
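
(The notes stop mid-sentence here.) Based on the backend section above, which
says backends must hand out Readers and Writers for all files, the filesystem
layer would plausibly look something like the following. This is an assumption,
not part of the original notes:

    // Assumed shape only: a FileSystem resolves paths to WdlFiles.
    type FileSystem interface {
        Open(path string) (WdlFile, error)   // for reading existing files
        Create(path string) (WdlFile, error) // for writing outputs
    }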
Command Line (cli.go)
=====================

$ maple submit foo.wdl foo.inputs
cefd19cb-a8b1-474d-bf58-9c4522a5af98
(Sends POST /submit)

$ maple tail cefd19cb
... tailed log lines ...
(Sends GET /fifo/cefd19cb)

$ maple run foo.wdl foo.inputs
(= submit + tail)

$ maple ls
cefd19cb foo.wdl running   2016-08-10T01:39:32+00:00
93453875 bar.wdl submitted 2016-08-10T01:39:32+00:00
9ce50dbf baz.wdl completed 2016-08-10T01:39:32+00:00
(Sends GET /list)

$ maple show 93453875
w.a          running   local 2016-08-10T01:39:32+00:00
w.b          running   local 2016-08-10T01:39:32+00:00
w.$scatter_0 completed -     2016-08-10T01:39:32+00:00
(Sends GET /show/93453875)

$ maple abort cefd19cb
(Sends POST /abort/cefd19cb)

$ maple server --port=8765
HTTP API (http.go)
==================

POST /submit
GET  /fifo/:uuid
GET  /list
GET  /show/:uuid
POST /abort/:uuid
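
A sketch of the corresponding route wiring with net/http's default mux, the
same approach the kernel.go prototype below takes for /submit. The other
handler names are placeholders, not existing functions:

    http.HandleFunc("/submit", SubmitHttpEndpoint(wd)) // exists in kernel.go below
    http.HandleFunc("/fifo/", fifoHandler)             // GET  /fifo/:uuid   (placeholder)
    http.HandleFunc("/list", listHandler)              // GET  /list         (placeholder)
    http.HandleFunc("/show/", showHandler)             // GET  /show/:uuid   (placeholder)
    http.HandleFunc("/abort/", abortHandler)           // POST /abort/:uuid  (placeholder)
    log.Fatal(http.ListenAndServe(":8765", nil))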
Prototype: context cancellation
===============================
package main

import (
    "fmt"
    "golang.org/x/net/context" // pre-Go 1.7 import path; today this is the stdlib "context" package
    "sync"
    "time"
)

// c is never written to, so selecting on it blocks until cancellation wins.
var c = make(chan int)

func g(ctx context.Context, id string, wg *sync.WaitGroup) {
    defer func() {
        wg.Done()
        fmt.Printf("%s exit\n", id)
    }()
    select {
    case <-ctx.Done():
        fmt.Printf("%s cancel: %s\n", id, ctx.Err())
    case <-c:
        fmt.Println("shouldn't happen")
    }
}

func f(ctx context.Context, id string, wg *sync.WaitGroup) {
    //childCtx, cancel := context.WithTimeout(ctx, time.Second*2)
    childCtx, cancel := context.WithCancel(ctx)
    var childWg sync.WaitGroup
    defer func() {
        cancel()       // cancel the children...
        childWg.Wait() // ...and wait for them to exit before we do
        wg.Done()
        fmt.Printf("%s exit\n", id)
    }()
    for i := 0; i < 2; i++ {
        childWg.Add(1)
        go g(childCtx, fmt.Sprintf("%s-g%d", id, i), &childWg)
    }
    select {
    case <-ctx.Done():
        fmt.Printf("%s cancel: %s\n", id, ctx.Err())
    case <-c:
        fmt.Println("shouldn't happen")
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go f(ctx, fmt.Sprintf("f%d", i), &wg)
    }
    time.Sleep(time.Second * 5)
    fmt.Println("cancel()")
    cancel()
    wg.Wait()
}
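
Representative output (goroutine interleaving varies run to run; the invariant
is that each f exits only after both of its g children have exited):

    cancel()
    f0 cancel: context canceled
    f0-g0 cancel: context canceled
    f0-g0 exit
    f0-g1 cancel: context canceled
    f0-g1 exit
    f0 exit
    f1 cancel: context canceled
    f1-g0 cancel: context canceled
    f1-g0 exit
    f1-g1 cancel: context canceled
    f1-g1 exit
    f1 exit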
db.go
=====
package main

import (
    "database/sql"
    "errors"
    _ "github.com/mattn/go-sqlite3"
    "github.com/satori/go.uuid"
    "strconv"
    "strings"
    "sync"
    "time"
)

// MapleDb is the persistence layer. Statuses are append-only: Set*Status()
// inserts a new row and Get*Status() returns the most recent one.
type MapleDb struct {
    driverName     string
    dataSourceName string
    log            *Logger
    db             *sql.DB
    mtx            *sync.Mutex
}

func NewMapleDb(driverName, dataSourceName string, log *Logger) *MapleDb {
    db, err := sql.Open(driverName, dataSourceName)
    if err != nil {
        panic(err)
    }
    var mtx sync.Mutex
    dsp := &MapleDb{driverName, dataSourceName, log, db, &mtx}
    dsp.setup()
    return dsp
}

func (dsp *MapleDb) Close() {
    dsp.db.Close()
}

func (dsp *MapleDb) tables() ([]string, error) {
    query := "SELECT name FROM sqlite_master WHERE type='table';"
    dsp.log.DbQuery(query)
    rows, err := dsp.db.Query(query)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var tables []string
    for rows.Next() {
        var name string
        err = rows.Scan(&name)
        if err != nil {
            return nil, err
        }
        tables = append(tables, name)
    }
    err = rows.Err()
    if err != nil {
        return nil, err
    }
    return tables, nil
}

func (dsp *MapleDb) query(query string) {
    dsp.log.DbQuery(query)
    _, err := dsp.db.Exec(query)
    if err != nil {
        panic(err)
    }
}

// setup creates any tables that don't exist yet.
func (dsp *MapleDb) setup() {
    tableNames, err := dsp.tables()
    if err != nil {
        panic(err)
    }
    if !contains("workflow", tableNames) {
        dsp.query(`CREATE TABLE workflow (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            uuid TEXT
        );`)
    }
    if !contains("workflow_status", tableNames) {
        dsp.query(`CREATE TABLE workflow_status (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id INTEGER NOT NULL,
            status TEXT,
            date TEXT,
            FOREIGN KEY(workflow_id) REFERENCES workflow(id)
        );`)
    }
    if !contains("job", tableNames) {
        dsp.query(`CREATE TABLE job (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id INTEGER NOT NULL,
            call_fqn TEXT,
            shard INT,
            attempt INT,
            FOREIGN KEY(workflow_id) REFERENCES workflow(id)
        );`)
    }
    if !contains("job_status", tableNames) {
        dsp.query(`CREATE TABLE job_status (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id INTEGER NOT NULL,
            status TEXT,
            date TEXT,
            FOREIGN KEY(job_id) REFERENCES job(id)
        );`)
    }
    if !contains("workflow_sources", tableNames) {
        dsp.query(`CREATE TABLE workflow_sources (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            workflow_id INTEGER NOT NULL,
            wdl TEXT,
            inputs TEXT,
            options TEXT,
            FOREIGN KEY(workflow_id) REFERENCES workflow(id)
        );`)
    }
}

// NewJob inserts a job row plus its initial 'NotStarted' status in one transaction.
func (dsp *MapleDb) NewJob(wfCtx *WorkflowContext, node *Node, log *Logger) (*JobContext, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    db := dsp.db
    var success = false
    var jobId int64 = -1
    tx, err := db.Begin()
    if err != nil {
        return nil, err
    }
    // Commit on success, roll back on any early return.
    defer func() {
        if success {
            tx.Commit()
        } else {
            tx.Rollback()
        }
    }()
    query := `INSERT INTO job (workflow_id, call_fqn, shard, attempt) VALUES (?, ?, ?, ?)`
    log.DbQuery(query, strconv.FormatInt(wfCtx.primaryKey, 10), node.name, "0", "1")
    res, err := tx.Exec(query, wfCtx.primaryKey, node.name, 0, 1)
    if err != nil {
        return nil, err
    }
    jobId, err = res.LastInsertId()
    if err != nil {
        return nil, err
    }
    rows, err := res.RowsAffected()
    if err != nil {
        return nil, err
    }
    if rows != 1 {
        return nil, errors.New("could not insert into 'job' table")
    }
    now := time.Now().Format("2006-01-02 15:04:05.999")
    query = `INSERT INTO job_status (job_id, status, date) VALUES (?, 'NotStarted', ?)`
    log.DbQuery(query, strconv.FormatInt(jobId, 10), now)
    res, err = tx.Exec(query, jobId, now)
    if err != nil {
        return nil, err
    }
    rows, err = res.RowsAffected()
    if err != nil {
        return nil, err
    }
    if rows != 1 {
        return nil, errors.New("could not insert into 'job_status' table")
    }
    ctx := JobContext{jobId, node, 0, 1, "NotStarted"}
    success = true
    return &ctx, nil
}

func (dsp *MapleDb) SetJobStatus(jobCtx *JobContext, status string, log *Logger) (bool, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    db := dsp.db
    now := time.Now().Format("2006-01-02 15:04:05.999")
    var query = `INSERT INTO job_status (job_id, status, date) VALUES (?, ?, ?)`
    log.DbQuery(query, strconv.FormatInt(jobCtx.primaryKey, 10), status, now)
    _, err := db.Exec(query, jobCtx.primaryKey, status, now)
    if err != nil {
        return false, err
    }
    jobCtx.status = status
    return true, nil
}

func (dsp *MapleDb) GetJobStatus(jobId int64, log *Logger) (string, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    db := dsp.db
    var query = `SELECT status FROM job_status WHERE job_id=? ORDER BY datetime(date) DESC, id DESC LIMIT 1`
    log.DbQuery(query, strconv.FormatInt(jobId, 10))
    row := db.QueryRow(query, jobId)
    var status string
    err := row.Scan(&status)
    if err != nil {
        return "", err
    }
    return status, nil
}

// NewWorkflow inserts the workflow row, its sources, and its initial
// 'NotStarted' status in one transaction.
func (dsp *MapleDb) NewWorkflow(uuid uuid.UUID, sources *WorkflowSources, log *Logger) (*WorkflowContext, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    db := dsp.db
    var success = false
    var workflowId int64 = -1
    tx, err := db.Begin()
    if err != nil {
        return nil, err
    }
    defer func() {
        if success {
            tx.Commit()
        } else {
            tx.Rollback()
        }
    }()
    query := `INSERT INTO workflow (uuid) VALUES (?)`
    log.DbQuery(query, uuid.String())
    res, err := tx.Exec(query, uuid)
    if err != nil {
        return nil, err
    }
    workflowId, err = res.LastInsertId()
    if err != nil {
        return nil, err
    }
    rows, err := res.RowsAffected()
    if err != nil {
        return nil, err
    }
    if rows != 1 {
        return nil, errors.New("could not insert into 'workflow' table")
    }
    query = `INSERT INTO workflow_sources (workflow_id, wdl, inputs, options) VALUES (?, ?, ?, ?)`
    log.DbQuery(query, strconv.FormatInt(workflowId, 10), "{omit}", "{omit}", "{omit}")
    res, err = tx.Exec(query, workflowId, sources.wdl, sources.inputs, sources.options)
    if err != nil {
        return nil, err
    }
    rows, err = res.RowsAffected()
    if err != nil {
        return nil, err
    }
    if rows != 1 {
        return nil, errors.New("could not insert into 'workflow_sources' table")
    }
    now := time.Now().Format("2006-01-02 15:04:05.999")
    query = `INSERT INTO workflow_status (workflow_id, status, date) VALUES (?, 'NotStarted', ?)`
    log.DbQuery(query, strconv.FormatInt(workflowId, 10), now)
    res, err = tx.Exec(query, workflowId, now)
    if err != nil {
        return nil, err
    }
    rows, err = res.RowsAffected()
    if err != nil {
        return nil, err
    }
    if rows != 1 {
        return nil, errors.New("could not insert into 'workflow_status' table")
    }
    ctx := WorkflowContext{uuid, workflowId, make(chan *WorkflowContext, 1), sources, "NotStarted", nil}
    success = true
    return &ctx, nil
}

func (dsp *MapleDb) LoadWorkflow(uuid uuid.UUID, log *Logger) (*WorkflowContext, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    db := dsp.db
    var context WorkflowContext
    context.uuid = uuid
    query := `SELECT id FROM workflow WHERE uuid=?`
    log.DbQuery(query, uuid.String())
    row := db.QueryRow(query, uuid)
    err := row.Scan(&context.primaryKey)
    if err != nil {
        return nil, err
    }
    return dsp.loadWorkflowSources(log, &context, context.primaryKey)
}

func (dsp *MapleDb) SetWorkflowStatus(wfId WorkflowIdentifier, status string, log *Logger) (bool, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    db := dsp.db
    now := time.Now().Format("2006-01-02 15:04:05.999")
    var query = `INSERT INTO workflow_status (workflow_id, status, date) VALUES (?, ?, ?)`
    log.DbQuery(query, strconv.FormatInt(wfId.dbKey(), 10), status, now)
    _, err := db.Exec(query, wfId.dbKey(), status, now)
    if err != nil {
        return false, err
    }
    return true, nil
}

func (dsp *MapleDb) GetWorkflowStatus(wfId WorkflowIdentifier, log *Logger) (string, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    return dsp.getWorkflowStatus(log, wfId)
}

// GetWorkflowsByStatus returns every workflow whose *most recent* status is
// one of the given statuses. The inner GROUP BY picks each workflow's latest
// row (SQLite lets the bare 'status' column ride along with MAX(date)).
func (dsp *MapleDb) GetWorkflowsByStatus(log *Logger, status ...string) ([]*WorkflowContext, error) {
    dsp.mtx.Lock()
    defer dsp.mtx.Unlock()
    db := dsp.db
    questionMarks := make([]string, len(status))
    for i := 0; i < len(status); i++ {
        questionMarks[i] = "?"
    }
    var query = `SELECT workflow_id FROM (SELECT workflow_id, status, MAX(date) FROM workflow_status GROUP BY workflow_id) WHERE status IN (` + strings.Join(questionMarks, ", ") + `);`
    log.DbQuery(query, status...)
    queryParams := make([]interface{}, len(status))
    for i := range status {
        queryParams[i] = status[i]
    }
    rows, err := db.Query(query, queryParams...)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var contexts []*WorkflowContext
    for rows.Next() {
        var id int64
        err = rows.Scan(&id)
        if err != nil {
            return nil, err
        }
        context, err := dsp.loadWorkflowPK(log, id)
        if err != nil {
            return nil, err
        }
        contexts = append(contexts, context)
    }
    err = rows.Err()
    if err != nil {
        return nil, err
    }
    return contexts, nil
}

// getWorkflowStatus is the lock-free variant for callers already holding dsp.mtx.
func (dsp *MapleDb) getWorkflowStatus(log *Logger, wfId WorkflowIdentifier) (string, error) {
    db := dsp.db
    var query = `SELECT status FROM workflow_status WHERE workflow_id=? ORDER BY datetime(date) DESC, id DESC LIMIT 1`
    log.DbQuery(query, strconv.FormatInt(wfId.dbKey(), 10))
    row := db.QueryRow(query, wfId.dbKey())
    var status string
    err := row.Scan(&status)
    if err != nil {
        return "", err
    }
    return status, nil
}

func (dsp *MapleDb) loadWorkflowPK(log *Logger, primaryKey int64) (*WorkflowContext, error) {
    db := dsp.db
    var context WorkflowContext
    context.primaryKey = primaryKey
    query := `SELECT uuid FROM workflow WHERE id=?`
    log.DbQuery(query, strconv.FormatInt(primaryKey, 10))
    row := db.QueryRow(query, primaryKey)
    err := row.Scan(&context.uuid)
    if err != nil {
        return nil, err
    }
    return dsp.loadWorkflowSources(log, &context, primaryKey)
}

func (dsp *MapleDb) loadWorkflowSources(log *Logger, context *WorkflowContext, primaryKey int64) (*WorkflowContext, error) {
    db := dsp.db
    var sources WorkflowSources
    var err error
    context.done = make(chan *WorkflowContext)
    context.status, err = dsp.getWorkflowStatus(log, context)
    if err != nil {
        return nil, err
    }
    query := `SELECT wdl, inputs, options FROM workflow_sources WHERE workflow_id=?`
    log.DbQuery(query, strconv.FormatInt(context.primaryKey, 10))
    row := db.QueryRow(query, context.primaryKey)
    err = row.Scan(&sources.wdl, &sources.inputs, &sources.options)
    if err != nil {
        return nil, err
    }
    context.source = &sources
    return context, nil
}

func contains(a string, list []string) bool {
    for _, b := range list {
        if b == a {
            return true
        }
    }
    return false
}
db_test.go
==========
package main

import (
    "bytes"
    "github.com/satori/go.uuid"
    "testing"
)

func TestDbDispatcher(t *testing.T) {
    var buf bytes.Buffer
    log := NewLogger().ToWriter(&buf)
    dsp := NewMapleDb("sqlite3", "testdb", log)
    id := uuid.NewV4()
    if _, err := dsp.NewWorkflow(id, &WorkflowSources{"wdl", "inputs", "options"}, log); err != nil {
        t.Fatalf("NewWorkflow failed: %s", err)
    }
    wf, err := dsp.LoadWorkflow(id, log)
    if err != nil {
        t.Fatalf("LoadWorkflow failed: %s", err)
    }
    if wf.uuid != id {
        t.Fatalf("Bad UUID")
    }
    if wf.status != "NotStarted" {
        t.Fatalf("Bad Status")
    }
}
Prototype: non-blocking FIFO log tailing
========================================
package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "strconv"
    "strings"
    "sync"
    "syscall"
    "time"
)

// NonBlockingFifo is a named pipe whose writer-side open can be "unblocked":
// if no reader attaches within a timeout, we briefly open the read side
// ourselves so the writer's open doesn't block forever.
type NonBlockingFifo struct {
    fp   *os.File
    path string
}

func NewNonBlockingFifo(path string) (*NonBlockingFifo, error) {
    if _, err := os.Stat(path); os.IsNotExist(err) {
        if err := syscall.Mkfifo(path, 0777); err != nil {
            return nil, err
        }
    }
    fifo := NonBlockingFifo{nil, path}
    return &fifo, nil
}

func (fifo *NonBlockingFifo) Write(p []byte) (n int, err error) {
    return fifo.fp.Write(p)
}

// Unblock gives a client `timeout` to connect and read; otherwise a separate
// goroutine connects as the reader so the writer-side open completes.
func (fifo *NonBlockingFifo) Unblock(timeout time.Duration) *NonBlockingFifo {
    fifoOpen := make(chan error)
    go func() {
        fp, err := os.OpenFile(fifo.path, os.O_WRONLY, os.ModeNamedPipe)
        if err == nil {
            fifo.fp = fp
        }
        fifoOpen <- err
    }()
    select {
    case err := <-fifoOpen:
        if err != nil {
            fmt.Printf("error opening (w) %s: %s\n", fifo.path, err)
            return nil
        }
    case <-time.After(timeout):
        reader, err := os.OpenFile(fifo.path, os.O_RDONLY, os.ModeNamedPipe)
        if err != nil {
            fmt.Printf("error opening (r) %s: %s\n", fifo.path, err)
            return nil
        }
        reader.Close()
        <-fifoOpen
    }
    return fifo
}

func (fifo *NonBlockingFifo) Close() {
    fifo.fp.Close()
    os.Remove(fifo.path)
}

func main() {
    if os.Args[1] == "client" {
        if os.Args[2] == "submit" {
            resp, err := http.Get("http://localhost:8765/submit")
            if err != nil {
                log.Fatalf("Could not connect to server: %s", err)
            }
            defer resp.Body.Close()
            body, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                log.Fatalf("Could not read response: %s", err)
            }
            fmt.Printf("/submit: %s\n", body)
        }
        if os.Args[2] == "attach" {
            resp, err := http.Get(fmt.Sprintf("http://localhost:8765/attach?id=%s", os.Args[3]))
            if err != nil {
                log.Fatalf("Could not connect to server: %s", err)
            }
            defer resp.Body.Close()
            fifoPath, err := ioutil.ReadAll(resp.Body)
            if err != nil {
                log.Fatalf("Could not read response: %s", err)
            }
            fmt.Printf("/attach: %s\n", fifoPath)
            fp, err := os.OpenFile(string(fifoPath), os.O_RDONLY, 0777)
            if err != nil {
                log.Fatalf("Error opening %s: %s", fifoPath, err)
            }
            // Mirror everything the server writes into the FIFO onto stdout.
            tee := io.TeeReader(fp, os.Stdout)
            ioutil.ReadAll(tee)
        }
    }
    if os.Args[1] == "server" {
        type Proc struct {
            id   int
            fifo *NonBlockingFifo
            log  *Logger
        }
        logger := NewLogger().ToWriter(os.Stdout)
        dots := strings.Repeat(".", 985)
        procs := make(map[int]*Proc)
        procsId := 0
        var procsMutex sync.Mutex

        // /submit starts a fake ten-second job that logs once per second.
        http.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) {
            procsMutex.Lock()
            defer procsMutex.Unlock()
            reqId := procsId
            procsId += 1
            proc := &Proc{reqId, nil, logger}
            procs[reqId] = proc
            go func(proc *Proc) {
                for i := 0; i < 10; i++ {
                    proc.log.Info("%04d-%010d%s\n", proc.id, i, dots)
                    time.Sleep(time.Millisecond * 1000)
                }
                if proc.fifo != nil { // note: still racy in this prototype
                    proc.fifo.Close()
                    proc.fifo = nil
                }
                procsMutex.Lock()
                defer procsMutex.Unlock()
                delete(procs, reqId)
            }(proc)
            fmt.Fprintf(w, "%d", reqId)
        })

        // /attach?id=N creates a FIFO for job N and tees its log into it.
        http.HandleFunc("/attach", func(w http.ResponseWriter, r *http.Request) {
            procsMutex.Lock()
            defer procsMutex.Unlock()
            reqId, _ := strconv.Atoi(r.URL.Query().Get("id"))
            fifoName := fmt.Sprintf("fifo_%d", reqId)
            val, ok := procs[reqId]
            if !ok {
                w.Header().Set("Content-Type", "text/plain; charset=utf-8")
                w.WriteHeader(http.StatusInternalServerError)
                io.WriteString(w, "ID does not exist")
                return
            }
            if val.fifo == nil {
                fifo, err := NewNonBlockingFifo(fifoName)
                if err != nil {
                    w.Header().Set("Content-Type", "text/plain; charset=utf-8")
                    w.WriteHeader(http.StatusInternalServerError)
                    io.WriteString(w, "Could not create FIFO")
                    return
                }
                val.fifo = fifo
                go func(reqId int) {
                    if fifo.Unblock(time.Second*2) == nil {
                        return // writer side never opened; leave the log alone
                    }
                    procsMutex.Lock()
                    defer procsMutex.Unlock()
                    // The job may have finished (and been deleted) meanwhile.
                    if p, ok := procs[reqId]; ok {
                        p.log = p.log.ToWriter(fifo)
                    }
                }(reqId)
            }
            fmt.Fprint(w, fifoName)
        })
        log.Fatal(http.ListenAndServe(":8765", nil))
    }
}
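
Usage, per the argument handling above (assuming this file is saved as
fifo.go; the server simulates ten seconds of log output per submitted job):

    $ go run fifo.go server &
    $ go run fifo.go client submit
    /submit: 0
    $ go run fifo.go client attach 0
    /attach: fifo_0
    ... log lines stream to stdout until the job finishes ...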
Example graph file (format: [inputs]Name[outputs], parsed by LoadGraph in parse.go)
===================================================================================
[1]A[2]
[1]B[3,4]
[1]C[2]
[1]D[1]
[D,1]E[]
[D]F[0]
[F]G[]
[G]H[]
[G]I[]
[G]J[]
[G]K[]
[G]L[]
kernel.go
=========
package main

import (
    "errors"
    "fmt"
    "github.com/satori/go.uuid"
    "golang.org/x/net/context" // pre-Go 1.7 import path; today this is the stdlib "context" package
    "gopkg.in/alecthomas/kingpin.v2"
    "io"
    "io/ioutil"
    "net/http"
    "os"
    "os/exec"
    "os/signal"
    "strings"
    "sync"
    "syscall"
    "time"
)

type WorkflowDispatcher struct {
    isAlive            bool
    maxWorkers         int
    submitChannel      chan *WorkflowContext
    submitChannelMutex *sync.Mutex
    cancel             func()
    waitGroup          *sync.WaitGroup
    db                 *MapleDb
    workflowMaxRuntime time.Duration
    log                *Logger
}

type WorkflowIdentifier interface {
    dbKey() int64
    id() uuid.UUID
}

type WorkflowSources struct {
    wdl     string
    inputs  string
    options string
}

type JobContext struct {
    primaryKey int64
    node       *Node
    index      int
    attempt    int
    status     string
}

func (ctx *JobContext) String() string {
    return fmt.Sprintf("%s (%s)", ctx.node.name, ctx.status)
}

type WorkflowContext struct {
    uuid       uuid.UUID
    primaryKey int64
    done       chan *WorkflowContext
    source     *WorkflowSources
    status     string
    calls      []*JobContext
}

func (c WorkflowContext) id() uuid.UUID {
    return c.uuid
}

func (c WorkflowContext) dbKey() int64 {
    return c.primaryKey
}

func (s WorkflowSources) String() string {
    return fmt.Sprintf("<workflow %s>", s.wdl)
}

func SubmitHttpEndpoint(wd *WorkflowDispatcher) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        fp, _, err := r.FormFile("wdl")
        if err != nil {
            w.Header().Set("Content-Type", "application/json; charset=utf-8")
            w.WriteHeader(http.StatusBadRequest)
            io.WriteString(w, `{"message": "no WDL file"}`)
            return
        }
        var bytes, _ = ioutil.ReadAll(fp)
        wdl := string(bytes)

        // "inputs" and "options" are optional; default to empty JSON.
        fp, _, err = r.FormFile("inputs")
        var inputs = "{}"
        if err == nil {
            bytes, _ = ioutil.ReadAll(fp)
            inputs = string(bytes)
        }

        fp, _, err = r.FormFile("options")
        var options = "{}"
        if err == nil {
            bytes, _ = ioutil.ReadAll(fp)
            options = string(bytes)
        }

        sources := WorkflowSources{strings.TrimSpace(wdl), strings.TrimSpace(inputs), strings.TrimSpace(options)}
        id := uuid.NewV4()
        ctx, err := wd.db.NewWorkflow(id, &sources, wd.log)
        if err != nil {
            w.Header().Set("Content-Type", "application/json; charset=utf-8")
            w.WriteHeader(http.StatusInternalServerError)
            io.WriteString(w, `{"message": "could not persist workflow"}`)
            return
        }
        wd.log.Info("HTTP endpoint /submit/ received: %s\n", sources)
        defer func() {
            if r := recover(); r != nil {
                w.Header().Set("Content-Type", "application/json; charset=utf-8")
                w.WriteHeader(http.StatusInternalServerError)
                io.WriteString(w, fmt.Sprintf(`{"message": "/submit/ panic: %s"}`, r))
            }
        }()
        select {
        case wd.submitChannel <- ctx:
        case <-time.After(time.Millisecond * 500):
            w.Header().Set("Content-Type", "application/json; charset=utf-8")
            w.WriteHeader(http.StatusRequestTimeout)
            io.WriteString(w, `{"message": "timeout submitting workflow (500ms)"}`)
            return
        }
    }
}

func (wd *WorkflowDispatcher) runJob(wfCtx *WorkflowContext, cmd *exec.Cmd, callCtx *JobContext, done chan<- *JobContext, jobCtx context.Context) {
    var cmdDone = make(chan bool, 1)
    var log = wd.log.ForJob(wfCtx.uuid, callCtx)
    var isAborting = false
    log.Info("runJob: enter")
    defer log.Info("runJob: exit")
    subprocessCtx, subprocessCancel := context.WithCancel(jobCtx)
    wd.db.SetJobStatus(callCtx, "Started", log)

    // Prototype: a two-second timer stands in for the real subprocess.
    go func() {
        select {
        case <-time.After(time.Second * 2):
        case <-subprocessCtx.Done():
        }
        //cmd.Run()
        cmdDone <- true
    }()
    /*var kill = func(status string) {
        err := cmd.Process.Kill()
        if err != nil {
            panic(err)
        }
    }*/
    for {
        select {
        case <-cmdDone:
            var status = "Done"
            /*if cmd.ProcessState == nil {
                status = "no-create"
            } else if !cmd.ProcessState.Success() {
                status = "failed"
            }*/
            log.Info("runJob: done (status %s)", status)
            wd.db.SetJobStatus(callCtx, status, log)
            done <- callCtx
            return
        case <-jobCtx.Done():
            if !isAborting {
                log.Info("runJob: abort")
                subprocessCancel()
                isAborting = true
            }
        }
    }
}

func (wd *WorkflowDispatcher) runWorkflow(wfCtx *WorkflowContext, workflowResultsChannel chan<- *WorkflowContext, ctx context.Context) {
    var log = wd.log.ForWorkflow(wfCtx.uuid)
    log.Info("runWorkflow: start")
    // TODO: push these two lines into SetWorkflowStatus()
    wd.db.SetWorkflowStatus(wfCtx, "Started", log)
    wfCtx.status = "Started"
    var jobDone = make(chan *JobContext)
    // TODO: get rid of jobAbort; set the cancellation function on the JobContext
    var jobAbort = make(map[*Node]func())
    var jobAbortMutex sync.Mutex
    var workflowDone = make(chan bool)
    var calls = make(chan *Node, 20)
    var isAborting = false
    var doneCalls = make(chan *JobContext)
    defer func() {
        wfCtx.done <- wfCtx
        close(wfCtx.done)
        close(doneCalls)
        workflowResultsChannel <- wfCtx
    }()
    var abortSubprocesses = func() {
        jobAbortMutex.Lock()
        for _, jobCancelFunc := range jobAbort {
            jobCancelFunc()
        }
        jobAbortMutex.Unlock()
        wd.db.SetWorkflowStatus(wfCtx, "Aborted", log)
        wfCtx.status = "Aborted"
        isAborting = true
    }

    // Graph-walking goroutine: seed the root nodes, then release each node's
    // downstream calls as it completes.
    go func() {
        reader := strings.NewReader(wfCtx.source.wdl)
        graph := LoadGraph(reader)
        for _, root := range graph.Root() {
            calls <- root
        }
        for call := range doneCalls {
            wfCtx.calls = append(wfCtx.calls, call)
            jobAbortMutex.Lock()
            delete(jobAbort, call.node)
            remaining := len(jobAbort)
            jobAbortMutex.Unlock()
            if len(wfCtx.calls) == len(graph.nodes) || (isAborting && remaining == 0) {
                workflowDone <- true
                return
            } else if !isAborting {
                for _, nextCall := range graph.Downstream(call.node) {
                    calls <- nextCall
                }
            }
        }
    }()

    for {
        if isAborting {
            select {
            case <-workflowDone:
                log.Info("workflow: completed")
                if wfCtx.status != "Aborted" {
                    wfCtx.status = "Done"
                    wd.db.SetWorkflowStatus(wfCtx, "Done", log)
                }
                return
            case call := <-jobDone:
                log.Info("workflow: subprocess finished: %s", call.status)
                doneCalls <- call
            }
        } else {
            select {
            case call := <-calls:
                log.Info("workflow: launching call: %s", call)
                jobAbortMutex.Lock()
                jobCtx, cancel := context.WithCancel(context.Background())
                jobAbort[call] = cancel
                job, err := wd.db.NewJob(wfCtx, call, log)
                if err != nil {
                    // TODO: don't panic!
                    panic(fmt.Sprintf("Couldn't persist job: %s", err))
                }
                go wd.runJob(wfCtx, exec.Command("sleep", "2"), job, jobDone, jobCtx)
                jobAbortMutex.Unlock()
            case <-workflowDone:
                log.Info("workflow: completed")
                wfCtx.status = "Done"
                wd.db.SetWorkflowStatus(wfCtx, "Done", log)
                return
            case call := <-jobDone:
                log.Info("workflow: subprocess finished: %s", call.status)
                doneCalls <- call
            case <-ctx.Done():
                // This handles cancellations AND timeouts.
                log.Info("workflow: aborting...")
                abortSubprocesses()
                jobAbortMutex.Lock()
                if len(jobAbort) == 0 {
                    jobAbortMutex.Unlock()
                    return
                }
                jobAbortMutex.Unlock()
            }
        }
    }
}

func (wd *WorkflowDispatcher) runDispatcher(ctx context.Context) {
    var workers = 0
    var isAborting = false
    var workflowDone = make(chan *WorkflowContext)
    var workflowAbort = make(map[string]func())
    var runningWorkflows = make(map[string]*WorkflowContext)
    var log = wd.log
    log.Info("dispatcher: enter")
    defer func() {
        wd.waitGroup.Done()
        log.Info("dispatcher: exit")
    }()
    var abort = func() {
        // Stop accepting submissions, then cancel all running workflows.
        wd.submitChannelMutex.Lock()
        close(wd.submitChannel)
        wd.isAlive = false
        wd.submitChannelMutex.Unlock()
        isAborting = true
        for _, wfCancelFunc := range workflowAbort {
            wfCancelFunc()
        }
    }
    var processDone = func(result *WorkflowContext) {
        log.Info("dispatcher: workflow %s finished: %s", result.uuid, result.status)
        delete(workflowAbort, fmt.Sprintf("%s", result.uuid))
        delete(runningWorkflows, fmt.Sprintf("%s", result.uuid))
        workers--
    }
    for {
        if isAborting {
            if len(runningWorkflows) == 0 {
                return
            }
            d := <-workflowDone
            processDone(d)
        } else if workers < wd.maxWorkers {
            select {
            case wfContext := <-wd.submitChannel:
                workers++
                runningWorkflows[fmt.Sprintf("%s", wfContext.uuid)] = wfContext
                workflowCtx, workflowCancel := context.WithTimeout(ctx, wd.workflowMaxRuntime)
                workflowAbort[fmt.Sprintf("%s", wfContext.uuid)] = workflowCancel
                log.Info("dispatcher: starting %s", wfContext.uuid)
                go wd.runWorkflow(wfContext, workflowDone, workflowCtx)
            case d := <-workflowDone:
                processDone(d)
            case <-ctx.Done():
                abort()
            }
        } else {
            select {
            case d := <-workflowDone:
                processDone(d)
            case <-ctx.Done():
                abort()
            }
        }
    }
}

func NewWorkflowDispatcher(workers int, buffer int, log *Logger, db *MapleDb) *WorkflowDispatcher {
    var waitGroup sync.WaitGroup
    var mutex sync.Mutex
    dispatcherCtx, dispatcherCancel := context.WithCancel(context.Background())
    mutex.Lock()
    defer mutex.Unlock()
    wd := &WorkflowDispatcher{
        true,
        workers,
        make(chan *WorkflowContext, buffer),
        &mutex,
        dispatcherCancel,
        &waitGroup,
        db,
        time.Second * 600,
        log}
    waitGroup.Add(1)
    go wd.runDispatcher(dispatcherCtx)
    return wd
}

func (wd *WorkflowDispatcher) Abort() {
    if !wd.isAlive {
        return
    }
    wd.cancel()
    wd.Wait()
}

func (wd *WorkflowDispatcher) Wait() {
    wd.waitGroup.Wait()
}

func (wd *WorkflowDispatcher) IsAlive() bool {
    wd.submitChannelMutex.Lock()
    defer wd.submitChannelMutex.Unlock()
    return wd.isAlive
}

func (wd *WorkflowDispatcher) SubmitWorkflow(wdl, inputs, options string, id uuid.UUID) (*WorkflowContext, error) {
    sources := WorkflowSources{strings.TrimSpace(wdl), strings.TrimSpace(inputs), strings.TrimSpace(options)}
    log := wd.log.ForWorkflow(id)
    ctx, err := wd.db.NewWorkflow(id, &sources, log)
    if err != nil {
        return nil, err
    }
    if err := wd.SubmitExistingWorkflow(ctx); err != nil {
        return nil, err
    }
    return ctx, nil
}

func (wd *WorkflowDispatcher) SubmitExistingWorkflow(ctx *WorkflowContext) error {
    wd.submitChannelMutex.Lock()
    defer wd.submitChannelMutex.Unlock()
    if wd.isAlive {
        wd.submitChannel <- ctx
    } else {
        return errors.New("workflow submission is closed")
    }
    return nil
}

func (wd *WorkflowDispatcher) AbortWorkflow(id uuid.UUID) {
    // TODO: not yet implemented (kernel.Abort(uuid) in the design notes above)
}

func SignalHandler(wd *WorkflowDispatcher) {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func(wd *WorkflowDispatcher) {
        sig := <-sigs
        wd.log.Info("%s signal detected... aborting dispatcher", sig)
        wd.Abort()
        wd.log.Info("%s signal detected... aborted dispatcher", sig)
        os.Exit(130)
    }(wd)
}

type Kernel struct {
    wd  *WorkflowDispatcher
    log *Logger
    db  *MapleDb
}

func NewKernel(log *Logger, dbName string, dbConnection string, concurrentWorkflows int, submitQueueSize int) *Kernel {
    db := NewMapleDb(dbName, dbConnection, log)
    wd := NewWorkflowDispatcher(concurrentWorkflows, submitQueueSize, log, db)
    SignalHandler(wd)
    return &Kernel{wd, log, db}
}

// RunWorkflow submits a workflow and blocks until it finishes.
func (kernel *Kernel) RunWorkflow(wdl, inputs, options string, id uuid.UUID) *WorkflowContext {
    ctx, err := kernel.wd.SubmitWorkflow(wdl, inputs, options, id)
    if err != nil {
        return nil
    }
    return <-ctx.done
}

// TODO: the remaining Kernel API methods from the design notes are stubs.
func (kernel *Kernel) SubmitWorkflow(wdl, inputs, options string, id uuid.UUID) *WorkflowContext {
    return nil
}

func (kernel *Kernel) AbortWorkflow(uuid uuid.UUID) error {
    return nil
}

func (kernel *Kernel) ListWorkflows() []uuid.UUID {
    return nil
}

func (kernel *Kernel) Shutdown() {
}

func main() {
    var (
        app          = kingpin.New("myapp", "A workflow engine")
        queueSize    = app.Flag("queue-size", "Submission queue size").Default("1000").Int()
        concurrentWf = app.Flag("concurrent-workflows", "Number of workflows").Default("1000").Int()
        logPath      = app.Flag("log", "Path to write logs").Default("maple.log").String()
        restart      = app.Command("restart", "Restart workflows")
        run          = app.Command("run", "Run workflows")
        runGraph     = run.Arg("wdl", "Graph file").Required().String()
        runN         = run.Arg("count", "Number of instances").Required().Int()
        server       = app.Command("server", "Start HTTP server")
    )
    args, err := app.Parse(os.Args[1:])
    log := NewLogger().ToFile(*logPath).ToWriter(os.Stdout)
    engine := NewKernel(log, "sqlite3", "DB", *concurrentWf, *queueSize)
    switch kingpin.MustParse(args, err) {
    case restart.FullCommand():
        // Resubmit any workflow that didn't reach a terminal state.
        restartableWorkflows, _ := engine.db.GetWorkflowsByStatus(log, "Aborted", "NotStarted", "Started")
        var restartWg sync.WaitGroup
        for _, restartableWfContext := range restartableWorkflows {
            fmt.Printf("restarting %s\n", restartableWfContext.uuid)
            restartWg.Add(1)
            go func(ctx *WorkflowContext) {
                engine.wd.SubmitExistingWorkflow(ctx)
                <-ctx.done
                restartWg.Done()
            }(restartableWfContext)
        }
        restartWg.Wait()
    case run.FullCommand():
        var wg sync.WaitGroup
        for i := 0; i < *runN; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                contents, err := ioutil.ReadFile(*runGraph)
                if err != nil {
                    // TODO: don't panic
                    panic(err)
                }
                id := uuid.NewV4()
                ctx := engine.RunWorkflow(string(contents), "inputs", "options", id)
                if ctx != nil {
                    engine.log.Info("Workflow Complete: %s (status %s)", id, ctx.status)
                } else {
                    engine.log.Info("Workflow Incomplete")
                }
            }()
        }
        wg.Wait()
    case server.FullCommand():
        log.Info("Listening on :8000 ...")
        http.HandleFunc("/submit", SubmitHttpEndpoint(engine.wd))
        http.ListenAndServe(":8000", nil)
    }
    engine.wd.Abort()
}
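
Usage of this prototype, per the kingpin definitions in main() (the graph
file argument uses the [inputs]Name[outputs] format shown above):

    $ go build -o maple kernel.go db.go log.go parse.go   # as in the Makefile below
    $ ./maple run <graphfile> 2   # run 2 instances of the graph file
    $ ./maple restart             # resubmit Aborted/NotStarted/Started workflows
    $ ./maple server              # HTTP server on :8000 (only /submit is wired up)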
kernel_test.go
==============
package main

import (
    "bytes"
    "github.com/satori/go.uuid"
    "strings"
    "testing"
)

// The prototype's engine type is Kernel (kernel.go); build a small one
// backed by a throwaway SQLite database.
func testKernel(buf *bytes.Buffer) *Kernel {
    log := NewLogger().ToWriter(buf)
    return NewKernel(log, "sqlite3", "DBfortest", 1, 1)
}

func TestStartDispatcher(t *testing.T) {
    var buf bytes.Buffer
    engine := testKernel(&buf)
    if !engine.wd.IsAlive() {
        t.Fatalf("Expecting the dispatcher to be alive after starting it")
    }
}

func TestCreateWorkflow(t *testing.T) {
    var buf bytes.Buffer
    log := NewLogger().ToWriter(&buf)
    db := NewMapleDb("sqlite3", "DBfortest", log)
    ctx, err := db.NewWorkflow(uuid.NewV4(), &WorkflowSources{"wdl", "inputs", "options"}, log)
    if err != nil {
        t.Fatalf("NewWorkflow failed: %s", err)
    }
    if status, _ := db.GetWorkflowStatus(ctx, log); status != "NotStarted" {
        t.Fatalf("Expecting workflow in NotStarted state")
    }
    db.SetWorkflowStatus(ctx, "Started", log)
    if status, _ := db.GetWorkflowStatus(ctx, log); status != "Started" {
        t.Fatalf("Expecting workflow in Started state")
    }
}

func TestRunWorkflow(t *testing.T) {
    t.Parallel()
    var buf bytes.Buffer
    engine := testKernel(&buf)
    // "[1]A[2]" is a minimal one-node graph in the format parse.go expects.
    context := engine.RunWorkflow("[1]A[2]", "inputs", "options", uuid.NewV4())
    if context.status != "Done" {
        t.Fatalf("Expecting workflow status to be 'Done'")
    }
    if !strings.Contains(buf.String(), "workflow: completed") {
        t.Fatalf("Expecting a 'workflow: completed' message")
    }
}

// Same as above; with t.Parallel() the two runs exercise the
// single-worker dispatcher concurrently.
func TestRunWorkflow2(t *testing.T) {
    t.Parallel()
    var buf bytes.Buffer
    engine := testKernel(&buf)
    context := engine.RunWorkflow("[1]A[2]", "inputs", "options", uuid.NewV4())
    if context.status != "Done" {
        t.Fatalf("Expecting workflow status to be 'Done'")
    }
    if !strings.Contains(buf.String(), "workflow: completed") {
        t.Fatalf("Expecting a 'workflow: completed' message")
    }
}
log.go
======
package main

import (
    "fmt"
    "github.com/satori/go.uuid"
    "io"
    "io/ioutil"
    "os"
    "strings"
    "sync"
    "time"
)

type Logger struct {
    prefix       string
    writer       io.Writer
    mutex        *sync.Mutex
    wfLogsPath   string
    callLogsPath string
    logQueries   bool
}

func NewLogger() *Logger {
    var mutex sync.Mutex
    return &Logger{"", ioutil.Discard, &mutex, "", "", true}
}

// ToFile mutates the receiver (unlike ToWriter, which returns a copy).
func (log *Logger) ToFile(path string) *Logger {
    file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
    if err != nil {
        panic(fmt.Sprintf("Failed to open log file %s: %s", path, err))
    }
    log.writer = io.MultiWriter(log.writer, file)
    return log
}

func (log *Logger) ToWriter(writer io.Writer) *Logger {
    w := io.MultiWriter(log.writer, writer)
    return &Logger{log.prefix, w, log.mutex, log.wfLogsPath, log.callLogsPath, log.logQueries}
}

func (log *Logger) ForWorkflow(uuid uuid.UUID) *Logger {
    prefix := fmt.Sprintf("[%s] ", uuid.String()[:8])
    return &Logger{prefix, log.writer, log.mutex, log.wfLogsPath, log.callLogsPath, log.logQueries}
}

func (log *Logger) ForJob(uuid uuid.UUID, job *JobContext) *Logger {
    prefix := fmt.Sprintf("[%s:%s] ", uuid.String()[:8], job.node.name)
    return &Logger{prefix, log.writer, log.mutex, log.wfLogsPath, log.callLogsPath, log.logQueries}
}

func (log *Logger) Info(format string, args ...interface{}) {
    log.mutex.Lock()
    defer log.mutex.Unlock()
    now := time.Now().Format("2006-01-02 15:04:05.999")
    // Fprint, not Fprintf: the formatted message may itself contain '%'.
    fmt.Fprint(log.writer, now+" "+log.prefix+fmt.Sprintf(format, args...)+"\n")
}

func (log *Logger) DbQuery(query string, args ...string) {
    if log.logQueries {
        // Pass the joined args as a format argument so '%' in them is safe.
        log.Info("[QUERY] %s [ARGS] %s", query, strings.Join(args, ", "))
    }
}
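
Usage, as in kernel.go's main() above:

    log := NewLogger().ToFile("maple.log").ToWriter(os.Stdout)
    log.Info("Listening on :8000 ...")
    wfLog := log.ForWorkflow(id) // prefixes lines with the first 8 chars of the UUID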
Makefile
========
clean:
	-rm DB kernel maple.log

compile:
	go build kernel.go db.go log.go parse.go
Links
=====
https://cloud.google.com/genomics/reference/rest/v1alpha2/pipelines?authuser=1
https://cloud.google.com/genomics/reference/rest/v1alpha2/operations
parse.go
========
package main

import (
    "fmt"
    "io"
    "io/ioutil"
    "regexp"
    "strings"
)

type Graph struct {
    nodes []*Node
}

func (g *Graph) Add(node *Node) {
    g.nodes = append(g.nodes, node)
}

// Node is one line of the graph file: [inputs]name[outputs]
type Node struct {
    in   []string
    name string
    out  []string
}

func (n *Node) String() string {
    return fmt.Sprintf("[Node name=%s in=%s out=%s]", n.name, n.in, n.out)
}

func (g *Graph) Find(name string) *Node {
    for _, node := range g.nodes {
        if node != nil && node.name == name {
            return node
        }
    }
    return nil
}

// Root returns the nodes with no upstream dependencies.
func (g *Graph) Root() []*Node {
    root := make([]*Node, 0)
    for _, node := range g.nodes {
        if len(g.Upstream(node)) == 0 {
            root = append(root, node)
        }
    }
    return root
}

func (g *Graph) Upstream(n *Node) []*Node {
    upstream := make([]*Node, 0)
    for _, input := range n.in {
        for _, node := range g.nodes {
            if node != nil && node.name == input {
                upstream = append(upstream, node)
            }
        }
    }
    return upstream
}

func (g *Graph) Downstream(n *Node) []*Node {
    downstream := make([]*Node, 0)
    for _, node := range g.nodes {
        for _, node2 := range g.Upstream(node) {
            if node2 == n {
                downstream = append(downstream, node)
            }
        }
    }
    return downstream
}

// lineRe matches one graph-file line: [in1,in2]Name[out1,out2].
// Compiled once at package scope rather than on every loop iteration.
var lineRe = regexp.MustCompile(`\[([a-zA-Z0-9,]*)\]([a-zA-Z0-9]+)\[([a-zA-Z0-9,]*)\]`)

func LoadGraph(reader io.Reader) *Graph {
    bytes, _ := ioutil.ReadAll(reader)
    lines := strings.Split(string(bytes), "\n")
    graph := Graph{make([]*Node, 0)}
    for _, line := range lines {
        if len(line) == 0 {
            continue
        }
        parsed := lineRe.FindStringSubmatch(line)
        in := strings.Split(parsed[1], ",")
        name := parsed[2]
        out := strings.Split(parsed[3], ",")
        node := Node{in, name, out}
        graph.Add(&node)
    }
    return &graph
}

// main2 is a demo of the parser (named main2 so it doesn't collide with
// the main() in kernel.go, which is built alongside this file).
func main2() {
    reader := strings.NewReader(`
[1]A[2]
[1]B[3,4]
[1]C[2]
[1]D[1]
[D,1]E[]
[D]F[0]
[F]G[]`)
    g := LoadGraph(reader)
    fmt.Println(g.Find("A"))
}