Created
March 13, 2015 20:39
-
-
Save SegFaultAX/cc6caa30136f213b5e12 to your computer and use it in GitHub Desktop.
Elasticsearch Curator - Index Filtering Proposal #1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import operator | |
import curator | |
import elasticsearch | |
class IndexFilter(object):
    """Chainable builder of index-name predicates.

    Each ``with_*`` / ``older_than`` / ``newer_than`` call returns a NEW
    ``IndexFilter`` holding one additional predicate; the instance the
    method was called on is not modified.  ``filter`` then yields the
    names that satisfy every accumulated predicate.
    """

    def __init__(self,
                 time_format="%Y.%m.%d",
                 pattern=r"(?P<date>{timestamp})$",
                 filters=None):
        """Create a filter.

        Args:
            time_format: format string handed to ``curator.get_date_regex``
                and ``curator.get_datetime`` (presumably a ``strftime``
                format -- confirm against the curator docs).
            pattern: regex template; ``{timestamp}`` is replaced with the
                regex returned by ``curator.get_date_regex``.  It must
                define a named group ``date``, which the time-comparison
                predicate reads.
            filters: optional iterable of one-argument callables
                (``index_name -> bool``).  ``None`` means no predicates.
        """
        self.time_format = time_format
        self.pattern = pattern
        self.time_re = re.compile(self.pattern.format(
            timestamp=curator.get_date_regex(time_format)))
        # Copy the iterable: this instance must not alias the caller's list
        # (a later mutation there would silently change this filter), and a
        # tuple/generator must still support ``self.filters + [fn]`` below.
        self.filters = list(filters) if filters is not None else []

    def _add_filter(self, fn):
        """Return a new filter of the same class with ``fn`` appended."""
        return self.__class__(
            time_format=self.time_format,
            pattern=self.pattern,
            filters=self.filters + [fn])

    def filter(self, indices):
        """Lazily yield the items of ``indices`` accepted by every predicate.

        With no predicates registered every item is yielded unchanged.
        Returns a generator, so ``indices`` is consumed on iteration.
        """
        return (idx for idx in indices if all(f(idx) for f in self.filters))

    def with_re(self, regex):
        """Keep only names in which ``regex`` matches somewhere (``search``).

        The pattern is compiled here, once, so an invalid pattern raises
        ``re.error`` when the filter is built rather than later, when the
        filter is first applied.
        """
        compiled = re.compile(regex)

        def re_predicate(index):
            return compiled.search(index) is not None

        return self._add_filter(re_predicate)

    def with_prefix(self, prefix):
        """Keep only names that start with the literal string ``prefix``."""
        return self.with_re("^{}".format(re.escape(prefix)))

    def with_suffix(self, suffix):
        """Keep only names that end with the literal string ``suffix``."""
        return self.with_re("{}$".format(re.escape(suffix)))

    def with_time_comparison(self, value, comparator, units, utc_now):
        """Keep names whose embedded date compares true against a cutoff.

        Args:
            value: forwarded as the first argument of ``curator.get_cutoff``.
            comparator: two-argument callable invoked as
                ``comparator(index_datetime, cutoff)``; its result decides
                whether the name is kept.
            units: forwarded to ``curator.get_cutoff``.
            utc_now: forwarded to ``curator.get_cutoff``.

        Names in which ``self.time_re`` finds no match are rejected.
        """
        def time_compare_predicate(index):
            m = self.time_re.search(index)
            if not m:
                return False
            dt = curator.get_datetime(m.group("date"), self.time_format)
            # The cutoff is resolved each time the predicate runs, not when
            # the filter is built.
            # NOTE(review): with utc_now=None, curator.get_cutoff presumably
            # falls back to the current time -- confirm against curator.
            cutoff = curator.get_cutoff(value, units, utc_now)
            return comparator(dt, cutoff)

        return self._add_filter(time_compare_predicate)

    def older_than(self, value, units="days", utc_now=None):
        """Keep names whose date is strictly less than the cutoff."""
        return self.with_time_comparison(value, operator.lt, units, utc_now)

    def newer_than(self, value, units="days", utc_now=None):
        """Keep names whose date is strictly greater than the cutoff."""
        return self.with_time_comparison(value, operator.gt, units, utc_now)
if __name__ == "__main__":
    import datetime
    import unittest

    class IndexFilterTest(unittest.TestCase):
        """Self-test for IndexFilter, run when the file is executed directly."""

        def setUp(self):
            # Fixed, deliberately unsorted list of index names; the tests
            # compare results in this exact order.
            self.indices = [
                u'logstash-2015.03.08',
                u'logstash-2015.03.09',
                u'logstash-2015.03.04',
                u'logstash-2015.03.05',
                u'logstash-2015.03.06',
                u'logstash-2015.03.07',
                u'logstash-2015.03.01',
                u'logstash-2015.03.02',
                u'logstash-2015.03.03',
                u'cluster',
                u'.marvel-2015.03.10',
                u'.marvel-2015.03.11',
                u'kibana-int',
                u'.marvel-2015.03.13',
                u'.marvel-2015.03.12',
                u'.marvel-2015.03.09',
                u'.marvel-2015.02.27',
                u'.marvel-2015.02.28',
                u'logstash-2015.03.13',
                u'logstash-2015.03.12',
                u'logstash-2015.03.11',
                u'logstash-2015.03.10',
                u'logstash-2015.02.28',
                u'.marvel-kibana',
                u'.marvel-2015.03.08',
                u'.marvel-2015.03.07',
                u'.marvel-2015.03.06',
                u'.marvel-2015.03.05',
                u'.marvel-2015.03.04',
                u'.marvel-2015.03.03',
                u'.marvel-2015.03.02',
                u'.marvel-2015.03.01',
            ]

        def _selected(self, index_filter):
            """Apply ``index_filter`` to the fixture and materialize the result."""
            return list(index_filter.filter(self.indices))

        def test_base_filter(self):
            # A filter with no predicates lets every name through.
            self.assertEqual(self._selected(IndexFilter()), self.indices)

        def test_prefix_filter(self):
            expected = [name for name in self.indices
                        if name.startswith("logstash")]
            actual = self._selected(IndexFilter().with_prefix("logstash-"))
            self.assertEqual(actual, expected)

        def test_suffix_filter(self):
            expected = [name for name in self.indices
                        if name.endswith("kibana")]
            actual = self._selected(IndexFilter().with_suffix("kibana"))
            self.assertEqual(actual, expected)

        def test_chaining(self):
            chained = IndexFilter().with_prefix(".marvel").with_suffix("kibana")
            expected = [name for name in self.indices
                        if name.startswith(".marvel")
                        and name.endswith("kibana")]
            self.assertEqual(self._selected(chained), expected)

        def test_older_than(self):
            reference_time = datetime.datetime(2015, 3, 13)
            prefixed = IndexFilter().with_prefix("logstash-")
            aged = prefixed.older_than(10, "days", reference_time)
            expected = [
                u'logstash-2015.03.01',
                u'logstash-2015.03.02',
                u'logstash-2015.03.03',
                u'logstash-2015.02.28',
            ]
            self.assertEqual(self._selected(aged), expected)

        def test_newer_than(self):
            reference_time = datetime.datetime(2015, 3, 13)
            prefixed = IndexFilter().with_prefix("logstash-")
            recent = prefixed.newer_than(3, "days", reference_time)
            expected = [
                u'logstash-2015.03.13',
                u'logstash-2015.03.12',
            ]
            self.assertEqual(self._selected(recent), expected)

    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment