-
-
Save Alireza2n/330fd4c3bde9f38597b132c00b885501 to your computer and use it in GitHub Desktop.
Checks the availability of a PJSIP endpoint, for use with Nagios or similar software. Requires Python 3. Based on portions of jfinstrom/asterisk.py, but rewritten to use the asterisk command.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/home/user/check_peers/venv/bin/python | |
"""
Based on portions of jfinstrom/asterisk.py, but rewritten to use the asterisk command.
Checks the availability of a PJSIP endpoint, for use with Nagios or similar software. Requires Python 3.
Usage: ./check_pjsip_endpoint.py <ext> (e.g. ./check_pjsip_endpoint.py 200)
Output: Endpoint 200 is Available at 200/sip:200@192.0.2.10:5060;transport=ud, is not in use and RTT is 26.426.
"""
import subprocess | |
import sys, re | |
def make_dict(data):
    """Parse ``key: value`` lines of command output into a dictionary.

    Each line of *data* is split at its first ``": "`` (colon followed by a
    space). The text before it becomes the key and the text after it the
    value, both stripped of surrounding whitespace. Lines without that
    separator are ignored.

    If the same key appears on several lines, the last occurrence wins.

    Args:
        data: Decoded text, lines separated by ``"\\n"``.

    Returns:
        dict mapping each stripped key to its stripped value.
    """
    ret = {}
    for line in data.split("\n"):
        # Split only on the first ": " so values that themselves contain
        # colons (e.g. "sip:200@host:5060") are kept intact.
        key, sep, value = line.partition(": ")
        if sep:
            ret[key.strip()] = value.strip()
    return ret
def evaluate_endpoint(ext, resp):
    """Turn parsed ``pjsip show endpoint`` fields into a check result.

    Args:
        ext: Endpoint number, used only in the message text.
        resp: Mapping of parsed output fields; the ``'Contact'`` and
            ``'Endpoint'`` entries are read.

    Returns:
        ``(exit_code, message)`` where ``exit_code`` is 0 when the contact
        is available, 2 when it is unavailable, and 3 when the state cannot
        be determined or a required field is missing.
    """
    try:
        contact_fields = resp['Contact'].split()
        endpoint_fields = resp['Endpoint'].split()
    except KeyError as e:
        return 3, f"Failed to parse results. {e}."

    # Collapse runs of whitespace so multi-word markers such as
    # "Not in use" match regardless of column padding in the output.
    contact = ' '.join(contact_fields)
    endpoint = ' '.join(endpoint_fields)

    # "Unavail" must be tested before "Avail": the latter is a substring
    # of the former.
    if 'Unavail' in contact or 'Unavailable' in endpoint:
        return 2, f"Endpoint {ext} is Unavailable."
    if 'Avail' not in contact:
        return 3, f"State of endpoint {ext} could not determined."

    if 'Not in use' in endpoint:
        call_state = 'not in use'
    elif 'In use' in endpoint:
        call_state = 'in use'
    else:
        call_state = 'unknown'

    # 'Avail' was found above, so contact_fields is non-empty here.
    # The first column is the contact URI and the last one the RTT.
    uri = contact_fields[0]
    rtt = contact_fields[-1]
    return 0, (
        f"Endpoint {ext} is Available at {uri}, "
        f"is {call_state} and RTT is {rtt}."
    )


def main(argv=None):
    """Run the check for the endpoint named on the command line.

    Args:
        argv: Argument vector; defaults to ``sys.argv``. ``argv[1]`` must
            be the numeric endpoint to query.

    Returns:
        The process exit status: 0, 2 or 3 as produced by
        ``evaluate_endpoint``, or 3 when the arguments are invalid or the
        asterisk command cannot be run.
    """
    argv = sys.argv if argv is None else argv
    try:
        ext = int(argv[1])
    except (IndexError, ValueError):
        # Report a usage problem as status 3 rather than dying with a
        # traceback (which would exit with status 1).
        print("Usage: check_pjsip_endpoint.py <ext> (e.g. 200)")
        return 3

    # An argument list avoids going through a shell; the CLI command is
    # passed to asterisk as a single -x argument.
    cmd = ["/usr/sbin/asterisk", "-x", f"pjsip show endpoint {ext}"]
    try:
        output = subprocess.check_output(cmd)
    except (OSError, subprocess.CalledProcessError) as e:
        print(f"Failed to query asterisk. {e}.")
        return 3

    resp = make_dict(output.decode(errors="replace"))
    code, message = evaluate_endpoint(ext, resp)
    print(message)
    return code


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment