Created October 7, 2023 15:32
-
-
Save FoobarProtocol/52619bc47cb2b852f87b891b147083e5 to your computer and use it in GitHub Desktop.
This is an extraction of the 'least to most' method from the researcher's GitHub; there are still some alterations that need to be made but this extracts the necessary code, makes some fixes to it, ensures that all calls are made to defined functions etc.; pairing this with the correct dataset from the repo will allow us to provide solutions to …
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# !/usr/bin/env python3 | |
# _*_ coding:utf-8 _*_ | |
import openai | |
import random | |
import time | |
import json | |
import re | |
def create_response(eng, prompt_input, max_tokens=256, temperature=0.0, stop=None):
    """Query the OpenAI Completions API (legacy completion endpoint).

    Args:
        eng: model name passed as the API ``model`` field.
        prompt_input: plain-text prompt.
        max_tokens: completion length cap.
        temperature: sampling temperature (0.0 = greedy).
        stop: optional marker; when given, generation stops at ``"<stop>:"``
            (e.g. ``stop="Q"`` stops at the next ``"Q:"``).

    Returns:
        The raw OpenAI response object.
    """
    # Fix: the original duplicated the entire API call in two branches that
    # differed only in the presence of the `stop` argument. Build the request
    # once and add `stop` conditionally.
    request = dict(
        model=eng,
        prompt=prompt_input,
        temperature=temperature,
        max_tokens=max_tokens,
        top_p=1.0,
        frequency_penalty=0.0,
        presence_penalty=0.0,
    )
    if stop is not None:
        # Stop generation at the next "<stop>:" marker.
        request["stop"] = ["{}:".format(stop)]
    return openai.Completion.create(**request)
def create_response_chat(eng="gpt-3.5-turbo", prompt_input=None, max_tokens=256, temperature=0.0, stop="Q"):
    """Query the OpenAI ChatCompletions API.

    Args:
        eng: chat model name (e.g. "gpt-3.5-turbo", "gpt-4").
        prompt_input: list of chat messages ({"role": ..., "content": ...}).
        max_tokens: completion length cap.
        temperature: sampling temperature (0.0 = greedy).
        stop: accepted for signature parity with create_response but NOT
            forwarded to the API.
            NOTE(review): callers pass stop="Q" apparently expecting the chat
            call to stop at "Q:" like the completion path does — confirm
            whether `stop` should be forwarded as well.

    Returns:
        The raw OpenAI response object.
    """
    # Fix: `max_tokens` was previously accepted but silently ignored
    # (never passed to the API call).
    response = openai.ChatCompletion.create(
        model=eng,
        messages=prompt_input,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return response
def answer_cleansing(method, direct_answer_trigger_for_fewshot, dataset, pred):
    """Extract the final answer token from a raw model prediction.

    For few-shot style methods the text after the answer trigger is used;
    then dataset-specific candidates are extracted (option letters for aqua,
    numbers for the arithmetic datasets), one candidate is selected according
    to the prompting method, and trailing punctuation is trimmed.

    Raises:
        ValueError: when `dataset` or `method` is not one of the known values.
    """
    answer_flag = False
    if method in ("few_shot", "few_shot_cot", "least_to_most"):
        # Keep only the text after the (last) answer trigger.
        segments = pred.split(direct_answer_trigger_for_fewshot)
        answer_flag = len(segments) > 1
        pred = segments[-1]

    if dataset == "aqua":
        # Multiple choice: keep only the option letters.
        pred = re.findall(r'A|B|C|D|E', pred.upper())
    elif dataset in ("gsm8k", "addsub", "multiarith", "svamp", "singleeq", "gsmic"):
        # Arithmetic: strip thousands separators and collect numeric candidates.
        cleaned = pred.replace(",", "")
        pred = [delete_extra_zero(s.replace(",", "")) for s in re.findall(r'-?\d+/?\.?\d*', cleaned)]
    else:
        raise ValueError("dataset is not properly defined ...")

    if not pred:
        # No candidate found at all.
        pred = ""
    elif method in ("few_shot", "few_shot_cot", "least_to_most"):
        # With an explicit trigger, take the first candidate; otherwise the last.
        pred = pred[0] if answer_flag else pred[-1]
    elif method in ("zero_shot", "zero_shot_cot"):
        pred = pred[0]
    else:
        raise ValueError("method is not properly defined ...")

    # Trim a trailing period or slash left over from sentence punctuation.
    if pred != "":
        if pred.endswith("."):
            pred = pred[:-1]
        if pred.endswith("/"):
            pred = pred[:-1]
    return pred
def delete_extra_zero(n):
    """Normalize a numeric string by trimming redundant trailing zeros.

    Fix: the original module-level function was declared with a stray `self`
    first parameter, while both call sites in answer_cleansing pass a single
    argument — every call raised TypeError.

    Args:
        n: a string (or number) to normalize, e.g. "3.50" -> "3.5", "3.0" -> "3".

    Returns:
        The normalized numeric string, or the input unchanged when it is not
        parseable as a number (best-effort, matching the original behavior).
    """
    try:
        n = float(n)
    except Exception:
        # Not numeric: log and return the input unchanged.
        print(f"None {n}")
        return n
    if isinstance(n, int):
        return str(n)
    if isinstance(n, float):
        n = str(n).rstrip('0')  # drop trailing zeros after the decimal point
        # Only the decimal point left -> plain int; otherwise back to float.
        n = int(n.rstrip('.')) if n.endswith('.') else float(n)
    n = str(n)
    return n
class AnswerMethod():
    """Base class for answer-generation strategies.

    Stores the prompting method name and the model engine; subclasses
    implement `forward` to answer a single question.
    """

    def __init__(self, answer_method="few_shot",
                 eng="text-davinci-003",
                 few_shot_prompt_path=None):
        # NOTE(review): `few_shot_prompt_path` is accepted but unused here;
        # subclasses appear to load their own prompt files — confirm.
        self.answer_method = answer_method
        self.eng = eng

    def forward(self, question, answer, direct_answer_trigger_for_fewshot, dataset):
        """Answer one question. Implemented by subclasses; no-op here."""
        pass

    def delete_extra_zero(self, n):
        """Trim redundant trailing zeros from a numeric string.

        Returns the input unchanged when it cannot be parsed as a number.
        """
        try:
            value = float(n)
        except Exception:
            # Not numeric: log and hand back the original input.
            print(f"None {n}")
            return n
        if isinstance(value, int):
            return str(value)
        if isinstance(value, float):
            text = str(value).rstrip('0')  # drop trailing zeros after the decimal point
            # Only the decimal point left -> plain int; otherwise back to float.
            value = int(text.rstrip('.')) if text.endswith('.') else float(text)
        return str(value)

    def generate_CoT_and_answer(self, eng, method, direct_answer_trigger_for_fewshot, dataset, prompt_input):
        """Run one model query and return (raw chain-of-thought, cleaned answer)."""
        if eng in ["gpt-3.5-turbo", "gpt-4"]:
            # Chat endpoint: wrap the prompt as a system + user message pair.
            reply = create_response_chat(
                eng,
                prompt_input=[
                    {"role": "system", "content": "Follow the given examples and answer the question."},
                    {"role": "user", "content": prompt_input}
                ],
                temperature=0,
                max_tokens=256
            )
            CoT_and_answer = reply["choices"][0]["message"]["content"]
            time.sleep(21)  # crude rate limiting for the chat endpoint
        else:
            # Legacy completion endpoint.
            reply = create_response(
                eng,
                prompt_input=prompt_input,
                temperature=0,
                max_tokens=256
            )
            CoT_and_answer = reply["choices"][0]["text"]
        tail = CoT_and_answer.split(direct_answer_trigger_for_fewshot)[-1]
        answer = answer_cleansing(method, direct_answer_trigger_for_fewshot, dataset, tail)
        return CoT_and_answer, answer
class AnswerFewShotLtM(AnswerMethod):
    """Least-to-Most prompting.

    Stage 1 reduces the question into sub-questions with a "reducing" prompt;
    stage 2 solves the sub-questions in order, feeding each sub-answer back
    into the solving prompt, and the last sub-answer yields the final answer.
    """

    def __init__(self, answer_method="least_to_most",
                 eng="text-davinci-003",
                 problem_reducing_prompt_path=None,
                 problem_solving_prompt_path=None):
        """Load the reducing and solving few-shot prompts from disk.

        Fix: delegate the shared attributes to the base initializer instead
        of re-assigning them by hand.

        Raises:
            AssertionError: when either prompt path is missing.
        """
        super().__init__(answer_method=answer_method, eng=eng)
        assert problem_reducing_prompt_path is not None
        assert problem_solving_prompt_path is not None
        with open(file=problem_reducing_prompt_path, mode="r", encoding="utf-8") as f:
            self.problem_reducing_prompt = f.read().strip()
        with open(file=problem_solving_prompt_path, mode="r", encoding="utf-8") as f:
            self.problem_solving_prompt = f.read().strip()

    @staticmethod
    def answer_cleansing(method, direct_answer_trigger_for_fewshot, dataset, pred):
        """Extract the final answer token from a raw model prediction.

        Fix: declared @staticmethod — the original definition had no `self`
        parameter, so calling it on an instance would have bound the instance
        to `method` and silently corrupted every argument. (Note: `forward`
        below resolves the unqualified name to the module-level
        answer_cleansing, as it did originally; this method mirrors it.)
        """
        if method in ("few_shot", "few_shot_cot", "least_to_most"):
            # Keep only the text after the (last) answer trigger.
            preds = pred.split(direct_answer_trigger_for_fewshot)
            answer_flag = True if len(preds) > 1 else False
            pred = preds[-1]
        if dataset == "aqua":
            # Multiple choice: keep only the option letters.
            pred = pred.upper()
            pred = re.findall(r'A|B|C|D|E', pred)
        elif dataset in ("gsm8k", "addsub", "multiarith", "svamp", "singleeq", "gsmic"):
            # Arithmetic: strip thousands separators, collect numeric candidates.
            # NOTE(review): relies on the module-level delete_extra_zero helper.
            pred = pred.replace(",", "")
            pred = [delete_extra_zero(s.replace(",", "")) for s in re.findall(r'-?\d+/?\.?\d*', pred)]
        else:
            raise ValueError("dataset is not properly defined ...")
        # If there is no candidate in the list, the empty string is returned.
        if len(pred) == 0:
            pred = ""
        else:
            if method in ("few_shot", "few_shot_cot", "least_to_most"):
                # With an explicit trigger take the first candidate,
                # otherwise the last.
                pred = pred[0] if answer_flag else pred[-1]
            elif method in ("zero_shot", "zero_shot_cot"):
                pred = pred[0]
            else:
                raise ValueError("method is not properly defined ...")
        # Trim a trailing period or slash left over from sentence punctuation.
        if pred != "":
            if pred[-1] == ".":
                pred = pred[:-1]
            if pred[-1] == "/":
                pred = pred[:-1]
        return pred

    def forward(self, question, answer, direct_answer_trigger_for_fewshot, dataset):
        """Answer `question` with least-to-most prompting.

        Args:
            question: the problem text.
            answer: the gold answer (printed for comparison only).
            direct_answer_trigger_for_fewshot: trigger phrase preceding the
                final answer in the model output.
            dataset: dataset name, forwarded to answer_cleansing.

        Returns:
            The cleaned final answer string, or None when any step raised
            (best-effort behavior of the original is preserved).
        """
        try:
            problem_reducing_prompt_input = self.problem_reducing_prompt + "\n\nQ: {}\nA:".format(question)
            # --- Stage 1: reduce the question into sub-questions. ---
            if self.eng in ["gpt-3.5-turbo", "gpt-4"]:
                reduced_problem = create_response_chat(
                    eng=self.eng,
                    prompt_input=[
                        {"role": "system", "content": "Follow the given examples and answer the question."},
                        {"role": "user", "content": problem_reducing_prompt_input}
                    ],
                    temperature=0,
                    max_tokens=256
                )["choices"][0]["message"]["content"]
                time.sleep(20)  # crude rate limiting for the chat endpoint
            else:
                reduced_problem = create_response(
                    eng=self.eng,
                    prompt_input=problem_reducing_prompt_input,
                    temperature=0,
                    max_tokens=256
                )["choices"][0]["text"]
            print("Reducing Response: {}".format(reduced_problem))
            # Sub-questions are expected to appear between double quotes.
            reduced_problem_list = reduced_problem.split("\"")
            reduced_problem_list = [i for i in reduced_problem_list if "?" in i]
            # Move the first extracted question (which restates the original
            # problem) to the end so it is solved last.
            final_q = reduced_problem_list[0]
            reduced_problem_list = reduced_problem_list[1:] + [final_q]
            print("---Reduced to {} questions. They are: {}".format(len(reduced_problem_list), reduced_problem_list))
            # --- Stage 2: solve each sub-question in order. ---
            problem_solving_prompt_input = self.problem_solving_prompt + "\n\n{}".format(question)
            sub_answer = None
            for sub_q_count in range(len(reduced_problem_list)):
                sub_q = reduced_problem_list[sub_q_count]
                problem_solving_prompt_input = problem_solving_prompt_input + "\n\nQ: {}\nA:".format(sub_q)
                print("-------------------------------------")
                print(f"---Current sub question {sub_q_count}: {sub_q}")
                if self.eng in ["gpt-3.5-turbo", "gpt-4"]:
                    sub_answer = create_response_chat(
                        eng=self.eng,
                        prompt_input=[
                            {"role": "system", "content": "Follow the given examples and answer the question."},
                            {"role": "user", "content": problem_solving_prompt_input}
                        ],
                        temperature=0,
                        max_tokens=256,
                        stop="Q"
                    )["choices"][0]["message"]["content"]
                    time.sleep(20)
                else:
                    sub_answer = create_response(
                        eng=self.eng,
                        prompt_input=problem_solving_prompt_input,
                        temperature=0,
                        max_tokens=256,
                        stop="Q"
                    )["choices"][0]["text"]
                # Feed the sub-answer back so later sub-questions can use it.
                problem_solving_prompt_input = problem_solving_prompt_input + sub_answer
                print(f"---Current sub answer: {sub_answer}")
            # The last sub-answer contains the final answer; clean it.
            least_to_most_answer = answer_cleansing(method=self.answer_method,
                                                    direct_answer_trigger_for_fewshot=direct_answer_trigger_for_fewshot,
                                                    dataset=dataset,
                                                    pred=sub_answer.split(direct_answer_trigger_for_fewshot)[-1]
                                                    )
            print(f"Question: {question}")
            print(f"Ans: {least_to_most_answer}")
            print(f"golden: {answer}")
            return least_to_most_answer
        except Exception as e:
            # Best-effort: log and fall through (returns None) so one failed
            # example does not abort a batch run.
            print(f"An error occurred: {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.