Created
April 18, 2018 14:03
-
-
Save komeda-shinji/13855ddb0140a918dff8543707dce4ee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
#
# This code is based on collectd-network-client-python, forked from https://github.com/appliedsec/collectd
#
import sys | |
import awscli.clidriver | |
import StringIO | |
import json | |
import datetime | |
import dateutil.parser | |
import re | |
import time | |
from calendar import timegm | |
import socket | |
import struct | |
import logging | |
import traceback | |
from functools import wraps | |
from threading import Thread | |
# Module-level logger. No handler is configured unless one of the
# basicConfig lines below is re-enabled.
logger = logging.getLogger("cloudwatch")
#logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
#logging.basicConfig(stream=sys.stderr, level=logging.WARNING)

# NOTE(review): the constants and pack helpers in this file appear to be
# inlined from the ``collectd`` client module instead of being imported
# from it -- confirm before re-introducing that import.

SEND_INTERVAL = 5 * 60    # seconds
MAX_PACKET_SIZE = 1 << 10  # bytes

# Part type codes, numbered consecutively from 0x0000.
(
    TYPE_HOST,
    TYPE_TIME,
    TYPE_PLUGIN,
    TYPE_PLUGIN_INSTANCE,
    TYPE_TYPE,
    TYPE_TYPE_INSTANCE,
    TYPE_VALUES,
    TYPE_INTERVAL,
) = range(8)

# Part types whose payload is a 64-bit integer / a NUL-terminated string.
LONG_INT_CODES = [TYPE_TIME, TYPE_INTERVAL]
STRING_CODES = [
    TYPE_HOST,
    TYPE_PLUGIN,
    TYPE_PLUGIN_INSTANCE,
    TYPE_TYPE,
    TYPE_TYPE_INSTANCE,
]

# Data-source (value) type codes.
VALUE_COUNTER, VALUE_GAUGE, VALUE_DERIVE, VALUE_ABSOLUTE = range(4)

# struct format used to encode one value of each data-source type.
VALUE_CODES = dict([
    (VALUE_COUNTER, "!Q"),
    (VALUE_GAUGE, "<d"),
    (VALUE_DERIVE, "!q"),
    (VALUE_ABSOLUTE, "!Q"),
])
# CloudWatch metric names to poll, keyed first by CloudWatch namespace and
# then by the collectd plugin name under which the values are reported.
metrics = {
    "AWS/EC2": {
        "status": [
            "StatusCheckFailed",
            "StatusCheckFailed_System",
            "StatusCheckFailed_Instance",
        ],
        "cpu": [
            "CPUUtilization",
            "CPUCreditBalance",
            "CPUCreditUsage",
        ],
        "disk": [
            "DiskReadBytes",
            "DiskWriteBytes",
            "DiskReadOps",
            "DiskWriteOps",
        ],
        # A missing comma here used to fuse the two names into the single
        # bogus metric "NetworkInNetworkOut"; trailing commas guard
        # against that class of mistake.
        "network": [
            "NetworkIn",
            "NetworkOut",
        ],
    },
    "AWS/EBS": {
        "disk": [
            "VolumeTotalWriteTime",
            "VolumeTotalReadTime",
            "VolumeIdleTime",
            "VolumeQueueLength",
            "VolumeReadOps",
            "VolumeWriteOps",
            "VolumeReadBytes",
            "VolumeWriteBytes",
        ],
    },
}
#print "SEND_INTERVAL=%s" % collectd.SEND_INTERVAL | |
# Address the packed metrics are sent to.
collectd_host, collectd_port = "localhost", 25826
collectd_addr = (collectd_host, collectd_port)

# UDP (datagram) socket shared by every send.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# A single awscli driver instance, reused for all CLI invocations.
driver = awscli.clidriver.create_clidriver()
def pack_numeric(type_code, number):
    """Pack a numeric part in network byte order.

    The part is a 2-byte type code, a 2-byte total length (always 12) and
    a signed 64-bit value.

    ``number`` is converted with ``int()`` first, so float timestamps such
    as ``time.time()`` are truncated explicitly. Without this, packing a
    float with the ``q`` format relies on a deprecated implicit conversion
    on Python 2.7 and raises ``struct.error`` on Python 3.
    """
    return struct.pack("!HHq", type_code, 12, int(number))
def pack_string(type_code, string):
    """Pack a string part: type code, total length, payload, trailing NUL.

    The length field counts the 4 header bytes, the payload and the NUL
    terminator (hence ``5 + len(payload)``).

    Byte strings are used as-is. Text is encoded as UTF-8, so non-ASCII
    names no longer raise, and any other object is stringified first. The
    result is always a byte string on both Python 2 and Python 3.
    """
    if not isinstance(string, bytes):
        # type(u"") is ``unicode`` on Python 2 and ``str`` on Python 3.
        text = string if isinstance(string, type(u"")) else str(string)
        string = text.encode("utf-8")
    return struct.pack("!HH", type_code, 5 + len(string)) + string + b"\0"
# packed.append(struct.pack("!BQ", VALUE_COUNTER, v)) | |
# packed.append(struct.pack("!Bq", VALUE_DERIVE, v)) | |
# packed.append(struct.pack("!BQ", VALUE_ABSOLUTE, v)) | |
def pack_value(name, value, type=VALUE_GAUGE):
    """Pack one or more values into a single "values" part.

    Layout: 2-byte part type, 2-byte total length, 2-byte value count,
    then one type byte per value, then one 8-byte encoded value per value
    (hence ``6 + 9 * count`` for the length).

    Args:
        name: Unused; kept so callers can pass the metric name.
        value: A single number, or a list/tuple of numbers.
        type: Data-source type code (one of the ``VALUE_*`` constants).
            Previously ignored -- every value was packed as a gauge. The
            encoding is now looked up in ``VALUE_CODES``.

    Raises:
        ValueError: if ``type`` is not a key of ``VALUE_CODES``.
    """
    try:
        fmt = VALUE_CODES[type]
    except KeyError:
        raise ValueError("invalid value type code " + str(type))

    values = list(value) if isinstance(value, (list, tuple)) else [value]
    if type != VALUE_GAUGE:
        # The non-gauge formats are integer encodings; struct does not
        # accept floats for them.
        values = [int(v) for v in values]

    count = len(values)
    packed = [struct.pack("!HHH", TYPE_VALUES, 6 + 9 * count, count)]
    packed.extend(struct.pack("<B", type) for _ in values)
    packed.extend(struct.pack(fmt, v) for v in values)
    return b"".join(packed)
def pack(id, value):
    """Pack one part, choosing the encoder from ``id``.

    A string ``id`` is treated as a value name and handed to
    ``pack_value``. An integer type code is routed to ``pack_numeric`` or
    ``pack_string`` depending on which code list contains it.

    Raises:
        AssertionError: if ``id`` is neither a string nor a known code.
    """
    if isinstance(id, basestring):
        return pack_value(id, value)

    dispatch = (
        (LONG_INT_CODES, pack_numeric),
        (STRING_CODES, pack_string),
    )
    for codes, packer in dispatch:
        if id in codes:
            return packer(id, value)

    raise AssertionError("invalid type code " + str(id))
def message_start(when=None, host=socket.gethostname(), plugin_name="any", plugin_inst="", type_name="", type_inst=""):
    """Build the header parts that precede the values in every packet.

    Parts are emitted in this order: host, time, interval, plugin, plugin
    instance, type, type instance.

    Args:
        when: Timestamp in seconds since the epoch; a falsy value means
            "now".
        host: Host name to report; defaults to the local host name,
            evaluated once when the module is loaded.
        plugin_name, plugin_inst, type_name, type_inst: collectd
            identifier components.

    Returns:
        The concatenated packed parts as a byte string.
    """
    # time.time() is a float, but the time part is packed as a 64-bit
    # integer, so truncate explicitly rather than rely on struct coercion.
    timestamp = int(when or time.time())
    header = (
        (TYPE_HOST, host),
        (TYPE_TIME, timestamp),
        (TYPE_INTERVAL, SEND_INTERVAL),
        (TYPE_PLUGIN, plugin_name),
        (TYPE_PLUGIN_INSTANCE, plugin_inst),
        (TYPE_TYPE, type_name),
        (TYPE_TYPE_INSTANCE, type_inst),
    )
    return b"".join(pack(code, payload) for code, payload in header)
def messages(counts, when=None, host=socket.gethostname(), plugin_name="any", plugin_inst="", type_name="", type_inst="", value_type=VALUE_GAUGE):
    """Turn a ``{name: value}`` mapping into ready-to-send packets.

    Every packet starts with the header from ``message_start`` followed by
    a values part. A single entry is packed as one value; several entries
    are packed together as one multi-value part, in the mapping's
    iteration order.

    Args:
        counts: Mapping of value name to number.
        when, host, plugin_name, plugin_inst, type_name, type_inst:
            forwarded to ``message_start``.
        value_type: Data-source type code forwarded to ``pack_value``. It
            is now forwarded for the single-entry case as well, which
            previously dropped it.

    Returns:
        A list of byte strings, each at most ``MAX_PACKET_SIZE`` long.
        Empty if ``counts`` is empty or the values do not fit in a packet.
    """
    if not counts:
        # Nothing to report; do not emit a packet with a zero-value part.
        return []

    start = message_start(when, host, plugin_name, plugin_inst, type_name, type_inst)
    if len(counts) == 1:
        parts = [pack_value(name, count, value_type) for name, count in counts.items()]
    else:
        # list() keeps this a real list on Python 3, where values() is a view.
        parts = [pack_value('values', list(counts.values()), value_type)]

    # Drop any part that could never fit, even alone after the header.
    room = MAX_PACKET_SIZE - len(start)
    parts = [part for part in parts if len(part) <= room]

    packets = []
    if parts:
        curr, curr_len = [start], len(start)
        for part in parts:
            if curr_len + len(part) > MAX_PACKET_SIZE:
                # Current packet is full: flush it and start a new one.
                packets.append(b"".join(curr))
                curr, curr_len = [start], len(start)
            curr.append(part)
            curr_len += len(part)
        packets.append(b"".join(curr))
    return packets
def sanitize(s):
    """Return ``s`` reduced to ASCII letters and digits joined by "_".

    Each run of other characters becomes a single underscore, and
    underscores at either end of the result are removed.
    """
    collapsed = re.sub(r"[^a-zA-Z0-9]+", "_", s)
    return collapsed.strip("_")
def daemonize(func, sleep_for = 0):
    """Run ``func`` forever on a background daemon thread.

    ``func`` is called with no arguments, then the thread sleeps for
    ``sleep_for`` seconds, and the cycle repeats. Anything raised by
    ``func`` is reported and the loop carries on. Nothing is returned.
    """
    def report_failure():
        # Prefer the module logger; if logging itself fails, fall back to
        # printing a traceback.
        try:
            logger.error("unexpected error", exc_info = True)
        except:
            traceback.print_exc()

    @wraps(func)
    def wrapped():
        while True:
            try:
                func()
            except:
                # Deliberately catch-all: the worker must keep running.
                report_failure()
            time.sleep(sleep_for)

    worker = Thread(target = wrapped)
    worker.daemon = True
    worker.start()
def send_snapshots(hostname, pname, pinst, data):
    """Send every datapoint of one CloudWatch result to collectd.

    Args:
        hostname: Host name to report the values under.
        pname: collectd plugin name.
        pinst: collectd plugin instance.
        data: Parsed ``get-metric-statistics`` output. Only its
            ``'Label'`` and ``'Datapoints'`` entries are read.

    Datapoints are sent oldest first. Within a datapoint, every field
    except ``'Timestamp'`` and ``'Unit'`` is sent as a value.
    """
    ordered = sorted(data['Datapoints'], key=lambda dp: dp['Timestamp'])
    for datapoint in ordered:
        stats = dict(
            (sanitize(key), val)
            for key, val in datapoint.items()
            if key not in ('Timestamp', 'Unit')
        )

        moment = dateutil.parser.parse(datapoint['Timestamp'])
        # utctimetuple() converts timezone-aware timestamps to UTC before
        # timegm() reads them as UTC. For naive or UTC timestamps the
        # result is the same as timetuple().
        when = timegm(moment.utctimetuple())

        tname = sanitize(data['Label'])
        value_type = VALUE_GAUGE
        sys.stderr.write(
            'messages(%s, %s, %s, %s, %s, %s)\n'
            % (hostname, pname, pinst, tname, value_type, stats)
        )
        for message in messages(stats, when, hostname, pname, pinst, tname,
                                value_type=value_type):
            sock.sendto(message, collectd_addr)
def get_metric_statistics(namespace, metric_name, start_time, end_time, dimension_name, dimension_value):
    """Run ``aws cloudwatch get-metric-statistics`` and return its JSON.

    Args:
        namespace: CloudWatch namespace, e.g. ``"AWS/EC2"``.
        metric_name: Metric to query.
        start_time, end_time: ``datetime`` bounds of the query. They are
            formatted as ISO-8601 without fractional seconds plus a "Z"
            suffix. NOTE(review): this assumes they are naive UTC values
            -- confirm against callers.
        dimension_name, dimension_value: The single dimension to filter on.

    Returns:
        The decoded JSON document the CLI printed.

    The Average and Maximum statistics are requested with a period of
    ``SEND_INTERVAL`` seconds.
    """
    start = start_time.isoformat().split('.')[0] + 'Z'
    end = end_time.isoformat().split('.')[0] + 'Z'
    args = [
        'cloudwatch', 'get-metric-statistics',
        '--namespace', namespace,
        '--metric-name', metric_name,
        '--start-time', start,
        '--end-time', end,
        '--period', '%d' % SEND_INTERVAL,
        '--statistics', 'Average', 'Maximum',
        '--dimensions', 'Name=%s,Value=%s' % (dimension_name, dimension_value),
    ]

    # The CLI driver prints its result, so capture stdout while it runs.
    # The finally clause puts stdout back even if the driver raises.
    output = StringIO.StringIO()
    saved_stdout = sys.stdout
    sys.stdout = output
    try:
        driver.main(args)
    finally:
        sys.stdout = saved_stdout

    return json.loads(output.getvalue())
def _send_metric_group(namespace, dimension_name, dimension_value, hostname, pinst, start_time, end_time):
    """Fetch and forward every configured metric of ``namespace`` for one resource."""
    for pname in metrics[namespace]:
        for metric in metrics[namespace][pname]:
            stats = get_metric_statistics(
                namespace,
                metric, start_time, end_time,
                dimension_name, dimension_value)
            send_snapshots(hostname, pname, pinst, stats)


def get_statistics():
    """Poll CloudWatch once for every running EC2 instance.

    The query window is the most recent complete ``SEND_INTERVAL``-aligned
    interval. For each running instance the configured "AWS/EC2" metrics
    are sent with the instance id as host name. For each of its EBS-backed
    block devices the "AWS/EBS" metrics are sent with the device name
    (minus a leading "/dev/") as plugin instance.
    """
    now = time.time()
    delta = now % SEND_INTERVAL
    end_time = datetime.datetime.utcfromtimestamp(now - delta)
    start_time = datetime.datetime.utcfromtimestamp(now - delta - SEND_INTERVAL)
    sys.stderr.write('start_time = %s, end_time = %s\n' % (start_time, end_time))

    # The CLI driver prints its result, so capture stdout while it runs.
    # stdout is restored before parsing, so a driver or JSON error can no
    # longer leave it redirected.
    output = StringIO.StringIO()
    saved_stdout = sys.stdout
    sys.stdout = output
    try:
        driver.main(['ec2', 'describe-instances'])
    finally:
        sys.stdout = saved_stdout
    described = json.loads(output.getvalue())

    for reservation in described['Reservations']:
        for instance in reservation['Instances']:
            # Exact comparison: ``in ('running')`` was a substring test
            # against the string 'running', not tuple membership.
            if instance['State']['Name'] != 'running':
                continue

            hostname = instance['InstanceId']
            _send_metric_group("AWS/EC2", 'InstanceId', hostname,
                               hostname, "", start_time, end_time)

            for mapping in instance.get('BlockDeviceMappings') or ():
                if 'Ebs' not in mapping:
                    # Only mappings with an 'Ebs' entry carry a volume id.
                    continue
                deviceName = mapping['DeviceName'].replace('/dev/', '', 1)
                volumeId = mapping['Ebs']['VolumeId']
                _send_metric_group("AWS/EBS", 'VolumeId', volumeId,
                                   hostname, deviceName, start_time, end_time)
#get_statistics() | |
#sys.exit(0) | |
# Start the background poller: one collection pass, then a pause of
# SEND_INTERVAL seconds, repeated forever.
daemonize(get_statistics, sleep_for=SEND_INTERVAL)

# Keep the main thread alive (the poller is a daemon thread) and write a
# timestamp to stderr once a minute.
while True:
    sys.stderr.write("%s\n" % datetime.datetime.now())
    time.sleep(60)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment