Skip to content

Instantly share code, notes, and snippets.

@TarrySingh
Last active December 10, 2018 05:26
Show Gist options
  • Save TarrySingh/63a7fb62e1cf22417a8301687b7eb1ed to your computer and use it in GitHub Desktop.
Save TarrySingh/63a7fb62e1cf22417a8301687b7eb1ed to your computer and use it in GitHub Desktop.
Face2Face
import os, argparse
import tensorflow as tf
from tensorflow.python.framework import graph_util
# Directory that contains this script; os.path.realpath resolves symlinks first.
# NOTE(review): the name shadows the built-in dir() and is not referenced
# anywhere in the visible code — confirm nothing else relies on it.
dir = os.path.dirname(os.path.realpath(__file__))
def freeze_graph(model_folder):
    """Freeze the checkpoint found in *model_folder* into a single ``.pb`` file.

    The checkpoint's meta graph is imported, its variables are restored and
    converted to constants, and the resulting graph definition is written to
    ``frozen_model.pb`` in the same directory as the checkpoint.

    Args:
        model_folder: Directory containing the TensorFlow checkpoint files.

    Raises:
        ValueError: If no checkpoint state can be found in *model_folder*.
    """
    # Retrieve the full path of the checkpoint recorded for this folder.
    checkpoint = tf.train.get_checkpoint_state(model_folder)
    if checkpoint is None or not checkpoint.model_checkpoint_path:
        # get_checkpoint_state returns None when the folder has no checkpoint;
        # fail with a clear message instead of an AttributeError on None.
        raise ValueError('No checkpoint found in %r' % (model_folder,))
    input_checkpoint = checkpoint.model_checkpoint_path

    # The frozen graph is written next to the checkpoint. os.path is used so
    # that a checkpoint path without a directory part does not turn into
    # '/frozen_model.pb' (the filesystem root).
    output_graph = os.path.join(os.path.dirname(input_checkpoint), 'frozen_model.pb')

    # Before exporting the graph we must state which nodes are its outputs:
    # this is how TensorFlow decides which part of the graph to keep and
    # which part it can drop.
    # NOTE: this is a comma-separated list, because a graph can have several
    # output nodes.
    output_node_names = 'generate_output/output'

    # Clear device placements so TensorFlow can choose where to load the ops.
    clear_devices = True

    # Import the meta graph and retrieve a Saver for it.
    saver = tf.train.import_meta_graph(input_checkpoint + '.meta', clear_devices=clear_devices)

    # Retrieve the protobuf graph definition.
    graph = tf.get_default_graph()
    input_graph_def = graph.as_graph_def()

    # Start a session and restore the graph weights.
    with tf.Session() as sess:
        saver.restore(sess, input_checkpoint)

        # Use the built-in TensorFlow helper to turn variables into constants.
        output_graph_def = graph_util.convert_variables_to_constants(
            sess,  # the session is used to retrieve the weights
            input_graph_def,  # the graph_def is used to retrieve the nodes
            output_node_names.split(','),  # selects the useful nodes
        )

    # Finally, serialize the frozen graph and write it to the filesystem.
    with tf.gfile.GFile(output_graph, 'wb') as f:
        f.write(output_graph_def.SerializeToString())
    print('%d ops in the final graph.' % len(output_graph_def.node))
if __name__ == '__main__':
    # Command-line entry point: freeze the checkpoint stored in --model-folder.
    parser = argparse.ArgumentParser()
    # The folder is mandatory: without it None would be handed to TensorFlow
    # and fail there, so let argparse report the missing option instead.
    parser.add_argument('--model-folder', type=str, required=True, help='Model folder to export')
    args = parser.parse_args()

    freeze_graph(args.model_folder)
