Created January 20, 2014 07:47
-
-
Save agronholm/8516459 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@pytest.fixture()
def memjobstore():
    """Provide a newly constructed ``MemoryJobStore`` to the requesting test."""
    store = MemoryJobStore()
    return store
@pytest.fixture()
def shelvejobstore(request):
    """Provide a ``ShelveJobStore`` that lives at a throwaway temporary path.

    A finalizer registered on ``request`` closes the store and removes the
    file at ``jobstore.path`` if it exists once the test is done.

    :param request: pytest's fixture request, used to register the finalizer
    :return: the constructed ``ShelveJobStore``
    """
    import warnings

    # Limit the RuntimeWarning suppression to this block.  The earlier
    # filterwarnings()/resetwarnings() pair did not restore the previous
    # state: resetwarnings() discards *all* filters, including the ones
    # installed by the test runner or via -W.
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', category=RuntimeWarning)
        # Only the generated name is needed; the file object is closed right
        # away and the path is handed to the job store below.
        # NOTE(review): assumes this is tempfile.NamedTemporaryFile, whose
        # default delete=True removes the file on close -- confirm.
        f = NamedTemporaryFile(prefix='apscheduler_')
        f.close()

    jobstore = ShelveJobStore(f.name)

    def finish():
        # Close first so the backing file is released before it is deleted.
        jobstore.close()
        if os.path.exists(jobstore.path):
            os.remove(jobstore.path)

    request.addfinalizer(finish)
    return jobstore
@pytest.fixture()
def sqlalchemyjobstore(request):
    """Provide a ``SQLAlchemyJobStore`` bound to the ``sqlite:///`` URL.

    The test is skipped when ``SQLAlchemyJobStore`` is falsy (presumably
    because the optional import failed -- verify against the module's import
    section).  The store's ``close`` method is registered as a finalizer.
    """
    if not SQLAlchemyJobStore:
        pytest.skip('SQLAlchemyJobStore missing')

    store = SQLAlchemyJobStore(url='sqlite:///')
    request.addfinalizer(store.close)
    return store
@pytest.fixture()
def mongodbjobstore(request):
    """Provide a ``MongoDBJobStore`` using the ``apscheduler_unittest`` database.

    The test is skipped when ``MongoDBJobStore`` is falsy.  On teardown the
    store's database is dropped through its connection and the store is
    closed.
    """
    if not MongoDBJobStore:
        pytest.skip('MongoDBJobStore missing')

    store = MongoDBJobStore(database='apscheduler_unittest')

    def cleanup():
        # Drop the whole test database before closing the store.
        database = store.collection.database
        database.connection.drop_database(database.name)
        store.close()

    request.addfinalizer(cleanup)
    return store
@pytest.fixture()
def redisjobstore(request):
    """Provide a ``RedisJobStore`` constructed with default arguments.

    The test is skipped when ``RedisJobStore`` is falsy.  On teardown the
    store's Redis database is flushed and the store is closed.
    """
    if not RedisJobStore:
        pytest.skip('RedisJobStore missing')

    store = RedisJobStore()

    def cleanup():
        # Flush before closing so no test data is left behind.
        store.redis.flushdb()
        store.close()

    request.addfinalizer(cleanup)
    return store
# NOTE(review): sqlalchemyjobstore is defined in this module but was not part
# of the original parameter list -- confirm whether that omission is intended.
@pytest.mark.parametrize('jobstore', [
    'memjobstore',
    'shelvejobstore',
    'mongodbjobstore',
    'redisjobstore',
])
def test_jobstore_add_update_remove(jobstore, request):
    """Check the add / update / reload / remove cycle on each job store.

    :param jobstore: name of the job store fixture to run against
    :param request: pytest's fixture request, used to resolve that name
    """
    # parametrize() passes its values through verbatim, so listing the
    # fixture functions themselves would hand the test a function object
    # instead of a job store.  Parametrize by fixture name and resolve it
    # here so the fixture's setup and finalizer actually run.
    jobstore = request.getfixturevalue(jobstore)

    trigger_date = datetime(2999, 1, 1)
    trigger = DateTrigger({}, trigger_date, local_tz)
    job = Job(trigger, dummy_job, [], {}, 1, False, None, None, 1)
    job.next_run_time = trigger_date

    # A fresh store starts out empty.
    assert jobstore.jobs == []

    jobstore.add_job(job)
    assert jobstore.jobs == [job]
    assert jobstore.jobs[0] == job
    assert jobstore.jobs[0].runs == 0

    # The updated run count must still be there after reloading.
    job.runs += 1
    jobstore.update_job(job)
    jobstore.load_jobs()
    assert len(jobstore.jobs) == 1
    assert jobstore.jobs == [job]
    assert jobstore.jobs[0].runs == 1

    # Removal must hold both immediately and after a reload.
    jobstore.remove_job(job)
    assert jobstore.jobs == []
    jobstore.load_jobs()
    assert jobstore.jobs == []
@pytest.mark.parametrize('jobstore', [
    'shelvejobstore',
    'mongodbjobstore',
    'redisjobstore',
])
def test_one_job_fails_to_load(jobstore, request):
    """Check that the other jobs still load when one job cannot be restored.

    Three jobs are stored, then the module-level ``dummy_job2`` is removed
    before reloading; the store is expected to come back with two jobs.

    :param jobstore: name of the job store fixture to run against
    :param request: pytest's fixture request, used to resolve that name
    """
    global dummy_job2, dummy_job_temp

    # parametrize() passes its values through verbatim, so listing the
    # fixture functions themselves would hand the test a function object
    # instead of a job store.  Parametrize by fixture name and resolve it
    # here so the fixture's setup and finalizer actually run.
    jobstore = request.getfixturevalue(jobstore)

    trigger_date = datetime(2999, 1, 1)
    trigger = DateTrigger({}, trigger_date, local_tz)
    job1 = Job(trigger, dummy_job, [], {}, 1, False, None, None, 1)
    job2 = Job(trigger, dummy_job2, [], {}, 1, False, None, None, 1)
    job3 = Job(trigger, dummy_job3, [], {}, 1, False, None, None, 1)
    for job in job1, job2, job3:
        job.next_run_time = trigger_date
        jobstore.add_job(job)

    # Temporarily remove dummy_job2 from the module namespace.
    # NOTE(review): presumably stored jobs are restored by looking up their
    # callable by name, which is what makes job2 fail to load -- confirm.
    dummy_job_temp = dummy_job2
    del dummy_job2
    try:
        jobstore.load_jobs()
        assert len(jobstore.jobs) == 2
    finally:
        # Always put the function back so later tests see an intact module.
        dummy_job2 = dummy_job_temp
        del dummy_job_temp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.