Last active
August 29, 2015 14:08
-
-
Save cdunklau/3750dac0096fe4c2963e to your computer and use it in GitHub Desktop.
SQLite3-backed mutable set
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| *.py[co] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sqlite3 | |
| import collections | |
try:
    from collections.abc import MutableSet
except ImportError:  # Python 2 keeps the ABCs directly in ``collections``
    from collections import MutableSet


class SQLiteSet(MutableSet):
    """A mutable set of strings stored in an SQLite database.

    Elements live in a single-column table named ``string`` whose column
    carries a UNIQUE constraint.  Every mutation is committed immediately.

    Set operators that build a new set (``&``, ``|``, ``-``, ``^``) return
    a plain in-memory ``set``; they never create another database.
    """

    def __init__(self, database_filename):
        """Open (or create) the database and make sure the table exists.

        :param database_filename: value handed to ``sqlite3.connect``,
            e.g. a file path or ``':memory:'``.
        """
        self._filename = database_filename
        self._con = sqlite3.connect(database_filename)
        self._con.execute(
            'CREATE TABLE IF NOT EXISTS string (string text UNIQUE)')

    @classmethod
    def _from_iterable(cls, iterable):
        """Build the result of a mixin set operation as a plain ``set``.

        The default implementation calls ``cls(iterable)``, which does not
        fit this constructor (it expects a database filename).
        """
        return set(iterable)

    def __contains__(self, string):
        """Return True if *string* is stored in the table."""
        cursor = self._con.execute(
            'SELECT 1 FROM string WHERE string = ? LIMIT 1', (string,))
        return cursor.fetchone() is not None

    def __iter__(self):
        """Return an iterator over a snapshot of the stored strings."""
        cursor = self._con.execute('SELECT string FROM string')
        # Materialize the rows so the returned iterator does not depend on
        # a live cursor while the caller adds or discards elements.
        return iter([row[0] for row in cursor])

    def __len__(self):
        """Return the number of rows in the table."""
        cursor = self._con.execute('SELECT count(*) FROM string')
        return cursor.fetchone()[0]

    def add(self, string):
        """Insert *string*; do nothing if it is already present."""
        with self._con:  # commits on success, rolls back on error
            # The UNIQUE constraint makes OR IGNORE skip duplicates, so no
            # separate membership query is needed.
            self._con.execute(
                'INSERT OR IGNORE INTO string (string) VALUES (?)',
                (string,))

    def discard(self, string):
        """Delete *string* if present; a missing value is not an error."""
        with self._con:  # commits on success, rolls back on error
            self._con.execute(
                'DELETE FROM string WHERE string = ?', (string,))

    def __repr__(self):
        return '<SQLiteSet {0} {1}>'.format(self._filename, list(self))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import shutil | |
| import unittest | |
| import tempfile | |
| import sqlite3 | |
| from sqliteset import SQLiteSet | |
class ManualFilesTestCase(unittest.TestCase):
    """Checks how SQLiteSet treats the database file on disk."""

    def _make_db_path(self):
        """Return a database path inside a temp dir removed after the test."""
        tempdir = tempfile.mkdtemp(prefix='sqliteset_test')
        self.addCleanup(shutil.rmtree, tempdir)
        return os.path.join(tempdir, 'test.db')

    def _connect(self, filename):
        """Open a connection that is closed again when the test finishes."""
        con = sqlite3.connect(filename)
        # Cleanups run last-in-first-out, so this close happens before the
        # temporary directory is removed.
        self.addCleanup(con.close)
        return con

    def test_makes_new_database(self):
        """Constructing a set on a fresh path creates the ``string`` table."""
        filename = self._make_db_path()
        # Constructing the set is the action under test.
        SQLiteSet(filename)

        cursor = self._connect(filename).cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        self.assertEqual(cursor.fetchall(), [('string',)])
        cursor.execute("PRAGMA table_info('string')")
        self.assertEqual(
            cursor.fetchall(), [(0, u'string', u'text', 0, None, 0)])

    def test_does_not_clobber_existing(self):
        """Opening an existing database keeps the rows already stored."""
        filename = self._make_db_path()
        con = self._connect(filename)
        cursor = con.cursor()
        cursor.execute('CREATE TABLE string (string text UNIQUE)')
        con.commit()
        # Bind the value as a parameter: a double-quoted "mystring" is an
        # identifier in standard SQL and only works as a string literal
        # through a legacy SQLite fallback.
        cursor.execute(
            'INSERT INTO string (string) VALUES (?)', ('mystring',))
        con.commit()

        # Constructing the set is the action under test.
        SQLiteSet(filename)

        cursor.execute('SELECT string FROM string')
        self.assertEqual(cursor.fetchall(), [('mystring',)])
class FunctionalityTestCase(unittest.TestCase):
    """Exercises the set operations against an in-memory database."""

    def test_add(self):
        """Added values compare equal to the matching built-in set."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('bar')
        self.assertEqual(theset, {'bar', 'foo'})

    def test_add_nonunique(self):
        """Adding the same value twice stores it only once."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('foo')
        self.assertIn('foo', theset)
        # assertEqual replaces the deprecated assertEquals alias, which
        # was removed from unittest in Python 3.12.
        self.assertEqual(theset, {'foo'})
        self.assertEqual(list(theset), ['foo'])

    def test_contains(self):
        """Membership is true for stored values and false otherwise."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('bar')
        self.assertIn('foo', theset)
        self.assertIn('bar', theset)
        self.assertNotIn('bogus', theset)

    def test_discard(self):
        """Discarding a stored value removes only that value."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.add('bar')
        theset.discard('foo')
        self.assertEqual(theset, {'bar'})

    def test_discard_nonexistant(self):
        """Discarding a missing value leaves the set unchanged."""
        theset = SQLiteSet(':memory:')
        theset.add('foo')
        theset.discard('bogus')
        self.assertEqual(theset, {'foo'})
class PersistanceTestCase(unittest.TestCase):
    """Fixture providing a temporary file path, removed after each test."""

    def setUp(self):
        """Create a temporary file and remember its path."""
        # mkstemp returns an (fd, path) pair; keep only the path and close
        # the descriptor right away so it does not leak.
        fd, self.filename = tempfile.mkstemp(prefix='sqliteset_test')
        os.close(fd)

    def tearDown(self):
        """Delete the temporary file created in setUp."""
        os.remove(self.filename)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment