Created
May 18, 2011 06:16
-
-
Save progrium/978072 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def getPage(url):
    """Illustrative stub: return a Deferred that stands in for a page fetch.

    Nothing is fetched here; the comment below marks where a real
    implementation would start the HTTP request for ``url``.

    Args:
        url: Address of the page to fetch (unused by this stub).

    Returns:
        A freshly created Deferred that nothing in this stub ever fires.
    """
    pending = Deferred()
    # here it would set up an http client programmed to call
    # pending.callback(body) when page is fully received...
    return pending
def handleBody(body):
    """Success callback: print the page body that was delivered.

    Args:
        body: Value passed along by the Deferred's callback chain.
    """
    # Parenthesised single-argument print works both as the Python 2
    # statement and as the Python 3 function.
    print(body)
def handleError(error):
    """Failure callback: print whatever error object was delivered.

    Args:
        error: Value passed along by the Deferred's errback chain.
    """
    # Parenthesised single-argument print works both as the Python 2
    # statement and as the Python 3 function.
    print(error)
# Request the page, then register one handler for the success path and
# one for the failure path on the returned Deferred.
d = getPage("http://google.com")
d.addCallback(handleBody)
d.addErrback(handleError)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import reactor, defer | |
from twisted.web.client import getPage | |
import re | |
def lookup(country, search_term):
    """Find the first Google hit for a term and fetch that page's title.

    Written in explicit callback style: the search-results page for
    ``www.google.<country>`` is fetched first, then the first result URL
    found in it, and the outcome is delivered through one outer Deferred.

    Args:
        country: Domain suffix appended to ``www.google.`` (e.g. "com").
        search_term: Query text. It is interpolated into the URL verbatim
            (no URL-quoting happens here), so pass a URL-safe term.

    Returns:
        A Deferred fired with ``dict(url=..., title=...)`` on success, or
        with ``None`` when no result link is found or a fetch fails. This
        function never calls ``errback`` on it; failures are printed.
    """
    main_d = defer.Deferred()

    def first_step():
        # Fetch the search-results page.
        query = "http://www.google.%s/search?q=%s" % (country, search_term)
        d = getPage(query)
        d.addCallback(second_step, country)
        d.addErrback(failure, country)

    def second_step(content, country):
        # Pull the first http:// link out of the results container.
        m = re.search('<div id="?res.*?href="(?P<url>http://[^"]+)"',
                      content, re.DOTALL)
        if not m:
            main_d.callback(None)
            return
        url = m.group('url')
        d = getPage(url)
        d.addCallback(third_step, country, url)
        d.addErrback(failure, country)

    def third_step(content, country, url):
        # Report the page title, or a placeholder when there is none.
        m = re.search("<title>(.*?)</title>", content)
        title = m.group(1) if m else "{not-specified}"
        main_d.callback(dict(url=url, title=title))

    def failure(e, country):
        # Parenthesised single-argument print parses on Python 2 and 3.
        print(".%s FAILED: %s" % (country, str(e)))
        # Turn the failure into a "nothing found" result for the caller.
        main_d.callback(None)

    first_step()
    return main_d
def printResult(result, country):
    """Print a one-line summary of a lookup outcome.

    Args:
        result: Mapping with 'url' and 'title' keys, or a falsy value
            (such as None) when nothing was found.
        country: Domain suffix the lookup ran against; used as the
            line prefix.
    """
    # Parenthesised single-argument print works both as the Python 2
    # statement and as the Python 3 function, with identical output.
    if result:
        print(".%s result: %s (%s)" % (country, result['url'], result['title']))
    else:
        print(".%s result: nothing found" % country)
def runme():
    """Start one lookup per country and stop the reactor when all are done.

    Each lookup's Deferred gets printResult attached; a DeferredList over
    all of them triggers ``reactor.stop()`` once every one has fired.
    """
    pending = []
    for tld in ("com", "pl", "nonexistant"):
        lookup_d = lookup(tld, "Twisted")
        lookup_d.addCallback(printResult, tld)
        pending.append(lookup_d)
    finished = defer.DeferredList(pending)
    finished.addCallback(lambda _: reactor.stop())
# Schedule runme with a zero delay, then enter the reactor loop; runme's
# final callback is what calls reactor.stop().
reactor.callLater(0, runme)
reactor.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@defer.inlineCallbacks
def someFunction():
    """Chain two Deferred-returning calls in sequential style.

    The value yielded back from the first call is passed, together with
    the starting value, to the second call; the second call's result is
    handed to ``defer.returnValue``.
    """
    seed = 1
    first = yield deferredReturningFunction(seed)
    second = yield anotherDeferredReturningFunction(seed, first)
    defer.returnValue(second)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import reactor, defer | |
from twisted.web.client import getPage | |
import re | |
@defer.inlineCallbacks
def lookup(country, search_term):
    """Find the first Google hit for a term and fetch that page's title.

    Sequential-looking version built on ``inlineCallbacks``: each ``yield``
    waits for a page fetch before the next statement runs.

    Args:
        country: Domain suffix appended to ``www.google.`` (e.g. "com").
        search_term: Query text. It is interpolated into the URL verbatim
            (no URL-quoting happens here), so pass a URL-safe term.

    Returns:
        Via ``defer.returnValue``: ``dict(url=..., title=...)`` on success,
        or ``None`` when no result link is found. On any exception the
        error is printed and no value is returned explicitly.
    """
    # NOTE(review): returnValue() signals its result by raising; this
    # try/except relies on that signal not deriving from Exception.
    # Confirm for the Twisted version in use.
    try:
        query = "http://www.google.%s/search?q=%s" % (country, search_term)
        content = yield getPage(query)
        m = re.search('<div id="?res.*?href="(?P<url>http://[^"]+)"',
                      content, re.DOTALL)
        if not m:
            defer.returnValue(None)
        url = m.group('url')
        content = yield getPage(url)
        m = re.search("<title>(.*?)</title>", content)
        # Fall back to a placeholder when the page has no <title>.
        title = m.group(1) if m else "{not-specified}"
        defer.returnValue(dict(url=url, title=title))
    except Exception as e:
        # "as" form and parenthesised print parse on Python 2.6+ and 3.
        print(".%s FAILED: %s" % (country, str(e)))
def printResult(result, country):
    """Print a one-line summary of a lookup outcome.

    Args:
        result: Mapping with 'url' and 'title' keys, or a falsy value
            (such as None) when nothing was found.
        country: Domain suffix the lookup ran against; used as the
            line prefix.
    """
    # Parenthesised single-argument print works both as the Python 2
    # statement and as the Python 3 function, with identical output.
    if result:
        print(".%s result: %s (%s)" % (country, result['url'], result['title']))
    else:
        print(".%s result: nothing found" % country)
def runme():
    """Start one lookup per country and stop the reactor when all are done.

    Each lookup's Deferred gets printResult attached; a DeferredList over
    all of them triggers ``reactor.stop()`` once every one has fired.
    """
    pending = []
    for tld in ("com", "pl", "nonexistant"):
        lookup_d = lookup(tld, "Twisted")
        lookup_d.addCallback(printResult, tld)
        pending.append(lookup_d)
    finished = defer.DeferredList(pending)
    finished.addCallback(lambda _: reactor.stop())
# Schedule runme with a zero delay, then enter the reactor loop; runme's
# final callback is what calls reactor.stop().
reactor.callLater(0, runme)
reactor.run()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent | |
import re | |
import urllib | |
import gevent.pool | |
import gevent.monkey; gevent.monkey.patch_all(thread=False) | |
def lookup(country, search_term):
    """Find the first Google hit for a term and fetch that page's title.

    Plain sequential version: both fetches go through ``urllib.urlopen``.

    Args:
        country: Domain suffix appended to ``www.google.`` (e.g. "com").
        search_term: Query text. It is interpolated into the URL verbatim
            (no URL-quoting happens here), so pass a URL-safe term.

    Returns:
        ``dict(url=..., title=...)`` on success, or ``None`` when no
        result link is found or any exception occurs (the exception is
        printed, not raised).
    """
    # Function-scope import keeps this block independent of the file's
    # import list.
    from contextlib import closing

    def fetch(url):
        # Close the response explicitly instead of leaving the
        # connection to be released whenever the object is collected.
        with closing(urllib.urlopen(url)) as response:
            return response.read()

    try:
        query = "http://www.google.%s/search?q=%s" % (country, search_term)
        content = fetch(query)
        m = re.search('<div id="?res.*?href="(?P<url>http://[^"]+)"',
                      content, re.DOTALL)
        if not m:
            return None
        url = m.group('url')
        content = fetch(url)
        m = re.search("<title>(.*?)</title>", content)
        # Fall back to a placeholder when the page has no <title>.
        title = m.group(1) if m else "{not-specified}"
        return dict(url=url, title=title)
    except Exception as e:
        # "as" form and parenthesised print parse on Python 2.6+ and 3.
        print(".%s FAILED: %s" % (country, str(e)))
        return None
def runme():
    """Run one lookup per country concurrently and print each outcome.

    Spawns a greenlet per country in a gevent Group, waits for all of
    them, then prints one line per country in the order they were listed.
    """
    group = gevent.pool.Group()
    # Keep each greenlet paired with its own country. The loop variable
    # alone would hold only the last country once spawning is done, and a
    # Group stops tracking greenlets as they finish, so iterating the
    # group after join() cannot be relied on to yield the results.
    jobs = [(country, group.spawn(lookup, country, "Twisted"))
            for country in ["com", "pl", "nonexistant"]]
    group.join()
    for country, job in jobs:
        result = job.value
        if result:
            print(".%s result: %s (%s)" % (country, result['url'], result['title']))
        else:
            print(".%s result: nothing found" % country)
# Script entry point: run all lookups and print their results.
runme()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment