Skip to content

Instantly share code, notes, and snippets.

@SegFaultAX
Created March 13, 2015 20:39
Show Gist options
  • Save SegFaultAX/cc6caa30136f213b5e12 to your computer and use it in GitHub Desktop.
Save SegFaultAX/cc6caa30136f213b5e12 to your computer and use it in GitHub Desktop.
Elasticsearch Curator - Index Filtering Proposal #1
import re
import operator
import curator
import elasticsearch
class IndexFilter(object):
    """Composable, chainable filter over Elasticsearch index names.

    Every ``with_*`` / ``older_than`` / ``newer_than`` call leaves the
    receiver untouched and returns a *new* instance carrying one extra
    predicate.  ``filter`` then keeps only the names accepted by all of
    the accumulated predicates.
    """

    def __init__(self,
                 time_format="%Y.%m.%d",
                 pattern=r"(?P<date>{timestamp})$",
                 filters=None):
        """Store the configuration and compile the timestamp regex.

        :param time_format: strftime-style format handed to ``curator`` for
            both building the date regex and parsing the matched text.
        :param pattern: regex template; ``{timestamp}`` is substituted via
            ``str.format`` and the result must expose a ``date`` group.
        :param filters: list of one-argument predicates (name -> bool);
            ``None`` means "no predicates yet".
        """
        self.time_format = time_format
        self.pattern = pattern
        # presumably a regex fragment matching `time_format` timestamps --
        # confirm against curator.get_date_regex
        date_regex = curator.get_date_regex(time_format)
        self.time_re = re.compile(self.pattern.format(timestamp=date_regex))
        self.filters = [] if filters is None else filters

    def _add_filter(self, fn):
        """Return a copy of this filter with predicate *fn* appended."""
        clone_args = {
            "time_format": self.time_format,
            "pattern": self.pattern,
            # Concatenation builds a fresh list, so the receiver's own
            # predicate list is never mutated.
            "filters": self.filters + [fn],
        }
        return self.__class__(**clone_args)

    def filter(self, indices):
        """Return a lazy iterable of the names in *indices* passing every predicate.

        With no predicates registered, every name passes.
        """
        def accepted(name):
            return all(predicate(name) for predicate in self.filters)

        return (name for name in indices if accepted(name))

    def with_re(self, regex):
        """Add a predicate keeping names in which *regex* matches anywhere."""
        def matches(name):
            found = re.search(regex, name)
            return found is not None

        return self._add_filter(matches)

    def with_prefix(self, prefix):
        """Add a predicate keeping names that start with the literal *prefix*."""
        escaped = re.escape(prefix)
        return self.with_re("^{}".format(escaped))

    def with_suffix(self, suffix):
        """Add a predicate keeping names that end with the literal *suffix*."""
        escaped = re.escape(suffix)
        return self.with_re("{}$".format(escaped))

    def with_time_comparison(self, value, comparator, units, utc_now):
        """Add a predicate comparing each name's embedded date to a cutoff.

        The predicate evaluates ``comparator(index_date, cutoff)``.  Names
        in which the compiled timestamp regex does not match are rejected.

        :param value: amount forwarded to ``curator.get_cutoff``.
        :param comparator: two-argument callable, e.g. ``operator.lt``.
        :param units: time unit forwarded to ``curator.get_cutoff``.
        :param utc_now: reference time forwarded to ``curator.get_cutoff``.
        """
        def compare_index_date(name):
            found = self.time_re.search(name)
            if found is None:
                # No timestamp in the name: it can never satisfy the test.
                return False
            stamp = curator.get_datetime(found.group("date"),
                                         self.time_format)
            # NOTE(review): the cutoff is recomputed for every name; when
            # utc_now is None it presumably follows the current time on
            # each call -- confirm against curator.get_cutoff.
            return comparator(stamp,
                              curator.get_cutoff(value, units, utc_now))

        return self._add_filter(compare_index_date)

    def older_than(self, value, units="days", utc_now=None):
        """Add a predicate keeping names whose date is strictly before the cutoff."""
        return self.with_time_comparison(
            value, operator.lt, units, utc_now)

    def newer_than(self, value, units="days", utc_now=None):
        """Add a predicate keeping names whose date is strictly after the cutoff."""
        return self.with_time_comparison(
            value, operator.gt, units, utc_now)
if __name__ == "__main__":
    import unittest
    import datetime

    class IndexFilterTest(unittest.TestCase):
        """Self-test for IndexFilter, run when the module is executed directly."""

        def setUp(self):
            # Deliberately unsorted mix of dated and undated index names;
            # the time-based tests below rely on this exact ordering.
            self.indices = [
                u'logstash-2015.03.08',
                u'logstash-2015.03.09',
                u'logstash-2015.03.04',
                u'logstash-2015.03.05',
                u'logstash-2015.03.06',
                u'logstash-2015.03.07',
                u'logstash-2015.03.01',
                u'logstash-2015.03.02',
                u'logstash-2015.03.03',
                u'cluster',
                u'.marvel-2015.03.10',
                u'.marvel-2015.03.11',
                u'kibana-int',
                u'.marvel-2015.03.13',
                u'.marvel-2015.03.12',
                u'.marvel-2015.03.09',
                u'.marvel-2015.02.27',
                u'.marvel-2015.02.28',
                u'logstash-2015.03.13',
                u'logstash-2015.03.12',
                u'logstash-2015.03.11',
                u'logstash-2015.03.10',
                u'logstash-2015.02.28',
                u'.marvel-kibana',
                u'.marvel-2015.03.08',
                u'.marvel-2015.03.07',
                u'.marvel-2015.03.06',
                u'.marvel-2015.03.05',
                u'.marvel-2015.03.04',
                u'.marvel-2015.03.03',
                u'.marvel-2015.03.02',
                u'.marvel-2015.03.01',
            ]

        def test_base_filter(self):
            """A filter with no predicates passes every index through."""
            kept = list(IndexFilter().filter(self.indices))
            self.assertEqual(kept, self.indices)

        def test_prefix_filter(self):
            """with_prefix keeps only names starting with the prefix."""
            prefixed = IndexFilter().with_prefix("logstash-")
            expected = [e for e in self.indices if e.startswith("logstash")]
            self.assertEqual(list(prefixed.filter(self.indices)), expected)

        def test_suffix_filter(self):
            """with_suffix keeps only names ending with the suffix."""
            suffixed = IndexFilter().with_suffix("kibana")
            expected = [e for e in self.indices if e.endswith("kibana")]
            self.assertEqual(list(suffixed.filter(self.indices)), expected)

        def test_chaining(self):
            """Chained predicates are AND-ed together."""
            f = IndexFilter().with_prefix(".marvel").with_suffix("kibana")
            expected = [
                e for e in self.indices
                if e.startswith(".marvel") and e.endswith("kibana")
            ]
            self.assertEqual(list(f.filter(self.indices)), expected)

        def test_older_than(self):
            """older_than keeps logstash indices dated before the cutoff."""
            now = datetime.datetime(2015, 3, 13)
            f = IndexFilter().with_prefix("logstash-").older_than(
                10, "days", now)
            expected = [
                u'logstash-2015.03.01',
                u'logstash-2015.03.02',
                u'logstash-2015.03.03',
                u'logstash-2015.02.28',
            ]
            self.assertEqual(list(f.filter(self.indices)), expected)

        def test_newer_than(self):
            """newer_than keeps logstash indices dated after the cutoff."""
            now = datetime.datetime(2015, 3, 13)
            f = IndexFilter().with_prefix("logstash-").newer_than(
                3, "days", now)
            expected = [
                u'logstash-2015.03.13',
                u'logstash-2015.03.12',
            ]
            self.assertEqual(list(f.filter(self.indices)), expected)

    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment