Created
April 29, 2011 17:58
-
-
Save mattbillenstein/948717 to your computer and use it in GitHub Desktop.
A simple cron job parser...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import datetime
import sys
class cronjob(object):
    """One crontab entry: five schedule fields plus the command to run.

    Each schedule field is expanded into the set of integers it selects, so
    ``would_run`` reduces to a handful of set-membership tests.

    Attributes (all assigned by ``process_line``):
        minutes, hours, days_of_month, months, days_of_week: sets of ints.
            ``days_of_week`` uses cron numbering, 0 = Sunday .. 6 = Saturday.
        command: everything on the line after the fifth field.
    """

    def __init__(self, line):
        """Parse ``line``, a single crontab entry (not a comment or blank)."""
        self.process_line(line)

    def process_line(self, line):
        """Split a crontab line into its six fields and expand the schedule.

        Raises:
            ValueError: if the line has fewer than six whitespace-separated
                fields, or a schedule field contains a non-numeric value.
        """
        # maxsplit=5 keeps whitespace inside the command itself intact.
        m, h, dom, mon, dow, self.command = line.split(None, 5)
        self.minutes = self.process_cronpart(m, 0, 59)
        self.hours = self.process_cronpart(h, 0, 23)
        self.days_of_month = self.process_cronpart(dom, 1, 31)
        self.months = self.process_cronpart(mon, 1, 12)
        # cron accepts both 0 and 7 for Sunday; expand over 0-7 and fold the
        # 7 back onto 0 so the stored set only ever uses 0..6.
        self.days_of_week = set(
            0 if d == 7 else d for d in self.process_cronpart(dow, 0, 7)
        )

    def process_cronpart(self, spec, min, max):
        """Expand one schedule field into the set of ints it selects.

        Supports comma-separated lists, ``a-b`` ranges, the ``*`` wildcard
        and ``/n`` selectors.  The method recurses on the text left of the
        last ``/`` so that selectors can be stacked.

        NOTE: ``/n`` is a *modulo* filter (keeps values divisible by ``n``),
        not cron's "every n-th value starting at the range start".  The two
        agree for ``*/n`` on zero-based fields (minutes, hours) but differ
        for one-based fields and for ranges, e.g. ``1-31/2`` selects the even
        days here.

        Args:
            spec: the field text, e.g. ``"*/15"`` or ``"1,5-7"``.
            min: smallest value ``*`` expands to (name kept for
                compatibility even though it shadows the builtin).
            max: largest value ``*`` expands to (inclusive).

        Returns:
            A set of ints.  Explicit numbers and ranges are not checked
            against ``min``/``max``.

        Raises:
            ValueError: if a number in ``spec`` cannot be parsed.
            ZeroDivisionError: for a step of 0 applied to a non-empty
                selection.
        """
        s = set()
        for part in spec.split(','):
            if '/' in part:
                # intentionally support things like */3/2 which selects */6
                p, n = part.rsplit('/', 1)
                n = int(n)
                s.update(
                    x for x in self.process_cronpart(p, min, max) if x % n == 0
                )
            elif '-' in part:
                a, b = part.split('-')
                s.update(range(int(a), int(b) + 1))
            elif part == '*':
                s.update(range(min, max + 1))
            else:
                s.add(int(part))
        return s

    def would_run(self, dt):
        """Return True if this job is scheduled for the minute of ``dt``.

        Args:
            dt: a ``datetime.datetime``; seconds and smaller are ignored.

        NOTE(review): all five fields are AND-ed together.  crontab(5)
        describes OR semantics when both day-of-month and day-of-week are
        restricted; that rule is not implemented here.
        """
        # datetime.weekday() numbers Monday as 0, but cron numbers Sunday as
        # 0.  isoweekday() is Monday=1 .. Sunday=7, so "% 7" yields cron's
        # Sunday=0 .. Saturday=6.
        cron_dow = dt.isoweekday() % 7
        return (
            dt.minute in self.minutes
            and dt.hour in self.hours
            and dt.day in self.days_of_month
            and dt.month in self.months
            and cron_dow in self.days_of_week
        )

    @staticmethod
    def process_file(f):
        """Parse every job line of a crontab into ``cronjob`` objects.

        Args:
            f: either a path to a crontab file, or any iterable of lines
               (an open file, a list of strings, ...).

        Returns:
            A list of ``cronjob`` instances, in file order.  Blank lines and
            lines starting with ``#`` are skipped.
        """
        # (str, type(u'')) is (str, unicode) on Python 2 and (str, str) on
        # Python 3, so this replaces the Python-2-only ``basestring``.
        if isinstance(f, (str, type(u''))):
            # We opened the file, so we are responsible for closing it.
            with open(f) as handle:
                return cronjob.process_file(handle)

        jobs = []
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            jobs.append(cronjob(line))
        return jobs
def main(argv=None):
    """Print every job run scheduled within the next seven days.

    Reads the crontab named by ``argv[1]``, then walks forward from the
    current time one minute at a time and prints a ``Run:`` line for each
    job whose schedule matches that minute.

    Args:
        argv: argument list in ``sys.argv`` layout; defaults to ``sys.argv``.

    Returns:
        Process exit status: 0 on success, 2 if no crontab file was given.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        prog = argv[0] if argv else "cronjob"
        sys.stderr.write("usage: %s CRONTAB_FILE\n" % prog)
        return 2

    jobs = cronjob.process_file(argv[1])

    dt = datetime.datetime.now()
    end = dt + datetime.timedelta(days=7)
    td = datetime.timedelta(minutes=1)
    while dt < end:
        for job in jobs:
            if job.would_run(dt):
                # Single pre-formatted argument so the output is identical
                # under both the Python 2 print statement and Python 3 print().
                print("Run: %s %s" % (dt, job.command))
        dt += td
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment