import instructor
from pydantic import BaseModel, Field
from typing import Iterable, Literal, Generator
from tqdm.asyncio import tqdm
import asyncio
import numpy as np
import json
import os, sys
import diskcache, inspect, functools
import random
import openai
import dotenv

dotenv.load_dotenv()
client = instructor.from_openai(openai.AsyncOpenAI())
cache = diskcache.Cache("./my_cache_directory")

def instructor_cache(func):
    """Cache a function that returns a Pydantic model."""
    return_type = inspect.signature(func).return_annotation
    if not issubclass(return_type, BaseModel):
        raise ValueError("The return type must be a Pydantic model")

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"
        # Check if the result is already cached
        if (cached := cache.get(key)) is not None:
            # Deserialize from JSON based on the return type
            return return_type.model_validate_json(cached)
        # Call the function and cache its result
        result = func(*args, **kwargs)
        serialized_result = result.model_dump_json()
        cache.set(key, serialized_result)
        return result

    return wrapper

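# Note: the decorator above assumes a synchronous function; applying it to the async
# answer()/judge() below would try to serialize an un-awaited coroutine (likely why the
# @instructor_cache lines below are commented out). A minimal sketch of an async-aware
# variant, assuming the same diskcache setup and an explicit return annotation on the
# decorated coroutine. The name instructor_cache_async is hypothetical and nothing
# below uses it.
def instructor_cache_async(func):
    """Cache an async function that returns a Pydantic model (sketch)."""
    return_type = inspect.signature(func).return_annotation
    if not issubclass(return_type, BaseModel):
        raise ValueError("The return type must be a Pydantic model")

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        key = f"{func.__name__}-{functools._make_key(args, kwargs, typed=False)}"
        if (cached := cache.get(key)) is not None:
            return return_type.model_validate_json(cached)
        result = await func(*args, **kwargs)  # await before serializing
        cache.set(key, result.model_dump_json())
        return result

    return wrapper
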
def format_input(pydantic_object: BaseModel) -> Generator[dict[str, str], None, None]:
    schema = type(pydantic_object).model_json_schema()
    # First give the docstring, if there is one.
    if "description" in schema:
        yield {"role": "user", "content": schema["description"]}
    for key, value in pydantic_object.model_dump().items():
        props = schema["properties"][key]
        yield {
            "role": "user",
            "content": f"{props['title']}: {props.get('description', '')}",
        }
        yield {"role": "user", "content": str(value)}

def format_examples(
    objects: Iterable[tuple[str, BaseModel]]
) -> Generator[dict[str, str], None, None]:
    objects = list(objects)
    if not objects:
        # Don't announce examples if there are none.
        return
    yield {
        "role": "user",
        "content": "Below are some examples of user and agent interactions.",
    }
    for role, obj in objects:
        schema = type(obj).model_json_schema()
        for key, value in obj.model_dump().items():
            props = schema["properties"][key]
            yield {"role": role, "content": f"{props['title']}:"}
            yield {"role": role, "content": str(value)}
    yield {
        "role": "user",
        "content": "That was all the examples. Now comes the main question.",
    }

# @instructor_cache
async def answer(context, question, examples=()):
    class Question(BaseModel):
        "Answer the question using the following context."
        context: str
        question: str
        tips: str = (
            "Make sure to cite your sources, and use the exact words from the context."
        )

    class Answer(BaseModel):
        relevant_text: list[str] = Field(
            description="2-3 snippets of text from the context that are relevant to answering the question, even if only indirectly so."
        )
        thought_process: str = Field(
            description="Think sceptically about the context and the question. What is the context trying to say? What is the question asking? How did the user judge the answers in the examples above?"
        )
        extended_answer: str = Field(
            description="Comprehensive answer with all relevant information, including possible alternative answers."
        )
        answer: str = Field(description="Short factual summary of the most likely answer.")
        issues: str = Field(
            description="What are the potential issues with the answer? What are the limitations of the answer? Could the user have interpreted the answer differently? How should this impact the certainty of the answer?"
        )
        certainty: float = Field(
            description="How certain are you that the answer is correct? You will be judged using NLL loss, so be EXTREMELY careful and never give 0 or 1. Always give some value in between.",
            gt=0,
            lt=1,
        )

    # TODO: It would be nice to sample multiple answers here, and then have a model
    # that combines them. However, instructor doesn't support multiple completions yet.
    # (See the hypothetical answer_multi sketch after combine() below.)
    return await client.chat.completions.create(
        model="gpt-4-turbo",
        response_model=Answer,
        messages=[
            {
                "role": "system",
                "content": "You are a very knowledgeable support agent, answering questions based on context from a manual.",
            },
            *format_examples(examples),
            *format_input(Question(context=context, question=question)),
        ],
    )

async def combine(question, answers):
    # TODO: not yet wired into the pipeline; meant to merge several sampled answers.
    class ShortAnswer(BaseModel):
        relevant_text: list[str]
        answer: str
        confidence: float

    class Question(BaseModel):
        "A user has asked a question, and multiple people have answered it. Combine the answers into a single response."
        question: str
        answers: list[ShortAnswer]

    class CombinedAnswer(BaseModel):
        thought_process: str
        answer: str
        confidence: float

    return await client.chat.completions.create(
        model="gpt-4-turbo",
        response_model=CombinedAnswer,
        messages=[
            *format_input(Question(question=question, answers=answers)),
        ],
    )

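# A sketch of the multi-sample idea from the TODO in answer(): fire off several
# independent answer() calls and let combine() merge them. The helper answer_multi is
# hypothetical and not used by main(); it assumes combine(question, answers) as defined
# above, with each draft coerced into a ShortAnswer.
async def answer_multi(context, question, examples=(), n=3):
    drafts = await asyncio.gather(
        *(answer(context, question, examples) for _ in range(n))
    )
    return await combine(
        question,
        [
            {
                "relevant_text": d.relevant_text,
                "answer": d.answer,
                "confidence": d.certainty,
            }
            for d in drafts
        ],
    )
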
# @instructor_cache
async def judge(context, question, student_response, correct_answer):
    class JudgeInput(BaseModel):
        """Below is a question, followed by a student's response, followed by the correct answer. Check if the student's response is correct. The student's response does not need to have exactly the same wording as the correct answer, but it should convey the same information. If the student's response is correct, answer 'yes'. If the student's response is incorrect, answer 'no'."""
        context: str
        question: str
        student_response: str
        correct_answer: str

    class JudgeOutput(BaseModel):
        feedback: str = Field(
            description="Pros and cons of the student's response. What did they do well? What could they improve?"
        )
        is_correct: Literal["yes", "no"]

    return await client.chat.completions.create(
        model="gpt-3.5-turbo",
        # model="gpt-4-turbo",
        response_model=JudgeOutput,
        messages=[
            *format_input(
                JudgeInput(
                    context=context,
                    question=question,
                    student_response=student_response,
                    correct_answer=correct_answer,
                )
            )
        ],
    )

def compute_ece(confidences, correctness, num_bins=10):
    """Expected Calibration Error: bin predictions by confidence and average the
    |accuracy - mean confidence| gap per bin, weighted by bin size."""
    bin_boundaries = np.linspace(0, 1, num_bins + 1)
    bin_lowers = bin_boundaries[:-1]
    bin_uppers = bin_boundaries[1:]
    ece = 0
    for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
        in_bin = np.logical_and(confidences > bin_lower, confidences <= bin_upper)
        prop_in_bin = np.mean(in_bin)
        if prop_in_bin > 0:
            accuracy_in_bin = np.mean(correctness[in_bin])
            avg_confidence_in_bin = np.mean(confidences[in_bin])
            ece += np.abs(accuracy_in_bin - avg_confidence_in_bin) * prop_in_bin
    return ece

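# Worked example (sanity check, not part of the pipeline): five predictions at 0.8
# confidence with 4/5 correct land in the (0.7, 0.8] bin, so
#     compute_ece(np.array([0.8] * 5), np.array([1, 1, 1, 1, 0]))
# gives |0.8 - 0.8| * 1.0 ≈ 0.0, while the same verdicts at 0.9 confidence give
# |0.8 - 0.9| * 1.0 ≈ 0.1.
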
async def task(i, answer_examples):
    question = open(f"data/questions/{i}.txt").read()
    context = open(f"data/descriptions/{i}.txt").read()
    true_answer = open(f"data/answers/{i}.txt").read()
    student_answer = await answer(context, question, answer_examples)
    judge_answer = await judge(context, question, student_answer.answer, true_answer)
    log_entry = {
        "question_id": i,
        "question": question,
        "student_answer": student_answer.answer,
        "student_context": student_answer.relevant_text,
        "confidence": student_answer.certainty,
        "true_answer": true_answer,
        "judge_answer": judge_answer.feedback,
        "judge_verdict": judge_answer.is_correct == "yes",
    }
    return i, judge_answer.is_correct == "yes", student_answer.certainty, log_entry

def compute_nll(confidences, correctness):
    """Mean negative log-likelihood (binary cross-entropy) of the judge verdicts
    under the model's stated confidences."""
    eps = 1e-15  # small constant to avoid log(0)
    return -np.mean(
        correctness * np.log(confidences + eps)
        + (1 - correctness) * np.log(1 - confidences + eps)
    )

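# Worked example (sanity check, not part of the pipeline): one correct answer at 0.9
# confidence and one incorrect answer at 0.6 confidence give
#     compute_nll(np.array([0.9, 0.6]), np.array([1, 0]))
#       = -(log 0.9 + log 0.4) / 2 ≈ 0.51
# Lower is better; confident wrong answers are penalized hardest.
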
def load_examples(path, n_correct, n_incorrect):
    data = json.load(open(path))
    random.shuffle(data)
    correct = [d for d in data if d["judge_verdict"]]
    incorrect = [d for d in data if not d["judge_verdict"]]
    for d in correct + incorrect:
        i = d["question_id"]
        d["context"] = open(f"data/descriptions/{i}.txt").read()
    res = correct[:n_correct] + incorrect[:n_incorrect]
    random.shuffle(res)
    return res


class ExampleQuestion(BaseModel):
    context: str
    question: str


class ExampleAnswer(BaseModel):
    answer: str
    confidence: float


class ExampleJudge(BaseModel):
    intended_answer: str
    feedback: str
    verdict: str

async def main():
    # Sample n questions at random to answer; n comes from the first CLI argument.
    n = int(sys.argv[1])
    qids = [int(d.split('.')[0]) for d in os.listdir("data/questions")]
    random.shuffle(qids)
    qids = qids[:n]

    # Since I'm using Instructor, and not DSPy, the examples are not tuned, just picked
    # to be half correct and half incorrect examples. In my experiments, including
    # examples didn't actually make much of a difference, so this could probably be improved.
    examples = []
    for ex in load_examples("100_answers.json", 5, 5):
        examples.append(
            ("user", ExampleQuestion(context=ex["context"], question=ex["question"]))
        )
        examples.append(
            (
                "assistant",
                ExampleAnswer(answer=ex["student_answer"], confidence=ex["confidence"]),
            )
        )
        examples.append(
            (
                "user",
                ExampleJudge(
                    intended_answer=ex["true_answer"],
                    feedback=ex["judge_answer"],
                    verdict="correct" if ex["judge_verdict"] else "incorrect",
                ),
            )
        )

    # Create all the tasks using async
    tasks = [task(i, examples) for i in qids]
    results = []
    confidences = []
    correctness = []
    log_entries = []
    pbar = tqdm(total=len(tasks), ncols=120)
    for future in asyncio.as_completed(tasks):
        i, correct, confidence, log_entry = await future
        results.append((i, correct, confidence))
        confidences.append(confidence)
        correctness.append(correct)
        log_entries.append(log_entry)
        # Running metrics, shown in the progress bar as results come in
        accuracy = np.mean(correctness)
        ece = compute_ece(np.array(confidences), np.array(correctness))
        nll = compute_nll(np.array(confidences), np.array(correctness))
        mean_confidence = np.mean(confidences)
        pbar.set_postfix(
            {"Acc": f"{accuracy:.2f}", "Conf": f"{mean_confidence:.2f}", "ECE": f"{ece:.2f}", "NLL": f"{nll:.2f}"}
        )
        pbar.update(1)
    pbar.close()

    for i, correct, confidence in sorted(results):
        print(
            f"Question {i} is {'correct' if correct else 'incorrect'}, confidence: {confidence:.2f}"
        )
    print(f"\nFinal Accuracy: {accuracy:.2f}")
    print(f"Mean Confidence: {mean_confidence:.2f}")
    print(f"Final ECE: {ece:.2f}")
    print(f"Final NLL: {nll:.2f}")

    # Log all entries to a file
    log_filename = "answer_log.json"
    with open(log_filename, "w") as f:
        json.dump(log_entries, f, indent=2)
    print(f"\nDetailed logs have been written to {os.path.abspath(log_filename)}")


if __name__ == "__main__":
    asyncio.run(main())