Skip to content

Instantly share code, notes, and snippets.

@killerstorm
Created March 26, 2023 17:04
Show Gist options
  • Save killerstorm/2296b282c818ffcfe4ceb729ce639911 to your computer and use it in GitHub Desktop.
Save killerstorm/2296b282c818ffcfe4ceb729ce639911 to your computer and use it in GitHub Desktop.
import re
from typing import Dict, List, Optional, Tuple, Union

import openai
import requests
import yaml
# Prefix that starts every delimiter line written by CodeFragment.dump() and CodeFile.dump().
MARKER_PREFIX = "///"
class CodeFragment:
    """A named piece of source text with a short human-readable description."""

    def __init__(self, marker: str, description: str, content: str):
        self.marker, self.description, self.content = marker, description, content

    def dump(self) -> str:
        """Return the fragment text wrapped between its BEGIN and END delimiter lines."""
        opening = f"{MARKER_PREFIX} BEGIN {self.marker}: {self.description}"
        closing = f"{MARKER_PREFIX} END {self.marker}"
        return f"{opening}\n{self.content}\n{closing}"

    def __repr__(self):
        return "CodeFragment({}, {})".format(self.marker, self.description)
class CodeFile:
    """A source file described by its path, a description and an ordered list of fragments."""

    def __init__(self, path: str, description: str, fragments: List["CodeFragment"]):
        self.path = path
        self.description = description
        self.fragments = fragments

    def dump(self) -> str:
        """Return the file as text: a BEGIN_FILE line, every fragment dump, an END_FILE line.

        Fragments are separated by a newline so that one fragment's END marker and
        the next fragment's BEGIN marker never end up on the same line.
        """
        body = "\n".join(fragment.dump() for fragment in self.fragments)
        return (
            f"{MARKER_PREFIX} BEGIN_FILE {self.path}: {self.description}\n"
            f"{body}\n"
            f"{MARKER_PREFIX} END_FILE {self.path}"
        )

    def __repr__(self):
        return f"CodeFile({self.path}, {len(self.fragments)} fragments)"
def parse_code(text: str) -> List["CodeFile"]:
    """Extract every delimited file, and the fragments inside it, from *text*.

    Text outside BEGIN_FILE/END_FILE pairs is ignored. A file whose body holds
    no BEGIN/END fragment pair but is not blank is kept as one fragment with the
    marker "entire".

    Returns:
        The parsed files in the order they appear in *text* (possibly empty).
    """
    # NOTE: both patterns hard-code "///" and therefore must stay in sync with
    # the delimiter prefix used by the dump() methods.
    file_pattern = re.compile(r"/// BEGIN_FILE (.*?): (.*?)\n(.*?)\n/// END_FILE \1", re.DOTALL)
    fragment_pattern = re.compile(r"/// BEGIN (.*?): ?(.*?)\n(.*?)\n/// END \1", re.DOTALL)

    code_files = []
    for file_match in file_pattern.finditer(text):
        path, file_desc, file_content = file_match.groups()
        fragments = [
            CodeFragment(*fragment_match.groups())
            for fragment_match in fragment_pattern.finditer(file_content)
        ]
        # Fall back to a whole-file fragment only when no explicit fragment was
        # found; a for/else without a break would add it unconditionally.
        if not fragments and file_content.strip():
            fragments.append(CodeFragment("entire", "entire file", file_content))
        code_files.append(CodeFile(path, file_desc, fragments))
    return code_files
class TaskResponseStatus:
    """Values accepted for the CODE_GENERATION_STATUS field of a generation response."""

    COMPLETE = "COMPLETE"
    PARTIAL = "PARTIAL"
    REDO = "REDO"
class TaskStatus:
    """Status strings stored on a task item."""

    DONE = "DONE"
    PARTIAL = "PARTIAL"
    TODO = "TODO"
class TaskItem:
    """A node of the task tree: id, status, optional description and child tasks."""

    def __init__(self, id: str, description: Optional[str], status: str, subtasks: List['TaskItem'] = None):
        # The id is always stored as a string, whatever type was passed in.
        self.id = str(id)
        self.status = status
        self.description = description
        self.subtasks = subtasks if subtasks else []

    def __repr__(self):
        return "TaskItem({}, {}, {})".format(self.id, self.status, self.description)

    def to_dict(self) -> dict:
        """Return the task and, recursively, its subtasks as plain dictionaries."""
        children = [child.to_dict() for child in self.subtasks]
        return dict(
            id=self.id,
            status=self.status,
            description=self.description,
            subtasks=children,
        )
class TaskResponse:
    """Status block of a generation response: overall status, description and subtasks."""

    def __init__(self, status: str, description: str, subtasks: List["TaskItem"]):
        self.subtasks = subtasks
        self.description = description
        self.status = status

    def __repr__(self):
        subtask_count = len(self.subtasks)
        return "Task({}, {} subtasks)".format(self.status, subtask_count)
def parse_task_info(task_info_yaml: str) -> "TaskResponse":
    """Parse the YAML status block that ends a generation response.

    ``CODE_GENERATION_STATUS`` is required. ``description`` and ``subtasks`` are
    optional, because the response format only asks for them when the status is
    PARTIAL or REDO; a missing or null value becomes ``None`` / an empty list.

    Raises:
        ValueError: the YAML document is not a mapping.
        KeyError: ``CODE_GENERATION_STATUS``, or a subtask's ``id``/``status``, is missing.
    """
    task_info = yaml.safe_load(task_info_yaml)
    if not isinstance(task_info, dict):
        raise ValueError("task info must be a YAML mapping containing CODE_GENERATION_STATUS")

    subtasks = [
        TaskItem(id=subtask["id"], status=subtask["status"], description=subtask.get("description"))
        # `subtasks:` with nothing under it loads as None, hence the `or []`.
        for subtask in task_info.get("subtasks") or []
    ]
    return TaskResponse(
        status=task_info["CODE_GENERATION_STATUS"],
        description=task_info.get("description"),
        subtasks=subtasks,
    )
def parse_generation_response(text: str) -> Tuple[List["CodeFile"], "TaskResponse"]:
    """Split a generation response into its code part and its status block.

    The status block starts at the first line that begins (after optional
    spaces or tabs) with ``CODE_GENERATION_STATUS:``. Everything before that
    line is handed to ``parse_code``, the rest to ``parse_task_info``. A blank
    line before the status block is accepted but not required.

    Returns:
        ``(code_files, task_response)``.

    Raises:
        ValueError: the response contains no ``CODE_GENERATION_STATUS:`` line.
    """
    trailer = re.search(r"^[ \t]*CODE_GENERATION_STATUS:", text, re.MULTILINE)
    if trailer is None:
        raise ValueError("response does not contain a CODE_GENERATION_STATUS section")

    code_text = text[:trailer.start()]
    task_info_yaml = text[trailer.start():]
    return parse_code(code_text), parse_task_info(task_info_yaml)
class PlanningResponse:
    """Parsed result of a planning step."""

    # Free-form observations text from the response.
    observations: str
    # Task items from the response's `updated_tasks` list.
    updated_tasks: List["TaskItem"]
    # Id of the task selected to work on next.
    selected_task_id: str
    # File path -> list of fragment markers.
    required_code_fragments: Dict[str, List[str]]

    def __init__(
        self,
        observations: str,
        updated_tasks: List["TaskItem"],
        selected_task_id: str,
        required_code_fragments: Dict[str, List[str]],
    ):
        self.observations, self.updated_tasks = observations, updated_tasks
        self.selected_task_id, self.required_code_fragments = selected_task_id, required_code_fragments
def parse_planning_response(yaml_string: str) -> "PlanningResponse":
    """Parse the YAML text of a planning response into a ``PlanningResponse``.

    ``selected_task_id`` and every task's ``id``/``status`` are required. All
    other fields are optional: missing or null ``updated_tasks``/``subtasks``
    become empty lists, a missing ``observations`` becomes ``""``, and a missing
    ``required_code_fragments`` becomes ``None``.

    Raises:
        ValueError: the YAML document is not a mapping.
        KeyError: a required key is missing.
    """
    response = yaml.safe_load(yaml_string)
    if not isinstance(response, dict):
        raise ValueError("planning response must be a YAML mapping")

    def unpack_task_item(task: dict) -> "TaskItem":
        # `subtasks:` with nothing under it loads as None, hence the `or []`.
        return TaskItem(
            id=task["id"],
            status=task["status"],
            description=task.get("description"),
            subtasks=[unpack_task_item(subtask) for subtask in task.get("subtasks") or []],
        )

    return PlanningResponse(
        observations=response.get("observations", ""),
        updated_tasks=[unpack_task_item(task) for task in response.get("updated_tasks") or []],
        selected_task_id=response["selected_task_id"],
        required_code_fragments=response.get("required_code_fragments"),
    )
class GenerationEngineState:
    """Mutable state of one generation run: task tree, code files and the task in progress."""

    code_files: Dict[str, "CodeFile"]  # file path -> file
    tasks: List["TaskItem"]  # top-level tasks; subtasks hang off each item
    current_task: Optional["TaskItem"]  # None until a task has been selected

    def __init__(self, initial_task: str):
        """Start with a single TODO task "t_0" described by *initial_task* and no code."""
        self.tasks = [TaskItem(id="t_0", status=TaskStatus.TODO, description=initial_task)]
        self.code_files = {}
        self.current_task = None

    def find_task(self, task_id: str) -> Optional["TaskItem"]:
        """Return the first task (depth-first, parents before children) whose id equals *task_id*.

        *task_id* is converted to ``str`` before comparing. Returns ``None`` when
        no task matches.
        """
        task_id = str(task_id)

        def find_subtask(task):
            if task.id == task_id:
                return task
            for subtask in task.subtasks:
                found = find_subtask(subtask)
                if found:
                    return found
            return None

        for task in self.tasks:
            found = find_subtask(task)
            if found:
                return found
        return None

    def code_files_summary(self) -> str:
        """Return one line per file followed by one line per fragment of that file."""
        lines = []
        for code_file in self.code_files.values():
            lines.append(f"{code_file.path}: {code_file.description}\n")
            lines.extend(
                f" {fragment.marker}: {fragment.description}\n"
                for fragment in code_file.fragments
            )
        return "".join(lines)

    def select_code_fragments(self, selection: Optional[Dict[str, List[str]]]) -> List["CodeFile"]:
        """Return copies of the known files restricted to the requested fragment markers.

        Args:
            selection: file path -> fragment markers. ``None`` or an empty mapping
                selects nothing; unknown paths are skipped; a null marker list
                selects no fragment of that file.
        """
        if not selection:
            return []
        selected = []
        for path, markers in selection.items():
            code_file = self.code_files.get(path)
            if code_file is None:
                continue
            if isinstance(markers, str):
                # A single marker written as a scalar instead of a one-element list.
                markers = [markers]
            wanted = markers or []
            fragments = [fragment for fragment in code_file.fragments if fragment.marker in wanted]
            selected.append(CodeFile(path, code_file.description, fragments))
        return selected

    def merge_code_file(self, code_file: "CodeFile"):
        """Merge *code_file* into the state.

        A file with an unknown path is stored as is. For a known path, the content
        of fragments with a matching marker is replaced and fragments with a new
        marker are appended.
        """
        existing_code_file = self.code_files.get(code_file.path)
        if existing_code_file is None:
            self.code_files[code_file.path] = code_file
            return

        by_marker = {}
        for fragment in existing_code_file.fragments:
            by_marker.setdefault(fragment.marker, fragment)
        for fragment in code_file.fragments:
            known = by_marker.get(fragment.marker)
            if known is None:
                existing_code_file.fragments.append(fragment)
                by_marker[fragment.marker] = fragment
            else:
                known.content = fragment.content

    @classmethod
    def _merge_subtasks(cls, existing_task, incoming_subtasks):
        """Merge *incoming_subtasks* into ``existing_task.subtasks``, matching by id."""
        by_id = {}
        for subtask in existing_task.subtasks:
            by_id.setdefault(subtask.id, subtask)
        for subtask in incoming_subtasks:
            known = by_id.get(subtask.id)
            if known is None:
                existing_task.subtasks.append(subtask)
                by_id[subtask.id] = subtask
            else:
                cls._merge_task_into(known, subtask)

    @classmethod
    def _merge_task_into(cls, existing_task, incoming_task):
        """Copy status, a non-empty description and (recursively) subtasks onto *existing_task*."""
        existing_task.status = incoming_task.status
        if incoming_task.description:
            # An update without a description must not erase the one already known.
            existing_task.description = incoming_task.description
        cls._merge_subtasks(existing_task, incoming_task.subtasks)

    def merge_task(self, task: "TaskItem"):
        """Merge a new or updated task into the task tree.

        When a task with the same id exists anywhere in the tree it is updated in
        place, subtasks included. Otherwise the task is added as a new top-level
        task instead of being dropped.
        """
        existing_task = self.find_task(task.id)
        if existing_task is None:
            self.tasks.append(task)
        else:
            self._merge_task_into(existing_task, task)

    def merge_task_response(self, task: "TaskResponse"):
        """Apply the status block of a generation response to the current task.

        COMPLETE marks the current task DONE; PARTIAL and REDO mark it PARTIAL.
        A non-empty description replaces the current one and the reported
        subtasks are merged by id.

        Raises:
            ValueError: no current task is selected.
        """
        if self.current_task is None:
            raise ValueError("no task is currently selected")
        if task.status == TaskResponseStatus.COMPLETE:
            self.current_task.status = TaskStatus.DONE
        elif task.status in (TaskResponseStatus.PARTIAL, TaskResponseStatus.REDO):
            self.current_task.status = TaskStatus.PARTIAL
        if task.description:
            self.current_task.description = task.description
        self._merge_subtasks(self.current_task, task.subtasks)
class GenerationEngine:
    """Drives iterative code generation: a planning step, then a generation step, per iteration."""

    # System prompt of the planning step. This is runtime text sent to the model.
    _PLANNING_SYSTEM_PROMPT = """You're a code construction AI which creates code iteratively.
You're given a recursive task description and a list of existing code fragments.
You need to update the plan (e.g. expanding subtasks or adding new tasks to improve the code base)
and select the next task to work on.
The response should be in YAML format according to the schema defined below:
class TaskStatus:
DONE = "DONE"
PARTIAL = "PARTIAL"
TODO = "TODO"
class TaskItem:
def __init__(self, id: str, description: str, status: str, subtasks: List['TaskItem'] = None):
self.id = id
self.status = status
self.description = description
self.subtasks = subtasks or []
class PlanningResponse:
observations: str
updated_tasks: List[TaskItem] # new or updated tasks, including subtasks, if any
selected_task_id: str # id of the selected task to work on next
# fragments which are relevant to the selected task
# use only fragments which exist in the code_base_summary
required_code_fragments: Dict[str, List[str]] # path -> list of fragment markers
Respond with raw YAML data starting with "observations:".
"""

    # System prompt of the generation step. This is runtime text sent to the model.
    _TASK_SYSTEM_PROMPT = """You're a code construction AI which creates code iteratively.
You're given a list of existing code fragments and a task to work on.
Files are normally broken into multiple fragments to reduce context size.
The response should be in the following format:
// Observations on the task and the code base, if any
// A plan to implement the task
// Code fragments to be added to the code base. Use the following markers to delimit code fragments.
// (Normally a fragment would be a function, class, or a list of related lines)
/// BEGIN_FILE <path>: <description>
/// BEGIN <marker>: <description>
<code>
/// END <marker>
/// END_FILE <path>
CODE_GENERATION_STATUS: <status> # COMPLETE, PARTIAL, REDO
description: <description> # updated description if the status is PARTIAL or REDO
subtasks: # updated subtasks if the status is PARTIAL or REDO
- id: <id>
status: <status> # DONE, PARTIAL, TODO
description: <description>
"""

    def __init__(self, api_key: str):
        """Remember *api_key* and install it as the key used by the ``openai`` module."""
        self.api_key = api_key
        openai.api_key = api_key

    def get_response_chat(self, messages: List[Dict[str, str]]) -> str:
        """Send *messages* to the "gpt-3.5-turbo" chat model; print and return the reply text."""
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages
        )
        response_text = response['choices'][0]['message']['content']
        print("Response:")
        print(response_text)
        return response_text

    def get_response_completion(self, messages: List[Dict[str, str]]) -> str:
        """Send the message contents as one prompt to "code-davinci-002"; print and return the text.

        Each message content is followed by a newline; roles are not included.
        """
        prompt = "".join(message['content'] + "\n" for message in messages)
        response = openai.Completion.create(
            model="code-davinci-002",
            prompt=prompt,
            max_tokens=4000,
        )
        response_text = response['choices'][0]['text']
        print("Response:")
        print(response_text)
        return response_text

    def get_response(self, messages: List[Dict[str, str]]) -> str:
        """Return the model reply for *messages*; currently delegates to the chat endpoint."""
        return self.get_response_chat(messages)

    def run_planning(self, state: "GenerationEngineState"):
        """Ask the model to update the plan and pick the next task, then apply that to *state*.

        Updated tasks are merged into *state* and ``state.current_task`` is set to
        the selected task.

        Returns:
            The parsed planning response.

        Raises:
            ValueError: the selected task id is not present in the task tree.
        """
        summary = state.code_files_summary()
        messages = [
            {"role": "system", "content": self._PLANNING_SYSTEM_PROMPT},
            {
                "role": "user",
                "content":
                    f"Code base summary:\n{summary}\n\n"
                    f"Tasks:\n---\n{yaml.dump([task.to_dict() for task in state.tasks])}\n...\n"
            }
        ]
        print("Code base summary:")
        print(summary)

        response_text = self.get_response(messages)
        response = parse_planning_response(response_text)
        for task in response.updated_tasks:
            state.merge_task(task)
        state.current_task = state.find_task(response.selected_task_id)
        if state.current_task is None:
            # Fail here with a clear message instead of a later AttributeError on None.
            raise ValueError(f"planning selected unknown task id: {response.selected_task_id!r}")
        return response

    def run_task(self, state: "GenerationEngineState", selected_code_fragments: List["CodeFile"]):
        """Ask the model to work on ``state.current_task`` and merge the outcome into *state*.

        PARTIAL and REDO responses are merged through ``merge_task_response``; any
        other status marks the current task DONE. Generated code is merged for
        COMPLETE and PARTIAL responses only, so code of a REDO response is discarded.

        Returns:
            The parsed status block of the response.

        Raises:
            ValueError: no task has been selected yet.
        """
        if state.current_task is None:
            raise ValueError("no task selected; run planning first")

        plan = [task.to_dict() for task in state.tasks]
        existing_code = "\n".join(code_file.dump() for code_file in selected_code_fragments)
        messages = [
            {"role": "system", "content": self._TASK_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (yaml.dump(
                    {
                        "entire_plan": plan,
                        "current_task": state.current_task.to_dict(),
                    }
                )) + "\n" + "Existing code:\n"
                + existing_code
            }]
        print("Entire plan:")
        print(yaml.dump(plan))
        print("Code so far:")
        print(existing_code)
        print("----")

        response_text = self.get_response(messages)
        generated_code_files, response = parse_generation_response(response_text)

        if response.status in (TaskResponseStatus.PARTIAL, TaskResponseStatus.REDO):
            state.merge_task_response(response)
        else:
            state.current_task.status = TaskStatus.DONE

        if response.status in (TaskResponseStatus.COMPLETE, TaskResponseStatus.PARTIAL):
            for code_file in generated_code_files:
                state.merge_code_file(code_file)
        return response

    def _run_iteration(self, state: "GenerationEngineState", max_retries: Optional[int]):
        """Run one planning + generation round, retrying on errors up to *max_retries* attempts."""
        attempt = 0
        while True:
            attempt += 1
            try:
                planning = self.run_planning(state)
                code_files = state.select_code_fragments(planning.required_code_fragments)
                self.run_task(state, code_files)
                return
            except Exception as e:
                print("Error: ", e)
                if max_retries is not None and attempt >= max(1, max_retries):
                    raise RuntimeError(f"iteration failed after {attempt} attempts") from e
                print("Retrying...")

    def run_code_generation(self, initial_task: str, n_iterations: int = 10, max_retries: Optional[int] = 5):
        """Run *n_iterations* planning/generation rounds for *initial_task* and print the code.

        Args:
            initial_task: description of the root task.
            n_iterations: number of rounds to run.
            max_retries: attempts allowed per round before giving up; ``None``
                retries forever. A persistent failure (for example an invalid
                API key) therefore no longer loops endlessly by default.

        Raises:
            RuntimeError: a round kept failing for *max_retries* attempts. The code
                collected so far is still printed before the error propagates.
        """
        state = GenerationEngineState(initial_task=initial_task)
        try:
            for _ in range(n_iterations):
                self._run_iteration(state, max_retries)
        finally:
            print("Final code:")
            for code_file in state.code_files.values():
                print(code_file.dump())
if __name__ == "__main__":
    import os

    # Prefer the key from the OPENAI_API_KEY environment variable so it never has
    # to be written into the source; the placeholder is kept as the fallback.
    engine = GenerationEngine(os.environ.get("OPENAI_API_KEY", "ur OpenAI API key"))
    engine.run_code_generation("""Write a reddit clone in TypeScript. Start by planning and creating subtasks.""", n_iterations=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment