Skip to content

Instantly share code, notes, and snippets.

@trygveaa
Last active May 17, 2025 14:41
Show Gist options
  • Save trygveaa/232a1cad073d59415f55099a562065a1 to your computer and use it in GitHub Desktop.
Save trygveaa/232a1cad073d59415f55099a562065a1 to your computer and use it in GitHub Desktop.
import concurrent.futures
import os
import queue
import urllib.request
import weechat
# Endpoints fetched by the worker threads; each one delays its response.
URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
]

# Worker threads push their results here for the main thread to pick up.
msg_queue = queue.Queue()

# Pipe used as a wake-up channel: workers write one byte to w_fd per queued
# result, and the main thread watches r_fd for readability.
r_fd, w_fd = os.pipe()
def load_url(url, timeout):
    """Download *url* in a worker thread and hand the outcome to the main thread.

    On success the response body (bytes) is put on ``msg_queue``; on failure
    the raised exception object is put there instead. In both cases one byte
    is written to ``w_fd`` so the main thread's fd hook wakes up exactly once
    per queued item.

    Args:
        url: The URL to fetch.
        timeout: Timeout in seconds passed to ``urllib.request.urlopen``.

    Returns:
        The response body as bytes.

    Raises:
        Exception: Whatever ``urlopen``/``read`` raised, re-raised after the
            failure has been reported to the main thread.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as conn:
            result = conn.read()
    except Exception as exc:
        # The Future returned by submit() is discarded by the caller, so an
        # exception would otherwise vanish silently and the main thread would
        # never be notified. Report it through the same queue/pipe channel,
        # then re-raise so the Future still records the failure.
        msg_queue.put(exc)
        os.write(w_fd, b"x")  # Signal main thread
        raise
    msg_queue.put(result)
    os.write(w_fd, b"x")  # Signal main thread
    return result
def my_command_cb(data: str, buffer: str, args: str) -> int:
    """Command callback: start one background download per entry in ``URLS``.

    Args:
        data: Callback data string supplied when the command was hooked (unused).
        buffer: Buffer the command was run in (unused).
        args: Arguments given to the command (unused).

    Returns:
        ``weechat.WEECHAT_RC_OK``.
    """
    executor = concurrent.futures.ThreadPoolExecutor()
    for url in URLS:
        executor.submit(load_url, url, 60)
    # Release the pool once the submitted jobs finish instead of leaving a new
    # never-shut-down executor behind on every invocation. wait=False returns
    # immediately, so the main thread is not blocked; a `with` block is
    # deliberately avoided because it would wait for all downloads.
    executor.shutdown(wait=False)
    return weechat.WEECHAT_RC_OK
def notify_main_thread(data, fd):
    """Fd-hook callback: consume one wake-up byte and print one queued message.

    Args:
        data: Callback data string supplied when the fd was hooked (unused).
        fd: The file descriptor that became readable.

    Returns:
        ``weechat.WEECHAT_RC_OK``.
    """
    # Drain a single signal byte; one byte is written per queued message.
    os.read(fd, 1)
    try:
        finished = msg_queue.get_nowait()
    except queue.Empty:
        # Nothing queued for this wake-up; nothing to print.
        return weechat.WEECHAT_RC_OK
    weechat.prnt("", f"Thread done: {finished}")
    return weechat.WEECHAT_RC_OK
def noop_timer_cb(data, remaining_calls):
    """Timer callback that does nothing and reports success.

    It only serves as the target of the optional ``hook_timer`` call that is
    commented out in the registration section of this script.
    """
    return weechat.WEECHAT_RC_OK
# Install the hooks only if WeeChat accepts the script registration.
if weechat.register("threads", "trygveaa", "1.0", "MIT", "", "", ""):
    # The "call" command kicks off the background downloads.
    weechat.hook_command("call", "", "", "", "", "my_command_cb", "")
    # Watch the read end of the pipe so worker threads can wake the main thread.
    weechat.hook_fd(r_fd, 1, 0, 0, "notify_main_thread", "")
    # Uncomment this to see that it finishes when it's active
    # weechat.hook_timer(100, 0, 0, "noop_timer_cb", "")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment