Skip to content

Instantly share code, notes, and snippets.

@cdunklau
Last active August 29, 2015 14:08
Show Gist options
  • Select an option

  • Save cdunklau/3750dac0096fe4c2963e to your computer and use it in GitHub Desktop.

Select an option

Save cdunklau/3750dac0096fe4c2963e to your computer and use it in GitHub Desktop.
SQLite3-backed mutable set
import collections
import sqlite3

try:
    from collections.abc import MutableSet  # Python 3.3+
except ImportError:
    from collections import MutableSet  # Python 2
class SQLiteSet(MutableSet):
    """Mutable set whose elements are stored in a SQLite database.

    Elements live in the single-column table ``string`` of the database
    named by ``database_filename``. The table is created on first use and
    left untouched if it already exists.

    Binary set operators (``|``, ``&``, ``-``, ``^``) return plain ``set``
    objects; only the in-place methods modify the database.
    """

    def __init__(self, database_filename):
        """Open (or create) the database and make sure the table exists.

        :param database_filename: value passed straight to
            ``sqlite3.connect`` (a path, or ``':memory:'``).
        """
        self._filename = database_filename
        self._con = sqlite3.connect(database_filename)
        # IF NOT EXISTS keeps the rows of a table created by an earlier run.
        self._con.execute(
            'CREATE TABLE IF NOT EXISTS string (string text UNIQUE)')

    @classmethod
    def _from_iterable(cls, iterable):
        """Build the result of a binary set operation as a plain ``set``.

        The default implementation calls ``cls(iterable)``, which would hand
        the iterable to ``__init__`` as a database filename.
        """
        return set(iterable)

    def __contains__(self, string):
        """Return True if ``string`` is stored in the table."""
        cursor = self._con.execute(
            'SELECT string FROM string WHERE string = ?', (string,))
        # One matching row is enough to answer the question.
        return cursor.fetchone() is not None

    def __iter__(self):
        """Return an iterator over a snapshot of the stored values."""
        cursor = self._con.execute('SELECT string FROM string')
        # Read every row up front so the iterator does not depend on a live
        # cursor while the caller adds or discards elements.
        return iter([row[0] for row in cursor])

    def __len__(self):
        """Return the number of rows in the table."""
        cursor = self._con.execute('SELECT count(*) FROM string')
        return cursor.fetchone()[0]

    def add(self, string):
        """Store ``string``; do nothing if it is already present."""
        # The connection context manager commits on success and rolls back
        # on error. OR IGNORE relies on the UNIQUE constraint to skip
        # duplicates in a single statement instead of check-then-insert.
        with self._con:
            self._con.execute(
                'INSERT OR IGNORE INTO string (string) VALUES (?)',
                (string,))

    def discard(self, string):
        """Remove ``string`` if present; do nothing otherwise."""
        with self._con:
            self._con.execute(
                'DELETE FROM string WHERE string = ?', (string,))

    def close(self):
        """Close the underlying database connection."""
        self._con.close()

    def __repr__(self):
        return '<SQLiteSet {0} {1}>'.format(self._filename, list(self))
import os
import shutil
import unittest
import tempfile
import sqlite3
from sqliteset import SQLiteSet
class ManualFilesTestCase(unittest.TestCase):
    """Check what SQLiteSet does to the database file it is given."""

    def _make_db_path(self):
        """Return a path in a fresh temp directory removed after the test."""
        tempdir = tempfile.mkdtemp(prefix='sqliteset_test')
        self.addCleanup(shutil.rmtree, tempdir)
        return os.path.join(tempdir, 'test.db')

    def _connect(self, filename):
        """Open a raw connection that is closed when the test finishes."""
        con = sqlite3.connect(filename)
        # Cleanups run last-in-first-out, so the connection is closed before
        # the temp directory registered earlier is deleted.
        self.addCleanup(con.close)
        return con

    def test_makes_new_database(self):
        """A new file gets exactly one table with the expected column."""
        filename = self._make_db_path()
        SQLiteSet(filename)
        cursor = self._connect(filename).cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        self.assertEqual(cursor.fetchall(), [('string',)])
        cursor.execute("PRAGMA table_info('string')")
        self.assertEqual(
            cursor.fetchall(), [(0, u'string', u'text', 0, None, 0)])

    def test_does_not_clobber_existing(self):
        """Opening an existing database keeps the rows already in it."""
        filename = self._make_db_path()
        con = self._connect(filename)
        cursor = con.cursor()
        cursor.execute('CREATE TABLE string (string text UNIQUE)')
        con.commit()
        # Bind the value instead of writing "mystring": double quotes denote
        # an identifier in SQL and are only accepted as a string by a
        # SQLite compatibility quirk.
        cursor.execute(
            'INSERT INTO string (string) VALUES (?)', ('mystring',))
        con.commit()
        SQLiteSet(filename)
        cursor.execute('SELECT string FROM string')
        self.assertEqual(cursor.fetchall(), [('mystring',)])
class FunctionalityTestCase(unittest.TestCase):
    """Exercise the set operations against an in-memory database."""

    def test_add(self):
        """Added values show up as set members."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('bar')
        self.assertEqual(theset, {'bar', 'foo'})

    def test_add_nonunique(self):
        """Adding the same value twice stores it only once."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('foo')
        self.assertIn('foo', theset)
        # assertEqual, not the assertEquals alias removed in Python 3.12.
        self.assertEqual(theset, {'foo'})
        self.assertEqual(list(theset), ['foo'])

    def test_contains(self):
        """Membership is true for added values and false for others."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('bar')
        self.assertIn('foo', theset)
        self.assertIn('bar', theset)
        self.assertNotIn('bogus', theset)

    def test_discard(self):
        """Discarding a member removes just that member."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('bar')
        theset.discard('foo')
        self.assertEqual(theset, {'bar'})

    def test_discard_nonexistant(self):
        """Discarding a value that is not a member changes nothing."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.discard('bogus')
        self.assertEqual(theset, {'foo'})
class PersistanceTestCase(unittest.TestCase):
    """Fixture providing a temporary database file for each test."""

    def setUp(self):
        """Create an empty temp file and remember its path."""
        # mkstemp returns (open file descriptor, path); keep only the path
        # and close the descriptor so it is not leaked.
        fd, self.filename = tempfile.mkstemp(prefix='sqliteset_test')
        os.close(fd)

    def tearDown(self):
        """Delete the temp file created in setUp."""
        os.remove(self.filename)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment