Last active
October 12, 2019 15:36
-
-
Save kierdavis/ddd6da1c76119841264f20cdaee1958d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7
import re
import subprocess
import sys
import termios
import tty
from pathlib import Path, PurePath
def getch():
    """Read a single keypress from the terminal without waiting for Enter.

    The terminal behind stdin is put into raw mode for the duration of the
    read; its previous settings are restored afterwards, even on error.

    Returns:
        The one-character string that was typed.

    Raises:
        KeyboardInterrupt: If Ctrl-C was pressed.
        EOFError: If Ctrl-D was pressed or stdin returned no data.
    """
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setraw(fd)
        ch = sys.stdin.read(1)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    # Raw mode turns off the terminal's signal handling, so Ctrl-C and Ctrl-D
    # show up as ordinary characters. Translate them back into the usual
    # exceptions so callers that loop until they get a valid key can still be
    # interrupted.
    if ch == "\x03":
        raise KeyboardInterrupt
    if ch in ("", "\x04"):
        raise EOFError
    return ch
def input_single_char(prompt):
    """Show `prompt` and return the next single key the user presses.

    Args:
        prompt: Text written to stdout before waiting for the keypress.

    Returns:
        The character returned by `getch()`.
    """
    sys.stdout.write(prompt)
    # Flush explicitly: the prompt has no trailing newline, so it could
    # otherwise still be buffered while we wait for the key.
    sys.stdout.flush()
    response = getch()
    sys.stdout.write("\n")
    return response
class InvalidAssetCodeException(Exception):
    """Raised when an asset code cannot be resolved to an inventory entry."""

    def __init__(self, val):
        """Remember the offending asset code in `self.val`."""
        super().__init__(val)
        self.val = val

    def __str__(self):
        return f"Invalid asset code {self.val!r}"
class AllOf:
    """Rule that is satisfied only when every one of its child rules is."""

    def __init__(self, *child_rules):
        """Store the child rules (strings or nested rules) as a tuple."""
        self.child_rules = child_rules

    def __repr__(self):
        return f"AllOf({', '.join(map(repr, self.child_rules))})"
class AnyOf:
    """Rule that is satisfied when at least one of its child rules is."""

    def __init__(self, desc, *child_rules):
        """Store a human-readable description and the alternative rules.

        Args:
            desc: Name shown to the user when none of the alternatives match.
            *child_rules: The acceptable alternatives (strings or nested rules).
        """
        self.desc = desc
        self.child_rules = child_rules

    def __repr__(self):
        args = (self.desc,) + tuple(self.child_rules)
        return f"AnyOf({', '.join(map(repr, args))})"
def times(quantity, rule):
    """Build a rule requiring `rule` to be satisfied `quantity` times.

    Args:
        quantity: How many repetitions are required.
        rule: The rule to repeat (the same object is reused for each repeat).

    Returns:
        An `AllOf` whose children are `quantity` references to `rule`.
    """
    return AllOf(*(rule for _ in range(quantity)))
# Rule tree describing the asset types that must be scanned into a box.
# Plain strings are asset type names; AnyOf groups list interchangeable
# alternatives under a description that is shown when none of them is present.
INVENTORIED_EXPECTED_CONTENTS = AllOf(
    times(2, "motor-board-mcv4b"),
    "ruggeduino",
    "screw-shield-pwr-side",
    "screw-shield-usb-side",
    "power-board-pbv4b",
    "servo-board-sbv4b",
    "usb-wifi-tl-wn823n",
    times(2, "usb-hub-startech"),
    "odroid-u3",
    AnyOf(
        "battery charger",
        "battery-charger-imax-b6",
        "battery-charger-e4",
    ),
    AnyOf(
        "battery charger supply",
        "battery-charger-imax-b6-supply",
        "battery-charger-e4-supply",
    ),
    AnyOf(
        "webcam",
        "webcam-logitech-c270",
        "webcam-logitech-c500",
    ),
    AnyOf(
        "lipo bag",
        "lipo-bag-overlander-22x30",
        "lipo-bag-hpi-18x22",
    ),
    # Two batteries are required; each may be either battery type.
    times(
        2,
        AnyOf(
            "battery",
            "battery-lipo-11.1v2.2",
            "battery-lipo-11.1v2.2-turnigy",
        ),
    ),
)
# Items that have no asset code; the operator is asked to confirm each one by
# hand. Each entry is spliced into the question "Is/are there ... in the box?".
NON_INVENTORIED_EXPECTED_CONTENTS = [
    "a USB stick",
    # NOTE(review): "macro" may be a typo for another cable type - confirm.
    "3 macro USB cables",
    "5 micro USB cables",
    "a bag of camcons",
    "a screwdriver",
    "an odroid power cable",
    "a bundle of black power wire",
    "a bundle of red power wire",
    "a bundle of green power wire",
    "a bundle of blue signal wire",
    "a bundle of yellow signal wire",
    "a kit disclaimer form",
]
class Asset:
    """An inventory item, represented by its directory in the inventory tree.

    The last path component is expected to have the form ``<type>-sr<code>``
    (e.g. ``box-18l-rub-srABC123``); `type` and `code` are parsed from it.
    """

    # Splits a directory name into its type and code parts.
    _NAME_PATTERN = re.compile(r"^(.+?)-sr([a-zA-Z0-9]+)$")

    @staticmethod
    def _find(raw_asset_code) -> Path:
        """Resolve an asset code to a path by running ``sr inv-findpart``.

        Raises:
            InvalidAssetCodeException: If the command exits with an error.
        """
        try:
            result = subprocess.run(
                ["sr", "inv-findpart", raw_asset_code],
                check=True,
                stdout=subprocess.PIPE,
            )
        except subprocess.CalledProcessError as e:
            raise InvalidAssetCodeException(raw_asset_code) from e
        return Path(result.stdout.strip().decode("utf-8"))

    @classmethod
    def find(cls, raw_asset_code) -> "Asset":
        """Look up `raw_asset_code` in the inventory and wrap the result."""
        return cls(cls._find(raw_asset_code))

    def __init__(self, path):
        """Wrap the inventory directory at `path` (a `pathlib.Path`)."""
        self.path = path

    def __repr__(self):
        return f"{type(self).__name__}({self.path!r})"

    @property
    def name(self):
        """The final component of the asset's path."""
        return self.path.name

    @property
    def _name_parts(self):
        """Return ``(type, code)`` parsed from `name`.

        Raises:
            RuntimeError: If the name does not look like ``<type>-sr<code>``.
        """
        match = self._NAME_PATTERN.match(self.name)
        if match is None:
            raise RuntimeError("sr inv-findpart returned a weird result")
        return match.groups()

    @property
    def type(self):
        """The part of the name before ``-sr<code>``."""
        return self._name_parts[0]

    @property
    def code(self):
        """The asset code: the part of the name after ``-sr``."""
        return self._name_parts[1]

    @property
    def contents(self):
        """Yield an `Asset` for each directory entry other than ``info``."""
        for sub_path in self.path.iterdir():
            if sub_path.name != "info":
                yield Asset(sub_path)

    @property
    def is_empty(self):
        """True when `contents` yields nothing."""
        # Only the first entry is pulled from the generator.
        return next(self.contents, None) is None

    def move_to(self, dest):
        """``git mv`` this asset's directory to `dest` and refresh `path`."""
        subprocess.run(["git", "mv", str(self.path), str(dest)], check=True)
        # The old path no longer exists, so look the new one up by code.
        self.path = self._find(self.code)

    def move_into(self, other_asset):
        """Move this asset inside `other_asset`'s directory."""
        self.move_to(other_asset.path)

    def __str__(self):
        return self.name
def find_asset_with_type(desired_type, assets):
    """Return the first asset whose `type` equals `desired_type`.

    Args:
        desired_type: Asset type string to look for.
        assets: Iterable of objects exposing a `type` attribute.

    Returns:
        The first matching asset, or None when there is no match.
    """
    return next((asset for asset in assets if asset.type == desired_type), None)
def verify_recursive(rule, assets):
    """Match `rule` against `assets`, using up one asset per matched type.

    Args:
        rule: An asset type string, an `AllOf` or an `AnyOf`.
        assets: List of assets still available for matching. It is never
            modified; a copy is made before an asset is removed.

    Returns:
        A ``(failed_rules, remaining_assets)`` tuple. `failed_rules` holds the
        strings / `AnyOf` rules that could not be satisfied, and
        `remaining_assets` the assets not used up by successful matches.

    Raises:
        ValueError: If `rule` is none of the supported kinds.
    """
    if isinstance(rule, str):
        asset_type = rule
        asset = find_asset_with_type(asset_type, assets)
        if asset is None:
            return [rule], assets
        # Copy before removing so the caller's list is left untouched.
        assets = list(assets)
        assets.remove(asset)
        return [], assets
    elif isinstance(rule, AllOf):
        failed_rules = []
        # Thread the shrinking asset list through every child in order and
        # collect every failure rather than stopping at the first one.
        for sub_rule in rule.child_rules:
            sub_failed_rules, assets = verify_recursive(sub_rule, assets)
            failed_rules += sub_failed_rules
        return failed_rules, assets
    elif isinstance(rule, AnyOf):
        # The first alternative that matches completely wins; a failed
        # attempt's partial result is discarded.
        for sub_rule in rule.child_rules:
            failed_rules, new_assets = verify_recursive(sub_rule, assets)
            if not failed_rules:
                return [], new_assets
        # No alternative matched: report the AnyOf itself as the failure.
        return [rule], assets
    else:
        raise ValueError(rule)
def rule_desc(rule):
    """Return the text used to name a failed rule to the user.

    Args:
        rule: A failed rule: an asset type string or an `AnyOf`.

    Returns:
        The string itself, or the `AnyOf`'s `desc`.

    Raises:
        RuntimeError: If given an `AllOf`.
        ValueError: If `rule` is of any other kind.
    """
    if isinstance(rule, str):
        return rule
    if isinstance(rule, AllOf):
        # verify_recursive won't ever return this
        raise RuntimeError("unreachable")
    if isinstance(rule, AnyOf):
        return rule.desc
    raise ValueError(rule)
def verify_inventoried_contents(assets):
    """Check scanned `assets` against INVENTORIED_EXPECTED_CONTENTS.

    Prints one line listing everything that is missing and one line for each
    asset that is not expected.

    Args:
        assets: List of assets scanned so far.

    Returns:
        True when nothing is missing and nothing unexpected is present.
    """
    failed_rules, remaining_assets = verify_recursive(INVENTORIED_EXPECTED_CONTENTS, assets)
    if failed_rules:
        print(f"Missing: {', '.join(rule_desc(rule) for rule in failed_rules)}")
    # Whatever the rules did not use up is surplus to the expected contents.
    for asset in remaining_assets:
        print(f"{asset} is in this box, but we didn't expect it to be")
    return not failed_rules and not remaining_assets
def get_yes_no(prompt):
    """Keep asking `prompt` until the user presses 'y' or 'n'.

    Upper-case 'Y' / 'N' are accepted as well. Any other key asks again.

    Args:
        prompt: The question to display.

    Returns:
        True for yes, False for no.
    """
    while True:
        response = input_single_char(prompt).lower()
        if response == "y":
            return True
        if response == "n":
            return False
def verify_non_inventoried_contents():
    """Ask the operator to confirm every item that has no asset code.

    Each question is repeated until it is answered with yes, so this only
    returns once everything has been confirmed.

    Returns:
        Always True.
    """
    prompts = [f"Is/are there {desc} in the box? (y/n) " for desc in NON_INVENTORIED_EXPECTED_CONTENTS]
    prompts += [
        "Are the batteries inside the battery bag? (y/n) ",
        "Are the power board, both motor boards, servo board and ruggeduino inside a jiffy bag? (y/n) ",
    ]
    for prompt in prompts:
        while not get_yes_no(prompt):
            pass  # try again mate
    return True
def verify_contents(assets):
    """Run the inventoried check, then the manual check if that passed.

    Args:
        assets: List of assets scanned into the box.

    Returns:
        True only when both checks succeed. The manual questions are skipped
        when the inventoried check fails.
    """
    return verify_inventoried_contents(assets) and verify_non_inventoried_contents()
def input_box() -> Asset:
    """Prompt until the operator scans an empty ``box-18l-rub`` asset.

    Invalid codes, assets of another type and boxes that still have contents
    in the inventory are rejected with a message, and the prompt is repeated.

    Returns:
        The accepted box.
    """
    while True:
        try:
            box = Asset.find(input("Scan a RUB:\n> "))
        except InvalidAssetCodeException as e:
            print(e)
            continue
        if box.type != "box-18l-rub":
            print(f"That's a {box.type}, not a box-18l-rub.")
            continue
        if not box.is_empty:
            print("Please empty the RUB in the inventory first.")
            continue
        return box
def find_battery_bag(assets):
    """Return the first asset whose name starts with ``lipo-bag-``.

    Args:
        assets: Iterable of objects exposing a `name` attribute.

    Raises:
        RuntimeError: If there is no such asset.
    """
    for asset in assets:
        if asset.name.startswith("lipo-bag-"):
            return asset
    raise RuntimeError("no battery bag; verification should have caught this already")
def commit_box(box, contents):
    """Record the packed box in the inventory by moving its directories.

    The box is moved to the current directory. Every item whose name starts
    with ``battery-lipo-`` is moved into the battery bag, and every other
    item (the bag included) into the box.

    Args:
        box: The box asset being packed.
        contents: The assets scanned into the box.

    Raises:
        RuntimeError: If `contents` holds no battery bag.
    """
    box.move_to(".")
    battery_bag = find_battery_bag(contents)
    for item in contents:
        if item.name.startswith("battery-lipo-"):
            item.move_into(battery_bag)
        else:
            item.move_into(box)
def add_to_contents(contents, item_to_add):
    """Append `item_to_add` to `contents` unless its code is already there.

    Args:
        contents: List of assets in the box; modified in place.
        item_to_add: The asset that was just scanned.
    """
    if any(existing.code == item_to_add.code for existing in contents):
        print(f"{item_to_add} is already in the box!")
        return
    contents.append(item_to_add)
def remove_from_contents(contents, item_to_remove):
    """Remove the first asset with `item_to_remove`'s code from `contents`.

    Prints a message instead when no asset in `contents` has that code.

    Args:
        contents: List of assets in the box; modified in place.
        item_to_remove: An asset carrying the code to remove.
    """
    for item_in_contents in contents:
        if item_in_contents.code == item_to_remove.code:
            # Safe to mutate here: we return straight away, so the loop never
            # continues over the modified list.
            contents.remove(item_in_contents)
            return
    print(f"{item_to_remove} wasn't in the box!")
def ensure_batteries_are_physically_inside_bags():
    """Block until the operator confirms the batteries are in the battery bag."""
    while not get_yes_no("Are the batteries inside the battery bag? (y/n) "):
        pass  # keep asking until the answer is yes
def pack_box():
    """Interactively pack one box and record the result in the inventory.

    After a box has been scanned, each round reports what is still missing
    or unexpected and then reads one asset code:

    * a code prefixed with '-' removes that asset from the box;
    * the box's own code runs the final verification and, on success,
      commits the contents and returns;
    * another ``box-18l-rub`` is refused;
    * anything else is added to the box.
    """
    box = input_box()
    contents = []
    while True:
        # Re-run every round so the operator sees an up-to-date status.
        verify_result = verify_inventoried_contents(contents)
        response = input(f"Scan an asset to put it into {box}; or precede asset code with '-' to remove it from {box}; or scan {box} to verify and finish:\n> ")
        remove = response.startswith("-")
        if remove:
            response = response.lstrip("-")
        try:
            item = Asset.find(response)
        except InvalidAssetCodeException as e:
            print(e)
            continue
        if remove:
            remove_from_contents(contents, item)
        elif item.code == box.code:
            # Scanning the box itself means "I'm done": only bother with the
            # manual questions when the scanned contents are already right.
            verify_result = verify_result and verify_non_inventoried_contents()
            if verify_result:
                # NOTE(review): this repeats a question that
                # verify_non_inventoried_contents has already asked - confirm
                # whether the second confirmation is intentional.
                ensure_batteries_are_physically_inside_bags()
                print("Verification succeeded, updating the inventory...")
                commit_box(box, contents)
                print(f"Finished packing {box}.")
                print()
                print()
                return
            print("Verification failed, fix the problems and try again.")
        elif item.type == "box-18l-rub":
            print("You can't start a new box until you've finished this one.")
        else:
            add_to_contents(contents, item)
def pack_boxes_forever():
    """Pack one box after another; never returns normally."""
    while True:
        pack_box()
def main():
    """Script entry point: start the endless box-packing loop."""
    pack_boxes_forever()
# Only start the interactive loop when run as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment