Skip to content

Instantly share code, notes, and snippets.

@sergiobuj
Last active August 19, 2024 18:52
Show Gist options
  • Save sergiobuj/4fa8026a5545279d3e96996ecddfc818 to your computer and use it in GitHub Desktop.
Save sergiobuj/4fa8026a5545279d3e96996ecddfc818 to your computer and use it in GitHub Desktop.
CLI: parse k=v params / base64-encode and decode a file
#!/usr/bin/env python3
import argparse
import re
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from xml.etree import ElementTree
OUTPUT_NODE = "{*}output"
INPUT_NODE = "{*}input"
CONDITION_EXPRESSION = "{*}conditionExpression"
@dataclass
class BPMNObject:
_xml: dict
level: int
tag: str = field(init=False)
type: str = field(init=False)
id: str = field(init=False)
name: str = field(init=False)
source_ref: str = field(init=False)
target_ref: str = field(init=False)
inputs: list[dict] = field(init=False)
outputs: list[dict] = field(init=False)
def __post_init__(self):
self.tag = self._xml.tag
self.type = self.tag.split("}")[1]
self.id = self._xml.attrib.get("id")
self.name = self._xml.attrib.get("name")
self.source_ref = self._xml.attrib.get("sourceRef")
self.target_ref = self._xml.attrib.get("targetRef")
self.outputs = []
for output in self._xml.findall(f".//{OUTPUT_NODE}"):
self.outputs.append(output.attrib)
self.inputs = []
for input in self._xml.findall(f".//{INPUT_NODE}"):
self.inputs.append(input.attrib)
for condition in self._xml.findall(f".//{CONDITION_EXPRESSION}"):
self.inputs.append({"source": condition.text})
def __repr__(self) -> str:
return f"{self.level:3}{'\t' * self.level} {self.type} {self.id} inputs:{len(self.inputs)} outputs:{len(self.outputs)}"
def unsatisfied_inputs(inputs: list[dict], outputs: list[dict]) -> list[str]:
    """Return the normalised input sources that no output provides.

    Each input's ``source`` value is normalised by deleting every run of
    whitespace (together with any digits and whitespace directly after it)
    and every ``=`` character. The result is then looked up among the
    ``target`` values of ``outputs``.

    Args:
        inputs: Mappings that may carry a ``source`` entry. Entries whose
            source is missing or None are ignored instead of raising.
        outputs: Mappings that may carry a ``target`` entry.

    Returns:
        The normalised sources without a matching target, in input order,
        duplicates kept. Empty when ``inputs`` is empty.
    """
    # Built once so every input is checked with an O(1) lookup.
    known_targets = {produced.get("target") for produced in outputs}

    missing_inputs = []
    for required in inputs:
        source = required.get("source")
        if source is None:
            continue
        name = re.sub(r"\s+[0-9]*\s*", "", source).replace("=", "")
        if name not in known_targets:
            missing_inputs.append(name)
    return missing_inputs
def show_dependencies(graph, node_info, start="start_event"):
    """Walk the graph depth-first and print nodes whose inputs are unknown.

    Each node is checked against the outputs accumulated along the path that
    led to it. For every node with inputs that none of those outputs provide,
    one line is printed listing the unknown inputs.

    Args:
        graph: Mapping from an element id to the ids that follow it.
        node_info: Mapping from an element id to an object exposing ``id``,
            ``name``, ``inputs`` and ``outputs``.
        start: Id of the element the walk begins at.

    Raises:
        KeyError: If ``start`` is not a key of ``node_info``.
    """
    if start not in node_info:
        raise KeyError(f"start element {start!r} not found in node_info")

    # Used as a stack: append/pop at the tail visits nodes in the same order
    # as inserting and popping at the head, without shifting the list.
    pending = [(start, [])]
    visited = set()
    while pending:
        element, env = pending.pop()
        visited.add(element)
        node_data = node_info[element]
        if missing_inputs := unsatisfied_inputs(node_data.inputs, env):
            print(
                f"{node_data.id} ({node_data.name}) has unknown inputs:",
                *missing_inputs,
            )
        # Identical for every child, so it is built once per node.
        child_env = env + node_data.outputs
        # .get() avoids inserting empty entries into a defaultdict graph and
        # also accepts a plain dict.
        for child in graph.get(element, ()):
            # Ids with no entry in node_info (dangling references, None)
            # cannot be inspected, so they are not followed.
            if child in visited or child not in node_info:
                continue
            pending.append((child, child_env))
def should_skip(node: ElementTree.Element) -> bool:
    """Tell whether *node* should be left out of the document walk.

    A node is skipped when its tag contains one of the names in the skip
    list (a substring match), or when its tag is not a string at all.

    Args:
        node: Element whose ``tag`` is inspected.

    Returns:
        True if the node should be ignored, False otherwise.
    """
    skip_list = ("BPMNDiagram",)
    tag = node.tag
    if not isinstance(tag, str):
        # Comment and processing-instruction nodes carry a callable instead
        # of a tag name; a substring test on it would raise TypeError.
        return True
    return any(marker in tag for marker in skip_list)
def parsebpmn(filepath: Path, debug: bool = False) -> tuple[dict, dict]:
    """Parse an XML file into a successor graph and an element lookup table.

    Every element that ``should_skip`` does not reject is wrapped in a
    ``BPMNObject``. Children of a rejected element are not visited.

    An element with a ``sourceRef`` attribute becomes a node between its
    source and its target: ``source -> element id -> target``. If such an
    element has no id, the source is linked straight to the target. Missing
    ids and targets are never stored in the graph.

    Args:
        filepath: Location of the XML document.
        debug: When true, print every visited element.

    Returns:
        ``(graph, elements)``: ``graph`` maps an id to the set of ids that
        follow it, ``elements`` maps each id to its ``BPMNObject``.
    """
    root = ElementTree.parse(filepath).getroot()
    graph = defaultdict(set)
    elements = {}

    # Stack of (element, depth). Children are pushed in document order and
    # popped from the tail, so the last child is handled first.
    stack = [(root, 0)]
    while stack:
        xml_node, level = stack.pop()
        if should_skip(xml_node):
            continue

        node = BPMNObject(_xml=xml_node, level=level)
        if debug:
            print(node)
        if node.id:
            elements[node.id] = node

        source, target = node.source_ref, node.target_ref
        if source:
            if node.id:
                graph[source].add(node.id)
                if target:
                    graph[node.id].add(target)
            elif target:
                # No id to use as the intermediate node: keep the connection.
                graph[source].add(target)

        stack.extend((child, level + 1) for child in xml_node)
    return graph, elements
def main():
    """Command-line entry point: analyse the file passed with -b/--bpmn."""
    cli = argparse.ArgumentParser()
    cli.add_argument("-b", "--bpmn", type=Path, required=True)
    options = cli.parse_args()

    # Build the graph and element table, then report unknown inputs.
    graph, elements = parsebpmn(options.bpmn, debug=False)
    show_dependencies(graph, elements)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
import argparse
import base64
import random
from pathlib import Path
def update_process(process_file: Path, output_file: Path = Path("encoded.txt")):
    """Base64-encode the contents of *process_file* into a text file.

    Args:
        process_file: File whose raw bytes are encoded.
        output_file: Destination of the base64 text; it is overwritten if it
            exists. Defaults to ``encoded.txt`` in the current working
            directory.
    """
    encoded = base64.b64encode(Path(process_file).read_bytes())
    # Base64 output is pure ASCII, so the text encoding is pinned explicitly.
    Path(output_file).write_text(encoded.decode("ascii"), encoding="ascii")
def create_instance(process_file: Path, params: dict):
    """Placeholder: accepts a process file and parameters but does nothing yet."""
    return None
def run_process_with_params(process_file: Path, params: dict):
    """Encode the process file, then create an instance of it.

    Args:
        process_file: Process definition handed to both steps.
        params: Parameters forwarded to ``create_instance``.

    Returns:
        Whatever ``create_instance`` returns.
    """
    update_process(process_file)
    # update_process has no return value, so its result must not be forwarded
    # as the process file; the original path is passed instead.
    return create_instance(process_file, params)
def _parse_kv_params(kvargs: list[str]) -> dict[str, str]:
    """Turn ``["k=v", ...]`` into ``{"k": "v", ...}``.

    Each item is split on its first ``=`` only, so values may contain ``=``.
    When a key occurs more than once the last value wins.

    Args:
        kvargs: Strings of the form ``key=value``.

    Returns:
        Mapping from each key to its value.

    Raises:
        ValueError: If an item contains no ``=``; the message names the item.
    """
    values = {}
    for kv in kvargs:
        key, separator, value = kv.partition("=")
        if not separator:
            raise ValueError(f"invalid parameter {kv!r}: expected k=v")
        values[key] = value
    return values
def recreate_file(encoded_bytes: str) -> Path:
    """Decode base64 data and write it to ``flag_<n>.png``.

    ``<n>`` is a random integer between 0 and 100 inclusive and the file is
    created in the current working directory. An existing file with the same
    name is overwritten.

    Args:
        encoded_bytes: Base64-encoded content.

    Returns:
        Path of the file that was written. The name is random, so this is
        the only way for the caller to learn it.
    """
    raw_bytes = base64.b64decode(encoded_bytes)
    target = Path(f"flag_{random.randint(0, 100)}.png")
    target.write_bytes(raw_bytes)
    return target
def main():
    """Command-line entry point.

    Options:
        -b/--bpmn   process file (required)
        -p/--param  ``k=v`` pair, repeatable (at least one required)
        --raw       optional base64 text to decode into a file
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-b", "--bpmn", type=Path, required=True)
    parser.add_argument("-p", "--param", metavar="k=v", required=True, action="append")
    parser.add_argument("--raw", type=str)
    args = parser.parse_args()

    bpmn = args.bpmn
    try:
        params = _parse_kv_params(args.param)
    except ValueError as exc:
        # Report malformed parameters as a usage error, not a traceback.
        parser.error(f"invalid -p/--param value: {exc}")
    print(f"{bpmn=}")
    print(f"{params=}")
    run_process_with_params(bpmn, params)

    # --raw is optional; decoding its default of None would raise TypeError.
    if args.raw is not None:
        recreate_file(args.raw)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment