Skip to content

Instantly share code, notes, and snippets.

@mrmamongo
Created November 18, 2023 17:41
Show Gist options
  • Save mrmamongo/31057726a5578e34ae7c1b3d00c83114 to your computer and use it in GitHub Desktop.
Save mrmamongo/31057726a5578e34ae7c1b3d00c83114 to your computer and use it in GitHub Desktop.
import json
import logging
import sys
from dataclasses import dataclass
import adaptix
@dataclass
class ExecutionData:
    """Payload exchanged with the script: a label and a numeric value."""

    # Label of the record; main() overwrites it with "PROCESSED".
    name: str
    # Numeric payload; main() adds a fixed offset to it.
    value: float
def main():
    """Read one JSON object from stdin, mark it processed, print it as JSON.

    The input line is loaded into an ``ExecutionData``; its ``name`` is set
    to ``"PROCESSED"`` and ``value`` is increased by 100500 before the record
    is dumped back to JSON on stdout.
    """
    converter = adaptix.Retort()
    raw_payload = json.loads(input())
    record = converter.load(raw_payload, ExecutionData)

    record.name = "PROCESSED"
    record.value += 100500

    serialized = json.dumps(converter.dump(record))
    print(serialized)


if __name__ == '__main__':
    main()
import json
import logging
import multiprocessing
import sys
from dataclasses import dataclass
from io import StringIO
from multiprocessing import Process
from pathlib import Path
from pprint import pprint
from typing import Any, IO
import adaptix
@dataclass(init=True, kw_only=True)
class ToExecute:
path: Path
input_data: str
@dataclass(kw_only=True)
class ExecuteResult:
result: str
success: bool
class ScriptExecutor:
    """Run a Python script in-process with its standard streams captured.

    Intended to be used as a context manager: entering replaces
    ``sys.stdin``, ``sys.stdout`` and ``sys.stderr`` with in-memory buffers,
    and leaving puts the previous streams (and ``sys.displayhook``) back.
    ``execute_script`` must be called while the context is active so that the
    script's ``input()`` and ``print()`` go through the buffers.
    """

    def __init__(self, script_path: Path) -> None:
        """Remember the script location and prepare empty capture buffers."""
        self.logger = logging.getLogger(__name__)
        self.script_path = script_path
        self.stdin_hook = StringIO()
        self.prev_stdin_hook: IO | None = None
        self.stdout_hook = StringIO()
        self.prev_stdout_hook: IO | None = None
        self.stderr_hook = StringIO()
        self.prev_stderr_hook: IO | None = None
        # Display hook that was active before __enter__, restored by __exit__.
        self.prev_displayhook = None

    def __enter__(self) -> "ScriptExecutor":
        """Swap the process-wide standard streams for the capture buffers."""
        # __exit__ closes the buffers, so an executor that is entered a
        # second time needs new ones instead of the closed leftovers.
        if self.stdin_hook.closed:
            self.stdin_hook = StringIO()
        if self.stdout_hook.closed:
            self.stdout_hook = StringIO()
        if self.stderr_hook.closed:
            self.stderr_hook = StringIO()

        # Push out pending output before redirecting. stdin is an input
        # stream: there is nothing to flush, and some stdin replacements do
        # not support flush() at all.
        sys.stdout.flush()
        sys.stderr.flush()

        self.prev_stdin_hook = sys.stdin
        sys.stdin = self.stdin_hook
        self.prev_stdout_hook = sys.stdout
        sys.stdout = self.stdout_hook
        self.prev_stderr_hook = sys.stderr
        sys.stderr = self.stderr_hook
        self.prev_displayhook = sys.displayhook
        sys.displayhook = self.logger.error
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Restore the previous streams and display hook, then close the buffers.

        Returns ``None``, so an exception raised inside the ``with`` body is
        not suppressed.
        """
        # Restore first so that nothing can write to a closed buffer.
        sys.stdin = self.prev_stdin_hook
        sys.stdout = self.prev_stdout_hook
        sys.stderr = self.prev_stderr_hook
        if self.prev_displayhook is not None:
            sys.displayhook = self.prev_displayhook
        for hook in (self.stdin_hook, self.stdout_hook, self.stderr_hook):
            hook.close()

    def execute_script(self, input_data: str) -> "ExecuteResult":
        """Execute the script with ``input_data`` available on its stdin.

        Args:
            input_data: Text the script can read via ``input()`` /
                ``sys.stdin``.

        Returns:
            ``ExecuteResult(success=True)`` carrying everything the script
            wrote to stdout, or ``ExecuteResult(success=False)`` carrying the
            captured stderr text followed by the exception summary when the
            script could not be read or raised.
        """
        try:
            with open(self.script_path.resolve(), encoding="utf-8") as file:
                source_code = file.read()

            # Start from empty buffers so text from a previous run does not
            # leak into this one.
            for hook in (self.stdin_hook, self.stdout_hook, self.stderr_hook):
                hook.seek(0)
                hook.truncate()
            self.stdin_hook.write(input_data)
            # Rewind: the script reads from the current position, which is
            # at the end of the buffer right after the write.
            self.stdin_hook.seek(0)

            self.logger.info("Executing script, %s", input_data, extra={"script_path": self.script_path.resolve()})

            # A single dict serves as the script's module namespace. With a
            # separate locals dict, functions defined by the script could not
            # see the script's own top-level classes and imports.
            # "__name__" is pinned so the script's ``if __name__ ==
            # '__main__'`` guard runs regardless of how this module was
            # loaded.
            namespace = {**globals(), "__name__": "__main__"}
            # NOTE(review): exec runs the file with this process's full
            # privileges - only point this at trusted scripts.
            exec(compile(source_code, '<inline>', 'exec'), namespace)
        except Exception as e:
            # Read the captured stderr before logging: a logging handler may
            # itself write to the redirected sys.stderr.
            captured = "" if self.stderr_hook.closed else self.stderr_hook.getvalue()
            self.logger.error(e, extra={"script_path": self.script_path})
            return ExecuteResult(result=f"{captured}{type(e).__name__}: {e}", success=False)

        # getvalue() returns the whole buffer; read() would return "" because
        # the position sits at the end after the script's writes.
        result_data = self.stdout_hook.getvalue()
        self.logger.debug("Executing successful: %s", result_data)
        return ExecuteResult(result=result_data, success=True)
class ExecutorProcess(Process):
    """Daemon worker process that runs queued scripts and reports the results.

    Jobs are taken from ``script_queue`` one at a time. Exactly one item is
    put on ``result_queue`` for every job taken, including failed ones, so a
    caller blocked on the result queue is always released.
    """

    def __init__(self, script_queue: multiprocessing.Queue,
                 result_queue: multiprocessing.Queue):
        """Store the two queues; the process is created as a daemon named "Executor"."""
        super().__init__(name="Executor", daemon=True)
        self.script_executor_class = ScriptExecutor
        # Most recently used executor for each script path.
        self.executors: dict[Path, ScriptExecutor] = {}
        self.script_queue = script_queue
        self.result_queue = result_queue
        self.logger = logging.getLogger(self.__class__.__name__)

    def run(self) -> None:
        """Serve jobs from ``script_queue`` forever."""
        while True:
            to_execute = self.script_queue.get()
            try:
                # A fresh executor per job: an executor's buffers are closed
                # when its ``with`` block ends, so re-entering a cached one
                # is not safe.
                executor = self.script_executor_class(script_path=to_execute.path)
                self.executors[to_execute.path] = executor
                with executor:
                    results = executor.execute_script(to_execute.input_data)
                if results is None:
                    # The executor reported a failure without details.
                    results = ExecuteResult(result="Script execution failed", success=False)
                self.logger.error("EXECUTED %s", results)
            except Exception as e:
                self.logger.error(e)
                # Still answer the caller; otherwise it waits on the result
                # queue forever.
                results = ExecuteResult(result=str(e), success=False)
            self.result_queue.put(results)
class ExecutorAdapter:
    """Blocking request/response front end over the executor's two queues."""

    def __init__(self, script_queue: multiprocessing.Queue,
                 result_queue: multiprocessing.Queue):
        """Keep references to the job queue and the result queue."""
        self.script_queue = script_queue
        self.result_queue = result_queue

    def execute_script(self, script_path: Path, input_data: dict[str, Any]) -> ExecuteResult:
        """Submit one script run and wait for whatever arrives on the result queue.

        Args:
            script_path: Script to run; resolved to an absolute path before
                it is queued.
            input_data: Mapping serialized to JSON and sent as the script's
                input text.

        Returns:
            The next item taken from the result queue.
        """
        absolute_path = script_path.resolve()
        encoded_input = json.dumps(input_data)
        job = ToExecute(path=absolute_path, input_data=encoded_input)
        self.script_queue.put(job)
        # Blocks until the worker publishes something.
        outcome = self.result_queue.get()
        return outcome
def main() -> None:
    """Demo entry point: start the worker and run ``./generate_smth.py`` once.

    The result of the run is logged at ERROR level through the root logger.
    """
    logging.basicConfig(format="%(levelname)s:%(name)s:%(process)d - %(message)s")

    jobs = multiprocessing.Queue()
    outcomes = multiprocessing.Queue()
    worker = ExecutorProcess(jobs, outcomes)
    worker.start()

    target_script = Path('./generate_smth.py')
    payload = {"name": "", "value": 150.0}
    adapter = ExecutorAdapter(jobs, outcomes)
    outcome = adapter.execute_script(target_script, input_data=payload)
    logging.error(outcome)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment