Skip to content

Instantly share code, notes, and snippets.

@aemr3
Forked from kennyfrc/apply_patch.py
Last active August 29, 2025 08:58
Show Gist options
  • Save aemr3/021f31d2bd86e78a29e59553fb210883 to your computer and use it in GitHub Desktop.
Save aemr3/021f31d2bd86e78a29e59553fb210883 to your computer and use it in GitHub Desktop.
`apply_patch` implementation for use with GPT-4.1+. initially sourced from the openai cookbook, extended by using the rust `apply_patch` implementation in `openai/codex` as inspiration (it has better feedback with A,M,D updates)
#!/usr/bin/env -S uv run --quiet --script
# /// script
# dependencies = ["pydantic>=2.0.0"]
# ///
import os
from collections.abc import Callable
from enum import Enum
from pydantic import BaseModel, Field
class ActionType(str, Enum):
ADD = "add"
DELETE = "delete"
UPDATE = "update"
class FileChange(BaseModel):
type: ActionType
old_content: str | None = None
new_content: str | None = None
move_path: str | None = None
class Commit(BaseModel):
changes: dict[str, FileChange] = Field(default_factory=dict)
def assemble_changes(
orig: dict[str, str | None], dest: dict[str, str | None]
) -> Commit:
commit = Commit()
for path in sorted(set(orig.keys()).union(dest.keys())):
old_content = orig.get(path)
new_content = dest.get(path)
if old_content != new_content:
if old_content is not None and new_content is not None:
commit.changes[path] = FileChange(
type=ActionType.UPDATE,
old_content=old_content,
new_content=new_content,
)
elif new_content:
commit.changes[path] = FileChange(
type=ActionType.ADD,
new_content=new_content,
)
elif old_content:
commit.changes[path] = FileChange(
type=ActionType.DELETE,
old_content=old_content,
)
else:
raise AssertionError("Both old and new content are None")
return commit
class Chunk(BaseModel):
orig_index: int = -1 # line index of the first line in the original file
del_lines: list[str] = Field(default_factory=list)
ins_lines: list[str] = Field(default_factory=list)
class PatchAction(BaseModel):
type: ActionType
new_file: str | None = None
chunks: list[Chunk] = Field(default_factory=list)
move_path: str | None = None
class Patch(BaseModel):
actions: dict[str, PatchAction] = Field(default_factory=dict)
class Parser(BaseModel):
current_files: dict[str, str] = Field(default_factory=dict)
lines: list[str] = Field(default_factory=list)
index: int = 0
patch: Patch = Field(default_factory=Patch)
fuzz: int = 0
def is_done(self, prefixes: tuple[str, ...] | None = None) -> bool:
if self.index >= len(self.lines):
return True
if prefixes and self.lines[self.index].startswith(prefixes):
return True
return False
def startswith(self, prefix: str | tuple[str, ...]) -> bool:
assert self.index < len(self.lines), f"Index: {self.index} >= {len(self.lines)}"
if self.lines[self.index].startswith(prefix):
return True
return False
def read_str(self, prefix: str = "", return_everything: bool = False) -> str:
assert self.index < len(self.lines), f"Index: {self.index} >= {len(self.lines)}"
if self.lines[self.index].startswith(prefix):
if return_everything:
text = self.lines[self.index]
else:
text = self.lines[self.index][len(prefix) :]
self.index += 1
return text
return ""
def parse(self):
while not self.is_done(("*** End Patch",)):
path = self.read_str("*** Update File: ")
if path:
if path in self.patch.actions:
raise DiffError(f"Update File Error: Duplicate Path: {path}")
move_to = self.read_str("*** Move to: ")
if path not in self.current_files:
raise DiffError(f"Update File Error: Missing File: {path}")
text = self.current_files[path]
action = self.parse_update_file(text)
# TODO: Check move_to is valid
action.move_path = move_to
self.patch.actions[path] = action
continue
path = self.read_str("*** Delete File: ")
if path:
if path in self.patch.actions:
raise DiffError(f"Delete File Error: Duplicate Path: {path}")
self.patch.actions[path] = PatchAction(
type=ActionType.DELETE,
)
continue
path = self.read_str("*** Add File: ")
if path:
if path in self.patch.actions:
raise DiffError(f"Add File Error: Duplicate Path: {path}")
self.patch.actions[path] = self.parse_add_file()
continue
raise DiffError(f"Unknown Line: {self.lines[self.index]}")
if not self.startswith(("*** End Patch",)):
raise DiffError("Missing End Patch")
self.index += 1
def parse_update_file(self, text: str) -> PatchAction:
# self.lines / self.index refers to the patch
# lines / index refers to the file being modified
# print("parse update file")
action = PatchAction(
type=ActionType.UPDATE,
)
lines = text.split("\n")
index = 0
while not self.is_done(
(
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
"*** End of File",
)
):
def_str = self.read_str("@@ ")
section_str = ""
if not def_str:
if self.lines[self.index] == "@@":
section_str = self.lines[self.index]
self.index += 1
if not (def_str or section_str or index == 0):
raise DiffError(f"Invalid Line:\n{self.lines[self.index]}")
if def_str.strip():
found = False
if not [s for s in lines[:index] if s == def_str]:
# def str is a skip ahead operator
for i, s in enumerate(lines[index:], index):
if s == def_str:
# print(f"Jump ahead @@: {index} -> {i}: {def_str}")
index = i + 1
found = True
break
if not found and not [
s for s in lines[:index] if s.strip() == def_str.strip()
]:
# def str is a skip ahead operator
for i, s in enumerate(lines[index:], index):
if s.strip() == def_str.strip():
# print(f"Jump ahead @@: {index} -> {i}: {def_str}")
index = i + 1
self.fuzz += 1
found = True
break
next_chunk_context, chunks, end_patch_index, eof = peek_next_section(
self.lines, self.index
)
next_chunk_text = "\n".join(next_chunk_context)
new_index, fuzz = find_context(lines, next_chunk_context, index, eof)
if new_index == -1:
if eof:
raise DiffError(f"Invalid EOF Context {index}:\n{next_chunk_text}")
else:
raise DiffError(f"Invalid Context {index}:\n{next_chunk_text}")
self.fuzz += fuzz
# print(f"Jump ahead: {index} -> {new_index}")
for ch in chunks:
ch.orig_index += new_index
action.chunks.append(ch)
index = new_index + len(next_chunk_context)
self.index = end_patch_index
continue
return action
def parse_add_file(self) -> PatchAction:
lines = []
while not self.is_done(
("*** End Patch", "*** Update File:", "*** Delete File:", "*** Add File:")
):
s = self.read_str()
if not s.startswith("+"):
raise DiffError(f"Invalid Add File Line: {s}")
s = s[1:]
lines.append(s)
return PatchAction(
type=ActionType.ADD,
new_file="\n".join(lines),
)
def find_context_core(
lines: list[str], context: list[str], start: int
) -> tuple[int, int]:
if not context:
print("context is empty")
return start, 0
# Prefer identical
for i in range(start, len(lines)):
if lines[i : i + len(context)] == context:
return i, 0
# RStrip is ok
for i in range(start, len(lines)):
if [s.rstrip() for s in lines[i : i + len(context)]] == [
s.rstrip() for s in context
]:
return i, 1
# Fine, Strip is ok too.
for i in range(start, len(lines)):
if [s.strip() for s in lines[i : i + len(context)]] == [
s.strip() for s in context
]:
return i, 100
return -1, 0
def find_context(
lines: list[str], context: list[str], start: int, eof: bool
) -> tuple[int, int]:
if eof:
new_index, fuzz = find_context_core(lines, context, len(lines) - len(context))
if new_index != -1:
return new_index, fuzz
new_index, fuzz = find_context_core(lines, context, start)
return new_index, fuzz + 10000
return find_context_core(lines, context, start)
def peek_next_section(
lines: list[str], index: int
) -> tuple[list[str], list[Chunk], int, bool]:
old: list[str] = []
del_lines: list[str] = []
ins_lines: list[str] = []
chunks: list[Chunk] = []
mode = "keep"
orig_index = index
while index < len(lines):
s = lines[index]
if s.startswith(
(
"@@",
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
"*** End of File",
)
):
break
if s == "***":
break
elif s.startswith("***"):
raise DiffError(f"Invalid Line: {s}")
index += 1
last_mode = mode
if s == "":
s = " "
if s[0] == "+":
mode = "add"
elif s[0] == "-":
mode = "delete"
elif s[0] == " ":
mode = "keep"
else:
raise DiffError(f"Invalid Line: {s}")
s = s[1:]
if mode == "keep" and last_mode != mode:
if ins_lines or del_lines:
chunks.append(
Chunk(
orig_index=len(old) - len(del_lines),
del_lines=del_lines,
ins_lines=ins_lines,
)
)
del_lines = []
ins_lines = []
if mode == "delete":
del_lines.append(s)
old.append(s)
elif mode == "add":
ins_lines.append(s)
elif mode == "keep":
old.append(s)
if ins_lines or del_lines:
chunks.append(
Chunk(
orig_index=len(old) - len(del_lines),
del_lines=del_lines,
ins_lines=ins_lines,
)
)
del_lines = []
ins_lines = []
if index < len(lines) and lines[index] == "*** End of File":
index += 1
return old, chunks, index, True
if index == orig_index:
raise DiffError(f"Nothing in this section - {index=} {lines[index]}")
return old, chunks, index, False
def text_to_patch(text: str, orig: dict[str, str]) -> tuple[Patch, int]:
lines = text.strip().split("\n")
if (
len(lines) < 2
or not lines[0].startswith("*** Begin Patch")
or lines[-1] != "*** End Patch"
):
raise DiffError("Invalid patch text")
parser = Parser(
current_files=orig,
lines=lines,
index=1,
)
parser.parse()
return parser.patch, parser.fuzz
def identify_files_needed(text: str) -> list[str]:
lines = text.strip().split("\n")
result = set()
for line in lines:
if line.startswith("*** Update File: "):
result.add(line[len("*** Update File: ") :])
return list(result)
def _get_updated_file(text: str, action: PatchAction, path: str) -> str:
assert action.type == ActionType.UPDATE
orig_lines = text.split("\n")
dest_lines = []
orig_index = 0
dest_index = 0
for chunk in action.chunks:
# Process the unchanged lines before the chunk
if chunk.orig_index > len(orig_lines):
print(
f"_get_updated_file: {path}: chunk.orig_index {chunk.orig_index} > len(lines) {len(orig_lines)}"
)
raise DiffError(
f"_get_updated_file: {path}: chunk.orig_index {chunk.orig_index} > len(lines) {len(orig_lines)}"
)
if orig_index > chunk.orig_index:
raise DiffError(
f"_get_updated_file: {path}: orig_index {orig_index} > chunk.orig_index {chunk.orig_index}"
)
assert orig_index <= chunk.orig_index
dest_lines.extend(orig_lines[orig_index : chunk.orig_index])
delta = chunk.orig_index - orig_index
orig_index += delta
dest_index += delta
# Process the inserted lines
if chunk.ins_lines:
for i in range(len(chunk.ins_lines)):
dest_lines.append(chunk.ins_lines[i])
dest_index += len(chunk.ins_lines)
orig_index += len(chunk.del_lines)
# Final part
dest_lines.extend(orig_lines[orig_index:])
delta = len(orig_lines) - orig_index
orig_index += delta
dest_index += delta
assert orig_index == len(orig_lines)
assert dest_index == len(dest_lines)
return "\n".join(dest_lines)
def patch_to_commit(patch: Patch, orig: dict[str, str]) -> Commit:
commit = Commit()
for path, action in patch.actions.items():
if action.type == ActionType.DELETE:
# For delete operations, we don't need the original content from orig
commit.changes[path] = FileChange(type=ActionType.DELETE, old_content=None)
elif action.type == ActionType.ADD:
commit.changes[path] = FileChange(
type=ActionType.ADD, new_content=action.new_file
)
elif action.type == ActionType.UPDATE:
new_content = _get_updated_file(text=orig[path], action=action, path=path)
commit.changes[path] = FileChange(
type=ActionType.UPDATE,
old_content=orig[path],
new_content=new_content,
move_path=action.move_path,
)
return commit
class DiffError(ValueError):
pass
class ApplyPatchError(Exception):
"""Base class for apply patch errors"""
pass
class ParseError(ApplyPatchError):
"""Error parsing patch text"""
pass
class IoError(ApplyPatchError):
"""I/O error during patch application"""
def __init__(self, context: str, source: Exception):
self.context = context
self.source = source
super().__init__(f"{context}: {source}")
class ComputeReplacementsError(ApplyPatchError):
"""Error computing replacements for patch chunks"""
pass
def load_files(paths: list[str], open_fn: Callable) -> dict[str, str]:
orig = {}
for path in paths:
orig[path] = open_fn(path)
return orig
def apply_commit(commit: Commit, write_fn: Callable, remove_fn: Callable) -> None:
for path, change in commit.changes.items():
if change.type == ActionType.DELETE:
remove_fn(path)
elif change.type == ActionType.ADD:
write_fn(path, change.new_content)
elif change.type == ActionType.UPDATE:
if change.move_path:
write_fn(change.move_path, change.new_content)
remove_fn(path)
else:
write_fn(path, change.new_content)
def process_patch(
text: str, open_fn: Callable, write_fn: Callable, remove_fn: Callable
) -> dict[str, list[str]]:
"""
Process a patch and return a summary of changes made.
Returns dict with keys: added, modified, deleted
"""
if not text.startswith("*** Begin Patch"):
raise ParseError("Patch must start with '*** Begin Patch'")
paths = identify_files_needed(text)
orig = load_files(paths, open_fn)
patch, fuzz = text_to_patch(text, orig)
commit = patch_to_commit(patch, orig)
# Track changes for summary
changes = {"added": [], "modified": [], "deleted": []}
for path, change in commit.changes.items():
if change.type == ActionType.DELETE:
remove_fn(path)
changes["deleted"].append(path)
elif change.type == ActionType.ADD:
write_fn(path, change.new_content)
changes["added"].append(path)
elif change.type == ActionType.UPDATE:
if change.move_path:
write_fn(change.move_path, change.new_content)
remove_fn(path)
changes["modified"].append(change.move_path)
else:
write_fn(path, change.new_content)
changes["modified"].append(path)
return changes
def open_file(path: str) -> str:
with open(path) as f:
return f.read()
def write_file(path: str, content: str) -> None:
if path.startswith("/"):
print("We do not support absolute paths.")
return
if "/" in path:
parent = "/".join(path.split("/")[:-1])
os.makedirs(parent, exist_ok=True)
with open(path, "w") as f:
f.write(content)
def remove_file(path: str) -> None:
os.remove(path)
def main():
import sys
# Support help flags so users can run `apply_patch --help` or `apply_patch -h`.
if len(sys.argv) > 1 and sys.argv[1] in ("-h", "--help"):
print(
"""usage: apply_patch [OPTIONS]
Reads a patch from STDIN and applies it.
Examples:
apply_patch < patch.txt
cat patch.txt | apply_patch
Patch format summary:
*** Begin Patch
[*** Update File: <path>]
Optional: *** Move to: <new-path>
One or more hunks follow. Each hunk contains lines prefixed with a
single character:
' ' (space) - context/unchanged line
'-' - line removed from original
'+' - line added to destination
A hunk is preceded by a marker starting with '@@ ' (or a bare '@@').
Optionally a '*** End of File' line may be used to indicate EOF context.
[*** Add File: <path>]
Lines for the new file must each start with '+'. These lines are joined
(without the leading '+') to form the file contents.
[*** Delete File: <path>]
*** End Patch
Notes:
- Multiple operations (add/update/delete) may appear in a single patch.
- Update sections may contain multiple hunks; context lines are used to
locate where to apply the changes in the original file.
- After applying, the script prints a summary with lines prefixed by
'A' (added), 'M' (modified), and 'D' (deleted).
Options:
-h, --help Show this message and exit
"""
)
return 0
patch_text = sys.stdin.read()
if not patch_text:
print("Please pass patch text through stdin")
return 1
try:
changes = process_patch(patch_text, open_file, write_file, remove_file)
# Print summary like Rust version
if not any(changes.values()):
print("No files were modified.")
return 0
print("Success. Updated the following files:")
for path in changes["added"]:
print(f"A {path}")
for path in changes["modified"]:
print(f"M {path}")
for path in changes["deleted"]:
print(f"D {path}")
return 0
except ApplyPatchError as e:
print(f"Error: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Unexpected error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
main()

Installation Guide for apply_patch.py

Quick Install

Download the script

curl -O https://gist.githubusercontent.com/aemr3/021f31d2bd86e78a29e59553fb210883/raw/apply_patch.py

Make executable and install

chmod +x apply_patch.py
sudo cp apply_patch.py /usr/local/bin/apply_patch # macos. pls google for linux/windows

Dependencies

This requires uv in your system, check the installation steps here: https://docs.astral.sh/uv/getting-started/installation/

Usage

apply_patch --help

Thanks

Original implementation: https://gist.github.com/kennyfrc/09993710b9b60a5544b1f3a802f751ec

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment