Created
July 18, 2017 18:40
-
-
Save vandorjw/9a1cc7cae70e96d20ac57ebb673715d4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import datetime
import email
import email.header
import email.utils
import imaplib
import os
import sys

import boto3
# All settings come from the process environment and are required: a missing
# variable raises KeyError while the module is being imported.

# Mailbox login.
EMAIL_ACCOUNT = os.environ['EMAIL_ACCOUNT']
EMAIL_PASSWORD = os.environ['EMAIL_PASSWORD']

# Folder to scan; set it to 'INBOX' for the inbox. Be aware that a successful
# run leaves every message in the chosen folder marked as read.
EMAIL_FOLDER = os.environ['EMAIL_FOLDER']

# CloudWatch client settings.
# NOTE(review): the X_ prefix is presumably there to avoid clashing with
# variable names the Lambda runtime reserves for itself -- confirm.
AWS_REGION = os.environ['X_AWS_REGION']
AWS_ACCESS_KEY_ID = os.environ['X_AWS_ACCESS_KEY_ID']
AWS_SECRET_KEY = os.environ['X_AWS_SECRET_KEY']

# Where the per-email data points are published.
AWS_METRIC_NAME = os.environ['AWS_METRIC_NAME']
AWS_CLOUDWATCH_NAMESPACE = os.environ['AWS_CLOUDWATCH_NAMESPACE']
def chunks(l, n):
    """Lazily split ``l`` into consecutive slices of at most ``n`` items.

    Slices are produced in order; the last one holds whatever is left over
    and may therefore be shorter than ``n``.
    """
    offsets = range(0, len(l), n)
    for start in offsets:
        stop = start + n
        yield l[start:stop]
def process_mailbox(M):
    """
    Build one CloudWatch metric datum per unread message in the selected folder.

    Searches the mailbox currently selected on ``M`` for UNSEEN messages,
    fetches each one in full, prints its subject and raw date, and appends a
    dict (MetricName / Dimensions / Timestamp / Value / Unit) describing it.

    Args:
        M: an IMAP connection object exposing ``search`` and ``fetch``
            (the caller passes a logged-in ``imaplib.IMAP4_SSL`` with a
            folder selected).

    Returns:
        A list of metric dicts, possibly empty. A list is returned on every
        path -- including search/fetch failures -- so the caller can always
        iterate over the result. If a fetch fails part-way through, the
        metrics gathered before the failure are returned.
    """
    status, found = M.search(None, '(UNSEEN)')
    if status != 'OK':
        print("No messages found!")
        return []

    # The search payload is a single space-separated bytes string of message
    # numbers; tolerate an empty or missing payload as "nothing unread".
    message_numbers = found[0].split() if found and found[0] else []

    all_metrics = []
    for num in message_numbers:
        status, payload = M.fetch(num, '(RFC822)')
        if status != 'OK':
            print("ERROR getting message", num)
            # Keep what was already collected instead of discarding it.
            return all_metrics

        msg = email.message_from_bytes(payload[0][1])

        # A message may legally have no Subject header; decode_header() cannot
        # handle None, so treat that case as an empty subject.
        raw_subject = msg['Subject']
        if raw_subject is None:
            subject = ''
        else:
            subject = str(
                email.header.make_header(email.header.decode_header(raw_subject))
            )

        # CloudWatch dimension values must be ASCII without control characters
        # and must contain at least one non-whitespace character. Replace
        # anything outside printable ASCII with a space and fall back to a
        # placeholder when nothing printable is left.
        dimension_subject = ''.join(
            ch if ' ' <= ch <= '~' else ' ' for ch in subject
        )
        if not dimension_subject.strip():
            dimension_subject = '(no subject)'

        # parsedate_to_datetime understands the RFC 5322 variants a fixed
        # strptime format rejects (optional weekday, "GMT", trailing "(UTC)").
        msg_date = msg['Date']
        try:
            time_stamp = email.utils.parsedate_to_datetime(msg_date)
        except (TypeError, ValueError):
            # Missing or malformed Date header: still count the message,
            # stamped with the time it was processed.
            print('Unparseable Date header, using current time:', msg_date)
            time_stamp = datetime.datetime.now(datetime.timezone.utc)
        if time_stamp.tzinfo is None:
            # A "-0000" zone yields a naive datetime that is expressed in UTC.
            time_stamp = time_stamp.replace(tzinfo=datetime.timezone.utc)

        print('Message %s: %s' % (num, subject))
        print('Raw Date:', msg_date)

        all_metrics.append(
            {
                'MetricName': AWS_METRIC_NAME,
                'Dimensions': [
                    {
                        'Name': 'EmailAccount',
                        'Value': EMAIL_ACCOUNT
                    },
                    {
                        'Name': 'Subject',
                        'Value': dimension_subject
                    },
                ],
                'Timestamp': time_stamp,
                'Value': 1,
                'Unit': 'Count',
            },
        )
    return all_metrics
def lambda_handler(event, context):
    """
    Count unread emails in the configured folder and publish them to CloudWatch.

    Logs in to Gmail over IMAP/SSL, lists the mailboxes, selects
    ``EMAIL_FOLDER``, converts its unread messages into metric data via
    ``process_mailbox`` and uploads them with ``put_metric_data`` in batches
    of 10. The IMAP session is always logged out, and a selected mailbox is
    always closed, even when an error interrupts processing.

    Args:
        event: not used by this function (presumably the AWS Lambda
            invocation payload -- confirm against the deployment).
        context: not used by this function.

    Returns:
        None.

    Raises:
        SystemExit: with status 1 when the IMAP login is rejected.
    """
    M = imaplib.IMAP4_SSL('imap.gmail.com')
    try:
        try:
            rv, data = M.login(EMAIL_ACCOUNT, EMAIL_PASSWORD)
        except imaplib.IMAP4.error:
            print("LOGIN FAILED!!! ")
            sys.exit(1)
        print(rv, data)

        rv, mailboxes = M.list()
        if rv == 'OK':
            print("Mailboxes:")
            print(mailboxes)

        rv, data = M.select(EMAIL_FOLDER)
        if rv != 'OK':
            print("ERROR: Unable to open mailbox ", rv)
            return

        try:
            print("Processing mailbox...\n")
            # Guard against a None result so batching below never sees a
            # non-sequence.
            all_metrics = process_mailbox(M) or []
            print("Metrics to be sent to AWS (namespace = {}): {}".format(AWS_CLOUDWATCH_NAMESPACE, all_metrics))
            cloudwatch = boto3.client(
                'cloudwatch',
                region_name=AWS_REGION,
                aws_access_key_id=AWS_ACCESS_KEY_ID,
                aws_secret_access_key=AWS_SECRET_KEY,
            )
            # NOTE(review): the batch size of 10 is presumably chosen to stay
            # under the per-request limit of PutMetricData -- confirm.
            for metric_data in chunks(all_metrics, 10):
                response = cloudwatch.put_metric_data(
                    Namespace=AWS_CLOUDWATCH_NAMESPACE,
                    MetricData=metric_data
                )
                print("Response from AWS: {}".format(response))
        finally:
            # Leave the selected mailbox even if the upload failed.
            M.close()
    finally:
        # Always end the IMAP session, including on login failure.
        M.logout()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a script I used to check that email was being received. Our CI server would run daily tests, and those tests would send out emails. If no email was detected in our test mailbox, I had CloudWatch configured to send an alarm email to the support team.