#Email Decode imports
from email import Encoders
import random
import imaplib, rfc822, re, StringIO, time, os, sys
from time import strftime
from datetime import datetime, timedelta
import email, email.Errors, email.Header, email.Message, email.Utils
import smtplib
from email.MIMEMultipart import MIMEMultipart
from email.MIMEBase import MIMEBase
from email.mime.text import MIMEText
import time as Time
import subprocess
from string import Template
import logging
import logging.handlers
from local_vars import *
#procmail imports
import sys
import os
#other imports
import re
from urllib2 import urlopen
from urllib import urlencode
#nltk imports
from nltk.stem import PorterStemmer
from nltk.tokenize import WordPunctTokenizer
from nltk.classify import NaiveBayesClassifier
from nltk.classify.util import accuracy
import sys
#finding bigrams
from nltk.collocations import BigramCollocationFinder
from nltk.metrics import BigramAssocMeasures
EMAIL_ADDRESSES = ['[email protected]']
SF = 'SF-BA'
SR = 'SR_BA'
CSP = 'CSP-BA'
ITFS = 'SR-ITFS'
#utility functions
LOG_FILENAME = "/var/log/rsvp.log"
logger = logging.getLogger("RSVP_Log")
logger.setLevel(logging.DEBUG)
handler = logging.handlers.RotatingFileHandler(
    LOG_FILENAME, maxBytes=70000000, backupCount=5)
formatter = logging.Formatter('[%(asctime)s (%(name)s)]%(levelname)-8s"%(message)s"', '%Y-%m-%d %a %H:%M:%S')
handler.setFormatter(formatter)
logger.addHandler(handler)
subject_regexs = {'sf': re.compile(r'superfax', flags=re.IGNORECASE),
                  'sr': re.compile(r'virtual\s+receptionist', flags=re.IGNORECASE),
                  'csp': re.compile(r'channel\s+sales\s+partner', flags=re.IGNORECASE),
                  'itfs': re.compile(r'business\s+international', flags=re.IGNORECASE)
                  }
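# For illustration: with these patterns a subject like "Re: SuperFax pricing"
# maps to 'sf', while "virtual   receptionist enquiry" maps to 'sr' (matching is
# case-insensitive and tolerant of extra whitespace between words).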
def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data
def remove_html_tags(data):
    p = re.compile(r'<.*?>')
    return p.sub('', data)
def tuple_dictionary(tup):
    d = {}
    for x, y in tup:
        if x not in d.keys():
            d.setdefault(x, y)
        else:
            temp = d[x]
            del d[x]
            d.setdefault(x, [])
            if isinstance(temp, list):
                d[x].extend(temp)
            else:
                d[x].append(temp)
            d[x].append(y)
    return d
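# A minimal sketch (illustrative values only) of how tuple_dictionary collapses
# repeated message headers: keys seen once stay scalar, repeated keys become lists.
#   tuple_dictionary([('Received', 'a'), ('Received', 'b'), ('To', 'c')])
#   => {'Received': ['a', 'b'], 'To': 'c'}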
class ProcMailReciver(object):
    def __init__(self):
        self.mail_string = sys.stdin.read()
    def get_mail(self):
        return self.mail_string
class EmailReciever(object):
    def __init__(self, mail_string):
        self.mail_string = mail_string
        self.mail = email.message_from_string(mail_string)
        self.header_dict = tuple_dictionary(self.mail.items())
    def get_from(self):
        return self.header_dict['From']
    def get_to(self):
        return self.header_dict['To']
    def get_body(self):
        if isinstance(self.mail.get_payload(), str) or isinstance(self.mail.get_payload(), unicode):
            return ' '.join(self.mail.get_payload().split())
        else:
            return ' '.join(self.mail.get_payload()[0].get_payload().strip().split())
    def get_subject(self):
        return self.header_dict['Subject']
    def __str__(self):
        return "Making sense of email text %s, %s " % (self.get_from(), self.get_subject())
def string_in_text(string, text):
    if string in text:
        return string
    else:
        return None
def make_call():
    pass
class TextClassifier(object):
    def __init__(self, text):
        self.text = text
    def initialize_text(self, text):
        stemmer = PorterStemmer()
        tokenizer = WordPunctTokenizer()
        tokens = tokenizer.tokenize(text)
        bigram_finder = BigramCollocationFinder.from_words(tokens)
        bigrams = bigram_finder.nbest(BigramAssocMeasures.chi_sq, 500)
        for bigram_tuple in bigrams:
            x = "%s %s" % bigram_tuple
            tokens.append(x)
        result = [stemmer.stem(x.lower()) for x in tokens
                  if x not in open(os.path.join(CLASSIFIER_HOME, 'common-english-words.txt')).read().split(',') and len(x) > 1]
        return result
    def get_feature(self, word):
        return dict([(word, True)])
    def bag_of_words(self, words):
        return dict([(word, True) for word in words])
    def train_data(self):
        texts = {}
        texts['sf'] = open(os.path.join(CLASSIFIER_HOME, 'superfax.txt'))
        texts['sr'] = open(os.path.join(CLASSIFIER_HOME, 'superrec.txt'))
        texts['csp'] = open(os.path.join(CLASSIFIER_HOME, 'csp.txt'))
        texts['itfs'] = open(os.path.join(CLASSIFIER_HOME, 'itfs.txt'))
        #holds a dict of features for training our classifier
        train_set = []
        # loop through each item, grab the text, tokenize it and create a training feature with it
        for sense, f in texts.iteritems():
            text = f.read()
            features = self.initialize_text(text)
            train_set = train_set + [(self.get_feature(word), sense) for word in features]
        self.classifier_obj = NaiveBayesClassifier.train(train_set)
    def create_training_dict(self, text, sense):
        ''' returns a dict ready for a classifier's test method '''
        tokens = self.initialize_text(text)
        return [(self.bag_of_words(tokens), sense)]
    def classifier(self):
        text = self.text
        self.train_data()
        tokens = self.bag_of_words(self.initialize_text(text))
        decision = self.classifier_obj.classify(tokens)
        testfeats = []
        testfeats += self.create_training_dict(text, decision)
        acc = accuracy(self.classifier_obj, testfeats)
        return decision, acc
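# Example (illustrative only) of using TextClassifier on its own. It assumes
# CLASSIFIER_HOME (imported from local_vars) points at the training corpora and
# stopword list read above; 'sf', 'sr', 'csp' and 'itfs' are the senses it can return.
#   sense, confidence = TextClassifier('please send me superfax pricing').classifier()
#   if confidence > 0.5:
#       print sense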
# Receives an object of type EmailReciever
class EmailAnalytics(object):
    def __init__(self, mail):
        self.mail = mail
        self.body = self.mail.get_body()
    def find_product(self, body=None):
        sub = self.match_subject(self.mail)
        if sub is not None and self.is_reply(self.mail):
            return sub
        else:
            text = self.mail.get_subject() + self.mail.get_body()
            cf = TextClassifier(text)
            result = cf.classifier()
            if result[1] > 0.5:
                return result[0]
            return None
    def find_number(self, body=None):
        phone_number_pattern = re.compile(r'(?:\+?\d{5}[ -]?)?\d{10}')
        if body is None:
            body = self.body
        numbers = phone_number_pattern.findall(body)
        if len(numbers) > 0:
            return numbers[0]
        return None
    def find_name(self, mail=None):
        if mail is None:
            mail = self.mail
        return mail.get_from()
    def find_email(self, mail=None):
        if mail is None:
            mail = self.mail
        return mail.get_from()
    def get_message(self, body=None):
        if body is None:
            body = self.body
        return body
    def is_reply(self, mail=None):
        if mail is None:
            mail = self.mail
        if 'In-Reply-To' in mail.header_dict.keys():
            return True
        return False
    def match_subject(self, mail=None):
        if mail is None:
            mail = self.mail
        for key in subject_regexs.keys():
            match = subject_regexs[key].findall(mail.get_subject())
            if len(match) > 0:
                return key
        return None
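# A minimal sketch (illustrative values only) of driving EmailAnalytics from a raw
# RFC 822 message string instead of procmail's stdin:
#   raw = ("From: Jane <jane@example.com>\r\n"
#          "Subject: superfax query\r\n"
#          "In-Reply-To: <some-id>\r\n\r\n"
#          "Please call 9876543210")
#   analysis = EmailAnalytics(EmailReciever(raw))
#   analysis.find_product()   # 'sf' (subject matched and the mail is a reply)
#   analysis.find_number()    # '9876543210'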
# Returns an EmailReciever object built from the message procmail pipes in on stdin
def fetch_mail():
    logger.debug('Fetching mail from procmail')
    fm = ProcMailReciver()
    mail_obj = EmailReciever(fm.get_mail())
    return mail_obj
def call_auto_response(name, email, product, number):
    if product is not None:
        if product == 'sr':
            product_code = SR
        if product == 'sf':
            product_code = SF
        if product == 'csp':
            product_code = CSP
        if product == 'itfs':
            product_code = ITFS
        if number is not None:
            encoded_params = urlencode({'name': name, 'email': email, 'product': product_code, 'number': number})
        else:
            encoded_params = urlencode({'name': name, 'email': email, 'product': product_code})
        call_url = RESOURCE_HOME + '/api/auto-response?' + encoded_params
        urlopen(call_url)
    return
def call_create_lead(name, email, product, number):
    if product is not None:
        if product == 'sr':
            product_code = SR
        if product == 'sf':
            product_code = SF
        if product == 'csp':
            product_code = CSP
        if product == 'itfs':
            product_code = ITFS
        if number is not None:
            encoded_params = urlencode({'name': name, 'email': email, 'product': product_code, 'number': number})
        else:
            encoded_params = urlencode({'name': name, 'email': email, 'product': product_code})
        call_url = RESOURCE_HOME + '/api/create-lead?' + encoded_params
        urlopen(call_url)
    return
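# Both helpers above translate the classifier's short sense ('sf', 'sr', 'csp',
# 'itfs') into a product code and GET the result against RESOURCE_HOME (defined
# in local_vars). Illustrative example; parameter order depends on urlencode:
#   call_auto_response('Jane', 'jane@example.com', 'sf', '9876543210')
#   # -> GET RESOURCE_HOME + '/api/auto-response?name=Jane&email=jane%40example.com&product=SF-BA&number=9876543210'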
# if __name__ == '__main__':
#     mail_obj = fetch_mail()
#     analysis_obj = EmailAnalytics(mail_obj)
#     name, email, product, number = analysis_obj.find_name(), analysis_obj.find_email(), analysis_obj.find_product(), analysis_obj.find_number()
#     call_auto_response(name, email, product, number)
#     call_create_lead(name, email, product, number)
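# A sketch of how this script might be hooked into procmail so that each incoming
# message is piped to it on stdin (the interpreter and script path below are
# assumptions, not part of this gist):
#
#   :0
#   | /usr/bin/python /path/to/rsvp.py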