Last active
October 24, 2024 04:11
-
-
Save pirate/7193ab54557b051aa1e3a83191b69793 to your computer and use it in GitHub Desktop.
Example of how to pluginize a complex app using a hooks system
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example of a pluginized architecture breaking up a large app | |
with complex behavior (ArchiveBox), into a series of steps | |
that plugins can hook into. | |
(read from the bottom to top to get a quick overview) | |
""" | |
import re | |
import json | |
from datetime import datetime | |
from copy import deepcopy | |
from functools import wraps, partial | |
from typing import Union | |
from prettyprinter import cpprint | |
class Mapping(dict):
    """A dict whose keys can also be read, written and deleted as attributes."""

    def __getattr__(self, name):
        # Only consulted when normal attribute lookup fails. A missing key
        # yields None (dict.get semantics) rather than AttributeError.
        return dict.get(self, name)

    def __setattr__(self, name, value):
        # Attribute assignment stores an item; nothing lands in __dict__.
        dict.__setitem__(self, name, value)

    def __delattr__(self, name):
        # Deleting a missing key raises KeyError, like `del d[name]`.
        dict.__delitem__(self, name)
class classproperty(object):
    """Read-only descriptor whose value is computed from the owning class."""

    def __init__(self, f):
        # f receives the class (never the instance) on every access.
        self.f = f

    def __get__(self, obj, owner):
        compute = self.f
        return compute(owner)
def deep_convert_dict(obj):
    """Return obj with plain dicts (at any nesting depth of dict values) turned into Mapping.

    A plain dict is shallow-copied into a new Mapping; an object that is
    already a Mapping is updated in place. Values that are not dict-like
    (anything without ``.items()``) are returned untouched, so dicts stored
    inside lists are not converted.
    """
    needs_wrapping = isinstance(obj, dict) and not isinstance(obj, Mapping)
    result = Mapping(obj) if needs_wrapping else obj
    try:
        for name, child in result.items():
            result[name] = deep_convert_dict(child)
    except AttributeError:
        # leaf values (str, int, list, None, ...) have no .items()
        pass
    return result
def _unique_in_order(items):
    """Drop duplicates from items, keeping first occurrences in order.

    Falls back to an equality-based scan when the items are unhashable
    (e.g. a list of dicts), where dict.fromkeys() would raise TypeError.
    """
    try:
        return list(dict.fromkeys(items))
    except TypeError:
        unique = []
        for item in items:
            if item not in unique:
                unique.append(item)
        return unique


def recursive_merge(dict1, dict2, concat_lists=True, lists_unique=True, allow_mismatched_types=False):
    """Lazily deep-merge two dictionaries, yielding ``(key, merged_value)`` pairs.

    Keys are visited in first-seen order (dict1's keys, then dict2's new keys).
    For a key present in both inputs:
      * two dicts are merged recursively into a Mapping,
      * two lists are concatenated (de-duplicated when ``lists_unique``) if
        ``concat_lists`` is set, otherwise dict2's list wins,
      * anything else is overridden by dict2's value.
    Plain dict values are converted to Mapping before they are yielded.

    Raises:
        AssertionError: if a key collides with a builtin dict attribute name.
        TypeError: if both inputs hold a value for a key, dict1's value is not
            None, its type is not a subclass of dict2's value type, and
            ``allow_mismatched_types`` is False.
    """
    reserved_names = set(dir({}))

    for key in dict.fromkeys(tuple(dict1.keys()) + tuple(dict2.keys())):
        assert key not in reserved_names, f'Key {key} is not allowed in state is it conflicts with a builtin dir method!'

        # convert native dict objects into Mapping whenever encountered
        val1 = deep_convert_dict(dict1.get(key))
        val2 = deep_convert_dict(dict2.get(key))

        if key not in dict2:
            yield (key, val1)
            continue
        if key not in dict1:
            yield (key, val2)
            continue

        # Compare types *after* conversion so that a plain dict on one side
        # and a Mapping on the other are not reported as a mismatch.
        val1_type, val2_type = type(val1), type(val2)
        if val1 is not None and not issubclass(val1_type, val2_type):
            if not allow_mismatched_types:
                raise TypeError(
                    f'Value in dict1[{key}] has different type than value in dict2! '
                    f'{val1} ({val1_type.__name__}) != {val2} ({val2_type.__name__})'
                )
            # dict2 wins outright; skip the merge logic so the key is
            # yielded exactly once.
            yield (key, val2)
            continue

        if isinstance(val1, dict) and isinstance(val2, dict):
            yield (key, Mapping(recursive_merge(
                val1,
                val2,
                concat_lists=concat_lists,
                lists_unique=lists_unique,
                allow_mismatched_types=allow_mismatched_types,
            )))
        elif isinstance(val1, list) and isinstance(val2, list):
            if not concat_lists:
                yield (key, val2)
            elif lists_unique:
                yield (key, _unique_in_order(val1 + val2))
            else:
                yield (key, val1 + val2)
        else:
            # If one of the values is not a dict/list pair it cannot be merged
            # further: the value from the second dict overrides the first.
            yield (key, val2)
def deepmerge(dict1, dict2):
    """Deep-merge dict2 into dict1 and return the result as a new Mapping.

    Lists found under the same key are concatenated and de-duplicated;
    mismatched value types raise TypeError (see recursive_merge).
    """
    merged_pairs = recursive_merge(
        dict1,
        dict2,
        concat_lists=True,
        lists_unique=True,
        allow_mismatched_types=False,
    )
    return Mapping(merged_pairs)
def update_state(func):
    """Decorator: treat func's returned dict as a patch and deep-merge it into state.

    The wrapped callable is invoked as ``func(cls, state, *args, **kwargs)``
    and the wrapper returns ``deepmerge(state, patch)``.
    """
    @wraps(func)
    def wrapper(cls, state, *args, **kwargs):
        # sanity checks on how the hook is being invoked
        assert issubclass(cls, ArchiveBoxPlugin)
        assert isinstance(state, (dict, Mapping))
        return deepmerge(state, func(cls, state, *args, **kwargs))

    return wrapper
def flatten_hooks(hooks, plugins):
    """Fold the hooks exposed by every plugin into ``hooks`` and return the result.

    Each plugin must provide ``get_hooks()``. When both the accumulated hooks
    and the plugin's hooks are dicts they are deep-merged; otherwise they are
    treated as sequences of hook names and concatenated with duplicates
    removed (first occurrence wins). The list case is what
    ``ArchiveBoxPlugin.get_hooks()`` returns; passing those lists straight to
    deepmerge() would fail because it expects dicts.

    With no plugins, ``hooks`` is returned unchanged.
    """
    for plugin in plugins:
        plugin_hooks = plugin.get_hooks()
        if isinstance(hooks, dict) and isinstance(plugin_hooks, dict):
            hooks = deepmerge(hooks, plugin_hooks)
        else:
            hooks = list(dict.fromkeys([*hooks, *plugin_hooks]))
    return hooks
def get_plugin(plugin_name):
    """Look up a plugin class by its name in this module's global namespace.

    Raises KeyError when no global with that name exists.
    """
    module_namespace = globals()
    return module_namespace[plugin_name]
def load_plugins(state, plugins: Union[list, dict]):
    """Register each named plugin in state and return the updated state.

    For every plugin name, the plugin's hook names are merged into
    ``state['hooks']`` and an initial ``plugins[<name>].state`` entry marked
    as imported and enabled is added.
    """
    for plugin_name in plugins:
        plugin_cls = get_plugin(plugin_name)
        initial_plugin_state = {
            'loaded': 'imported',
            'enabled': True,
        }
        registration = {
            'hooks': plugin_cls.get_hooks(),
            'plugins': {
                plugin_name: {
                    'state': initial_plugin_state,
                },
            },
        }
        state = deepmerge(state, registration)
    return state
def run_hooks(state: Mapping, hook_name: str=None):
    """Run hook_name on every plugin registered in state.plugins, in order.

    Each plugin receives the state produced by the previous one. After all
    plugins have run, ``meta.active_hook`` is set to hook_name and the
    resulting state is returned. The hook name is echoed to stdout.
    """
    print()
    print('>', hook_name)

    for plugin_name in state.plugins:
        plugin_cls = get_plugin(plugin_name)
        state = plugin_cls.run_hook(state, hook_name)

    meta_patch = {'meta': {'active_hook': hook_name}}
    return deepmerge(state, meta_patch)
class ArchiveBoxPlugin:
    """Base class for plugins; subclasses add ``hook_*`` classmethods.

    Plugins are never instantiated: everything is a classmethod operating on
    a shared ``state`` Mapping. Hooks decorated with ``update_state`` return a
    patch dict that is deep-merged into that state.
    """

    ENABLED = True              # default enabled flag recorded for the plugin
    REQUIRED = False            # recorded in the plugin metadata as 'required'
    DEFAULT_CONFIG = {}         # lowercase keys get a PLUGIN_<NAME>_ prefix
    REQUIRED_CONFIG = []        # config keys that must be truthy to stay enabled
    REQUIRED_PLUGINS = []       # plugin names that must be enabled to stay enabled
    REQUIRED_HOOKS = []         # hook names that must exist in state.hooks
    ADVERTISED_HOOKS = []       # recorded in the plugin metadata as 'hooks_advertised'

    @classproperty
    def NAME(cls):
        """Human-readable name derived from the CamelCase class name."""
        words = re.findall(r'[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))', cls.__name__)
        return ' '.join(words)

    @classproperty
    def CONFIG_NAME(cls):
        """Upper-case class name without "Plugin", used in config key prefixes."""
        return cls.__name__.replace("Plugin", "").upper()

    @classmethod
    def get_hooks(cls, include_super=True):
        """Return the names of ``hook_*`` attributes on this plugin.

        With ``include_super`` (default) inherited hooks are included;
        otherwise only hooks defined directly in this class body are returned.
        """
        if include_super:
            return [
                method for method in dir(cls)
                if method.startswith('hook_')
            ]

        # find all methods implemented in the class itself (not superclass)
        return [
            name
            for name, method in vars(cls).items()
            if (callable(method)
                or isinstance(method, classmethod)
                or isinstance(method, staticmethod))
            and name.startswith('hook_')
        ]

    @classmethod
    def get_plugin_state(cls, state, plugin_name=None):
        """Return ``state.plugins[<plugin>].state`` (this plugin by default).

        Raises KeyError when the plugin is not present in ``state.plugins``.
        """
        return state.plugins[plugin_name or cls.__name__].state

    @classmethod
    def set_plugin_state(cls, plugin_state, plugin_name=None):
        """Build a state patch that updates a plugin's ``state`` entry."""
        return Mapping({
            'plugins': {
                (plugin_name or cls.__name__): {
                    'state': plugin_state,
                }
            }
        })

    @classmethod
    def get_plugin_config(cls, state: dict, plugin_name: str=None, flatten: bool=True):
        """Return the ``PLUGIN_<NAME>_*`` entries of ``state.config.state``.

        With ``flatten`` the prefix is stripped and keys are lower-cased;
        otherwise the full keys are kept.
        """
        if plugin_name:
            CONFIG_NAME = state.plugins[plugin_name].config_name
        else:
            CONFIG_NAME = cls.CONFIG_NAME

        CONFIG_PREFIX = f'PLUGIN_{CONFIG_NAME}_'
        prefix_length = len(CONFIG_PREFIX)

        return Mapping({
            # slice off only the leading prefix; str.replace would also
            # remove any later occurrence of the prefix text inside the key
            (key[prefix_length:].lower() if flatten else key): val
            for key, val in state.config.state.items()
            if key.startswith(CONFIG_PREFIX)
        })

    @classmethod
    def set_plugin_config(cls, flat_lowercase_config: dict):
        """Build a state patch that sets ``PLUGIN_<NAME>_<KEY>`` config values.

        The values are placed under ``config.state``, which is where
        get_plugin_config() and hook_config_start() read and write them.
        """
        return Mapping({
            'config': {
                'state': {
                    f'PLUGIN_{cls.CONFIG_NAME}_{key.upper()}': val
                    for key, val in flat_lowercase_config.items()
                },
            }
        })

    @classmethod
    def set_general_config(cls, flat_config: dict):
        """Build a state patch that sets un-prefixed values under ``config.state``."""
        return Mapping({
            'config': {
                'state': flat_config,
            }
        })

    @classmethod
    def is_enabled(cls, state):
        """Return this plugin's ``enabled`` flag, or True if it has no state entry yet."""
        try:
            return cls.get_plugin_state(state).enabled
        except KeyError:
            # before initial plugin state is set up
            return True

    @classmethod
    def run_hook(cls, state, hook_name, *args, **kwargs):
        """Run ``hook_name`` on this plugin, if enabled, and return the new state.

        A disabled plugin returns state untouched. A plugin that does not
        define the hook is skipped, but ``meta.active_hook`` is still updated.
        Exceptions raised inside the hook (including AttributeError)
        propagate to the caller.
        """
        # the dynamic calling + result merging logic here is critical and fragile,
        # be careful editing this function, make sure to test output before and after
        if not cls.is_enabled(state):
            return state

        # Look the hook up without a try/except so that an AttributeError
        # raised *inside* a hook body is not mistaken for "hook not defined".
        hook_function = getattr(cls, hook_name, None)
        if hook_function is not None:
            print(' >', cls.__name__, hook_name)
            state = hook_function(state, *args, **kwargs)

        return deepmerge(state, {
            'meta': {
                'active_hook': f'{cls.__name__}.{hook_name}',
            },
        })

    @classmethod
    @update_state
    def hook_setup_plugins(cls, state):
        """load the plugin metadata into the global state object"""
        return {
            'plugins': {
                cls.__name__: {
                    'key': cls.__name__,
                    'name': cls.NAME,
                    'config_name': cls.CONFIG_NAME,
                    'path': f'./plugins/{cls.__name__}',
                    'required': cls.REQUIRED,
                    'state': {
                        'loaded': 'initialized',
                        'enabled': cls.ENABLED,
                    },
                    'hooks': cls.get_hooks(),
                    'hooks_defined': cls.get_hooks(include_super=False),
                    'hooks_advertised': cls.ADVERTISED_HOOKS,
                    'required_plugins': cls.REQUIRED_PLUGINS,
                    'required_config': cls.REQUIRED_CONFIG,
                    'default_config': cls.DEFAULT_CONFIG,
                },
            },
        }

    @classmethod
    @update_state
    def hook_config_start(cls, state):
        """load the default config into the global state object"""
        # lowercase keys are plugin-scoped and get the PLUGIN_<NAME>_ prefix;
        # any other key is stored as-is
        prefixed_configs = {
            (f'PLUGIN_{cls.CONFIG_NAME}_{key.upper()}' if key.islower() else key): value
            for key, value in cls.DEFAULT_CONFIG.items()
        }
        return {
            'config': {
                'state': {
                    **prefixed_configs,
                    f'PLUGIN_{cls.CONFIG_NAME}_ENABLED': cls.ENABLED,
                },
            },
        }

    @classmethod
    @update_state
    def hook_config_plugins(cls, state):
        """load the plugin enabled status based on its dependencies' presence/status

        The plugin stays enabled only if it is currently enabled, every plugin
        in REQUIRED_PLUGINS is present and enabled, every key in
        REQUIRED_CONFIG has a truthy value, and its own ``enabled`` config
        flag is truthy. A missing dependency or config key counts as not
        satisfied. Raises AssertionError if the plugin would be enabled but a
        hook in REQUIRED_HOOKS is not listed in ``state.hooks``.
        """
        def dependency_enabled(plugin_name):
            # a dependency that was never loaded counts as "not enabled"
            try:
                return bool(cls.get_plugin_state(state, plugin_name).enabled)
            except (KeyError, AttributeError):
                return False

        def full_config_key(config_key):
            if config_key.isupper():
                return config_key
            return f'PLUGIN_{cls.CONFIG_NAME}_{config_key.upper()}'

        self_enabled = bool(cls.get_plugin_state(state).enabled)
        has_required_plugins = all(
            dependency_enabled(plugin_name)
            for plugin_name in cls.REQUIRED_PLUGINS
        )
        has_required_configs = all(
            state.config.state.get(full_config_key(config_key))
            for config_key in cls.REQUIRED_CONFIG
        ) and bool(cls.get_plugin_config(state).enabled)

        # always a real bool, so it merges cleanly over the existing flag
        should_enable = self_enabled and has_required_plugins and has_required_configs

        if should_enable:
            missing_hooks = [
                hook_name
                for hook_name in cls.REQUIRED_HOOKS
                if hook_name not in state.hooks
            ]
            assert not missing_hooks, f'{cls.__name__} requires hooks that no plugin provides: {missing_hooks}'

        return cls.set_plugin_state({
            'loaded': 'configured',
            'enabled': should_enable,
        })
class ArchiveBoxCorePlugin(ArchiveBoxPlugin):
    """Core plugin: adds no hooks of its own, only flags itself as required."""

    REQUIRED = True
class ConfigFilePlugin(ArchiveBoxPlugin):
    """Placeholder plugin for loading/saving config via a config file.

    Both hooks only set a marker flag under ``config.state``; the real file
    I/O is not implemented in this example.
    """

    NAME = 'Config via Config File'
    REQUIRED = True

    @classmethod
    @update_state
    def hook_config(cls, state):
        """Mark the file config as loaded."""
        # config = load_config_file(state.config.config_file, state.config.schema)
        loaded_marker = {'loaded_file_config': True}
        return {'config': {'state': loaded_marker}}

    @classmethod
    @update_state
    def hook_config_save(cls, state):
        """Mark the config file as saved."""
        # config = write_config_file(state.config.config_file, state.config.schema, state.config)
        saved_marker = {'saved_config_file': True}
        return {'config': {'state': saved_marker}}
class ConfigEnvPlugin(ArchiveBoxPlugin):
    """Placeholder plugin for loading config from environment variables.

    The hook sets a marker flag plus one hard-coded test value instead of
    reading the real environment.
    """

    NAME = 'Config via Environment Variables'
    REQUIRED = True

    @classmethod
    @update_state
    def hook_config(cls, state):
        """Mark the env config as loaded and inject a test override."""
        # config = load_config_env(state.config.schema, state.config.state)
        # config = {'loaded_env_config': True}
        env_values = {
            'loaded_env_config': True,
            'PLUGIN_DARKTHEME_ENABLED': True,   # just for testing
        }
        return {'config': {'state': env_values}}
class DarkThemePlugin(ArchiveBoxPlugin):
    """Plugin that only contributes a template directory to the config."""

    # TEMPLATE_DIRS is not lowercase, so hook_config_start stores it
    # without the PLUGIN_<NAME>_ prefix.
    DEFAULT_CONFIG = {
        'TEMPLATE_DIRS': ['./plugins/DarkThemePlugin'],
    }
class PocketHTMLParserPlugin(ArchiveBoxPlugin):
    """Example parser plugin: turns ``state.parse.urls_text`` into links."""

    DEFAULT_CONFIG = {
        'cleanup': True,
    }

    @classmethod
    @update_state
    def hook_parse(cls, state):
        """Return a patch adding one link per line of ``state.parse.urls_text``.

        When the plugin's ``cleanup`` config value is truthy each link is
        prefixed with ``'clean+'``. The links are returned as a new list;
        the existing ``state.parse.links`` list is not modified in place (the
        merge performed by update_state appends the new entries).
        """
        # `cleanup` is declared in DEFAULT_CONFIG, so it lives in the plugin's
        # config, not in its runtime state.
        cleanup = cls.get_plugin_config(state).cleanup

        parsed_links = [
            ('clean+' + line) if cleanup else line
            for line in state.parse.urls_text.split('\n')
        ]
        return {
            'parse': {
                'links': parsed_links,
            }
        }
class ChromiumDependencyPlugin(ArchiveBoxPlugin):
    """Example dependency plugin that records a browser binary and version."""

    REQUIRED_CONFIG = ['binary']
    DEFAULT_CONFIG = {'binary': 'chromium'}

    @classmethod
    @update_state
    def hook_config(cls, state):
        """Record the configured binary, a hard-coded version, and whether both are set."""
        binary = cls.get_plugin_config(state).binary
        version = '92.234.234'
        # version = False
        is_usable = bool(binary and version)

        patch = {}
        patch.update(cls.set_plugin_state({
            'binary': binary,
            'version': version,
            'enabled': is_usable,
        }))
        patch.update(cls.set_plugin_config({
            'enabled': is_usable,
        }))
        return patch
class PlaywrightExtractorPlugin(ArchiveBoxPlugin):
    """Example extractor that breaks snapshotting into finer-grained sub-hooks.

    ``hook_snapshot_start`` and ``hook_snapshot`` do not return patches;
    they call run_hooks() for each advertised sub-hook so that other plugins
    can participate in browser/context/page setup and page loading.
    """

    # The context setup reads the binary recorded by ChromiumDependencyPlugin,
    # so that plugin (not this one) is the real dependency.
    REQUIRED_PLUGINS = ['ChromiumDependencyPlugin']
    DEFAULT_CONFIG = {
        'enabled': True,
        'user_agent': 'Chrome',
        'geolocation': '234,234',
    }
    # A list, matching the base class: keeps a stable order in the state dump.
    ADVERTISED_HOOKS = [
        'hook_snapshot_start_setup_browser',
        'hook_snapshot_start_setup_context',
        'hook_snapshot_start_setup_page',
        'hook_snapshot_load_start',
        'hook_snapshot_load',
        'hook_snapshot_load_end',
    ]

    @classmethod
    def hook_snapshot_start(cls, state):
        """Run the browser, context and page setup sub-hooks on all plugins."""
        state = run_hooks(state, 'hook_snapshot_start_setup_browser')
        state = run_hooks(state, 'hook_snapshot_start_setup_context')
        state = run_hooks(state, 'hook_snapshot_start_setup_page')
        return state

    @classmethod
    def hook_snapshot(cls, state):
        """Run the load_start, load and load_end sub-hooks on all plugins."""
        state = run_hooks(state, 'hook_snapshot_load_start')
        state = run_hooks(state, 'hook_snapshot_load')
        state = run_hooks(state, 'hook_snapshot_load_end')
        return state

    @classmethod
    @update_state
    def hook_snapshot_start_setup_browser(cls, state):
        """Record the (placeholder) browser handle in this plugin's state."""
        return cls.set_plugin_state({
            'browser': 'sync_playwright.chromium',
        })

    @classmethod
    @update_state
    def hook_snapshot_start_setup_context(cls, state):
        """Record the context arguments, using the Chromium plugin's binary."""
        chromium_state = cls.get_plugin_state(state, 'ChromiumDependencyPlugin')
        return cls.set_plugin_state({
            'context_args': {
                'executable_path': '/bin/' + chromium_state.binary,
                'timeout': 60_000,
            },
        })

    @classmethod
    @update_state
    def hook_snapshot_start_setup_page(cls, state):
        """Record the (placeholder) context and page in this plugin's state."""
        # A real implementation would launch these from the stored browser:
        # browser = cls.get_plugin_state(state).browser
        context = 'browser.launch_persistent_context(**runner.context_args)'
        page = 'context.new_page()'
        return cls.set_plugin_state({
            'context': context,
            'page': page,
        })

    @classmethod
    def hook_snapshot_load_start(cls, state):
        """No-op placeholder; returns state unchanged."""
        return state

    @classmethod
    def hook_snapshot_load(cls, state):
        """No-op placeholder; returns state unchanged."""
        # A real implementation would navigate the page here:
        # cls.get_plugin_state(state).page.goto(state.snapshot.url)
        return state

    @classmethod
    def hook_snapshot_load_end(cls, state):
        """No-op placeholder; returns state unchanged."""
        return state
class TitleRecorderPlugin(ArchiveBoxPlugin):
    """Example extractor that appends a page title to the snapshot results."""

    ENABLED = True
    REQUIRED_PLUGINS = ['PlaywrightExtractorPlugin']
    REQUIRED_HOOKS = ['hook_snapshot_load_end']

    @classmethod
    @update_state
    def hook_snapshot_load_end(cls, state):
        """Add a ``title:<title>`` entry to ``snapshot.results`` (title is hard-coded)."""
        # title = cls.get_plugin_state(state, 'PlaywrightExtractorPlugin').page.title()
        title = 'Example title'
        title_result = 'title:' + title
        return {'snapshot': {'results': [title_result]}}
class VideoRecorderPlugin(ArchiveBoxPlugin):
    """Example extractor that hooks into the Playwright plugin to record video.

    It adds recording options to PlaywrightExtractorPlugin's context
    arguments and appends the recording path to the snapshot results.
    """

    ENABLED = True
    REQUIRED_PLUGINS = ['PlaywrightExtractorPlugin']
    REQUIRED_HOOKS = [
        'hook_snapshot_start_setup_context',
        'hook_snapshot_load_end',
    ]
    # None of these keys is lowercase, so they are stored without the
    # PLUGIN_<NAME>_ prefix.
    DEFAULT_CONFIG = {
        'TEMPLATE_DIRS': ['./plugins/VideoRecorderPlugin'],
        'INDEX_COLUMNS': [
            {
                'key': 'VideoRecorder',
                'name': 'Video Recording',
                'icon': 'video.png',
            },
        ],
        'SNAPSHOT_PREVIEWS': [
            {
                'key': 'VideoRecorder',
                'name': 'Video Recording',
                'icon': 'video.png',
                'src': 'VideoRecorder/recording.mp4',
            },
        ],
    }

    @classmethod
    @update_state
    def hook_snapshot_start_setup_context(cls, state: dict):
        """Patch the Playwright plugin's context_args with recording options."""
        recording_args = {
            'record_video_dir': './video',
            'slow_mo': 0,
        }
        return cls.set_plugin_state({'context_args': recording_args}, 'PlaywrightExtractorPlugin')

    @classmethod
    @update_state
    def hook_snapshot_load_end(cls, state: dict):
        """Add the recording path to ``snapshot.results``."""
        # Path(state.archive.PlaywrightExtractorPlugin.page.video.path()).move_to('./VideoRecorder/recording.mp4')
        recording_path = './VideoRecorder/recording.mp4'
        return {'snapshot': {'results': [recording_path]}}
# Names of the plugin classes to load, in the order their hooks are run.
ALL_PLUGINS = [
    # core + configuration sources
    'ArchiveBoxCorePlugin',
    'ConfigFilePlugin',
    'ConfigEnvPlugin',
    # theme / parser / dependency plugins
    'DarkThemePlugin',
    'PocketHTMLParserPlugin',
    'ChromiumDependencyPlugin',
    # extractors
    'PlaywrightExtractorPlugin',
    'TitleRecorderPlugin',
    'VideoRecorderPlugin',
]
# Starting state handed to run(); every hook patch is deep-merged on top of it.
INITIAL_STATE = {
    # hook names known up front; plugin hook names are merged in at load time
    'hooks': [
        # configuration
        'hook_config_start',
        'hook_config',
        'hook_config_plugins',
        'hook_config_end',
        'hook_config_save',
        # parsing
        'hook_parse_start',
        'hook_parse',
        'hook_parse_end',
        # archiving
        'hook_archive_start',
        'hook_archive',
        'hook_archive_end',
        # snapshotting
        'hook_pre_snapshot',
        'hook_snapshot',
        'hook_post_snapshot',
        # results
        'hook_pre_save_result',
        'hook_save_result',
        'hook_post_save_result',
        'hook_render_icon',
    ],
    # per-plugin metadata and runtime state, keyed by plugin class name
    'plugins': {},
    'config': {
        'config_file_path': None,
        'schema': {},
        'state': {},
    },
    'meta': {
        'start_time': None,
        'end_time': None,
        'active_hook': None,
        'version': None,
    },
    # sample input for the parse hooks
    'parse': {
        'urls_text': 'https://example.com\nhttps://example.com/other',
        'links': [],
    },
    # sample input for the snapshot hooks
    'snapshot': {
        'url': 'https://example.com',
        'results': [],
    },
}
# Version string that run() records under state.meta.version.
VERSION = '0.7.1'
def run(state=INITIAL_STATE, plugins=ALL_PLUGINS):
    """Load the plugins, run every hook stage in order, and return the final state.

    The state is stamped with ``meta.start_time``/``meta.version`` before the
    hooks run and with ``meta.end_time`` afterwards. The final state is
    pretty-printed to stdout both on success and on failure; on failure the
    original exception is re-raised after the dump.
    """
    def finish(final_state):
        # stamp the end time and dump the state for inspection
        final_state = deepmerge(final_state, {
            'meta': {
                'end_time': datetime.now().isoformat(),
            },
        })
        print()
        print('STATE DUMP:')
        # print(json.dumps(final_state, indent=4))
        cpprint(final_state)
        print()
        return final_state

    hook_stages = (
        'hook_setup_plugins',

        'hook_config_start',
        'hook_config',
        'hook_config_plugins',
        'hook_config_end',
        # load_django(config=state.config.state) would go here

        'hook_parse_start',
        'hook_parse',
        'hook_parse_end',

        'hook_archive_start',
        'hook_archive',

        'hook_snapshot_start',
        'hook_snapshot',
        'hook_snapshot_end',

        'hook_archive_end',
    )

    try:
        state = deepmerge(state, {
            'meta': {
                'start_time': datetime.now().isoformat(),
                'version': VERSION,
            },
        })
        state = load_plugins(state, plugins)
        for hook_name in hook_stages:
            state = run_hooks(state, hook_name)
    except Exception:
        # dump whatever state was reached, then let the error propagate
        finish(state)
        raise

    return finish(state)
# Script entry point: run the example pipeline with the default state and plugins.
if __name__ == '__main__':
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment