Last active
October 3, 2023 10:34
-
-
Save Henry-E/70a7e33dfd4bd4162aaeb3ff8dd309fe to your computer and use it in GitHub Desktop.
IB, PyQt and Aiogram
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from PyQt6.QtWidgets import QApplication, QWidget, QVBoxLayout, QTextEdit, QPushButton | |
from PyQt6.QtCore import Qt, pyqtSignal | |
from ib_insync import IB, util | |
from aiogram import Bot, Dispatcher, types | |
import asyncio | |
import threading | |
# Import logging | |
import logging | |
# Setup logging | |
logging.basicConfig(level=logging.DEBUG) | |
# Initialize IB-insync | |
# ib = IB() | |
# Initialize Aiogram | |
token = None | |
approval_chat_id = None | |
bot = Bot(token=token) | |
dp = Dispatcher(bot) | |
class App(QWidget): | |
# Define a PyQt signal to update UI from another thread | |
update_text_signal = pyqtSignal(str) | |
approval_received_signal = pyqtSignal() | |
def __init__(self): | |
super().__init__() | |
self.aiogram_loop = None | |
self.setWindowTitle('IB-insync + PyQt6 + Aiogram Example') | |
self.setGeometry(100, 100, 600, 400) | |
layout = QVBoxLayout() | |
self.text_edit = QTextEdit(self) | |
self.text_edit.setReadOnly(True) | |
layout.addWidget(self.text_edit) | |
self.trigger_button = QPushButton("Trigger Order", self) | |
self.trigger_button.clicked.connect(self.trigger_order) | |
layout.addWidget(self.trigger_button) | |
self.setLayout(layout) | |
# Connect to IB | |
# ib.connect(host='127.0.0.1', port=7497, clientId=2) | |
# Connect signal to slot | |
self.update_text_signal.connect(self.update_text) | |
self.approval_received_signal.connect(self.approval_received) | |
def update_text(self, text): | |
logging.info(f'Updating text with: {text}') # Logging | |
self.text_edit.append(text) | |
def trigger_order(self): | |
print("triggering order") | |
logging.info(f'In PyQt: {asyncio.get_event_loop()}') | |
message_text = "Approval needed for order. Reply 'Approve' to proceed." | |
asyncio.run_coroutine_threadsafe( | |
bot.send_message(approval_chat_id, message_text), | |
self.aiogram_loop | |
) | |
def approval_received(self): | |
self.update_text("Order Approved!") | |
async def aiogram_async_thread(qApp): | |
@dp.message_handler(chat_id=approval_chat_id, commands=['start', 'help']) | |
async def handle_approval(message: types.Message): | |
logging.info("Order Approval received.") | |
qApp.approval_received_signal.emit() # Send signal to main thread | |
# Aiogram message handler | |
@dp.message_handler() | |
async def handle_message(message: types.Message): | |
chat_id = message.chat.id | |
logging.info(f'Received message from chat ID: {chat_id}') # Logging | |
logging.info(f'In Aiogram: {asyncio.get_event_loop()}') | |
qApp.update_text_signal.emit( | |
f"Incoming message from chat ID: {chat_id}") | |
# Start the Aiogram polling | |
await dp.start_polling() | |
def run_aiogram(qApp): | |
loop = asyncio.new_event_loop() | |
qApp.aiogram_loop = loop | |
asyncio.set_event_loop(loop) | |
try: | |
loop.run_until_complete(aiogram_async_thread(qApp)) | |
except Exception as e: | |
# Exception logging | |
logging.exception(f'An exception occurred in Aiogram event loop: {e}') | |
if __name__ == '__main__': | |
# Patch asyncio | |
# util.patchAsyncio() | |
# Create a Qt application | |
app = QApplication([]) | |
# Initialize our App class | |
qApp = App() | |
qApp.show() | |
# Run Aiogram on a separate thread | |
aiogram_thread_obj = threading.Thread(target=run_aiogram, args=(qApp,)) | |
aiogram_thread_obj.start() | |
# Thread State Monitoring | |
logging.info(f'Aiogram thread started: {aiogram_thread_obj.is_alive()}') | |
# Thread State Monitoring | |
logging.info(f'Active Threads: {threading.enumerate()}') | |
# Run the Qt event loop | |
app.exec() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for reply