Skip to content

Instantly share code, notes, and snippets.

@gregplaysguitar
Last active January 29, 2018 07:54
Show Gist options
  • Save gregplaysguitar/7c9d7db6b4c90e9bafd1b6f7fa7ff248 to your computer and use it in GitHub Desktop.
Save gregplaysguitar/7c9d7db6b4c90e9bafd1b6f7fa7ff248 to your computer and use it in GitHub Desktop.
Postgres full-text search utils for django
# -*- coding: utf-8 -*-
r"""
Postgres full-text search utilities for django.

To enable (and test) the unaccent function, as a superuser:

    \connect DATABASE
    CREATE EXTENSION unaccent;
    SELECT unaccent('èéêë');

Example
-------

Create a blog app with the following `models.py`:

    from django.db import models

    class Post(models.Model):
        title = models.CharField(max_length=100)
        text = models.TextField()
        author = models.ForeignKey('auth.User')

Create the initial migration, then an empty one to add the index:

    > ./manage.py makemigrations blog
    > ./manage.py makemigrations blog --empty --name ft_index

Update `blog/migrations/0002_ft_index.py` like so:

    from django.db import migrations
    from utilities.postgres_ft import create_fulltext

    AUTHOR_NAME = "array_to_string(array(" \
        "SELECT concat(first_name, ' ', last_name) FROM auth_user " \
        "WHERE auth_user.id=e.author_id), ' ')"

    class Migration(migrations.Migration):
        dependencies = [
            ('blog', '0001_initial'),
        ]
        operations = [
            create_fulltext('blog_post', ['title', 'text'],
                            [AUTHOR_NAME]),
        ]

Example search query:

    from utilities.postgres_ft import sanitize_search_term
    from .models import Post

    query = sanitize_search_term(request.GET.get('q', ''))
    qs = Post.objects.extra(
        where=["fts_document @@ to_tsquery('english', unaccent(%s))"],
        params=[query]
    )
"""
import re
import string
from django.db import migrations
from django.template import Template, Context
# The constants below are Django template strings that render to SQL. They
# are rendered with the context keys `table_name`, `index_name`, `fields`
# and `extras`. Every database object they create is named
# "<table_name>_<index_name>..." so several indexes can coexist.

# Reverse SQL: drops the GIN index, both triggers, the trigger function, the
# document-building function and finally the tsvector column itself.
BACKWARDS = """
DROP INDEX {{ table_name }}_{{ index_name }}_index;
DROP TRIGGER {{ table_name }}_{{ index_name }}_insert_trigger
ON {{ table_name }};
DROP TRIGGER {{ table_name }}_{{ index_name }}_update_trigger
ON {{ table_name }};
DROP FUNCTION {{ table_name }}_{{ index_name }}_trigger();
DROP FUNCTION {{ table_name }}_{{ index_name }}(e {{ table_name }});
ALTER TABLE {{ table_name }} DROP COLUMN {{ index_name }} RESTRICT;
"""
# Creates (or replaces) the plpgsql function that builds the tsvector for one
# row, passed in as `e`. Each entry of `fields` is emitted as `e.<field>` and
# each entry of `extras` is emitted verbatim (`|safe`); everything is joined
# with concat_ws(' ', ...), run through unaccent() and converted with
# to_tsvector('pg_catalog.english', ...). Every loop item ends with a comma,
# so the literal '' closes the concat_ws argument list.
INDEX_FUNCTION = """
CREATE OR REPLACE FUNCTION {{ table_name }}_{{ index_name }}(e {{ table_name }})
RETURNS tsvector AS $$
DECLARE
{{ table_name }}_document TEXT;
BEGIN
SELECT concat_ws(' ',
{% for f in fields %}e.{{ f }}, {% endfor %}
{% for extra in extras %}{{ extra|safe }}, {% endfor %}
'')
INTO {{ table_name }}_document;
RETURN to_tsvector('pg_catalog.english', unaccent({{ table_name }}_document));
END;
$$ LANGUAGE plpgsql;
"""
# Recomputes the tsvector column for every row of the table by calling the
# function defined in INDEX_FUNCTION.
UPDATE = """
UPDATE {{ table_name }}
SET {{ index_name }}={{ table_name }}_{{ index_name }}({{ table_name }});
"""
# Forward SQL, in order: add the tsvector column, define the document
# function (INDEX_FUNCTION), define a trigger function that stores the
# document in NEW.<index_name>, attach it as BEFORE UPDATE and BEFORE INSERT
# row triggers, create a GIN index on the column, then populate the column
# for the rows that already exist (UPDATE).
FORWARDS = """
ALTER TABLE {{ table_name }} ADD COLUMN {{ index_name }} tsvector;
""" + INDEX_FUNCTION + """
CREATE FUNCTION {{ table_name }}_{{ index_name }}_trigger()
RETURNS TRIGGER AS $$
BEGIN
NEW.{{ index_name }}={{ table_name }}_{{ index_name }}(NEW);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER {{ table_name }}_{{ index_name }}_update_trigger
BEFORE UPDATE ON {{ table_name }} FOR EACH ROW
EXECUTE PROCEDURE {{ table_name }}_{{ index_name }}_trigger();
CREATE TRIGGER {{ table_name }}_{{ index_name }}_insert_trigger
BEFORE INSERT ON {{ table_name }} FOR EACH ROW
EXECUTE PROCEDURE {{ table_name }}_{{ index_name }}_trigger();
CREATE INDEX {{ table_name }}_{{ index_name }}_index ON {{ table_name }}
USING gin({{ index_name }});
""" + UPDATE
def create_fulltext(table_name, fields, extras=None,
                    index_name='fts_document'):
    """Return a `django.db.migrations.RunSQL` operation which creates a
    fulltext index for a django model in a column called `index_name`.

    The forward SQL is the rendered FORWARDS template and the reverse SQL is
    the rendered BACKWARDS template, so the operation is reversible.

    Arguments:
        table_name: database table of the indexed model.
        fields: column names of that table to include in the index.
        extras: optional list of SQL subqueries returning related data to be
            included in the index. The primary table is aliased "e" for
            reference in the subquery, i.e.

                array_to_string(array(
                    SELECT name from author where author.id=e.author_id), ' ')

            Defaults to no extras.
        index_name: name of the tsvector column; also used as the suffix of
            the generated function, trigger and index names.
    """
    # The templates produce SQL, not HTML: with autoescaping left on, a quote
    # or angle bracket in an identifier (e.g. a quoted column name) would be
    # rendered as an HTML entity and corrupt the statement.
    ctx = Context({
        'table_name': table_name,
        'fields': fields,
        # None instead of a shared mutable [] default.
        'extras': [] if extras is None else extras,
        'index_name': index_name,
    }, autoescape=False)
    forwards_sql = Template(FORWARDS).render(ctx)
    backwards_sql = Template(BACKWARDS).render(ctx)
    return migrations.RunSQL(forwards_sql, backwards_sql)
def update_fulltext(table_name, fields, extras=None,
                    index_name='fts_document'):
    """Return a `django.db.migrations.RunSQL` operation which updates a
    fulltext index created by the create_fulltext function.

    The rendered SQL re-creates the document function (INDEX_FUNCTION, which
    uses CREATE OR REPLACE) and then recomputes the tsvector column for every
    row (UPDATE). No reverse SQL is supplied to RunSQL.

    Arguments are as per create_fulltext: `table_name` and `fields` describe
    the indexed table, `extras` is an optional list of SQL subqueries
    (defaults to none) and `index_name` is the tsvector column name.
    """
    # The templates produce SQL, not HTML: with autoescaping left on, a quote
    # or angle bracket in an identifier (e.g. a quoted column name) would be
    # rendered as an HTML entity and corrupt the statement.
    ctx = Context({
        'table_name': table_name,
        'fields': fields,
        # None instead of a shared mutable [] default.
        'extras': [] if extras is None else extras,
        'index_name': index_name,
    }, autoescape=False)
    return migrations.RunSQL(Template(INDEX_FUNCTION + UPDATE).render(ctx))
# Patterns used by sanitize_search_term, compiled once at import time.

# Runs of punctuation that are replaced by a single space. '&', '|' and both
# quote characters are kept because the function gives them meaning.
_FTS_STRIPPED_PUNCTUATION_RE = re.compile("[{}]+".format(re.escape(
    "".join(c for c in string.punctuation if c not in "&|\"'"))))
# Any run of whitespace (tabs, newlines, ...), normalized to a single space.
_FTS_WHITESPACE_RE = re.compile(r"\s+")
# Runs of single quotes, collapsed to one quote.
_FTS_REPEATED_QUOTES_RE = re.compile(r"[']+")
# A quoted string; the capturing group makes split() keep it as a token.
_FTS_QUOTED_STRING_RE = re.compile(r"('[^']*')")
# Two words separated only by spaces (no operator between them).
_FTS_SPACE_BETWEEN_WORDS_RE = re.compile(r'([^ &|])[ ]+([^ &|])')
# A single character with spaces on both sides.
_FTS_SPACES_SURROUNDING_LETTER_RE = re.compile(r'[ ]+([^ &|])[ ]+')
# An operator surrounded by spaces and/or ampersands.
_FTS_MULTIPLE_OPERATOR_RE = re.compile(r"[ &]+(&|\|)[ &]+")
# One search word: a run of anything except spaces and operators.
_FTS_SEARCH_WORD_RE = re.compile(r'([^ &|]+)')


def sanitize_search_term(term, prefix=False):
    """Turn free-form user input into an expression for `to_tsquery`.

    Processing steps:

    * whitespace runs and all punctuation except ``&``, ``|`` and quotes are
      replaced by single spaces;
    * double quotes become single quotes and repeated quotes are collapsed;
    * outside quoted strings, words separated only by spaces are joined
      with `` & ``; explicit ``&`` and ``|`` operators are kept;
    * quoted strings are kept as single tokens;
    * operators left dangling at the start or end of the expression are
      removed, since an expression such as ``foo &`` has a missing operand;
    * every single quote in the result is doubled.

    Arguments:
        term: the raw search string.
        prefix: when true, ``:*`` is appended to every run of characters
            that are not spaces or operators, in every token.

    Returns the sanitized expression as a string; it is empty when `term`
    contains no searchable text.

    NOTE(review): doubling the quotes looks intended for embedding the
    result in an SQL string literal -- confirm it is also wanted when the
    result is passed as a bound query parameter.
    """
    # Treat every kind of whitespace like a plain space, so tabs and
    # newlines separate words instead of ending up inside a term.
    term = _FTS_WHITESPACE_RE.sub(" ", term)

    # Replace all punctuation with spaces.
    term = _FTS_STRIPPED_PUNCTUATION_RE.sub(" ", term)

    # Substitute all double quotes with single quotes, then collapse runs.
    term = _FTS_REPEATED_QUOTES_RE.sub("'", term.replace('"', "'"))

    processed_tokens = []
    for token in _FTS_QUOTED_STRING_RE.split(term):
        # Remove all surrounding whitespace.
        token = token.strip()
        if token in ('', "'"):
            continue

        if token[0] != "'":
            # Surround single letters with &'s.
            token = _FTS_SPACES_SURROUNDING_LETTER_RE.sub(r' & \1 & ', token)
            # Specify '&' between words that have neither | or & specified.
            token = _FTS_SPACE_BETWEEN_WORDS_RE.sub(r'\1 & \2', token)

        if prefix:
            # Add a prefix wildcard to every search term.
            token = _FTS_SEARCH_WORD_RE.sub(r'\1:*', token)

        processed_tokens.append(token)

    term = " & ".join(processed_tokens)

    # Replace ampersands or pipes surrounded by ampersands.
    term = _FTS_MULTIPLE_OPERATOR_RE.sub(r" \1 ", term)

    # Drop operators (and spaces) that have no operand on one side.
    term = term.strip(" &|")

    # Escape single quotes.
    return term.replace("'", "''")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment