yunjey

model version 0.1

import tensorflow as tf
from ops import *
class DTN(object):
    """Domain Transfer Network for unsupervised cross-domain image generation.

    Holds the hyper parameters and the input placeholder, defines the three
    sub-networks (function f, generator, discriminator) and, in
    ``build_model``, wires them into losses, optimizers and summaries.
    """

    def __init__(self, batch_size=100, learning_rate=0.0002, image_size=32, output_size=32,
                 dim_color=3, dim_fout=100, dim_df=64, dim_gf=64, dim_ff=64):
        """
        Args:
            batch_size: (optional) number of images per batch; it is baked into the
                input placeholder and into the reshape/deconv output shapes
            learning_rate: (optional) learning rate used by every Adam optimizer
            image_size: (optional) spatial size of the input images
            output_size: (optional) spatial size of images produced by the generator
            dim_color: (optional) number of color channels; default is 3 for rgb
            dim_fout: (optional) dimension of the vector produced by function f,
                which build_model feeds to the generator
            dim_df: (optional) number of filters in the discriminator's first convolution layer
            dim_gf: (optional) number of filters in the generator's last hidden deconvolution layer
            dim_ff: (optional) number of filters in function f's first convolution layer
        """
        # hyper parameters
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.image_size = image_size
        self.output_size = output_size
        self.dim_color = dim_color
        self.dim_fout = dim_fout
        self.dim_df = dim_df
        self.dim_gf = dim_gf
        self.dim_ff = dim_ff

        # placeholder for a batch of input images
        self.images = tf.placeholder(tf.float32, shape=[batch_size, image_size, image_size, dim_color], name='images')

        # batch normalization layers for discriminator, generator and function f
        self.d_bn1 = batch_norm(name='d_bn1')
        self.d_bn2 = batch_norm(name='d_bn2')
        self.d_bn3 = batch_norm(name='d_bn3')
        self.g_bn1 = batch_norm(name='g_bn1')
        self.g_bn2 = batch_norm(name='g_bn2')
        self.g_bn3 = batch_norm(name='g_bn3')
        self.g_bn4 = batch_norm(name='g_bn4')
        self.f_bn1 = batch_norm(name='f_bn1')
        self.f_bn2 = batch_norm(name='f_bn2')
        self.f_bn3 = batch_norm(name='f_bn3')
        self.f_bn4 = batch_norm(name='f_bn4')  # currently not used by any layer

    @staticmethod
    def _spatial_sizes(size):
        """Return (size/2, size/4, size/8, size/16) using integer division."""
        # `//` keeps the results integral on Python 3 as well as Python 2;
        # they are used as tensor shape entries and must not be floats.
        return size // 2, size // 4, size // 8, size // 16

    def function_f(self, images, reuse=False):
        """Function f: convolutional feature extractor.

        Args:
            images: images of shape (batch_size, image_size, image_size, dim_color)
            reuse: if True, reuse the variables already created in the 'function_f' scope
        Returns:
            out: tanh-squashed output vectors, of shape (batch_size, dim_fout)
        """
        with tf.variable_scope('function_f', reuse=reuse):
            # Shape comments assume the default 32x32 input and dim_ff=64.
            # NOTE: these layers use the f_bn* normalizers (they previously borrowed
            # d_bn*); the math is unchanged but the variables are now named
            # function_f/f_bn*, so checkpoints saved with the old names will not restore.
            h1 = lrelu(conv2d(images, self.dim_ff, name='f_h1'))                       # (batch_size, 16, 16, 64)
            h2 = lrelu(self.f_bn1(conv2d(h1, self.dim_ff*2, name='f_h2')))          # (batch_size, 8, 8, 128)
            h3 = lrelu(self.f_bn2(conv2d(h2, self.dim_ff*4, name='f_h3')))          # (batch_size, 4, 4, 256)
            h4 = lrelu(self.f_bn3(conv2d(h3, self.dim_ff*8, name='f_h4')))          # (batch_size, 2, 2, 512)
            h4 = tf.reshape(h4, [self.batch_size, -1])
            out = linear(h4, self.dim_fout, name='f_out')
            return tf.nn.tanh(out)

    def generator(self, z, reuse=False):
        """Generator: deconvolutional neural network with relu activations.

        The last deconv layer does not use batch normalization.

        Args:
            z: input vectors of shape (batch_size, dim_fout); build_model passes
                the output of function_f here
            reuse: if True, reuse the variables in the 'generator' scope and run
                batch normalization with train=False
        Returns:
            out: generated images, of shape (batch_size, output_size, output_size, dim_color)
        """
        # A reused generator is the test-phase copy, so batch norm is not in training mode.
        train = not reuse

        with tf.variable_scope('generator', reuse=reuse):
            # spatial sizes of the intermediate feature maps
            s = self.output_size
            s2, s4, s8, s16 = self._spatial_sizes(s)   # 16, 8, 4, 2 for the default s=32

            # project and reshape z
            h1 = linear(z, s16*s16*self.dim_gf*8, name='g_h1')        # (batch_size, 2*2*512)
            h1 = tf.reshape(h1, [-1, s16, s16, self.dim_gf*8])        # (batch_size, 2, 2, 512)
            h1 = relu(self.g_bn1(h1, train=train))

            h2 = deconv2d(h1, [self.batch_size, s8, s8, self.dim_gf*4], name='g_h2')    # (batch_size, 4, 4, 256)
            h2 = relu(self.g_bn2(h2, train=train))

            h3 = deconv2d(h2, [self.batch_size, s4, s4, self.dim_gf*2], name='g_h3')    # (batch_size, 8, 8, 128)
            h3 = relu(self.g_bn3(h3, train=train))

            h4 = deconv2d(h3, [self.batch_size, s2, s2, self.dim_gf], name='g_h4')      # (batch_size, 16, 16, 64)
            h4 = relu(self.g_bn4(h4, train=train))

            out = deconv2d(h4, [self.batch_size, s, s, self.dim_color], name='g_out')   # (batch_size, 32, 32, dim_color)
            return tf.nn.tanh(out)

    def discriminator(self, images, reuse=False):
        """Discriminator: convolutional neural network with leaky relu activations.

        The first conv layer does not use batch normalization.

        Args:
            images: real or fake images of shape (batch_size, image_size, image_size, dim_color)
            reuse: if True, reuse the variables already created in the 'discriminator' scope
        Returns:
            out: unnormalized real/fake scores (logits), of shape (batch_size, 1)
        """
        with tf.variable_scope('discriminator', reuse=reuse):
            # convolution layers (shape comments assume 32x32 input and dim_df=64)
            h1 = lrelu(conv2d(images, self.dim_df, name='d_h1'))                       # (batch_size, 16, 16, 64)
            h2 = lrelu(self.d_bn1(conv2d(h1, self.dim_df*2, name='d_h2')))          # (batch_size, 8, 8, 128)
            h3 = lrelu(self.d_bn2(conv2d(h2, self.dim_df*4, name='d_h3')))          # (batch_size, 4, 4, 256)
            h4 = lrelu(self.d_bn3(conv2d(h3, self.dim_df*8, name='d_h4')))          # (batch_size, 2, 2, 512)

            # fully connected layer
            h4 = tf.reshape(h4, [self.batch_size, -1])
            out = linear(h4, 1, name='d_out')
            return out

    def build_model(self):
        """Build the training graph: networks, losses, optimizers, summaries and saver.

        Everything is stored on ``self`` (e.g. ``d_loss``, ``g_optimizer``,
        ``summary_op``, ``saver``); nothing is returned.
        """
        # construct function f, generator and discriminator for training phase
        self.f_x = self.function_f(self.images)
        self.fake_images = self.generator(self.f_x)                                   # (batch_size, 32, 32, 3)
        self.logits_real = self.discriminator(self.images)
        self.logits_fake = self.discriminator(self.fake_images, reuse=True)
        self.fgf_x = self.function_f(self.fake_images, reuse=True)                    # (batch_size, dim_fout)

        # construct generator for test phase (shares weights, batch norm with train=False)
        self.sampled_images = self.generator(self.f_x, reuse=True)                    # (batch_size, 32, 32, 3)

        # compute loss
        self.d_loss_real = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(self.logits_real, tf.ones_like(self.logits_real)))
        self.d_loss_fake = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(self.logits_fake, tf.zeros_like(self.logits_fake)))
        self.d_loss = self.d_loss_real + self.d_loss_fake
        self.g_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(self.logits_fake, tf.ones_like(self.logits_fake)))
        self.g_const_loss = tf.reduce_mean(tf.square(self.images - self.fake_images))    # L_TID
        self.f_const_loss = tf.reduce_mean(tf.square(self.f_x - self.fgf_x))             # L_CONST

        # divide variables by the variable scope they were created in
        t_vars = tf.trainable_variables()
        self.d_vars = [var for var in t_vars if 'discriminator' in var.name]
        self.g_vars = [var for var in t_vars if 'generator' in var.name]
        self.f_vars = [var for var in t_vars if 'function_f' in var.name]

        # one Adam optimizer per loss; generator-side losses update both g and f variables
        with tf.name_scope('optimizer'):
            self.d_optimizer_real = tf.train.AdamOptimizer(self.learning_rate, beta1=0.5).minimize(self.d_loss_real, var_list=self.d_vars)
            self.d_optimizer_fake = tf.train.AdamOptimizer(self.learning_rate, beta1=0.5).minimize(self.d_loss_fake, var_list=self.d_vars)
            self.g_optimizer = tf.train.AdamOptimizer(self.learning_rate, beta1=0.5).minimize(self.g_loss, var_list=self.g_vars+self.f_vars)
            self.g_optimizer_const = tf.train.AdamOptimizer(self.learning_rate, beta1=0.5).minimize(self.g_const_loss, var_list=self.g_vars+self.f_vars)
            self.f_optimizer_const = tf.train.AdamOptimizer(self.learning_rate, beta1=0.5).minimize(self.f_const_loss, var_list=self.f_vars+self.g_vars)

        # summary ops for tensorboard visualization
        tf.scalar_summary('d_loss_real', self.d_loss_real)
        tf.scalar_summary('d_loss_fake', self.d_loss_fake)
        tf.scalar_summary('d_loss', self.d_loss)
        tf.scalar_summary('g_loss', self.g_loss)
        tf.scalar_summary('g_const_loss', self.g_const_loss)
        tf.scalar_summary('f_const_loss', self.f_const_loss)
        tf.image_summary('original_images', self.images, max_images=6)
        tf.image_summary('sampled_images', self.sampled_images, max_images=6)

        for var in tf.trainable_variables():
            tf.histogram_summary(var.op.name, var)

        self.summary_op = tf.merge_all_summaries()
        self.saver = tf.train.Saver()
\ No newline at end of file
import tensorflow as tf
class batch_norm(object):
    """Batch normalization layer bound to a fixed scope name.

    Calling the instance applies tf.contrib.layers.batch_norm to a tensor.

    Args (of the call):
        x: input tensor of shape (batch_size, width, height, channels_in) or (batch_size, dim_in)
        train: True or False; in train mode the input is normalized with mini-batch statistics,
               in test mode with the moving averages and variances
    Returns:
        out: batch normalized output of the same shape as x
    """

    def __init__(self, name):
        # scope name under which the layer's variables are created
        self.name = name

    def __call__(self, x, train=True):
        return tf.contrib.layers.batch_norm(
            x,
            decay=0.99,
            center=True,
            scale=True,
            activation_fn=None,
            updates_collections=None,
            is_training=train,
            scope=self.name)
def conv2d(x, channel_out, k_w=5, k_h=5, s_w=2, s_h=2, name=None):
    """Apply a 2-D convolution with 'SAME' padding followed by a bias add.

    Args:
        x: input tensor of shape (batch_size, width_in, height_in, channel_in)
        channel_out: number of channels of the output tensor
        k_w: kernel width; default is 5
        k_h: kernel height; default is 5
        s_w: stride along width; default is 2
        s_h: stride along height; default is 2
        name: variable scope holding the kernel 'w' and the bias 'b'
    Returns:
        out: output tensor of shape (batch_size, width_out, height_out, channel_out)
    """
    in_channels = x.get_shape()[-1]
    kernel_shape = [k_w, k_h, in_channels, channel_out]
    stride = [1, s_w, s_h, 1]

    with tf.variable_scope(name):
        kernel = tf.get_variable('w', shape=kernel_shape,
                                 initializer=tf.contrib.layers.xavier_initializer())
        bias = tf.get_variable('b', shape=[channel_out], initializer=tf.constant_initializer(0.0))
        return tf.nn.conv2d(x, kernel, strides=stride, padding='SAME') + bias
def deconv2d(x, output_shape, k_w=5, k_h=5, s_w=2, s_h=2, name=None):
    """Apply a transposed 2-D convolution followed by a bias add.

    Args:
        x: input tensor of shape (batch_size, width_in, height_in, channel_in)
        output_shape: list corresponding to [batch_size, width_out, height_out, channel_out]
        k_w: kernel width; default is 5
        k_h: kernel height; default is 5
        s_w: stride along width; default is 2
        s_h: stride along height; default is 2
        name: variable scope holding the kernel 'w' and the bias 'b'
    Returns:
        out: output tensor of shape (batch_size, width_out, height_out, channel_out)
    """
    in_channels = x.get_shape()[-1]
    out_channels = output_shape[-1]
    # conv2d_transpose kernels list the output channels before the input channels
    kernel_shape = [k_w, k_h, out_channels, in_channels]
    stride = [1, s_w, s_h, 1]

    with tf.variable_scope(name):
        kernel = tf.get_variable('w', shape=kernel_shape,
                                 initializer=tf.contrib.layers.xavier_initializer())
        bias = tf.get_variable('b', shape=[out_channels], initializer=tf.constant_initializer(0.0))
        return tf.nn.conv2d_transpose(x, filter=kernel, output_shape=output_shape, strides=stride) + bias
def linear(x, dim_out, name=None):
    """Apply a fully-connected (affine) layer: x * w + b.

    Args:
        x: input tensor of shape (batch_size, dim_in)
        dim_out: dimension of the output tensor
        name: variable scope holding the weight 'w' and the bias 'b'
    Returns:
        out: output tensor of shape (batch_size, dim_out)
    """
    in_dim = x.get_shape()[-1]

    with tf.variable_scope(name):
        weight = tf.get_variable('w', shape=[in_dim, dim_out],
                                 initializer=tf.contrib.layers.xavier_initializer())
        bias = tf.get_variable('b', shape=[dim_out], initializer=tf.constant_initializer(0.0))
        return tf.matmul(x, weight) + bias
def relu(x):
    """Return the element-wise rectified linear activation of x."""
    activated = tf.nn.relu(x)
    return activated
def lrelu(x, leak=0.2):
    """Return the leaky relu of x, computed as the element-wise max of x and leak*x.

    Args:
        x: input tensor
        leak: factor applied to x for the second argument of the max; default is 0.2
    """
    scaled = leak*x
    return tf.maximum(x, scaled)
\ No newline at end of file
import tensorflow as tf
import numpy as np
import os
import scipy.io
import hickle
from scipy import ndimage
class Solver(object):
    """Load the SVHN and MNIST datasets and train the given model on them."""

    def __init__(self, model, num_epoch=10, mnist_path='mnist/', svhn_path='svhn/', model_save_path='model/', log_path='log/'):
        """
        Args:
            model: model object exposing build_model(), batch_size, images, the
                optimizer/loss/summary ops and a saver (as the DTN class does)
            num_epoch: (optional) number of passes over the data
            mnist_path: (optional) directory containing the MNIST .hkl image files
            svhn_path: (optional) directory containing the SVHN .mat files
            model_save_path: (optional) directory where checkpoints are written
            log_path: (optional) directory where tensorboard summaries are written
        """
        self.model = model
        self.num_epoch = num_epoch
        self.mnist_path = mnist_path
        self.svhn_path = svhn_path
        self.model_save_path = model_save_path
        self.log_path = log_path

        # create output directories if they do not exist
        if not os.path.exists(log_path):
            os.makedirs(log_path)
        if not os.path.exists(model_save_path):
            os.makedirs(model_save_path)

        # construct the model graph
        model.build_model()

        # load datasets
        self.svhn = self.load_svhn(self.svhn_path)
        self.mnist = self.load_mnist(self.mnist_path)

    def load_svhn(self, image_path, split='train'):
        """Load SVHN images from '<split>_32x32.mat' in image_path.

        Args:
            image_path: directory containing the .mat file
            split: 'train' loads train_32x32.mat; any other value loads test_32x32.mat
        Returns:
            images: array with the last axis of 'X' moved to the front, rescaled
                by ``x / 127.5 - 1`` (i.e. to [-1, 1] assuming 8-bit pixel values)
        """
        print ('loading svhn image dataset..')

        file_name = 'train_32x32.mat' if split == 'train' else 'test_32x32.mat'
        svhn = scipy.io.loadmat(os.path.join(image_path, file_name))

        # move the sample axis of 'X' (stored last) to the front
        images = np.transpose(svhn['X'], [3, 0, 1, 2])
        images = images / 127.5 - 1
        print ('finished loading svhn image dataset..!')
        return images

    def load_mnist(self, image_path, split='train'):
        """Load MNIST images from '<split>.images.hkl' in image_path.

        Args:
            image_path: directory containing the .hkl file
            split: 'train' loads train.images.hkl; any other value loads test.images.hkl
        Returns:
            images: the loaded array rescaled by ``x / 127.5 - 1``
                (presumably already shaped like the model input; verify against the data)
        """
        print ('loading mnist image dataset..')

        file_name = 'train.images.hkl' if split == 'train' else 'test.images.hkl'
        images = hickle.load(os.path.join(image_path, file_name))
        images = images / 127.5 - 1
        print ('finished loading mnist image dataset..!')
        return images

    def train(self):
        """Run the training loop, alternating a source (SVHN) and a target (MNIST) step.

        Summaries are written every 10 iterations and a checkpoint is saved
        every 500 iterations.
        """
        model = self.model
        svhn = self.svhn
        mnist = self.mnist
        batch_size = model.batch_size

        # Both datasets are sliced with the same batch index and the placeholder
        # has a fixed batch size, so iterate only as far as the smaller dataset
        # can supply full batches.
        num_iter_per_epoch = min(svhn.shape[0], mnist.shape[0]) // batch_size

        config = tf.ConfigProto(allow_soft_placement=True)
        config.gpu_options.allow_growth = True

        with tf.Session(config=config) as sess:
            # initialize parameters
            tf.initialize_all_variables().run()
            summary_writer = tf.train.SummaryWriter(logdir=self.log_path, graph=tf.get_default_graph())

            for e in range(self.num_epoch):
                for i in range(num_iter_per_epoch):
                    start, end = i*batch_size, (i+1)*batch_size

                    # train model for domain S
                    feed_dict = {model.images: svhn[start:end]}
                    sess.run(model.d_optimizer_fake, feed_dict)
                    sess.run(model.f_optimizer_const, feed_dict)
                    sess.run(model.g_optimizer, feed_dict)

                    if i % 10 == 0:
                        # losses and summaries are evaluated on the source batch
                        summary, d_loss, g_loss = sess.run([model.summary_op, model.d_loss, model.g_loss], feed_dict)
                        summary_writer.add_summary(summary, e*num_iter_per_epoch + i)
                        print ('Epoch: [%d] Step: [%d/%d] d_loss: [%.6f] g_loss: [%.6f]' %(e+1, i+1, num_iter_per_epoch, d_loss, g_loss))

                    # train model for domain T
                    feed_dict = {model.images: mnist[start:end]}
                    sess.run(model.d_optimizer_real, feed_dict)
                    sess.run(model.d_optimizer_fake, feed_dict)
                    sess.run(model.g_optimizer, feed_dict)
                    sess.run(model.g_optimizer_const, feed_dict)

                    if i % 500 == 0:
                        # report the path actually written instead of a hard-coded 'model/' prefix
                        save_path = model.saver.save(sess, os.path.join(self.model_save_path, 'dcgan-%d' %(e+1)), global_step=i+1)
                        print ('%s saved' % save_path)
\ No newline at end of file
from model import DTN
from solver import Solver
def main():
    """Build a DTN with default hyper parameters and train it for 10 epochs."""
    solver = Solver(DTN(), num_epoch=10, svhn_path='svhn/', model_save_path='model/', log_path='log/')
    solver.train()


# run training only when executed as a script
if __name__ == "__main__":
    main()
\ No newline at end of file