This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from PyQt4 import Qt | |
def Signal(name, *args): | |
def get_signal(self): | |
if not hasattr(self, '_signallers'): | |
self._signallers = {} | |
if (name, args) not in self._signallers: | |
signal = Qt.pyqtSignal(*args, name=name) | |
signaller_type = type('Signaller', (Qt.QObject,), {name: signal}) | |
self._signallers[name, args] = signaller_type() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
from devicetest import TangoTestContext | |
from PyTango import utils, EventType, DevState | |
from PyTango.server import Device, DeviceMeta, command, run | |
class Dummy(Device): | |
__metaclass__ = DeviceMeta | |
def init_device(self): | |
self.set_change_event('State', True, True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def background():
    """Run CPU-bound busy work for as long as ``running`` is truthy.

    ``running`` is not defined in this function; it is looked up in the
    enclosing (presumably module-level) scope each time the loop condition
    is evaluated -- NOTE(review): confirm where the flag is set and cleared.

    Returns:
        None, once ``running`` evaluates false.
    """
    while running:
        # Give control back to the event loop so other tasks can progress
        # between two bursts of computation.
        await asyncio.sleep(0)
        # Throwaway computation: the list is discarded on purpose, it only
        # exists to keep the CPU busy without awaiting.
        [x**2 for x in range(10**5)]
async def main(delay=20):
    """Sleep for ``delay`` seconds and return the string ``'hello'``.

    Args:
        delay: Number of seconds to sleep before returning. Defaults to 20,
            the value that was previously hard-coded.

    Returns:
        The string ``'hello'``, passed through ``asyncio.sleep`` as its
        ``result`` argument.
    """
    return await asyncio.sleep(delay, result='hello')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# It's not recommended to modify this file in-place, because it will be | |
# overwritten during package upgrades. If you want to customize, the | |
# best way is to create a file "/etc/systemd/system/tango-db.service", | |
# containing | |
# .include /lib/systemd/system/tango-db.service | |
# ...make your changes here... | |
# or create a file "/etc/systemd/system/tango-db.service.d/foo.conf", | |
# which doesn't need to include ".include" call and which will be parsed | |
# after the file tango-db.service itself is parsed. | |
# |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Provide an executor to run asyncio coroutines in a shadow thread.""" | |
import asyncio | |
from threading import Thread | |
from concurrent.futures import Executor | |
class AsyncioExecutor(Executor): | |
def __init__(self): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Provide high-level UDP endpoints for asyncio. | |
Example: | |
async def main(): | |
# Create a local UDP endpoint
local = await open_local_endpoint('localhost', 8888) | |
# Create a remote UDP endpoint, pointing to the first one
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import collections | |
def run(tasks): | |
# Prepare | |
results = {} | |
count = itertools.count() | |
queue = collections.deque() | |
for task in tasks: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Fast eratosthenes prime generator with a cached wheel""" | |
import sys | |
import time | |
import operator | |
from itertools import cycle, chain, accumulate, islice, count | |
from functools import lru_cache, reduce, partial | |
from contextlib import contextmanager | |
CACHE_LEVEL = 5 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Command line interface for monitoring asyncio tasks.""" | |
import os | |
import signal | |
import asyncio | |
import argparse | |
import traceback | |
import linecache | |
from itertools import count |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""UDP proxy server.""" | |
import asyncio | |
class ProxyDatagramProtocol(asyncio.DatagramProtocol): | |
def __init__(self, remote_address): | |
self.remote_address = remote_address | |
self.remotes = {} |
OlderNewer