-
-
Save taras/17ff09d96115d78a53b2734d73a3bb86 to your computer and use it in GitHub Desktop.
What is the relationship between tasks and execution trees?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { execute, call } from 'effection'; | |
// One of the things that struck me while I was implementing the
// `enqueue` function in the task constraining functions
// (https://gist.github.com/cowboyd/19122da0c59c674cdea86cf6d70e9c75)
// was the eventual form it took.
// | |
// Like the other task constructors, it returned a function. Let's | |
// call it the "performance" operation. Unlike the others, the performance | |
// operation seemed really unique and cool. It doesn't really have any
// logic in and of itself, but only resumes execution of a loop that forks
// a sub-process. Here it is again. | |
/**
 * Build a "performance" function for `proc` in which every performance is
 * chained behind the previous one.
 *
 * A single long-lived loop is started with `execute`. Each time the returned
 * function is called, the loop is resumed with that call's arguments and
 * forks a child that first yields to the previously forked child (`prior`)
 * and only then yields `call(proc, ...args)`.
 * NOTE(review): this presumably serialises performances in call order, but
 * that depends on how effection treats `yield prior` — confirm.
 *
 * @param {Function} proc - operation handed to `call` with the performance's arguments.
 * @returns {Function} performance function; takes any arguments and returns nothing.
 */
export function enqueue(proc) {
  const loop = execute(function*() {
    // Seed the chain with an empty operation so the very first child has
    // something to yield to.
    let tail = function*() {};
    while (true) {
      // Suspend here until the performance function resumes the loop; the
      // resumed value is that performance's argument list.
      const args = yield;
      // implementation
      // The new child becomes the tail that the next performance waits on.
      tail = this.fork(function*(prior) {
        yield prior;
        yield call(proc, ...args);
      }, [tail]);
    }
  });
  // performance function
  return function(...args) {
    loop.resume(args);
  };
}
// The unique shape of this performance function seemed so generic | |
// that it made me wonder if I could re-write the other, more trivial | |
// task modifiers using this structure. It turns out that you
// can, if you model each one as an infinite loop that forks off a
// subprocess with each performance.
// | |
// This is what it looked like before: | |
/**
 * Throttle written in the "before" style: the logic lives directly inside
 * the performance function.
 *
 * A counter of in-flight performances is kept in the closure. A call made
 * while the counter is below `maxConcurrency` starts a new execution of
 * `proc`; any other call is ignored.
 *
 * @param {Function} proc - operation handed to `call` with the performance's arguments.
 * @param {number} maxConcurrency - upper bound on simultaneously running performances.
 * @returns {Function} performance function; takes any arguments and returns nothing.
 */
export function throttleBefore(proc, maxConcurrency) {
  let concurrency = 0;
  return function(...args) {
    if (concurrency < maxConcurrency) {
      concurrency++;
      execute(function*() {
        try {
          yield call(proc, ...args);
        } finally {
          // Release the slot whether the generator completes, throws, or is
          // closed early.
          concurrency--;
        }
      });
    }
  };
}
// And this is what it looks like after. | |
/**
 * Throttle written in the "after" style: one long-lived loop owns the state
 * and forks a child for each performance.
 *
 * Each forked child checks the shared counter; if it is below
 * `maxConcurrency` the child bumps it, yields `call(proc, ...args)` and
 * decrements it again in `finally`. Otherwise the child does nothing.
 *
 * @param {Function} proc - operation handed to `call` with the performance's arguments.
 * @param {number} maxConcurrency - upper bound on simultaneously running performances.
 * @returns {Function} performance function; returns whatever `loop.resume` returns.
 */
export function throttle(proc, maxConcurrency) {
  const loop = execute(function*() {
    // local state
    let concurrency = 0;
    while (true) {
      // props for this invocation: the loop sleeps here until resumed with
      // the performance's argument list.
      const args = yield;
      this.fork(function*() {
        if (concurrency < maxConcurrency) {
          try {
            concurrency++;
            yield call(proc, ...args);
          } finally {
            concurrency--;
          }
        }
      });
    }
  });
  // performance function
  return (...args) => loop.resume(args);
}
// By moving all of the logic out of the performance function, we make
// the function as a whole more complex, but the
// trade-off is that we can get our performance function for free, and
// it's clear where to slot in the logic: it's just the generator that
// gets forked.
// Here is the restart modelled the same way: | |
/**
 * Restart written in the "before" style: each performance halts whatever the
 * previous performance started and then starts `proc` afresh.
 *
 * @param {Function} proc - operation passed to `execute` along with the performance's arguments.
 * @returns {Function} performance function; takes any arguments and returns nothing.
 */
export function restartBefore(proc) {
  // state
  // Placeholder with a no-op `halt` so the first performance needs no
  // special case.
  let current = { halt() {} };
  // performance function
  return function(...args) {
    // logic
    current.halt();
    current = execute(proc, ...args);
  };
}
//after | |
/**
 * Restart written in the "after" style: one long-lived loop owns `current`
 * and forks a child for each performance.
 *
 * The forked child halts the previously started execution and forks `proc`
 * with the performance's argument list, storing the result as the new
 * `current`.
 * NOTE(review): the child is a plain function rather than a generator, and
 * `proc` is forked from inside that child — confirm this is the intended
 * parent for the new execution.
 *
 * @param {Function} proc - operation forked with the performance's argument list.
 * @returns {Function} performance function; returns whatever `loop.resume` returns.
 */
export function restart(proc) {
  const loop = execute(function*() {
    // state
    // Placeholder with a no-op `halt` so the first performance needs no
    // special case.
    let current = { halt() {} };
    while (true) {
      // Sleep until resumed with the next performance's argument list.
      const args = yield;
      this.fork(function() {
        // logic
        current.halt();
        current = this.fork(proc, args);
      });
    }
  });
  // performance function
  return (...args) => loop.resume(args);
}
// Again, the bits of the task fit into the familiar slots. | |
// | |
// For completeness the drop task constructor: | |
//before | |
/**
 * Drop written in the "before" style: a performance is ignored while the
 * execution started by an earlier performance still reports `isPending`.
 *
 * @param {Function} proc - operation passed to `execute` along with the performance's arguments.
 * @returns {Function} performance function; takes any arguments and returns nothing.
 */
export function dropBefore(proc) {
  // state
  // Placeholder that is "not pending" so the first performance always runs.
  let current = { isPending: false };
  // performance
  return function(...args) {
    if (!current.isPending) {
      current = execute(proc, ...args);
    }
  };
}
//after | |
/**
 * Drop written in the "after" style: one long-lived loop owns `current` and
 * forks a child for each performance.
 *
 * The forked child forks `proc` with the performance's argument list only
 * when the previously started execution no longer reports `isPending`;
 * otherwise the performance is ignored.
 *
 * @param {Function} proc - operation forked with the performance's argument list.
 * @returns {Function} performance function; returns whatever `loop.resume` returns.
 */
export function drop(proc) {
  const loop = execute(function*() {
    // Placeholder that is "not pending" so the first performance always runs.
    let current = { isPending: false };
    while (true) {
      // Sleep until resumed with the next performance's argument list.
      const args = yield;
      this.fork(function() {
        if (!current.isPending) {
          current = this.fork(proc, args);
        }
      });
    }
  });
  return (...args) => loop.resume(args);
}
// By now, we can see enough of a pattern to pose the question: what | |
// if a Task as we know it, is really just an execution tree "lifted" | |
// into the context of an infinite loop? We have enough information to | |
// take a stab at a `Task` class. | |
/**
 * A task modelled as an execution tree "lifted" into an infinite loop: the
 * loop sleeps until `perform` resumes it, then forks `proc` with the
 * supplied arguments.
 */
export class Task {
  /**
   * Start the task's loop immediately.
   *
   * @param {Function} proc - operation forked once per performance.
   */
  constructor(proc) {
    this.proc = proc;
    // Inside the generator `this` is whatever `execute` binds it to, not
    // the Task instance, so `proc` is taken from the closure.
    this.loop = execute(function*() {
      while (true) {
        const args = yield;
        this.fork(proc, args);
      }
    });
  }

  /**
   * Trigger one performance by resuming the loop with the argument list.
   *
   * @param {...*} args - arguments forwarded to `proc`.
   */
  perform(...args) {
    this.loop.resume(args);
  }

  /**
   * Derive a task that runs at most `maxConcurrency` performances of this
   * task's `proc` at once; performances beyond the limit do nothing.
   *
   * @param {number} maxConcurrency - upper bound on simultaneous performances.
   * @returns {Task} the derived task (this task is left untouched).
   */
  throttle(maxConcurrency) {
    // Counter shared by every performance of the derived task.
    let concurrency = 0;
    return this.flatMap(proc => function*(...args) {
      if (concurrency < maxConcurrency) {
        try {
          concurrency++;
          yield call(proc, ...args);
        } finally {
          concurrency--;
        }
      }
    });
  }

  /**
   * Derive a new task from this one.
   *
   * @param {Function} sequence - receives this task's `proc`; returns either
   *   a `Task`, which is passed through as is, or a new operation, which is
   *   wrapped in a fresh `Task`.
   * @returns {Task} the derived task.
   */
  flatMap(sequence) {
    const next = sequence(this.proc);
    if (next instanceof Task) {
      return next;
    } else {
      return new Task(next);
    }
  }
}
// We have a quasi-composable primitive in that now the main execution
// is always at the top level, instead of hidden somewhere inside of a
// performance function, and we can always derive new tasks using the
// flatMap function for sequencing tasks.
// | |
// Another alternative to flatmapping is to use the JavaScript | |
// pipeline operation when it becomes available. This pipeline is | |
// meaningless, but demonstrates what it might look like: | |
// let perform = Task(countdownFrom |> throttle(2) |> enqueue |> drop);
// perform(10); | |
// I don't know if this is on the right track, but it seems like a
// step forward. I'm wondering how each task might be composed into a
// greater tree.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment