Skip to content

Instantly share code, notes, and snippets.

@1900
Created July 19, 2012 08:46
Show Gist options
  • Save 1900/3141649 to your computer and use it in GitHub Desktop.
Save 1900/3141649 to your computer and use it in GitHub Desktop.
#!/usr/bin/python -u
import os
import re
import sys
import time
import statvfs
import subprocess
import MySQLdb
# globa re
re_meminfo_parser = re.compile(r'^(?P<key>\S*):\s*(?P<value>\d*)\s*kB')
#
class OSstatus:
"""
result = report client status.
"""
def __init__(self, sleep=2):
"""Constructor
"""
self.sleep=sleep
def _get_mem_usage(self):
"""get mem used by percent
self.result = falot
"""
result={}
try:
fd=open('/proc/meminfo', 'r')
lines=fd.readlines()
finally:
if fd:
fd.close()
for line in lines:
match=re_meminfo_parser.match(line)
if not match:
continue # skip lines that don't parse
key, value=match.groups(['key', 'value'])
result[key]=int(value)
#print "mem :", 100*(result["MemTotal"]-result["MemFree"])/result["MemTotal"]
return 100.0*(result["MemTotal"]-result["MemFree"])/result["MemTotal"]
def get_mem_usage(self):
"""safe to call _get_memused()
self.result = falot
"""
try:
return self._get_mem_usage()
except Exception, e:
print "_get_mem_usage(self) Exception, %s"%e
return 0
def get_5m_load(self):
"""get 5 mines avg load
self.result = float
"""
try:
return (os.getloadavg())[2]
except Exception, e:
print "_get_5m_load(self) Exception, %s"%e
return 0
def _read_cpu_usage(self):
"""Read the current system cpu usage from /proc/stat."""
try:
fd = open("/proc/stat", 'r')
lines = fd.readlines()
finally:
if fd:
fd.close()
for line in lines:
l = line.split()
if len(l) < 5:
continue
if l[0].startswith('cpu'):
return l
return {}
def get_cpu_usage(self):
"""get cpu avg used by percent
"""
cpustr=self._read_cpu_usage()
if not cpustr:
return 0
#cpu usage=[(user_2 +sys_2+nice_2) - (user_1 + sys_1+nice_1)]/(total_2 - total_1)*100
usni1=long(cpustr[1])+long(cpustr[2])+long(cpustr[3])+long(cpustr[5])+long(cpustr[6])+long(cpustr[7])+long(cpustr[4])
usn1=long(cpustr[1])+long(cpustr[2])+long(cpustr[3]
time.sleep(self.sleep)
cpustr=self._read_cpu_usage()
if not cpustr:
return 0
usni2=long(cpustr[1])+long(cpustr[2])+float(cpustr[3])+long(cpustr[5])+long(cpustr[6])+long(cpustr[7])+long(cpustr[4])
usn2=long(cpustr[1])+long(cpustr[2])+long(cpustr[3])
cpuper=(usn2-usn1)/(usni2-usni1)
return 100*cpuper
def get_os_info(self):
"""overide all functions.
"""
#return {"cpu": "%s"%round(float(self.get_cpu_usage()), 2),\
# "mem": "%s"%round(float(self.get_mem_usage()), 2),\
# "load": "%s"%round(float(self.get_5m_load()), 2),\
# }
print "%s"%round(float(self.get_cpu_usage()), 2)
print "%s"%round(float(self.get_mem_usage()), 2)
print "%s"%round(float(self.get_5m_load()), 2)
ping_response = subprocess.Popen(["/bin/ping", "-c1", "-w100", "192.168.0.10"], stdout=subprocess.PIPE).stdout.read()
conn_output = subprocess.Popen(["netstat -na|grep ESTABLISHED|awk '{print $5}'|awk -F: '{print $1}'|sort|uniq -c|sort -r|wc -l"],stdout=subprocess.PIPE,shell=True).std
out.read()
conn = MySQLdb.connect(host='localhost',port=3306,user='root',passwd='root')
cursor=conn.cursor()
###select db
conn.select_db('os')
sql = "insert into os(cpu,mem,loadavg,ping,conn,datetime) values(%s,%s,%s,%s,%s,%s)"
param = ( "%s"%round(float(self.get_cpu_usage()), 2),"%s"%round(float(self.get_mem_usage()), 2),"%s"%round(float(self.get_5m_load()), 2),ping_response[115:125],conn_outp
ut,time.strftime('%Y-%m-%d',time.localtime(time.time())))
n = cursor.execute(sql,param)
print n
cursor.close()
#print ping_response
import unittest
class clientTestCase(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_cpu(self):
"""
cpu
"""
osinfo=OSstatus(2)
self.assertEqual(type(osinfo.get_cpu_usage()), float)
return
def test_mem(self):
"""
mem
"""
osinfo=OSstatus(2)
self.assertEqual(type(osinfo.get_mem_usage()), float)
return
def test_load(self):
"""
load
"""
osinfo=OSstatus(2)
self.assertEqual(type(osinfo.get_5m_load()), float)
return
def test_all(self):
"""
load
"""
osinfo=OSstatus()
self.assertEqual(type(osinfo.get_os_info()), dict)
return
if __name__=='__test__':
unittest.main()
elif __name__=='__main__':
print "-"*20
print OSstatus(2).get_os_info()
print "-"*20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment