Created
September 4, 2025 19:53
-
-
Save mtask/86cf48a3b80a5ff93096bc72f730e9c8 to your computer and use it in GitHub Desktop.
Auto-create an Obsidian canvas from entity-like notes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import re | |
import json | |
import argparse | |
import sys | |
from typing import Optional, Dict, Set, List, Tuple | |
# Matches the inner text of an Obsidian wikilink: [[...]].
WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")

# Maps a note's frontmatter `entity` value to the canvas colour string used for its node.
ENTITY_COLOR_MAPPER = {"event": "1", "ip": "3", "domain":"4", "email": "5", "device": "6"}


def parse_wikilinks(text: str) -> List[str]:
    """Extract wikilink targets, stripping alias, headings, block refs, and .md.

    Args:
        text: Markdown content to scan for ``[[...]]`` links.

    Returns:
        The link targets in order of appearance (duplicates are kept).
        Links whose target is empty after stripping (e.g. ``[[#heading]]``)
        are skipped.
    """
    out: List[str] = []
    for raw in WIKILINK_RE.findall(text):
        target, has_alias, _alias = raw.partition("|")
        # Inside Markdown tables the alias pipe is written as "\|"; drop the
        # escaping backslash so it does not become part of the target.
        if has_alias and target.endswith("\\"):
            target = target[:-1]
        target = target.split("#", 1)[0].split("^", 1)[0].strip()
        if not target:
            continue
        if target.lower().endswith(".md"):
            target = target[:-3]
        out.append(target)
    return out
def parse_frontmatter_entity(text: str) -> Optional[str]:
    """Extract 'entity' key from YAML frontmatter if present.

    The frontmatter is the run of lines after an opening ``---`` first line,
    up to the next ``---`` line (or the end of the text when no closing
    delimiter exists). The key is matched case-insensitively; if it occurs
    more than once, the last occurrence wins.

    Args:
        text: Full content of a note.

    Returns:
        The entity value with surrounding whitespace and one pair of matching
        quotes removed, or ``None`` when there is no frontmatter, no
        ``entity`` key, or the value is empty.
    """
    # A UTF-8 BOM would otherwise hide the opening delimiter.
    lines = text.lstrip("\ufeff").splitlines()
    # The first line must be exactly the delimiter, not e.g. "----" or "---x".
    if not lines or lines[0].rstrip() != "---":
        return None

    entity_val = None
    for raw_line in lines[1:]:
        line = raw_line.strip()
        if line == "---":
            break
        if line.lower().startswith("entity:"):
            value = line.split(":", 1)[1].strip()
            # YAML scalars may be quoted: entity: "ip" means ip.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1].strip()
            entity_val = value
    return entity_val or None
def _rel_to_abs(vault: str, rel: str) -> str:
    """Turn a '/'-separated vault-relative path into a normalized OS path under *vault*."""
    segments = rel.split("/")
    joined = os.path.join(vault, *segments)
    return os.path.normpath(joined)
def _abs_to_rel(vault: str, abs_path: str) -> str:
    """Express *abs_path* relative to *vault*, using '/' as the separator."""
    return os.path.relpath(abs_path, vault).replace(os.sep, "/")
class Resolver:
    """Resolve wikilink targets to vault-relative note paths, caching results.

    For a link, candidates are tried in this order and the first existing
    file wins:

    1. the link as a path from the vault root (only when it contains "/"),
    2. the link relative to the directory of the linking note,
    3. the link as a file directly in the vault root,
    4. any file in the vault with the same basename.

    Step 4 uses a basename index that is built once, on first use, by walking
    the vault in sorted order and skipping dot-directories. The index and the
    result cache are not refreshed if the vault changes during the lifetime
    of the instance.
    """

    def __init__(self, vault: str, verbose: bool=False):
        self.vault = os.path.abspath(vault)
        # (link, current_dir_rel) -> resolved vault-relative path, or None.
        self.cache: Dict[Tuple[str, str], Optional[str]] = {}
        self.verbose = verbose
        # basename -> first vault-relative path found; None until first needed.
        self._basename_index: Optional[Dict[str, str]] = None

    def _log(self, *a, **k):
        """Print to stderr (unless the caller passes ``file=``) when verbose."""
        if self.verbose:
            k.setdefault("file", sys.stderr)
            print(*a, **k)

    def _existing(self, rel_candidate: str) -> Optional[str]:
        """Return the normalized vault-relative path if the candidate is a file."""
        abs_path = _rel_to_abs(self.vault, rel_candidate)
        if os.path.isfile(abs_path):
            return _abs_to_rel(self.vault, abs_path)
        return None

    def _index(self) -> Dict[str, str]:
        """Build (once) and return the basename -> vault-relative path index."""
        if self._basename_index is None:
            index: Dict[str, str] = {}
            for root, dirs, files in os.walk(self.vault):
                # Prune dot-directories (.obsidian, .trash, .git, ...) and sort
                # so that "first match" does not depend on filesystem order.
                dirs[:] = sorted(d for d in dirs if not d.startswith("."))
                for fn in sorted(files):
                    # Candidates always end in ".md" (any case), so other
                    # files can never match.
                    if fn.lower().endswith(".md"):
                        index.setdefault(
                            fn, _abs_to_rel(self.vault, os.path.join(root, fn))
                        )
            self._basename_index = index
        return self._basename_index

    def resolve(self, link: str, current_dir_rel: str) -> Optional[str]:
        """Resolve *link* as seen from a note located in *current_dir_rel*.

        Args:
            link: Wikilink target, with or without a ``.md`` suffix.
            current_dir_rel: Vault-relative directory of the linking note
                ("" for the vault root).

        Returns:
            The vault-relative, '/'-separated path of the target file, or
            ``None`` when nothing matches.
        """
        key = (link, current_dir_rel or "")
        if key in self.cache:
            return self.cache[key]

        # Case-insensitive suffix check, consistent with parse_wikilinks.
        candidate = link if link.lower().endswith(".md") else link + ".md"
        candidate = candidate.replace("\\", "/")

        attempts: List[str] = []
        if "/" in candidate:
            attempts.append(candidate)  # path-style link from the vault root
        if current_dir_rel:
            attempts.append((current_dir_rel + "/" + candidate).lstrip("/"))
        if "/" not in candidate:
            attempts.append(candidate)  # bare name directly in the vault root

        resolved: Optional[str] = None
        for attempt in attempts:
            resolved = self._existing(attempt)
            if resolved is not None:
                break
        if resolved is None:
            # Last resort: any note in the vault with the same file name.
            resolved = self._index().get(os.path.basename(candidate))
        if resolved is None:
            self._log(f"unresolved link: {link!r} (from {current_dir_rel or '.'})")

        self.cache[key] = resolved
        return resolved
def normalize_start_file_arg(vault: str, start: str) -> str:
    """Normalize the start-file argument to a vault-relative '/'-separated path.

    Args:
        vault: Absolute path of the vault root.
        start: Absolute path, or a path relative to the vault root; the
            ``.md`` suffix is optional.

    Returns:
        The vault-relative path with a ``.md`` suffix and without leading
        ``./`` or ``/``.
    """
    if os.path.isabs(start):
        start_abs = os.path.normpath(start)
        rel = os.path.relpath(start_abs, vault).replace(os.sep, "/")
    else:
        rel = start.replace("\\", "/")
    if not rel.lower().endswith(".md"):
        rel = rel + ".md"
    # Remove only real "./" prefixes and leading slashes. str.lstrip("./")
    # strips a character set and would also eat the dot of ".hidden/x.md"
    # or the whole "../" of a parent-relative path.
    rel = rel.lstrip("/")
    while rel.startswith("./"):
        rel = rel[2:].lstrip("/")
    return rel
def crawl(vault: str, start_rel: str, resolver: Resolver) -> Tuple[Set[str], List[Tuple[str, str]], Dict[str,str]]:
    """Walk the link graph depth-first starting at *start_rel*.

    The traversal is iterative (explicit stack) so that long chains of
    linked notes cannot exhaust Python's recursion limit. Visit order and
    edge order match a recursive depth-first walk.

    Args:
        vault: Absolute path of the vault root.
        start_rel: Vault-relative path of the note to start from.
        resolver: Resolver used to map wikilink targets to vault-relative paths.

    Returns:
        A tuple ``(visited, edges, entities)``:
        ``visited`` is the set of vault-relative paths reached (including
        ones that could not be read); ``edges`` lists ``(source, target)``
        pairs in discovery order, with each unordered pair recorded once;
        ``entities`` maps a path to its frontmatter ``entity`` value.
    """
    visited: Set[str] = set()
    edges_list: List[Tuple[str, str]] = []
    edges_seen: Set[frozenset] = set()  # undirected edge dedup
    entities: Dict[str,str] = {}

    def open_node(rel: str):
        """Mark *rel* visited; return an iterator over its links, or None if unreadable."""
        visited.add(rel)
        abs_path = _rel_to_abs(vault, rel)
        if not os.path.isfile(abs_path):
            return None
        try:
            with open(abs_path, "r", encoding="utf-8") as fh:
                content = fh.read()
        except (OSError, UnicodeDecodeError):
            # Best effort: an unreadable or non-UTF-8 note stays a leaf node.
            return None
        ent = parse_frontmatter_entity(content)
        if ent:
            entities[rel] = ent
        return iter(parse_wikilinks(content))

    # Each stack frame is (note path, iterator over its not-yet-processed links).
    stack = []
    start_links = open_node(start_rel)
    if start_links is not None:
        stack.append((start_rel, start_links))

    while stack:
        rel, links = stack[-1]
        current_dir_rel = os.path.dirname(rel)
        for link in links:
            target = resolver.resolve(link, current_dir_rel)
            if not target:
                continue
            edge_pair = frozenset({rel, target})
            if edge_pair not in edges_seen:
                edges_seen.add(edge_pair)
                edges_list.append((rel, target))  # DFS order
            if target in visited:
                continue
            child_links = open_node(target)
            if child_links is not None:
                # Descend now; this note's iterator resumes after the child is done.
                stack.append((target, child_links))
                break
        else:
            # All links of this note are processed.
            stack.pop()

    return visited, edges_list, entities
def make_canvas_json(nodes: Set[str], edges: List[Tuple[str, str]], entities: Dict[str,str]) -> dict:
    """Build the canvas document (a dict with "nodes" and "edges" lists).

    Nodes are laid out in sorted order on a grid of 5 columns with 600 units
    between cells; each node is a 500x500 "file" node whose id is its
    vault-relative path. Edges run from the right side of the source to the
    left side of the target and are labelled with the target's entity, when
    it has one.

    Args:
        nodes: Vault-relative paths of the notes to place.
        edges: ``(source, target)`` pairs of vault-relative paths.
        entities: Mapping of path to entity value; used for node colour and
            edge label.

    Returns:
        A JSON-serializable dict.
    """
    canvas = {"nodes": [], "edges": []}
    spacing = 600
    cols = 5

    for i, node in enumerate(sorted(nodes)):
        entity = entities.get(node)
        # Match the entity case-insensitively ("IP" and "ip" get the same
        # colour), in line with the case-insensitive frontmatter key match.
        # NOTE(review): "0" is kept as the fallback for backward compatibility;
        # the JSON Canvas spec appears to define only presets "1"-"6" - confirm
        # whether the key should be omitted instead.
        color = ENTITY_COLOR_MAPPER.get(entity.lower(), "0") if entity else "0"
        canvas["nodes"].append({
            "id": node,
            "type": "file",
            "file": node,
            "x": (i % cols) * spacing,
            "y": (i // cols) * spacing,
            "width": 500,
            "height": 500,
            "color": color,
        })

    for i, (src, dst) in enumerate(edges):
        edge_obj = {
            "id": f"e{i:06d}",
            "fromNode": src,
            "fromSide": "right",
            "toNode": dst,
            "toSide": "left",
        }
        if dst in entities:
            edge_obj["label"] = entities[dst]
        canvas["edges"].append(edge_obj)

    return canvas
def main(argv=None):
    """Command-line entry point: crawl links from a start note and write a canvas file.

    Must be run with the vault root (the directory containing ``.obsidian``)
    as the current working directory.

    Args:
        argv: Argument list to parse; ``None`` makes argparse use ``sys.argv``.

    Raises:
        SystemExit: With status 1 and a message on stderr when the current
            directory is not a vault root or the start file does not exist.
    """
    p = argparse.ArgumentParser(
        description="Create an Obsidian canvas from the notes reachable via wikilinks from a start note."
    )
    p.add_argument("-f", "--file", required=True,
                   help="start note (absolute, or relative to the vault root; .md optional)")
    p.add_argument("-o", "--output", required=True,
                   help="path of the canvas file to write")
    p.add_argument("--verbose", action="store_true",
                   help="print diagnostics to stderr")
    args = p.parse_args(argv)

    vault = os.path.abspath(os.path.curdir)
    if ".obsidian" not in os.listdir('.'):
        # sys.exit(str) prints to stderr and exits with status 1, matching
        # the "Start file not found" path below.
        sys.exit("[!] CWD needs to be root of the related Obsidian vault")

    start_rel = normalize_start_file_arg(vault, args.file)
    start_abs = _rel_to_abs(vault, start_rel)
    if not os.path.isfile(start_abs):
        sys.exit(f"Start file not found: {start_rel}")

    resolver = Resolver(vault, verbose=args.verbose)
    nodes, edges, entities = crawl(vault, start_rel, resolver)
    canvas = make_canvas_json(nodes, edges, entities)

    with open(args.output, "w", encoding="utf-8") as outfh:
        json.dump(canvas, outfh, indent=2)
    print(f"Written {args.output} — nodes: {len(nodes)} edges: {len(edges)}")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment