An ETL service is made up of this stuff:
- IO - network calls
- transformations - computations
The IO stuff has two variants:
- extract - grabbing data from elsewhere
- load - saving data elsewhere
It's called ETL because of the order of these operations: extract, transform, load. It's a thing (https://en.wikipedia.org/wiki/Extract,_transform,_load). This terminology is annoying. I like load, compute, save. Let's do that.
An LCS service is made up of this stuff:
- load - grab data from elsewhere (IO bound)
- compute - produce new data from the data (CPU bound)
- save - save the new data elsewhere (IO bound)
This seems pretty simple.
Simple LCS logic is straightforward: there is IO bound work at the beginning (load), CPU bound work in the middle (compute), and IO bound work at the end (save). IO-CPU-IO. It's all one function that runs sequentially.
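In code, a 'job' like that might look something like the sketch below (the function names and the sleep-as-network stubs are made up for illustration, not taken from a real service):

```python
import time


def load(work):
    """Load: grab data from elsewhere (IO bound). Stubbed with a sleep."""
    time.sleep(0.2)            # stand-in for a network call
    return list(range(work))


def compute(data):
    """Compute: produce new data from the data (CPU bound)."""
    return [x * x for x in data]


def save(result):
    """Save: push the new data elsewhere (IO bound). Stubbed with a sleep."""
    time.sleep(0.1)            # stand-in for a network call


def run_job(work):
    """One sequential LCS 'job': IO, then CPU, then IO."""
    data = load(work)          # IO bound
    result = compute(data)     # CPU bound
    save(result)               # IO bound


run_job(1000)
```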
Doing this violates a pretty fundamental separation of concerns. The unit of work is a mix of IO and CPU. If the unit of work mixes IO bound and CPU bound work, then the unit is non-deterministic in these senses:
- you don't know how long it will take
- you don't know how hard it will work
AppEngine can scale off of CPU declaratively. This scaling is super simple, requires no code, and so is desirable. CPU scaling has thus become a requirement. This requirement means that our pipeline services must be capable of hitting a CPU 'target'. Example targets:
- 30% CPU over 5 minutes
- 100% CPU over 20 minutes
But you can't know how hard a simple LCS function will work. This is because of IO in the unit of work. The network isn't predictable. The network doesn't care about time complexities, and scoffs at your assumptions of repeatability. You silly sequentist. Here's a simple LCS 'job':
no_compute_workers = 4
batch_size = 8

op.  | load | compute | save
-----|------|---------|-----
time | 0.2s | 0.08s   | 0.1s

Based on this specification, we can figure out a suitable CPU target for scaling by tweaking batch size. That is, if everything remains exactly the same, then we are a-ok. But it doesn't. The load and save ops may take longer, in which case a job will suddenly spend more time waiting on IO than delighting the CPU. The idea of a 'job' being a sequential mix of IO and CPU doesn't work.
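To put numbers on that: here's the back-of-the-envelope arithmetic for how much of a job's wall time the CPU actually gets, first with the nominal timings from the spec, then when the network has a bad day (cpu_fraction is a throwaway helper; the degraded timings are made up):

```python
def cpu_fraction(load_s, compute_s, save_s):
    """Fraction of a sequential job's wall time spent in compute."""
    return compute_s / (load_s + compute_s + save_s)


# Nominal timings from the spec above.
print(cpu_fraction(0.2, 0.08, 0.1))   # ~0.21 -> roughly 21% CPU

# The network has a bad day: load and save take 3x longer.
print(cpu_fraction(0.6, 0.08, 0.3))   # ~0.08 -> roughly 8% CPU
```

Same code, same batch size, wildly different CPU utilization - whatever CPU target you derived from the nominal numbers is now wrong.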
It's just code.(tm)
Let's get all CSP up in here. Pretend the pipeline program is made up of many smaller programs. These programs all run in their own loops. Some of them are IO bound, others are CPU bound. They talk to one another via thread-safe and process-safe queues. You can think of these queues as channels (if you're into that). You can think of the smaller programs as goroutines (if you're into that). Exception-handling code elided:
```python
import asyncio


async def load_process(load_queue, compute_queue, load):
    """Hi, I'm a 'program' and I run forever"""
    while True:
        work = await load_queue.get()                    # suspends
        asyncio.create_task(load(work, compute_queue))   # fire and forget


def compute_process(compute_queue, save_queue, compute):
    """Hi, I'm a 'program' and I run forever"""
    while True:
        work = compute_queue.get()      # blocks
        with register_as_working():     # a multiprocessing increment/decrement
            new_data = compute(work)
            save_queue.put(new_data)


async def save_process(save_queue, done_queue, save):
    """Hi, I'm a 'program' and I run forever"""
    while True:
        work = await save_queue.get()                 # suspends
        asyncio.create_task(save(work, done_queue))   # fire and forget
```

Here are 'load', 'compute', and 'save':
```python
async def load(work, compute_queue):
    data = await network_calls(work)
    compute_queue.put(data)


def compute(data):
    "..."
    # return result


async def save(work, done_queue):
    result = await network_calls(work)
    done_queue.put(result)
```

There's another looping program that feeds this, and it's where we shove the complexity (ossify our complications). It's the best spot ever for bugs:
```python
async def manage_process(state):
    """Hi, I'm a 'program' and I run forever"""
    while True:
        # sufficient work is pending
        if state.queues['compute'].size() >= state.no_compute_processes:
            # decrease batch size, increase sleep time
            state.backoff()
            await state.sleep()
            continue
        # sufficient work is running
        if state.no_active_compute_processes == state.no_compute_processes:
            # decrease batch size, increase sleep time
            state.backoff()
            await state.sleep()
            continue
        # batch of tasks, min = state.no_compute_processes
        tasks = await state.get_tasks()
        if not tasks:
            # decrease batch size, increase sleep time
            state.backoff()
            await state.sleep()
            continue
        for task in tasks:
            state.queues['load'].put(task)  # fire and forget
        # increase batch size, reset sleep time
        state.backon()  # bacon?
        await state.sleep()
```

The compute processes are still bounded. We say up front how many we want. But the IO processes are effectively unbounded - they inflate/deflate as necessary. This allows them to adjust based on current network conditions. We grab as much work as we can to keep the CPU bound processes busy by ratcheting (batch size, sleep time) up and down.
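The ratchet itself can be dirt simple. Here's a minimal sketch of what backoff/backon might look like (this State class, its field names, and the specific bounds are assumptions for illustration - the real state object also owns the queues and the process counters):

```python
import asyncio


class State:
    """Hypothetical ratchet: batch size and sleep time move in opposite
    directions depending on how the last poll went."""

    def __init__(self, no_compute_processes, min_batch=1, max_batch=64,
                 min_sleep=0.1, max_sleep=30.0):
        self.no_compute_processes = no_compute_processes
        self.min_batch, self.max_batch = min_batch, max_batch
        self.min_sleep, self.max_sleep = min_sleep, max_sleep
        self.batch_size = min_batch
        self.sleep_time = min_sleep

    def backoff(self):
        """Nothing to do (or too much in flight): shrink batches, sleep longer."""
        self.batch_size = max(self.min_batch, self.batch_size // 2)
        self.sleep_time = min(self.max_sleep, self.sleep_time * 2)

    def backon(self):
        """We dispatched work: grow batches, reset the sleep."""
        self.batch_size = min(self.max_batch, self.batch_size * 2)
        self.sleep_time = self.min_sleep

    async def sleep(self):
        await asyncio.sleep(self.sleep_time)
```

Multiplicative in both directions keeps the manager responsive: a few empty polls and it backs way off; one good poll and it's back to feeding the compute processes as fast as the network will allow.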