Skip to content

Instantly share code, notes, and snippets.

@nakamuray
nakamuray / client.py
Created March 30, 2019 08:16
事前起動させておいた python process からの fork により、 python アプリケーションの起動高速化をしてみるテスト
import array
import json
import os
import socket
import sys
SOCKET_PATH = '/tmp/pyd.sock'
sock = socket.socket(socket.AF_UNIX)
#!/bin/sh
# XXX: 雑
mkdir -p ~/.piping
if [ "$1" = "-w" ]; then
mkfifo ~/.piping/"$2" || exit 1
cat > ~/.piping/"$2"
rm ~/.piping/"$2"
elif [ "$1" = "-r" ]; then
cat ~/.piping/"$2"
import email.message
msg = email.message.EmailMessage()
msg['Subject'] = '[こんにちは、世界] 新しい朝が来た、希望の朝だ'
print(msg.as_string())
import asyncio
class FunctionPool:
'''Connection pool の関数版みたいなやつ考える
* 同時に使っていい分だけ関数が pool されている
* 関数を使いたいときはそこから1個取り出して使う
* 使い終わったら pool に戻す
* pool が空なら誰かが使い終わって戻してくれるまで待つ
import asyncio
import functools
import time
import urllib.parse
import aiohttp
def throttling(sleep):
def decorator(func):
@nakamuray
nakamuray / webview.vim
Created September 21, 2016 05:57
open webview within vim (gtk3 build)
python3 <<EOP
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('WebKit2', '4.0')
from gi.repository import Gtk
from gi.repository import WebKit2
import threading
from asyncio_extras.threads import threadpool
async def main():
print('main:', threading.current_thread())
for _ in range(4):
async with threadpool():
@nakamuray
nakamuray / circular-typing.py
Created June 25, 2016 06:18
循環して参照しているようなヤツの type hint どうやって書けばいいの?
class A(object):
# XXX: こういう、お互いに循環して使っているクラスの type hint ってどうやって書くの?
#def use_B(self, b: B):
def use_B(self, b):
print("I'm using {}".format(b))
class B(object):
def use_A(self, a: A):
print("I'm using {}".format(a))
import asyncio
import threading
def unset_event_loop_policy_forever():
while True:
asyncio.set_event_loop_policy(None)
t = threading.Thread(target=unset_event_loop_policy_forever)

ふわふわした asyncio の話

(具体的な実装についてはなるべく触れず、なんかイメージ的な話をする。)

イベント駆動

  • 「これこれのイベントが起こったら、この続く処理を実行してくれ」