SELECT
details.installer.name,
details.installer.version,
COUNT(*) as total_downloads
FROM
TABLE_DATE_RANGE(
[the-psf:pypi.downloads],
TIMESTAMP("20160114"),
CURRENT_TIMESTAMP()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import socket | |
parser = argparse.ArgumentParser(description="keepalive example") | |
parser.add_argument("mode", choices=("server", "client")) | |
def server(): | |
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import attr | |
from twisted.internet import defer, task | |
@attr.s | |
class LoopNTimes(object): | |
_times = attr.ib() | |
_f = attr.ib() | |
_loopingCall = attr.ib(default=None, init=False) | |
_called = attr.ib(default=0, init=False) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import endpoints, defer, protocol, task | |
class Echo(protocol.Protocol): | |
def dataReceived(self, data): | |
self.transport.write(data) | |
@task.react | |
def main(reactor): |
SELECT
REGEXP_EXTRACT(details.python, r"^([^\.]+\.[^\.]+)") as python_version,
details.installer.name,
details.installer.version,
COUNT(*) as total_downloads
FROM
TABLE_DATE_RANGE(
[the-psf:pypi.downloads],
TIMESTAMP("20160114"),
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
from hypothesis import given, strategies as st | |
class ComparisonTests(unittest.TestCase): | |
def assertNonesCauseTypeError(self, first, second): | |
try: | |
first > second |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from twisted.protocols import basic | |
from twisted.internet import defer, endpoints, protocol, task | |
class GetFileProtocol(basic.LineReceiver): | |
delimiter = '\n' | |
def lineReceived(self, line): | |
parts = line.split(None, 1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
client |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
def adapt(the_future): | |
try: | |
print("*** result before") | |
the_future.result() | |
except: | |
print("*** exception before") | |
the_future.exception() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eliot | |
import sys | |
eliot.to_file(sys.stdout) | |
with eliot.start_action(action_type=u"task"): | |
eliot.Message.log(key=u"value") |