Last active
December 27, 2015 08:29
-
-
Save alecthomas/7297077 to your computer and use it in GitHub Desktop.
A generic search query parser.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
# | |
# Copyright (C) 2009 Alec Thomas <[email protected]> | |
# All rights reserved. | |
# | |
# This software is licensed as described in the file COPYING, which | |
# you should have received as part of this distribution. | |
# | |
# Author: Alec Thomas <[email protected]> | |
"""Abstract query parser.

The query language has the following form:

    <term> <term>       document must contain all of these terms
    "some term"         return documents matching this exact phrase
    -<term>             exclude documents containing this term
    <term> or <term>    return documents matching either term
    <selector>:<term>   return documents with term in the specified selector
"""
from compiler import ast, misc, pycodegen | |
import re | |
class QueryError(Exception):
    """Common base class for every query-related error in this module."""
class InvalidQuery(QueryError):
    """Raised when a query string cannot be parsed."""
class QueryNode(object):
    """A single node of a parsed query tree.

    A node has a ``type`` (one of the integer constants defined on this
    class), an optional ``value`` and optional ``left`` / ``right`` children.
    Its ``repr()`` is an s-expression style dump of the subtree.

    >>> QueryNode(QueryNode.TERM, 'one')
    ("one")
    >>> QueryNode(QueryNode.AND,
    ...           left=QueryNode(QueryNode.TERM, 'one'),
    ...           right=QueryNode(QueryNode.TERM, 'two'))
    (and
     ("one")
     ("two"))
    >>> QueryNode(QueryNode.NOT, left=QueryNode(QueryNode.TERM, 'one'))
    (not
     ("one")
     nil)
    """

    # TODO Separate lexer and parser identifiers
    NULL = 0
    TERM = 1
    NOT = 2
    AND = 3
    OR = 4
    SELECTOR = 5
    # BEGINSUB / ENDSUB are only produced by the tokeniser; they have no
    # entry in _TYPE_MAP.
    BEGINSUB = 6
    ENDSUB = 7

    _TYPE_MAP = {NULL: 'null', TERM: 'term', NOT: 'not', AND: 'and', OR: 'or',
                 SELECTOR: 'selector'}

    __slots__ = ('type', 'value', 'left', 'right')

    def __init__(self, type, value=None, left=None, right=None):
        """Store the node type, its value and its two (optional) children."""
        self.type = type
        self.value = value
        self.left = left
        self.right = right

    def __repr__(self):
        """Return an indented s-expression dump of this node's subtree."""
        type_names = self._TYPE_MAP

        def render(node, depth=0):
            pad = ' ' * depth
            if node.type == QueryNode.TERM:
                head = '%s("%s"' % (pad, node.value)
            elif node.type == QueryNode.SELECTOR:
                head = '%s(selector "%s"' % (pad, node.value)
            else:
                label = type_names[node.type]
                suffix = ' "%s"' % (node.value,) if node.value else ""
                head = "%s(%s%s" % (pad, label, suffix)

            pieces = [head]
            # Children are only printed when at least one of them exists; a
            # missing sibling is then shown as "nil".
            if node.left or node.right:
                for child in (node.left, node.right):
                    pieces.append("\n")
                    if child:
                        pieces.append(render(child, depth + 1))
                    else:
                        pieces.append("%snil" % (' ' * (depth + 1)))
            pieces.append(")")
            return "".join(pieces)

        return render(self)
class Query(QueryNode):
    """Query parser. Converts a simple query language into a parse tree which
    Indexers can then convert into their own implementation-specific
    representation.

    The query language is in the following form:

        <term> <term>       document must contain all of these terms
        "some term"         return documents matching this exact phrase
        -<term>             exclude documents containing this term
        <term> or <term>    return documents matching either term
        <selector>:<term>   return documents with term in the specified
                            selector

    eg.

    >>> Query('lettuce tomato -cheese')
    (and
     ("lettuce")
     (and
      ("tomato")
      (not
       ("cheese")
       nil)))
    >>> Query('"mint slices" -timtams')
    (and
     ("mint slices")
     (not
      ("timtams")
      nil))
    >>> Query('"brie cheese" or "camembert cheese"')
    (or
     ("brie cheese")
     ("camembert cheese"))
    >>> Query('one two:(three or four)')
    (and
     ("one")
     (selector "two"
      (or
       ("three")
       ("four"))
      nil))
    """

    # The "or" / "and" keywords are anchored on word boundaries so that
    # ordinary words which merely start with them ("orange", "android") are
    # tokenised as plain terms rather than as an operator plus a fragment.
    _TOKENISE_RE = re.compile(r"""
        (?P<not>-)|
        \b(?P<or>or)\b|
        \b(?P<and>and)\b|
        \"(?P<dquote>(?:\\.|[^\"])*)\"|
        '(?P<squote>(?:\\.|[^'])*)'|
        (?P<startsub>\()|
        (?P<endsub>\))|
        (?P<selector>:)|
        (?P<term>\w+)""", re.I | re.X)

    # Maps the name of the regex group that matched to a token type.
    _GROUP_MAP = {'dquote': QueryNode.TERM, 'squote': QueryNode.TERM,
                  'term': QueryNode.TERM, 'not': QueryNode.NOT,
                  'or': QueryNode.OR, 'and': QueryNode.AND,
                  'selector': QueryNode.SELECTOR,
                  'startsub': QueryNode.BEGINSUB,
                  'endsub': QueryNode.ENDSUB}

    def __init__(self, phrase):
        """Build a query from a phrase string or an existing parse tree.

        :param phrase: Either a query string to parse, or ``None`` / a
                       QueryNode whose fields are adopted as this query's
                       root.
        :raises InvalidQuery: If the phrase string cannot be parsed.
        """
        QueryNode.__init__(self, QueryNode.NULL)
        if phrase is None or isinstance(phrase, QueryNode):
            self._phrase = repr(phrase)
            root = phrase
        else:
            self._phrase = phrase
            root = self.parse(self._tokenise(phrase))
        if root:
            # Make ourselves into the root node
            for k in QueryNode.__slots__:
                setattr(self, k, getattr(root, k))
        self._compiled = self._compile()

    def __str__(self):
        return self._phrase

    def parse(self, tokens):
        """Parse a (sub-)expression from the head of ``tokens``.

        Tokens are consumed in place. Parsing stops at the end of the token
        list or at a closing parenthesis, which is left for the caller.
        Two adjacent operands without an explicit operator are ANDed.

        :param tokens: List of ``(type, text)`` tuples from `_tokenise`.
        :returns: The root QueryNode of the expression, or None when there
                  were no tokens.
        :raises InvalidQuery: If an operand is malformed.
        """
        left = self.parse_unary(tokens)
        # NOTE(review): a stray ")" at the top level ends parsing here and
        # the remaining tokens are silently ignored by __init__.
        if not tokens or tokens[0][0] == QueryNode.ENDSUB:
            return left
        if tokens[0][0] == QueryNode.OR:
            tokens.pop(0)
            return QueryNode(QueryNode.OR, left=left,
                             right=self.parse(tokens))
        # An explicit "and" is optional: juxtaposition means AND as well.
        if tokens[0][0] == QueryNode.AND:
            tokens.pop(0)
        return QueryNode(QueryNode.AND, left=left,
                         right=self.parse(tokens))

    def parse_unary(self, tokens):
        """Parse a unary operator. Currently only NOT.

        Also handles parenthesised sub-expressions.

        >>> q = Query('')
        >>> q.parse_unary(q._tokenise('-foo:bar'))
        (not
         (selector "foo"
          ("bar")
          nil)
         nil)
        >>> q.parse_unary(q._tokenise('-foo'))
        (not
         ("foo")
         nil)

        :returns: A QueryNode, or None when ``tokens`` is empty.
        :raises InvalidQuery: On a missing ")" or a malformed operand.
        """
        if not tokens:
            return None
        if tokens[0][0] == QueryNode.BEGINSUB:
            tokens.pop(0)
            node = self.parse(tokens)
            if not tokens or tokens[0][0] != QueryNode.ENDSUB:
                raise InvalidQuery('expected ) at end of sub-expression')
            tokens.pop(0)
            return node
        if tokens[0][0] == QueryNode.NOT:
            tokens.pop(0)
            return QueryNode(QueryNode.NOT, left=self.parse_terminal(tokens))
        return self.parse_terminal(tokens)

    def parse_terminal(self, tokens):
        """Parse a terminal token.

        A term directly followed by ":" becomes a SELECTOR node whose value
        is the selector name and whose left child is the selected expression.

        >>> q = Query('')
        >>> q.parse_terminal(q._tokenise('foo'))
        ("foo")

        :raises InvalidQuery: If there are no tokens left or the next token
                              is not a terminal.
        """
        if not tokens:
            raise InvalidQuery('unexpected end of string')
        if (tokens[0][0] == QueryNode.TERM and len(tokens) > 1 and
                tokens[1][0] == QueryNode.SELECTOR):
            _, value = tokens.pop(0)
            value = value.decode('string_escape')
            tokens.pop(0)  # discard the ":" itself
            return QueryNode(QueryNode.SELECTOR, value=value,
                             left=self.parse_unary(tokens))
        elif tokens[0][0] in (QueryNode.TERM, QueryNode.OR):
            # An "or" keyword in operand position is taken as a literal term.
            _, value = tokens.pop(0)
            value = value.decode('string_escape')
            return QueryNode(QueryNode.TERM, value=value)
        raise InvalidQuery('expected terminal, got "%s"' % tokens[0][1])

    def terms(self, exclude_not=True):
        """A generator returning the terms contained in the Query.

        :param exclude_not: When true (the default), terms underneath a NOT
                            node are skipped.
        """
        def _convert(node):
            if not node:
                return
            if node.type == node.TERM:
                yield node.value
            elif node.type == node.NOT and exclude_not:
                return
            else:
                for child in _convert(node.left):
                    yield child
                for child in _convert(node.right):
                    yield child

        return _convert(self)

    def _apply(self, matcher, selector):
        """Recursive version of compiled expression matcher.

        :param matcher: A callback in the form matcher(text).
        :param selector: A callback in the form selector(name, expression).
        """
        def _apply(node):
            if not node or not node.type:
                return True
            if node.type == Query.TERM:
                return matcher(node.value)
            elif node.type == Query.AND:
                return _apply(node.left) and _apply(node.right)
            elif node.type == Query.OR:
                return _apply(node.left) or _apply(node.right)
            elif node.type == Query.NOT:
                return not _apply(node.left)
            elif node.type == Query.SELECTOR:
                # The parser stores the selector name in ``value`` and the
                # selected expression in ``left``.
                return selector(node.value, Query(node.left))
            else:
                raise NotImplementedError('query node type %s not implemented'
                                          % node.type)

        return _apply(self)

    def _compile(self):
        """Compile the query into a single Python expression.

        :returns: A function ``f(matcher, selector)`` equivalent to
                  `_apply`.
        """
        def _generate(node):
            if not node or not node.type:
                return ast.Name('True')
            if node.type == Query.TERM:
                return ast.CallFunc(ast.Name('matcher'),
                                    [ast.Const(node.value)], None, None)
            elif node.type == Query.AND:
                return ast.And([_generate(node.left), _generate(node.right)])
            elif node.type == Query.OR:
                return ast.Or([_generate(node.left), _generate(node.right)])
            elif node.type == Query.NOT:
                return ast.Not(_generate(node.left))
            elif node.type == Query.SELECTOR:
                # selector(name, expression): the name lives in ``value``
                # and the expression in ``left`` (see parse_terminal).
                return ast.CallFunc(ast.Name('selector'),
                                    [ast.Const(node.value),
                                     ast.Const(Query(node.left))],
                                    None, None)
            else:
                raise NotImplementedError('query node type %s not implemented'
                                          % node.type)

        function = ast.Lambda(['matcher', 'selector'], [], 0, _generate(self))
        query_ast = ast.Expression(function)
        misc.set_filename('<%s compiled query>' % self.__class__.__name__,
                          query_ast)
        gen = pycodegen.ExpressionCodeGenerator(query_ast)
        # eval() runs the code object generated from the AST built above;
        # term values are embedded as constants, never as source text.
        return eval(gen.getCode())

    def __call__(self, matcher, selector=None):
        """Match the query using callbacks.

        The function accepts two callbacks, one for matching text and one for
        matching selectors.

        :param matcher: A callback in the form matcher(text).
        :param selector: A callback in the form selector(name, expression).
                         When omitted, selectors never match.
        :returns: True if the query matched.
        """
        return self._compiled(matcher, selector or
                              self._unsupported_selector)

    def _unsupported_selector(self, name, value):
        """Default selector callback: no selector ever matches."""
        return False

    def as_string(self, and_=' ', or_=' OR ', not_='-'):
        """Convert Query to a boolean expression.

        eg. "term AND term OR term AND NOT term"

        The expanded operators can be customised for syntactical variations by
        overriding the keyword arguments. An empty query converts to the
        empty string.

        >>> Query('foo bar').as_string()
        'foo bar'
        >>> Query('foo bar or baz').as_string()
        'foo bar OR baz'
        >>> Query('foo -bar or baz').as_string()
        'foo -bar OR baz'
        >>> Query('').as_string()
        ''

        :raises NotImplementedError: For node types without a textual form
                                     here (e.g. selectors).
        """
        def _convert(node):
            # NULL is 0, so test for falsiness rather than ``is None``.
            if not node or not node.type:
                return ''
            if node.type == node.AND:
                return '%s%s%s' % (_convert(node.left), and_,
                                   _convert(node.right))
            elif node.type == node.OR:
                return '%s%s%s' % (_convert(node.left), or_,
                                   _convert(node.right))
            elif node.type == node.NOT:
                return '%s%s' % (not_, _convert(node.left))
            elif node.type == node.TERM:
                return node.value
            else:
                raise NotImplementedError(node.type)

        return _convert(self)

    # Internal methods
    def _tokenise(self, phrase):
        """Tokenise a phrase string.

        Text that matches none of the token patterns is skipped.

        >>> q = Query('')
        >>> q._tokenise('one')
        [(1, 'one')]
        >>> q._tokenise('one two')
        [(1, 'one'), (1, 'two')]
        >>> q._tokenise('one or two')
        [(1, 'one'), (4, 'or'), (1, 'two')]
        >>> q._tokenise('"one two"')
        [(1, 'one two')]
        >>> q._tokenise("'one two'")
        [(1, 'one two')]
        >>> q._tokenise('-one')
        [(2, '-'), (1, 'one')]
        >>> q._tokenise('orange android')
        [(1, 'orange'), (1, 'android')]

        :returns: A list of ``(token type, matched text)`` tuples.
        """
        return [(self._GROUP_MAP[token.lastgroup],
                 token.group(token.lastindex))
                for token in self._TOKENISE_RE.finditer(phrase)]
class QueryVisitor(object):
    """Base class for visitors over a query parse tree.

    Subclasses override the ``visit_<type>`` hooks they need; every hook
    defined here raises NotImplementedError.
    """

    def visit(self, node):
        """Dispatch ``node`` to the ``visit_<type>`` hook matching its type.

        Only the given node is dispatched here; descending into its children
        is left to the hook.

        :param node: Node to start visitation at.
        :returns: Output of visitation.
        """
        hook_name = 'visit_' + Query._TYPE_MAP[node.type]
        hook = getattr(self, hook_name)
        return hook(node)

    def visit_null(self, node):
        """Hook for NULL (empty) nodes."""
        raise NotImplementedError

    def visit_term(self, node):
        """Hook for TERM nodes."""
        raise NotImplementedError

    def visit_not(self, node):
        """Hook for NOT nodes."""
        raise NotImplementedError

    def visit_and(self, node):
        """Hook for AND nodes."""
        raise NotImplementedError

    def visit_or(self, node):
        """Hook for OR nodes."""
        raise NotImplementedError

    def visit_selector(self, node):
        """Hook for SELECTOR nodes."""
        raise NotImplementedError
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment