Pause/resume/load/save a long data analysis process
import datetime
import pickle
import time
import tkinter as tk
from dataclasses import dataclass
from enum import Enum
from functools import partial
from multiprocessing import Process
from multiprocessing.connection import Connection, Pipe
from pathlib import Path
from typing import List, BinaryIO

# ******************************************************************************
# *************************  ADAPT THIS TO YOUR NEEDS  *************************
# ******************************************************************************

# Frozen data.
# ============
# This is your initial data, the raw data that comes from the outside world and
# does not need to change.
@dataclass
class FrozenData:
    table: List[List[float]]
    scale_factor: float
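

# A hedged sketch, not part of the recipe itself: roughly how FrozenData could
# be built from a real file instead of the mock below. The CSV layout (one
# float per cell, no header) and the function name are assumptions made for
# illustration; nothing else in this file calls it.
def load_frozen_data_csv(path: Path, scale_factor: float) -> FrozenData:
    import csv
    with path.open(newline='') as f:
        table = [[float(cell) for cell in row] for row in csv.reader(f)]
    return FrozenData(table=table, scale_factor=scale_factor)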


# Working data.
# =============
# This is the temporary data that you need during your processing. It is the
# data that you save on the hard drive when you need to unplug your laptop.
# Do not copy the frozen data here.
@dataclass
class WorkingData:
    row_id: int  # The next task to accomplish.
    so_far: List  # Cumulated result of the analysis.


# So here I use a "List" for ``so_far``. Because that's the free monoid, the
# most general one there is, the monoid that does nothing but putting stuff
# together. It's initialized in ``init_working_data`` as ``[]``, and updated
# after each completed task by ``+`` in ``update_working_data``.
#
# You don't have to use [] and +, you can use any other monoid. Just do
# some fold/reduce on the fly if you want, that will save memory. Or just
# keep the last value. Replacing a value by another is also a monoid.
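

# A sketch of the "fold/reduce on the fly" alternative mentioned above, under
# the assumption that each row result is the (timestamp, n, lo, hi) tuple
# built in ``update_working_data`` below. The name ``merge_so_far`` is
# illustrative only; the rest of this file keeps using the list version.
def merge_so_far(so_far, row_result):
    _, _, lo, hi = row_result
    if so_far is None:  # ``None`` plays the role of the identity element.
        return 1, lo, hi
    count, cur_lo, cur_hi = so_far
    return count + 1, min(cur_lo, lo), max(cur_hi, hi)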


# Final data.
# ===========
# This is what you want. Maybe it's equal to the ``so_far`` of the working
# data, but if you need to do more processing in the end (like taking medians
# or other stuff that isn't easy to do on the fly) then FinalData will collect
# the real good stuff. (A sketch of such a final step follows
# ``create_final_data`` below.)
@dataclass
class FinalData:
    n: int  # We analyzed all that just to get ONE int out? Brilliant.


def create_mock_frozen_data() -> FrozenData:
    import random
    rnd = random.Random(x=0)  # x is the seed, I want reproducibility
    # 10000 rows of 10 columns of crap.
    table = [
        [rnd.random() for c in range(10)]
        for r in range(10000)
    ]
    return FrozenData(table=table, scale_factor=10.0)


# Create and update data.
# =======================
def init_working_data(_: FrozenData) -> WorkingData:
    # Maybe creating the working data requires info from the frozen data.
    # I can't think of why, but you've got the option.
    return WorkingData(
        row_id=0,  # Next task to perform.
        so_far=[]  # Monoid identity.
    )


def update_working_data(fd: FrozenData, wd: WorkingData) -> WorkingData:
    # Find out what our next task is.
    row_id = wd.row_id
    # Get a handle on the frozen data that's relevant for this task.
    row = fd.table[row_id]
    # Do the actual analysis.
    row_result = (
        # Throw in a time stamp, just in case for performance analysis later.
        # Also would reveal if we've paused the processing.
        # Yes it's not pure, sue me.
        datetime.datetime.utcnow(),
        # Here comes the serious analysis :D.
        len(row),
        min(row) * fd.scale_factor,
        max(row) * fd.scale_factor,
    )
    # Pretend it's hard so you have time to test the command line.
    time.sleep(0.1)  # TODO PLEASE REMOVE THAT
    # Merge the result for this task with the previous results.
    return WorkingData(
        row_id=row_id + 1,
        so_far=wd.so_far + [row_result]  # Monoid operation.
    )


def create_final_data(wd: WorkingData) -> FinalData:
    # I hope you want to do more with your data than just counting
    # how many rows it had.
    return FinalData(len(wd.so_far))
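

# The FinalData comment above mentions medians; here is a hedged sketch of a
# heavier final step, assuming each ``so_far`` entry is the
# (timestamp, n, lo, hi) tuple built in ``update_working_data``. The name and
# the printout are illustrative; ``_main`` below still uses the plain
# ``create_final_data``.
def create_final_data_with_median(wd: WorkingData) -> FinalData:
    import statistics
    scaled_minima = [lo for (_, _, lo, _) in wd.so_far]
    # A real version would extend FinalData with a field for this value.
    print('Median of the scaled minima:', statistics.median(scaled_minima))
    return FinalData(n=len(wd.so_far))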


# ******************************************************************************
# *****************************  COMMON MACHINERY  *****************************
# ******************************************************************************

# Save and load Working and Final data.
# =====================================
def data_save(f: BinaryIO, obj):
    pickle.dump(obj, f)


def data_load(f: BinaryIO):
    return pickle.load(f)
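

# Optional hardening, sketched here rather than prescribed: write the pickle
# to a side file and rename it over the target, so a crash in the middle of a
# save cannot clobber an existing snapshot. ``os.replace`` is atomic when both
# paths live on the same filesystem. Nothing below calls this helper.
def data_save_atomic(path: Path, obj):
    import os
    partial_path = path.with_name(path.name + '.partial')
    with partial_path.open('wb') as f:
        pickle.dump(obj, f)
    os.replace(partial_path, path)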


class SlaveState(Enum):
    WORKING = 'working'
    RESTING = 'resting'


class Commands(Enum):
    WORK = 'work'
    REST = 'rest'
    LOAD = 'load'
    SAVE = 'save'
    ABORT = 'abort'
    REPORT = 'report'


# Slave process: the process that does the actual work.
# =====================================================
SLEEP_PERIOD = 0.1  # second.


def slave_loop(final_save_path: Path,
               tmp_save_path: Path,
               commands: Connection,
               progress: Connection,
               fd: FrozenData,
               wd: WorkingData):
    state = SlaveState.RESTING
    while True:
        # Listen to commands and obey them.
        while commands.poll():
            cmd = commands.recv()
            if cmd == Commands.WORK:
                state = SlaveState.WORKING
            elif cmd == Commands.REST:
                state = SlaveState.RESTING
            elif cmd == Commands.SAVE:
                with tmp_save_path.open('wb') as f:
                    data_save(f, wd)
            elif cmd == Commands.LOAD:
                with tmp_save_path.open('rb') as f:
                    wd = data_load(f)
            elif cmd == Commands.REPORT:
                progress.send((state, wd.row_id, len(fd.table)))
            elif cmd == Commands.ABORT:
                return
            else:
                raise ValueError(cmd)
        # Perform the action for the current state.
        if state == SlaveState.WORKING:
            if wd.row_id >= len(fd.table):
                final_data = create_final_data(wd)  # <=== Actual work.
                with final_save_path.open('wb') as f:
                    data_save(f, final_data)
                progress.send((state, wd.row_id, len(fd.table)))
                progress.send('done')
                return
            wd = update_working_data(fd, wd)  # <=== Actual work.
        elif state == SlaveState.RESTING:
            # Don't hog CPU.
            time.sleep(SLEEP_PERIOD)
        else:
            raise ValueError(state)


# Master process: the process that controls the slave process.
# =============================================================
parse_command = {
    # There probably is a better way to transpose Commands.
    'work': Commands.WORK,
    'rest': Commands.REST,
    'save': Commands.SAVE,
    'load': Commands.LOAD,
    'abort': Commands.ABORT,
}
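
# One answer to the "better way to transpose Commands" comment above, shown
# under a separate name so it does not silently replace the hand-written
# table. REPORT is left out on purpose: the master sends it by itself.
parse_command_alt = {c.value: c for c in Commands if c is not Commands.REPORT}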


def master_loop_command_line(commands: Connection, progress: Connection):
    # So we're going to control the slave with the most terrible command line
    # in the universe.
    # What happens if the slave has finished its job, or crashed, and isn't
    # replying anymore? I guess it's a surprise. (One way to cope with that is
    # sketched right after this function.)
    # Print the user manual.
    print('Valid commands: {}.'.format(', '.join(parse_command)))
    while True:
        commands.send(Commands.REPORT)
        progress_reported = progress.recv()  # Wait for at least one report.
        while progress.poll():
            # If more reports, flush them all and keep only the latest.
            progress_reported = progress.recv()
        if progress_reported == 'done':
            print('Slave is done')
            return
        print('Progress: ', progress_reported)
        s = input('> ')
        cmd = parse_command.get(s, None)
        if cmd:
            commands.send(cmd)
            if cmd == Commands.ABORT:
                return
        elif s:  # If user just presses Enter, don't scream.
            print('Invalid command.')
            print('Valid commands: {}.'.format(', '.join(parse_command)))
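

# A hedged sketch of one way to handle the "isn't replying anymore?" question
# raised in ``master_loop_command_line``: wait for the report with a timeout
# instead of blocking forever. The helper name and the 5-second timeout are
# arbitrary choices; the loops in this file don't use it.
def wait_for_report(progress: Connection, timeout: float = 5.0):
    if not progress.poll(timeout):
        return None  # The slave did not answer in time; assume it's gone.
    report = progress.recv()
    while progress.poll():
        report = progress.recv()  # Keep only the latest queued report.
    return report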


def master_loop_gui(commands: Connection, progress: Connection):
    log_period = 1000  # millisecond

    def send_command(s):
        line = f'{datetime.datetime.now()}: command: {s}\n'
        log(line)
        cmd = parse_command[s]
        commands.send(cmd)
        if cmd == Commands.ABORT:
            root.destroy()

    root = tk.Tk()
    parent = root
    # Command buttons.
    frame = tk.Frame(parent)
    frame.pack(fill=tk.X, expand=False)
    parent = frame
    row = 0
    for column, text in enumerate(parse_command):
        b = tk.Button(parent, text=text, command=partial(send_command, text))
        b.grid(row=row, column=column, sticky=tk.NSEW)
        parent.columnconfigure(index=column, weight=1, uniform='whatev')
    row += 1
    parent = root
    # Status log.
    frame = tk.Frame(parent)
    frame.pack(fill=tk.BOTH, expand=True)
    parent = frame
    scroll_bar = tk.Scrollbar(parent)
    scroll_bar.pack(side=tk.RIGHT, fill=tk.Y, expand=False)
    status_log = tk.Text(parent, yscrollcommand=scroll_bar.set)
    status_log.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scroll_bar.config(command=status_log.yview)

    def log(txt):
        status_log.insert(tk.END, txt)
        if scroll_bar.get()[1] == 1.0:
            # Auto scroll down if the scrollbar is already at the end.
            # Otherwise don't: let the user look at the log.
            status_log.see(tk.END)

    def show_status():
        commands.send(Commands.REPORT)
        progress_reported = progress.recv()  # Wait for at least one report.
        while progress.poll():
            # If more reports, flush them all and keep only the latest.
            progress_reported = progress.recv()
        line = f'{datetime.datetime.now()}: status: {progress_reported}\n'
        log(line)
        if progress_reported == 'done':
            # The slave has exited and won't answer further REPORTs,
            # so stop polling or the next recv() would block forever.
            return
        root.after(log_period, show_status)

    def on_closing():
        send_command('abort')

    show_status()
    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()


# Start the whole thing.
# ======================
def _main():
    # Configure.
    final_save_path = Path('final.data')
    tmp_save_path = Path('tmp.data')
    # Load the frozen data.
    fd = create_mock_frozen_data()
    wd = init_working_data(fd)
    master_commands, slave_commands = Pipe()
    master_progress, slave_progress = Pipe()
    slave = Process(
        target=slave_loop,
        args=(
            final_save_path,
            tmp_save_path,
            slave_commands,
            slave_progress,
            fd,
            wd)
    )
    slave.start()
    # master_loop_command_line(master_commands, master_progress)
    master_loop_gui(master_commands, master_progress)
    slave.join()
    try:
        with final_save_path.open('rb') as f:
            final = data_load(f)
        print('Final result:')
        print(final)
    except FileNotFoundError:
        print('Work is not finished')


if __name__ == '__main__':
    _main()
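
# Rough usage, as I read the code above:
#   run this file with Python 3.7+ (dataclasses, f-strings and tkinter needed);
#   a small Tk window opens with one button per command:
#     work  -> start or resume the analysis
#     rest  -> pause it (the slave idles)
#     save  -> snapshot WorkingData to tmp.data
#     load  -> restore the last snapshot from tmp.data
#     abort -> stop the slave and close the window
#   final.data is only written once every row has been processed.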