@nakamuray
nakamuray / my_coroutine.py
Created May 24, 2016 11:49
Re-implement something like asyncio.coroutine
import asyncio
import functools
import types
def my_coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        result_fut = asyncio.Future()
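The preview above is cut off after the result future is created. As a rough sketch only, and not the author's actual code, a decorator in this style could drive a generator that yields futures and resolve a result future when the generator returns. The names `my_coroutine_sketch`, `step`, `on_done`, and `add_later` are hypothetical, and cancellation is not handled.

```python
import asyncio
import functools


def my_coroutine_sketch(func):
    """Hypothetical decorator: drive a generator that yields asyncio futures."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        result_fut = loop.create_future()
        gen = func(*args, **kwargs)

        def step(send_value=None, exc=None):
            # Resume the generator with a value or an exception.
            try:
                if exc is not None:
                    yielded = gen.throw(exc)
                else:
                    yielded = gen.send(send_value)
            except StopIteration as stop:
                # The generator returned: publish its return value.
                result_fut.set_result(stop.value)
                return
            except Exception as error:
                result_fut.set_exception(error)
                return
            # The generator yielded a future: resume once it completes.
            yielded.add_done_callback(on_done)

        def on_done(fut):
            if fut.exception() is not None:
                step(exc=fut.exception())
            else:
                step(fut.result())

        step()
        return result_fut
    return wrapper


@my_coroutine_sketch
def add_later(a, b):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_soon(fut.set_result, a + b)
    total = yield fut  # suspended here until fut has a result
    return total * 2


async def main():
    return await add_later(1, 2)


result = asyncio.run(main())
print(result)
```

The wrapper must be called while an event loop is running, because it asks the running loop for the result future.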
nakamuray / deferred_and_future.py
Last active May 23, 2016 17:13
Deferred and Future
from twisted.internet import defer
from asyncio import futures, get_event_loop
d = defer.Deferred()
d.addCallback(lambda x: x + 1)
d.addCallback(lambda x: x * 2)
f1 = futures.Future()
f2 = futures.Future()
f1.add_done_callback(lambda f: f2.set_result(f.result() + 1))
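The contrast this preview starts to draw: a Deferred passes each callback's return value to the next callback, while a Future's done callbacks receive the future itself and their return values are discarded, so each chained step needs its own future. A minimal asyncio-only sketch of the same `+ 1` then `* 2` chain follows; the third future `f3` and the `main` wrapper are assumptions added for illustration.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    f1 = loop.create_future()
    f2 = loop.create_future()
    f3 = loop.create_future()  # hypothetical extra future for the second step

    # Each step reads the previous future's result and feeds the next future.
    f1.add_done_callback(lambda f: f2.set_result(f.result() + 1))
    f2.add_done_callback(lambda f: f3.set_result(f.result() * 2))

    f1.set_result(1)
    return await f3


chained = asyncio.run(main())
print(chained)
```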
nakamuray / websocket_proxy.py
Last active February 9, 2019 05:26
[WIP] Want to write a websocket reverse proxy with asyncio/aiohttp
import asyncio
import aiohttp
from aiohttp import web
class WebsocketProxy(object):
    def __init__(self, upstream_url):
        self.upstream_url = upstream_url
from concurrent.futures import Future
fut = Future()
fut.add_done_callback(lambda f: f.result() * 2)
fut.set_result(21)
print(fut.result())
#assert fut.result() == 42
fut = Future()
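The commented-out assertion above would fail: `concurrent.futures.Future.add_done_callback` ignores the callback's return value, so the future's result stays 21. One way to get a transformed value is to chain a second future, sketched below. The helper `then` is hypothetical and not part of `concurrent.futures`.

```python
from concurrent.futures import Future


def then(fut, fn):
    """Hypothetical helper: return a new Future holding fn(fut.result())."""
    chained = Future()

    def _propagate(done):
        try:
            chained.set_result(fn(done.result()))
        except Exception as exc:
            # Forward failures from the source future or from fn.
            chained.set_exception(exc)

    fut.add_done_callback(_propagate)
    return chained


source = Future()
doubled = then(source, lambda x: x * 2)
source.set_result(21)

plain = source.result()          # unchanged by any callback
transformed = doubled.result()   # value produced by the chained step
print(plain, transformed)
```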
nakamuray / asyncio_bridge.py
Last active March 2, 2016 13:56
Something that makes asyncio functions easier to call from synchronous code
'''library to support calling asyncio functions from synchronous code
'''
import asyncio
import functools
import queue
import threading
def create_and_start_loop():
    loop = asyncio.new_event_loop()
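The preview stops right after the loop is created. One common way to build such a bridge, shown here only as a sketch and not as the author's implementation, is to run the loop in a background thread and submit coroutines with `asyncio.run_coroutine_threadsafe`. The names `start_background_loop`, `call_sync`, and `slow_add` are hypothetical.

```python
import asyncio
import threading


def start_background_loop():
    """Hypothetical helper: run a fresh event loop in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def call_sync(loop, coro, timeout=None):
    """Submit a coroutine to the loop's thread and block for its result."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result(timeout)


async def slow_add(a, b):
    await asyncio.sleep(0.01)
    return a + b


loop, thread = start_background_loop()
value = call_sync(loop, slow_add(2, 3), timeout=5)
print(value)

# Stop the loop from its own thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
```

`run_coroutine_threadsafe` returns a `concurrent.futures.Future`, which is why the synchronous caller can block on `result()` without touching the event loop directly.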
nakamuray / eagain.js
Created January 6, 2016 12:29
Error: EAGAIN: resource temporarily unavailable, write
var fs = require('fs');
for (i=0; i<10000; i++) {
    fs.writeSync(process.stdout.fd, 'hello world!\n');
}
nakamuray / t-jedi.py
Created December 16, 2015 03:15
Running jedi on py3 seems to garble the docstring?
import t
t.hello
import jedi
script = jedi.Script(path=__file__, line=2, column=7)
definition = script.goto_definitions()[0]
print(repr(definition.docstring()))
nakamuray / trans.py
Created November 4, 2015 11:54
𝓗𝓮𝓵𝓵𝓸 𝓦𝓸𝓻𝓵𝓭
import string, sys
t = str.maketrans(string.ascii_uppercase+string.ascii_lowercase,
                  ''.join(chr(u) for u in range(0x1d4d0, 0x1d504)))
print(sys.stdin.read().translate(t))
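The range 0x1d4d0 to 0x1d503 covers 52 code points, the Mathematical Bold Script capitals followed by the small letters, so it lines up one-to-one with the 52 ASCII letters. The same table can be wrapped in a function for use without stdin; this is a small sketch, and the name `to_bold_script` is hypothetical.

```python
import string

# Map A-Z then a-z onto the 52 Mathematical Bold Script code points.
_TABLE = str.maketrans(
    string.ascii_uppercase + string.ascii_lowercase,
    ''.join(chr(cp) for cp in range(0x1D4D0, 0x1D504)),
)


def to_bold_script(text):
    """Hypothetical wrapper: translate ASCII letters, leave everything else."""
    return text.translate(_TABLE)


styled = to_bold_script("Hello World")
print(styled)
```

Characters outside A-Z and a-z, such as digits and spaces, pass through unchanged.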
nakamuray / nsd.py
Last active October 14, 2015 15:22
Reads standard input and scrolls comments across the screen; a version that might also work on OSX.
'''Niconico Style Display
requirements:
PyQt5
pyobjc-framework-Cocoa (pyobjc-cocoa?, osx only)
'''
import multiprocessing
import sys
import threading