Last active
November 16, 2024 06:44
-
-
Save aont/5d03f8946d4e96c737ceb51a61aa5807 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import aioimaplib | |
import sys | |
async def wait_for_new_message(host, user, password):
    """Echo IMAP IDLE server pushes to stderr, reconnecting forever.

    Each pass of the outer loop builds a new ``aioimaplib.IMAP4_SSL`` client
    for ``host``, waits for the server greeting, logs in with ``user`` and
    ``password``, issues ``select()`` with its default arguments and starts
    IDLE.  Every pushed message is written to stderr until the client yields
    ``STOP_WAIT_SERVER_PUSH`` or reports that no IDLE is pending; the session
    is then logged out and the next pass begins.  The coroutine never returns
    on its own.
    """
    while True:
        client = aioimaplib.IMAP4_SSL(host=host)
        await client.wait_hello_from_server()
        await client.login(user, password)
        await client.select()

        # The value returned by idle_start() stays bound for the whole IDLE
        # session, exactly as before, even though nothing reads it here.
        idle_handle = await client.idle_start()

        push_stopped = False
        while not push_stopped and client.has_pending_idle():
            # NOTE: the local must stay named ``msg``; the f-string below
            # prints the variable name as part of its output.
            msg = await client.wait_server_push()
            sys.stderr.write(f"{msg=}\n")
            push_stopped = msg == aioimaplib.STOP_WAIT_SERVER_PUSH

        await client.logout()
if __name__ == '__main__':
    # Credentials come from auth.json, which must provide the "host",
    # "username" and "password" keys.
    with open("auth.json", "rt") as fp:
        authobj = json.load(fp)
    # asyncio.run() creates the event loop, runs the coroutine and always
    # closes the loop afterwards; the previously hand-made loop was never
    # closed.
    asyncio.run(wait_for_new_message(authobj["host"], authobj["username"], authobj["password"]))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import sys | |
import ssl | |
class IMAPIdleClient:
    """Minimal asyncio IMAP client that waits for new mail with IDLE.

    Credentials are loaded from a JSON file that provides the "host",
    "username" and "password" keys.  ``run()`` connects with TLS on port 993,
    logs in, selects INBOX and stays in IDLE until the server announces a
    message (an ``EXISTS`` line); it then closes the connection and starts
    over.
    """

    def __init__(self, auth_file):
        """Load the credentials from ``auth_file`` and build the TLS context."""
        with open(auth_file, "rt") as fp:
            self.authobj = json.load(fp)
        self.context = ssl.create_default_context()
        # Counter used to build the four-digit command tags ("0000", "0001", ...).
        self.command_num = 0
        self.reader = None
        self.writer = None

    @staticmethod
    def _quote(value):
        """Return ``value`` as an IMAP quoted string.

        Quoting keeps spaces and special characters in the user name or
        password from being parsed as extra command arguments.

        Raises:
            ValueError: if the value contains CR or LF, which a quoted string
                cannot carry and which would otherwise inject a new command.
        """
        text = str(value)
        if "\r" in text or "\n" in text:
            raise ValueError("IMAP quoted strings cannot contain CR or LF")
        escaped = text.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def _next_tag(self):
        """Return a fresh command tag and advance the tag counter."""
        tag = f'{self.command_num:04d}'
        self.command_num += 1
        return tag

    async def connect(self):
        """Open the TLS connection to the configured host on port 993."""
        self.reader, self.writer = await asyncio.open_connection(
            self.authobj["host"], 993, ssl=self.context
        )

    async def _close_connection(self):
        """Close the current connection, if any; safe to call repeatedly."""
        writer, self.reader, self.writer = self.writer, None, None
        if writer is None:
            return
        writer.close()
        try:
            await writer.wait_closed()
        except OSError as e:
            # Best-effort cleanup: the peer may already have dropped the link.
            sys.stderr.write(f"[debug] error while closing: {e}\n")

    async def send_command(self, command, log_as=None):
        """Send one command line to the server.

        Args:
            command: the command text without the trailing CRLF.
            log_as: optional replacement text for the stderr log, used to keep
                secrets out of the log.  Defaults to logging ``command``.
        """
        sys.stderr.write(f"client: {command if log_as is None else log_as}\n")
        self.writer.write(command.encode() + b'\r\n')
        await self.writer.drain()

    async def read_response(self):
        """Read one CRLF-terminated line from the server.

        Returns:
            The decoded line (including its CRLF), or ``None`` when the
            stream ended before a complete line arrived.
        """
        try:
            response = await self.reader.readuntil(separator=b'\r\n')
        except asyncio.IncompleteReadError:
            sys.stderr.write("Warning: Incomplete read, connection closed by server\n")
            return None
        # Undecodable bytes must not abort the session; they are only logged.
        return response.decode(errors="replace")

    async def _expect_ok(self, tag, what):
        """Log server lines up to the completion of ``tag`` and require OK.

        Returns:
            The tagged completion line.

        Raises:
            ConnectionError: if the stream ends before the completion line.
            RuntimeError: if the completion status is not OK.
        """
        prefix = tag + " "
        while True:
            response = await self.read_response()
            if response is None:
                # Without this check an ended stream would be polled forever.
                raise ConnectionError(
                    f"connection closed while waiting for the {what} result"
                )
            sys.stderr.write(f"server: {response}")
            if response.startswith(prefix):
                break
        if not response[len(prefix):].upper().startswith("OK"):
            raise RuntimeError(f"{what} failed: {response.strip()}")
        return response

    async def login(self):
        """Log in with the configured user name and password.

        Raises:
            ConnectionError: if the server closes the connection.
            RuntimeError: if the server rejects the credentials.
        """
        tag = self._next_tag()
        username = self._quote(self.authobj["username"])
        password = self._quote(self.authobj["password"])
        await self.send_command(
            f'{tag} LOGIN {username} {password}',
            log_as=f'{tag} LOGIN {username} ********',  # never log the password
        )
        await self._expect_ok(tag, "LOGIN")

    async def select_mailbox(self):
        """Select INBOX and wait for the tagged completion of the command.

        Raises:
            ConnectionError: if the server closes the connection.
            RuntimeError: if the server refuses the SELECT.
        """
        tag = self._next_tag()
        await self.send_command(f'{tag} SELECT INBOX')
        await self._expect_ok(tag, "SELECT")

    async def idle(self):
        """Enter IDLE and wait until the server announces a new message.

        Returns:
            ``True`` when an ``EXISTS`` notification arrived (IDLE is then
            ended with ``DONE``), ``False`` when the session ended without
            one (stream closed, or the server completed IDLE by itself).

        Raises:
            RuntimeError: if the server rejects the IDLE command.
        """
        tag = self._next_tag()
        prefix = tag + " "
        await self.send_command(f'{tag} IDLE')

        # Wait for the continuation request.  Only the leading "+" is
        # checked because the text after it differs between servers.
        while True:
            response = await self.read_response()
            if response is None:
                return False
            sys.stderr.write(f"server: {response}")
            if response.startswith("+"):
                break
            if response.startswith(prefix):
                raise RuntimeError(f"IDLE rejected: {response.strip()}")

        sys.stderr.write("info: IDLE mode started. Waiting for new emails...\n")
        while True:
            notification = await self.read_response()
            if notification is None:
                return False
            sys.stderr.write(f"server: {notification}")
            if notification.startswith(prefix):
                # The server ended IDLE on its own.
                return False
            if "EXISTS" in notification:
                sys.stderr.write("info: New email notification received!\n")
                await self.send_command("DONE")
                # Consume everything up to the tagged completion so no stale
                # line is left in the stream.
                while True:
                    response = await self.read_response()
                    if response is None:
                        break
                    sys.stderr.write(f"server: {response}")
                    if response.startswith(prefix):
                        break
                return True

    async def logout(self):
        """Send LOGOUT, log the first reply line and close the connection.

        Does nothing when no connection is open.  The connection is closed
        even if sending or reading fails.
        """
        if self.writer is None:
            return
        try:
            await self.send_command(f'{self._next_tag()} LOGOUT')
            response = await self.read_response()
            if response:
                sys.stderr.write(f"server: {response}")
        finally:
            await self._close_connection()

    async def run(self):
        """Main loop: connect, log in, select INBOX, idle, then reconnect.

        Errors are logged and followed by a one second pause before the next
        attempt.  The loop only ends on ``KeyboardInterrupt``.
        """
        while True:
            try:
                await self.connect()
                greeting = await self.read_response()
                if greeting:
                    sys.stderr.write(f"server: {greeting}")
                await self.login()
                await self.select_mailbox()
                if not await self.idle():
                    # The session ended without a notification; pause like
                    # the error path does instead of reconnecting at once.
                    await asyncio.sleep(1)
            except KeyboardInterrupt:
                sys.stderr.write("\nExiting...\n")
                try:
                    await self.logout()
                except OSError as e:
                    sys.stderr.write(f"[exception] {e}\n")
                break
            except Exception as e:
                sys.stderr.write(f"[exception] {e}\n")
                await asyncio.sleep(1)  # short delay before reconnecting
            finally:
                # Always release the socket; the next pass opens a new one.
                await self._close_connection()
            sys.stderr.write("[debug] reconnect\n")
# Script entry point: build the client from auth.json and drive its main loop.
if __name__ == "__main__":
    client = IMAPIdleClient("auth.json")
    main_coroutine = client.run()
    asyncio.run(main_coroutine)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import imaplib | |
import time | |
import sys | |
# Load the IMAP credentials once at import time; the rest of the script reads
# the "host", "username" and "password" keys from this object.
with open("auth.json", "rt") as fp:
    authobj = json.loads(fp.read())
def prepare_client(authobj: dict):
    """Return a logged-in ``imaplib.IMAP4_SSL`` connection with 'inbox' selected.

    ``authobj`` must provide the "host", "username" and "password" keys.
    """
    connection = imaplib.IMAP4_SSL(authobj["host"])
    connection.login(authobj["username"], authobj["password"])
    connection.select('inbox')
    return connection
def _drain_until_tagged(mail, tag):
    """Read and log server lines until the completion line for ``tag``.

    Returns True when the tagged completion was seen, False when the server
    closed the connection first (``readline()`` returned ``b''``).
    """
    prefix = tag + b' '
    while True:
        response = mail.readline()
        sys.stderr.write(f"server: {response}\n")
        if response == b'':
            # End of stream: stop instead of polling a dead socket forever.
            return False
        if response.startswith(prefix):
            return True


def idle_mailbox():
    """Watch the inbox with IMAP IDLE and report new-mail notifications.

    Connects through ``prepare_client(authobj)``, then repeatedly issues IDLE
    with a fixed tag, logs every server line to stderr and, when an
    ``EXISTS`` line arrives, ends IDLE with ``DONE`` and waits for the tagged
    completion.  A closed connection is replaced by a new one.  The loop runs
    until ``KeyboardInterrupt``; the connection is logged out on the way out.
    """
    idle_tag = b'0001'
    idle_done_prefix = idle_tag + b' '
    # True while the server is waiting for DONE; LOGOUT must not be sent then.
    idling = False

    # Connect to the IMAP server
    mail = prepare_client(authobj)
    sys.stderr.write("info: Waiting for new email notifications...\n")
    try:
        while True:
            # Send the IDLE command to the server
            request = idle_tag + b' IDLE\r\n'
            mail.send(request)
            sys.stderr.write(f"client: {request}\n")
            response = mail.readline()
            sys.stderr.write(f"server: {response}\n")
            if response == b'':
                # The server closed the connection: release the dead socket
                # before opening a replacement so it is not leaked.
                try:
                    mail.shutdown()
                except OSError:
                    pass  # best effort; the socket is already unusable
                mail = prepare_client(authobj)
            elif response.startswith(b'+'):
                # Only the leading "+" is checked because the text after it
                # differs between servers.
                idling = True
                sys.stderr.write("info: IDLE mode started. Waiting for new emails...\n")
                while True:
                    # Read the next line pushed by the server
                    response = mail.readline()
                    sys.stderr.write(f"server: {response}\n")
                    if response == b'':
                        idling = False
                        break
                    # Check for the notification sent when new mail arrives
                    if b'EXISTS' in response:
                        sys.stderr.write("info: New email notification received!\n")
                        # Leave IDLE mode
                        request = b'DONE\r\n'
                        mail.send(request)
                        idling = False
                        sys.stderr.write(f"client: {request}\n")
                        # Wait for the tagged completion of IDLE; its text
                        # is server-specific, so match the tag instead.
                        if _drain_until_tagged(mail, idle_tag):
                            sys.stderr.write("info: IDLE completed, processing new emails...\n")
                        break
                    if response.startswith(idle_done_prefix):
                        # The server ended IDLE by itself.
                        idling = False
                        sys.stderr.write("info: IDLE completed, restarting...\n")
                        break
            # Short pause before entering IDLE again
            time.sleep(1)
    except KeyboardInterrupt:
        sys.stderr.write("\nExiting...\n")
    finally:
        # Log out and close the connection
        try:
            if idling:
                # The server is still inside IDLE and accepts nothing but
                # DONE, so finish IDLE before sending LOGOUT.
                request = b'DONE\r\n'
                mail.send(request)
                sys.stderr.write(f"client: {request}\n")
                _drain_until_tagged(mail, idle_tag)
            mail.logout()
        except (OSError, imaplib.IMAP4.error) as exc:
            # The connection may already be gone; do not mask the real exit
            # reason with a cleanup failure.
            sys.stderr.write(f"info: logout failed: {exc}\n")
# Run the watcher (executes whenever this file is run or imported)
idle_mailbox()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment