Skip to content

Instantly share code, notes, and snippets.

@revmischa
Created January 15, 2019 08:03
Show Gist options
  • Save revmischa/b87ac85a426ebccf35e4f2830d93c613 to your computer and use it in GitHub Desktop.
Save revmischa/b87ac85a426ebccf35e4f2830d93c613 to your computer and use it in GitHub Desktop.
Twilio call handler lambda
from twilio.twiml.voice_response import VoiceResponse, Dial
from twilio.rest import Client
from urllib.parse import parse_qsl
import requests
import json
from pprint import pprint
import boto3
session = boto3.session.Session()
secrets = session.client(service_name='secretsmanager')
get_secret_value_response = secrets.get_secret_value(SecretId='twilio')
twil_config = json.loads(get_secret_value_response['SecretString'])
TWILIO_ACCT_SID = twil_config['TWILIO_ACCT_SID']
TWILIO_AUTH_TOKEN = twil_config['TWILIO_AUTH_TOKEN']
twil = Client(TWILIO_ACCT_SID, TWILIO_AUTH_TOKEN)
FORWARD_NUM = '+15101234567'
HOOK_URL = 'https://hooks.slack.com/services/T0000000/B324584/Zgatq2AS5FccT'
def incoming_callme(event, context):
call = dict(parse_qsl(event['body']))
from_num = call['From']
print(f"call from {from_num}")
lookup = lookup_number(call)
reputation = check_rep(call)
is_robo = check_is_robo(call)
caller = call['CallerName'] if 'CallerName' in call else call['Caller']
attachments = [{
'author_name': f"Call from {from_num}",
'text': f"Caller: {caller}",
}, {
'author_name': f"Reputation",
'text': reputation,
}, {
'author_name': f"Is Robo",
'text': "Yes" if is_robo else "No",
}]
attachments.append({"text": json.dumps(lookup, indent=4)})
slack(
text=f"Call from {from_num}",
icon=":telephone:",
attachments=attachments,
)
response = VoiceResponse()
# spam?
if reputation >= 3 or is_robo:
response.hangup()
return twiml_res(response)
dial = Dial()
dial.number(FORWARD_NUM)
response.append(dial)
return twiml_res(response)
def twiml_res(response):
return {
"statusCode": 200,
"headers": {"Content-Type": 'application/xml'},
"body": str(response),
}
return str(response)
def format_stuff(rec):
if len(rec) == 1:
rec = rec[0]
return json.dumps(rec, indent=2)
def check_rep(call):
res = twil.lookups.phone_numbers(call['From']).fetch(add_ons='whitepages_pro_phone_rep')
res = res.add_ons['results']['whitepages_pro_phone_rep']['result']
if 'reputation_level' not in res:
return -1
rep = res['reputation_level']
return rep
def check_is_robo(call):
res = twil.lookups.phone_numbers(call['From']).fetch(add_ons='nomorobo_spamscore')
res = res.add_ons['results']['nomorobo_spamscore']['result']
if 'score' not in res:
return False
return bool(res['score'])
def lookup_number(call):
res = twil.lookups.phone_numbers(call['From']).fetch(add_ons='whitepages_pro_caller_id')
res = res.add_ons['results']['whitepages_pro_caller_id']
pprint(res)
return res['result']
def slack(text=None, title=None, attachments=[], icon=None):
if not attachments:
return
# emoji icon
icon = 'telephone'
message = {
"username": title,
# "channel": channel,
"icon_emoji": f":{icon}:",
"attachments": attachments,
"text": text,
}
slack_raw(message)
def slack_raw(payload):
response = requests.post(
HOOK_URL, data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment