Skip to content

Instantly share code, notes, and snippets.

@flyer103
flyer103 / push_simple.py
Created March 24, 2014 09:19
简单的 push/pull 例子,本例是 push.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""简单的 push/pull 例子,本例是 push.
"""
import random
import time
@flyer103
flyer103 / test_threadpool.py
Created March 15, 2014 07:05
测试 python3.2+ 中的 ThreadPoolExecutor
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""测试使用 concurrent 提供的线程池.
"""
import time
import random
import concurrent.futures
@flyer103
flyer103 / test_multiprocess_res.py
Created March 15, 2014 06:58
测试 ProcessPoolExecutor 异步结果的获得
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""测试通过 concurrent.futures.ProcessPoolExecutor 获取多进程执行的结果.
"""
import pprint
from concurrent.futures import ProcessPoolExecutor, as_completed
@flyer103
flyer103 / test.py
Created February 24, 2014 06:06
Prove that a class is not thread-safe.
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
import threading
import Queue
import time
class MySingletonNTSafe(object):
@flyer103
flyer103 / peering3.py
Created December 21, 2013 07:14
An modification of [inter-broker-routing example](http://zguide.zeromq.org/page:all#Worked-Example-Inter-Broker-Routing)
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
"""
"""
import sys
import time
import random
import logging
@flyer103
flyer103 / ADT_array.py
Last active January 1, 2016 00:09
Implement fixed size one-dimension array and two-dimension array in Python.
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
"""Implement fixed size one-dimension array and two-dimension array
in Python.
"""
import ctypes
@flyer103
flyer103 / test_exception_greenlet.py
Last active December 31, 2015 00:08
test exception
#!/usr/bin/env python2.7
#coding: utf-8
import logging
import gevent
logging.basicConfig(level=logging.INFO)
@flyer103
flyer103 / test_pool.py
Last active December 30, 2015 11:29
Play with gevent.pool.Pool
#!/usr/bin/env python2.7
#coding: utf-8
import gevent
import gevent.pool
class TestPool(object):
def __init__(self, maxsize=10):
@flyer103
flyer103 / test_asyncresult.py
Created December 6, 2013 08:20
Test the usage of 'gevent.event.AsyncResult'
#!/usr/bin/env python2.7
#coding: utf-8
"""Test the usage of 'gevent.event.AsyncResult'
"""
import random
import gevent
from gevent.event import AsyncResult
@flyer103
flyer103 / test_more_event.py
Created December 6, 2013 08:19
再次测试使用 gevent.event.Event
#!/urs/bin/env python2.7
#coding: utf-8
"""Test the usage of 'gevent.event.Event' class.
"""
import random
import gevent
from gevent.event import Event