# A bot that joins channels and sends the events it sees to Elasticsearch.
# In bulks ;)
'''
Create the index template used by the bot's indices:

curl -XPUT "http://localhost:9200/_template/abuh" -d '{
    "template": "*@conference*",
    "settings": {
        "index.refresh_interval": "5s",
        "index.number_of_shards": 1,
        "index.number_of_replicas": 0
    },
    "mappings": {
        "_default_": {
            "_all": {
                "enabled": true
            },
            "dynamic_templates": [{
                "strings": {
                    "match": "*",
                    "match_mapping_type": "string",
                    "mapping": {
                        "type": "string",
                        "index": "analyzed",
                        "omit_norms": true,
                        "fields": {
                            "raw": {
                                "type": "string",
                                "index": "not_analyzed",
                                "ignore_above": 256
                            }
                        }
                    }
                }
            }, {
                "dates": {
                    "match": ".* time|time|@timestamp",
                    "match_pattern": "regex",
                    "mapping": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ssZ||yyyy-MM-dd"
                    }
                }
            }, {
                "ips": {
                    "match": ".* ip|ip",
                    "match_pattern": "regex",
                    "mapping": {
                        "type": "ip"
                    }
                }
            }]
        }
    }
}
'
'''
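
# A minimal sketch of installing the same template from Python with urllib2
# instead of curl. Assumptions: Elasticsearch 1.x listening on
# localhost:9200 and the template JSON saved to a local file named
# abuh_template.json (a hypothetical file name):
#
#   import urllib2
#   request = urllib2.Request("http://localhost:9200/_template/abuh",
#                             data=open("abuh_template.json").read())
#   request.get_method = lambda: "PUT"  # urllib2 defaults to POST when data is set
#   print urllib2.urlopen(request).read()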

from __future__ import absolute_import

import time
import urllib2

import idiokit
from idiokit.xmpp.jid import JID
from abusehelper.core import bot, taskfarm, events
from es_index import es_bulkindex


def isoformat(seconds=None, format="%Y-%m-%d %H:%M:%SZ"):
    """
    Return the ISO 8601 formatted timestamp based on the time
    expressed in seconds since the epoch. Use time.time() if seconds
    is not given or None.

    >>> isoformat(0)
    '1970-01-01 00:00:00Z'

    See https://bitbucket.org/clarifiednetworks/abusehelper/wiki/Data%20Harmonization%20Ontology#!time
    """
    return time.strftime(format, time.gmtime(seconds))


class ElasticSearchArchiveBot(bot.ServiceBot):
    elastic_host = bot.Param("""
        host:port where Elasticsearch lives (default: %default)
        """, default="localhost:9200")
    queue_time = bot.IntParam("""
        wait no more than the given amount of seconds before sending
        the queued data to Elasticsearch (default: %default seconds)
        """, default=12)
    queue_size = bot.IntParam("""
        send the collected data to Elasticsearch once more than the
        given amount of events has queued up (default: %default events)
        """, default=1024)

    bot_state_file = None  # this bot keeps no state
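
    # Flush policy: a batch is written to Elasticsearch when more than
    # queue_size events have piled up, when the daily indice name rolls
    # over, or at the latest when the queue_time timer fires (see _alert()
    # and _collect() below), whichever happens first.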
    def __init__(self, *args, **keys):
        bot.ServiceBot.__init__(self, *args, **keys)
        self.rooms = taskfarm.TaskFarm(self.handle_room)
        self.opener = urllib2.build_opener()

    @idiokit.stream
    def handle_room(self, name):
        msg = "room {0!r}".format(name)
        attrs = events.Event(type="room", service=self.bot_name, room=name)
        with self.log.stateful(repr(self.xmpp.jid), "room", repr(name)) as log:
            log.open("Joining " + msg, attrs, status="joining")
            room = yield self.xmpp.muc.join(name, self.bot_name)
            log.open("Joined " + msg, attrs, status="joined")
            try:
                room_name = room.jid.bare()
                yield room | self.collect(room_name, room.jid)
            finally:
                log.close("Left " + msg, attrs, status="left")

    @idiokit.stream
    def session(self, state, src_room):
        yield self.rooms.inc(src_room)

    def collect(self, room_name, own_jid):
        # Wire the periodic flush ticks from _alert() into the collector.
        collect = self._collect(room_name, own_jid)
        idiokit.pipe(self._alert(self.queue_time), collect)
        return collect

    @idiokit.stream
    def _alert(self, flush_interval=10.0):
        # Emit an empty "tick" every flush_interval seconds so the queued
        # events get flushed even when the room is quiet.
        while True:
            yield idiokit.sleep(flush_interval)
            yield idiokit.send()

    @idiokit.stream
    def _collect(self, room_name, own_jid):
        timestamp = time.time()
        name = self.indice_name(timestamp, room_name)
        _queue = []
        try:
            while True:
                elements = yield idiokit.next()

                if elements is None:
                    # Got a flush tick from _alert().
                    if len(_queue) > 0:
                        yield self._flush(name, _queue)
                        _queue = []
                    continue

                timestamp = time.time()
                for message in elements.with_attrs("from"):
                    sender = JID(message.get_attr("from"))
                    if sender == own_jid or sender.resource is None:
                        continue
                    resource = sender.resource.encode("unicode-escape")

                    # Archive plain message bodies...
                    for body in message.children("body"):
                        _queue.append({
                            "sender": resource,
                            "@timestamp": isoformat(timestamp),
                            "body": body.text.encode("unicode-escape")
                        })

                    # ...and structured AbuseHelper events.
                    for event in events.Event.from_elements(message):
                        eve = dict()
                        for key, value in event.items():
                            eve[key] = value
                        # TODO: prefer the event's "observation time" or
                        # "source time" over the arrival time, see
                        # https://bitbucket.org/clarifiednetworks/abusehelper/wiki/Data%20Harmonization%20Ontology#!time
                        _queue.append({
                            "sender": resource,
                            "@timestamp": isoformat(timestamp),
                            "event": eve
                        })

                if len(_queue) > self.queue_size:
                    yield self._flush(name, _queue)
                    _queue = []

                # Roll over to a new daily indice when the date changes.
                timestamp = time.time()
                new_name = self.indice_name(timestamp, room_name)
                if new_name != name:
                    yield self._flush(name, _queue)
                    _queue = []
                    self.log.info("Closed indice {0!r}".format(name))
                    name = new_name
                    self.log.info("Next batches go to {0!r}".format(name))
        finally:
            yield self._flush(name, _queue)
            self.log.info("Stopped writing to {0!r}".format(name))

    @idiokit.stream
    def _flush(self, name, _queue):
        error, result = yield es_bulkindex(
            "http://" + self.elastic_host, name, _queue, opener=self.opener)
        if error is not None:
            self.log.error(error)
        if result is not None:
            logevent = events.Event({
                "indice": name,
                "event count": unicode(len(_queue))
            })
            self.log.info(
                "Wrote {0!r} events to {1!r}".format(len(_queue), name),
                event=logevent)

    def indice_name(self, timestamp, room_name):
        # TODO: get the timestamp granularity from a bot param (5m, h, d, w, m).
        tmp = isoformat(timestamp, format="%Y-%m-%d")
        return unicode(room_name).encode("utf-8") + "." + tmp
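    # For example, events seen on 2015-07-29 in a room named
    # "abuse@conference.example.com" (a hypothetical room name) end up in
    # the indice "abuse@conference.example.com.2015-07-29", which the
    # "*@conference*" template above matches.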


if __name__ == "__main__":
    ElasticSearchArchiveBot.from_command_line().execute()
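
# Assuming this file is saved as es_archivebot.py, the bot can be started
# stand-alone roughly like this. The option names are derived from the
# Param attributes above and may differ between AbuseHelper versions, so
# treat the exact flags as an assumption:
#
#   python es_archivebot.py --elastic-host localhost:9200 \
#       --queue-time 12 --queue-size 1024 ...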


# es_index.py (filename taken from the "from es_index import es_bulkindex"
# line above): a small helper that bulk-indexes documents into Elasticsearch.

import idiokit
from post_url import post_url

try:
    import simplejson as json
except ImportError:
    import json


@idiokit.stream
def es_bulkindex(host, indice, data, type=None, opener=None):
    url = host + "/_bulk"
    bulk = []
    if type is None:
        type = "undefined"

    # Every document shares the same action line in the bulk payload.
    tmp = json.dumps({"index": {"_index": indice, "_type": type}})
    for line in data:
        bulk.append(tmp)
        bulk.append(json.dumps(line))

    try:
        info, output = yield post_url(url, "\n".join(bulk) + "\n", opener)
    except Exception as e:
        idiokit.stop(e, None)

    # TODO: Elasticsearch reports per-item failures in the body while still
    # answering with an HTTP 200 header ;(
    tmp = json.loads(output.getvalue())
    # Example response:
    # {"took":34,"errors":false,"items":[{"create":{"_index":"test","_type":"None","_id":"AU73tmvfT1oQ9DwRzSBJ","_version":1,"status":201}}]}
    if tmp["errors"]:
        # Go over the items and stop with the first error found.
        for item in tmp["items"]:
            for action in item:
                if "error" in item[action]:
                    idiokit.stop(item[action]["error"], None)
    idiokit.stop(None, output.getvalue())
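
# For data = [{"field1": "value1"}] and indice = "test" (with the default
# type), the newline-delimited payload posted to /_bulk looks like this,
# one action line per source line, with a terminating newline:
#
#   {"index": {"_index": "test", "_type": "undefined"}}
#   {"field1": "value1"}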


if __name__ == "__main__":
    @idiokit.stream
    def test():
        eshost = "http://localhost:9200"
        indice = "test"
        data = [{"field1": "value1", "field2": "value2"}]
        yield index_test(eshost, indice, data) | print_result()

    @idiokit.stream
    def index_test(host, indice, data):
        error, output = yield es_bulkindex(host, indice, data, "none")
        yield idiokit.send(error, output)

    @idiokit.stream
    def print_result():
        while True:
            i, o = yield idiokit.next()
            print i
            print o

    idiokit.main_loop(test())


# post_url.py (filename taken from the "from post_url import post_url" line
# above): an HTTP POST helper that runs the blocking I/O in idiokit threads.

from __future__ import absolute_import

import socket
import urllib2
import httplib
import email.parser
from cStringIO import StringIO

import idiokit
from abusehelper.core.utils import FetchUrlFailed, FetchUrlTimeout, HTTPError, _is_timeout


@idiokit.stream
def post_url(url, data, opener=None, timeout=60.0, chunk_size=16384):
    if opener is None:
        opener = urllib2.build_opener()

    try:
        output = StringIO()

        # Run the blocking urllib2 calls in worker threads so the idiokit
        # main loop stays responsive.
        fileobj = yield idiokit.thread(opener.open, url, data=data, timeout=timeout)
        try:
            while True:
                chunk = yield idiokit.thread(fileobj.read, chunk_size)
                if not chunk:
                    break
                output.write(chunk)
        finally:
            fileobj.close()

        info = fileobj.info()
        info = email.parser.Parser().parsestr(str(info), headersonly=True)

        output.seek(0)
        idiokit.stop(info, output)
    except urllib2.HTTPError as he:
        raise HTTPError(he.code, he.msg, he.hdrs, he.fp)
    except urllib2.URLError as error:
        if _is_timeout(error.reason):
            raise FetchUrlTimeout("fetching URL timed out")
        raise FetchUrlFailed(str(error))
    except socket.error as error:
        if _is_timeout(error):
            raise FetchUrlTimeout("fetching URL timed out")
        raise FetchUrlFailed(str(error))
    except httplib.HTTPException as error:
        raise FetchUrlFailed(str(error))


if __name__ == "__main__":
    @idiokit.stream
    def test(url, data):
        yield post_test(url, data) | print_result()

    @idiokit.stream
    def post_test(url, data):
        try:
            info, output = yield post_url(url, data)
        except FetchUrlFailed as e:
            print e
            idiokit.stop(False)
        yield idiokit.send(info, output)

    @idiokit.stream
    def print_result():
        while True:
            i, o = yield idiokit.next()
            print i
            print o.getvalue()

    url = "http://localhost:9200/_bulk"
    data = "{\"index\" : { \"_index\" : \"test\", \"_type\" : \"type1\", \"_id\" : \"1\" } }\n{ \"field1\" : \"value1\" }\n"
    idiokit.main_loop(test(url, data))