Twitter bot that replies to mentions whose media contains a human face: it crops the face, runs emotion detection on it, and composites it into a flower photo that matches the detected emotion.
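Both scripts assume a working directory laid out roughly like this (inferred from the paths used in the code below; the bot writes tmp.png and lastid.pickle itself, and Train.py writes the model and bottleneck files, but the directories must already exist):

    haarcascade_frontalface_default.xml    OpenCV face cascade
    flower-imgs/                           burning.jpg, rainy.png, sunflower.jpg
    lastid.pickle                          id of the last tweet replied to (written by the bot)
    data/
        tmp.png                            scratch image (written by the bot)
        model/                             model.h5 and model_weights.h5 (written by Train.py)
        _neutral/ ... _fear/               training images, one folder per emotion
        bottleneck-features/               train/, train-labels/, cv/, cv-labels/, test/, test-labels/
        log.csv                            per-epoch training stats (written by Train.py)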
import os
import time
import pickle
import urllib3
import numpy as np
import cv2 as cv
from keras.models import load_model
from keras.applications import VGG16
import tweepy

# GPU does not have enough memory for prediction, lol
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
TMP_IMG = './data/tmp.png'

# crop face from image
face_cascade = cv.CascadeClassifier('haarcascade_frontalface_default.xml')
RESIZE_DIM = 350

def getFace(img):
    gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, 1.3, 5)
    if len(faces) == 0:
        return None, None
    x, y, w, h = faces[0]  # use the first detected face
    # return the color crop plus a grayscale crop resized for the classifier
    return img[y:y+h, x:x+w], cv.resize(gray[y:y+h, x:x+w], (RESIZE_DIM, RESIZE_DIM))
# download an image from the internet
http = urllib3.PoolManager()

def download(url):
    res = http.request('GET', url)
    if res.status != 200:
        return None
    image = np.asarray(bytearray(res.data), dtype='uint8')
    return cv.imdecode(image, cv.IMREAD_COLOR)
# classify a new image
EMOTION_DICT = {
    0: 'neutral',
    1: 'happy',
    2: 'sadness',
    3: 'surprise',
    4: 'anger',
    5: 'disgust',
    6: 'fear'
}

vggModel = VGG16(weights='imagenet', include_top=False)
topModel = load_model('./data/model/model.h5')
def getEmotion(img):
    # the write/read round-trip converts the grayscale crop into the
    # 3-channel BGR array that VGG16 expects
    cv.imwrite(TMP_IMG, img)
    img = cv.imread(TMP_IMG)
    img = img.reshape(1, img.shape[0], img.shape[1], img.shape[2]) / 255.0
    vggPred = vggModel.predict(img)
    vggPred = vggPred.reshape(1, vggPred.shape[1] * vggPred.shape[2] * vggPred.shape[3])
    topPred = topModel.predict(vggPred)
    emotionId = topPred[0].argmax()
    return EMOTION_DICT[emotionId], emotionId
# paste the face into the matching flower image
def createEmotionImg(emotion, img):
    # pick the background and the circle (center, radius) to paste into
    if emotion in ('anger', 'contempt', 'disgust'):
        picName, (x, y), r = ('burning.jpg', (305, 623), 80)
    elif emotion in ('fear', 'sadness'):
        picName, (x, y), r = ('rainy.png', (674, 234), 70)
    else:
        picName, (x, y), r = ('sunflower.jpg', (487, 248), 70)
    x -= r  # move from circle center to top-left corner
    y -= r
    cv.imwrite(TMP_IMG, img)
    img = cv.imread(TMP_IMG)
    img = cv.resize(img, (r * 2, r * 2))
    bg = cv.imread('./flower-imgs/' + picName, -1)
    if len(bg.shape) > 2 and bg.shape[2] == 4:  # drop the alpha channel if present
        bg = cv.cvtColor(bg, cv.COLOR_BGRA2BGR)
    # write a circular crop of the face onto the background
    for y_ in range(img.shape[0]):
        for x_ in range(img.shape[1]):
            if (x_ - r)**2 + (y_ - r)**2 <= r**2:
                bg[y+y_][x+x_] = img[y_][x_]
    # cv.imshow('image', bg)
    # cv.waitKey(0)
    # cv.destroyAllWindows()
    # exit()
    cv.imwrite(TMP_IMG, bg)
    return TMP_IMG
### MAIN

# set up tweepy
auth = tweepy.OAuthHandler('XXX', 'XXX')
auth.set_access_token('XXX', 'XXX')
auth.secure = True
api = tweepy.API(auth_handler=auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
# main loop: go through new mentions and reply to them
try:
    with open('lastid.pickle', 'rb') as f:
        lastId = pickle.load(f)
except FileNotFoundError:  # first run, no saved id yet
    lastId = None

while True:
    tweets = api.mentions_timeline(since_id=lastId)
    for tweet in tweets:
        if 'media' not in tweet.entities:
            continue
        image = download(tweet.entities['media'][0]['media_url_https'])
        if image is None:  # could not download media
            api.update_status(status='@{} could not fetch your image 😔'.format(tweet.author.screen_name), in_reply_to_status_id=tweet.id)
            print('[ERR] <@{}, ({})> Could not fetch image'.format(tweet.author.screen_name, tweet.id_str))
            continue
        img, grey = getFace(image)
        if img is None or grey is None:  # could not find a face
            api.update_status(status='@{} could not find your face 😔'.format(tweet.author.screen_name), in_reply_to_status_id=tweet.id)
            print('[ERR] <@{}, ({})> No face found'.format(tweet.author.screen_name, tweet.id_str))
            continue
        emotion, emotionId = getEmotion(grey)
        if emotion == 'neutral':  # neutral expression
            api.update_status(status='@{} where ur feels at? 😐'.format(tweet.author.screen_name), in_reply_to_status_id=tweet.id)
            print('[OK] <@{}, ({})> Neutral expression'.format(tweet.author.screen_name, tweet.id_str))
            continue
        filePath = createEmotionImg(emotion, img)
        res = api.media_upload(filePath)
        api.update_status(status='@{} you seem to be feeling {}'.format(tweet.author.screen_name, emotion), in_reply_to_status_id=tweet.id, media_ids=[res.media_id])
        print('[OK] <@{}, ({})> {} expression'.format(tweet.author.screen_name, tweet.id_str, emotion))
    if len(tweets) > 0:
        # store the most recent tweet we replied to (the first in the list)
        lastId = tweets[0].id_str
        with open('lastid.pickle', 'wb') as f:
            pickle.dump(lastId, f)
    time.sleep(10)
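For local testing it can help to exercise the pipeline on a saved photo without touching the Twitter API. A minimal sketch, assuming the functions above are available in the same session; 'selfie.jpg' is a hypothetical test image, not part of this gist:

# offline smoke test: crop a face, classify it, composite the flower image
image = cv.imread('selfie.jpg')  # hypothetical local test photo
img, grey = getFace(image)
if img is None:
    print('no face found')
else:
    emotion, _ = getEmotion(grey)
    print('detected emotion:', emotion)
    if emotion != 'neutral':
        out = createEmotionImg(emotion, img)
        print('composite written to', out)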
# Train.py: train the emotion-classification model
import os
import numpy as np
import pandas as pd
import glob
import cv2
from sklearn.model_selection import train_test_split
from keras.layers import Dropout, Dense
from keras.layers.normalization import BatchNormalization
from keras.models import Sequential
from keras.applications import VGG16
######### create data frames
def getDataFrame(emotion, emotionId):
    imgs = glob.glob('./data/_' + emotion + '/*')
    df = pd.DataFrame()
    # split each path portably into folder and file name
    df['folderName'] = [os.path.split(i)[0] + '/' for i in imgs]
    df['imageName'] = [os.path.split(i)[1] for i in imgs]
    df['emotion'] = [emotion] * len(imgs)
    df['labels'] = [emotionId] * len(imgs)
    return df
# 0=neutral, 1=happy, 2=sadness, 3=surprise, 4=anger, 5=disgust, 6=fear
frames = pd.concat([
    getDataFrame('neutral', 0),
    getDataFrame('happy', 1),
    getDataFrame('sadness', 2),
    getDataFrame('surprise', 3),
    getDataFrame('anger', 4),
    getDataFrame('disgust', 5),
    getDataFrame('fear', 6)
])
frames.reset_index(inplace=True, drop=True)
frames = frames.sample(frac=1.0)  # shuffle the data frame
frames.reset_index(inplace=True, drop=True)
#########
######### ONLY RUN IMAGE CONVERSION ONCE!
# convert to grayscale
for i in range(len(frames)):
    path1 = frames['folderName'][i]
    path2 = frames['imageName'][i]
    img = cv2.imread(os.path.join(path1, path2))
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    cv2.imwrite(os.path.join(path1, path2), gray)

# crop face
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
for i, d in frames.iterrows():
    imgPath = os.path.join(d['folderName'], d['imageName'])
    img = cv2.imread(imgPath)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    try:
        (x, y, w, h) = face_cascade.detectMultiScale(gray, 1.3, 5)[0]  # assume only one face per image
        cv2.imwrite(imgPath, cv2.resize(img[y:y+h, x:x+w], (350, 350)))  # crop, resize, and save
    except IndexError:  # detectMultiScale found nothing
        print('Could not detect face for file: ' + imgPath)
#########
######### split data frames into train/test/cv
trainFrames, testFrames = train_test_split(frames, stratify=frames['labels'], test_size=0.2)
trainFrames, cvFrames = train_test_split(trainFrames, stratify=trainFrames['labels'], test_size=0.15)
#########

######### bottleneck features
batchPointers = {
    'train': 0,
    'test': 0,
    'cv': 0
}

# bottleneck features for train data
trainLabels = pd.get_dummies(trainFrames['labels']).to_numpy()
model = VGG16(weights='imagenet', include_top=False)
SAVEDIR_TRAIN = './data/bottleneck-features/train'
SAVEDIR_TRAIN_LABELS = './data/bottleneck-features/train-labels'
BATCH_SIZE = 10
for i in range(len(trainFrames) // BATCH_SIZE):
    # load a combined batch of images and one-hot labels
    batchImages = []
    batchLabels = []
    for j in range(BATCH_SIZE):
        path1 = trainFrames.iloc[batchPointers['train'] + j]['folderName']
        path2 = trainFrames.iloc[batchPointers['train'] + j]['imageName']
        readImage = cv2.imread(os.path.join(path1, path2)) / 255.0  # normalize image
        batchImages.append(readImage)
        batchLabels.append(trainLabels[batchPointers['train'] + j])  # append corresponding labels
    batchPointers['train'] += BATCH_SIZE
    x, y = np.array(batchImages), np.array(batchLabels)
    np.save(os.path.join(SAVEDIR_TRAIN_LABELS, 'bottleneck-labels-{}'.format(i + 1)), y)
    np.save(os.path.join(SAVEDIR_TRAIN, 'bottleneck-{}'.format(i + 1)), model.predict(x))
# bottleneck features for cv data (reuses the same VGG16 model)
cvLabels = pd.get_dummies(cvFrames['labels']).to_numpy()
SAVEDIR_CV = './data/bottleneck-features/cv'
SAVEDIR_CV_LABELS = './data/bottleneck-features/cv-labels'
for i in range(len(cvFrames) // BATCH_SIZE):
    batchImages = []
    batchLabels = []
    for j in range(BATCH_SIZE):
        path1 = cvFrames.iloc[batchPointers['cv'] + j]['folderName']
        path2 = cvFrames.iloc[batchPointers['cv'] + j]['imageName']
        readImage = cv2.imread(os.path.join(path1, path2)) / 255.0  # normalize image
        batchImages.append(readImage)
        batchLabels.append(cvLabels[batchPointers['cv'] + j])  # append corresponding labels
    batchPointers['cv'] += BATCH_SIZE
    x, y = np.array(batchImages), np.array(batchLabels)
    np.save(os.path.join(SAVEDIR_CV_LABELS, 'bottleneck-labels-{}'.format(i + 1)), y)
    np.save(os.path.join(SAVEDIR_CV, 'bottleneck-{}'.format(i + 1)), model.predict(x))
# bottleneck features for test data (reuses the same VGG16 model)
testLabels = pd.get_dummies(testFrames['labels']).to_numpy()
SAVEDIR_TEST = './data/bottleneck-features/test'
SAVEDIR_TEST_LABELS = './data/bottleneck-features/test-labels'
for i in range(len(testFrames) // BATCH_SIZE):
    batchImages = []
    batchLabels = []
    for j in range(BATCH_SIZE):
        path1 = testFrames.iloc[batchPointers['test'] + j]['folderName']
        path2 = testFrames.iloc[batchPointers['test'] + j]['imageName']
        readImage = cv2.imread(os.path.join(path1, path2)) / 255.0  # normalize image
        batchImages.append(readImage)
        batchLabels.append(testLabels[batchPointers['test'] + j])  # append corresponding labels
    batchPointers['test'] += BATCH_SIZE
    x, y = np.array(batchImages), np.array(batchLabels)
    np.save(os.path.join(SAVEDIR_TEST_LABELS, 'bottleneck-labels-{}'.format(i + 1)), y)
    np.save(os.path.join(SAVEDIR_TEST, 'bottleneck-{}'.format(i + 1)), model.predict(x))
#########
######### modeling and training
def buildModel(inputShape):
    model = Sequential()
    model.add(Dense(512, activation='relu', input_dim=inputShape))
    model.add(Dropout(0.1))
    model.add(Dense(256, activation='relu'))
    model.add(Dense(128, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dense(64, activation='relu'))
    model.add(Dense(7, activation='softmax'))  # one output per emotion class
    return model

SAVEDIR_MODEL = './data/model'
# bottleneck feature of each image after passing through VGG-16:
# five pooling layers halve 350x350 down to floor(350/32) = 10 per side,
# with 512 channels in the last conv block, so 10*10*512 when flattened
INPUT_SHAPE = 10 * 10 * 512
model = buildModel(INPUT_SHAPE)
# model.load_weights(os.path.join(SAVEDIR_MODEL, 'model.h5'))  # only to keep updating a previously saved model
model.summary()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
EPOCHS = 12
step = 0
bottleneckFiles = {
    'train': len(trainFrames) // BATCH_SIZE,
    'cv': len(cvFrames) // BATCH_SIZE
}
epochNum, trainLoss, trainAcc, cvLoss, cvAcc = [], [], [], [], []
for epoch in range(EPOCHS):
    avgEpochTrainLoss, avgEpochTrainAcc, avgEpochCVLoss, avgEpochCVAcc = 0.0, 0.0, 0.0, 0.0
    epochNum.append(epoch + 1)
    for i in range(bottleneckFiles['train']):
        step += 1
        # load a batch of train bottleneck features for training the MLP
        xTrainLoad = np.load(os.path.join(SAVEDIR_TRAIN, 'bottleneck-{}.npy'.format(i + 1)))
        xTrain = xTrainLoad.reshape(xTrainLoad.shape[0], xTrainLoad.shape[1] * xTrainLoad.shape[2] * xTrainLoad.shape[3])
        yTrain = np.load(os.path.join(SAVEDIR_TRAIN_LABELS, 'bottleneck-labels-{}.npy'.format(i + 1)))
        # load a batch of cv bottleneck features, cycling through the cv files
        xCVLoad = np.load(os.path.join(SAVEDIR_CV, 'bottleneck-{}.npy'.format((i % bottleneckFiles['cv']) + 1)))
        xCV = xCVLoad.reshape(xCVLoad.shape[0], xCVLoad.shape[1] * xCVLoad.shape[2] * xCVLoad.shape[3])
        yCV = np.load(os.path.join(SAVEDIR_CV_LABELS, 'bottleneck-labels-{}.npy'.format((i % bottleneckFiles['cv']) + 1)))
        trainLoss_, trainAcc_ = model.train_on_batch(xTrain, yTrain)  # train the model on the batch
        cvLoss_, cvAcc_ = model.test_on_batch(xCV, yCV)  # cross-validate the model on the cv batch
        print('Epoch: {}, Step: {}, Tr_Loss: {}, Tr_Acc: {}, CV_Loss: {}, CV_Acc: {}'.format(epoch + 1, step, np.round(float(trainLoss_), 2), np.round(float(trainAcc_), 2), np.round(float(cvLoss_), 2), np.round(float(cvAcc_), 2)))
        avgEpochTrainLoss += trainLoss_ / bottleneckFiles['train']
        avgEpochTrainAcc += trainAcc_ / bottleneckFiles['train']
        avgEpochCVLoss += cvLoss_ / bottleneckFiles['train']
        avgEpochCVAcc += cvAcc_ / bottleneckFiles['train']
    print('Avg_Train_Loss: {}, Avg_Train_Acc: {}, Avg_CV_Loss: {}, Avg_CV_Acc: {}'.format(np.round(float(avgEpochTrainLoss), 2), np.round(float(avgEpochTrainAcc), 2), np.round(float(avgEpochCVLoss), 2), np.round(float(avgEpochCVAcc), 2)))
    trainLoss.append(avgEpochTrainLoss)
    trainAcc.append(avgEpochTrainAcc)
    cvLoss.append(avgEpochCVLoss)
    cvAcc.append(avgEpochCVAcc)
    model.save(os.path.join(SAVEDIR_MODEL, 'model.h5'))  # save the model each epoch
    model.save_weights(os.path.join(SAVEDIR_MODEL, 'model_weights.h5'))  # save the weights each epoch
    print('Model and weights saved at epoch {}'.format(epoch + 1))
# save stats to log
log_frame = pd.DataFrame(columns=['Epoch', 'Train_Loss', 'Train_Accuracy', 'CV_Loss', 'CV_Accuracy'])
log_frame['Epoch'] = epochNum
log_frame['Train_Loss'] = trainLoss
log_frame['Train_Accuracy'] = trainAcc
log_frame['CV_Loss'] = cvLoss
log_frame['CV_Accuracy'] = cvAcc
log_frame.to_csv('./data/log.csv', index=False)
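Train.py saves bottleneck features for the test split but never evaluates on them. A minimal sketch of that final evaluation, reusing names defined above (model, testFrames, BATCH_SIZE, SAVEDIR_TEST, SAVEDIR_TEST_LABELS) in the same session:

# evaluate the trained MLP on the held-out test bottleneck features
testLoss, testAcc = 0.0, 0.0
nBatches = len(testFrames) // BATCH_SIZE
for i in range(nBatches):
    xLoad = np.load(os.path.join(SAVEDIR_TEST, 'bottleneck-{}.npy'.format(i + 1)))
    x = xLoad.reshape(xLoad.shape[0], -1)  # flatten the 10x10x512 features
    y = np.load(os.path.join(SAVEDIR_TEST_LABELS, 'bottleneck-labels-{}.npy'.format(i + 1)))
    loss_, acc_ = model.test_on_batch(x, y)
    testLoss += loss_ / nBatches
    testAcc += acc_ / nBatches
print('Test_Loss: {}, Test_Acc: {}'.format(np.round(float(testLoss), 2), np.round(float(testAcc), 2)))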