Created
March 26, 2023 17:04
-
-
Save killerstorm/2296b282c818ffcfe4ceb729ce639911 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re
from typing import Dict, List, Optional, Tuple, Union

import openai
import requests
import yaml
# Prefix that opens every BEGIN/END delimiter line emitted by the dump() methods.
MARKER_PREFIX = "///"
class CodeFragment:
    """A named, described piece of code that is dumped between BEGIN/END marker lines."""

    def __init__(self, marker: str, description: str, content: str):
        self.marker, self.description, self.content = marker, description, content

    def dump(self) -> str:
        """Return the content wrapped in its BEGIN (with description) and END marker lines."""
        opening = f"{MARKER_PREFIX} BEGIN {self.marker}: {self.description}"
        closing = f"{MARKER_PREFIX} END {self.marker}"
        return f"{opening}\n{self.content}\n{closing}"

    def __repr__(self):
        return "CodeFragment({}, {})".format(self.marker, self.description)
class CodeFile:
    """A source file represented as a path, a description and a list of code fragments."""

    def __init__(self, path: str, description: str, fragments: List[CodeFragment]):
        self.path = path
        self.description = description
        self.fragments = fragments

    def dump(self) -> str:
        """Return the file wrapped in BEGIN_FILE/END_FILE marker lines.

        Fragments are separated by a newline so that every BEGIN/END marker
        starts on its own line (joining with an empty string glued one
        fragment's END marker to the next fragment's BEGIN marker).
        """
        body = "\n".join(fragment.dump() for fragment in self.fragments)
        header = f"{MARKER_PREFIX} BEGIN_FILE {self.path}: {self.description}"
        footer = f"{MARKER_PREFIX} END_FILE {self.path}"
        return f"{header}\n{body}\n{footer}"

    def __repr__(self):
        return f"CodeFile({self.path}, {len(self.fragments)} fragments)"
def parse_code(text: str) -> List[CodeFile]:
    """Extract the marker-delimited files and fragments found in *text*.

    Each BEGIN_FILE/END_FILE section becomes a CodeFile; each BEGIN/END section
    inside it becomes a CodeFragment. A file section that contains no fragment
    markers but has non-blank content is stored as a single fragment with the
    marker "entire". Text outside file sections is ignored.

    Args:
        text: Raw text containing zero or more file sections.

    Returns:
        The parsed files, in order of appearance.
    """
    prefix = re.escape(MARKER_PREFIX)
    file_pattern = re.compile(
        rf"{prefix} BEGIN_FILE (.*?): (.*?)\n(.*?)\n{prefix} END_FILE \1", re.DOTALL
    )
    fragment_pattern = re.compile(
        rf"{prefix} BEGIN (.*?): ?(.*?)\n(.*?)\n{prefix} END \1", re.DOTALL
    )

    code_files = []
    for file_match in file_pattern.finditer(text):
        path, file_desc, file_content = file_match.groups()
        fragments = [
            CodeFragment(marker, desc, content)
            for marker, desc, content in fragment_pattern.findall(file_content)
        ]
        # Fall back to one whole-file fragment only when no markers were found.
        # (A for/else without `break` always ran the else branch, which added a
        # duplicate "entire" fragment to every file.)
        if not fragments and file_content.strip():
            fragments.append(CodeFragment("entire", "entire file", file_content))
        code_files.append(CodeFile(path, file_desc, fragments))
    return code_files
class TaskResponseStatus:
    """Values of the CODE_GENERATION_STATUS field in a code-generation response."""

    COMPLETE = "COMPLETE"
    PARTIAL = "PARTIAL"
    REDO = "REDO"
class TaskStatus:
    """Status values stored on a TaskItem."""

    DONE = "DONE"
    PARTIAL = "PARTIAL"
    TODO = "TODO"
class TaskItem:
    """A node of the task tree: id, status, optional description and child tasks."""

    def __init__(self, id: str, description: Optional[str], status: str, subtasks: List['TaskItem'] = None):
        # The id is always stored as a string, whatever type was passed in.
        self.id = str(id)
        self.description = description
        self.status = status
        self.subtasks = subtasks if subtasks else []

    def __repr__(self):
        return "TaskItem({}, {}, {})".format(self.id, self.status, self.description)

    def to_dict(self) -> dict:
        """Return this task and, recursively, its subtasks as plain dicts."""
        children = []
        for child in self.subtasks:
            children.append(child.to_dict())
        return dict(
            id=self.id,
            status=self.status,
            description=self.description,
            subtasks=children,
        )
class TaskResponse:
    """Status block of a code-generation response: status, description and subtasks."""

    def __init__(self, status: str, description: str, subtasks: List[TaskItem]):
        self.status, self.description, self.subtasks = status, description, subtasks

    def __repr__(self):
        subtask_count = len(self.subtasks)
        return f"Task({self.status}, {subtask_count} subtasks)"
def parse_task_info(task_info_yaml: str) -> TaskResponse:
    """Parse the YAML status block of a generation response into a TaskResponse.

    Only CODE_GENERATION_STATUS is required. `description` and `subtasks` may
    be absent or null (the response format only asks for them when the status
    is PARTIAL or REDO); they then become None and an empty list.

    Args:
        task_info_yaml: YAML text starting at the CODE_GENERATION_STATUS key.

    Returns:
        The parsed TaskResponse.

    Raises:
        ValueError: If the YAML document is not a mapping.
        KeyError: If CODE_GENERATION_STATUS, or a subtask's id/status, is missing.
    """
    task_info = yaml.safe_load(task_info_yaml)
    if not isinstance(task_info, dict):
        raise ValueError("Task info is not a YAML mapping")

    subtasks = [
        TaskItem(id=subtask["id"], status=subtask["status"], description=subtask.get("description"))
        # `subtasks:` with no value loads as None, so .get()'s default is not enough.
        for subtask in task_info.get("subtasks") or []
    ]
    return TaskResponse(
        status=task_info["CODE_GENERATION_STATUS"],
        description=task_info.get("description"),
        subtasks=subtasks,
    )
def parse_generation_response(text: str) -> Tuple[List[CodeFile], TaskResponse]:
    """Split a generation response into its code part and its status part.

    The status part starts at the first CODE_GENERATION_STATUS key that is
    preceded by a blank line; everything before it is parsed as code.

    Args:
        text: The full text of the model response.

    Returns:
        A pair of (parsed code files, parsed task response).

    Raises:
        ValueError: If no CODE_GENERATION_STATUS section is found.
    """
    parts = re.split(r"\n\n(?=\s*CODE_GENERATION_STATUS:)", text, maxsplit=1)
    if len(parts) != 2:
        raise ValueError("Response has no CODE_GENERATION_STATUS section preceded by a blank line")
    code_text, task_info_yaml = parts
    code_files = parse_code(code_text)
    task_info = parse_task_info(task_info_yaml)
    return code_files, task_info
class PlanningResponse:
    """Parsed planning response: observations, task updates and the next task to run."""

    observations: str
    updated_tasks: List[TaskItem]
    selected_task_id: str
    required_code_fragments: Dict[str, List[str]]  # path -> fragment markers

    def __init__(self, observations: str, updated_tasks: List[TaskItem], selected_task_id: str, required_code_fragments: Dict[str, List[str]]):
        # What to run next and which fragments it needs.
        self.selected_task_id = selected_task_id
        self.required_code_fragments = required_code_fragments
        # Planner commentary and the task changes it proposed.
        self.observations = observations
        self.updated_tasks = updated_tasks
def parse_planning_response(yaml_string: str) -> PlanningResponse:
    """Parse the planner's YAML reply into a PlanningResponse.

    `observations` and `selected_task_id` are required. `updated_tasks` and
    `required_code_fragments` may be absent or null and then default to an
    empty list / empty dict. The same applies to each task's `description`
    (defaults to None) and `subtasks` (defaults to an empty list).

    Args:
        yaml_string: Raw YAML text returned by the model.

    Returns:
        The parsed PlanningResponse.

    Raises:
        ValueError: If the YAML document is not a mapping.
        KeyError: If a required key is missing.
    """
    response = yaml.safe_load(yaml_string)
    if not isinstance(response, dict):
        raise ValueError("Planning response is not a YAML mapping")

    def unpack_task_item(task: dict) -> TaskItem:
        # A key that is present with no value loads as None, hence `or []`.
        return TaskItem(
            id=task["id"],
            status=task["status"],
            description=task.get("description"),
            subtasks=[unpack_task_item(subtask) for subtask in task.get("subtasks") or []],
        )

    return PlanningResponse(
        observations=response["observations"],
        updated_tasks=[unpack_task_item(task) for task in response.get("updated_tasks") or []],
        selected_task_id=response["selected_task_id"],
        required_code_fragments=response.get("required_code_fragments") or {},
    )
class GenerationEngineState:
    """Mutable state of one generation run: task tree, code base and current task."""

    code_files: Dict[str, CodeFile]  # path -> file
    tasks: List[TaskItem]  # roots of the task tree
    current_task: Optional[TaskItem]  # task selected by the last planning step

    def __init__(self, initial_task: str):
        self.tasks = [TaskItem(id="t_0", status=TaskStatus.TODO, description=initial_task)]
        self.code_files = {}
        self.current_task = None

    def find_task(self, task_id: str) -> Optional[TaskItem]:
        """Return the first task with the given id in a depth-first scan, or None."""
        task_id = str(task_id)
        # Explicit stack; items are pushed reversed so they pop in tree order.
        pending = list(reversed(self.tasks))
        while pending:
            task = pending.pop()
            if task.id == task_id:
                return task
            pending.extend(reversed(task.subtasks))
        return None

    def code_files_summary(self) -> str:
        """Return one line per file followed by one line per fragment of that file."""
        lines = []
        for code_file in self.code_files.values():
            lines.append(f"{code_file.path}: {code_file.description}\n")
            lines.extend(
                f" {fragment.marker}: {fragment.description}\n"
                for fragment in code_file.fragments
            )
        return "".join(lines)

    def select_code_fragments(self, selection: Optional[Dict[str, List[str]]]) -> List[CodeFile]:
        """Return copies of the known files restricted to the requested fragments.

        Args:
            selection: Mapping of path -> fragment markers. None or empty selects
                nothing. Unknown paths are skipped. A null marker list selects no
                fragments, and a single marker string is treated as a one-item list.

        Returns:
            New CodeFile objects holding only the selected fragments.
        """
        if not selection:
            return []
        selected = []
        for path, markers in selection.items():
            code_file = self.code_files.get(path)
            if code_file is None:
                continue
            if isinstance(markers, str):
                # Otherwise `in` below would do a substring test.
                markers = [markers]
            # Fragment markers are strings; normalise the requested ones to match.
            wanted = [str(marker) for marker in markers or []]
            fragments = [fragment for fragment in code_file.fragments if fragment.marker in wanted]
            selected.append(CodeFile(path, code_file.description, fragments))
        return selected

    def merge_code_file(self, code_file: CodeFile):
        """Merge a generated file into the code base.

        A new path is stored as-is. For a known path, fragments with a known
        marker have their content replaced (and their description, when the new
        one is non-empty); fragments with a new marker are appended.
        """
        existing_file = self.code_files.get(code_file.path)
        if existing_file is None:
            self.code_files[code_file.path] = code_file
            return

        by_marker = {}
        for fragment in existing_file.fragments:
            # setdefault keeps the first fragment when markers are duplicated.
            by_marker.setdefault(fragment.marker, fragment)

        for fragment in code_file.fragments:
            existing_fragment = by_marker.get(fragment.marker)
            if existing_fragment is None:
                existing_file.fragments.append(fragment)
                by_marker[fragment.marker] = fragment
                continue
            existing_fragment.content = fragment.content
            if fragment.description:
                # Keep the summary in sync with the rewritten fragment.
                existing_fragment.description = fragment.description

    def merge_task(self, task: TaskItem):
        """Merge a planner-provided task into the task tree.

        A task whose id already exists anywhere in the tree is updated in place,
        recursively through its subtasks. A task with an unknown id is added as
        a new root task instead of being silently dropped.
        """
        existing_task = self.find_task(task.id)
        if existing_task is None:
            self.tasks.append(task)
        else:
            self._apply_update(existing_task, task)

    def merge_task_response(self, task: TaskResponse):
        """Apply the outcome of a generation step to the current task.

        COMPLETE marks the current task DONE. PARTIAL and REDO mark it PARTIAL
        and take over the response's description when one is given. The
        response's subtasks are merged into the current task in all cases.

        Raises:
            ValueError: If no current task is selected.
        """
        current = self.current_task
        if current is None:
            raise ValueError("No current task is selected; run planning first")

        if task.status == TaskResponseStatus.COMPLETE:
            current.status = TaskStatus.DONE
        elif task.status in (TaskResponseStatus.PARTIAL, TaskResponseStatus.REDO):
            current.status = TaskStatus.PARTIAL
            if task.description:
                current.description = task.description
        self._merge_subtasks(current, task.subtasks)

    @classmethod
    def _apply_update(cls, existing: TaskItem, incoming: TaskItem):
        """Copy status, description (if given) and subtasks of *incoming* onto *existing*."""
        existing.status = incoming.status
        if incoming.description:
            # An update without a description must not erase the stored one.
            existing.description = incoming.description
        cls._merge_subtasks(existing, incoming.subtasks)

    @classmethod
    def _merge_subtasks(cls, parent: TaskItem, incoming_subtasks: List[TaskItem]):
        """Update the children of *parent* matched by id and append the unmatched ones."""
        for subtask in incoming_subtasks:
            match = next((child for child in parent.subtasks if child.id == subtask.id), None)
            if match is None:
                parent.subtasks.append(subtask)
            else:
                cls._apply_update(match, subtask)
# System prompt for the planning step. It ends with a newline.
_PLANNING_SYSTEM_PROMPT = """You're a code construction AI which creates code iteratively.
You're given a recursive task description and a list of existing code fragments.
You need to update the plan (e.g. expanding subtasks or adding new tasks to improve the code base)
and select the next task to work on.
The response should be in YAML format according to the schema defined below:
class TaskStatus:
    DONE = "DONE"
    PARTIAL = "PARTIAL"
    TODO = "TODO"
class TaskItem:
    def __init__(self, id: str, description: str, status: str, subtasks: List['TaskItem'] = None):
        self.id = id
        self.status = status
        self.description = description
        self.subtasks = subtasks or []
class PlanningResponse:
    observations: str
    updated_tasks: List[TaskItem] # new or updated tasks, including subtasks, if any
    selected_task_id: str # id of the selected task to work on next
    # fragments which are relevant to the selected task
    # use only fragments which exist in the code_base_summary
    required_code_fragments: Dict[str, List[str]] # path -> list of fragment markers
Respond with raw YAML data starting with "observations:".
"""

# System prompt for the code-generation step. The blank line before
# CODE_GENERATION_STATUS mirrors what parse_generation_response requires.
_TASK_SYSTEM_PROMPT = """You're a code construction AI which creates code iteratively.
You're given a list of existing code fragments and a task to work on.
Files are normally broken into multiple fragments to reduce context size.
The response should be in the following format:
// Observations on the task and the code base, if any
// A plan to implement the task
// Code fragments to be added to the code base. Use the following markers to delimit code fragments.
// (Normally a fragment would be a function, class, or a list of related lines)
/// BEGIN_FILE <path>: <description>
/// BEGIN <marker>: <description>
<code>
/// END <marker>
/// END_FILE <path>

CODE_GENERATION_STATUS: <status> # COMPLETE, PARTIAL, REDO
description: <description> # updated description if the status is PARTIAL or REDO
subtasks: # updated subtasks if the status is PARTIAL or REDO
  - id: <id>
    status: <status> # DONE, PARTIAL, TODO
    description: <description>
"""


class GenerationEngine:
    """Drives iterative code generation: plan, pick a task, generate code, merge.

    NOTE(review): `openai.ChatCompletion` / `openai.Completion` look like the
    module-level interface of the pre-1.0 openai SDK - confirm the pinned version.
    """

    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        """Store the API key and chat model, and set the key on the openai module.

        Args:
            api_key: OpenAI API key; also assigned to the global `openai.api_key`.
            model: Chat model used by get_response_chat.
        """
        self.api_key = api_key
        self.model = model
        openai.api_key = api_key

    def get_response_chat(self, messages: List[Dict[str, str]]) -> str:
        """Send *messages* to the chat endpoint, print the reply and return it."""
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=messages
        )
        content = response['choices'][0]['message']['content']
        print("Response:")
        print(content)
        return content

    def get_response_completion(self, messages: List[Dict[str, str]]) -> str:
        """Concatenate the message contents into one prompt and call the completion endpoint."""
        prompt = "".join(message['content'] + "\n" for message in messages)
        response = openai.Completion.create(
            model="code-davinci-002",
            prompt=prompt,
            max_tokens=4000,
        )
        response_text = response['choices'][0]['text']
        print("Response:")
        print(response_text)
        return response_text

    def get_response(self, messages: List[Dict[str, str]]) -> str:
        """Return the model reply for *messages* (currently via the chat endpoint)."""
        return self.get_response_chat(messages)

    def run_planning(self, state: GenerationEngineState):
        """Ask the model to update the plan and select the next task.

        Merges the returned task updates into *state* and sets
        `state.current_task` to the selected task (None if the id is unknown).

        Returns:
            The parsed PlanningResponse.
        """
        summary = state.code_files_summary()
        messages = [
            {"role": "system", "content": _PLANNING_SYSTEM_PROMPT},
            {
                "role": "user",
                "content":
                    f"Code base summary:\n{summary}\n\n"
                    f"Tasks:\n---\n{yaml.dump([task.to_dict() for task in state.tasks])}\n...\n"
            }
        ]
        print("Code base summary:")
        print(summary)
        response_text = self.get_response(messages)
        response = parse_planning_response(response_text)
        for task in response.updated_tasks:
            state.merge_task(task)
        state.current_task = state.find_task(response.selected_task_id)
        return response

    def run_task(self, state: GenerationEngineState, selected_code_fragments: List[CodeFile]):
        """Ask the model to work on the current task and merge the result into *state*.

        PARTIAL and REDO responses update the current task through
        `merge_task_response`; COMPLETE marks it DONE. Generated code is merged
        for COMPLETE and PARTIAL responses only (REDO output is discarded).

        Returns:
            The parsed TaskResponse.

        Raises:
            ValueError: If no current task is selected, or the response carries
                an unknown status (previously such a response marked the task
                DONE without merging any code).
        """
        if state.current_task is None:
            raise ValueError("No current task is selected; planning must pick a known task first")

        existing_code = "\n".join([code_file.dump() for code_file in selected_code_fragments])
        messages = [
            {"role": "system", "content": _TASK_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (yaml.dump(
                    {
                        "entire_plan": [task.to_dict() for task in state.tasks],
                        "current_task": state.current_task.to_dict(),
                    }
                )) + "\n" + "Existing code:\n"
                + existing_code
            }]
        print("Entire plan:")
        print(yaml.dump([task.to_dict() for task in state.tasks]))
        print("Code so far:")
        print(existing_code)
        print("----")
        response_text = self.get_response(messages)
        generated_code_files, response = parse_generation_response(response_text)

        if response.status in (TaskResponseStatus.PARTIAL, TaskResponseStatus.REDO):
            state.merge_task_response(response)
        elif response.status == TaskResponseStatus.COMPLETE:
            state.current_task.status = TaskStatus.DONE
        else:
            raise ValueError(f"Unexpected CODE_GENERATION_STATUS: {response.status!r}")

        if response.status in (TaskResponseStatus.COMPLETE, TaskResponseStatus.PARTIAL):
            for code_file in generated_code_files:
                state.merge_code_file(code_file)
        return response

    def run_code_generation(self, initial_task: str, n_iterations: int = 10, max_retries: int = 5):
        """Run up to *n_iterations* plan-and-generate rounds, then print the final code.

        Args:
            initial_task: Description of the root task.
            n_iterations: Maximum number of successful rounds to run.
            max_retries: Attempts allowed per round (expected >= 1). When a round
                still fails after that many attempts, generation stops early
                instead of retrying forever.

        Returns:
            The final GenerationEngineState.
        """
        state = GenerationEngineState(initial_task=initial_task)
        for _ in range(n_iterations):
            for attempt in range(1, max_retries + 1):
                try:
                    planning = self.run_planning(state)
                    code_files = state.select_code_fragments(planning.required_code_fragments)
                    self.run_task(state, code_files)
                    break
                except Exception as e:
                    # Top-level boundary: API and parsing errors are reported and retried.
                    print("Error: ", e)
                    if attempt < max_retries:
                        print("Retrying...")
            else:
                # Every attempt of this round failed; a further round would repeat it.
                print(f"Giving up after {max_retries} failed attempts.")
                break
        print("Final code:")
        for code_file in state.code_files.values():
            print(code_file.dump())
        return state
if __name__ == "__main__":
    import os

    # Read the key from the environment so it never has to be pasted into the source.
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise SystemExit("Set the OPENAI_API_KEY environment variable before running this script.")

    engine = GenerationEngine(api_key)
    engine.run_code_generation("""Write a reddit clone in TypeScript. Start by planning and creating subtasks.""", n_iterations=10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment