Face2Face
import os
import argparse

import tensorflow as tf
from tensorflow.python.framework import graph_util

dir = os.path.dirname(os.path.realpath(__file__))


def freeze_graph(model_folder):
    # Retrieve the full path of the latest checkpoint
    checkpoint = tf.train.get_checkpoint_state(model_folder)
    input_checkpoint = checkpoint.model_checkpoint_path

    # Specify the full filename of the frozen graph
    absolute_model_folder = '/'.join(input_checkpoint.split('/')[:-1])
    output_graph = absolute_model_folder + '/frozen_model.pb'

    # Before exporting the graph, we need to specify the output node.
    # This is how TF decides which part of the graph it has to keep and which part it can drop.
    # NOTE: this variable can hold several comma-separated output node names.
    output_node_names = 'generate_output/output'

    # Clear devices to allow TensorFlow to control on which device it will load operations
    clear_devices = True

    # Import the meta graph and retrieve a Saver
    saver = tf.train.import_meta_graph(input_checkpoint + '.meta', clear_devices=clear_devices)

    # Retrieve the protobuf graph definition
    graph = tf.get_default_graph()
    input_graph_def = graph.as_graph_def()

    # Start a session and restore the graph weights
    with tf.Session() as sess:
        saver.restore(sess, input_checkpoint)

        # Use a built-in TF helper to export variables to constants
        output_graph_def = graph_util.convert_variables_to_constants(
            sess,                          # The session is used to retrieve the weights
            input_graph_def,               # The graph_def is used to retrieve the nodes
            output_node_names.split(',')   # The output node names are used to select the useful nodes
        )

        # Finally, serialize and dump the output graph to the filesystem
        with tf.gfile.GFile(output_graph, 'wb') as f:
            f.write(output_graph_def.SerializeToString())
        print('%d ops in the final graph.' % len(output_graph_def.node))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-folder', type=str, help='Model folder to export')
    args = parser.parse_args()

    freeze_graph(args.model_folder)
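The script above locates the latest checkpoint in --model-folder, bakes the variables into constants, and writes frozen_model.pb next to the checkpoint. Below is a minimal sketch of verifying the result by loading the .pb back and checking that the expected output node survived; the path face2face-model/frozen_model.pb is an assumption, not taken from the gist.

import tensorflow as tf

# Hypothetical check; 'face2face-model/frozen_model.pb' is an assumed path.
graph_def = tf.GraphDef()
with tf.gfile.GFile('face2face-model/frozen_model.pb', 'rb') as f:
    graph_def.ParseFromString(f.read())

print('%d ops in the frozen graph.' % len(graph_def.node))
# The freezing script keeps everything needed to compute 'generate_output/output'.
print(any(node.name == 'generate_output/output' for node in graph_def.node))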
import os
import cv2
import dlib
import time
import argparse
import numpy as np
from imutils import video

DOWNSAMPLE_RATIO = 4


def reshape_for_polyline(array):
    return np.array(array, np.int32).reshape((-1, 1, 2))


def main():
    os.makedirs('original', exist_ok=True)
    os.makedirs('landmarks', exist_ok=True)

    cap = cv2.VideoCapture(args.filename)
    fps = video.FPS().start()

    count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:  # stop when the video has no more frames
            break

        frame_resize = cv2.resize(frame, None, fx=1 / DOWNSAMPLE_RATIO, fy=1 / DOWNSAMPLE_RATIO)
        gray = cv2.cvtColor(frame_resize, cv2.COLOR_BGR2GRAY)
        faces = detector(gray, 1)
        black_image = np.zeros(frame.shape, np.uint8)

        t = time.time()

        # Only process frames in which exactly one face is detected
        if len(faces) == 1:
            for face in faces:
                detected_landmarks = predictor(gray, face).parts()
                landmarks = [[p.x * DOWNSAMPLE_RATIO, p.y * DOWNSAMPLE_RATIO] for p in detected_landmarks]

                jaw = reshape_for_polyline(landmarks[0:17])
                left_eyebrow = reshape_for_polyline(landmarks[22:27])
                right_eyebrow = reshape_for_polyline(landmarks[17:22])
                nose_bridge = reshape_for_polyline(landmarks[27:31])
                lower_nose = reshape_for_polyline(landmarks[30:35])
                left_eye = reshape_for_polyline(landmarks[42:48])
                right_eye = reshape_for_polyline(landmarks[36:42])
                outer_lip = reshape_for_polyline(landmarks[48:60])
                inner_lip = reshape_for_polyline(landmarks[60:68])

                color = (255, 255, 255)
                thickness = 3

                cv2.polylines(black_image, [jaw], False, color, thickness)
                cv2.polylines(black_image, [left_eyebrow], False, color, thickness)
                cv2.polylines(black_image, [right_eyebrow], False, color, thickness)
                cv2.polylines(black_image, [nose_bridge], False, color, thickness)
                cv2.polylines(black_image, [lower_nose], True, color, thickness)
                cv2.polylines(black_image, [left_eye], True, color, thickness)
                cv2.polylines(black_image, [right_eye], True, color, thickness)
                cv2.polylines(black_image, [outer_lip], True, color, thickness)
                cv2.polylines(black_image, [inner_lip], True, color, thickness)

            # Save the original frame and its landmark sketch as a training pair
            count += 1
            print(count)
            cv2.imwrite("original/{}.png".format(count), frame)
            cv2.imwrite("landmarks/{}.png".format(count), black_image)
            fps.update()

            print('[INFO] elapsed time: {:.2f}'.format(time.time() - t))

            if count == args.number:  # stop once the requested number of training pairs has been collected
                break
            elif cv2.waitKey(1) & 0xFF == ord('q'):
                break
        else:
            print("No face detected")

    fps.stop()
    print('[INFO] elapsed time (total): {:.2f}'.format(fps.elapsed()))
    print('[INFO] approx. FPS: {:.2f}'.format(fps.fps()))

    cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--file', dest='filename', type=str, help='Name of the video file.')
    parser.add_argument('--num', dest='number', type=int, help='Number of train data to be created.')
    parser.add_argument('--landmark-model', dest='face_landmark_shape_file', type=str, help='Face landmark model file.')
    args = parser.parse_args()

    # Create the face detector and landmark predictor
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(args.face_landmark_shape_file)

    main()
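The script above writes matching frames into original/ and landmarks/. The pix2pix-style model further down expects each example as a single image with the landmark sketch on the left half and the photo on the right half (process_image splits on width // 2, and the webcam script concatenates in that order). Below is a minimal sketch of stitching the two folders into such pairs; the combined/ output folder is an assumption, and the actual training pipeline is not part of this gist.

import os

import cv2
import numpy as np

# Hypothetical pairing step; the 'combined' folder name is an assumption.
os.makedirs('combined', exist_ok=True)
for name in sorted(os.listdir('landmarks')):
    landmark = cv2.imread(os.path.join('landmarks', name))
    original = cv2.imread(os.path.join('original', name))
    if landmark is None or original is None:
        continue
    # Landmark sketch on the left, photo on the right, matching the width // 2 split used below.
    pair = np.concatenate([landmark, original], axis=1)
    cv2.imwrite(os.path.join('combined', name), pair)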
import argparse

import tensorflow as tf

CROP_SIZE = 256  # scale_size = CROP_SIZE
ngf = 64
ndf = 64


def preprocess(image):
    with tf.name_scope('preprocess'):
        # [0, 1] => [-1, 1]
        return image * 2 - 1


def deprocess(image):
    with tf.name_scope('deprocess'):
        # [-1, 1] => [0, 1]
        return (image + 1) / 2


def conv(batch_input, out_channels, stride):
    with tf.variable_scope('conv'):
        in_channels = batch_input.get_shape()[3]
        filter = tf.get_variable('filter', [4, 4, in_channels, out_channels], dtype=tf.float32,
                                 initializer=tf.random_normal_initializer(0, 0.02))
        # [batch, in_height, in_width, in_channels], [filter_height, filter_width, in_channels, out_channels]
        # => [batch, out_height, out_width, out_channels]
        padded_input = tf.pad(batch_input, [[0, 0], [1, 1], [1, 1], [0, 0]], mode='CONSTANT')
        conv = tf.nn.conv2d(padded_input, filter, [1, stride, stride, 1], padding='VALID')
        return conv


def lrelu(x, a):
    with tf.name_scope('lrelu'):
        # adding these together creates the leak part and linear part
        # then cancels them out by subtracting/adding an absolute value term
        # leak: a*x/2 - a*abs(x)/2
        # linear: x/2 + abs(x)/2
        # this block looks like it has 2 inputs on the graph unless we do this
        x = tf.identity(x)
        return (0.5 * (1 + a)) * x + (0.5 * (1 - a)) * tf.abs(x)


def batchnorm(input):
    with tf.variable_scope('batchnorm'):
        # this block looks like it has 3 inputs on the graph unless we do this
        input = tf.identity(input)

        channels = input.get_shape()[3]
        offset = tf.get_variable('offset', [channels], dtype=tf.float32, initializer=tf.zeros_initializer())
        scale = tf.get_variable('scale', [channels], dtype=tf.float32,
                                initializer=tf.random_normal_initializer(1.0, 0.02))
        mean, variance = tf.nn.moments(input, axes=[0, 1, 2], keep_dims=False)
        variance_epsilon = 1e-5
        normalized = tf.nn.batch_normalization(input, mean, variance, offset, scale,
                                               variance_epsilon=variance_epsilon)
        return normalized


def deconv(batch_input, out_channels):
    with tf.variable_scope('deconv'):
        batch, in_height, in_width, in_channels = [int(d) for d in batch_input.get_shape()]
        filter = tf.get_variable('filter', [4, 4, out_channels, in_channels], dtype=tf.float32,
                                 initializer=tf.random_normal_initializer(0, 0.02))
        # [batch, in_height, in_width, in_channels], [filter_height, filter_width, out_channels, in_channels]
        # => [batch, out_height, out_width, out_channels]
        conv = tf.nn.conv2d_transpose(batch_input, filter, [batch, in_height * 2, in_width * 2, out_channels],
                                      [1, 2, 2, 1], padding='SAME')
        return conv


def process_image(x):
    with tf.name_scope('load_images'):
        raw_input = tf.image.convert_image_dtype(x, dtype=tf.float32)

        raw_input.set_shape([None, None, 3])

        # break apart image pair and move to range [-1, 1]
        width = tf.shape(raw_input)[1]  # [height, width, channels]
        a_images = preprocess(raw_input[:, :width // 2, :])
        b_images = preprocess(raw_input[:, width // 2:, :])

    inputs, targets = [a_images, b_images]

    # synchronize seed for image operations so that we do the same operations to both
    # input and output images
    def transform(image):
        r = image
        # area produces a nice downscaling, but does nearest neighbor for upscaling
        # assume we're going to be doing downscaling here
        r = tf.image.resize_images(r, [CROP_SIZE, CROP_SIZE], method=tf.image.ResizeMethod.AREA)
        return r

    with tf.name_scope('input_images'):
        input_images = tf.expand_dims(transform(inputs), 0)

    with tf.name_scope('target_images'):
        target_images = tf.expand_dims(transform(targets), 0)

    return input_images, target_images


# Tensor('batch:1', shape=(1, 256, 256, 3), dtype=float32) -> 1 batch size
def create_generator(generator_inputs, generator_outputs_channels):
    layers = []

    # encoder_1: [batch, 256, 256, in_channels] => [batch, 128, 128, ngf]
    with tf.variable_scope('encoder_1'):
        output = conv(generator_inputs, ngf, stride=2)
        layers.append(output)

    layer_specs = [
        ngf * 2,  # encoder_2: [batch, 128, 128, ngf] => [batch, 64, 64, ngf * 2]
        ngf * 4,  # encoder_3: [batch, 64, 64, ngf * 2] => [batch, 32, 32, ngf * 4]
        ngf * 8,  # encoder_4: [batch, 32, 32, ngf * 4] => [batch, 16, 16, ngf * 8]
        ngf * 8,  # encoder_5: [batch, 16, 16, ngf * 8] => [batch, 8, 8, ngf * 8]
        ngf * 8,  # encoder_6: [batch, 8, 8, ngf * 8] => [batch, 4, 4, ngf * 8]
        ngf * 8,  # encoder_7: [batch, 4, 4, ngf * 8] => [batch, 2, 2, ngf * 8]
        ngf * 8,  # encoder_8: [batch, 2, 2, ngf * 8] => [batch, 1, 1, ngf * 8]
    ]

    for out_channels in layer_specs:
        with tf.variable_scope('encoder_%d' % (len(layers) + 1)):
            rectified = lrelu(layers[-1], 0.2)
            # [batch, in_height, in_width, in_channels] => [batch, in_height/2, in_width/2, out_channels]
            convolved = conv(rectified, out_channels, stride=2)
            output = batchnorm(convolved)
            layers.append(output)

    layer_specs = [
        (ngf * 8, 0.5),  # decoder_8: [batch, 1, 1, ngf * 8] => [batch, 2, 2, ngf * 8 * 2]
        (ngf * 8, 0.5),  # decoder_7: [batch, 2, 2, ngf * 8 * 2] => [batch, 4, 4, ngf * 8 * 2]
        (ngf * 8, 0.5),  # decoder_6: [batch, 4, 4, ngf * 8 * 2] => [batch, 8, 8, ngf * 8 * 2]
        (ngf * 8, 0.0),  # decoder_5: [batch, 8, 8, ngf * 8 * 2] => [batch, 16, 16, ngf * 8 * 2]
        (ngf * 4, 0.0),  # decoder_4: [batch, 16, 16, ngf * 8 * 2] => [batch, 32, 32, ngf * 4 * 2]
        (ngf * 2, 0.0),  # decoder_3: [batch, 32, 32, ngf * 4 * 2] => [batch, 64, 64, ngf * 2 * 2]
        (ngf, 0.0),      # decoder_2: [batch, 64, 64, ngf * 2 * 2] => [batch, 128, 128, ngf * 2]
    ]

    num_encoder_layers = len(layers)
    for decoder_layer, (out_channels, dropout) in enumerate(layer_specs):
        skip_layer = num_encoder_layers - decoder_layer - 1
        with tf.variable_scope('decoder_%d' % (skip_layer + 1)):
            if decoder_layer == 0:
                # first decoder layer doesn't have skip connections
                # since it is directly connected to the skip_layer
                input = layers[-1]
            else:
                input = tf.concat([layers[-1], layers[skip_layer]], axis=3)

            rectified = tf.nn.relu(input)
            # [batch, in_height, in_width, in_channels] => [batch, in_height*2, in_width*2, out_channels]
            output = deconv(rectified, out_channels)
            output = batchnorm(output)

            if dropout > 0.0:
                output = tf.nn.dropout(output, keep_prob=1 - dropout)

            layers.append(output)

    # decoder_1: [batch, 128, 128, ngf * 2] => [batch, 256, 256, generator_outputs_channels]
    with tf.variable_scope('decoder_1'):
        input = tf.concat([layers[-1], layers[0]], axis=3)
        rectified = tf.nn.relu(input)
        output = deconv(rectified, generator_outputs_channels)
        output = tf.tanh(output)
        layers.append(output)

    return layers[-1]


def create_model(inputs, targets):
    with tf.variable_scope('generator') as scope:
        out_channels = int(targets.get_shape()[-1])
        outputs = create_generator(inputs, out_channels)
    return outputs


def convert(image):
    return tf.image.convert_image_dtype(image, dtype=tf.uint8, saturate=True, name='output')  # output tensor


def generate_output(x):
    with tf.name_scope('generate_output'):
        test_inputs, test_targets = process_image(x)

        # inputs and targets are [batch_size, height, width, channels]
        model = create_model(test_inputs, test_targets)

        # deprocess output
        outputs = deprocess(model)

        # reverse any processing on images so they can be written to disk or displayed to user
        converted_outputs = convert(outputs)
    return converted_outputs


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-input', dest='input_folder', type=str, help='Model folder to import.')
    parser.add_argument('--model-output', dest='output_folder', type=str, help='Model (reduced) folder to export.')
    args = parser.parse_args()

    x = tf.placeholder(tf.uint8, shape=(256, 512, 3), name='image_tensor')  # input tensor
    y = generate_output(x)

    with tf.Session() as sess:
        # Restore original model
        saver = tf.train.Saver()
        checkpoint = tf.train.latest_checkpoint(args.input_folder)
        saver.restore(sess, checkpoint)

        # Export reduced model used for prediction
        saver = tf.train.Saver()
        saver.save(sess, '{}/reduced_model'.format(args.output_folder))
        print('Model is exported to {}'.format(args.output_folder))
import argparse
import cv2
import dlib
import numpy as np
import tensorflow as tf
from imutils import video

CROP_SIZE = 256
DOWNSAMPLE_RATIO = 4


def reshape_for_polyline(array):
    """Reshape image so that it works with polyline."""
    return np.array(array, np.int32).reshape((-1, 1, 2))


def resize(image):
    """Crop and resize image for pix2pix."""
    height, width, _ = image.shape
    if height != width:
        # crop to correct ratio
        size = min(height, width)
        oh = (height - size) // 2
        ow = (width - size) // 2
        cropped_image = image[oh:(oh + size), ow:(ow + size)]
    else:
        cropped_image = image  # already square
    image_resize = cv2.resize(cropped_image, (CROP_SIZE, CROP_SIZE))
    return image_resize


def load_graph(frozen_graph_filename):
    """Load a (frozen) TensorFlow model into memory."""
    graph = tf.Graph()
    with graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(frozen_graph_filename, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')
    return graph


def main():
    # TensorFlow
    graph = load_graph(args.frozen_model_file)
    image_tensor = graph.get_tensor_by_name('image_tensor:0')
    output_tensor = graph.get_tensor_by_name('generate_output/output:0')
    sess = tf.Session(graph=graph)

    # OpenCV
    cap = cv2.VideoCapture(args.video_source)
    fps = video.FPS().start()

    while True:
        ret, frame = cap.read()
        if not ret:  # stop if the camera frame could not be read
            break

        # resize image and detect face
        frame_resize = cv2.resize(frame, None, fx=1 / DOWNSAMPLE_RATIO, fy=1 / DOWNSAMPLE_RATIO)
        gray = cv2.cvtColor(frame_resize, cv2.COLOR_BGR2GRAY)
        faces = detector(gray, 1)
        black_image = np.zeros(frame.shape, np.uint8)

        for face in faces:
            detected_landmarks = predictor(gray, face).parts()
            landmarks = [[p.x * DOWNSAMPLE_RATIO, p.y * DOWNSAMPLE_RATIO] for p in detected_landmarks]

            jaw = reshape_for_polyline(landmarks[0:17])
            left_eyebrow = reshape_for_polyline(landmarks[22:27])
            right_eyebrow = reshape_for_polyline(landmarks[17:22])
            nose_bridge = reshape_for_polyline(landmarks[27:31])
            lower_nose = reshape_for_polyline(landmarks[30:35])
            left_eye = reshape_for_polyline(landmarks[42:48])
            right_eye = reshape_for_polyline(landmarks[36:42])
            outer_lip = reshape_for_polyline(landmarks[48:60])
            inner_lip = reshape_for_polyline(landmarks[60:68])

            color = (255, 255, 255)
            thickness = 3

            cv2.polylines(black_image, [jaw], False, color, thickness)
            cv2.polylines(black_image, [left_eyebrow], False, color, thickness)
            cv2.polylines(black_image, [right_eyebrow], False, color, thickness)
            cv2.polylines(black_image, [nose_bridge], False, color, thickness)
            cv2.polylines(black_image, [lower_nose], True, color, thickness)
            cv2.polylines(black_image, [left_eye], True, color, thickness)
            cv2.polylines(black_image, [right_eye], True, color, thickness)
            cv2.polylines(black_image, [outer_lip], True, color, thickness)
            cv2.polylines(black_image, [inner_lip], True, color, thickness)

        # generate prediction
        combined_image = np.concatenate([resize(black_image), resize(frame_resize)], axis=1)
        image_rgb = cv2.cvtColor(combined_image, cv2.COLOR_BGR2RGB)  # OpenCV uses BGR instead of RGB
        generated_image = sess.run(output_tensor, feed_dict={image_tensor: image_rgb})
        image_bgr = cv2.cvtColor(np.squeeze(generated_image), cv2.COLOR_RGB2BGR)
        image_normal = np.concatenate([resize(frame_resize), image_bgr], axis=1)
        image_landmark = np.concatenate([resize(black_image), image_bgr], axis=1)

        if args.display_landmark == 0:
            cv2.imshow('frame', image_normal)
        else:
            cv2.imshow('frame', image_landmark)

        fps.update()
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    fps.stop()
    print('[INFO] elapsed time (total): {:.2f}'.format(fps.elapsed()))
    print('[INFO] approx. FPS: {:.2f}'.format(fps.fps()))

    sess.close()
    cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-src', '--source', dest='video_source', type=int,
                        default=0, help='Device index of the camera.')
    parser.add_argument('--show', dest='display_landmark', type=int, default=0, choices=[0, 1],
                        help='0 shows the normal input and 1 the facial landmark.')
    parser.add_argument('--landmark-model', dest='face_landmark_shape_file', type=str, help='Face landmark model file.')
    parser.add_argument('--tf-model', dest='frozen_model_file', type=str, help='Frozen TensorFlow model file.')
    args = parser.parse_args()

    # Create the face detector and landmark predictor
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(args.face_landmark_shape_file)

    main()
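The webcam script above draws the live landmark sketch, places it next to the camera frame, and feeds the resulting 256x512 RGB image through the frozen generator. Below is a minimal sketch of running the same frozen graph on a single still image instead of a webcam stream; the file names frozen_model.pb, combined/1.png, and generated.png are assumptions.

import cv2
import numpy as np
import tensorflow as tf

# Hypothetical single-image test; all file names are assumptions.
graph = tf.Graph()
with graph.as_default():
    graph_def = tf.GraphDef()
    with tf.gfile.GFile('frozen_model.pb', 'rb') as f:
        graph_def.ParseFromString(f.read())
    tf.import_graph_def(graph_def, name='')

image_tensor = graph.get_tensor_by_name('image_tensor:0')
output_tensor = graph.get_tensor_by_name('generate_output/output:0')

# The placeholder expects a 256x512 RGB image: landmark sketch on the left half, photo on the right half.
combined = cv2.cvtColor(cv2.imread('combined/1.png'), cv2.COLOR_BGR2RGB)
combined = cv2.resize(combined, (512, 256))  # cv2.resize takes (width, height)

with tf.Session(graph=graph) as sess:
    generated = sess.run(output_tensor, feed_dict={image_tensor: combined})

cv2.imwrite('generated.png', cv2.cvtColor(np.squeeze(generated), cv2.COLOR_RGB2BGR))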