Last active
December 3, 2022 21:04
-
-
Save blha303/e4c8b7b81489a8d49a7a0057298d8a42 to your computer and use it in GitHub Desktop.
connects to an asterisk manager and picks up ExtensionStatus messages. ties into https://gist.github.com/blha303/37e83f320b009de19e7a6c140f51e8d5
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
from requests import get | |
EXTEN = "1" | |
presence_api = "http://127.0.0.1:8081" | |
auth = ("me", "aa") | |
pbx_ip = "127.0.0.2" | |
def process_event(resp): | |
d = {} | |
for line in resp.strip().split("\r\n"): | |
try: | |
name,data = line.split(": ", 1) | |
except ValueError: | |
if line.strip(): print(resp) | |
return None | |
d[name] = data | |
return d | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.connect((pbx_ip,5038)) | |
s.sendall("\r\nAction: login\r\nUsername: {0}\r\nSecret: {1}\r\n\r\n".format(*auth).encode("utf-8")) | |
while True: | |
data = s.recv(1024).decode("utf-8") | |
for resp in data.split("\r\n\r\n"): | |
resp = process_event(resp) | |
if not resp: | |
continue | |
if "Error" in resp: | |
print(resp) | |
if "Event" in resp and resp["Event"] == "ExtensionStatus" and resp["Exten"] == EXTEN: | |
print(resp) | |
if resp["Status"] == "1": | |
get(presence_api + "/_calls") | |
elif resp["Status"] == "0": | |
get(presence_api + "/_something") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment