Created
July 19, 2012 08:46
-
-
Save 1900/3141649 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python -u | |
import os | |
import re | |
import sys | |
import time | |
import statvfs | |
import subprocess | |
import MySQLdb | |
# globa re | |
re_meminfo_parser = re.compile(r'^(?P<key>\S*):\s*(?P<value>\d*)\s*kB') | |
# | |
class OSstatus: | |
""" | |
result = report client status. | |
""" | |
def __init__(self, sleep=2): | |
"""Constructor | |
""" | |
self.sleep=sleep | |
def _get_mem_usage(self): | |
"""get mem used by percent | |
self.result = falot | |
""" | |
result={} | |
try: | |
fd=open('/proc/meminfo', 'r') | |
lines=fd.readlines() | |
finally: | |
if fd: | |
fd.close() | |
for line in lines: | |
match=re_meminfo_parser.match(line) | |
if not match: | |
continue # skip lines that don't parse | |
key, value=match.groups(['key', 'value']) | |
result[key]=int(value) | |
#print "mem :", 100*(result["MemTotal"]-result["MemFree"])/result["MemTotal"] | |
return 100.0*(result["MemTotal"]-result["MemFree"])/result["MemTotal"] | |
def get_mem_usage(self): | |
"""safe to call _get_memused() | |
self.result = falot | |
""" | |
try: | |
return self._get_mem_usage() | |
except Exception, e: | |
print "_get_mem_usage(self) Exception, %s"%e | |
return 0 | |
def get_5m_load(self): | |
"""get 5 mines avg load | |
self.result = float | |
""" | |
try: | |
return (os.getloadavg())[2] | |
except Exception, e: | |
print "_get_5m_load(self) Exception, %s"%e | |
return 0 | |
def _read_cpu_usage(self): | |
"""Read the current system cpu usage from /proc/stat.""" | |
try: | |
fd = open("/proc/stat", 'r') | |
lines = fd.readlines() | |
finally: | |
if fd: | |
fd.close() | |
for line in lines: | |
l = line.split() | |
if len(l) < 5: | |
continue | |
if l[0].startswith('cpu'): | |
return l | |
return {} | |
def get_cpu_usage(self): | |
"""get cpu avg used by percent | |
""" | |
cpustr=self._read_cpu_usage() | |
if not cpustr: | |
return 0 | |
#cpu usage=[(user_2 +sys_2+nice_2) - (user_1 + sys_1+nice_1)]/(total_2 - total_1)*100 | |
usni1=long(cpustr[1])+long(cpustr[2])+long(cpustr[3])+long(cpustr[5])+long(cpustr[6])+long(cpustr[7])+long(cpustr[4]) | |
usn1=long(cpustr[1])+long(cpustr[2])+long(cpustr[3] | |
time.sleep(self.sleep) | |
cpustr=self._read_cpu_usage() | |
if not cpustr: | |
return 0 | |
usni2=long(cpustr[1])+long(cpustr[2])+float(cpustr[3])+long(cpustr[5])+long(cpustr[6])+long(cpustr[7])+long(cpustr[4]) | |
usn2=long(cpustr[1])+long(cpustr[2])+long(cpustr[3]) | |
cpuper=(usn2-usn1)/(usni2-usni1) | |
return 100*cpuper | |
def get_os_info(self): | |
"""overide all functions. | |
""" | |
#return {"cpu": "%s"%round(float(self.get_cpu_usage()), 2),\ | |
# "mem": "%s"%round(float(self.get_mem_usage()), 2),\ | |
# "load": "%s"%round(float(self.get_5m_load()), 2),\ | |
# } | |
print "%s"%round(float(self.get_cpu_usage()), 2) | |
print "%s"%round(float(self.get_mem_usage()), 2) | |
print "%s"%round(float(self.get_5m_load()), 2) | |
ping_response = subprocess.Popen(["/bin/ping", "-c1", "-w100", "192.168.0.10"], stdout=subprocess.PIPE).stdout.read() | |
conn_output = subprocess.Popen(["netstat -na|grep ESTABLISHED|awk '{print $5}'|awk -F: '{print $1}'|sort|uniq -c|sort -r|wc -l"],stdout=subprocess.PIPE,shell=True).std | |
out.read() | |
conn = MySQLdb.connect(host='localhost',port=3306,user='root',passwd='root') | |
cursor=conn.cursor() | |
###select db | |
conn.select_db('os') | |
sql = "insert into os(cpu,mem,loadavg,ping,conn,datetime) values(%s,%s,%s,%s,%s,%s)" | |
param = ( "%s"%round(float(self.get_cpu_usage()), 2),"%s"%round(float(self.get_mem_usage()), 2),"%s"%round(float(self.get_5m_load()), 2),ping_response[115:125],conn_outp | |
ut,time.strftime('%Y-%m-%d',time.localtime(time.time()))) | |
n = cursor.execute(sql,param) | |
print n | |
cursor.close() | |
#print ping_response | |
import unittest | |
class clientTestCase(unittest.TestCase): | |
def setUp(self): | |
pass | |
def tearDown(self): | |
pass | |
def test_cpu(self): | |
""" | |
cpu | |
""" | |
osinfo=OSstatus(2) | |
self.assertEqual(type(osinfo.get_cpu_usage()), float) | |
return | |
def test_mem(self): | |
""" | |
mem | |
""" | |
osinfo=OSstatus(2) | |
self.assertEqual(type(osinfo.get_mem_usage()), float) | |
return | |
def test_load(self): | |
""" | |
load | |
""" | |
osinfo=OSstatus(2) | |
self.assertEqual(type(osinfo.get_5m_load()), float) | |
return | |
def test_all(self): | |
""" | |
load | |
""" | |
osinfo=OSstatus() | |
self.assertEqual(type(osinfo.get_os_info()), dict) | |
return | |
if __name__=='__test__': | |
unittest.main() | |
elif __name__=='__main__': | |
print "-"*20 | |
print OSstatus(2).get_os_info() | |
print "-"*20 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment