(具体的な実装についてはなるべく触れず、なんかイメージ的な話をする。)
- 「これこれのイベントが起こったら、この続く処理を実行してくれ」
import array | |
import json | |
import os | |
import socket | |
import sys | |
SOCKET_PATH = '/tmp/pyd.sock' | |
sock = socket.socket(socket.AF_UNIX) |
#!/bin/sh | |
# XXX: 雑 | |
mkdir -p ~/.piping | |
if [ "$1" = "-w" ]; then | |
mkfifo ~/.piping/"$2" || exit 1 | |
cat > ~/.piping/"$2" | |
rm ~/.piping/"$2" | |
elif [ "$1" = "-r" ]; then | |
cat ~/.piping/"$2" |
import email.message | |
msg = email.message.EmailMessage() | |
msg['Subject'] = '[こんにちは、世界] 新しい朝が来た、希望の朝だ' | |
print(msg.as_string()) |
import asyncio | |
class FunctionPool: | |
'''Connection pool の関数版みたいなやつ考える | |
* 同時に使っていい分だけ関数が pool されている | |
* 関数を使いたいときはそこから1個取り出して使う | |
* 使い終わったら pool に戻す | |
* pool が空なら誰かが使い終わって戻してくれるまで待つ |
import asyncio | |
import functools | |
import time | |
import urllib.parse | |
import aiohttp | |
def throttling(sleep): | |
def decorator(func): |
python3 <<EOP | |
import gi | |
gi.require_version('Gtk', '3.0') | |
gi.require_version('WebKit2', '4.0') | |
from gi.repository import Gtk | |
from gi.repository import WebKit2 |
import threading | |
from asyncio_extras.threads import threadpool | |
async def main(): | |
print('main:', threading.current_thread()) | |
for _ in range(4): | |
async with threadpool(): |
class A(object): | |
# XXX: こういう、お互いに循環して使っているクラスの type hint ってどうやって書くの? | |
#def use_B(self, b: B): | |
def use_B(self, b): | |
print("I'm using {}".format(b)) | |
class B(object): | |
def use_A(self, a: A): | |
print("I'm using {}".format(a)) |
import asyncio | |
import threading | |
def unset_event_loop_policy_forever(): | |
while True: | |
asyncio.set_event_loop_policy(None) | |
t = threading.Thread(target=unset_event_loop_policy_forever) |