Skip to content

Instantly share code, notes, and snippets.

@aont
Last active November 16, 2024 06:44
Show Gist options
  • Save aont/5d03f8946d4e96c737ceb51a61aa5807 to your computer and use it in GitHub Desktop.
Save aont/5d03f8946d4e96c737ceb51a61aa5807 to your computer and use it in GitHub Desktop.
import asyncio
import json
import aioimaplib
import sys
async def wait_for_new_message(host, user, password):
while True:
imap_client = aioimaplib.IMAP4_SSL(host=host)
await imap_client.wait_hello_from_server()
await imap_client.login(user, password)
await imap_client.select()
# idle = await imap_client.idle_start(timeout=10)
idle = await imap_client.idle_start()
while imap_client.has_pending_idle():
msg = await imap_client.wait_server_push()
sys.stderr.write(f"{msg=}\n")
if msg == aioimaplib.STOP_WAIT_SERVER_PUSH:
break
# imap_client.idle_done()
# try:
# await asyncio.wait_for(idle, 1)
# except TimeoutError:
# idle = await imap_client.idle_start()
# continue
await imap_client.logout()
if __name__ == '__main__':
with open("auth.json", "rt") as fp:
authobj = json.load(fp)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(wait_for_new_message(authobj["host"], authobj["username"], authobj["password"]))
import asyncio
import json
import sys
import ssl
class IMAPIdleClient:
def __init__(self, auth_file):
with open(auth_file, "rt") as fp:
self.authobj = json.load(fp)
self.context = ssl.create_default_context()
self.command_num = 0
self.reader = None
self.writer = None
async def connect(self):
"""IMAPサーバーへの接続を確立する"""
self.reader, self.writer = await asyncio.open_connection(
self.authobj["host"], 993, ssl=self.context
)
async def send_command(self, command):
"""IMAPサーバーにコマンドを送信する"""
sys.stderr.write(f"client: {command}\n")
self.writer.write(command.encode() + b'\r\n')
await self.writer.drain()
async def read_response(self):
"""IMAPサーバーからの応答を読み取る"""
try:
response = await self.reader.readuntil(separator=b'\r\n')
return response.decode()
except asyncio.exceptions.IncompleteReadError:
sys.stderr.write("Warning: Incomplete read, retrying...\n")
return None
async def login(self):
"""IMAPサーバーにログインする"""
await self.send_command(f'{self.command_num:04d} LOGIN {self.authobj["username"]} {self.authobj["password"]}')
self.command_num += 1
response = await self.read_response()
if response:
sys.stderr.write(f"server: {response}")
async def select_mailbox(self):
"""メールボックスを選択する"""
await self.send_command(f'{self.command_num:04d} SELECT INBOX')
self.command_num += 1
while True:
response = await self.read_response()
if response:
sys.stderr.write(f"server: {response}")
if "OK [READ-WRITE]" in response:
break
async def idle(self):
"""IDLEモードを開始し、新着メールを監視する"""
await self.send_command(f'{self.command_num:04d} IDLE')
self.command_num += 1
response = await self.read_response()
if response:
sys.stderr.write(f"server: {response}")
if "+ idling" in response.lower():
sys.stderr.write("info: IDLE mode started. Waiting for new emails...\n")
while True:
notification = await self.read_response()
if notification is None:
break
if notification:
sys.stderr.write(f"server: {notification}")
if "EXISTS" in notification:
sys.stderr.write("info: New email notification received!\n")
await self.send_command("DONE")
response = await self.read_response()
if response:
sys.stderr.write(f"server: {response}")
break
async def logout(self):
"""IMAPサーバーからログアウトする"""
await self.send_command(f'{self.command_num:04d} LOGOUT')
self.command_num += 1
response = await self.read_response()
if response:
sys.stderr.write(f"server: {response}")
self.writer.close()
await self.writer.wait_closed()
async def run(self):
"""IMAP IDLEクライアントのメインループ"""
while True:
try:
await self.connect()
greeting = await self.read_response()
if greeting:
sys.stderr.write(f"server: {greeting}")
await self.login()
await self.select_mailbox()
await self.idle()
except Exception as e:
sys.stderr.write(f"[exception] {e}\n")
await asyncio.sleep(1) # 再接続の前に短い遅延
except KeyboardInterrupt:
sys.stderr.write("\nExiting...\n")
await self.logout()
break
finally:
sys.stderr.write("[debug] reconnect\n")
# メイン実行
if __name__ == "__main__":
client = IMAPIdleClient("auth.json")
asyncio.run(client.run())
import json
import imaplib
import time
import sys
with open("auth.json", "rt") as fp:
authobj = json.load(fp)
def prepare_client(authobj: dict):
mail = imaplib.IMAP4_SSL(authobj["host"])
mail.login(authobj["username"], authobj["password"])
mail.select('inbox')
return mail
def idle_mailbox():
# IMAPサーバーに接続
mail = prepare_client(authobj)
sys.stderr.write("info: Waiting for new email notifications...\n")
try:
while True:
# サーバーにIDLEコマンドを送信
request = b'0001 IDLE\r\n'
mail.send(request)
sys.stderr.write(f"client: {request}\n")
response = mail.readline()
sys.stderr.write(f"server: {response}\n")
if response == b'':
# mail.logout()
mail = prepare_client(authobj)
if b'+ idling' in response.lower():
sys.stderr.write("info: IDLE mode started. Waiting for new emails...\n")
while True:
# サーバーからの応答を読み取る
response = mail.readline()
sys.stderr.write(f"server: {response}\n")
if response == b'':
# mail.logout()
# mail = prepare_client(authobj)
break
# 新着メールが届いたときの通知を確認
if b'EXISTS' in response:
sys.stderr.write("info: New email notification received!\n")
# IDLEモードの終了
request = b'DONE\r\n'
mail.send(request)
sys.stderr.write(f"client: {request}\n")
# サーバーがOK Idle completedを返すのを待つ
while True:
response = mail.readline()
sys.stderr.write(f"server: {response}\n")
if b'OK Idle completed' in response:
sys.stderr.write("info: IDLE completed, processing new emails...\n")
break
# # 新着メールの検索
# status, message_numbers = mail.search(None, 'UNSEEN')
# sys.stderr.write(f"{status=} {message_numbers=}\n")
# new_messages = message_numbers[0].split()
# sys.stderr.write(f"{new_messages=}\n")
# if new_messages:
# sys.stderr.write("info: New email(s) received!\n")
break
elif b'OK Idle completed' in response:
sys.stderr.write("info: IDLE completed, restarting...\n")
break
# 短い休止を入れて再度IDLEに入る
time.sleep(1)
except KeyboardInterrupt:
sys.stderr.write("\nExiting...\n")
finally:
# ログアウトと接続の終了
mail.logout()
# 実行
idle_mailbox()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment