Last active
August 29, 2015 14:26
-
-
Save hillar/f684b54ff051b0da1ff0 to your computer and use it in GitHub Desktop.
abusehelper post url helper function, see https://bitbucket.org/clarifiednetworks/abusehelper/src/8a0cf2672c5a792b82a725f3dd9798883ff424f3/abusehelper/core/utils.py?at=default#utils.py-60
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A bot that joins XMPP rooms and sends the events it sees to elasticsearch,
# in bulk batches ;)
''' | |
curl -XPUT "http://localhost:9200/_template/abuh" -d ' { | |
"template": "*@conference*", | |
"settings": { | |
"index.refresh_interval": "5s", | |
"index.number_of_shards": 1, | |
"index.number_of_replicas": 0 | |
}, | |
"mappings": { | |
"_default_": { | |
"_all": { | |
"enabled": true | |
}, | |
"dynamic_templates": [{ | |
"strings": { | |
"match": "*", | |
"match_mapping_type": "string", | |
"mapping": { | |
"type": "string", | |
"index": "analyzed", | |
"omit_norms": true, | |
"fields": { | |
"raw": { | |
"type": "string", | |
"index": "not_analyzed", | |
"ignore_above": 256 | |
} | |
} | |
} | |
} | |
}, | |
{ | |
"dates": { | |
"match": ".* time|time|@timestamp", | |
"match_pattern": "regex", | |
"mapping": { | |
"type": "date", | |
"format" : "yyyy-MM-dd HH:mm:ssZ||yyyy-MM-dd" | |
} | |
} | |
}, | |
{ | |
"ips": { | |
"match": ".* ip|ip", | |
"match_pattern": "regex", | |
"mapping": { | |
"type": "ip" | |
} | |
} | |
} | |
] | |
} | |
} | |
} | |
' | |
''' | |
from __future__ import absolute_import | |
import os | |
import time | |
import errno | |
import calendar | |
import idiokit | |
from idiokit.xmpp.jid import JID | |
from abusehelper.core import bot, taskfarm, events | |
from es_index import es_bulkindex | |
import urllib2 | |
def isoformat(seconds=None, format="%Y-%m-%d %H:%M:%SZ"):
    """
    Format a time given in seconds since the epoch as a UTC timestamp string.

    seconds: seconds since the epoch; when omitted or None the current
        time (time.time()) is used.
    format: time.strftime() pattern applied to the UTC struct_time.

    >>> isoformat(0)
    '1970-01-01 00:00:00Z'

    see https://bitbucket.org/clarifiednetworks/abusehelper/wiki/Data%20Harmonization%20Ontology#!time
    """
    utc_struct = time.gmtime(seconds)
    formatted = time.strftime(format, utc_struct)
    return formatted
class ElasticSearchArchiveBot(bot.ServiceBot):
    """
    Join XMPP rooms and archive what is said in them to elasticsearch.

    For every room, chat message bodies and abusehelper events are queued
    and written with the elasticsearch bulk API whenever the queue reaches
    queue_size documents or every queue_time seconds. Each room writes to
    one indice per UTC day, named "<room jid>.<YYYY-MM-DD>".
    """

    elastic_host = bot.Param("""
        host:port, where elasticsearch lives (default: %default )
        """, default="localhost:9200")
    queue_time = bot.IntParam("""
        wait no more than the given amount of seconds until sending
        the data to elasticsearch (default: %default seconds)
        """, default=12)
    queue_size = bot.IntParam("""
        collect at least the given amount of events before sending
        the data to elasticsearch (default: %default events)
        """, default=1024)

    bot_state_file = None  # this bot has no state

    def __init__(self, *args, **keys):
        bot.ServiceBot.__init__(self, *args, **keys)
        # handle_room tasks, started per room through self.rooms.inc() in session().
        self.rooms = taskfarm.TaskFarm(self.handle_room)
        # One opener shared by every bulk request this bot makes.
        self.opener = urllib2.build_opener()

    @idiokit.stream
    def handle_room(self, name):
        """Join room `name` and archive its traffic until the room is left."""
        msg = "room {0!r}".format(name)
        attrs = events.Event(type="room", service=self.bot_name, room=name)

        with self.log.stateful(repr(self.xmpp.jid), "room", repr(name)) as log:
            log.open("Joining " + msg, attrs, status="joining")
            room = yield self.xmpp.muc.join(name, self.bot_name)

            log.open("Joined " + msg, attrs, status="joined")
            try:
                room_name = room.jid.bare()
                yield room | self.collect(room_name, room.jid)
            finally:
                log.close("Left " + msg, attrs, status="left")

    @idiokit.stream
    def session(self, state, src_room):
        """Keep the room `src_room` joined for the lifetime of this session."""
        yield self.rooms.inc(src_room)

    def collect(self, room_name, own_jid):
        """
        Return the collector stream for one room.

        The collector is additionally fed a tick from _alert every
        queue_time seconds so that slow rooms still get flushed.
        """
        collect = self._collect(room_name, own_jid)
        idiokit.pipe(self._alert(self.queue_time), collect)
        return collect

    @idiokit.stream
    def _alert(self, flush_interval=10.0):
        """Send an empty tick downstream every `flush_interval` seconds, forever."""
        while True:
            yield idiokit.sleep(flush_interval)
            yield idiokit.send()

    @idiokit.stream
    def _collect(self, room_name, own_jid):
        """
        Queue documents built from the room's stanzas and bulk-write them.

        Receives either stanza elements (piped from the room) or None (the
        tick from _alert). The queue is flushed on a tick, when it reaches
        queue_size documents, and when the UTC day - and therefore the
        target indice - changes. Empty queues are never sent.
        """
        name = self.indice_name(time.time(), room_name)
        queue = []

        try:
            while True:
                elements = yield idiokit.next()

                if elements is None:
                    # Periodic tick from _alert: write whatever is pending.
                    if queue:
                        yield self._flush(name, queue)
                        queue = []
                    continue

                queue.extend(self._documents(elements, own_jid, time.time()))
                if len(queue) >= self.queue_size:
                    yield self._flush(name, queue)
                    queue = []

                new_name = self.indice_name(time.time(), room_name)
                if new_name != name:
                    # Day rolled over: finish the old indice before switching.
                    if queue:
                        yield self._flush(name, queue)
                        queue = []
                    self.log.info("Closed indice {0!r}".format(name))
                    name = new_name
                    self.log.info("next batches go to {0!r}".format(name))
        finally:
            # NOTE(review): the flush stream is created but not yielded, so this
            # stream does not wait for the final batch to be written - confirm
            # that idiokit runs it to completion during shutdown.
            if queue:
                self._flush(name, queue)
            self.log.info("stopped writing from {0!r}".format(name))

    def _documents(self, elements, own_jid, timestamp):
        """
        Convert a batch of stanza elements into the dicts that get indexed.

        Every <body> child becomes {"sender", "@timestamp", "body"} and every
        abusehelper event becomes {"sender", "@timestamp", "event"}. Stanzas
        from our own JID, and stanzas whose sender JID has no resource,
        are skipped.
        """
        stamp = isoformat(timestamp)
        documents = []
        for message in elements.with_attrs("from"):
            # Take the sender from this stanza, not from the whole batch.
            sender = JID(message.get_attr("from"))
            if sender == own_jid or sender.resource is None:
                continue
            resource = sender.resource.encode("unicode-escape")

            for body in message.children("body"):
                documents.append({
                    "sender": resource,
                    "@timestamp": stamp,
                    "body": body.text.encode("unicode-escape"),
                })
            for event in events.Event.from_elements(message):
                # dict() keeps the last value seen for a multi-valued key.
                # TODO: use "observation time"/"source time" as @timestamp, see
                # https://bitbucket.org/clarifiednetworks/abusehelper/wiki/Data%20Harmonization%20Ontology#!time
                documents.append({
                    "sender": resource,
                    "@timestamp": stamp,
                    "event": dict(event.items()),
                })
        return documents

    def _es_url(self):
        """Return the elasticsearch base URL built from the elastic_host param."""
        host = self.elastic_host
        if "://" not in host:
            host = "http://" + host
        # es_bulkindex appends "/_bulk" itself.
        return host.rstrip("/")

    @idiokit.stream
    def _flush(self, name, _queue):
        """
        Bulk-index the documents in `_queue` into the indice `name`.

        An error reported by es_bulkindex is logged. On success, an info
        log entry carrying the indice name and document count is written.
        """
        error, result = yield es_bulkindex(self._es_url(), name, _queue, opener=self.opener)
        if error is not None:
            self.log.error(error)
        if result is not None:
            logevent = events.Event({
                "indice": name,
                "event count": unicode(len(_queue)),
            })
            self.log.info("wrote {0!r} to {1!r} ".format(len(_queue), name), event=logevent)

    def indice_name(self, timestamp, room_name):
        """Return the daily (UTC) indice name "<room_name>.<YYYY-MM-DD>"."""
        # TODO, get timestamp format from bot param as 5m,h,d,w,m
        day = isoformat(timestamp, format="%Y-%m-%d")
        return unicode(room_name).encode("utf-8") + "." + day
if __name__ == "__main__":
    # Build the bot from command line options and run it.
    archive_bot = ElasticSearchArchiveBot.from_command_line()
    archive_bot.execute()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import idiokit | |
from post_url import post_url | |
try: | |
import simplejson as json | |
except ImportError: | |
import json | |
@idiokit.stream
def es_bulkindex(host, indice, data, type=None, opener=None):
    """
    Index the documents in `data` into `indice` with one bulk request.

    host: elasticsearch base URL, e.g. "http://localhost:9200".
    indice: name of the target indice.
    data: iterable of JSON-serializable documents.
    type: document type used for every document (default "undefined").
    opener: optional urllib2 opener passed on to post_url.

    The stream stops with an (error, output) pair:
      (None, response_text)  on success,
      (error, None)          if the request or response decoding failed, or
                             if elasticsearch flagged an error for any item,
      (None, None)           when `data` is empty (no request is sent).
    """
    if type is None:
        type = "undefined"

    action = json.dumps({"index": {"_index": indice, "_type": type}})
    lines = []
    for document in data:
        lines.append(action)
        lines.append(json.dumps(document))
    if not lines:
        # Nothing to index, so skip the request instead of posting an empty body.
        idiokit.stop(None, None)

    try:
        info, output = yield post_url(host + "/_bulk", "\n".join(lines) + "\n", opener)
    except Exception as e:
        idiokit.stop(e, None)

    body = output.getvalue()
    try:
        response = json.loads(body)
    except ValueError as e:
        idiokit.stop(e, None)

    # elasticsearch answers with HTTP 200 even when items failed, so the
    # per-item results must be inspected, e.g.
    # {"took":34,"errors":false,"items":[{"create":{"_index":"test","_type":"None","_id":"AU73tmvfT1oQ9DwRzSBJ","_version":1,"status":201}}]}
    if response.get("errors"):
        # Report only the first failing item.
        for item in response.get("items", ()):
            for action_name in item:
                details = item[action_name]
                if "error" in details:
                    idiokit.stop(details["error"], None)
        idiokit.stop("bulk request reported errors", None)

    idiokit.stop(None, body)
if __name__ == "__main__":
    # Manual smoke test: bulk-index one document into a local elasticsearch
    # and print the (error, output) pair produced by es_bulkindex.

    @idiokit.stream
    def print_result():
        while True:
            failure, response = yield idiokit.next()
            print(failure)
            print(response)

    @idiokit.stream
    def index_test(host, indice, data):
        failure, response = yield es_bulkindex(host, indice, data, "none")
        yield idiokit.send(failure, response)

    @idiokit.stream
    def test():
        target = "http://localhost:9200"
        documents = [{"field1": "value1", "field2": "value2"}]
        yield index_test(target, "test", documents) | print_result()

    idiokit.main_loop(test())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import | |
import socket | |
import urllib2 | |
import httplib | |
import email.parser | |
import idiokit | |
from cStringIO import StringIO | |
from abusehelper.core.utils import FetchUrlFailed, FetchUrlTimeout, HTTPError, _is_timeout | |
@idiokit.stream
def post_url(url, data, opener=None, timeout=60.0, chunk_size=16384):
    """
    POST `data` to `url`, running the blocking urllib2 calls in threads.

    The stream stops with (info, output). `info` holds the response headers
    parsed by email.parser.Parser. `output` is a StringIO with the complete
    response body, rewound to the start.

    Failures are re-raised as abusehelper.core.utils errors:
    - HTTPError for urllib2.HTTPError;
    - FetchUrlTimeout when the URL or socket error is a timeout;
    - FetchUrlFailed for any other URL, socket or httplib error.
    """
    if opener is None:
        opener = urllib2.build_opener()

    try:
        body = StringIO()
        response = yield idiokit.thread(opener.open, url, data=data, timeout=timeout)
        try:
            chunk = yield idiokit.thread(response.read, chunk_size)
            while chunk:
                body.write(chunk)
                chunk = yield idiokit.thread(response.read, chunk_size)
        finally:
            response.close()

        raw_headers = str(response.info())
        headers = email.parser.Parser().parsestr(raw_headers, headersonly=True)
        body.seek(0)
        idiokit.stop(headers, body)
    except urllib2.HTTPError as http_error:
        # Must precede URLError: HTTPError is a subclass of it.
        raise HTTPError(http_error.code, http_error.msg, http_error.hdrs, http_error.fp)
    except urllib2.URLError as error:
        if _is_timeout(error.reason):
            raise FetchUrlTimeout("fetching URL timed out")
        raise FetchUrlFailed(str(error))
    except socket.error as error:
        if _is_timeout(error):
            raise FetchUrlTimeout("fetching URL timed out")
        raise FetchUrlFailed(str(error))
    except httplib.HTTPException as error:
        raise FetchUrlFailed(str(error))
if __name__ == "__main__":
    # Manual smoke test: POST one bulk request to a local elasticsearch and
    # print the response headers and body. The defaults previously lived
    # only inside test(), while main_loop() referenced undefined globals.
    TEST_URL = "http://localhost:9200/_bulk"
    TEST_DATA = '{"index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }\n{ "field1" : "value1" }\n'

    @idiokit.stream
    def test(url=TEST_URL, data=TEST_DATA):
        yield post_test(url, data) | print_result()

    @idiokit.stream
    def post_test(url, data):
        try:
            info, output = yield post_url(url, data)
        except FetchUrlFailed as e:
            print(e)
            idiokit.stop(False)
        yield idiokit.send(info, output)

    @idiokit.stream
    def print_result():
        while True:
            info, output = yield idiokit.next()
            print(info)
            print(output.getvalue())

    idiokit.main_loop(test())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment