Splice in Neural Enhance upscaling into the dfaker merge process.
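The gist contains three files: enhance2.py (a scriptable fork of alexjc's Neural Enhance with the CLI stripped out so it can be imported as a library), the modified dfaker merge script that calls it, and model.py (the dfaker autoencoder, with a TensorFlow memory cap added so Theano and Keras can share one GPU). A minimal sketch of how the enhancer is driven on its own, assuming the pretrained ne4x model file sits next to enhance2.py (the input filename here is just an example):

    from enhance2 import NeuralEnhancer
    import cv2, numpy as np

    x4 = NeuralEnhancer('default', 4, loader=False)            # 4x super-resolution model
    img = cv2.cvtColor(cv2.imread('face.png'), cv2.COLOR_BGR2RGB)
    out = np.array(x4.process(img))                            # process() returns a PIL image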
#!/usr/bin/env python3
"""                          _              _
 _ __   ___ _   _ _ __ __ _| |   ___ _ __ | |__   __ _ _ __   ___ ___
| '_ \ / _ \ | | | '__/ _` | |  / _ \ '_ \| '_ \ / _` | '_ \ / __/ _ \
| | | |  __/ |_| | | | (_| | | |  __/ | | | | | | (_| | | | | (_|  __/
|_| |_|\___|\__,_|_|  \__,_|_|  \___|_| |_|_| |_|\__,_|_| |_|\___\___|
"""
#
# Original work Copyright (c) 2016, Alex J. Champandard.
# Modified work Copyright (c) 2018, Alexis_TheLarge.
#
# Neural Enhance is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License version 3. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See full license here: https://github.com/alexjc/neural-enhance/blob/master/LICENSE
#

__version__ = '0.3'
import io
import os
import sys
import bz2
import glob
import math
import time
import pickle
import random
import argparse
import itertools
import threading
import collections

import cv2
import numpy as np
import PIL.Image
import PIL.ImageFilter
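
#----------------------------------------------------------------------------------------------------------------------
# Configuration. These module-level constants replace the original neural-enhance
# command-line flags, so this file can be imported as a library (the dfaker merge
# script below does `from enhance2 import NeuralEnhancer`).
#----------------------------------------------------------------------------------------------------------------------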
# Images to enhance and rendering options.
files = ['images/128.jpg']
zoom = 2
rendering_tile = 80
rendering_overlap = 24
rendering_histogram = False

# Model selection.
i_type = 'photo'
model = 'default'

# Training options (train stays False for inference-only use).
train = False
train_scales = 0
train_blur = None
train_noise = None
train_jpeg = []
epochs = 10
epoch_size = 72
save_every = 10
batch_shape = 192
batch_size = 15
buffer_size = 1500
buffer_fraction = 5
learning_rate = 1E-4
learning_period = 75
learning_decay = 0.5

# Generator architecture.
generator_upscale = 2
generator_downscale = 0
generator_filters = [64]
generator_blocks = 4
generator_residual = 2

# Loss weights and adversarial training schedule.
perceptual_layer = 'conv2_2'
perceptual_weight = 1e0
discriminator_size = 32
smoothness_weight = 2e5
adversary_weight = 5e2
generator_start = 0
discriminator_start = 1
adversary_start = 2

device = 'cuda'
#----------------------------------------------------------------------------------------------------------------------
# Color coded output helps visualize the information a little better, plus it looks cool!
class ansi:
    WHITE = '\033[0;97m'
    WHITE_B = '\033[1;97m'
    YELLOW = '\033[0;33m'
    YELLOW_B = '\033[1;33m'
    RED = '\033[0;31m'
    RED_B = '\033[1;31m'
    BLUE = '\033[0;94m'
    BLUE_B = '\033[1;94m'
    CYAN = '\033[0;36m'
    CYAN_B = '\033[1;36m'
    ENDC = '\033[0m'

def error(message, *lines):
    string = "\n{}ERROR: " + message + "{}\n" + "\n".join(lines) + ("{}\n" if lines else "{}")
    print(string.format(ansi.RED_B, ansi.RED, ansi.ENDC))
    sys.exit(-1)

def warn(message, *lines):
    string = "\n{}WARNING: " + message + "{}\n" + "\n".join(lines) + "{}\n"
    print(string.format(ansi.YELLOW_B, ansi.YELLOW, ansi.ENDC))

def extend(lst): return itertools.chain(lst, itertools.repeat(lst[-1]))

# Load the underlying deep learning libraries based on the device specified. If you specify THEANO_FLAGS manually,
# the code assumes you know what you are doing and they are not overridden!
os.environ.setdefault('THEANO_FLAGS', 'floatX=float32,device={},force_device=True,allow_gc=True,'\
                                      'print_active_device=False'.format(device))

# Scientific & Imaging Libraries
import scipy.ndimage, scipy.misc, scipy.interpolate

# Numeric Computing (GPU)
import theano, theano.tensor as T
T.nnet.softminus = lambda x: x - T.nnet.softplus(x)   # softminus(x) = log(sigmoid(x))

# Support ansi colors in Windows too.
if sys.platform == 'win32':
    import colorama

# Deep Learning Framework
import lasagne
from lasagne.layers import Conv2DLayer as ConvLayer, Deconv2DLayer as DeconvLayer, Pool2DLayer as PoolLayer
from lasagne.layers import InputLayer, ConcatLayer, ElemwiseSumLayer, batch_norm

print('{}  - Using the device `{}` for neural computation.{}\n'.format(ansi.CYAN, theano.config.device, ansi.ENDC))
#======================================================================================================================
# Image Processing
#======================================================================================================================
class DataLoader(threading.Thread):

    def __init__(self, zoom):
        super(DataLoader, self).__init__(daemon=True)
        self.data_ready = threading.Event()
        self.data_copied = threading.Event()

        self.zoom = zoom
        self.orig_shape, self.seed_shape = batch_shape, batch_shape // self.zoom

        self.orig_buffer = np.zeros((buffer_size, 3, self.orig_shape, self.orig_shape), dtype=np.float32)
        self.seed_buffer = np.zeros((buffer_size, 3, self.seed_shape, self.seed_shape), dtype=np.float32)
        self.files = glob.glob(train)
        if len(self.files) == 0:
            error("There were no files found to train from searching for `{}`".format(train),
                  "  - Try putting all your images in one folder and using `--train=data/*.jpg`")

        self.available = set(range(buffer_size))
        self.ready = set()

        self.cwd = os.getcwd()
        self.start()

    def run(self):
        while True:
            random.shuffle(self.files)
            for f in self.files:
                self.add_to_buffer(f)

    def add_to_buffer(self, f):
        filename = os.path.join(self.cwd, f)
        try:
            orig = PIL.Image.open(filename).convert('RGB')
            scale = 2 ** random.randint(0, train_scales)
            if scale > 1 and all(s//scale >= batch_shape for s in orig.size):
                orig = orig.resize((orig.size[0]//scale, orig.size[1]//scale), resample=PIL.Image.LANCZOS)
            if any(s < batch_shape for s in orig.size):
                raise ValueError('Image is too small for training with size {}'.format(orig.size))
        except Exception as e:
            warn('Could not load `{}` as image.'.format(filename),
                 '  - Try fixing or removing the file before next run.')
            self.files.remove(f)
            return

        seed = orig
        if train_blur is not None:
            seed = seed.filter(PIL.ImageFilter.GaussianBlur(radius=random.randint(0, train_blur*2)))
        if self.zoom > 1:
            seed = seed.resize((orig.size[0]//self.zoom, orig.size[1]//self.zoom), resample=PIL.Image.LANCZOS)
        if len(train_jpeg) > 0:
            buffer, rng = io.BytesIO(), train_jpeg[-1] if len(train_jpeg) > 1 else 15
            seed.save(buffer, format='jpeg', quality=train_jpeg[0]+random.randrange(-rng, +rng))
            seed = PIL.Image.open(buffer)

        orig = scipy.misc.fromimage(orig).astype(np.float32)
        seed = scipy.misc.fromimage(seed).astype(np.float32)

        if train_noise is not None:
            seed += scipy.random.normal(scale=train_noise, size=(seed.shape[0], seed.shape[1], 1))

        for _ in range(seed.shape[0] * seed.shape[1] // (buffer_fraction * self.seed_shape ** 2)):
            h = random.randint(0, seed.shape[0] - self.seed_shape)
            w = random.randint(0, seed.shape[1] - self.seed_shape)
            seed_chunk = seed[h:h+self.seed_shape, w:w+self.seed_shape]
            h, w = h * self.zoom, w * self.zoom
            orig_chunk = orig[h:h+self.orig_shape, w:w+self.orig_shape]

            while len(self.available) == 0:
                self.data_copied.wait()
                self.data_copied.clear()

            i = self.available.pop()
            self.orig_buffer[i] = np.transpose(orig_chunk.astype(np.float32) / 255.0 - 0.5, (2, 0, 1))
            self.seed_buffer[i] = np.transpose(seed_chunk.astype(np.float32) / 255.0 - 0.5, (2, 0, 1))
            self.ready.add(i)

            if len(self.ready) >= batch_size:
                self.data_ready.set()

    def copy(self, origs_out, seeds_out):
        self.data_ready.wait()
        self.data_ready.clear()

        for i, j in enumerate(random.sample(self.ready, batch_size)):
            origs_out[i] = self.orig_buffer[j]
            seeds_out[i] = self.seed_buffer[j]
            self.available.add(j)
        self.data_copied.set()
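
# A minimal usage sketch of the loader (assumes `train` has been set to a glob
# such as 'data/*.jpg' instead of False):
#
#   loader = DataLoader(zoom=2)
#   origs = np.zeros((batch_size, 3, batch_shape, batch_shape), dtype=np.float32)
#   seeds = np.zeros((batch_size, 3, batch_shape // 2, batch_shape // 2), dtype=np.float32)
#   loader.copy(origs, seeds)   # blocks until a full batch of random crops is ready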
#======================================================================================================================
# Convolution Networks
#======================================================================================================================
class SubpixelReshuffleLayer(lasagne.layers.Layer):
    """Based on the code by ajbrock: https://github.com/ajbrock/Neural-Photo-Editor/
    """

    def __init__(self, incoming, channels, upscale, **kwargs):
        super(SubpixelReshuffleLayer, self).__init__(incoming, **kwargs)
        self.upscale = upscale
        self.channels = channels

    def get_output_shape_for(self, input_shape):
        def up(d): return self.upscale * d if d else d
        return (input_shape[0], self.channels, up(input_shape[2]), up(input_shape[3]))

    def get_output_for(self, input, deterministic=False, **kwargs):
        out, r = T.zeros(self.get_output_shape_for(input.shape)), self.upscale
        for y, x in itertools.product(range(r), repeat=2):
            out = T.inc_subtensor(out[:,:,y::r,x::r], input[:,r*y+x::r*r,:,:])
        return out
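
# Subpixel reshuffling is the "depth-to-space" trick used by ESPCN-style
# super-resolution networks: with upscale factor r, every block of r*r channels
# is scattered into an r-times larger spatial grid. For example, a (N, 256, 32, 32)
# tensor with r=2 and channels=64 becomes (N, 64, 64, 64), with no learned deconvolution.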
class Model(object):

    def __init__(self, zoom, model):
        self.network = collections.OrderedDict()
        self.network['img'] = InputLayer((None, 3, None, None))
        self.network['seed'] = InputLayer((None, 3, None, None))
        self.zoom = zoom
        self.model = model

        config, params = self.load_model()
        self.config = config
        self.setup_generator(self.last_layer(), config)

        if train:
            concatenated = lasagne.layers.ConcatLayer([self.network['img'], self.network['out']], axis=0)
            self.setup_perceptual(concatenated)
            self.load_perceptual()
            self.setup_discriminator()
        self.load_generator(params)
        self.compile()

    def get_config(self):
        return self.config

    #------------------------------------------------------------------------------------------------------------------
    # Network Configuration
    #------------------------------------------------------------------------------------------------------------------
    def last_layer(self):
        return list(self.network.values())[-1]

    def make_layer(self, name, input, units, filter_size=(3,3), stride=(1,1), pad=(1,1), alpha=0.25):
        conv = ConvLayer(input, units, filter_size, stride=stride, pad=pad, nonlinearity=None)
        prelu = lasagne.layers.ParametricRectifierLayer(conv, alpha=lasagne.init.Constant(alpha))
        self.network[name+'x'] = conv
        self.network[name+'>'] = prelu
        return prelu

    def make_block(self, name, input, units):
        self.make_layer(name+'-A', input, units, alpha=0.1)
        # self.make_layer(name+'-B', self.last_layer(), units, alpha=1.0)
        return ElemwiseSumLayer([input, self.last_layer()]) if self.generator_residual else self.last_layer()

    def setup_generator(self, input, config):
        # Fall back to the module-level defaults when training a new model from
        # scratch (load_model returns an empty config in that case).
        self.generator_upscale = config.get('generator_upscale', generator_upscale)
        self.generator_downscale = config.get('generator_downscale', generator_downscale)
        self.generator_filters = config.get('generator_filters', generator_filters)
        self.generator_blocks = config.get('generator_blocks', generator_blocks)
        self.generator_residual = config.get('generator_residual', generator_residual)

        self.zoom = 2**(self.generator_upscale - self.generator_downscale)

        units_iter = extend(self.generator_filters)
        units = next(units_iter)
        self.make_layer('iter.0', input, units, filter_size=(7,7), pad=(3,3))

        for i in range(0, self.generator_downscale):
            self.make_layer('downscale%i'%i, self.last_layer(), next(units_iter), filter_size=(4,4), stride=(2,2))

        units = next(units_iter)
        for i in range(0, self.generator_blocks):
            self.make_block('iter.%i'%(i+1), self.last_layer(), units)

        for i in range(0, self.generator_upscale):
            u = next(units_iter)
            self.make_layer('upscale%i.2'%i, self.last_layer(), u*4)
            self.network['upscale%i.1'%i] = SubpixelReshuffleLayer(self.last_layer(), u, 2)

        self.network['out'] = ConvLayer(self.last_layer(), 3, filter_size=(7,7), pad=(3,3), nonlinearity=None)
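
    # The generator is therefore: one 7x7 stem convolution, `generator_downscale`
    # strided 4x4 convolutions, `generator_blocks` residual 3x3 blocks, one
    # subpixel-reshuffle stage per `generator_upscale` step, and a final 7x7
    # convolution back down to 3 RGB channels.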
    def setup_perceptual(self, input):
        """Use lasagne to create a network of convolution layers using pre-trained VGG19 weights.
        """
        offset = np.array([103.939, 116.779, 123.680], dtype=np.float32).reshape((1,3,1,1))
        self.network['percept'] = lasagne.layers.NonlinearityLayer(input, lambda x: ((x+0.5)*255.0) - offset)

        self.network['mse'] = self.network['percept']
        self.network['conv1_1'] = ConvLayer(self.network['percept'], 64, 3, pad=1)
        self.network['conv1_2'] = ConvLayer(self.network['conv1_1'], 64, 3, pad=1)
        self.network['pool1']   = PoolLayer(self.network['conv1_2'], 2, mode='max')
        self.network['conv2_1'] = ConvLayer(self.network['pool1'], 128, 3, pad=1)
        self.network['conv2_2'] = ConvLayer(self.network['conv2_1'], 128, 3, pad=1)
        self.network['pool2']   = PoolLayer(self.network['conv2_2'], 2, mode='max')
        self.network['conv3_1'] = ConvLayer(self.network['pool2'], 256, 3, pad=1)
        self.network['conv3_2'] = ConvLayer(self.network['conv3_1'], 256, 3, pad=1)
        self.network['conv3_3'] = ConvLayer(self.network['conv3_2'], 256, 3, pad=1)
        self.network['conv3_4'] = ConvLayer(self.network['conv3_3'], 256, 3, pad=1)
        self.network['pool3']   = PoolLayer(self.network['conv3_4'], 2, mode='max')
        self.network['conv4_1'] = ConvLayer(self.network['pool3'], 512, 3, pad=1)
        self.network['conv4_2'] = ConvLayer(self.network['conv4_1'], 512, 3, pad=1)
        self.network['conv4_3'] = ConvLayer(self.network['conv4_2'], 512, 3, pad=1)
        self.network['conv4_4'] = ConvLayer(self.network['conv4_3'], 512, 3, pad=1)
        self.network['pool4']   = PoolLayer(self.network['conv4_4'], 2, mode='max')
        self.network['conv5_1'] = ConvLayer(self.network['pool4'], 512, 3, pad=1)
        self.network['conv5_2'] = ConvLayer(self.network['conv5_1'], 512, 3, pad=1)
        self.network['conv5_3'] = ConvLayer(self.network['conv5_2'], 512, 3, pad=1)
        self.network['conv5_4'] = ConvLayer(self.network['conv5_3'], 512, 3, pad=1)

    def setup_discriminator(self):
        c = discriminator_size
        self.make_layer('disc1.1', batch_norm(self.network['conv1_2']), 1*c, filter_size=(5,5), stride=(2,2), pad=(2,2))
        self.make_layer('disc1.2', self.last_layer(), 1*c, filter_size=(5,5), stride=(2,2), pad=(2,2))
        self.make_layer('disc2', batch_norm(self.network['conv2_2']), 2*c, filter_size=(5,5), stride=(2,2), pad=(2,2))
        self.make_layer('disc3', batch_norm(self.network['conv3_2']), 3*c, filter_size=(3,3), stride=(1,1), pad=(1,1))
        hypercolumn = ConcatLayer([self.network['disc1.2>'], self.network['disc2>'], self.network['disc3>']])
        self.make_layer('disc4', hypercolumn, 4*c, filter_size=(1,1), stride=(1,1), pad=(0,0))
        self.make_layer('disc5', self.last_layer(), 3*c, filter_size=(3,3), stride=(2,2))
        self.make_layer('disc6', self.last_layer(), 2*c, filter_size=(1,1), stride=(1,1), pad=(0,0))
        self.network['disc'] = batch_norm(ConvLayer(self.last_layer(), 1, filter_size=(1,1),
                                                    nonlinearity=lasagne.nonlinearities.linear))
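
    # The discriminator is "perceptual" too: it reads batch-normalised VGG19
    # activations (conv1_2, conv2_2, conv3_2), fuses them into a single
    # hypercolumn, and reduces that to one real/fake logit per spatial patch.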
    #------------------------------------------------------------------------------------------------------------------
    # Input / Output
    #------------------------------------------------------------------------------------------------------------------
    def load_perceptual(self):
        """Open the serialized parameters from a pre-trained network, and load them into the model created.
        """
        vgg19_file = os.path.join(os.path.dirname(__file__), 'vgg19_conv.pkl.bz2')
        if not os.path.exists(vgg19_file):
            error("Model file with pre-trained convolution layers not found. Download here...",
                  "https://github.com/alexjc/neural-doodle/releases/download/v0.0/vgg19_conv.pkl.bz2")

        data = pickle.load(bz2.open(vgg19_file, 'rb'))
        layers = lasagne.layers.get_all_layers(self.last_layer(), treat_as_input=[self.network['percept']])
        for p, d in zip(itertools.chain(*[l.get_params() for l in layers]), data): p.set_value(d)

    def list_generator_layers(self):
        for l in lasagne.layers.get_all_layers(self.network['out'], treat_as_input=[self.network['img']]):
            if not l.get_params(): continue
            name = list(self.network.keys())[list(self.network.values()).index(l)]
            yield (name, l)

    def get_filename(self, absolute=False):
        filename = 'ne%ix-%s-%s-%s.pkl.bz2' % (self.zoom, i_type, self.model, __version__)
        return os.path.join(os.path.dirname(__file__), filename) if absolute else filename

    def save_generator(self):
        def cast(p): return p.get_value().astype(np.float16)
        params = {k: [cast(p) for p in l.get_params()] for (k, l) in self.list_generator_layers()}
        config = {}
        config['generator_upscale'] = self.generator_upscale
        config['generator_downscale'] = self.generator_downscale
        config['generator_filters'] = self.generator_filters
        config['generator_blocks'] = self.generator_blocks
        config['generator_residual'] = self.generator_residual
        pickle.dump((config, params), bz2.open(self.get_filename(absolute=True), 'wb'))
        print('  - Saved model as `{}` after training.'.format(self.get_filename()))

    def load_model(self):
        if not os.path.exists(self.get_filename(absolute=True)):
            if train: return {}, {}
            error("Model file with pre-trained convolution layers not found. Download it here...",
                  "https://github.com/alexjc/neural-enhance/releases/download/v%s/%s"%(__version__, self.get_filename()))
        print('  - Loaded file `{}` with trained model.'.format(self.get_filename()))
        return pickle.load(bz2.open(self.get_filename(absolute=True), 'rb'))

    def load_generator(self, params):
        if len(params) == 0: return
        for k, l in self.list_generator_layers():
            assert k in params, "Couldn't find layer `%s` in loaded model." % k
            assert len(l.get_params()) == len(params[k]), "Mismatch in number of parameters for layer `%s`." % k
            for p, v in zip(l.get_params(), params[k]):
                assert v.shape == p.get_value().shape, "Mismatch in shape of parameters for layer `%s`." % k
                p.set_value(v.astype(np.float32))
    #------------------------------------------------------------------------------------------------------------------
    # Training & Loss Functions
    #------------------------------------------------------------------------------------------------------------------
    def loss_perceptual(self, p):
        # First half of the batch is real images, second half is generated output.
        return lasagne.objectives.squared_error(p[:batch_size], p[batch_size:]).mean()

    def loss_total_variation(self, x):
        return T.mean(((x[:,:,:-1,:-1] - x[:,:,1:,:-1])**2 + (x[:,:,:-1,:-1] - x[:,:,:-1,1:])**2)**1.25)

    def loss_adversarial(self, d):
        # softminus(d) = log(sigmoid(d)), so this pushes the discriminator's logits for generated images up.
        return T.mean(1.0 - T.nnet.softminus(d[batch_size:]))

    def loss_discriminator(self, d):
        return T.mean(T.nnet.softminus(d[batch_size:]) - T.nnet.softplus(d[:batch_size]))

    def compile(self):
        # Helper function for rendering test images during training, or standalone inference mode.
        input_tensor, seed_tensor = T.tensor4(), T.tensor4()
        input_layers = {self.network['img']: input_tensor, self.network['seed']: seed_tensor}
        output = lasagne.layers.get_output([self.network[k] for k in ['seed','out']], input_layers, deterministic=True)
        self.predict = theano.function([seed_tensor], output)

        if not train: return

        output_layers = [self.network['out'], self.network[perceptual_layer], self.network['disc']]
        gen_out, percept_out, disc_out = lasagne.layers.get_output(output_layers, input_layers, deterministic=False)

        # Generator loss function, parameters and updates.
        self.gen_lr = theano.shared(np.array(0.0, dtype=theano.config.floatX))
        self.adversary_weight = theano.shared(np.array(0.0, dtype=theano.config.floatX))
        gen_losses = [self.loss_perceptual(percept_out) * perceptual_weight,
                      self.loss_total_variation(gen_out) * smoothness_weight,
                      self.loss_adversarial(disc_out) * self.adversary_weight]
        gen_params = lasagne.layers.get_all_params(self.network['out'], trainable=True)
        print('  - {} tensors learned for generator.'.format(len(gen_params)))
        gen_updates = lasagne.updates.adam(sum(gen_losses, 0.0), gen_params, learning_rate=self.gen_lr)

        # Discriminator loss function, parameters and updates.
        self.disc_lr = theano.shared(np.array(0.0, dtype=theano.config.floatX))
        disc_losses = [self.loss_discriminator(disc_out)]
        disc_params = list(itertools.chain(*[l.get_params() for k, l in self.network.items() if 'disc' in k]))
        print('  - {} tensors learned for discriminator.'.format(len(disc_params)))
        grads = [g.clip(-5.0, +5.0) for g in T.grad(sum(disc_losses, 0.0), disc_params)]
        disc_updates = lasagne.updates.adam(grads, disc_params, learning_rate=self.disc_lr)

        # Combined Theano function for updating both generator and discriminator at the same time.
        updates = collections.OrderedDict(list(gen_updates.items()) + list(disc_updates.items()))
        self.fit = theano.function([input_tensor, seed_tensor], gen_losses + [disc_out.mean(axis=(1,2,3))], updates=updates)
class NeuralEnhancer(object):

    def __init__(self, model_type, zoom, loader):
        if train:
            print('{}Training {} epochs on random image sections with batch size {}.{}'\
                  .format(ansi.BLUE_B, epochs, batch_size, ansi.BLUE))
        else:
            if len(files) == 0: error("Specify the image(s) to enhance on the command-line.")
            #print('{}Enhancing {} image(s) specified on the command-line.{}'\
            #      .format(ansi.BLUE_B, len(files), ansi.BLUE))

        self.zoom = zoom
        self.model_type = model_type
        self.thread = DataLoader(self.zoom) if loader else None
        self.model = Model(self.zoom, self.model_type)
        # Read the architecture back from the model, which falls back to module
        # defaults when no pre-trained config was loaded.
        self.generator_upscale = self.model.generator_upscale
        self.generator_downscale = self.model.generator_downscale

        print('{}'.format(ansi.ENDC))

    def imsave(self, fn, img):
        scipy.misc.toimage(np.transpose(img + 0.5, (1, 2, 0)).clip(0.0, 1.0) * 255.0, cmin=0, cmax=255).save(fn)

    def show_progress(self, orign, scald, repro):
        os.makedirs('valid', exist_ok=True)
        for i in range(batch_size):
            self.imsave('valid/%s_%03i_origin.png' % (self.model_type, i), orign[i])
            self.imsave('valid/%s_%03i_pixels.png' % (self.model_type, i), scald[i])
            self.imsave('valid/%s_%03i_reprod.png' % (self.model_type, i), repro[i])

    def decay_learning_rate(self):
        l_r, t_cur = learning_rate, 0
        while True:
            yield l_r
            t_cur += 1
            if t_cur % learning_period == 0: l_r *= learning_decay

    def train(self):
        seed_size = batch_shape // self.zoom
        images = np.zeros((batch_size, 3, batch_shape, batch_shape), dtype=np.float32)
        seeds = np.zeros((batch_size, 3, seed_size, seed_size), dtype=np.float32)
        learning_rate_iter = self.decay_learning_rate()
        try:
            average, start = None, time.time()
            for epoch in range(epochs):
                total, stats = None, None
                l_r = next(learning_rate_iter)
                if epoch >= generator_start: self.model.gen_lr.set_value(l_r)
                if epoch >= discriminator_start: self.model.disc_lr.set_value(l_r)

                for _ in range(epoch_size):
                    self.thread.copy(images, seeds)
                    output = self.model.fit(images, seeds)
                    losses = np.array(output[:3], dtype=np.float32)
                    stats = (stats + output[3]) if stats is not None else output[3]
                    total = total + losses if total is not None else losses
                    l = np.sum(losses)
                    assert not np.isnan(losses).any()
                    average = l if average is None else average * 0.95 + 0.05 * l
                    print('↑' if l > average else '↓', end='', flush=True)

                scald, repro = self.model.predict(seeds)
                self.show_progress(images, scald, repro)
                total /= epoch_size
                stats /= epoch_size
                totals, labels = [sum(total)] + list(total), ['total', 'prcpt', 'smthn', 'advrs']
                gen_info = ['{}{}{}={:4.2e}'.format(ansi.WHITE_B, k, ansi.ENDC, v) for k, v in zip(labels, totals)]
                print('\rEpoch #{} at {:4.1f}s, lr={:4.2e}{}'.format(epoch+1, time.time()-start, l_r, ' '*(epoch_size-30)))
                print('  - generator {}'.format(' '.join(gen_info)))

                real, fake = stats[:batch_size], stats[batch_size:]
                print('  - discriminator', real.mean(), len(np.where(real > 0.5)[0]),
                                           fake.mean(), len(np.where(fake < -0.5)[0]))
                if epoch == adversary_start-1:
                    print('  - generator now optimizing against discriminator.')
                    self.model.adversary_weight.set_value(adversary_weight)
                if (epoch+1) % save_every == 0:
                    print('  - saving current generator layers to disk...')
                    self.model.save_generator()
        except KeyboardInterrupt:
            pass

        print('\n{}Trained {}x super-resolution for {} epochs.{}'\
              .format(ansi.CYAN_B, self.zoom, epoch+1, ansi.CYAN))
        self.model.save_generator()
        print(ansi.ENDC)
    def match_histograms(self, A, B, rng=(0.0, 255.0), bins=64):
        # Classic CDF-based histogram matching: map A's intensities so its
        # cumulative distribution matches B's.
        (Ha, Xa), (Hb, Xb) = [np.histogram(i, bins=bins, range=rng, density=True) for i in [A, B]]
        X = np.linspace(rng[0], rng[1], bins, endpoint=True)
        Hpa, Hpb = [np.cumsum(i) * (rng[1] - rng[0]) ** 2 / float(bins) for i in [Ha, Hb]]
        inv_Ha = scipy.interpolate.interp1d(X, Hpa, bounds_error=False, fill_value='extrapolate')
        map_Hb = scipy.interpolate.interp1d(Hpb, X, bounds_error=False, fill_value='extrapolate')
        return map_Hb(inv_Ha(A).clip(0.0, 255.0))

    def process(self, original):
        # Snap the image to a shape that's compatible with the generator (2x, 4x).
        s = 2 ** max(self.generator_upscale, self.generator_downscale)
        by, bx = original.shape[0] % s, original.shape[1] % s
        original = original[by-by//2:original.shape[0]-by//2,bx-bx//2:original.shape[1]-bx//2,:]

        # Prepare padded input image as well as output buffer of zoomed size.
        s, p, z = rendering_tile, rendering_overlap, self.zoom
        image = np.pad(original, ((p, p), (p, p), (0, 0)), mode='reflect')
        output = np.zeros((original.shape[0] * z, original.shape[1] * z, 3), dtype=np.float32)

        # Iterate through the tile coordinates and pass them through the network.
        for y, x in itertools.product(range(0, original.shape[0], s), range(0, original.shape[1], s)):
            img = np.transpose(image[y:y+p*2+s,x:x+p*2+s,:] / 255.0 - 0.5, (2, 0, 1))[np.newaxis].astype(np.float32)
            *_, repro = self.model.predict(img)
            output[y*z:(y+s)*z,x*z:(x+s)*z,:] = np.transpose(repro[0] + 0.5, (1, 2, 0))[p*z:-p*z,p*z:-p*z,:]
            #print('.', end='', flush=True)
        output = output.clip(0.0, 1.0) * 255.0

        # Match color histograms if the user specified this option.
        if rendering_histogram:
            for i in range(3):
                output[:,:,i] = self.match_histograms(output[:,:,i], original[:,:,i])

        return scipy.misc.toimage(output, cmin=0, cmax=255)
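
# Tiling arithmetic, for reference: with rendering_tile=80 and rendering_overlap=24
# each network pass sees a 128x128 window (80 + 2*24); at zoom=2 the prediction is
# 256x256, the 48-pixel enhanced borders are cropped away, and the remaining
# 160x160 patch is written into the output buffer. The overlap hides tile seams.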
if __name__ == "__main__":
    if train:
        zoom = 2**(generator_upscale - generator_downscale)
        enhancer = NeuralEnhancer(model, zoom, loader=True)
        enhancer.train()
    else:
        x2 = NeuralEnhancer('default', 2, loader=False)
        x4 = NeuralEnhancer('default', 4, loader=False)
        repair = NeuralEnhancer('repair', 1, loader=False)
        deblur = NeuralEnhancer('deblur', 1, loader=False)
        for filename in files:
            print(filename, end=' ')
            cv_img = cv2.imread(filename)
            rgb_img = cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB)
            #img = scipy.ndimage.imread(filename, mode='RGB')

            # Chain the models: 4x upscale, artifact repair, then another 2x upscale.
            out = np.array(x4.process(rgb_img))
            #out = np.array(deblur.process(out))
            out = np.array(repair.process(out))
            out = np.array(x2.process(out))

            opencv_image = cv2.cvtColor(out, cv2.COLOR_RGB2BGR)
            cv2.imshow('image', opencv_image)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
            #out.save(os.path.splitext(filename)[0]+'_ne%ix.png' % zoom)
            print(flush=True)
    print(ansi.ENDC)
#======================================================================================================================
# File 2: the modified dfaker merge script (the gist does not give its filename).
# It imports NeuralEnhancer from enhance2.py above and the autoencoders from model.py below.
#======================================================================================================================
#
# Original work Copyright (c) 2018, dfaker.
# Modified work Copyright (c) 2018, Alexis_TheLarge.
#
#
# Subject to Mozilla Public License
# See: https://github.com/dfaker/df/blob/master/LICENSE
#
import argparse
import cv2
import json
import numpy
import time
from pathlib import Path

from tqdm import tqdm

from model import autoencoder_A
from model import autoencoder_B
from model import encoder, decoder_A, decoder_B
from enhance2 import NeuralEnhancer

encoder.load_weights("models/encoder.h5")
decoder_A.load_weights("models/decoder_A.h5")
decoder_B.load_weights("models/decoder_B.h5")
n = 0
imageSize = 256
croppedSize = 240
zmask = numpy.zeros((1, 128, 128, 1), float)

# Load the Neural Enhance models once, up front: 2x and 4x super-resolution plus a 1x deblur pass.
NEx2 = NeuralEnhancer('default', 2, loader=False)
NEx4 = NeuralEnhancer('default', 4, loader=False)
NE_deblur = NeuralEnhancer('deblur', 1, loader=False)
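
# Note on GPU memory: model.py (below) caps TensorFlow/Keras at a fraction of
# VRAM via per_process_gpu_memory_fraction, precisely so these Theano-based
# Neural Enhance models can coexist with the dfaker autoencoders on one GPU.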
def image_stats(image):
    # Per-channel mean and standard deviation of an L*a*b* image.
    (l, a, b) = cv2.split(image)
    (lMean, lStd) = (l.mean(), l.std())
    (aMean, aStd) = (a.mean(), a.std())
    (bMean, bStd) = (b.mean(), b.std())
    return (lMean, lStd, aMean, aStd, bMean, bStd)

def adjust_avg_color(img_old, img_new):
    # Shift each channel of img_new so its mean matches img_old (modified in place).
    # Vectorised equivalent of the original per-pixel loop, with the same clipping behaviour.
    for i in range(img_new.shape[-1]):
        old_avg = img_old[:, :, i].mean()
        new_avg = img_new[:, :, i].mean()
        diff_int = int(old_avg - new_avg)
        img_new[:, :, i] = numpy.clip(img_new[:, :, i].astype(numpy.int32) + diff_int, 0, 255)
def transfer_avg_color(img_old, img_new):
    # Reinhard-style colour transfer in L*a*b* space (not called in this script).
    assert img_old.shape == img_new.shape
    source = cv2.cvtColor(img_old, cv2.COLOR_BGR2LAB).astype("float32")
    target = cv2.cvtColor(img_new, cv2.COLOR_BGR2LAB).astype("float32")

    (lMeanSrc, lStdSrc, aMeanSrc, aStdSrc, bMeanSrc, bStdSrc) = image_stats(source)
    (lMeanTar, lStdTar, aMeanTar, aStdTar, bMeanTar, bStdTar) = image_stats(target)

    (l, a, b) = cv2.split(target)
    l -= lMeanTar
    a -= aMeanTar
    b -= bMeanTar

    l = (lStdTar / lStdSrc) * l
    a = (aStdTar / aStdSrc) * a
    b = (bStdTar / bStdSrc) * b

    l += lMeanSrc
    a += aMeanSrc
    b += bMeanSrc

    l = numpy.clip(l, 0, 255)
    a = numpy.clip(a, 0, 255)
    b = numpy.clip(b, 0, 255)

    transfer = cv2.merge([l, a, b])
    transfer = cv2.cvtColor(transfer.astype("uint8"), cv2.COLOR_LAB2BGR)
    return transfer
def convert_one_image(autoencoder, otherautoencoder, image, mat, facepoints, erosion_kernel, blur_size, seamlessClone, maskType, doublePass=False):
    global n
    n += 1
    size = 64
    image_size = image.shape[1], image.shape[0]

    # Warp the aligned face out of the frame at 240px with a 48+16 pixel margin per side.
    sourceMat = mat.copy()
    sourceMat = sourceMat*(240+(16*2))
    sourceMat[:,2] += 48
    face = cv2.warpAffine(image, sourceMat, (240+(48+16)*2, 240+(48+16)*2))

    sourceFace = face.copy()
    sourceFace = cv2.resize(sourceFace, (128,128), cv2.INTER_CUBIC)

    face = cv2.resize(face, (64,64), cv2.INTER_AREA)
    face = numpy.expand_dims(face, 0)
    new_face_rgb, new_face_m = autoencoder.predict([face / 255.0, zmask])

    if doublePass:
        # Feed the original prediction back into the network for a second round.
        new_face_rgb = new_face_rgb.reshape((128, 128, 3))
        new_face_rgb = cv2.resize(new_face_rgb, (64,64))
        new_face_rgb = numpy.expand_dims(new_face_rgb, 0)
        new_face_rgb, _ = autoencoder.predict([new_face_rgb, zmask])

    # The union of both decoders' masks gives a safer blending region.
    _, other_face_m = otherautoencoder.predict([face / 255.0, zmask])
    new_face_m = numpy.maximum(new_face_m, other_face_m)

    new_face_rgb = numpy.clip(new_face_rgb[0] * 255, 0, 255).astype(image.dtype)
    new_face_m = numpy.clip(new_face_m[0], 0, 1).astype(float) * numpy.ones((new_face_m.shape[0], new_face_m.shape[1], 3))

    base_image = numpy.copy(image)
    new_image = numpy.copy(image)

    transmat = mat * (64-16) * 16
    transmat[::,2] += 8*16

    # Neural Enhance splice: the 128x128 autoencoder output is upscaled 4x to 512,
    # deblurred at 1x, lightly Gaussian-blurred to suppress enhancement artifacts,
    # then upscaled 2x to a final 1024x1024 face.
    new_face_rgb = numpy.array(NEx4.process(new_face_rgb))
    new_face_rgb = numpy.array(NE_deblur.process(new_face_rgb))
    new_face_rgb = cv2.GaussianBlur(new_face_rgb, (11,11), 0)
    new_face_rgb = numpy.array(NEx2.process(new_face_rgb))

    adjust_avg_color(sourceFace, new_face_rgb)
    new_face_m = cv2.resize(new_face_m, (1024,1024))  # scale mask to the same 1024px size as the enhanced face

    cv2.warpAffine(new_face_rgb, transmat, image_size, new_image, cv2.WARP_INVERSE_MAP | cv2.INTER_LANCZOS4, cv2.BORDER_TRANSPARENT)
    image_mask = numpy.zeros_like(new_image, dtype=float)
    cv2.warpAffine(new_face_m, transmat, image_size, image_mask, cv2.WARP_INVERSE_MAP | cv2.INTER_CUBIC, cv2.BORDER_TRANSPARENT)

    if erosion_kernel is not None:
        image_mask = cv2.erode(image_mask, erosion_kernel, iterations=1)

    # Slightly enlarge the mask area.
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3,3))
    image_mask = cv2.dilate(image_mask, kernel, iterations=1)

    if seamlessClone:
        unitMask = numpy.clip(image_mask * 365, 0, 255).astype(numpy.uint8)
        maxregion = numpy.argwhere(unitMask == 255)
        if maxregion.size > 0:
            miny, minx = maxregion.min(axis=0)[:2]
            maxy, maxx = maxregion.max(axis=0)[:2]
            lenx = maxx - minx
            leny = maxy - miny
            centerx = int(minx + (lenx // 2))
            centery = int(miny + (leny // 2))
            # cv2.seamlessClone takes the blend centre as an (x, y) point.
            new_image = cv2.seamlessClone(new_image.astype(numpy.uint8), base_image.astype(numpy.uint8), unitMask, (centerx, centery), cv2.NORMAL_CLONE)

    #image_mask = cv2.GaussianBlur(image_mask,(11,11),0)
    if blur_size != 0:
        image_mask = cv2.GaussianBlur(image_mask, (blur_size, blur_size), 0)

    foreground = cv2.multiply(image_mask, new_image.astype(float))
    background = cv2.multiply(1.0 - image_mask, base_image.astype(float))
    output = numpy.add(background, foreground)

    cv2.imshow("output", output.astype(numpy.uint8))
    if cv2.waitKey(1) == ord('q'):
        exit()
    return output
def main(args):
    input_dir = Path(args.input_dir)
    assert input_dir.is_dir()

    alignments = input_dir / args.alignments
    with alignments.open() as f:
        alignments = json.load(f)

    output_dir = input_dir / args.output_dir
    output_dir.mkdir(parents=True, exist_ok=True)

    if args.direction == 'AtoB': autoencoder, otherautoencoder = autoencoder_B, autoencoder_A
    if args.direction == 'BtoA': autoencoder, otherautoencoder = autoencoder_A, autoencoder_B

    if args.blurSize % 2 == 0:
        args.blurSize += 1   # GaussianBlur requires an odd kernel size

    if args.erosionKernelSize > 0:
        erosion_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (args.erosionKernelSize, args.erosionKernelSize))
    else:
        erosion_kernel = None

    for e in alignments:
        if len(e) < 4:
            raise LookupError('This script expects new format json files with face points included.')

    for image_file, face_file, mat, facepoints in tqdm(alignments[args.startframe::args.frameSkip]):
        image = cv2.imread(str(input_dir / image_file))
        face = cv2.imread(str(input_dir / face_file))
        mat = numpy.array(mat).reshape(2, 3)

        if image is None: continue
        if face is None: continue

        new_image = convert_one_image(autoencoder, otherautoencoder, image, mat, facepoints, erosion_kernel, args.blurSize, args.seamlessClone, args.maskType, args.doublePass)
        output_file = output_dir / Path(image_file).name
        cv2.imwrite(str(output_file), new_image)

def str2bool(v):
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("input_dir", type=str, nargs='?')
    parser.add_argument("alignments", type=str, nargs='?', default='alignments.json')
    parser.add_argument("output_dir", type=str, nargs='?', default='merged')

    # const=True so that passing the bare flag (e.g. `--seamlessClone`) enables the option.
    parser.add_argument("--seamlessClone", type=str2bool, nargs='?', const=True, default=False, help="Attempt to use opencv seamlessClone.")
    parser.add_argument("--doublePass", type=str2bool, nargs='?', const=True, default=False, help="Pass the original prediction output back through for a second pass.")

    parser.add_argument('--maskType', type=str, default='FaceHullAndRect', choices=['FaceHullAndRect','FaceHull','Rect'], help="The type of masking to use around the face.")

    parser.add_argument("--startframe", type=int, default=0)
    parser.add_argument("--frameSkip", type=int, default=1)
    parser.add_argument("--blurSize", type=int, default=4)
    parser.add_argument("--erosionKernelSize", type=int, default=2)
    parser.add_argument("--direction", type=str, default="AtoB", choices=["AtoB", "BtoA"])

    main(parser.parse_args())
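
# Example invocation, assuming this file is saved as merge_faces_larger.py inside a
# dfaker checkout (the gist does not name the file), with alignments.json and the
# frames inside data/:
#
#   python3 merge_faces_larger.py data --blurSize=5 --doublePass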
#======================================================================================================================
# File 3: model.py -- the dfaker autoencoder definition imported by the merge
# script above (`from model import ...`), with a TensorFlow memory cap added.
#======================================================================================================================
#
# Original work Copyright (c) 2018, dfaker.
# Modified work Copyright (c) 2018, Alexis_TheLarge.
#
#
# Subject to Mozilla Public License
# See: https://github.com/dfaker/df/blob/master/LICENSE
#
from keras.models import Model
from keras.layers import Input, Dense, Flatten, Reshape, Dropout, Add, Concatenate, Lambda
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import Conv2D
from keras.initializers import RandomNormal
from keras.optimizers import Adam
from keras.utils import multi_gpu_model
from pixel_shuffler import PixelShuffler
import tensorflow as tf
from keras_contrib.losses import DSSIMObjective
from keras import losses
import time
class penalized_loss(object):
    """Wrap a Keras loss so it is weighted by the face mask: with maskProp=1.0 the
    unmasked background is zeroed out in both images before the loss is computed."""

    def __init__(self, mask, lossFunc, maskProp=1.0):
        self.mask = mask
        self.lossFunc = lossFunc
        self.maskProp = maskProp
        self.maskInvProp = 1 - maskProp

    def __call__(self, y_true, y_pred):
        # Split into R, G, B planes, weight each by the mask, and re-concatenate.
        tr, tg, tb = tf.split(y_true, 3, 3)
        pr, pg, pb = tf.split(y_pred, 3, 3)

        m = self.mask * self.maskProp + self.maskInvProp

        tr *= m
        tg *= m
        tb *= m

        pr *= m
        pg *= m
        pb *= m

        y = tf.concat([tr, tg, tb], 3)
        p = tf.concat([pr, pg, pb], 3)

        return self.lossFunc(y, p)
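
# Usage example (see the compile calls at the bottom of this file): the RGB output
# is trained with mask-weighted DSSIM, the mask output with plain MSE:
#
#   autoencoder_A.compile(optimizer=optimizer, loss=[penalized_loss(m1, DSSIMObjective()), 'mse'])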
optimizer = Adam(lr=5e-5, beta_1=0.5, beta_2=0.999)

IMAGE_SHAPE = (64, 64, 3)
ENCODER_DIM = 1024

conv_init = RandomNormal(0, 0.02)
gamma_init = RandomNormal(1., 0.02)

def __conv_init(a):
    print("conv_init", a)
    k = RandomNormal(0, 0.02)(a)  # for convolution kernel
    k.conv_weight = True
    return k

def upscale_ps(filters, use_norm=True):
    def block(x):
        x = Conv2D(filters*4, kernel_size=3, use_bias=False, kernel_initializer=RandomNormal(0, 0.02), padding='same')(x)
        x = LeakyReLU(0.1)(x)
        x = PixelShuffler()(x)
        return x
    return block

def res_block(input_tensor, f):
    x = input_tensor
    x = Conv2D(f, kernel_size=3, kernel_initializer=conv_init, use_bias=False, padding="same")(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Conv2D(f, kernel_size=3, kernel_initializer=conv_init, use_bias=False, padding="same")(x)
    x = Add()([x, input_tensor])
    x = LeakyReLU(alpha=0.2)(x)
    return x

def conv(filters):
    def block(x):
        x = Conv2D(filters, kernel_size=5, strides=2, padding='same')(x)
        x = LeakyReLU(0.1)(x)
        return x
    return block

def upscale(filters):
    def block(x):
        # Conv to 4x the channels, then PixelShuffler rearranges them into 2x spatial resolution.
        x = Conv2D(filters*4, kernel_size=3, padding='same')(x)
        x = LeakyReLU(0.1)(x)
        x = PixelShuffler()(x)
        return x
    return block
def Encoder():
    input_ = Input(shape=IMAGE_SHAPE)
    x = conv( 128)(input_)
    x = conv( 256)(x)
    x = conv( 512)(x)
    x = conv(1024)(x)
    x = Dense(ENCODER_DIM)(Flatten()(x))
    x = Dense(4*4*1024)(x)
    x = Reshape((4,4,1024))(x)
    x = upscale(512)(x)
    return Model(input_, [x])

def Decoder(name):
    # `name` is unused; it only documents which decoder ('MA' or 'MB') is being built.
    input_ = Input(shape=(8,8,512))
    skip_in = Input(shape=(8,8,512))

    x = input_
    x = upscale(512)(x)
    x = res_block(x, 512)
    x = upscale(256)(x)
    x = res_block(x, 256)
    x = upscale(128)(x)
    x = res_block(x, 128)
    x = upscale(64)(x)
    x = Conv2D(3, kernel_size=5, padding='same', activation='sigmoid')(x)

    y = input_
    y = upscale(512)(y)
    y = upscale(256)(y)
    y = upscale(128)(y)
    y = upscale(64)(y)
    y = Conv2D(1, kernel_size=5, padding='same', activation='sigmoid')(y)

    return Model([input_], outputs=[x, y])
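
# Shape walkthrough: the encoder maps a 64x64x3 input through four stride-2 convs
# to 4x4x1024, bottlenecks it through two Dense layers, and emits an 8x8x512 code;
# each decoder upscales that code back to a 128x128x3 face plus a 128x128x1 blend mask.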
### Make sure we have enough VRAM left to run the Neural Enhance models.
import os
import keras.backend.tensorflow_backend as KTF

def get_session(gpu_fraction=0.8):
    num_threads = os.environ.get('OMP_NUM_THREADS')
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=gpu_fraction)

    if num_threads:
        return tf.Session(config=tf.ConfigProto(
            gpu_options=gpu_options, intra_op_parallelism_threads=num_threads))
    else:
        return tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))

KTF.set_session(get_session())
###
encoder = Encoder()
decoder_A = Decoder('MA')
decoder_B = Decoder('MB')

print(encoder.summary())
print(decoder_A.summary())

x1 = Input(shape=IMAGE_SHAPE)
x2 = Input(shape=IMAGE_SHAPE)
m1 = Input(shape=(64*2, 64*2, 1))
m2 = Input(shape=(64*2, 64*2, 1))

autoencoder_A = Model([x1, m1], decoder_A(encoder(x1)))
#autoencoder_A = multi_gpu_model( autoencoder_A ,2)
autoencoder_B = Model([x2, m2], decoder_B(encoder(x2)))
#autoencoder_B = multi_gpu_model( autoencoder_B ,2)

o1, om1 = decoder_A(encoder(x1))
o2, om2 = decoder_B(encoder(x2))

DSSIM = DSSIMObjective()
autoencoder_A.compile(optimizer=optimizer, loss=[penalized_loss(m1, DSSIM), 'mse'])
autoencoder_B.compile(optimizer=optimizer, loss=[penalized_loss(m2, DSSIM), 'mse'])
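
# Both autoencoders share the single encoder, so identity A and identity B are
# forced into the same latent space; only the decoders differ. The mask inputs
# (m1/m2) feed penalized_loss rather than the network itself.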
A reader asks in the gist comments: "Do you have any examples of this working?"