Skip to content

Instantly share code, notes, and snippets.

@alecthomas
Last active December 27, 2015 08:29
Show Gist options
  • Save alecthomas/7297077 to your computer and use it in GitHub Desktop.
Save alecthomas/7297077 to your computer and use it in GitHub Desktop.
A generic search query parser.
# encoding: utf-8
#
# Copyright (C) 2009 Alec Thomas <[email protected]>
# All rights reserved.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution.
#
# Author: Alec Thomas <[email protected]>
"""Abstract query parser.
The query language is in the following form:
<term> <term> document must contain all of these terms
"some term" return documents matching this exact phrase
-<term> exclude documents containing this term
<term> or <term> return documents matching either term
<selector>:<term> return documents with term in the specified selector
"""
from compiler import ast, misc, pycodegen
import re
class QueryError(Exception):
"""Base exception for queries."""
class InvalidQuery(QueryError):
"""An invalid query was encountered."""
class QueryNode(object):
"""A query parse node.
>>> QueryNode(QueryNode.TERM, 'one')
("one")
>>> QueryNode(QueryNode.AND,
... left=QueryNode(QueryNode.TERM, 'one'),
... right=QueryNode(QueryNode.TERM, 'two'))
(and
("one")
("two"))
>>> QueryNode(QueryNode.NOT, left=QueryNode(QueryNode.TERM, 'one'))
(not
("one")
nil)
"""
# TODO Separate lexer and parser identifiers
NULL = 0
TERM = 1
NOT = 2
AND = 3
OR = 4
SELECTOR = 5
BEGINSUB = 6
ENDSUB = 7
_TYPE_MAP = {NULL: 'null', TERM: 'term', NOT: 'not', AND: 'and', OR: 'or',
SELECTOR: 'selector'}
__slots__ = ('type', 'value', 'left', 'right')
def __init__(self, type, value=None, left=None, right=None):
self.type = type
self.value = value
self.left = left
self.right = right
def __repr__(self):
def show(node, depth=0):
if node.type == QueryNode.TERM:
text = '%s("%s"' % (' ' * depth, node.value)
elif node.type == QueryNode.SELECTOR:
text = '%s(selector "%s"' % (' ' * depth, node.value)
else:
text = "%s(%s%s" % (' ' * depth, self._TYPE_MAP[node.type],
node.value and ' "%s"' % (node.value,) or
"")
if node.left or node.right:
text += "\n"
if node.left:
text += show(node.left, depth + 1)
else:
text += "%snil" % (' ' * (depth + 1))
text += "\n"
if node.right:
text += show(node.right, depth + 1)
else:
text += "%snil" % (' ' * (depth + 1))
text += ")"
return text
return show(self)
class Query(QueryNode):
"""Query parser. Converts a simple query language into a parse tree which
Indexers can then convert into their own implementation-specific
representation.
The query language is in the following form:
<term> <term> document must contain all of these terms
"some term" return documents matching this exact phrase
-<term> exclude documents containing this term
<term> or <term> return documents matching either term
<selector>:<term> return documents with term in the specified selector
eg.
>>> Query('lettuce tomato -cheese')
(and
("lettuce")
(and
("tomato")
(not
("cheese")
nil)))
>>> Query('"mint slices" -timtams')
(and
("mint slices")
(not
("timtams")
nil))
>>> Query('"brie cheese" or "camembert cheese"')
(or
("brie cheese")
("camembert cheese"))
>>> Query('one two:(three or four)')
(and
("one")
(selector "two"
(or
("three")
("four"))
nil))
"""
_TOKENISE_RE = re.compile(r"""
(?P<not>-)|
(?P<or>or)|
(?P<and>and)|
\"(?P<dquote>(?:\\.|[^\"])*)\"|
'(?P<squote>(?:\\.|[^'])*)'|
(?P<startsub>\()|
(?P<endsub>\))|
(?P<selector>:)|
(?P<term>\w+)""", re.I | re.X)
_GROUP_MAP = {'dquote': QueryNode.TERM, 'squote': QueryNode.TERM,
'term': QueryNode.TERM, 'not': QueryNode.NOT,
'or': QueryNode.OR, 'and': QueryNode.AND,
'selector': QueryNode.SELECTOR,
'startsub': QueryNode.BEGINSUB,
'endsub': QueryNode.ENDSUB}
def __init__(self, phrase):
QueryNode.__init__(self, QueryNode.NULL)
self._phrase = ''
if phrase is None or isinstance(phrase, QueryNode):
self._phrase = repr(phrase)
root = phrase
else:
self._phrase = phrase
tokens = self._tokenise(phrase)
root = self.parse(tokens)
if root:
# Make ourselves into the root node
for k in QueryNode.__slots__:
setattr(self, k, getattr(root, k))
self._compiled = self._compile()
def __str__(self):
return self._phrase
def parse(self, tokens):
left = self.parse_unary(tokens)
while tokens:
if tokens[0][0] == QueryNode.ENDSUB:
return left
if tokens[0][0] == QueryNode.OR:
tokens.pop(0)
return QueryNode(QueryNode.OR, left=left,
right=self.parse(tokens))
else:
if tokens[0][0] == QueryNode.AND:
tokens.pop(0)
return QueryNode(QueryNode.AND, left=left,
right=self.parse(tokens))
return left
def parse_unary(self, tokens):
"""Parse a unary operator. Currently only NOT.
>>> q = Query('')
>>> q.parse_unary(q._tokenise('-foo:bar'))
(not
(selector "foo"
("bar")
nil)
nil)
>>> q.parse_unary(q._tokenise('-foo'))
(not
("foo")
nil)
"""
if not tokens:
return None
if tokens[0][0] == QueryNode.BEGINSUB:
tokens.pop(0)
node = self.parse(tokens)
if not tokens or tokens[0][0] != QueryNode.ENDSUB:
raise InvalidQuery('expected ) at end of sub-expression')
tokens.pop(0)
return node
if tokens[0][0] == QueryNode.NOT:
tokens.pop(0)
return QueryNode(QueryNode.NOT, left=self.parse_terminal(tokens))
return self.parse_terminal(tokens)
def parse_terminal(self, tokens):
"""Parse a terminal token.
>>> q = Query('')
>>> q.parse_terminal(q._tokenise('foo'))
("foo")
"""
if not tokens:
raise InvalidQuery('unexpected end of string')
if tokens[0][0] == QueryNode.TERM and len(tokens) > 1 and tokens[1][0] == QueryNode.SELECTOR:
_, value = tokens.pop(0)
value = value.decode('string_escape')
tokens.pop(0)
return QueryNode(QueryNode.SELECTOR, value=value, left=self.parse_unary(tokens))
elif tokens[0][0] in (QueryNode.TERM, QueryNode.OR):
_, value = tokens.pop(0)
value = value.decode('string_escape')
return QueryNode(QueryNode.TERM, value=value)
raise InvalidQuery('expected terminal, got "%s"' % tokens[0][1])
def terms(self, exclude_not=True):
"""A generator returning the terms contained in the Query."""
def _convert(node):
if not node:
return
if node.type == node.TERM:
yield node.value
elif node.type == node.NOT and exclude_not:
return
else:
for child in _convert(node.left):
yield child
for child in _convert(node.right):
yield child
return _convert(self)
def _apply(self, matcher, selector):
"""Recursive version of compiled expression matcher."""
def _apply(node):
if not node or not node.type:
return True
if node.type == Query.TERM:
return matcher(node.value)
elif node.type == Query.AND:
return _apply(node.left) and _apply(node.right)
elif node.type == Query.OR:
return _apply(node.left) or _apply(node.right)
elif node.type == Query.NOT:
return not _apply(node.left)
elif node.type == Query.SELECTOR:
return selector(node.left, QueryNode(node.right))
else:
raise NotImplementedError('query node type %s not implemented'
% node.type)
return _apply(self)
def _compile(self):
"""Compile the query into a single Python expression."""
def _generate(node):
if not node or not node.type:
return ast.Name('True')
if node.type == Query.TERM:
return ast.CallFunc(ast.Name('matcher'),
[ast.Const(node.value)], None, None)
elif node.type == Query.AND:
return ast.And([_generate(node.left), _generate(node.right)])
elif node.type == Query.OR:
return ast.Or([_generate(node.left), _generate(node.right)])
elif node.type == Query.NOT:
return ast.Not(_generate(node.left))
elif node.type == Query.SELECTOR:
return ast.CallFunc(ast.Name('selector'),
[ast.Const(node.left.value),
ast.Const(Query(node.right))],
None, None)
else:
raise NotImplementedError('query node type %s not implemented'
% node.type)
function = ast.Lambda(['matcher', 'selector'], [], 0, _generate(self))
query_ast = ast.Expression(function)
misc.set_filename('<%s compiled query>' % self.__class__.__name__,
query_ast)
gen = pycodegen.ExpressionCodeGenerator(query_ast)
return eval(gen.getCode())
def __call__(self, matcher, selector=None):
"""Match the query using callbacks.
The function accepts two callbacks, one for matching text and one for
matching selectors.
:param matcher: A callback in the form matcher(text).
:param selector: A callback in the form selector(name, expression).
:returns: True if the query matched.
"""
return self._compiled(matcher, selector or
self._unsupported_selector)
def _unsupported_selector(self, name, value):
return False
def as_string(self, and_=' ', or_=' OR ', not_='-'):
"""Convert Query to a boolean expression.
eg. "term AND term OR term AND NOT term"
The expanded operators can be customised for syntactical variations by
overriding the keyword arguments.
>>> Query('foo bar').as_string()
'foo bar'
>>> Query('foo bar or baz').as_string()
'foo bar OR baz'
>>> Query('foo -bar or baz').as_string()
'foo -bar OR baz'
"""
def _convert(node):
if not node or node.type is None:
return ''
if node.type == node.AND:
return '%s%s%s' % (_convert(node.left), and_,
_convert(node.right))
elif node.type == node.OR:
return '%s%s%s' % (_convert(node.left), or_,
_convert(node.right))
elif node.type == node.NOT:
return '%s%s' % (not_, _convert(node.left))
elif node.type == node.TERM:
return node.value
else:
raise NotImplementedError(node.type)
return _convert(self)
# Internal methods
def _tokenise(self, phrase):
"""Tokenise a phrase string.
>>> q = Query('')
>>> q._tokenise('one')
[(1, 'one')]
>>> q._tokenise('one two')
[(1, 'one'), (1, 'two')]
>>> q._tokenise('one or two')
[(1, 'one'), (4, 'or'), (1, 'two')]
>>> q._tokenise('"one two"')
[(1, 'one two')]
>>> q._tokenise("'one two'")
[(1, 'one two')]
>>> q._tokenise('-one')
[(2, '-'), (1, 'one')]
"""
tokens = [(self._GROUP_MAP[token.lastgroup],
token.group(token.lastindex))
for token in self._TOKENISE_RE.finditer(phrase)]
return tokens
class QueryVisitor(object):
"""A Query visitor."""
def visit(self, node):
"""Visit all nodes in the parse tree.
:param node: Node to start visitiations at.
:returns: Output of visitation.
"""
return getattr(self, 'visit_' + Query._TYPE_MAP[node.type])(node)
def visit_null(self, node):
raise NotImplementedError
def visit_term(self, node):
raise NotImplementedError
def visit_not(self, node):
raise NotImplementedError
def visit_and(self, node):
raise NotImplementedError
def visit_or(self, node):
raise NotImplementedError
def visit_selector(self, node):
raise NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment