Last active
June 15, 2023 09:36
-
-
Save IzumiSatoshi/c462ba0959fedf0960cff5cb13c3918a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import openai | |
import threading | |
import time | |
import textwrap | |
import random | |
import re | |
import copy | |
import requests, simpleaudio, tempfile, json | |
import pyaudio | |
import wave | |
import numpy as np | |
from concurrent.futures import ThreadPoolExecutor | |
openai.api_key = open("./openai_key.txt", "r").read().strip("\n") | |
## MODEL SETTINGS | |
ENABLE_COT = True | |
# MODEL = "gpt-4" | |
MODEL = "gpt-3.5-turbo" | |
TEMPERATURE = 1 | |
MAX_TOKEN = None | |
MESSAGE_LOG_DIR = "./werewolf/log_messages" | |
## GAME SETTINGS | |
TALKS_PER_DAY = 2 | |
SHUFFLE = True | |
PLAYER_NAMES = ["イズミ", "ズンダモン", "カスカベ", "アオヤマ", "キガシマ", "ネコツカ"] | |
## MIC | |
FORMAT = pyaudio.paInt16 # 16-bitの量子化ビット深度で録音する | |
CHANNELS = 1 # モノラルで録音する | |
RATE = 44100 # サンプリングレートを44100Hzに設定する | |
CHUNK = 1024 # バッファサイズを1024に設定する | |
RECORD_SECONDS = 100 # 録音する秒数を設定する | |
SILENCE_DURATION = 1 | |
VOLUME_THRESHOLD_END = 2000 | |
def main(): | |
# roles=["村人", "村人", "村人", "村人", "村人", "人狼"], | |
# roles = ["村人", "村人", "騎士", "占い師", "狂人", "人狼"] | |
roles = ["村人", "狂人", "騎士", "村人", "占い師", "人狼"] | |
gm = GameMaster( | |
players=[ | |
Player("カスカベ"), | |
Player("イズミ", is_human=True), | |
Player("アオヤマ"), | |
Player("ズンダモン"), | |
Player("キガシマ"), | |
Player("ネコツカ"), | |
], | |
roles=roles, | |
) | |
# init | |
gm.set_roles() | |
gm.init_player_message() | |
day = 1 | |
# 1st night | |
gm.speak_to_everyone(f"1日目の夜です。") | |
gm.manage_night_actions(is_first_day=True) | |
# 1st morning | |
gm.speak_to_everyone("朝になりました。目を開けてください。") | |
gm.manage_talks() | |
gm.take_vote_and_execute() | |
day += 1 | |
# 2nd night ~ | |
while True: | |
# night | |
gm.speak_to_everyone(f"{day}日目の夜になりました。") | |
gm.manage_night_actions() | |
# morning | |
if len(gm.killed_player_names_last_night) == 0: | |
gm.speak_to_everyone(f"{day}日目の朝になりました。昨日は誰も殺されませんでした。") | |
else: | |
gm.speak_to_everyone( | |
f"{day}日目の朝になりました。昨日殺されたのは{gm.killed_player_names_last_night}さんです。" | |
) | |
is_end = gm.judge() | |
if is_end: | |
break | |
gm.manage_talks() | |
gm.take_vote_and_execute() | |
is_end = gm.judge() | |
if is_end: | |
break | |
day += 1 | |
class GameMaster: | |
def __init__(self, players, roles): | |
self.players = copy.deepcopy(players) | |
self.roles = roles | |
self.killed_player_names_last_night = [] | |
self.dead_players = [] | |
def set_roles(self): | |
shuffled_role = copy.deepcopy(self.roles) | |
if SHUFFLE: | |
random.shuffle(shuffled_role) | |
for player in self.players: | |
player.set_role(shuffled_role.pop()) | |
# save init roles | |
self.init_players = copy.deepcopy(self.players) | |
def init_player_message(self): | |
for player in self.players: | |
player.init_message(player.role, self.roles) | |
def manage_talks(self): | |
# 人間が声を聞いてるあいだに、AIは先を考えてる。 | |
human_hearing_thread_future = None | |
for turn_count in range(TALKS_PER_DAY): | |
for idx, player in enumerate(self.players): | |
""" | |
gm_speak_thread = threading.Thread( | |
target=voicevox, args=(f"{player.name}さん、発言してください。", "メイメイ") | |
) | |
""" | |
# 人間が発言を聞いている間に、先にAPIを呼ぶことで速度を上げる | |
name = player.name | |
# 人間は先読みを出来ない | |
if not player.is_human: | |
talk_thread = ThreadPoolExecutor() | |
talk_thread_future = talk_thread.submit(player.talk) | |
# 最初はhearing futureがない | |
# 人間(自分)の発言回のあとはhuman_hearing_threadが作られない | |
if turn_count == 0 and idx == 0: | |
pass | |
elif self.players[idx - 1].is_human: | |
pass | |
else: | |
print("wait hearing") | |
human_hearing_thread_future.result() | |
# GMが時間を稼ぐことでレスポンスを速く見せる。 | |
if idx != 0: | |
time.sleep(1) | |
voicevox(f"{player.name}さん、発言してください。", "メイメイ") | |
# 人間は先読みを出来ない | |
if not player.is_human: | |
words = talk_thread_future.result() | |
talk_thread.shutdown() | |
# 人間がしゃべるとき | |
if player.is_human: | |
words = player.talk() | |
# hear | |
# include dead player | |
for p in self.players: | |
if p.name is not name: | |
if p.is_human: | |
human_hearing_thread = ThreadPoolExecutor() | |
human_hearing_thread_future = human_hearing_thread.submit( | |
p.hear, name, words | |
) | |
else: | |
p.hear(name, words) | |
for p in self.dead_players: | |
if p.name is not name: | |
if p.is_human: | |
human_hearing_thread = ThreadPoolExecutor() | |
human_hearing_thread_future = human_hearing_thread.submit( | |
p.hear, name, words | |
) | |
else: | |
p.hear(name, words) | |
# talkを終える前に、聞き終わったことを確認する | |
print("wait hearing") | |
human_hearing_thread_future.result() | |
def take_vote_and_execute(self): | |
self.speak_to_everyone("議論の時間が終わりました。投票を行います。誰を処刑するか選んでください。") | |
# voted_player:count | |
vote_dict = {} | |
for p in self.players: | |
voted = self.manage_selection(p) | |
if voted is not None: | |
if voted in vote_dict: | |
vote_dict[voted] += 1 | |
else: | |
vote_dict[voted] = 1 | |
# find max voted | |
max_votes = 0 | |
executed = None | |
for p, count in vote_dict.items(): | |
if count > max_votes: | |
max_votes = count | |
executed = p | |
result_str = "\n".join( | |
[ | |
f"{voted_player.name}:{count}" | |
for voted_player, count in vote_dict.items() | |
] | |
) | |
# TODO: bad indent | |
self.speak_to_everyone( | |
f""" | |
投票の結果は | |
{result_str} | |
となりました。 | |
処刑されることになったのは{executed.name}さんです。 | |
""" | |
) | |
self.kill_player(executed) | |
def manage_night_actions(self, is_first_day=False): | |
self.killed_player_names_last_night = [] | |
attacked_player = None | |
protected_player = None | |
for player in self.players: | |
if player.role == "人狼": | |
# Werewolf don't move on the first day right? | |
if not is_first_day: | |
self.speak_to_one(player, "あなたは人狼です。誰を殺すか決めてください。") | |
attacked_player = self.manage_selection(player) | |
if player.role == "騎士": | |
if not is_first_day: | |
self.speak_to_one(player, "あなたは騎士です。人狼から守る人を一人選んでください。") | |
protected_player = self.manage_selection(player) | |
if player.role == "占い師": | |
self.speak_to_one(player, "あなたは占い師です。誰を占うか決めてください。必ず誰かを占ってください。") | |
selected_player = self.manage_selection(player) | |
if selected_player is not None: | |
if selected_player.role == "人狼": | |
self.speak_to_one(player, f"{selected_player.name}さんは人狼です。") | |
else: | |
self.speak_to_one( | |
player, f"{selected_player.name}さんは人狼ではありません。" | |
) | |
else: | |
self.speak_to_one(player, f"選択に失敗しました。") | |
if attacked_player is not None and attacked_player != protected_player: | |
self.killed_player_names_last_night.append(attacked_player.name) | |
self.kill_player(attacked_player) | |
def judge(self): | |
# return end game or not | |
werewolves = 0 | |
villagers = 0 | |
for p in self.players: | |
if p.role == "人狼": | |
werewolves += 1 | |
else: | |
villagers += 1 | |
init_roles_str = "" | |
for player in self.init_players: | |
init_roles_str += f"{player.name}:{player.role}\n" | |
if werewolves >= villagers: | |
self.speak_to_everyone( | |
f""" | |
残っているプレイヤーの数が | |
人狼陣営 :{werewolves} | |
村人陣営 :{villagers} | |
となりました。 | |
人狼陣営の勝利です。 | |
配役は | |
{init_roles_str} | |
でした。 | |
皆様、お疲れさまでした。 | |
""" | |
) | |
return True | |
if werewolves == 0: | |
self.speak_to_everyone( | |
f""" | |
残っているプレイヤーの数が | |
人狼陣営 :{werewolves} | |
村人陣営 :{villagers} | |
となりました。 | |
村人陣営の勝利です! | |
配役は | |
{init_roles_str} | |
でした。 | |
皆様、お疲れさまでした。 | |
""" | |
) | |
return True | |
return False | |
def init_coming_out(self): | |
for player in self.init_players: | |
f"{player.name}:{player.role}" | |
self.speak_to_everyone( | |
f""" | |
""" | |
) | |
def manage_selection(self, player): | |
# select player from players | |
# output player instance | |
names_string = "" | |
names_list = [] | |
# p's scope is out of for, so it's confusing. | |
for idx, p in enumerate(self.players): | |
names_list.append(p.name) | |
# 自分は選択できない | |
if p != player: | |
names_string += f"{idx}:{p.name}\n" | |
player.hear( | |
"GM", | |
f"あなたは\n{names_string}から番号を選び、番号のみを発言してください。", | |
voice=False, | |
) | |
response = player.talk(whisper=True) | |
numbers = re.findall(r"\d+", response) | |
if len(numbers) == 1: | |
selected_name = names_list[int(numbers[0])] | |
return self.get_player_from_name(selected_name) | |
else: | |
return None | |
def kill_player(self, player): | |
self.players.remove(player) | |
self.dead_players.append(player) | |
def speak_to_one(self, player, words): | |
player.hear("GM", words) | |
def speak_to_everyone(self, words): | |
# include dead players | |
for player in self.players: | |
player.hear("GM", words) | |
for player in self.dead_players: | |
player.hear("GM", words) | |
def get_players_from_role(self, role): | |
ret = [] | |
for player in self.players: | |
if player.role == role: | |
ret.append(player) | |
return ret | |
def get_player_from_name(self, name): | |
target_player = None | |
for player in self.players: | |
if player.name == name: | |
target_player = player | |
break | |
return target_player | |
class Player: | |
def __init__(self, name, is_human=False): | |
self.name = name | |
self.is_human = is_human | |
self.working_memory = "" # All words received from my turn to turn | |
def init_message(self, role, roles): | |
if self.is_human: | |
self.hear("GM", f"あなたの役職は{self.role}です。") | |
if ENABLE_COT: | |
self.messages = [ | |
{ | |
# systemに役職を書いた方がいいのだろうか | |
"role": "system", | |
"content": textwrap.dedent( | |
f"""\ | |
## problem | |
テキストベースの人狼ゲームに参加しているプレイヤーとして振舞ってください。 | |
あなたのプレイヤー名は{self.name}です。 | |
ゲーム進行を務めるのはGM(Game Master)です。 | |
プレイヤーは全部で{len(roles)}人で、内訳は{roles}です。 | |
各プレイヤーが順番に発言していき{TALKS_PER_DAY}周回ると投票に移ります。 | |
以下は各役職の説明です。 | |
村人: ただの村人です。 | |
占い師: 一日目の夜から、誰か一人を選んでその人の役職を知ることが出来ます。 | |
騎士: 二日目の夜から、誰か一人を選んでその人を人狼の襲撃から守ることが出来ます。 | |
狂人: 村人として扱われますが、人狼の味方をします。人狼が勝つことで狂人も勝利出来ます。 | |
人狼: 二日目の夜から、誰か一人を選んで殺すことが出来ます。 | |
## format | |
Userから入力される人狼ゲームの情報に対して、Assistantは以下のフォーマットでプレイヤーとしての発言をします。 | |
- <thought></thought>ブロック内に、現在の状況、考察などを論理的に、ステップバイステップで書き出す。 | |
- <statement></statement>ブロック内に発言内容を記入する。 | |
例: | |
<thought> | |
1. GMから朝であることを告げられました。 | |
2. 挨拶をします。 | |
</thought> | |
<statement> | |
おはようございます。 | |
</statement> | |
## あなたの役職 | |
あなたの役職は{role}です。 | |
""" | |
), | |
}, | |
{ | |
"role": "user", | |
"content": textwrap.dedent( | |
f""" | |
### GM ### | |
ではゲームを始めましょう。準備は良いですか? | |
""" | |
), | |
}, | |
{ | |
"role": "assistant", | |
"content": textwrap.dedent( | |
f""" | |
<thought> | |
1. Game Masterから準備が出来ているか聞かれています。 | |
2. 準備が出来ていることを伝えます。 | |
</thought> | |
<statement> | |
準備完了です。 | |
</statement> | |
""" | |
), | |
}, | |
] | |
else: | |
self.messages = [ | |
{ | |
# systemに役職を書いた方がいいのだろうか | |
"role": "system", | |
"content": textwrap.dedent( | |
f"""\ | |
## problem | |
テキストベースの人狼ゲームに参加しているプレイヤーとして振舞ってください。 | |
あなたのプレイヤー名は{self.name}です。 | |
ゲーム進行を務めるのはGM(Game Master)です。 | |
プレイヤーは全部で{len(roles)}人で、内訳は{roles}です。 | |
各プレイヤーが順番に発言していき{TALKS_PER_DAY}周回ると投票に移ります。 | |
以下は各役職の説明です。 | |
村人: ただの村人です。 | |
占い師: 一日目の夜から、誰か一人を選んでその人の役職を知ることが出来ます。 | |
騎士: 二日目の夜から、誰か一人を選んでその人を人狼の襲撃から守ることが出来ます。 | |
狂人: 村人として扱われますが、人狼の味方をします。人狼が勝つことで狂人も勝利出来ます。 | |
人狼: 二日目の夜から、誰か一人を選んで殺すことが出来ます。 | |
## format | |
Userから入力される人狼ゲームの情報に対して、Assistantは以下のフォーマットでプレイヤーとしての発言をします。 | |
- <statement></statement>ブロック内に発言内容を記入する。 | |
例: | |
<statement> | |
おはようございます。 | |
</statement> | |
## あなたの役職 | |
あなたの役職は{role}です。 | |
""" | |
), | |
}, | |
{ | |
"role": "user", | |
"content": textwrap.dedent( | |
f""" | |
### GM ### | |
ではゲームを始めましょう。準備は良いですか? | |
""" | |
), | |
}, | |
{ | |
"role": "assistant", | |
"content": textwrap.dedent( | |
f""" | |
<statement> | |
準備完了です。 | |
</statement> | |
""" | |
), | |
}, | |
] | |
def set_role(self, role): | |
self.role = role | |
def talk(self, whisper=False): | |
# whisper: don't speak with voice | |
self.messages.append({"role": "user", "content": self.working_memory}) | |
statement = "" | |
if self.is_human: | |
print(f"### {self.name} ###") | |
if whisper: | |
response = input(">") | |
statement = response | |
print("") | |
else: | |
response = stt() | |
statement = response | |
print(f">{statement}") | |
else: | |
# print(f"[{self.name}]") | |
# print(self.messages) | |
response = call_gpt(self.messages) | |
match = re.search(r"<statement>(.*?)</statement>", response, re.DOTALL) | |
if match: | |
statement = match.group(1).strip() | |
self.messages.append({"role": "assistant", "content": response}) | |
self.working_memory = "" # clear working memory | |
write_messages(self.messages, MESSAGE_LOG_DIR + "/" + self.name) | |
return statement | |
def hear(self, name, words, voice=True): | |
# TODO: Is structure good? | |
structure = f"### {name} ### \n {words} \n" | |
if self.is_human: | |
print(structure) | |
if voice: | |
if name == "GM": | |
voicevox(words, "メイメイ") | |
else: | |
voicevox(words, name) | |
self.working_memory += structure | |
def call_gpt(messages, stop=None, max_retries=5, timeout=60): | |
def api_call(api_result, event): | |
try: | |
print("calling api") | |
completion = openai.ChatCompletion.create( | |
model=MODEL, | |
messages=messages, | |
stop=stop, | |
temperature=TEMPERATURE, | |
max_tokens=MAX_TOKEN, | |
) | |
api_result["response"] = completion.choices[0].message.content | |
except Exception as e: | |
api_result["error"] = e | |
finally: | |
event.set() | |
for attempt in range(max_retries): | |
api_result = {"response": None, "error": None} | |
event = threading.Event() | |
api_thread = threading.Thread(target=api_call, args=(api_result, event)) | |
api_thread.start() | |
finished = event.wait(timeout) | |
if not finished: | |
print( | |
f"Timeout exceeded: {timeout}s. Attempt {attempt + 1} of {max_retries}. Retrying..." | |
) | |
else: | |
if api_result["error"] is not None: | |
print(api_result["error"]) | |
print( | |
f"API error: {api_result['error']}. Attempt {attempt + 1} of {max_retries}. Retrying..." | |
) | |
else: | |
return api_result["response"] | |
print("Reached maximum retries. Aborting.") | |
return None | |
def voicevox(text, speaker): | |
speaker_id_dict = { | |
"ズンダモン": 3, | |
"カスカベ": 8, | |
"アオヤマ": 13, | |
"メイメイ": 14, | |
"キガシマ": 53, | |
"ネコツカ": 58, | |
} | |
host = "127.0.0.1" | |
port = 50021 | |
params = ( | |
("text", text), | |
("speaker", speaker_id_dict[speaker]), | |
) | |
response1 = requests.post(f"http://{host}:{port}/audio_query", params=params) | |
response2 = requests.post( | |
f"http://{host}:{port}/synthesis", | |
headers={"Content-Type": "application/json"}, | |
params=params, | |
data=json.dumps(response1.json()), | |
) | |
with tempfile.TemporaryDirectory() as tmp: | |
with open(f"{tmp}/audi.wav", "wb") as f: | |
f.write(response2.content) | |
wav_obj = simpleaudio.WaveObject.from_wave_file(f"{tmp}/audi.wav") | |
play_obj = wav_obj.play() | |
play_obj.wait_done() | |
def stt(): | |
# speach to text with whisper | |
audio = pyaudio.PyAudio() # PyAudioオブジェクトを作成する | |
# 録音用のストリームを開く | |
stream = audio.open( | |
format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK | |
) | |
print("録音を開始します...") | |
frames = [] | |
# 録音する時間だけバッファから音声データを読み込む | |
# 音量が一定よりも低い時間が続いたら終了 | |
silence_threshold = int(RATE / CHUNK * SILENCE_DURATION) | |
silence_chunks = 0 | |
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): | |
data = stream.read(CHUNK) | |
audio_data = np.frombuffer(data, dtype=np.int16) | |
volume = np.linalg.norm(audio_data) / np.sqrt(len(audio_data)) | |
if volume < VOLUME_THRESHOLD_END: | |
silence_chunks += 1 | |
else: | |
silence_chunks = 0 | |
frames.append(data) | |
if silence_chunks > silence_threshold: | |
break | |
print("録音が終了しました。") | |
# 録音用のストリームを閉じる | |
stream.stop_stream() | |
stream.close() | |
audio.terminate() | |
with tempfile.TemporaryDirectory() as tmp: | |
# 録音した音声データをファイルに保存する | |
filepath = f"{tmp}/out.wav" | |
waveFile = wave.open(filepath, "wb") | |
waveFile.setnchannels(CHANNELS) | |
waveFile.setsampwidth(audio.get_sample_size(FORMAT)) | |
waveFile.setframerate(RATE) | |
waveFile.writeframes(b"".join(frames)) | |
waveFile.close() | |
# 書き起こし | |
with open(filepath, "rb") as f: | |
prompt = ",".join(PLAYER_NAMES) | |
transcript = openai.Audio.transcribe( | |
"whisper-1", f, language="ja", prompt=prompt | |
) | |
return transcript["text"] | |
def write_messages(messages, file_path): | |
message_log = "" | |
for message in messages: | |
role = message["role"] | |
content = message["content"] | |
message_log += f"【{role.capitalize()}】\n{content}\n" | |
with open(file_path, "w", encoding="utf-8") as file: | |
file.write(message_log) | |
if __name__ == "__main__": | |
# print(stt()) | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment