Skip to content

Instantly share code, notes, and snippets.

@blha303
Last active December 3, 2022 21:04
Show Gist options
  • Save blha303/e4c8b7b81489a8d49a7a0057298d8a42 to your computer and use it in GitHub Desktop.
Save blha303/e4c8b7b81489a8d49a7a0057298d8a42 to your computer and use it in GitHub Desktop.
Connects to an Asterisk Manager Interface (AMI) and picks up ExtensionStatus messages. Ties into https://gist.github.com/blha303/37e83f320b009de19e7a6c140f51e8d5
import socket
from requests import get
# Extension to watch; compared against the "Exten" header of each
# ExtensionStatus event.
EXTEN = "1"
# Base URL of the presence API; a GET is sent to a path under it when the
# watched extension reports Status "1" or "0".
presence_api = "http://127.0.0.1:8081"
# (username, secret) pair substituted into the manager login action.
auth = ("me", "aa")
# Address of the Asterisk manager; the script connects to it on TCP port 5038.
pbx_ip = "127.0.0.2"
def process_event(resp):
    """Parse one manager message into a dict of its ``Name: value`` headers.

    Args:
        resp: Text of a single message, with header lines separated by
            ``"\\r\\n"``. Surrounding whitespace is ignored.

    Returns:
        A dict mapping each header name to its value (later duplicates
        overwrite earlier ones), or ``None`` if any line is not a header.
        An unparsable, non-blank message is printed before ``None`` is
        returned so it can be inspected.
    """
    headers = {}
    for line in resp.strip().split("\r\n"):
        name, sep, value = line.partition(": ")
        if not sep:
            # resp.strip() eats the trailing space of a final header whose
            # value is empty ("Value: " -> "Value:"). Treat a bare "Name:"
            # as an empty value instead of discarding the whole message.
            if len(line) > 1 and line.endswith(":"):
                name, value = line[:-1], ""
            else:
                if line.strip():
                    print(resp)
                return None
        headers[name] = value
    return headers
# Connect to the Asterisk manager, log in, and relay ExtensionStatus changes
# for EXTEN to the presence API. Runs until the manager closes the connection.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((pbx_ip, 5038))
    s.sendall("\r\nAction: login\r\nUsername: {0}\r\nSecret: {1}\r\n\r\n".format(*auth).encode("utf-8"))
    # TCP is a byte stream: one recv() may hold several messages or only part
    # of one. Accumulate bytes and only parse messages that are terminated by
    # a blank line; the unterminated tail waits for the next recv().
    pending = b""
    while True:
        chunk = s.recv(1024)
        if not chunk:
            # recv() returns b"" once the peer has closed the connection;
            # without this check the loop would spin forever.
            break
        pending += chunk
        *messages, pending = pending.split(b"\r\n\r\n")
        for raw in messages:
            # Decode whole messages so a multi-byte character split across
            # two recv() calls cannot raise; undecodable bytes are replaced.
            text = raw.decode("utf-8", errors="replace")
            # The manager greets with a one-line "Asterisk Call Manager/<ver>"
            # banner that is not a header and ends up glued to the first
            # response; drop it so that response still parses.
            if text.startswith("Asterisk Call Manager/"):
                text = text.partition("\r\n")[2]
            resp = process_event(text)
            if not resp:
                continue
            # Failures are reported as "Response: Error"; the original
            # key-membership check is kept for backward compatibility.
            if "Error" in resp or resp.get("Response") == "Error":
                print(resp)
            # .get() avoids a KeyError on events missing Exten/Status.
            if resp.get("Event") == "ExtensionStatus" and resp.get("Exten") == EXTEN:
                print(resp)
                status = resp.get("Status")
                if status == "1":
                    get(presence_api + "/_calls")
                elif status == "0":
                    get(presence_api + "/_something")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment