Skip to content

Instantly share code, notes, and snippets.

@progrium
Created May 18, 2011 06:16
Show Gist options
  • Save progrium/978072 to your computer and use it in GitHub Desktop.
Save progrium/978072 to your computer and use it in GitHub Desktop.
def getPage(url):
    """Return a fresh ``Deferred`` standing in for the eventual page body.

    This is an illustrative stub: ``url`` is not used, and nothing in this
    function ever fires the returned object.
    """
    pending = Deferred()
    # A real implementation would set up an HTTP client here, programmed to
    # call pending.callback(body) once the page is fully received.
    return pending
def handleBody(body):
    """Print ``body`` to standard output."""
    # Single-argument parenthesised form: Python 2's print statement emits
    # the same text, and the line also parses as a Python 3 function call.
    print(body)
def handleError(error):
    """Print ``error`` to standard output."""
    # Single-argument parenthesised form: Python 2's print statement emits
    # the same text, and the line also parses as a Python 3 function call.
    print(error)
# Request the page, then register the success handler and the failure
# handler on the object that getPage returned.
d = getPage("http://google.com")
d.addCallback(handleBody)
d.addErrback(handleError)
from twisted.internet import reactor, defer
from twisted.web.client import getPage
import re
def lookup(country, search_term):
    """Find the first search hit for ``search_term`` on google.<country>.

    Returns a Deferred that fires with ``dict(url=..., title=...)`` for the
    first result link found on the results page, or with ``None`` when no
    result link matches or when a step fails.  Failures are reported by
    printing and are turned into a ``None`` result.
    """
    main_d = defer.Deferred()

    def first_step():
        # Fetch the search results page for the requested country domain.
        query = "http://www.google.%s/search?q=%s" % (country, search_term)
        d = getPage(query)
        d.addCallback(second_step, country)
        d.addErrback(failure, country)

    def second_step(content, country):
        # Pull the first result link out of the results page and fetch it.
        m = re.search('<div id="?res.*?href="(?P<url>http://[^"]+)"',
                      content, re.DOTALL)
        if not m:
            main_d.callback(None)
            return
        url = m.group('url')
        d = getPage(url)
        d.addCallback(third_step, country, url)
        d.addErrback(failure, country)

    def third_step(content, country, url):
        # Report the linked page's <title>, or a placeholder when absent.
        m = re.search("<title>(.*?)</title>", content)
        title = m.group(1) if m else "{not-specified}"
        main_d.callback(dict(url=url, title=title))

    def failure(e, country):
        # Parenthesised single-argument print: same output on Python 2,
        # and valid syntax on Python 3.
        print(".%s FAILED: %s" % (country, str(e)))
        main_d.callback(None)

    first_step()
    return main_d
def printResult(result, country):
    """Print a one-line summary of a lookup outcome for ``country``.

    ``result`` is either a mapping with ``'url'`` and ``'title'`` keys or a
    falsy value, which is reported as "nothing found".
    """
    # Parenthesised single-argument print: same output on Python 2, and
    # valid syntax on Python 3.
    if result:
        print(".%s result: %s (%s)" % (country, result['url'], result['title']))
    else:
        print(".%s result: nothing found" % country)
def runme():
    """Start one lookup per country and stop the reactor after all finish."""
    pending = []
    for country in ["com", "pl", "nonexistant"]:
        lookup_d = lookup(country, "Twisted")
        lookup_d.addCallback(printResult, country)
        pending.append(lookup_d)

    def stop_reactor(_results):
        reactor.stop()

    # The DeferredList fires once every collected Deferred has fired.
    defer.DeferredList(pending).addCallback(stop_reactor)
# Schedule runme through the reactor with a zero delay, then start the
# reactor.
reactor.callLater(0, runme)
reactor.run()
@defer.inlineCallbacks
def someFunction():
    """Chain two Deferred-returning calls and produce the second's result."""
    seed = 1
    # Each ``yield`` hands a Deferred to inlineCallbacks; execution resumes
    # here with that Deferred's result.
    intermediate = yield deferredReturningFunction(seed)
    final = yield anotherDeferredReturningFunction(seed, intermediate)
    defer.returnValue(final)
from twisted.internet import reactor, defer
from twisted.web.client import getPage
import re
@defer.inlineCallbacks
def lookup(country, search_term):
    """Find the first search hit for ``search_term`` on google.<country>.

    The resulting Deferred fires with ``dict(url=..., title=...)`` for the
    first result link found, or with ``None`` when no result link matches or
    when an exception is raised along the way (the exception is printed).
    """
    try:
        query = "http://www.google.%s/search?q=%s" % (country, search_term)
        content = yield getPage(query)
        m = re.search('<div id="?res.*?href="(?P<url>http://[^"]+)"',
                      content, re.DOTALL)
        if not m:
            # returnValue leaves the generator by raising; Twisted's carrier
            # exception derives from BaseException, so the handler below
            # does not intercept it.
            defer.returnValue(None)
        url = m.group('url')
        content = yield getPage(url)
        m = re.search("<title>(.*?)</title>", content)
        title = m.group(1) if m else "{not-specified}"
        defer.returnValue(dict(url=url, title=title))
    except Exception as e:
        # ``as`` form and parenthesised print are valid on Python 2.6+ and
        # also parse on Python 3.  Falling off the end yields a None result.
        print(".%s FAILED: %s" % (country, str(e)))
def printResult(result, country):
    """Print a one-line summary of a lookup outcome for ``country``.

    ``result`` is either a mapping with ``'url'`` and ``'title'`` keys or a
    falsy value, which is reported as "nothing found".
    """
    # Parenthesised single-argument print: same output on Python 2, and
    # valid syntax on Python 3.
    if not result:
        print(".%s result: nothing found" % country)
    else:
        print(".%s result: %s (%s)" % (country, result['url'], result['title']))
def runme():
    """Start one lookup per country and stop the reactor after all finish."""
    outstanding = []
    for country in ["com", "pl", "nonexistant"]:
        result_d = lookup(country, "Twisted")
        result_d.addCallback(printResult, country)
        outstanding.append(result_d)

    def shut_down(_results):
        reactor.stop()

    # The DeferredList fires once every collected Deferred has fired.
    defer.DeferredList(outstanding).addCallback(shut_down)
# Schedule runme through the reactor with a zero delay, then start the
# reactor.
reactor.callLater(0, runme)
reactor.run()
import gevent
import re
import urllib
import gevent.pool
import gevent.monkey; gevent.monkey.patch_all(thread=False)
def lookup(country, search_term):
    """Find the first search hit for ``search_term`` on google.<country>.

    Returns ``dict(url=..., title=...)`` for the first result link found on
    the results page, or ``None`` when no result link matches or when an
    exception is raised along the way (the exception is printed).
    """
    # Local import keeps this block self-contained; closing() guarantees the
    # response is closed even if read() raises.
    from contextlib import closing

    try:
        query = "http://www.google.%s/search?q=%s" % (country, search_term)
        with closing(urllib.urlopen(query)) as response:
            content = response.read()
        m = re.search('<div id="?res.*?href="(?P<url>http://[^"]+)"',
                      content, re.DOTALL)
        if not m:
            return None
        url = m.group('url')
        with closing(urllib.urlopen(url)) as response:
            content = response.read()
        m = re.search("<title>(.*?)</title>", content)
        title = m.group(1) if m else "{not-specified}"
        return dict(url=url, title=title)
    except Exception as e:
        # ``as`` form and parenthesised print are valid on Python 2.6+ and
        # also parse on Python 3.
        print(".%s FAILED: %s" % (country, str(e)))
        return None
def runme():
    """Run the country lookups concurrently and print each outcome."""
    group = gevent.pool.Group()
    jobs = []
    for country in ["com", "pl", "nonexistant"]:
        # Keep every greenlet paired with its country: the report loop below
        # must not rely on the loop variable left over from this loop, nor on
        # iterating the group after join().
        jobs.append((country, group.spawn(lookup, country, "Twisted")))
    group.join()
    for country, job in jobs:
        result = job.value
        if result:
            print(".%s result: %s (%s)" % (country, result['url'], result['title']))
        else:
            print(".%s result: nothing found" % country)
# Run the gevent example.
runme()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment