Last active
May 8, 2017 17:16
-
-
Save billyeh/628eb2adcad08f3b1d837e488cf11fa0 to your computer and use it in GitHub Desktop.
Simple way to deliver photos by email using Facebook image recognition
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import fnmatch | |
import os | |
import ast | |
import httplib2 | |
import mimetypes | |
import base64 | |
import logging | |
from facepy import GraphAPI | |
from apiclient.discovery import build | |
from oauth2client.file import Storage | |
from oauth2client.client import OAuth2WebServerFlow | |
from oauth2client.tools import run_flow | |
from email.mime.text import MIMEText | |
from email.mime.image import MIMEImage | |
from email.mime.multipart import MIMEMultipart | |
# Uncomment to make httplib2 dump its HTTP traffic while debugging.
#httplib2.debuglevel = 1

# --- Facebook session values (fill in before running) -----------------------
# ACCESS_TOKEN is handed to facepy's GraphAPI; COOKIES is sent as the
# ``cookie`` header and FB_DTSG as the ``fb_dtsg`` form field of the
# recognition request.
ACCESS_TOKEN = ''
COOKIES = ''
FB_DTSG = ''

# --- Local files and directories --------------------------------------------
# Directory tree that is walked for ``*.jpg`` files.
PHOTO_DIR = '/Users/user/Photos'
# One ``{'photo': ..., 'matches': [...]}`` dict literal per line.
OUTFILE = 'out.txt'
# Scratch file written by recognize_manually() and then moved over OUTFILE.
RECOGFILE = 'recognized.txt'
# Cache of the name -> e-mail address dict, stored as a dict literal.
NAME_TO_EMAIL_FILE = 'name_to_email.txt'

# --- Google OAuth flows (fill in client id/secret before running) -----------
PEOPLE_FLOW = OAuth2WebServerFlow(
    client_id='',
    client_secret='',
    scope='https://www.googleapis.com/auth/contacts.readonly',
    user_agent='Contacts/1.0',
)
GMAIL_FLOW = OAuth2WebServerFlow(
    client_id='',
    client_secret='',
    scope='https://www.googleapis.com/auth/gmail.compose',
    user_agent='Gmail/1.0',
)

# --- Gmail identity ---------------------------------------------------------
# MY_GMAIL_ID is passed as ``userId`` when sending; MY_EMAIL is used as both
# sender and recipient of every generated message.
MY_GMAIL_ID = ''
MY_EMAIL = ''
def recognize(path, access_token, cookies, fb_dtsg):
    """Return the people Facebook recognizes in the photo stored at *path*.

    Thanks to https://github.com/samj1912/fbrecog/blob/master/fbrecog.py

    The photo is uploaded to ``me/photos`` through the Graph API, the
    tagging/recognition endpoint is polled until it returns a non-empty
    payload, and the uploaded photo is deleted again before returning.

    Args:
        path: Path of the image file to upload.
        access_token: Graph API access token used for the upload and delete.
        cookies: Value sent as the ``cookie`` header of the recognition call.
        fb_dtsg: Value sent as the ``fb_dtsg`` form field.

    Returns:
        A list of ``{'name': ..., 'certainty': ...}`` dicts, one per face box
        that has at least one recognition (the first recognition is used).
    """
    URL = "https://www.facebook.com/photos/tagging/recognition/?dpr=1"
    graph = GraphAPI(access_token)

    # Close the file handle as soon as the upload is done.
    with open(path, 'rb') as photo:
        post_id = graph.post(path='me/photos', source=photo)['id']

    headers = {
        'x_fb_background_state': '1',
        'origin': 'https://www.facebook.com',
        'accept-encoding': 'gzip, deflate, lzma',
        'accept-language': 'en-US,en;q=0.8',
        'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2723.2 Safari/537.36',
        'content-type': 'application/x-www-form-urlencoded',
        'accept': '*/*',
        'referer': 'https://www.facebook.com/',
        'cookie': cookies,
        'dnt': '1',
    }
    # The request body does not change between attempts, so build it once.
    data = 'recognition_project=composer_facerec&photos[0]='+post_id+'&target&is_page=false&include_unrecognized_faceboxes=false&include_face_crop_src=true&include_recognized_user_profile_picture=true&include_low_confidence_recognitions=true&__a=1&fb_dtsg='+fb_dtsg

    try:
        payload = []
        # NOTE(review): this retries immediately and without an upper bound
        # until a non-empty payload arrives, exactly as before.
        while not payload:
            req = requests.post(URL, data=data, headers=headers)
            # The response body is prefixed with 'for (;;);' before the JSON.
            payload = json.loads(req.text.replace('for (;;);', ''))['payload']
            print('recognize: Getting payload')

        arr = []
        for recog in payload[0]['faceboxes']:
            name = recog['recognitions']
            if name:
                arr.append({'name': name[0]['user']['name'],
                            'certainty': name[0]['certainty']})
        return arr
    finally:
        # Always remove the temporary upload, even when recognition failed,
        # so photos are not left behind on the account.
        graph.delete(path=post_id)
def recognize_with_facebook():
    """Run Facebook recognition on every new ``*.jpg`` under PHOTO_DIR.

    Photos that already have a record in OUTFILE are skipped. For each
    remaining photo one ``{'photo': path, 'matches': [...]}`` dict literal is
    appended to OUTFILE. OUTFILE is created if it does not exist yet.
    """
    # Parse the recorded photo paths instead of substring-searching the raw
    # file text, so escaped characters in a path cannot cause a mismatch.
    processed = set()
    if os.path.exists(OUTFILE):  # absent on the very first run
        with open(OUTFILE) as outfile:
            for line in outfile:
                if line.strip():
                    processed.add(ast.literal_eval(line)['photo'])

    matches = []
    for root, dirnames, filenames in os.walk(PHOTO_DIR):
        for filename in fnmatch.filter(filenames, '*.jpg'):
            f = os.path.join(root, filename)
            if f not in processed:
                matches.append(f)

    with open(OUTFILE, "a+") as myfile:
        for photo in matches:
            match_array = recognize(photo, ACCESS_TOKEN, COOKIES, FB_DTSG)
            match_object = {'photo': photo, 'matches': match_array}
            myfile.write(str(match_object) + '\n')
            # Each record costs an upload plus a recognition call; push it to
            # disk right away.
            myfile.flush()
def recognize_manually():
    """Ask the user to tag every photo in OUTFILE that has no matches.

    Each unmatched photo is shown with the ``open`` command and the user is
    prompted for a comma-separated list of names. Names entered by hand get
    certainty 1. All records (tagged or not) are written to RECOGFILE, which
    then replaces OUTFILE.
    """
    # Function-scope import: subprocess is not among the module's imports.
    import subprocess

    try:
        read_line = raw_input  # Python 2
    except NameError:
        read_line = input      # Python 3

    # 'w' rather than 'a+': a leftover RECOGFILE from an interrupted run must
    # not be merged into the new result.
    with open(RECOGFILE, 'w') as myfile, open(OUTFILE) as otherfile:
        for line in otherfile:
            pic = ast.literal_eval(line)
            if not pic['matches']:
                # Argument list instead of a shell string, so quotes or other
                # shell metacharacters in the path are harmless.
                subprocess.call(['open', pic['photo']])
                persons = read_line('Enter names delimited by commas: ')
                # Strip blanks around each name and drop empty entries, so
                # "Ann, Bob" yields "Bob" (not " Bob") and an empty answer
                # leaves the photo untagged.
                pic['matches'] = [
                    {'certainty': 1, 'name': person.strip()}
                    for person in persons.split(',')
                    if person.strip()
                ]
            myfile.write(str(pic) + '\n')

    os.rename(RECOGFILE, OUTFILE)
def get_people_service():
    """Build an authorized Google People API (v1) client.

    Based on https://developers.google.com/people/. Credentials are cached in
    ``people_info.dat``; when none are stored, or the stored ones are
    invalid, the PEOPLE_FLOW OAuth flow is run to obtain new ones.
    """
    token_store = Storage('people_info.dat')
    creds = token_store.get()
    if not (creds and not creds.invalid):
        creds = run_flow(PEOPLE_FLOW, token_store)
    authorized_http = creds.authorize(httplib2.Http())
    return build(serviceName='people', version='v1', http=authorized_http)
def get_gmail_service():
    """Build an authorized Gmail API (v1) client.

    Credentials are cached in ``gmail_info.dat``; when none are stored, or
    the stored ones are invalid, the GMAIL_FLOW OAuth flow is run to obtain
    new ones.
    """
    token_store = Storage('gmail_info.dat')
    creds = token_store.get()
    if not (creds and not creds.invalid):
        creds = run_flow(GMAIL_FLOW, token_store)
    authorized_http = creds.authorize(httplib2.Http())
    return build(serviceName='gmail', version='v1', http=authorized_http)
def get_first_last_from_full(name):
    """Return ``(first, last)`` for a full name, ignoring middle names.

    The name is split on any run of whitespace, so leading, trailing or
    repeated spaces do not produce empty name parts. A single-word name is
    returned as both first and last name; an empty or blank name yields
    ``('', '')``.
    """
    names = name.split()
    if not names:
        return ('', '')
    return (names[0], names[-1])
def check_emails(response, all_names):
    """Verify that every person named in OUTFILE exists in the contacts.

    Args:
        response: People API ``connections.list`` response; its
            ``connections`` entries are read for ``names[0].displayName``.
        all_names: Set that is filled in place with the ``(first, last)``
            tuple of every person found in OUTFILE.

    Raises:
        SystemExit: With status 1 when at least one name has no matching
            contact. The missing names are printed first.
    """
    has_names = set()
    # The key is absent when the account has no connections.
    for conn in response.get('connections', []):
        try:
            has_names.add(
                get_first_last_from_full(conn['names'][0]['displayName']))
        except (KeyError, IndexError, TypeError):
            pass  # deleted connections carry no usable name

    no_names = set()
    with open(OUTFILE) as outfile:
        for line in outfile:
            pic = ast.literal_eval(line)
            for match in pic['matches']:
                name = get_first_last_from_full(match['name'])
                if name not in has_names:
                    no_names.add(name)
                all_names.add(name)

    if no_names:
        print('check_emails: Need email addresses for some names')
        # Tell the user which contacts to add before re-running.
        print('check_emails: Missing: ' +
              ', '.join(' '.join(name) for name in sorted(no_names)))
        # Non-zero status; the site-provided exit() would report success.
        raise SystemExit(1)
def get_emails(response, all_names):
    """Look up an e-mail address for every name in *all_names*.

    Addresses already present in NAME_TO_EMAIL_FILE are reused; the rest are
    fetched one by one through the People API. The first failed lookup is
    printed and stops further lookups. The (possibly partial) mapping is
    written back to NAME_TO_EMAIL_FILE in every case.

    Args:
        response: People API ``connections.list`` response, used to map
            ``(first, last)`` names to contact resource names.
        all_names: Iterable of ``(first, last)`` tuples to resolve.

    Returns:
        Dict mapping ``(first, last)`` tuples to e-mail address strings. It
        may lack entries for names whose lookup did not happen or failed.
    """
    name_to_id = {}
    for conn in response.get('connections', []):
        try:
            name = get_first_last_from_full(conn['names'][0]['displayName'])
            name_to_id[name] = conn['resourceName']
        except (KeyError, IndexError, TypeError):
            pass  # connection without a usable name or resource name

    try:
        with open(NAME_TO_EMAIL_FILE) as cache_file:
            name_to_email = ast.literal_eval(cache_file.read())
    except (IOError, OSError, ValueError, SyntaxError):
        # Missing, empty or unparsable cache: start from scratch.
        name_to_email = {}
    if not isinstance(name_to_email, dict):
        name_to_email = {}

    service = None  # built lazily, and only once, when a lookup is needed
    for name in all_names:
        if name in name_to_email:
            continue
        try:
            if service is None:
                service = get_people_service()
            profile = service.people().get(
                resourceName=name_to_id[name]).execute()
            name_to_email[name] = profile['emailAddresses'][0]['value']
        except Exception as e:
            print('get_emails: %s' % (e,))
            break  # stop on the first failure; progress is saved below

    with open(NAME_TO_EMAIL_FILE, 'w') as cache_file:
        cache_file.write(str(name_to_email))
    return name_to_email
def create_message_with_attachment(
        sender, to, subject, message_text, files):
    """Build a Gmail API message body with *files* attached.

    Used https://developers.google.com/gmail/api/guides/sending for this
    function and send_message().

    Args:
        sender: Value of the ``from`` header.
        to: Value of the ``to`` header.
        subject: Value of the ``subject`` header.
        message_text: Plain-text body of the message.
        files: Iterable of paths to attach.

    Returns:
        ``{'raw': <text>}`` where the value is the URL-safe base64 encoding
        of the complete MIME message, as a text string.
    """
    # Function-scope imports: only needed for non-image attachments and not
    # among the module's imports.
    from email import encoders
    from email.mime.base import MIMEBase

    message = MIMEMultipart()
    message['to'] = to
    message['from'] = sender
    message['subject'] = subject
    message.attach(MIMEText(message_text))

    for f in files:
        content_type, encoding = mimetypes.guess_type(f)
        # Unknown or compressed files are sent as generic binary data.
        if not content_type or encoding:
            content_type = 'application/octet-stream'
        main_type, sub_type = content_type.split('/', 1)

        with open(f, 'rb') as fp:
            data = fp.read()

        if main_type == 'image':
            part = MIMEImage(data, _subtype=sub_type)
        else:
            # Do not label non-images as image/*; use the guessed type.
            part = MIMEBase(main_type, sub_type)
            part.set_payload(data)
            encoders.encode_base64(part)

        part.add_header('Content-Disposition', 'attachment',
                        filename=os.path.basename(f))
        message.attach(part)

    raw = message.as_string()
    if not isinstance(raw, bytes):
        # Python 3: as_string() yields text, b64encode needs bytes.
        raw = raw.encode('utf-8')
    # Decode so the value is a text string that serializes to JSON.
    return {'raw': base64.urlsafe_b64encode(raw).decode('ascii')}
def send_message(service, user_id, message):
    """Send *message* through the Gmail API and return the API response.

    Args:
        service: Gmail API client, as returned by get_gmail_service().
        user_id: Passed as ``userId`` to ``users().messages().send``.
        message: Request body, e.g. the dict built by
            create_message_with_attachment().

    Returns:
        The response of the send call (its ``id`` is printed), or ``None``
        when the call failed; the error is printed, not raised.
    """
    try:
        message = (service.users().messages().send(userId=user_id, body=message)
                   .execute())
        # print() call form: valid on both Python 2 and 3, and consistent
        # with every other print in this file.
        print('Message Id: %s' % message['id'])
        return message
    except Exception as e:
        print(e)
        return None
def create_drafts(name_to_email):
    """Mail each person's photos to MY_EMAIL for a final manual check.

    OUTFILE is read to group photos by person. For every person one message
    per photo is sent from MY_EMAIL to MY_EMAIL. The first message carries
    the person's e-mail address as subject and the explanatory text; the
    following messages have an empty subject and body.

    Args:
        name_to_email: Dict mapping ``(first, last)`` tuples to e-mail
            addresses. People missing from it are reported and skipped.
    """
    name_to_photos = {}
    with open(OUTFILE) as outfile:
        for line in outfile:
            pic = ast.literal_eval(line)
            for person in pic['matches']:
                name = get_first_last_from_full(person['name'])
                photolist = name_to_photos.setdefault(name, set())
                photolist.add((person['certainty'], pic['photo']))

    # Send to myself with the intended recipient as subject, so the final
    # quality check is done by a human.
    sender = MY_EMAIL
    to = MY_EMAIL
    base_text = "Enjoy!\n\nCertainty that you're in the respective images: "
    gmail_service = get_gmail_service()

    for person in name_to_photos:
        if person not in name_to_email:
            # get_emails() can return a partial mapping; do not abort the
            # whole run half-way through sending because of one gap.
            print('create_drafts: No email address for ' + str(person) + ', skipping')
            continue
        subject = name_to_email[person]
        # Snapshot the set once so certainties and files share one order.
        photos = list(name_to_photos[person])
        message_text = base_text + ','.join([str(tup[0]) for tup in photos])
        message_text += '\n\n*Images tagged by Facebook, except where certainty = 1, which means I tagged it. Sorry for any errors! I did my best.'
        files = [tup[1] for tup in photos]
        for i, f in enumerate(files):
            if i == 0:
                msg_txt = message_text
                sbj_txt = subject
            else:
                msg_txt = ""
                sbj_txt = ""
            print('create_drafts: Creating message for ' + str(person) + '...')
            message = create_message_with_attachment(sender, to, sbj_txt, msg_txt, [f])
            print('create_drafts: Sending message...')
            send_message(gmail_service, MY_GMAIL_ID, message)
if __name__ == '__main__':
    # Pipeline: tag photos via Facebook, fill gaps by hand, resolve e-mail
    # addresses from Google contacts, then mail the photos to myself.
    recognize_with_facebook()
    recognize_manually()

    # Fetch *all* contacts: the API returns them in pages of at most
    # pageSize entries, with nextPageToken set while more pages remain.
    people_service = get_people_service()
    connections = []
    page_token = None
    while True:
        list_args = {'resourceName': 'people/me', 'pageSize': 500}
        if page_token:
            list_args['pageToken'] = page_token
        page = people_service.people().connections().list(**list_args).execute()
        connections.extend(page.get('connections', []))
        page_token = page.get('nextPageToken')
        if not page_token:
            break
    # Same shape the helpers below read: a dict with a 'connections' list.
    response = {'connections': connections}

    all_names = set()
    check_emails(response, all_names)
    name_to_email = get_emails(response, all_names)
    create_drafts(name_to_email)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment