@thomasahle
Created July 9, 2024 05:49
import instructor
from pydantic import BaseModel, Field
from typing import overload, Union, Literal, Generator, Iterable
from tqdm.asyncio import tqdm
import asyncio
import numpy as np
import json
import os, sys
import diskcache, inspect, functools
import random
import openai
import dotenv
dotenv.load_dotenv()
client = instructor.from_openai(openai.AsyncOpenAI())
cache = diskcache.Cache("./my_cache_directory")
def instructor_cache(func):
"""Cache a function that returns a Pydantic model"""
return_type = inspect.signature(func).return_annotation #
if not issubclass(return_type, BaseModel): #
raise ValueError("The return type must be a Pydantic model")
@functools.wraps(func)
def wrapper(*args, **kwargs):
key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}" #
# Check if the result is already cached
if (cached := cache.get(key)) is not None:
# Deserialize from JSON based on the return type
return return_type.model_validate_json(cached)
# Call the function and cache its result
result = func(*args, **kwargs)
serialized_result = result.model_dump_json()
cache.set(key, serialized_result)
return result
return wrapper
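
# A minimal sketch (not part of the original gist) of an async-aware variant of the
# cache decorator, assuming the same diskcache instance. The name
# `async_instructor_cache` is hypothetical; it awaits the wrapped coroutine before
# serializing the result (the wrapped function would still need a Pydantic return
# annotation, just like the sync version above).
def async_instructor_cache(func):
    return_type = inspect.signature(func).return_annotation
    if not issubclass(return_type, BaseModel):
        raise ValueError("The return type must be a Pydantic model")

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"
        if (cached := cache.get(key)) is not None:
            return return_type.model_validate_json(cached)
        result = await func(*args, **kwargs)
        cache.set(key, result.model_dump_json())
        return result

    return wrapper
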
def format_input(pydantic_object: BaseModel) -> Generator[dict[str, str], None, None]:
    # Pydantic v2 spelling; .schema()/.dict() also work but are deprecated.
    schema = type(pydantic_object).model_json_schema()
    # First give the docstring, if there is one.
    if "description" in schema:
        yield {"role": "user", "content": schema["description"]}
    for key, value in pydantic_object.model_dump().items():
        props = schema["properties"][key]
        yield {
            "role": "user",
            "content": f"{props['title']}: {props.get('description', '')}",
        }
        yield {"role": "user", "content": str(value)}

def format_examples(
    objects: Iterable[tuple[str, BaseModel]]
) -> Generator[dict[str, str], None, None]:
    yield {
        "role": "user",
        "content": "Below are some examples of user and agent interactions.",
    }
    for role, obj in objects:
        schema = type(obj).model_json_schema()
        for key, value in obj.model_dump().items():
            props = schema["properties"][key]
            yield {"role": role, "content": f"{props['title']}:"}
            yield {"role": role, "content": str(value)}
    yield {
        "role": "user",
        "content": "That was all the examples. Now comes the main question.",
    }

# @instructor_cache
async def answer(context, question, examples=()):
    class Question(BaseModel):
        "Answer question using the following context"
        context: str
        question: str
        tips: str = (
            "Make sure to cite your sources, and use the exact words from the context."
        )

    class Answer(BaseModel):
        relevant_text: list[str] = Field(
            description="2-3 relevant snippets of text from the context that are relevant to answering the question, even if indirectly so."
        )
        thought_process: str = Field(
            description="Think sceptically about the context and the question. What is the context trying to say? What is the question asking? How did the user judge the answers in the examples above?"
        )
        extended_answer: str = Field(
            description="Comprehensive answer with all relevant information, including possible alternative answers."
        )
        answer: str = Field(description="Short factual summary of the most likely answer.")
        issues: str = Field(
            description="What are the potential issues with the answer? What are the limitations of the answer? Could the user have interpreted the answer differently? How should this impact the certainty of the answer?"
        )
        certainty: float = Field(
            description="How certain are you that the answer is correct? You will be judged using NLL loss, so be EXTREMELY careful and never give 0 or 1. Always some value in between.",
            gt=0,
            lt=1,
        )

    # TODO: It would be nice to sample multiple answers here, and then have a model that combines them.
    # However, instructor doesn't support multiple completions yet.
    return await client.chat.completions.create(
        model="gpt-4-turbo",
        response_model=Answer,
        messages=[
            {
                "role": "system",
                "content": "You are a very knowledgeable support agent, answering questions based on context from a manual.",
            },
            *format_examples(examples),
            *format_input(Question(context=context, question=question)),
        ],
    )

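# A minimal sketch (not part of the original gist) of the multi-sample idea from the
# TODO above: issue several independent calls to answer() and aggregate them
# client-side. `sample_answers` is a hypothetical helper; here it keeps the answer
# from the most confident sample and reports the mean certainty across samples.
async def sample_answers(context, question, examples=(), n_samples=3):
    samples = await asyncio.gather(
        *(answer(context, question, examples) for _ in range(n_samples))
    )
    best = max(samples, key=lambda s: s.certainty)
    mean_certainty = sum(s.certainty for s in samples) / len(samples)
    return best, mean_certainty
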
async def combine(question, answers):
    # TODO: not wired in yet; meant to merge several short answers into one response.
    class ShortAnswer(BaseModel):
        relevant_text: list[str]
        answer: str
        confidence: float

    class Question(BaseModel):
        "A user has asked a question, and multiple people have answered it. Combine the answers into a single response."
        question: str
        answers: list[ShortAnswer]

    class CombinedAnswer(BaseModel):
        thought_process: str
        answer: str
        confidence: float

    return await client.chat.completions.create(
        model="gpt-4-turbo",
        response_model=CombinedAnswer,
        messages=[
            *format_input(Question(question=question, answers=answers)),
        ],
    )

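# Illustrative only (not in the original gist): if combine() were wired up, a
# multi-sample pipeline might convert each Answer from sample_answers() into the
# ShortAnswer shape (relevant_text, answer, confidence=certainty) and then call
#     combined = await combine(question, short_answers)
# ShortAnswer is local to combine() above, so this remains a sketch.
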
# @instructor_cache
async def judge(context, question, student_response, correct_answer):
    class JudgeInput(BaseModel):
        """Below is a question, followed by a student's response, followed by the correct answer. Check if the student's response is correct. The student's response does not need to have exactly the same wording as the correct answer, but it should convey the same information. If the student's response is correct, answer 'Yes'. If the student's response is incorrect, answer 'No'"""

        context: str
        question: str
        student_response: str
        correct_answer: str

    class JudgeOutput(BaseModel):
        feedback: str = Field(
            description="Pros and cons of the student's response. What did they do well? What could they improve?"
        )
        is_correct: Literal["yes", "no"]

    return await client.chat.completions.create(
        model="gpt-3.5-turbo",
        # model="gpt-4-turbo",
        response_model=JudgeOutput,
        messages=[
            *format_input(
                JudgeInput(
                    context=context,
                    question=question,
                    student_response=student_response,
                    correct_answer=correct_answer,
                )
            )
        ],
    )

def compute_ece(confidences, correctness, num_bins=10):
    bin_boundaries = np.linspace(0, 1, num_bins + 1)
    bin_lowers = bin_boundaries[:-1]
    bin_uppers = bin_boundaries[1:]
    ece = 0
    for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
        in_bin = np.logical_and(confidences > bin_lower, confidences <= bin_upper)
        prop_in_bin = np.mean(in_bin)
        if prop_in_bin > 0:
            accuracy_in_bin = np.mean(correctness[in_bin])
            avg_confidence_in_bin = np.mean(confidences[in_bin])
            ece += np.abs(accuracy_in_bin - avg_confidence_in_bin) * prop_in_bin
    return ece

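# Quick sanity check (illustrative, not part of the original gist): five predictions
# at confidence 0.8 with four of them correct are perfectly calibrated,
#     compute_ece(np.array([0.8] * 5), np.array([1, 1, 1, 1, 0]))  # ~0.0
# while two confident misses give a large gap,
#     compute_ece(np.array([0.9, 0.9]), np.array([0, 0]))  # ~0.9
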
async def task(i, answer_examples):
    question = open(f"data/questions/{i}.txt").read()
    context = open(f"data/descriptions/{i}.txt").read()
    true_answer = open(f"data/answers/{i}.txt").read()
    student_answer = await answer(context, question, answer_examples)
    judge_answer = await judge(context, question, student_answer.answer, true_answer)
    log_entry = {
        "question_id": i,
        "question": question,
        "student_answer": student_answer.answer,
        "student_context": student_answer.relevant_text,
        "confidence": student_answer.certainty,
        "true_answer": true_answer,
        "judge_answer": judge_answer.feedback,
        "judge_verdict": judge_answer.is_correct == "yes",
    }
    return i, judge_answer.is_correct == "yes", student_answer.certainty, log_entry

def compute_nll(confidences, correctness):
    eps = 1e-15  # small constant to avoid log(0)
    return -np.mean(
        correctness * np.log(confidences + eps)
        + (1 - correctness) * np.log(1 - confidences + eps)
    )

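# For reference (illustrative): this is the binary cross-entropy
#     NLL = -mean(y * log(p) + (1 - y) * log(1 - p))
# so a confident miss like compute_nll(np.array([0.99]), np.array([0])) costs ~4.6,
# while a calibrated 0.5 costs ~0.69.
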
def load_examples(path, n_correct, n_incorrect):
    data = json.load(open(path))
    random.shuffle(data)
    correct = [d for d in data if d["judge_verdict"]]
    incorrect = [d for d in data if not d["judge_verdict"]]
    for d in correct + incorrect:
        i = d["question_id"]
        d["context"] = open(f"data/descriptions/{i}.txt").read()
    res = correct[:n_correct] + incorrect[:n_incorrect]
    random.shuffle(res)
    return res

class ExampleQuestion(BaseModel):
    context: str
    question: str

class ExampleAnswer(BaseModel):
    answer: str
    confidence: float

class ExampleJudge(BaseModel):
    intended_answer: str
    feedback: str
    verdict: str

async def main():
    # Sample n questions at random to answer
    n = int(sys.argv[1])
    qids = [int(d.split('.')[0]) for d in os.listdir("data/questions")]
    random.shuffle(qids)
    qids = qids[:n]

    # Since I'm using Instructor, and not DSPy, the examples are not tuned, but just picked
    # to be half correct examples and half incorrect examples.
    # In my experiments, I found that including examples didn't actually make much of a difference.
    # So it could probably be improved.
    examples = []
    for ex in load_examples("100_answers.json", 5, 5):
        examples.append(
            ("user", ExampleQuestion(context=ex["context"], question=ex["question"]))
        )
        examples.append(
            (
                "assistant",
                ExampleAnswer(answer=ex["student_answer"], confidence=ex["confidence"]),
            )
        )
        examples.append(
            (
                "user",
                ExampleJudge(
                    intended_answer=ex["true_answer"],
                    feedback=ex["judge_answer"],
                    verdict="correct" if ex["judge_verdict"] else "incorrect",
                ),
            )
        )

    # Create all the tasks using async
    tasks = [task(i, examples) for i in qids]
    results = []
    confidences = []
    correctness = []
    log_entries = []
    pbar = tqdm(total=len(tasks), ncols=120)
    for future in asyncio.as_completed(tasks):
        i, correct, confidence, log_entry = await future
        results.append((i, correct, confidence))
        confidences.append(confidence)
        correctness.append(correct)
        log_entries.append(log_entry)
        # Running metrics, shown in the progress bar
        accuracy = np.mean(correctness)
        ece = compute_ece(np.array(confidences), np.array(correctness))
        nll = compute_nll(np.array(confidences), np.array(correctness))
        mean_confidence = np.mean(confidences)
        pbar.set_postfix(
            {"Acc": f"{accuracy:.2f}", "Conf": f"{mean_confidence:.2f}", "ECE": f"{ece:.2f}", "NLL": f"{nll:.2f}"}
        )
        pbar.update(1)
    pbar.close()

    for i, correct, confidence in sorted(results):
        print(
            f"Question {i} is {'correct' if correct else 'incorrect'}, confidence: {confidence:.2f}"
        )
    print(f"\nFinal Accuracy: {accuracy:.2f}")
    print(f"Mean Confidence: {mean_confidence:.2f}")
    print(f"Final ECE: {ece:.2f}")
    print(f"Final NLL: {nll:.2f}")

    # Log all entries to a file
    log_filename = "answer_log.json"
    with open(log_filename, "w") as f:
        json.dump(log_entries, f, indent=2)
    print(f"\nDetailed logs have been written to {os.path.abspath(log_filename)}")

if __name__ == "__main__":
    asyncio.run(main())