import os
import cv2
import dlib
import time
import argparse
import numpy as np
from imutils import video
# Frames are shrunk by this factor before face detection; the detected
# landmark coordinates are multiplied by it to map back onto the full frame.
DOWNSAMPLE_RATIO = 4
def reshape_for_polyline(array):
    """Return *array* as an int32 array of shape (N, 1, 2).

    The points are given as a sequence of (x, y) pairs; the result is what
    this script passes to ``cv2.polylines``.
    """
    points = np.array(array, dtype=np.int32)
    return np.reshape(points, (-1, 1, 2))
def main():
    """Extract face-landmark training pairs from the video ``args.filename``.

    For every frame in which exactly one face is detected, the frame is
    written to ``original/<n>.png`` and a black image of the same size with
    the facial landmarks drawn as white polylines is written to
    ``landmarks/<n>.png``. Processing stops once ``args.number`` pairs have
    been written, when the video yields no more frames, or when
    ``cv2.waitKey`` reports the ``q`` key.

    Relies on the module-level ``args``, ``detector`` and ``predictor`` that
    the script entry point sets up.
    """
    # (slice into the landmark list, whether the polyline is closed), in the
    # order the parts are drawn.
    landmark_parts = (
        (slice(0, 17), False),   # jaw
        (slice(22, 27), False),  # left eyebrow
        (slice(17, 22), False),  # right eyebrow
        (slice(27, 31), False),  # nose bridge
        (slice(30, 35), True),   # lower nose
        (slice(42, 48), True),   # left eye
        (slice(36, 42), True),   # right eye
        (slice(48, 60), True),   # outer lip
        (slice(60, 68), True),   # inner lip
    )
    color = (255, 255, 255)
    thickness = 3

    os.makedirs('original', exist_ok=True)
    os.makedirs('landmarks', exist_ok=True)

    cap = cv2.VideoCapture(args.filename)
    fps = video.FPS().start()

    count = 0
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # No frame was returned (end of the video or a read error).
                # Without this check the resize below fails on a None frame.
                break

            # Detect faces on a downsampled grayscale copy of the frame.
            frame_resize = cv2.resize(frame, None, fx=1 / DOWNSAMPLE_RATIO, fy=1 / DOWNSAMPLE_RATIO)
            gray = cv2.cvtColor(frame_resize, cv2.COLOR_BGR2GRAY)
            faces = detector(gray, 1)
            black_image = np.zeros(frame.shape, np.uint8)

            t = time.time()

            # Only frames with exactly one detected face are used.
            if len(faces) != 1:
                print("No face detected")
                continue

            for face in faces:
                detected_landmarks = predictor(gray, face).parts()
                # Scale the landmarks back up to full-frame coordinates.
                landmarks = [[p.x * DOWNSAMPLE_RATIO, p.y * DOWNSAMPLE_RATIO] for p in detected_landmarks]
                for part, closed in landmark_parts:
                    cv2.polylines(black_image, [reshape_for_polyline(landmarks[part])],
                                  closed, color, thickness)

            # Save the frame together with its landmark image.
            count += 1
            print(count)
            cv2.imwrite("original/{}.png".format(count), frame)
            cv2.imwrite("landmarks/{}.png".format(count), black_image)
            fps.update()

            print('[INFO] elapsed time: {:.2f}'.format(time.time() - t))

            if count == args.number:  # requested number of pairs reached
                break
            elif cv2.waitKey(1) & 0xFF == ord('q'):
                break

        fps.stop()
        print('[INFO] elapsed time (total): {:.2f}'.format(fps.elapsed()))
        print('[INFO] approx. FPS: {:.2f}'.format(fps.fps()))
    finally:
        # Release the capture even if a frame raised part-way through.
        cap.release()
        cv2.destroyAllWindows()
if __name__ == '__main__':
    # Command-line entry point for generating the training images.
    parser = argparse.ArgumentParser()
    # The video and the landmark model are needed unconditionally, so let
    # argparse reject a call that omits them instead of failing later on None.
    parser.add_argument('--file', dest='filename', type=str, required=True,
                        help='Name of the video file.')
    # Optional: when omitted, args.number is None and main()'s comparison
    # against the running count never matches.
    parser.add_argument('--num', dest='number', type=int, help='Number of train data to be created.')
    parser.add_argument('--landmark-model', dest='face_landmark_shape_file', type=str, required=True,
                        help='Face landmark model file.')
    args = parser.parse_args()

    # Create the face detector and the landmark predictor used by main().
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(args.face_landmark_shape_file)

    main()
import argparse
import tensorflow as tf
# Side length, in pixels, that process_image() resizes both image halves to.
CROP_SIZE = 256 # scale_size = CROP_SIZE
# Output channel count of the first generator encoder layer; the other
# generator layers use multiples of it (see create_generator).
ngf = 64
# NOTE(review): not referenced in the visible code — presumably the base
# filter count of a discriminator; confirm before relying on or removing it.
ndf = 64
def preprocess(image):
    """Map image values from the range [0, 1] to the range [-1, 1]."""
    with tf.name_scope('preprocess'):
        doubled = image * 2
        return doubled - 1
def deprocess(image):
    """Map image values from the range [-1, 1] back to the range [0, 1]."""
    with tf.name_scope('deprocess'):
        shifted = image + 1
        return shifted / 2
def conv(batch_input, out_channels, stride):
    """Apply a 4x4 convolution with *stride* to a 4-D ``batch_input``.

    The input is zero-padded by one pixel on each side of its second and
    third axes and then convolved with ``VALID`` padding. The kernel is a
    variable named ``filter`` inside the ``conv`` variable scope.
    """
    with tf.variable_scope('conv'):
        in_channels = batch_input.get_shape()[3]
        kernel = tf.get_variable(
            'filter',
            [4, 4, in_channels, out_channels],
            dtype=tf.float32,
            initializer=tf.random_normal_initializer(0, 0.02),
        )
        # [batch, in_height, in_width, in_channels],
        # [filter_width, filter_height, in_channels, out_channels]
        #   => [batch, out_height, out_width, out_channels]
        paddings = [[0, 0], [1, 1], [1, 1], [0, 0]]
        padded_input = tf.pad(batch_input, paddings, mode='CONSTANT')
        return tf.nn.conv2d(padded_input, kernel, [1, stride, stride, 1], padding='VALID')
def lrelu(x, a):
    """Leaky ReLU of *x* with slope *a* on the negative side.

    Written as the sum of a term linear in ``x`` and a term in ``abs(x)``:
        leak:   a*x/2 - a*abs(x)/2
        linear: x/2 + abs(x)/2
    """
    with tf.name_scope('lrelu'):
        # Without the identity this block appears to have two inputs on the
        # graph visualisation.
        single_input = tf.identity(x)
        linear_term = (0.5 * (1 + a)) * single_input
        abs_term = (0.5 * (1 - a)) * tf.abs(single_input)
        return linear_term + abs_term
def batchnorm(input):
    """Normalize a 4-D tensor using moments taken over axes 0, 1 and 2.

    A learned per-channel ``offset`` (initialised to zeros) and ``scale``
    (initialised around 1.0) are created in the ``batchnorm`` variable scope.
    """
    with tf.variable_scope('batchnorm'):
        # Without the identity this block appears to have three inputs on the
        # graph visualisation.
        features = tf.identity(input)

        channels = features.get_shape()[3]
        offset = tf.get_variable(
            'offset', [channels], dtype=tf.float32,
            initializer=tf.zeros_initializer())
        scale = tf.get_variable(
            'scale', [channels], dtype=tf.float32,
            initializer=tf.random_normal_initializer(1.0, 0.02))
        mean, variance = tf.nn.moments(features, axes=[0, 1, 2], keep_dims=False)
        return tf.nn.batch_normalization(
            features, mean, variance, offset, scale, variance_epsilon=1e-5)
def deconv(batch_input, out_channels):
    """Apply a 4x4 transposed convolution that doubles height and width.

    ``batch_input`` must have a fully defined static shape, because every
    dimension is converted to ``int`` to build the output shape.
    """
    with tf.variable_scope('deconv'):
        batch, in_height, in_width, in_channels = map(int, batch_input.get_shape())
        kernel = tf.get_variable(
            'filter',
            [4, 4, out_channels, in_channels],
            dtype=tf.float32,
            initializer=tf.random_normal_initializer(0, 0.02),
        )
        # [batch, in_height, in_width, in_channels],
        # [filter_width, filter_height, out_channels, in_channels]
        #   => [batch, out_height, out_width, out_channels]
        output_shape = [batch, in_height * 2, in_width * 2, out_channels]
        return tf.nn.conv2d_transpose(batch_input, kernel, output_shape,
                                      [1, 2, 2, 1], padding='SAME')
def process_image(x):
    """Split a side-by-side image pair into generator input and target.

    *x* is converted to float32, cut in half along its width, each half is
    moved to the range [-1, 1], resized to CROP_SIZE x CROP_SIZE and given a
    leading batch axis of size one.

    Returns:
        A tuple ``(input_images, target_images)`` built from the left and the
        right half of *x* respectively.
    """
    with tf.name_scope('load_images'):
        raw_input = tf.image.convert_image_dtype(x, dtype=tf.float32)
        raw_input.set_shape([None, None, 3])

        # Break the image pair apart and move both halves to [-1, 1].
        width = tf.shape(raw_input)[1]  # [height, width, channels]
        left_half = preprocess(raw_input[:, :width // 2, :])
        right_half = preprocess(raw_input[:, width // 2:, :])

    # AREA gives a nice downscaling but behaves like nearest neighbour when
    # upscaling; downscaling is assumed here.
    target_size = [CROP_SIZE, CROP_SIZE]
    resize_method = tf.image.ResizeMethod.AREA

    with tf.name_scope('input_images'):
        resized_inputs = tf.image.resize_images(left_half, target_size, method=resize_method)
        input_images = tf.expand_dims(resized_inputs, 0)

    with tf.name_scope('target_images'):
        resized_targets = tf.image.resize_images(right_half, target_size, method=resize_method)
        target_images = tf.expand_dims(resized_targets, 0)

    return input_images, target_images
# Tensor('batch:1', shape=(1, 256, 256, 3), dtype=float32) -> 1 batch size
def create_generator(generator_inputs, generator_outputs_channels):
    """Build the encoder/decoder generator and return its tanh output.

    Eight stride-2 encoder layers (``encoder_1`` .. ``encoder_8``) are
    followed by eight decoder layers (``decoder_8`` .. ``decoder_1``). Every
    decoder except the first one also receives the output of the encoder
    with the same number via a channel-wise concatenation.
    """
    encoders = []

    # encoder_1: [batch, 256, 256, in_channels] => [batch, 128, 128, ngf]
    with tf.variable_scope('encoder_1'):
        encoders.append(conv(generator_inputs, ngf, stride=2))

    encoder_channels = [
        ngf * 2,  # encoder_2: [batch, 128, 128, ngf] => [batch, 64, 64, ngf * 2]
        ngf * 4,  # encoder_3: [batch, 64, 64, ngf * 2] => [batch, 32, 32, ngf * 4]
        ngf * 8,  # encoder_4: [batch, 32, 32, ngf * 4] => [batch, 16, 16, ngf * 8]
        ngf * 8,  # encoder_5: [batch, 16, 16, ngf * 8] => [batch, 8, 8, ngf * 8]
        ngf * 8,  # encoder_6: [batch, 8, 8, ngf * 8] => [batch, 4, 4, ngf * 8]
        ngf * 8,  # encoder_7: [batch, 4, 4, ngf * 8] => [batch, 2, 2, ngf * 8]
        ngf * 8,  # encoder_8: [batch, 2, 2, ngf * 8] => [batch, 1, 1, ngf * 8]
    ]
    for number, out_channels in enumerate(encoder_channels, start=2):
        with tf.variable_scope('encoder_%d' % number):
            rectified = lrelu(encoders[-1], 0.2)
            # [batch, in_height, in_width, in_channels]
            #   => [batch, in_height/2, in_width/2, out_channels]
            convolved = conv(rectified, out_channels, stride=2)
            encoders.append(batchnorm(convolved))

    # (output channels, dropout rate) per decoder layer.
    decoder_specs = [
        (ngf * 8, 0.5),  # decoder_8: [batch, 1, 1, ngf * 8] => [batch, 2, 2, ngf * 8 * 2]
        (ngf * 8, 0.5),  # decoder_7: [batch, 2, 2, ngf * 8 * 2] => [batch, 4, 4, ngf * 8 * 2]
        (ngf * 8, 0.5),  # decoder_6: [batch, 4, 4, ngf * 8 * 2] => [batch, 8, 8, ngf * 8 * 2]
        (ngf * 8, 0.0),  # decoder_5: [batch, 8, 8, ngf * 8 * 2] => [batch, 16, 16, ngf * 8 * 2]
        (ngf * 4, 0.0),  # decoder_4: [batch, 16, 16, ngf * 8 * 2] => [batch, 32, 32, ngf * 4 * 2]
        (ngf * 2, 0.0),  # decoder_3: [batch, 32, 32, ngf * 4 * 2] => [batch, 64, 64, ngf * 2 * 2]
        (ngf, 0.0),  # decoder_2: [batch, 64, 64, ngf * 2 * 2] => [batch, 128, 128, ngf * 2]
    ]

    num_encoder_layers = len(encoders)
    decoded = encoders[-1]
    for step, (out_channels, dropout) in enumerate(decoder_specs):
        skip_index = num_encoder_layers - step - 1
        with tf.variable_scope('decoder_%d' % (skip_index + 1)):
            if step == 0:
                # The first decoder layer has no skip connection: it is fed
                # directly by the innermost encoder layer.
                decoder_input = decoded
            else:
                decoder_input = tf.concat([decoded, encoders[skip_index]], axis=3)

            rectified = tf.nn.relu(decoder_input)
            # [batch, in_height, in_width, in_channels]
            #   => [batch, in_height*2, in_width*2, out_channels]
            decoded = deconv(rectified, out_channels)
            decoded = batchnorm(decoded)

            if dropout > 0.0:
                decoded = tf.nn.dropout(decoded, keep_prob=1 - dropout)

    # decoder_1: [batch, 128, 128, ngf * 2] => [batch, 256, 256, generator_outputs_channels]
    with tf.variable_scope('decoder_1'):
        decoder_input = tf.concat([decoded, encoders[0]], axis=3)
        rectified = tf.nn.relu(decoder_input)
        decoded = tf.tanh(deconv(rectified, generator_outputs_channels))

    return decoded
def create_model(inputs, targets):
    """Build the generator inside the ``generator`` variable scope.

    The number of output channels is taken from the last dimension of
    *targets*; the generator output tensor is returned.
    """
    out_channels = int(targets.get_shape()[-1])
    with tf.variable_scope('generator'):
        return create_generator(inputs, out_channels)
def convert(image):
    """Convert *image* to saturated uint8; the resulting op is named ``output``."""
    # This is the output tensor of the exported model.
    return tf.image.convert_image_dtype(image, tf.uint8, saturate=True, name='output')
def generate_output(x):
    """Build the full prediction graph for the image tensor *x*.

    Everything is created under the ``generate_output`` name scope: the image
    pair is split and preprocessed, run through the generator, moved back to
    [0, 1] and converted to uint8.
    """
    with tf.name_scope('generate_output'):
        # Inputs and targets are [batch_size, height, width, channels].
        test_inputs, test_targets = process_image(x)
        generated = create_model(test_inputs, test_targets)

        # Undo the preprocessing so the images can be written to disk or
        # displayed to the user.
        return convert(deprocess(generated))
if __name__ == '__main__':
    # Command-line entry point: rebuild the prediction-only graph, load the
    # trained weights from --model-input and save them to --model-output.
    parser = argparse.ArgumentParser()
    # Both folders are needed unconditionally, so argparse enforces them.
    parser.add_argument('--model-input', dest='input_folder', type=str, required=True,
                        help='Model folder to import.')
    parser.add_argument('--model-output', dest='output_folder', type=str, required=True,
                        help='Model (reduced) folder to export.')
    args = parser.parse_args()

    x = tf.placeholder(tf.uint8, shape=(256, 512, 3), name='image_tensor')  # input tensor
    y = generate_output(x)

    checkpoint = tf.train.latest_checkpoint(args.input_folder)
    if checkpoint is None:
        # latest_checkpoint returns None when the folder holds no checkpoint;
        # stop with a clear message instead of passing None to restore().
        raise SystemExit('No checkpoint found in {}'.format(args.input_folder))

    with tf.Session() as sess:
        # Restore the original model. The same Saver is reused for the export
        # because it already covers every variable of the reduced graph.
        saver = tf.train.Saver()
        saver.restore(sess, checkpoint)

        # Export the reduced model used for prediction.
        export_path = '{}/reduced_model'.format(args.output_folder)
        saver.save(sess, export_path)
        # Report the export location (previously the input checkpoint path
        # was printed here).
        print("Model is exported to {}".format(export_path))
import argparse
import cv2
import dlib
import numpy as np
import tensorflow as tf
from imutils import video
# Side length, in pixels, of the square images produced by resize().
CROP_SIZE = 256
# Frames are shrunk by this factor before face detection; the detected
# landmark coordinates are multiplied by it again afterwards.
DOWNSAMPLE_RATIO = 4
def reshape_for_polyline(array):
    """Return the given (x, y) points as an int32 array of shape (N, 1, 2) for polylines."""
    points = np.array(array, dtype=np.int32)
    return np.reshape(points, (-1, 1, 2))
def resize(image):
    """Center-crop *image* to a square and resize it for pix2pix.

    Args:
        image: Image array whose first two axes are height and width.

    Returns:
        The cropped image resized to CROP_SIZE x CROP_SIZE.
    """
    height, width = image.shape[:2]

    # Crop to a 1:1 ratio around the centre. For an image that is already
    # square both offsets are 0 and the crop is the whole image; previously
    # that case skipped the crop and left nothing to resize.
    size = min(height, width)
    oh = (height - size) // 2
    ow = (width - size) // 2
    cropped_image = image[oh:(oh + size), ow:(ow + size)]

    return cv2.resize(cropped_image, (CROP_SIZE, CROP_SIZE))
def load_graph(frozen_graph_filename):
    """Read a (frozen) TensorFlow GraphDef file and import it into a new graph.

    The nodes are imported without a name prefix; the new ``tf.Graph`` is
    returned.
    """
    with tf.gfile.GFile(frozen_graph_filename, 'rb') as graph_file:
        serialized_graph = graph_file.read()

    graph_def = tf.GraphDef()
    graph_def.ParseFromString(serialized_graph)

    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(graph_def, name='')
    return graph
def main():
    """Run the frozen generator on live camera frames and display the result.

    Frames are read from the capture source ``args.video_source``. For each
    frame the facial landmarks of every detected face are drawn as white
    polylines on a black image; that image and the downsampled frame are
    placed side by side and fed to the frozen model. The generated image is
    shown next to the frame, or next to the landmark image when
    ``args.display_landmark`` is not 0. The loop ends when no frame can be
    read or when ``cv2.waitKey`` reports the ``q`` key.

    Relies on the module-level ``args``, ``detector`` and ``predictor`` that
    the script entry point sets up.
    """
    # (slice into the landmark list, whether the polyline is closed), in the
    # order the parts are drawn.
    landmark_parts = (
        (slice(0, 17), False),   # jaw
        (slice(22, 27), False),  # left eyebrow
        (slice(17, 22), False),  # right eyebrow
        (slice(27, 31), False),  # nose bridge
        (slice(30, 35), True),   # lower nose
        (slice(42, 48), True),   # left eye
        (slice(36, 42), True),   # right eye
        (slice(48, 60), True),   # outer lip
        (slice(60, 68), True),   # inner lip
    )
    color = (255, 255, 255)
    thickness = 3

    # TensorFlow
    graph = load_graph(args.frozen_model_file)
    image_tensor = graph.get_tensor_by_name('image_tensor:0')
    output_tensor = graph.get_tensor_by_name('generate_output/output:0')
    sess = tf.Session(graph=graph)

    # OpenCV
    cap = cv2.VideoCapture(args.video_source)
    fps = video.FPS().start()

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # No frame could be grabbed (camera unavailable or stream
                # ended). Without this check the resize below fails on a
                # None frame.
                break

            # Resize the frame and detect faces on its grayscale version.
            frame_resize = cv2.resize(frame, None, fx=1 / DOWNSAMPLE_RATIO, fy=1 / DOWNSAMPLE_RATIO)
            gray = cv2.cvtColor(frame_resize, cv2.COLOR_BGR2GRAY)
            faces = detector(gray, 1)
            black_image = np.zeros(frame.shape, np.uint8)

            for face in faces:
                detected_landmarks = predictor(gray, face).parts()
                # Scale the landmarks back up to full-frame coordinates.
                landmarks = [[p.x * DOWNSAMPLE_RATIO, p.y * DOWNSAMPLE_RATIO] for p in detected_landmarks]
                for part, closed in landmark_parts:
                    cv2.polylines(black_image, [reshape_for_polyline(landmarks[part])],
                                  closed, color, thickness)

            # Both square crops are used several times below; compute each once.
            landmark_square = resize(black_image)
            frame_square = resize(frame_resize)

            # Generate the prediction.
            combined_image = np.concatenate([landmark_square, frame_square], axis=1)
            image_rgb = cv2.cvtColor(combined_image, cv2.COLOR_BGR2RGB)  # OpenCV uses BGR instead of RGB
            generated_image = sess.run(output_tensor, feed_dict={image_tensor: image_rgb})
            image_bgr = cv2.cvtColor(np.squeeze(generated_image), cv2.COLOR_RGB2BGR)

            if args.display_landmark == 0:
                shown_image = np.concatenate([frame_square, image_bgr], axis=1)
            else:
                shown_image = np.concatenate([landmark_square, image_bgr], axis=1)
            cv2.imshow('frame', shown_image)

            fps.update()
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        fps.stop()
        print('[INFO] elapsed time (total): {:.2f}'.format(fps.elapsed()))
        print('[INFO] approx. FPS: {:.2f}'.format(fps.fps()))
    finally:
        # Free the session, the camera and the window even if a frame raised.
        sess.close()
        cap.release()
        cv2.destroyAllWindows()
if __name__ == '__main__':
    # Command-line entry point for the webcam demo.
    parser = argparse.ArgumentParser()
    parser.add_argument('-src', '--source', dest='video_source', type=int,
                        default=0, help='Device index of the camera.')
    parser.add_argument('--show', dest='display_landmark', type=int, default=0, choices=[0, 1],
                        help='0 shows the normal input and 1 the facial landmark.')
    # Both model files are needed unconditionally, so let argparse reject a
    # call that omits them instead of failing later on None.
    parser.add_argument('--landmark-model', dest='face_landmark_shape_file', type=str, required=True,
                        help='Face landmark model file.')
    parser.add_argument('--tf-model', dest='frozen_model_file', type=str, required=True,
                        help='Frozen TensorFlow model file.')
    args = parser.parse_args()

    # Create the face detector and the landmark predictor used by main().
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor(args.face_landmark_shape_file)

    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment