Last active
May 9, 2025 20:38
-
-
Save qpwo/4540d5f3867e9579f83443c62e376b56 to your computer and use it in GitHub Desktop.
inspect pylib
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
grab_docs.py: Auto-generate per-module and per-export docs. | |
- Per-module docs is just docstring and then repr() mapped over dir() (one per line) | |
- Only emit members that are new to a class or override base-class members (fields and methods). | |
- Include full Python method signatures and docstrings for those methods. | |
- Print all new/overridden class fields with their types and docstrings. | |
- For functions, output signature and docstring concisely. | |
- Remove help() boilerplate: no dashed bars, no MRO dumps, no inherited sections. | |
- Keep the format concise, Pythonic, and readable. | |
- Operate via reflection only (no parsing of C++ source). | |
- After generation, strip any lines that appear in more than 10% of all .md files to eliminate common boilerplate, and print those removed lines. | |
- Use a 16-thread background pool + queue for writing export docs, and another 16-thread pool + queue for module docs. Ensure writes complete before reading. | |
- Threads must be non-daemon. KISS. | |
""" | |
import os | |
import importlib | |
import pkgutil | |
import inspect | |
import threading | |
import queue | |
from collections import Counter | |
# === Configuration === | |
root_mod_name = "unreal" | |
outdir = os.path.expanduser("~/Downloads/pydocs") | |
def prnt(*args, **kwargs): | |
print(*args, **kwargs) | |
# === Simple I/O thread pool === | |
class IOPool: | |
def __init__(self, num_threads, worker_fn): | |
self.tasks = queue.Queue() | |
self.worker_fn = worker_fn | |
self.threads = [] | |
for _ in range(num_threads): | |
t = threading.Thread(target=self._run, daemon=False) | |
t.start() | |
self.threads.append(t) | |
def _run(self): | |
while True: | |
item = self.tasks.get() | |
if item is None: | |
break | |
try: | |
self.worker_fn(item) | |
except Exception as e: | |
prnt(f"IOPool error: {e}") | |
finally: | |
self.tasks.task_done() | |
def put(self, item): | |
self.tasks.put(item) | |
def join(self): | |
self.tasks.join() | |
for _ in self.threads: | |
self.tasks.put(None) | |
for t in self.threads: | |
t.join() | |
# === Module discovery === | |
def discover(name): | |
mod = importlib.import_module(name) | |
yield name, mod | |
if hasattr(mod, "__path__"): | |
for _, sub, _ in pkgutil.walk_packages(mod.__path__, name + "."): | |
try: | |
yield sub, importlib.import_module(sub) | |
except ImportError: | |
pass | |
# === Dump helpers === | |
def dump_class(cls, out_f): | |
bases = ", ".join(b.__name__ for b in cls.__bases__) | |
header = f"class {cls.__name__}({bases}):" if bases else f"class {cls.__name__}:" | |
out_f.write("# NOTE: only new/overridden methods & fields are shown.\n") | |
out_f.write(header + "\n") | |
doc = inspect.getdoc(cls) | |
if doc: | |
for line in doc.splitlines(): | |
out_f.write(f" {line.rstrip()}\n") | |
out_f.write("\n") | |
members = [] | |
for name, member in inspect.getmembers(cls): | |
if name.startswith('__') and name.endswith('__') and name != '__init__': | |
continue | |
defined = name in cls.__dict__ | |
override = False | |
base_name = None | |
if not defined: | |
for base in cls.__mro__[1:]: | |
if hasattr(base, name) and getattr(cls, name) is not getattr(base, name): | |
override = True | |
base_name = base.__name__ | |
break | |
if not (defined or override or name == '__init__'): | |
continue | |
kind = 'method' if (inspect.isroutine(member) | |
or inspect.isbuiltin(member) | |
or inspect.ismethoddescriptor(member)) else 'field' | |
members.append((kind, name, member, defined, override, base_name)) | |
# fields first | |
for kind, name, member, defined, override, base in members: | |
if kind != 'field': | |
continue | |
typ = type(member).__name__ | |
out_f.write(f" {name}: {typ}" + (f" # overrides {base}" if override else "") + "\n") | |
if defined: | |
fld_doc = inspect.getdoc(member) | |
if fld_doc: | |
for dl in fld_doc.splitlines(): | |
out_f.write(f" {dl.rstrip()}\n") | |
# then methods | |
for kind, name, member, defined, override, base in members: | |
if kind != 'method': | |
continue | |
try: | |
sig = str(inspect.signature(member)) | |
except Exception: | |
sig = '(...)' | |
out_f.write(f" def {name}{sig}" + (f" # overrides {base}" if override else "") + "\n") | |
if defined or override: | |
m_doc = inspect.getdoc(member) | |
if m_doc: | |
for ml in m_doc.splitlines(): | |
out_f.write(f" {ml.rstrip()}\n") | |
out_f.write("\n") | |
# === Worker functions === | |
def write_export(task): | |
mod_path, name, obj = task | |
ext, header = ('.class.txt', True) if inspect.isclass(obj) else ('.function.txt', False) | |
out_file = os.path.join(mod_path, name + ext) | |
os.makedirs(os.path.dirname(out_file), exist_ok=True) | |
prnt(f"writing export doc: {out_file}") | |
with open(out_file, 'w', encoding='utf-8') as f: | |
f.realwrite = f.write | |
def newwrite(s, *args, **kwargs): | |
return f.realwrite(s.replace('`', ''), *args, **kwargs) | |
f.write = newwrite | |
if header and inspect.isclass(obj): | |
dump_class(obj, f) | |
else: | |
try: | |
sig = str(inspect.signature(obj)) | |
except Exception: | |
sig = '(...)' | |
f.write(f"def {obj.__name__}{sig}:\n") | |
doc = inspect.getdoc(obj) | |
if doc: | |
for line in doc.splitlines(): | |
f.write(f" {line.rstrip()}\n") | |
f.write("\n") | |
def write_module(task): | |
mod_name, mod = task | |
# build and ensure module directory | |
mod_dir = os.path.join(outdir, *mod_name.split('.')) | |
os.makedirs(mod_dir, exist_ok=True) | |
# file named <module>.module.txt inside its dir | |
base = mod_name.split('.')[-1] | |
mod_file = os.path.join(mod_dir, f"{base}.module.txt") | |
prnt(f"writing module index: {mod_file}") | |
with open(mod_file, 'w', encoding='utf-8') as f: | |
# write module docstring | |
mod_doc = inspect.getdoc(mod) | |
if mod_doc: | |
for line in mod_doc.splitlines(): | |
f.write(f"{line.rstrip()}\n") | |
f.write("\n") | |
# then repr of members | |
for name in sorted(dir(mod)): | |
if name.startswith('_'): | |
continue | |
try: | |
rep = repr(getattr(mod, name)) | |
except Exception: | |
rep = '<unreprable>' | |
f.write(f"{mod_name}.{name} = {rep}\n") | |
# === Pools === | |
export_pool = IOPool(16, write_export) | |
module_pool = IOPool(16, write_module) | |
# === Main export generation === | |
for mod_name, mod in discover(root_mod_name): | |
module_pool.put((mod_name, mod)) | |
mod_path = os.path.join(outdir, *mod_name.split('.')) | |
for name in sorted(dir(mod)): | |
if name.startswith('_') and name != '__init__': | |
continue | |
obj = getattr(mod, name) | |
if inspect.isclass(obj) or inspect.isfunction(obj): | |
export_pool.put((mod_path, name, obj)) | |
# wait for docs | |
module_pool.join() | |
export_pool.join() | |
prnt("All docs written.") | |
# Optional post-processing (disabled) | |
if False: | |
# duplicate-line scanning and optional removal | |
all_files = [] | |
for root, _, files in os.walk(outdir): | |
for fname in files: | |
if fname.endswith(('.module.txt', '.class.txt', '.function.txt')): | |
all_files.append(os.path.join(root, fname)) | |
line_file_map = {} | |
line_counts = Counter() | |
char_counts = Counter() | |
results_queue = queue.Queue() | |
def read_file(path): | |
occ = Counter() | |
char_local = Counter() | |
with open(path, encoding='utf-8') as f: | |
for line in f: | |
ln = line.rstrip('\n') | |
if not ln: | |
continue | |
occ[ln] += 1 | |
char_local[ln] += len(ln) | |
results_queue.put((occ, char_local, path)) | |
read_pool = IOPool(16, read_file) | |
for p in all_files: | |
read_pool.put(p) | |
read_pool.join() | |
while not results_queue.empty(): | |
occ, char_local, path = results_queue.get() | |
for ln, cnt in occ.items(): | |
line_counts[ln] += cnt | |
char_counts[ln] += char_local[ln] | |
line_file_map.setdefault(ln, set()).add(path) | |
dupthresh = 500 | |
for ln, paths in line_file_map.items(): | |
if len(paths) > dupthresh: | |
prnt(f"Duplicate line '{ln}' in {len(paths)} files") | |
if False: | |
def remove_dups(path): | |
new = [] | |
with open(path, encoding='utf-8') as f: | |
for line in f: | |
if line.rstrip('\n') in line_file_map and len(line_file_map[line.rstrip('\n')]) > dupthresh: | |
continue | |
new.append(line) | |
with open(path, 'w', encoding='utf-8') as f: | |
f.writelines(new) | |
rm_pool = IOPool(16, remove_dups) | |
for p in all_files: | |
rm_pool.put(p) | |
rm_pool.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment