Skip to content

Instantly share code, notes, and snippets.

@NaPs
Created May 29, 2009 18:42
Show Gist options
  • Select an option

  • Save NaPs/120132 to your computer and use it in GitHub Desktop.

Select an option

Save NaPs/120132 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#coding=utf8
import sys
import time
import re
from subprocess import Popen, PIPE
from arprequest import ArpRequest
import arprequest
PASSWORD = "1"
class Monit(object):
''' Monitoring du serveur '''
def __init__(self):
self.environ = {} # Variables d'environnement AGI
# Initialisation du script
# (récupération des variables d'environnement d'AGI)
line = sys.stdin.readline()
while line != '\n':
if line.startswith('agi_'):
key, value = line.strip().split(':')
self.environ[key] = value
line = sys.stdin.readline()
# Services à tester
# Le premier élément est le nom du fichier qui prononce le nom, le second est
# la méthode à appeler pour tester le fonctionnement du service, et les trois
# derniers sont les commandes à exécuter pour respectivement démarrer, redémarrer,
# et arréter les services
self.services = (
('httpd', self.service_http, '/etc/init.d/lighttpd start', '/etc/init.d/lighttpd restart', '/etc/init.d/lighttpd stop'),
('ftpd', self.service_ftp, '/etc/init.d/pure-ftpd start', '/etc/init.d/pure-ftpd restart', '/etc/init.d/pure-ftpd stop'),
('mysql', self.service_mysql, '/etc/init.d/mysql start', '/etc/init.d/mysql restart', '/etc/init.d/mysql stop'),
)
# On lance le menu d'accueil
self.menu_accueil()
def print_debug(self, message):
sys.stderr.write(message + '\n')
sys.stderr.flush()
def exec_command(self, command):
'''Exécuter une commande agi et retourner son résultat sous forme
de dictionnaire. '''
sys.stdout.write('%s\n' % command) # Écrire la commande sur la sortie standard
sys.stdout.flush()
self.print_debug('<<<<<<<<<<< %s' % command)
result = sys.stdin.readline() # Lire le résultat sur l'entrée standard
self.print_debug('>>>>>>>>>>> %s'%result)
return dict([x.split('=') for x in result.strip().split()][1:])
def say_file(self, filename, keys=''):
''' Lire un fichier audio. `filename` est le nom du fichier sans extension
et keys les touches qui peuvent arreter sa lecture. Retourne le caractère
appuyé par l'utilisateur, ou None si il n'a rien appuyé. '''
# Envois de la commande
input = self.exec_command('STREAM FILE /usr/share/asterisk/sounds/custom/%s "%s"' % (filename, keys))
if input['result'] == '0':
return None
else:
return chr(int(input['result']))
def say_group(self, keys='', *says):
''' Prend en arguments tous les noms de fichiers audios à jouer ainsi
que les touches qui permetterons de stopper le fichier. La méthode
retourne la touche pressée ou bien None si aucune ne l'est. '''
for say in says:
value = self.say_file(say, keys)
if value is not None:
return value
return None
def demander_mdp(self):
''' Menu de demande du mot de passe. Fonction blocante tant que
le mot de passe correct n'a pas été entré. '''
entered = '' # Ce que l'utilisateur à saisit jusque là
while entered != PASSWORD: # On tourne tant que le mdp est incorrect
self.say_file('mot_de_passe', keys='#')
self.say_file('mdp_instructions', keys='#')
key = None
while key != '#': # On tourne tant que l'utilisateur ne valide pas avec #
key = self.wait_for_digit()
if key in ('1', '2', '3', '4', '5', '6', '7', '8', '9', '0'):
entered += key
elif key == '*': # La touche "*" permet de remettre à zéro l'entrée du mdp
entered = ''
self.say_file('mdp_raz')
if entered != PASSWORD: # Mdp incorrect
self.say_file('mdp_incorrect')
entered = ''
def wait_for_digit(self, timeout=-1):
''' Attente l'appuis d'une touche par l'utilisateur. Retourne None si
aucune touche n'a été enfoncée durant le temps impartit ou la touche. '''
input = self.exec_command('WAIT FOR DIGIT %s' % timeout)
if input['result'] == '0':
return None
else:
return chr(int(input['result']))
def menu_accueil(self):
''' Menu d'accueil de l'IVR. '''
self.say_file('bienvenue', keys='#') # On dit bonjour
self.demander_mdp() # Et on demande le mot de passe
self.menu_principal() # Si celui-ci est correctement entré, on passe à la suite
def menu_principal(self):
''' Menu principal '''
while True: # On tourne en boucle, tant que l'utilisateur ne raccroche pas.
a_dire = [] # Liste des fichiers voix à faire écouter à l'utilisateur
a_dire.append('menu_principal')
if ArpRequest('192.168.208.46', 'eth0', arp_type=arprequest.ARP_STANDARD).request():
# Charge du serveur :
a_dire.append('le_serveur_charge')
entier, deci = self.exec_on_monitored_server('cat /proc/loadavg').split()[0].split('.')
a_dire.append(str(int(entier)))
a_dire.append('virgule')
if deci[0] == '0':
a_dire.append('0')
a_dire.append(str(int(deci)))
else:
a_dire.append(str(int(deci)))
# Utilisation de la ram :
meminfo = self.exec_on_monitored_server('cat /proc/meminfo').strip().split('\n')
memmap = {}
for memitem in meminfo:
memitem = memitem.strip().split()
memmap[memitem[0][:-1]] = int(memitem[1])
used, free = (memmap['MemTotal'] - memmap['MemFree']) / 1024 , (memmap['MemTotal']) / 1024
a_dire.append('il_utilise')
if used > 199:
a_dire.append(str(used/100))
if used > 99:
a_dire.append('100')
a_dire.append(str(used-(used/100*100)))
a_dire.append('mega_de_ram_sur')
if free > 199:
a_dire.append(str(free/100))
if free > 99:
a_dire.append('100')
a_dire.append(str(free-(free/100*100)))
a_dire.append('dispo')
# Utilisation du disque :
a_dire.append('il_reste')
freedisk = self.exec_on_monitored_server('df | grep /dev/sda1').strip().split()[4]
a_dire.append(freedisk[:-1])
a_dire.append('pourcent_libre')
# Utilisateurs connectés :
w = len(self.exec_on_monitored_server('w -hs').strip().split('\n'))
a_dire.append(w)
a_dire.append('utilisateurs_connectes')
# Message des différents menus :
a_dire.append('tapez')
else:
a_dire.append('serveur_arrete')
input = self.say_group('123', *a_dire)
# Si l'utilisateur n'enfonce aucune touche pendant les instructions,
# on attend sa réponse quand même...
if input is None or input.strip() == '':
key = self.wait_for_digit()
else:
key = input
# Renvois vers les sous-menus :
if key == '1':
self.menu_services()
elif key == '2':
self.menu_actions()
elif key == '3':
self.menu_gobelet()
def menu_services(self):
''' Menu de tous les services. '''
key = None
while key != '0': # On boucle tant que l'utilisateur ne sort pas du menu avec "0"
a_dire = [] # Liste des fichiers voix à faire écouter à l'utilisateur
a_dire.append('menu_services')
for i, service in enumerate(self.services): # On boucle sur les services
status = service[1]() # Exécution de la fonction de test du fonctionnement du service
a_dire.append('le_service_numero')
a_dire.append(str(i+1)) # +1 car enumerate part de 0
a_dire.append(service[0]) # On ajoute le fichier audio qui prononce ce service
# On dit si il est démarré ou non
if status:
a_dire.append('est_demarre')
else:
a_dire.append('est_arrete')
a_dire.append('choix_services') # On annonce les choix possibles
input = self.say_group(''.join([str(x) for x in range(0, i+2)]), *a_dire)
if input is None:
key = self.wait_for_digit()
else:
key = input
if key in [str(x) for x in range(1, i+2)]: # La touche pressée est un numéro de service
self.menu_service(int(key) - 1)
def menu_service(self, numero_service):
''' Menu de contrôle _d'un_ service '''
key = None
while key != '0': # On boucle tant que l'utilisateur ne sort pas du menu avec "0"
# On récupère le tuple du service :
service = self.services[numero_service]
status = service[1]() # Exécution de la méthode de test de fonctionnement du service
a_dire = [] # Liste des fichiers voix à faire écouter à l'utilisateur
a_dire.append('vous_avez_select')
a_dire.append(service[0])
a_dire.append('il_est_actuellement')
if status: # Si le service est démarré on propose de le redémarrer ou de l'arréter
a_dire.append('demarre')
a_dire.append('tapez_2_pour_redemarrer')
a_dire.append('tapez_3_pour_arreter')
else: # Si le service est arreté on propose de le démarrer ou de le redémarrer
a_dire.append('arrete')
a_dire.append('tapez_1_pour_demarrer')
a_dire.append('tapez_2_pour_redemarrer')
# Instruction de sortie du menu :
a_dire.append('tapez_0')
input = self.say_group('0123', *a_dire)
if input is None:
key = self.wait_for_digit()
else:
key = input
if key == '1':
self.exec_on_monitored_server(service[2]) # Démarrer le service
self.say_file('service_demarre')
elif key == '2':
self.exec_on_monitored_server(service[3]) # Redémarrer le service
self.say_file('service_redemarre')
elif key == '3':
self.exec_on_monitored_server(service[4]) # Arreter le service
self.say_file('service_arrete')
def menu_actions(self):
''' Menu de gestion du serveur. '''
key = None
while key != '0': # On tourne en boucle, tant que l'utilisateur n'appuis pas sur "0"
input = self.say_file('actions_possibles', keys='0123')
# Si l'utilisateur n'enfonce aucune touche pendant les instructions,
# on attend sa réponse quand même...
if input is None or input.strip() == '':
key = self.wait_for_digit()
else:
key = input
# Renvois vers les sous-menus :
if key == '1':
self.say_file('wol')
elif key == '2':
self.exec_on_monitored_server('halt')
self.say_file('arret')
elif key == '3':
self.exec_on_monitored_server('reboot')
self.say_file('reboot')
def menu_gobelet(self):
''' Menu d'ouverture/fermeture du porte gobelet© intégré au serveur. '''
a_dire = ['porte_gobelet', 'menu_princi']
key = None
while key != '0': # On tourne en boucle, tant que l'utilisateur n'appuis pas sur "0"
input = self.say_group('012', *a_dire)
# Si l'utilisateur n'enfonce aucune touche pendant les instructions,
# on attend sa réponse quand même...
if input is None or input.strip() == '':
key = self.wait_for_digit()
else:
key = input
# Renvois vers les sous-menus :
if key == '1':
self.exec_on_monitored_server('eject')
elif key == '2':
self.exec_on_monitored_server('eject -t')
def service_http(self):
sortie = self.exec_on_monitored_server('pgrep lighttpd > /dev/null && echo $?')
if sortie.strip() == '0':
return True
else:
return False
def service_ftp(self):
sortie = self.exec_on_monitored_server('pgrep pure-ftpd > /dev/null && echo $?')
if sortie.strip() == '0':
return True
else:
return False
def service_mysql(self):
sortie = self.exec_on_monitored_server('pgrep mysqld > /dev/null && echo $?')
if sortie.strip() == '0':
return True
else:
return False
def exec_on_monitored_server(self, command):
return Popen('ssh [email protected] %s' % command, shell=True, stdout=PIPE).communicate()[0]
if __name__ == '__main__':
monit = Monit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment