Skip to content

Instantly share code, notes, and snippets.

@sharoonthomas
Created May 26, 2011 15:09
Show Gist options
  • Save sharoonthomas/993335 to your computer and use it in GitHub Desktop.
Save sharoonthomas/993335 to your computer and use it in GitHub Desktop.
Test for the Tryton dispatcher
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#This file is part of Tryton. The COPYRIGHT file at the top level of
#this repository contains the full copyright notices and license terms.
import unittest
import threading
from functools import partial
from trytond.tests.test_tryton import POOL, DB_NAME, USER, CONTEXT, \
install_module
from trytond.security import login
from trytond.protocols.dispatcher import dispatch
from trytond.transaction import Transaction
class DispatcherTestCase(unittest.TestCase):
    '''Test Dispatcher

    Tryton uses serialized transactions, which is the highest isolation
    level. This causes a transaction to fail if another running transaction
    modifies a record after the first one began and the first one then wants
    to modify the same record. This test case inspects such cases.
    '''

    # Number of ``get_id`` calls issued by the sequential test and by each
    # worker thread of the threaded test.
    CALLS_PER_RUN = 500
    # Number of threads hitting the same sequence concurrently.
    THREAD_COUNT = 2

    def setUp(self):
        """To test the dispatch where object_type = model a model is required
        and to test the same the test module is installed
        """
        install_module('test')
        # login() returns a pair; only the second item (the session) is used.
        _, self.session = login(DB_NAME, 'admin', 'admin')
        self.sequence_obj = POOL.get('ir.sequence')
        # By default the sequence's get method cannot be called on RPC, so add
        # it to the RPC safe list
        self.sequence_obj._rpc['get_id'] = True

    def get_partial_dispatcher(self):
        """A dispatcher with most of the default arguments filled up is
        returned so that the verbosity of the calls in each test can be reduced

        The returned callable only needs the remaining positional arguments
        of ``get_id`` (in these tests: the id of the sequence).
        """
        return partial(
            dispatch, 'localhost', 5432, 'SOMERPC', DB_NAME, USER,
            self.session, 'model', 'ir.sequence', 'get_id', context={})

    def _create_sequence(self):
        """Create and commit a fresh incremental sequence; return its id.

        The id is also stored on ``self.sequence_id``. The commit happens
        before the transaction is left so that the dispatcher calls made
        afterwards can see the new record.
        """
        with Transaction().start(DB_NAME, USER, CONTEXT) as transaction:
            self.sequence_id = self.sequence_obj.create({
                'name': 'Test incremental',
                'code': 'test',
                'prefix': '',
                'suffix': '',
                'type': 'incremental',
            })
            transaction.cursor.commit()
        return self.sequence_id

    def test0010sequential(self):
        """Sequentially execute model calls and ensure they work
        """
        dispatcher = self.get_partial_dispatcher()
        sequence_id = self._create_sequence()
        for expected_id in range(1, self.CALLS_PER_RUN + 1):
            self.assertEqual(dispatcher(sequence_id), str(expected_id))

    def test0020threaded(self):
        """Asynchronously and concurrently call the get_id method

        Every thread repeatedly asks the *same* sequence for its next value.
        The test fails if any call raises or if the values handed out are not
        exactly 1..(THREAD_COUNT * CALLS_PER_RUN), i.e. if a value was
        skipped or issued twice.
        """
        dispatcher = self.get_partial_dispatcher()
        sequence_id = self._create_sequence()

        issued = []
        failures = []

        def worker():
            for _ in range(self.CALLS_PER_RUN):
                try:
                    issued.append(dispatcher(sequence_id))
                except Exception as exc:
                    # An exception escaping a thread never reaches the test
                    # runner, so record it and report it from the main thread.
                    failures.append(exc)

        threads = [
            threading.Thread(target=worker)
            for _ in range(self.THREAD_COUNT)
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        self.assertEqual(failures, [])
        total_calls = self.THREAD_COUNT * self.CALLS_PER_RUN
        self.assertEqual(
            sorted(int(value) for value in issued),
            list(range(1, total_calls + 1)))
def suite():
    """Build the test suite holding every test of DispatcherTestCase."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(DispatcherTestCase)
if __name__ == '__main__':
    # Bind the built suite to a distinct name so the module-level ``suite``
    # factory is not replaced by its own return value.
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment