-
-
Save peterbe/1127141 to your computer and use it in GitHub Desktop.
newtcbs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import sys | |
import logging | |
import psycopg2 | |
# Module-level logger used by update(); records are sent to stderr through
# an explicitly attached stream handler.  No level is set here.
logger = logging.getLogger("duplicates")
_stderr_handler = logging.StreamHandler(sys.stderr)
logger.addHandler(_stderr_handler)
def update(config, targetDate):
    """Run the update stored procedures in order; return how many failed.

    ``config`` is a mapping that must provide the keys ``databaseHost``,
    ``databaseName``, ``databaseUserName`` and ``databasePassword``.
    ``targetDate`` is passed unchanged as the single argument of every
    procedure except ``update_product_versions``.

    A procedure is skipped (with a warning) when any procedure it depends
    on has already failed.  Skipped procedures are not counted as failures.
    The work of each successful procedure is committed immediately.

    Returns the number of procedures whose call yielded a falsy
    ``fetchone()`` result.
    """
    functions = (
        # function name, parameters, dependencies
        ('update_product_versions', [], []),
        ('update_signatures', [targetDate], []),
        ('update_os_versions', [targetDate], []),
        ('update_tcbs', [targetDate],
         ['update_product_versions', 'update_signatures']),
        ('update_adu', [targetDate], []),
    )
    failed = set()

    # Hand the settings to the driver as keyword arguments instead of
    # interpolating them into a DSN string: a password (or any other value)
    # containing spaces, quotes or backslashes, or an empty value, would
    # otherwise produce a malformed connection string.
    connection = psycopg2.connect(
        host=config['databaseHost'],
        dbname=config['databaseName'],
        user=config['databaseUserName'],
        password=config['databasePassword']
    )
    # NOTE(review): neither the cursor nor the connection is closed, and a
    # failed procedure is not rolled back before the next one runs --
    # confirm that this is intended.
    cursor = connection.cursor()

    for funcname, parameters, deps in functions:
        failed_deps = failed.intersection(deps)
        if failed_deps:
            # one of the deps previously failed, so skip this one
            # (logger.warn is a deprecated alias of logger.warning; it is
            # kept because callers may be observing calls to ``warn``.)
            logger.warn("For %r, dependency %s failed so skipping",
                        funcname, failed_deps)
            continue

        logger.info('Running %s', funcname)
        cursor.callproc(funcname, parameters)
        # NOTE(review): a DB-API fetchone() normally returns a row tuple,
        # and a one-column row such as (False,) is truthy -- verify what
        # the stored procedures actually return.
        if cursor.fetchone():
            connection.commit()
        else:
            failed.add(funcname)
            logger.warn('%r failed', funcname)

    return len(failed)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
import newtcbs | |
from mock import patch | |
class mock_connection:
    """Stand-in for a database connection that hands out one fixed cursor."""

    def __init__(self, c):
        # ``c`` is the cursor object that cursor() will return.
        self.c = c

    def commit(self):
        """Accept a commit request and do nothing."""
        return None

    def cursor(self):
        """Return the cursor supplied at construction time."""
        return self.c
class mock_psycopg2:
    """Stand-in for the psycopg2 module that exposes only connect()."""

    def __init__(self, cursor):
        # Cursor object that every fake connection will hand out.
        self.cursor = cursor

    def connect(self, *args, **kwargs):
        """Ignore all connection arguments and return a fake connection."""
        fake_connection = mock_connection(self.cursor)
        return fake_connection
class mock_cursor:
    """Fake cursor that records procedure calls and replays canned results.

    ``returns`` maps a procedure name to the value fetchone() should give
    back after that procedure was called; unlisted names yield True.
    """

    def __init__(self, returns):
        self.called = []
        self.returns = returns

    def callproc(self, name, params):
        """Remember ``name`` as the latest call; ``params`` is ignored."""
        self.called.append(name)
        self.name = name

    def fetchone(self):
        """Return the canned result for the most recently called procedure."""
        latest = self.name
        return self.returns[latest] if latest in self.returns else True
class TestCase(unittest.TestCase):
    """Exercise newtcbs.update() against faked database drivers."""

    def setUp(self):
        # The four settings update() reads from its config; the values do
        # not matter because the database driver is always replaced.
        self.config = {
            'databaseHost': '',
            'databasePassword': '',
            'databaseName': '',
            'databaseUserName': '',
        }

    def test_failing__update_product_versions(self):
        """A failing update_product_versions makes update_tcbs get skipped."""
        cursor = mock_cursor({
            'update_product_versions': False,
        })
        # Use patch() rather than assigning newtcbs.psycopg2 directly so
        # the real module attribute is restored when the block exits and
        # the fake driver cannot leak into other tests.
        with patch('newtcbs.logger') as mock_logger:
            with patch('newtcbs.psycopg2', mock_psycopg2(cursor)):
                failures = newtcbs.update(self.config, None)

        # Only the procedure that actually failed is counted; the skipped
        # dependent one is not.
        self.assertEqual(failures, 1)
        self.assertEqual(cursor.called, [
            'update_product_versions', 'update_signatures',
            'update_os_versions', 'update_adu'
        ])
        self.assertEqual(mock_logger.info.call_count, 4)
        # One warning for the failure, one for the skipped update_tcbs.
        self.assertEqual(mock_logger.warn.call_count, 2)
        self.assertEqual(mock_logger.error.call_count, 0)

    def test_all_works(self):
        """With every procedure succeeding, all five run and are committed."""
        with patch('newtcbs.logger') as mock_logger:
            # Named fake_driver so it does not shadow the mock_psycopg2
            # class defined in this module.
            with patch('newtcbs.psycopg2') as fake_driver:
                failures = newtcbs.update(self.config, None)

        connection = fake_driver.connect.return_value
        cursor = connection.cursor.return_value
        self.assertEqual(failures, 0)
        self.assertEqual(cursor.callproc.call_count, 5)
        self.assertEqual(connection.commit.call_count, 5)
        self.assertEqual(mock_logger.info.call_count, 5)
        self.assertEqual(mock_logger.warn.call_count, 0)
        self.assertEqual(mock_logger.error.call_count, 0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment