Skip to content

Instantly share code, notes, and snippets.

@qingfeng
Forked from Cairnarvon/banned.py
Created November 18, 2012 09:13
Show Gist options
  • Save qingfeng/4104336 to your computer and use it in GitHub Desktop.
Save qingfeng/4104336 to your computer and use it in GitHub Desktop.
Web service that keeps track of bad images using perceptual hashes and a bloom filter, to be used by imageboards and the like. (Proof of concept.)
#!/usr/bin/python
import argparse
import cgi
import math
import os
import sys
from PIL import Image
import pybloomfilter # https://github.com/axiak/pybloomfiltermmap
def phash(im):
"""Basic DCT phash."""
im = im.resize((32, 32)).convert('L')
seq = [sum(im.getpixel((x, y)) *
math.cos(math.pi / 32 * (x + .5) * u) *
math.cos(math.pi / 32 * (y + .5) * v)
for x in range(32) for y in range(32))
for v in range(8) for u in range(8)]
avg = sum(seq[1:]) / (len(seq) - 1)
return reduce(lambda x, (y, z): x | (z << y),
enumerate(map(lambda i: 0 if i < avg else 1, seq)),
0)
def run(environ, start_response):
"""WSGI app. Lication."""
form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ)
# If we didn't get an image, display a basic form.
if 'image' not in form:
start_response('200 Whatever', [('Content-Type', 'text/html')])
return """\
<form action="/" method="post" enctype="multipart/form-data">
<p>
<input type="file" name="image" accept="image/*" value="k" />
</p>
<p>
<input type="radio" name="add" value="0" checked
onclick="document.getElementById('pw').style.visibility='hidden'"
checked
/> Check image
</p>
<p>
<input type="radio" name="add" value="1"
onclick="document.getElementById('pw').style.visibility='visible'"
/> Add image</div></p>
""" + ("""\
<p id="pw" style="visibility: hidden">
Password: <input type="text" name="pass" />
</p>
""" if opts.password is not None else "") + """\
<p>
<input type="submit" value="Go" />
</p>
</form>
"""
# If the user is trying to add a new entry, ensure he's allowed to.
if form.getfirst('add') == '1' and opts.password is not None and \
form.getfirst('pass') != opts.password:
start_response('401 Naughty', [('Content-Type', 'text/plain')])
return "Bad password."
# Calculate the phash.
try:
image = Image.open(form['image'].file)
except Exception, e:
start_response('400 Not Okay', [('Content-Type', 'text/plain')])
return "Your input could be better than it is. (%s)" % str(e)
h = phash(image)
# Add the phash to the bloom filter if that's what we're doing.
if form.getfirst('add') == '1':
bloomf.add(h)
start_response('200 k', [('Content-Type', 'text/plain')])
return "Added."
# Check if it's already there otherwise.
if h in bloomf:
start_response('200 k', [('Content-Type', 'text/plain')])
return "YAS IS AWFUL"
else:
start_response('404 Fa No Fa', [('Content-Type', 'text/plain')])
return "NOE IS FINE"
if __name__ == '__main__':
global bloomf, opts
# Parse options.
argp = argparse.ArgumentParser(description="""\
This is a web service to keep track of banned images.
It calculates perceptual hashes of images uploaded to it (to be able
to identify images even if they're slightly modified, and to obviate
the need for storing CP on your server) and stores those in a bloom
filter for fast membership determination.
""")
argp.add_argument('bloomfile', nargs='?', default='phash.dat',
help='file to back bloom filter.')
argp.add_argument('--password',
help='password required to add new entries (if any).')
argp.add_argument('--port', type=int, default=8080,
help='port on which to run the web server.')
opts = argp.parse_args()
# Initialise bloom filter.
if os.path.exists(opts.bloomfile):
bloomf = pybloomfilter.BloomFilter.open(opts.bloomfile)
else:
bloomf = pybloomfilter.BloomFilter(10000000, 0.0001, opts.bloomfile)
# Serve web.
if 'REQUEST_METHOD' in os.environ:
from wsgiref.handlers import BaseCGIHandler
BaseCGIHandler(sys.stdin, sys.stdout, sys.stderr, os.environ).run(run)
else:
from wsgiref.simple_server import WSGIServer, WSGIRequestHandler
httpd = WSGIServer(('', opts.port), WSGIRequestHandler)
httpd.set_app(run)
print "Serving on http://%s:%s" % httpd.socket.getsockname()
httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment