#!/usr/bin/env python
#
# This code is based on collectd-network-client-python, forked from https://github.com/appliedsec/collectd
#
import sys
import awscli.clidriver
import StringIO
import json
import datetime
import dateutil.parser
import re
import time
from calendar import timegm
import socket
import struct
import logging
import traceback
from functools import wraps
from threading import Thread
logger = logging.getLogger("cloudwatch")
#logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
#logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
#from collectd import PLUGIN_TYPE, \
# TYPE_HOST, TYPE_TIME, TYPE_PLUGIN, TYPE_PLUGIN_INSTANCE, \
# TYPE_TYPE, TYPE_TYPE_INSTANCE, TYPE_VALUES, TYPE_INTERVAL, \
# LONG_INT_CODES, STRING_CODES, \
# VALUE_COUNTER, VALUE_GAUGE, VALUE_DERIVE, VALUE_ABSOLUTE, \
# SEND_INTERVAL, MAX_PACKET_SIZE
# pack_numeric, pack_string, pack_value, pack, \
# message_start, messages, sanitize, daemonize
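# Part type codes and value encodings for the collectd binary network
# protocol: each UDP packet is a sequence of parts, and each part starts
# with a 2-byte type code and a 2-byte total length (both big-endian).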
SEND_INTERVAL = 300 # seconds
MAX_PACKET_SIZE = 1024 # bytes
TYPE_HOST = 0x0000
TYPE_TIME = 0x0001
TYPE_PLUGIN = 0x0002
TYPE_PLUGIN_INSTANCE = 0x0003
TYPE_TYPE = 0x0004
TYPE_TYPE_INSTANCE = 0x0005
TYPE_VALUES = 0x0006
TYPE_INTERVAL = 0x0007
LONG_INT_CODES = [TYPE_TIME, TYPE_INTERVAL]
STRING_CODES = [TYPE_HOST, TYPE_PLUGIN, TYPE_PLUGIN_INSTANCE, TYPE_TYPE, TYPE_TYPE_INSTANCE]
VALUE_COUNTER = 0
VALUE_GAUGE = 1
VALUE_DERIVE = 2
VALUE_ABSOLUTE = 3
VALUE_CODES = {
    VALUE_COUNTER: "!Q",
    VALUE_GAUGE: "<d",
    VALUE_DERIVE: "!q",
    VALUE_ABSOLUTE: "!Q"
}
metrics = {}
metrics["AWS/EC2"] = {}
metrics["AWS/EC2"]["status"] = [
"StatusCheckFailed",
"StatusCheckFailed_System",
"StatusCheckFailed_Instance"
]
metrics["AWS/EC2"]["cpu"] = [
"CPUUtilization",
"CPUCreditBalance",
"CPUCreditUsage"
]
metrics["AWS/EC2"]["disk"] = [
"DiskReadBytes",
"DiskWriteBytes",
"DiskReadOps",
"DiskWriteOps"
]
metrics["AWS/EC2"]["network"] = [
"NetworkIn"
"NetworkOut"
]
metrics["AWS/EBS"] = {}
metrics["AWS/EBS"]["disk"] = [
"VolumeTotalWriteTime",
"VolumeTotalReadTime",
"VolumeIdleTime",
"VolumeQueueLength",
"VolumeReadOps",
"VolumeWriteOps",
"VolumeReadBytes",
"VolumeWriteBytes"
]
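# The tables above list the CloudWatch metrics to poll, keyed by namespace
# and then by group; each group key ("status", "cpu", ...) is reused as the
# collectd plugin name when datapoints are forwarded (see send_snapshots).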
#print "SEND_INTERVAL=%s" % collectd.SEND_INTERVAL
collectd_host = "localhost"
collectd_port = 25826
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
collectd_addr = (collectd_host, collectd_port)
driver = awscli.clidriver.create_clidriver()
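# Metrics are sent as collectd "network" plugin packets over UDP; 25826 is
# collectd's default network port. A single awscli clidriver instance is
# reused to run AWS CLI commands in-process instead of spawning `aws`.
# The pack_* helpers below build individual parts of the collectd binary
# protocol: numeric parts carry a 64-bit big-endian integer, string parts
# are NUL-terminated.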
def pack_numeric(type_code, number):
    return struct.pack("!HHq", type_code, 12, number)
def pack_string(type_code, string):
    string = bytes(string)
    return struct.pack("!HH", type_code, 5 + len(string)) + string + "\0"
# packed.append(struct.pack("!BQ", VALUE_COUNTER, v))
# packed.append(struct.pack("!Bq", VALUE_DERIVE, v))
# packed.append(struct.pack("!BQ", VALUE_ABSOLUTE, v))
def pack_value(name, value, type=VALUE_GAUGE):
    packed = []
    if isinstance(value, list):
        packed.append(struct.pack("!HHH", TYPE_VALUES, 6 + 9*len(value), len(value)))
        for v in value:
            packed.append(struct.pack("<B", VALUE_GAUGE))
        for v in value:
            packed.append(struct.pack("<d", v))
    else:
        packed.append(struct.pack("!HHH", TYPE_VALUES, 15, 1))
        packed.append(struct.pack("<Bd", VALUE_GAUGE, value))
    return "".join(packed)
def pack(id, value):
    if isinstance(id, basestring):
        return pack_value(id, value)
    elif id in LONG_INT_CODES:
        return pack_numeric(id, value)
    elif id in STRING_CODES:
        return pack_string(id, value)
    else:
        raise AssertionError("invalid type code " + str(id))
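# message_start packs the header parts shared by every packet: host, time,
# interval, plugin, plugin instance, type and type instance.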
def message_start(when=None, host=socket.gethostname(), plugin_name="any", plugin_inst="", type_name="", type_inst=""):
    packed = []
    packed.append(pack(TYPE_HOST, host))
    packed.append(pack(TYPE_TIME, when or time.time()))
    packed.append(pack(TYPE_INTERVAL, SEND_INTERVAL))
    packed.append(pack(TYPE_PLUGIN, plugin_name))
    packed.append(pack(TYPE_PLUGIN_INSTANCE, plugin_inst))
    packed.append(pack(TYPE_TYPE, type_name))
    packed.append(pack(TYPE_TYPE_INSTANCE, type_inst))
    return "".join(packed)
def messages(counts, when=None, host=socket.gethostname(), plugin_name="any", plugin_inst="", type_name="", type_inst="", value_type=VALUE_GAUGE):
    packets = []
    start = message_start(when, host, plugin_name, plugin_inst, type_name, type_inst)
    if len(counts) == 1:
        parts = [pack(name, count) for name, count in counts.items()]
    else:
        parts = [pack_value('values', counts.values(), value_type)]
    parts = [p for p in parts if len(start) + len(p) <= MAX_PACKET_SIZE]
    if parts:
        curr, curr_len = [start], len(start)
        for part in parts:
            if curr_len + len(part) > MAX_PACKET_SIZE:
                packets.append("".join(curr))
                curr, curr_len = [start], len(start)
            curr.append(part)
            curr_len += len(part)
        packets.append("".join(curr))
    return packets
def sanitize(s):
    return re.sub(r"[^a-zA-Z0-9]+", "_", s).strip("_")
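# daemonize runs func forever on a daemon thread, logging (or printing) any
# exception and sleeping sleep_for seconds between iterations.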
def daemonize(func, sleep_for=0):
    @wraps(func)
    def wrapped():
        while True:
            try:
                func()
            except:
                try:
                    logger.error("unexpected error", exc_info=True)
                except:
                    traceback.print_exc()
            time.sleep(sleep_for)
    t = Thread(target=wrapped)
    t.daemon = True
    t.start()
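# send_snapshots forwards one CloudWatch get-metric-statistics response to
# collectd: datapoints are processed oldest first, the Timestamp/Unit keys
# are stripped, the timestamp is converted to epoch seconds, and the
# remaining statistics (Average, Maximum, ...) are sent as gauges.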
def send_snapshots(hostname, pname, pinst, data):
    for datapoint in sorted(data['Datapoints'], key=lambda dp: dp['Timestamp']):
        stats = {}
        # "Unit": "Count"
        for k in datapoint.keys():
            if not k in ['Timestamp', 'Unit']:
                stats[sanitize(k)] = datapoint[k]
        t = dateutil.parser.parse(datapoint['Timestamp'])
        #when = time.mktime(t.timetuple()) - time.timezone
        when = timegm(t.timetuple())
        tname = sanitize(data['Label'])
        value_type = VALUE_GAUGE
        print >>sys.stderr, 'messages(%s, %s, %s, %s, %s, %s)' % (hostname, pname, pinst, tname, value_type, stats)
        for message in messages(stats, when, hostname, pname, pinst, tname):
            #print "%s" % message
            #sock.sendto(message, conn._collectd_addr)
            sock.sendto(message, collectd_addr)
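# get_metric_statistics runs `aws cloudwatch get-metric-statistics`
# in-process via the clidriver, capturing its stdout and parsing the JSON.
# The period matches SEND_INTERVAL and a single dimension selects the
# instance or volume.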
def get_metric_statistics(namespace, metric_name, start_time, end_time, dimension_name, dimension_value):
    #start, microsecond = start_time.isoformat().split('.')
    start = start_time.isoformat().split('.')[0] + 'Z'
    end = end_time.isoformat().split('.')[0] + 'Z'
    args = ['cloudwatch', 'get-metric-statistics']
    args.extend(['--namespace', namespace])
    args.extend(['--metric-name', metric_name])
    args.extend(['--start-time', start])
    args.extend(['--end-time', end])
    args.extend(['--period', '%d' % SEND_INTERVAL])
    #args.extend(['--statistics', 'Average', 'Minimum', 'Maximum'])
    args.extend(['--statistics', 'Average', 'Maximum'])
    args.extend(['--dimensions', 'Name=%s,Value=%s' % (dimension_name, dimension_value)])
    output = StringIO.StringIO()
    sys.stdout = output
    driver.main(args)
    contents = output.getvalue()
    sys.stdout = sys.__stdout__
    #print >>sys.stderr, contents
    return json.loads(contents)
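# get_statistics aligns the query window to the previous SEND_INTERVAL
# boundary, lists instances with `aws ec2 describe-instances`, and for each
# running instance polls the EC2 metrics, then the EBS metrics for every
# attached volume (the block device name becomes the plugin instance).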
def get_statistics():
    now = time.time()
    delta = now % SEND_INTERVAL
    end_time = datetime.datetime.utcfromtimestamp(now - delta)
    start_time = datetime.datetime.utcfromtimestamp(now - delta - SEND_INTERVAL)
    print >>sys.stderr, 'start_time = %s, end_time = %s' % (start_time, end_time)
    output = StringIO.StringIO()
    sys.stdout = output
    driver.main(['ec2', 'describe-instances'])
    contents = output.getvalue()
    j = json.loads(contents)
    sys.stdout = sys.__stdout__
    for r in j['Reservations']:
        for instance in r['Instances']:
            if instance['State']['Name'] not in ('running',):
                continue
            hostname = instance['InstanceId']
            for pname in metrics["AWS/EC2"].keys():
                for metric in metrics["AWS/EC2"][pname]:
                    j = get_metric_statistics(
                        "AWS/EC2",
                        metric, start_time, end_time,
                        'InstanceId', instance['InstanceId'])
                    send_snapshots(hostname, pname, "", j)
            if not instance.has_key('BlockDeviceMappings'):
                continue
            for mapping in instance['BlockDeviceMappings']:
                deviceName = mapping['DeviceName'].replace('/dev/', '', 1)
                if mapping.has_key('Ebs'):
                    volumeId = mapping['Ebs']['VolumeId']
                    for pname in metrics["AWS/EBS"].keys():
                        for metric in metrics["AWS/EBS"][pname]:
                            j = get_metric_statistics(
                                "AWS/EBS",
                                metric, start_time, end_time,
                                'VolumeId', volumeId)
                            send_snapshots(hostname, pname, deviceName, j)
#get_statistics()
#sys.exit(0)
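# get_statistics runs on a background daemon thread every SEND_INTERVAL;
# the main thread only keeps the process alive, printing a heartbeat to
# stderr once a minute.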
daemonize(get_statistics, sleep_for = SEND_INTERVAL)
while True:
    print >>sys.stderr, datetime.datetime.now()
    time.sleep(60)