Created
September 1, 2010 10:08
-
-
Save micktwomey/560488 to your computer and use it in GitHub Desktop.
Generic URI parsing for Python, with a ZODB example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Generic URI parsing | |
Intended to be more generally applicable than the standard library's urlparse module.
""" | |
import cgi | |
import re | |
import urllib | |
# Compiled pattern that splits a URI into named groups.
#
# Two shapes are recognised after the "scheme:" prefix:
#   * "//"-style URIs, yielding username, password, hostname, port, path,
#     params, query and fragment (each group is None when absent);
#   * anything else (e.g. mailto:), yielding a single opaque "value".
#
# The username and password classes exclude "/", "?" and "#" so that userinfo
# can only be found inside the authority part.  Without that restriction an
# "@" appearing later in the path or query (http://example.com/a@b) would be
# mistaken for the userinfo delimiter and the real hostname would end up in
# the username group.
URI_RE = re.compile(r"""
    (?P<scheme>[^:]+):                      # file: or http:
    (
        (//                                 # match :// schemes
            (
                (?P<username>[^:@/?#]+)     # username:password or username@
                (:)?                        # optional : between username and password
                (?P<password>[^@/?#]+)?     # :password
            @)?                             # username and password if there is an @
            (?P<hostname>[^/:?#;]*)         # foo.com
            (:(?P<port>[0-9]+))?            # 8080
            (?P<path>/[^?#;]*)?             # /foo/bar
            (;(?P<params>[^?#]+))?          # ;params
            (\?(?P<query>[^#]*))?           # ?foo=bar&ham=spam
            (\#(?P<fragment>.*))?           # #fragment
        )
        |
        (?P<value>.*)                       # for other schemes just return a value
    )
""", re.VERBOSE)
class URIParseError(Exception):
    """Raised when a string cannot be parsed as a URI.

    The string that failed to parse is available as the ``uri`` attribute.
    """

    def __init__(self, uri):
        # Hand the URI to Exception so that ``args`` is populated; otherwise
        # repr() shows nothing useful and the exception cannot be pickled or
        # copied (re-creation would call __init__ without arguments).
        super(URIParseError, self).__init__(uri)
        self.uri = uri

    def __str__(self):
        # Wrap in a tuple so a tuple-valued uri is not unpacked by "%".
        return "Can't parse %r" % (self.uri,)
def uriparse(uri, flatten_query=False, unquote=True):
    """Parse a URI and return a dictionary of the uri parts

    See RFC2396.

    :param uri: The URI string to parse.
    :param flatten_query: If this is True the dict for the query results is
        a flat key -> val instead of key -> [val1, val2, ...]. Only the first
        value of each key is kept.
    :param unquote: If this is True then percent-escapes in each component
        are decoded (e.g. %20 -> space). Decoding is applied after the URI
        has been split, so an escaped delimiter such as %40 or %3F cannot
        change how the URI is split. The query string is always decoded by
        parse_qs, whatever the value of this flag.
    :returns: A dictionary of the URI parts.
    :raises URIParseError: If the string does not match the URI pattern.

    Keys returned (every key is always present; absent parts are None):

    * scheme: The URI's scheme (e.g. http or file).
    * username: Username for authentication.
    * password: Password for authentication.
    * hostname: The hostname.
    * port: Port on host, as an int.
    * path: Path to the resource.
    * params: Params portion of the URI (i.e. ;params).
    * query: Query params passed in the URI, as a dictionary (i.e. ?ham=spam).
    * fragment: Fragment identifier (i.e. #fragment).
    * value: Everything after the scheme, for URIs without ://.

    If a URI without the :// after the scheme is used (e.g. mailto:) then only
    the value is returned. i.e. only keys scheme and value are not None.

    Examples:

    >>> parts = uriparse("http://www.example.com/mypage.html")
    >>> parts["scheme"]
    'http'
    >>> parts["hostname"]
    'www.example.com'
    >>> parts["path"]
    '/mypage.html'

    >>> parts = uriparse("myscheme://example.com:1234?ham=spam")
    >>> parts["scheme"]
    'myscheme'
    >>> parts["hostname"]
    'example.com'
    >>> parts["port"]
    1234
    >>> parts["query"]
    {'ham': ['spam']}

    Use flatten_query to flatten out the dict. This should be used with some
    caution as information can be lost.

    >>> parts = uriparse("myscheme://example.com:1234?ham=spam", flatten_query=True)
    >>> parts["query"]
    {'ham': 'spam'}

    >>> parts = uriparse("myscheme://example.com:1234?ham=spam&ham=eggs", flatten_query=True)
    >>> parts["query"]
    {'ham': 'spam'}

    >>> parts = uriparse("mailto:user@example.com")
    >>> parts["scheme"]
    'mailto'
    >>> parts["value"]
    'user@example.com'

    """
    # Imported locally (and aliased, since ``unquote`` is also a parameter
    # name) so the function works on Python 3 as well as Python 2.6+, where
    # cgi.parse_qs is deprecated in favour of urlparse.parse_qs.
    try:
        from urllib.parse import parse_qs, unquote as unquote_component
    except ImportError:
        from urlparse import parse_qs
        from urllib import unquote as unquote_component

    match = URI_RE.match(uri)
    if match is None:
        raise URIParseError(uri)
    parts = match.groupdict()

    if unquote:
        # Decode each textual component on its own.  "port" is digits only
        # and "query" is decoded by parse_qs below; decoding the query here
        # as well would decode it twice and let %26 / %3D split values.
        for key in ("scheme", "username", "password", "hostname", "path",
                    "params", "fragment", "value"):
            if parts[key] is not None:
                parts[key] = unquote_component(parts[key])

    if parts["port"] is not None:
        parts["port"] = int(parts["port"])

    if parts["query"] is not None:
        parts["query"] = parse_qs(parts["query"])
        if flatten_query:
            # Keep only the first value for each key.
            parts["query"] = dict(
                (key, val[0]) for key, val in parts["query"].items())

    return parts
def selftest():
    """Runs a series of tests on the module

    Runs the module's doctests, then parses a table of URIs and checks every
    returned part against the expected values. Parts not listed for a URI
    must come back as None or the empty string. Each failing URI is printed
    together with the parsed result.

    The checks use ``assert``, so they are skipped when Python runs with -O.

    :returns: The number of failures (failed doctests plus failed URIs).
    """
    import doctest

    # testmod() returns (failed, attempted).
    failures = doctest.testmod()[0]

    # NOTE: the addresses in the userinfo and mailto cases are reconstructed
    # from the expected values; the copies of this file found online have
    # them mangled by e-mail obfuscation.
    cases = (
        ("file:///tmp/foo.fs", dict(scheme="file", path="/tmp/foo.fs")),
        ("file:///tmp/foo.fs?foo=bar&ham=spam", dict(scheme="file", path="/tmp/foo.fs", query={"foo": ["bar"], "ham": ["spam"]})),
        ("zeo://zeo.example.com", dict(scheme="zeo", hostname="zeo.example.com")),
        ("zeo://zeo.example.com;someparams", dict(scheme="zeo", hostname="zeo.example.com", params="someparams")),
        ("zeo://zeo.example.com;someparams?foo=bar", dict(scheme="zeo", hostname="zeo.example.com", params="someparams", query={"foo": ["bar"]})),
        ("zeo://zeo.example.com;someparams?foo=bar#fragment", dict(scheme="zeo", hostname="zeo.example.com", params="someparams", query={"foo": ["bar"]}, fragment="fragment")),
        ("zeo://zeo.example.com:8074/", dict(scheme="zeo", hostname="zeo.example.com", port=8074, path="/")),
        ("zeo://zeo.example.com:8074", dict(scheme="zeo", hostname="zeo.example.com", port=8074)),
        ("zeo://zeo.example.com:8074/1", dict(scheme="zeo", hostname="zeo.example.com", port=8074, path="/1")),
        ("zeo://zeo.example.com:8074/1/", dict(scheme="zeo", hostname="zeo.example.com", port=8074, path="/1/")),
        ("zeo://zeo.example.com:8074/1/#bar", dict(scheme="zeo", hostname="zeo.example.com", port=8074, path="/1/", fragment="bar")),
        ("zeo://zeo.example.com:8074?client=zope&var=/tmp", dict(scheme="zeo", hostname="zeo.example.com", port=8074, query={"client": ["zope"], "var": ["/tmp"]})),
        ("http://example.com/mydoc.html#contents", dict(scheme="http", hostname="example.com", path="/mydoc.html", fragment="contents")),
        ("http://user@example.com/mydoc.html#contents", dict(scheme="http", hostname="example.com", username="user", path="/mydoc.html", fragment="contents")),
        ("http://user:pass@example.com/mydoc.html#contents", dict(scheme="http", hostname="example.com", username="user", password="pass", path="/mydoc.html", fragment="contents")),
        ("mailto:user@example.com", dict(scheme="mailto", value="user@example.com")),
        ("ftp://ftp.is.co.za/rfc/rfc1808.txt", dict(scheme="ftp", hostname="ftp.is.co.za", path="/rfc/rfc1808.txt")),
        ("gopher://spinaltap.micro.umn.edu/00/Weather/California/Los%20Angeles", dict(scheme="gopher", hostname="spinaltap.micro.umn.edu", path="/00/Weather/California/Los Angeles")),
    )

    for uri, expected in cases:
        results = uriparse(uri)
        try:
            for key, val in expected.items():
                assert results[key] == val, "results[%r] = %r != %r" % (key, results[key], val)
            for key in results:
                if key not in expected:
                    assert (results[key] is None) or (results[key] == ""), "Expecting nothing for %r, got %r" % (key, results[key])
        except AssertionError as e:
            # Report and carry on so that one bad URI does not hide the rest.
            failures += 1
            print(e)
            print("%s %s" % (uri, results))

    return failures
# Run the self-tests when this module is executed as a script.
if __name__ == "__main__":
    selftest()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""ZODB and ZEO related utilities | |
""" | |
import logging | |
import optparse | |
import re | |
from ZODB import DB | |
from ZODB.FileStorage import FileStorage | |
from ZEO.ClientStorage import ClientStorage | |
import transaction | |
from uriparse import uriparse | |
def connect(uri):
    """Given a uri returns a storage connection

    file:///tmp/cooking.fs for a file storage
    zeo://fermat.renre.com:8074/2 for zeo to instance 2
    zeo://fermat.renre.com:8074?client=gps&var=/tmp with params

    This saves faffing around with zodb conf files or ClientStorage vs
    FileStorage.

    :param uri: A file:// or zeo:// URI describing the storage.
    :returns: A FileStorage for file: URIs, a ClientStorage for zeo: URIs.
    :raises ValueError: If the URI scheme is neither file nor zeo.

    For zeo: URIs the port defaults to 8074, the query parameters are passed
    to ClientStorage as keyword arguments, and the first run of digits in the
    path (if any) is passed as the ``storage`` keyword argument.
    """
    parts = uriparse(uri, flatten_query=True)
    scheme = parts["scheme"]

    if scheme == "file":
        return FileStorage(parts["path"])

    if scheme == "zeo":
        params = dict(parts["query"] or {})
        # A path such as "/" carries no storage number; only set the keyword
        # when digits are actually present instead of failing on a None match.
        storage = re.search(r"[0-9]+", parts["path"] or "")
        if storage is not None:
            params["storage"] = storage.group(0)
        return ClientStorage((parts["hostname"], parts["port"] or 8074), **params)

    # Fail loudly rather than returning None for a scheme we cannot open.
    raise ValueError("Unsupported storage URI scheme %r in %r" % (scheme, uri))
# Script entry point: open the storage named by the URI given on the command
# line and drop into an embedded IPython shell with the storage, db,
# connection, root object and transaction module available as local names.
if __name__ == "__main__":
    parser = optparse.OptionParser(usage="%prog URI")
    options, args = parser.parse_args()
    if not args:
        # Exit with a usage message instead of an IndexError traceback.
        parser.error("a storage URI is required (e.g. file:///tmp/cooking.fs)")
    logging.basicConfig()
    uri = args[0]

    storage = connect(uri)
    db = DB(storage)
    try:
        connection = db.open()
        local_ns = dict(
            storage=storage,
            db=db,
            connection=connection,
            root=connection.root(),
            transaction=transaction,
        )
        try:
            transaction.begin()
            # NOTE(review): IPython.Shell is the embedding API of old IPython
            # releases; confirm against the installed IPython version.
            from IPython.Shell import IPShellEmbed
            IPShellEmbed([])(local_ns=local_ns)
        finally:
            # Discard anything left uncommitted by the interactive session,
            # then release the connection.
            transaction.abort()
            connection.close()
    finally:
        db.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment