Skip to content

Instantly share code, notes, and snippets.

@mtask
Created September 4, 2025 19:53
Show Gist options
  • Save mtask/86cf48a3b80a5ff93096bc72f730e9c8 to your computer and use it in GitHub Desktop.
Save mtask/86cf48a3b80a5ff93096bc72f730e9c8 to your computer and use it in GitHub Desktop.
Autocreate canvas with Obsidian based on entity like notes
#!/usr/bin/env python3
import os
import re
import json
import argparse
import sys
from typing import Optional, Dict, Set, List, Tuple
WIKILINK_RE = re.compile(r"\[\[([^\]]+)\]\]")
ENTITY_COLOR_MAPPER = {"event": "1", "ip": "3", "domain":"4", "email": "5", "device": "6"}
def parse_wikilinks(text: str) -> List[str]:
"""Extract wikilink targets, stripping alias, headings, block refs, and .md."""
out = []
for raw in WIKILINK_RE.findall(text):
target = raw.split("|", 1)[0]
target = target.split("#", 1)[0].split("^", 1)[0]
target = target.strip()
if not target:
continue
if target.lower().endswith(".md"):
target = target[:-3]
out.append(target)
return out
def parse_frontmatter_entity(text: str) -> Optional[str]:
"""Extract 'entity' key from YAML frontmatter if present."""
if not text.startswith("---"):
return None
lines = text.splitlines()
entity_val = None
i = 1
while i < len(lines):
line = lines[i].strip()
if line == "---":
break
if line.lower().startswith("entity:"):
entity_val = line.split(":", 1)[1].strip()
i += 1
return entity_val or None
def _rel_to_abs(vault: str, rel: str) -> str:
return os.path.normpath(os.path.join(vault, *rel.split("/")))
def _abs_to_rel(vault: str, abs_path: str) -> str:
rel = os.path.relpath(abs_path, vault)
return rel.replace(os.sep, "/")
class Resolver:
"""Resolve wikilinks on-demand using DFS preference, cache results."""
def __init__(self, vault: str, verbose: bool=False):
self.vault = os.path.abspath(vault)
self.cache: Dict[Tuple[str, str], Optional[str]] = {}
self.verbose = verbose
def _log(self, *a, **k):
if self.verbose:
print(*a, **k, file=sys.stderr)
def resolve(self, link: str, current_dir_rel: str) -> Optional[str]:
key = (link, current_dir_rel or "")
if key in self.cache:
return self.cache[key]
link_rel_candidate = link if link.endswith(".md") else link + ".md"
link_rel_candidate = link_rel_candidate.replace("\\", "/")
# 1) Path-style link
if "/" in link_rel_candidate:
abs_path = _rel_to_abs(self.vault, link_rel_candidate)
if os.path.isfile(abs_path):
rel = _abs_to_rel(self.vault, abs_path)
self.cache[key] = rel
return rel
# 2) Same directory as current file
if current_dir_rel:
candidate = (current_dir_rel + "/" + link_rel_candidate).lstrip("/")
abs_path = _rel_to_abs(self.vault, candidate)
if os.path.isfile(abs_path):
rel = _abs_to_rel(self.vault, abs_path)
self.cache[key] = rel
return rel
# 3) Vault root
abs_path = _rel_to_abs(self.vault, link_rel_candidate)
if os.path.isfile(abs_path):
rel = _abs_to_rel(self.vault, abs_path)
self.cache[key] = rel
return rel
# 4) Scan vault
for root, _, files in os.walk(self.vault):
for fn in files:
if fn == os.path.basename(link_rel_candidate):
abs_found = os.path.join(root, fn)
rel = _abs_to_rel(self.vault, abs_found)
self.cache[key] = rel
return rel
self.cache[key] = None
return None
def normalize_start_file_arg(vault: str, start: str) -> str:
if os.path.isabs(start):
start_abs = os.path.normpath(start)
rel = os.path.relpath(start_abs, vault).replace(os.sep, "/")
else:
rel = start.replace("\\", "/")
if not rel.lower().endswith(".md"):
rel = rel + ".md"
return rel.lstrip("./")
def crawl(vault: str, start_rel: str, resolver: Resolver) -> Tuple[Set[str], List[Tuple[str, str]], Dict[str,str]]:
visited: Set[str] = set()
edges_list: List[Tuple[str, str]] = []
edges_seen: Set[frozenset] = set() # undirected edge dedup
entities: Dict[str,str] = {}
def rel_to_abs(rel: str) -> str:
return _rel_to_abs(vault, rel)
def dfs(rel: str):
if rel in visited:
return
visited.add(rel)
abs_path = rel_to_abs(rel)
if not os.path.isfile(abs_path):
return
try:
with open(abs_path, "r", encoding="utf-8") as fh:
content = fh.read()
except Exception:
return
# parse entity
ent = parse_frontmatter_entity(content)
if ent:
entities[rel] = ent
current_dir_rel = os.path.dirname(rel)
for link in parse_wikilinks(content):
target = resolver.resolve(link, current_dir_rel)
if not target:
continue
edge_pair = frozenset({rel, target})
if edge_pair not in edges_seen:
edges_seen.add(edge_pair)
edges_list.append((rel, target)) # DFS order
dfs(target)
dfs(start_rel)
edges = edges_list # keep DFS order
return visited, edges, entities
def make_canvas_json(nodes: Set[str], edges: List[Tuple[str, str]], entities: Dict[str,str]) -> dict:
nodes_sorted = sorted(nodes)
canvas = {"nodes": [], "edges": []}
spacing = 600
cols = 5
for i, node in enumerate(nodes_sorted):
if node in entities:
_ent = entities[node]
else:
_ent = None
if _ent and _ent in ENTITY_COLOR_MAPPER:
color = ENTITY_COLOR_MAPPER[_ent]
else:
color = "0"
canvas["nodes"].append({
"id": node,
"type": "file",
"file": node,
"x": (i % cols) * spacing,
"y": (i // cols) * spacing,
"width": 500,
"height": 500,
"color": color
})
for i, (src, dst) in enumerate(edges):
edge_obj = {
"id": f"e{i:06d}",
"fromNode": src,
"fromSide": "right",
"toNode": dst,
"toSide": "left"
}
if dst in entities:
edge_obj["label"] = entities[dst]
canvas["edges"].append(edge_obj)
return canvas
def main(argv=None):
p = argparse.ArgumentParser()
p.add_argument("-f", "--file", required=True)
p.add_argument("-o", "--output", required=True)
p.add_argument("--verbose", action="store_true")
args = p.parse_args(argv)
vault = os.path.abspath(os.path.curdir)
if ".obsidian" not in os.listdir('.'):
print("[!] CWD needs to be root of the related Obsidian vault")
sys.exit(1)
start_rel = normalize_start_file_arg(vault, args.file)
start_abs = _rel_to_abs(vault, start_rel)
if not os.path.isfile(start_abs):
sys.exit(f"Start file not found: {start_rel}")
resolver = Resolver(vault, verbose=args.verbose)
nodes, edges, entities = crawl(vault, start_rel, resolver)
canvas = make_canvas_json(nodes, edges, entities)
with open(args.output, "w", encoding="utf-8") as outfh:
json.dump(canvas, outfh, indent=2)
print(f"Written {args.output} — nodes: {len(nodes)} edges: {len(edges)}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment