Last active
August 19, 2024 18:52
-
-
Save sergiobuj/4fa8026a5545279d3e96996ecddfc818 to your computer and use it in GitHub Desktop.
Cli parse kv params / encode/decode file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import re | |
from collections import defaultdict | |
from dataclasses import dataclass, field | |
from pathlib import Path | |
from xml.etree import ElementTree | |
OUTPUT_NODE = "{*}output" | |
INPUT_NODE = "{*}input" | |
CONDITION_EXPRESSION = "{*}conditionExpression" | |
@dataclass | |
class BPMNObject: | |
_xml: dict | |
level: int | |
tag: str = field(init=False) | |
type: str = field(init=False) | |
id: str = field(init=False) | |
name: str = field(init=False) | |
source_ref: str = field(init=False) | |
target_ref: str = field(init=False) | |
inputs: list[dict] = field(init=False) | |
outputs: list[dict] = field(init=False) | |
def __post_init__(self): | |
self.tag = self._xml.tag | |
self.type = self.tag.split("}")[1] | |
self.id = self._xml.attrib.get("id") | |
self.name = self._xml.attrib.get("name") | |
self.source_ref = self._xml.attrib.get("sourceRef") | |
self.target_ref = self._xml.attrib.get("targetRef") | |
self.outputs = [] | |
for output in self._xml.findall(f".//{OUTPUT_NODE}"): | |
self.outputs.append(output.attrib) | |
self.inputs = [] | |
for input in self._xml.findall(f".//{INPUT_NODE}"): | |
self.inputs.append(input.attrib) | |
for condition in self._xml.findall(f".//{CONDITION_EXPRESSION}"): | |
self.inputs.append({"source": condition.text}) | |
def __repr__(self) -> str: | |
return f"{self.level:3}{'\t' * self.level} {self.type} {self.id} inputs:{len(self.inputs)} outputs:{len(self.outputs)}" | |
def unsatisfied_inputs(inputs: list[dict], outputs: list[dict]) -> list[str]: | |
if not inputs: | |
return [] | |
missing_inputs = [] | |
for input in inputs: | |
src = re.sub("=", "", re.sub(r"\s+[0-9]*\s*", "", input["source"])) | |
satisfied = False | |
for output in outputs: | |
if output["target"] == src: | |
satisfied = True | |
if not satisfied: | |
missing_inputs.append(src) | |
return missing_inputs | |
def show_dependencies(graph, node_info): | |
to_visit = [("start_event", [])] | |
visited = set([]) | |
while to_visit: | |
element, env = to_visit.pop(0) | |
visited.add(element) | |
node_data = node_info[element] | |
if missing_inputs := unsatisfied_inputs(node_data.inputs, env): | |
print( | |
f"{node_data.id} ({node_data.name}) has unknown inputs:", | |
*missing_inputs, | |
) | |
for child in graph[element]: | |
if child in visited: | |
continue | |
new_env = env + node_info[element].outputs | |
to_visit.insert(0, (child, new_env)) | |
def should_skip(node: dict) -> bool: | |
skip_list = ("BPMNDiagram",) | |
return any(map(lambda x: x in node.tag, skip_list)) | |
def parsebpmn(filepath: Path, debug: bool = False) -> tuple[dict, dict]: | |
tree = ElementTree.parse(filepath) | |
root = tree.getroot() | |
graph = defaultdict(set) | |
elements = {} | |
to_visit = [(root, 0)] | |
while to_visit: | |
xml_node, level = to_visit.pop(0) | |
if should_skip(xml_node): | |
continue | |
node = BPMNObject(_xml=xml_node, level=level) | |
if debug: | |
print(node) | |
if node.id: | |
elements[node.id] = node | |
if node.source_ref: | |
graph[node.source_ref].add(node.id) | |
graph[node.id].add(node.target_ref) | |
for child in xml_node: | |
to_visit.insert(0, (child, level + 1)) | |
return graph, elements | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-b", "--bpmn", type=Path, required=True) | |
args = parser.parse_args() | |
bpmn = args.bpmn | |
graph, elements = parsebpmn(bpmn, debug=False) | |
show_dependencies(graph, elements) | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import base64 | |
import random | |
from pathlib import Path | |
def update_process(process_file: Path): | |
raw = "" | |
with open(process_file, "rb") as f: | |
raw = f.read() | |
encoded = base64.b64encode(raw) | |
with open("encoded.txt", "w") as f: | |
f.write(encoded.decode("utf-8")) | |
def create_instance(process_file: Path, params: dict): | |
pass | |
def run_process_with_params(process_file: Path, params: dict): | |
process = update_process(process_file) | |
instance = create_instance(process, params) | |
def _parse_kv_params(kvargs: list[str]) -> dict[str, str]: | |
values = {} | |
for kv in kvargs: | |
k, v = kv.split("=", maxsplit=1) | |
values[k] = v | |
return values | |
def recreate_file(encoded_bytes: str): | |
raw_bytes = base64.b64decode(encoded_bytes) | |
_id = random.randint(0, 100) | |
filename = f"flag_{_id}.png" | |
with open(filename, "wb") as f: | |
f.write(raw_bytes) | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-b", "--bpmn", type=Path, required=True) | |
parser.add_argument("-p", "--param", metavar="k=v", required=True, action="append") | |
parser.add_argument("--raw", type=str) | |
args = parser.parse_args() | |
bpmn = args.bpmn | |
params = _parse_kv_params(args.param) | |
print(f"{bpmn=}") | |
print(f"{params=}") | |
run_process_with_params(bpmn, params) | |
recreate_file(args.raw) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment