Skip to content

Instantly share code, notes, and snippets.

@kennyfrc
Last active September 28, 2025 17:50
Show Gist options
  • Save kennyfrc/09993710b9b60a5544b1f3a802f751ec to your computer and use it in GitHub Desktop.
Save kennyfrc/09993710b9b60a5544b1f3a802f751ec to your computer and use it in GitHub Desktop.
`apply_patch` implementation for use with GPT-4.1+. initially sourced from the openai cookbook, extended by using the rust `apply_patch` implementation in `openai/codex` as inspiration (it has better feedback with A,M,D updates)
#!/usr/bin/env python3
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class ActionType(str, Enum):
ADD = "add"
DELETE = "delete"
UPDATE = "update"
class FileChange(BaseModel):
type: ActionType
old_content: Optional[str] = None
new_content: Optional[str] = None
move_path: Optional[str] = None
class Commit(BaseModel):
changes: dict[str, FileChange] = Field(default_factory=dict)
def assemble_changes(
orig: dict[str, Optional[str]], dest: dict[str, Optional[str]]
) -> Commit:
commit = Commit()
for path in sorted(set(orig.keys()).union(dest.keys())):
old_content = orig.get(path)
new_content = dest.get(path)
if old_content != new_content:
if old_content is not None and new_content is not None:
commit.changes[path] = FileChange(
type=ActionType.UPDATE,
old_content=old_content,
new_content=new_content,
)
elif new_content:
commit.changes[path] = FileChange(
type=ActionType.ADD,
new_content=new_content,
)
elif old_content:
commit.changes[path] = FileChange(
type=ActionType.DELETE,
old_content=old_content,
)
else:
assert False
return commit
from pydantic import BaseModel, Field
class Chunk(BaseModel):
orig_index: int = -1 # line index of the first line in the original file
del_lines: list[str] = Field(default_factory=list)
ins_lines: list[str] = Field(default_factory=list)
class PatchAction(BaseModel):
type: ActionType
new_file: Optional[str] = None
chunks: list[Chunk] = Field(default_factory=list)
move_path: Optional[str] = None
class Patch(BaseModel):
actions: dict[str, PatchAction] = Field(default_factory=dict)
from pydantic import BaseModel, Field
class Parser(BaseModel):
current_files: dict[str, str] = Field(default_factory=dict)
lines: list[str] = Field(default_factory=list)
index: int = 0
patch: Patch = Field(default_factory=Patch)
fuzz: int = 0
def is_done(self, prefixes: Optional[tuple[str, ...]] = None) -> bool:
if self.index >= len(self.lines):
return True
if prefixes and self.lines[self.index].startswith(prefixes):
return True
return False
def startswith(self, prefix: Optional[tuple[str, ...]]) -> bool:
assert self.index < len(self.lines), f"Index: {self.index} >= {len(self.lines)}"
if self.lines[self.index].startswith(prefix):
return True
return False
def read_str(self, prefix: str = "", return_everything: bool = False) -> str:
assert self.index < len(self.lines), f"Index: {self.index} >= {len(self.lines)}"
if self.lines[self.index].startswith(prefix):
if return_everything:
text = self.lines[self.index]
else:
text = self.lines[self.index][len(prefix) :]
self.index += 1
return text
return ""
def parse(self):
while not self.is_done(("*** End Patch",)):
path = self.read_str("*** Update File: ")
if path:
if path in self.patch.actions:
raise DiffError(f"Update File Error: Duplicate Path: {path}")
move_to = self.read_str("*** Move to: ")
if path not in self.current_files:
raise DiffError(f"Update File Error: Missing File: {path}")
text = self.current_files[path]
action = self.parse_update_file(text)
# TODO: Check move_to is valid
action.move_path = move_to
self.patch.actions[path] = action
continue
path = self.read_str("*** Delete File: ")
if path:
if path in self.patch.actions:
raise DiffError(f"Delete File Error: Duplicate Path: {path}")
self.patch.actions[path] = PatchAction(
type=ActionType.DELETE,
)
continue
path = self.read_str("*** Add File: ")
if path:
if path in self.patch.actions:
raise DiffError(f"Add File Error: Duplicate Path: {path}")
self.patch.actions[path] = self.parse_add_file()
continue
raise DiffError(f"Unknown Line: {self.lines[self.index]}")
if not self.startswith("*** End Patch"):
raise DiffError("Missing End Patch")
self.index += 1
def parse_update_file(self, text: str) -> PatchAction:
# self.lines / self.index refers to the patch
# lines / index refers to the file being modified
# print("parse update file")
action = PatchAction(
type=ActionType.UPDATE,
)
lines = text.split("\n")
index = 0
while not self.is_done(
(
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
"*** End of File",
)
):
def_str = self.read_str("@@ ")
section_str = ""
if not def_str:
if self.lines[self.index] == "@@":
section_str = self.lines[self.index]
self.index += 1
if not (def_str or section_str or index == 0):
raise DiffError(f"Invalid Line:\n{self.lines[self.index]}")
if def_str.strip():
found = False
if not [s for s in lines[:index] if s == def_str]:
# def str is a skip ahead operator
for i, s in enumerate(lines[index:], index):
if s == def_str:
# print(f"Jump ahead @@: {index} -> {i}: {def_str}")
index = i + 1
found = True
break
if not found and not [
s for s in lines[:index] if s.strip() == def_str.strip()
]:
# def str is a skip ahead operator
for i, s in enumerate(lines[index:], index):
if s.strip() == def_str.strip():
# print(f"Jump ahead @@: {index} -> {i}: {def_str}")
index = i + 1
self.fuzz += 1
found = True
break
next_chunk_context, chunks, end_patch_index, eof = peek_next_section(
self.lines, self.index
)
next_chunk_text = "\n".join(next_chunk_context)
new_index, fuzz = find_context(lines, next_chunk_context, index, eof)
if new_index == -1:
if eof:
raise DiffError(f"Invalid EOF Context {index}:\n{next_chunk_text}")
else:
raise DiffError(f"Invalid Context {index}:\n{next_chunk_text}")
self.fuzz += fuzz
# print(f"Jump ahead: {index} -> {new_index}")
for ch in chunks:
ch.orig_index += new_index
action.chunks.append(ch)
index = new_index + len(next_chunk_context)
self.index = end_patch_index
continue
return action
def parse_add_file(self) -> PatchAction:
lines = []
while not self.is_done(
("*** End Patch", "*** Update File:", "*** Delete File:", "*** Add File:")
):
s = self.read_str()
if not s.startswith("+"):
raise DiffError(f"Invalid Add File Line: {s}")
s = s[1:]
lines.append(s)
return PatchAction(
type=ActionType.ADD,
new_file="\n".join(lines),
)
def find_context_core(
lines: list[str], context: list[str], start: int
) -> tuple[int, int]:
if not context:
print("context is empty")
return start, 0
# Prefer identical
for i in range(start, len(lines)):
if lines[i : i + len(context)] == context:
return i, 0
# RStrip is ok
for i in range(start, len(lines)):
if [s.rstrip() for s in lines[i : i + len(context)]] == [
s.rstrip() for s in context
]:
return i, 1
# Fine, Strip is ok too.
for i in range(start, len(lines)):
if [s.strip() for s in lines[i : i + len(context)]] == [
s.strip() for s in context
]:
return i, 100
return -1, 0
def find_context(
lines: list[str], context: list[str], start: int, eof: bool
) -> tuple[int, int]:
if eof:
new_index, fuzz = find_context_core(lines, context, len(lines) - len(context))
if new_index != -1:
return new_index, fuzz
new_index, fuzz = find_context_core(lines, context, start)
return new_index, fuzz + 10000
return find_context_core(lines, context, start)
def peek_next_section(
lines: list[str], index: int
) -> tuple[list[str], list[Chunk], int, bool]:
old: list[str] = []
del_lines: list[str] = []
ins_lines: list[str] = []
chunks: list[Chunk] = []
mode = "keep"
orig_index = index
while index < len(lines):
s = lines[index]
if s.startswith(
(
"@@",
"*** End Patch",
"*** Update File:",
"*** Delete File:",
"*** Add File:",
"*** End of File",
)
):
break
if s == "***":
break
elif s.startswith("***"):
raise DiffError(f"Invalid Line: {s}")
index += 1
last_mode = mode
if s == "":
s = " "
if s[0] == "+":
mode = "add"
elif s[0] == "-":
mode = "delete"
elif s[0] == " ":
mode = "keep"
else:
raise DiffError(f"Invalid Line: {s}")
s = s[1:]
if mode == "keep" and last_mode != mode:
if ins_lines or del_lines:
chunks.append(
Chunk(
orig_index=len(old) - len(del_lines),
del_lines=del_lines,
ins_lines=ins_lines,
)
)
del_lines = []
ins_lines = []
if mode == "delete":
del_lines.append(s)
old.append(s)
elif mode == "add":
ins_lines.append(s)
elif mode == "keep":
old.append(s)
if ins_lines or del_lines:
chunks.append(
Chunk(
orig_index=len(old) - len(del_lines),
del_lines=del_lines,
ins_lines=ins_lines,
)
)
del_lines = []
ins_lines = []
if index < len(lines) and lines[index] == "*** End of File":
index += 1
return old, chunks, index, True
if index == orig_index:
raise DiffError(f"Nothing in this section - {index=} {lines[index]}")
return old, chunks, index, False
def text_to_patch(text: str, orig: dict[str, str]) -> tuple[Patch, int]:
lines = text.strip().split("\n")
if (
len(lines) < 2
or not lines[0].startswith("*** Begin Patch")
or lines[-1] != "*** End Patch"
):
raise DiffError("Invalid patch text")
parser = Parser(
current_files=orig,
lines=lines,
index=1,
)
parser.parse()
return parser.patch, parser.fuzz
def identify_files_needed(text: str) -> list[str]:
lines = text.strip().split("\n")
result = set()
for line in lines:
if line.startswith("*** Update File: "):
result.add(line[len("*** Update File: ") :])
return list(result)
def _get_updated_file(text: str, action: PatchAction, path: str) -> str:
assert action.type == ActionType.UPDATE
orig_lines = text.split("\n")
dest_lines = []
orig_index = 0
dest_index = 0
for chunk in action.chunks:
# Process the unchanged lines before the chunk
if chunk.orig_index > len(orig_lines):
print(
f"_get_updated_file: {path}: chunk.orig_index {chunk.orig_index} > len(lines) {len(orig_lines)}"
)
raise DiffError(
f"_get_updated_file: {path}: chunk.orig_index {chunk.orig_index} > len(lines) {len(orig_lines)}"
)
if orig_index > chunk.orig_index:
raise DiffError(
f"_get_updated_file: {path}: orig_index {orig_index} > chunk.orig_index {chunk.orig_index}"
)
assert orig_index <= chunk.orig_index
dest_lines.extend(orig_lines[orig_index : chunk.orig_index])
delta = chunk.orig_index - orig_index
orig_index += delta
dest_index += delta
# Process the inserted lines
if chunk.ins_lines:
for i in range(len(chunk.ins_lines)):
dest_lines.append(chunk.ins_lines[i])
dest_index += len(chunk.ins_lines)
orig_index += len(chunk.del_lines)
# Final part
dest_lines.extend(orig_lines[orig_index:])
delta = len(orig_lines) - orig_index
orig_index += delta
dest_index += delta
assert orig_index == len(orig_lines)
assert dest_index == len(dest_lines)
return "\n".join(dest_lines)
def patch_to_commit(patch: Patch, orig: dict[str, str]) -> Commit:
commit = Commit()
for path, action in patch.actions.items():
if action.type == ActionType.DELETE:
# For delete operations, we don't need the original content from orig
commit.changes[path] = FileChange(
type=ActionType.DELETE, old_content=None
)
elif action.type == ActionType.ADD:
commit.changes[path] = FileChange(
type=ActionType.ADD, new_content=action.new_file
)
elif action.type == ActionType.UPDATE:
new_content = _get_updated_file(text=orig[path], action=action, path=path)
commit.changes[path] = FileChange(
type=ActionType.UPDATE,
old_content=orig[path],
new_content=new_content,
move_path=action.move_path,
)
return commit
class DiffError(ValueError):
pass
class ApplyPatchError(Exception):
"""Base class for apply patch errors"""
pass
class ParseError(ApplyPatchError):
"""Error parsing patch text"""
pass
class IoError(ApplyPatchError):
"""I/O error during patch application"""
def __init__(self, context: str, source: Exception):
self.context = context
self.source = source
super().__init__(f"{context}: {source}")
class ComputeReplacementsError(ApplyPatchError):
"""Error computing replacements for patch chunks"""
pass
import os
from typing import Callable
def load_files(paths: list[str], open_fn: Callable) -> dict[str, str]:
orig = {}
for path in paths:
orig[path] = open_fn(path)
return orig
def apply_commit(commit: Commit, write_fn: Callable, remove_fn: Callable) -> None:
for path, change in commit.changes.items():
if change.type == ActionType.DELETE:
remove_fn(path)
elif change.type == ActionType.ADD:
write_fn(path, change.new_content)
elif change.type == ActionType.UPDATE:
if change.move_path:
write_fn(change.move_path, change.new_content)
remove_fn(path)
else:
write_fn(path, change.new_content)
def process_patch(
text: str, open_fn: Callable, write_fn: Callable, remove_fn: Callable
) -> dict[str, str]:
"""
Process a patch and return a summary of changes made.
Returns dict with keys: added, modified, deleted
"""
if not text.startswith("*** Begin Patch"):
raise ParseError("Patch must start with '*** Begin Patch'")
paths = identify_files_needed(text)
orig = load_files(paths, open_fn)
patch, fuzz = text_to_patch(text, orig)
commit = patch_to_commit(patch, orig)
# Track changes for summary
changes = {"added": [], "modified": [], "deleted": []}
for path, change in commit.changes.items():
if change.type == ActionType.DELETE:
remove_fn(path)
changes["deleted"].append(path)
elif change.type == ActionType.ADD:
write_fn(path, change.new_content)
changes["added"].append(path)
elif change.type == ActionType.UPDATE:
if change.move_path:
write_fn(change.move_path, change.new_content)
remove_fn(path)
changes["modified"].append(change.move_path)
else:
write_fn(path, change.new_content)
changes["modified"].append(path)
return changes
def open_file(path: str) -> str:
with open(path, "rt") as f:
return f.read()
def write_file(path: str, content: str) -> None:
if path.startswith("/"):
print("We do not support absolute paths.")
return
if "/" in path:
parent = "/".join(path.split("/")[:-1])
os.makedirs(parent, exist_ok=True)
with open(path, "wt") as f:
f.write(content)
def remove_file(path: str) -> None:
os.remove(path)
def main():
import sys
# Support help flags so users can run `apply_patch --help` or `apply_patch -h`.
if len(sys.argv) > 1 and sys.argv[1] in ("-h", "--help"):
print(
"""usage: apply_patch [OPTIONS]
Reads a patch from STDIN and applies it.
Examples:
apply_patch < patch.txt
cat patch.txt | apply_patch
Patch format summary:
*** Begin Patch
[*** Update File: <path>]
Optional: *** Move to: <new-path>
One or more hunks follow. Each hunk contains lines prefixed with a
single character:
' ' (space) - context/unchanged line
'-' - line removed from original
'+' - line added to destination
A hunk is preceded by a marker starting with '@@ ' (or a bare '@@').
Optionally a '*** End of File' line may be used to indicate EOF context.
[*** Add File: <path>]
Lines for the new file must each start with '+'. These lines are joined
(without the leading '+') to form the file contents.
[*** Delete File: <path>]
*** End Patch
Notes:
- Multiple operations (add/update/delete) may appear in a single patch.
- Update sections may contain multiple hunks; context lines are used to
locate where to apply the changes in the original file.
- After applying, the script prints a summary with lines prefixed by
'A' (added), 'M' (modified), and 'D' (deleted).
Options:
-h, --help Show this message and exit
"""
)
return 0
patch_text = sys.stdin.read()
if not patch_text:
print("Please pass patch text through stdin")
return 1
try:
changes = process_patch(patch_text, open_file, write_file, remove_file)
# Print summary like Rust version
if not any(changes.values()):
print("No files were modified.")
return 0
print("Success. Updated the following files:")
for path in changes["added"]:
print(f"A {path}")
for path in changes["modified"]:
print(f"M {path}")
for path in changes["deleted"]:
print(f"D {path}")
return 0
except ApplyPatchError as e:
print(f"Error: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f"Unexpected error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
main()

Installation Guide for apply_patch.py

Quick Install

Download the script

curl -O https://gist.githubusercontent.com/kennyfrc/09993710b9b60a5544b1f3a802f751ec/raw/apply_patch.py

Make executable and install

chmod +x apply_patch.py
sudo cp apply_patch.py /usr/local/bin/apply_patch # macos. pls google for linux/windows

Dependencies

pip install pydantic

Usage

apply_patch --help
@aemr3
Copy link

aemr3 commented Aug 29, 2025

#!/usr/bin/env -S uv run --quiet --script
# /// script
# dependencies = ["pydantic>=2.0.0"]
# ///

use these shebang lines so you won't need to pip install pydantic, works with uv

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment