Created
September 21, 2017 16:05
-
-
Save brad-anton/58b01c51b647d35a924a0fd82743e4fc to your computer and use it in GitHub Desktop.
Checks your mailbox for lots of 'Confirm' messages and then pulls info from them
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import base64
import email
import os
import re
from urlparse import urlparse, urlunsplit

import httplib2
from apiclient import discovery
from apiclient import errors
from oauth2client import client
from oauth2client import tools
from oauth2client.file import Storage
# OAuth scope requested during authorization (passed to
# client.flow_from_clientsecrets in SpamRegurg.get_credentials).
SCOPES = 'https://www.googleapis.com/auth/gmail.readonly'
# Client-secrets file handed to client.flow_from_clientsecrets.
CLIENT_SECRET_FILE = 'client_secret.json'
# Assigned to flow.user_agent before the OAuth flow is run.
APPLICATION_NAME = 'Gmail API Python Quickstart'
class SpamRegurg(object):
    """Download Gmail messages matching a query and extract artifacts.

    Constructing an instance authorizes against the Gmail API, lists the
    messages matching the search query and stores each one, parsed as a MIME
    message, in ``self.messages``.  The ``get_*`` methods then pull URLs,
    scheme+host pairs and ``X-AntiAbuse`` sender domains out of a message.
    """

    # Search used when the caller does not supply a query.
    DEFAULT_QUERY = 'subject:confirm after:2017/9/20 before:2017/9/23'

    # Prefix of the X-AntiAbuse header value that carries the sender domain.
    _ANTIABUSE_PREFIX = 'Sender Address Domain - '

    # Compiled once at class creation instead of on every blob2url() call.
    _URL_RE = re.compile(
        r'(?:http|ftp)s?://'  # scheme: http, https, ftp or ftps
        r'(?:(?:[A-Z0-9](?:[\-A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[\-A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
        r'(?::\d+)?'  # optional port
        r'(?:/?)?'  # optional slash
        r'(?:[a-z0-9\-._~:/?#%@!$&\'()*+,;=]+[a-z0-9\-._~:/?#%\[\]@!$&\'()*+,;=]+)?',  # URI
        re.IGNORECASE)

    def __init__(self, user_id='me', query=None):
        """Authorize, build the Gmail service and fetch matching messages.

        Keyword Arguments:
        user_id -- Value sent as ``userId`` on every API request.
        query -- Gmail search string; ``None`` selects ``DEFAULT_QUERY``.
        """
        credentials = self.get_credentials()
        http = credentials.authorize(httplib2.Http())
        self.service = discovery.build('gmail', 'v1', http=http)
        # Honour the caller's value (it used to be overwritten with 'me').
        self.user_id = user_id
        self.messages = []
        self.get_messages(query)

    @staticmethod
    def blob2url(blob):
        """Returns a list of strings that resemble a URL within blob

        Keyword Arguments:
        blob -- String of text to find URLs in.  An empty or ``None`` blob
                yields an empty list.
        """
        if not blob:
            return []
        return SpamRegurg._URL_RE.findall(blob)

    @staticmethod
    def get_credentials(credential_dir='.'):
        """Gets valid user credentials from storage.

        If nothing has been stored, or if the stored credentials are invalid,
        the OAuth2 flow is completed to obtain the new credentials.

        Keyword Arguments:
        credential_dir -- Directory holding 'gmail-python-quickstart.json'.

        Returns:
        Credentials, the obtained credential.
        """
        credential_path = os.path.join(credential_dir,
                                       'gmail-python-quickstart.json')
        store = Storage(credential_path)
        credentials = store.get()
        if not credentials or credentials.invalid:
            flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
            flow.user_agent = APPLICATION_NAME
            # Build the flags object run_flow() expects.  parse_known_args()
            # is used so unrelated command-line arguments of an embedding
            # program do not abort the authorization.
            parser = argparse.ArgumentParser(parents=[tools.argparser])
            flags, _unknown = parser.parse_known_args()
            credentials = tools.run_flow(flow, store, flags)
            print('Storing credentials to ' + credential_path)
        return credentials

    def ListMessagesMatchingQuery(self, query=''):
        """List all Messages of the user's mailbox matching the query.

        Args:
        query: String used to filter messages returned.
        Eg.- 'from:user@some_domain.com' for Messages from a particular sender.

        Returns:
        List of the 'messages' entries of every response page.  If an
        HttpError is raised the error is printed and the entries collected
        up to that point (possibly none) are returned.
        """
        messages = []
        request_args = {'userId': self.user_id, 'q': query}
        try:
            while True:
                response = self.service.users().messages().list(
                    **request_args).execute()
                # A page is not guaranteed to carry a 'messages' key.
                messages.extend(response.get('messages', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    break
                request_args['pageToken'] = page_token
        except errors.HttpError as error:
            print('An error occurred: {}'.format(error))
        return messages

    def GetMimeMessage(self, msg_id):
        """Get a Message and use it to create a MIME Message.

        Args:
        msg_id: The ID of the Message required.

        Returns:
        A MIME Message parsed from the base64url-decoded 'raw' field of the
        response, or None when an HttpError is raised (the error is printed).
        """
        try:
            message = self.service.users().messages().get(
                userId=self.user_id, id=msg_id, format='raw').execute()
        except errors.HttpError as error:
            print('An error occurred: %s' % error)
            return None
        msg_str = base64.urlsafe_b64decode(message['raw'].encode('ASCII'))
        return email.message_from_string(msg_str)

    def get_messages(self, query=None):
        """Fetch every message matching query and append it to self.messages.

        Keyword Arguments:
        query -- Gmail search string; ``None`` selects ``DEFAULT_QUERY``.
        """
        if query is None:
            query = self.DEFAULT_QUERY
        for msg_ref in self.ListMessagesMatchingQuery(query) or []:
            mime_msg = self.GetMimeMessage(msg_ref['id'])
            # Skip messages whose download failed so later extraction never
            # sees None.
            if mime_msg is not None:
                self.messages.append(mime_msg)

    def get_confirmation(self, message):
        """Return every URL-like string found in the message's leaf parts.

        walk() visits the message itself and all nested sub-parts, so plain
        messages and multiparts of any depth are handled alike.  Payloads are
        scanned as returned by get_payload(), without transfer decoding.
        """
        results = []
        for part in message.walk():
            if part.is_multipart():
                # Containers hold a list of sub-parts, not text.
                continue
            results += self.blob2url(part.get_payload())
        return results

    @staticmethod
    def _domains_from_urls(urls):
        """Map each distinct URL, in first-seen order, to 'scheme://netloc'."""
        seen = set()
        results = []
        for url in urls:
            if url in seen:
                continue
            seen.add(url)
            parts = urlparse(url)
            results.append(urlunsplit([parts[0], parts[1], '', '', '']))
        return results

    def get_domain(self, message):
        """Return 'scheme://netloc' for each distinct URL in the message."""
        return self._domains_from_urls(self.get_confirmation(message))

    def get_antiabuse(self, message):
        """Return the sender domains announced in X-AntiAbuse headers.

        Only header values starting with 'Sender Address Domain - ' are
        used; the prefix is stripped.  A message without the header yields
        an empty list.
        """
        prefix = self._ANTIABUSE_PREFIX
        return [item[len(prefix):]
                for item in message.get_all('X-AntiAbuse', [])
                if item.startswith(prefix)]

    def get_artifacts(self):
        """Return one dict per stored message with its extracted artifacts.

        Each dict has the keys 'confirmations', 'domains' and 'antiabuse'.
        """
        results = []
        for message in self.messages:
            # Extract the URLs once and derive the domains from them.
            urls = self.get_confirmation(message)
            results.append({
                'confirmations': urls,
                'domains': self._domains_from_urls(urls),
                'antiabuse': self.get_antiabuse(message),
            })
        return results
if __name__ == '__main__':
    # Script entry point: authorize, fetch the matching messages and print
    # the artifacts extracted from each one.
    s = SpamRegurg()
    print(s.get_artifacts())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment