Notes on an Implementation of Domain Adaptation (Labeled Target Domain Data)

Posted at 2018-08-18

Data

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.examples.tutorials.mnist import input_data

# One-hot labels are assumed throughout (accuracy() applies tf.argmax to t).
# The data directory is arbitrary.
mnist = input_data.read_data_sets('MNIST_data', one_hot = True)

images_train = mnist.train.images
labels_train = mnist.train.labels
images_test = mnist.test.images
labels_test = mnist.test.labels

# Reshape the flat 784-dimensional vectors to 28 x 28 images,
# flip them horizontally, and flatten them again.
# The flipped images serve as the target domain.
images_train_3d = np.reshape(images_train, [-1, 28, 28])
images_test_3d = np.reshape(images_test, [-1, 28, 28])

images_flip_train_3d = images_train_3d[:, :, ::-1]
images_flip_test_3d = images_test_3d[:, :, ::-1]
images_flip_train = np.reshape(images_flip_train_3d, [-1, 28*28])
images_flip_test = np.reshape(images_flip_test_3d, [-1, 28*28])

(Figure: MNIST digits and their horizontally flipped counterparts)
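
A minimal sketch, not in the original post, to display one digit next to its flipped counterpart (index 0 is arbitrary), assuming matplotlib is imported as above:

# Sketch: show one original digit and its horizontally flipped version.
fig = plt.figure(figsize = (4, 2))
ax1 = fig.add_subplot(1, 2, 1)
ax1.imshow(images_train_3d[0], cmap = 'gray')
ax1.set_title('source')
ax2 = fig.add_subplot(1, 2, 2)
ax2.imshow(images_flip_train_3d[0], cmap = 'gray')
ax2.set_title('target (flipped)')
plt.show()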

Sample Code: Target domain data are labeled.

F_0 is a shared feature extractor, F_1 is a classifier head for the source domain, and F_2 is a classifier head for the target domain. F_0 and F_1 are trained jointly on the source data first; F_2 is then trained on the labeled target data while F_0 stays fixed (only the F_2 variables are passed to the second optimizer).

class DA_4():
  def __init__(self):
    pass

  def weight_variable(self, name, shape):
    initializer = tf.truncated_normal_initializer(mean = 0.0, stddev = 0.01, dtype = tf.float32)
    return tf.get_variable(name, shape, initializer = initializer)

  def bias_variable(self, name, shape):
    initializer = tf.constant_initializer(value = 0.0, dtype = tf.float32)
    return tf.get_variable(name, shape, initializer = initializer)

  # Not used in this sample.
  def alpha_variable(self, name):
    initializer = tf.constant_initializer(value = 0.75, dtype = tf.float32)
    return tf.get_variable(name, shape = (), initializer = initializer)

  def F_0(self, x, filter_size, n_filters_1, n_filters_2, n_fc, keep_prob, reuse = False):
    x_reshaped = tf.reshape(x, [-1, 28, 28, 1])

    with tf.variable_scope('F_0', reuse = reuse):
      w_1 = self.weight_variable('w_1', [filter_size, filter_size, 1, n_filters_1])
      b_1 = self.bias_variable('b_1', [n_filters_1])

      # conv
      conv_1 = tf.nn.conv2d(x_reshaped, w_1, strides = [1, 2, 2, 1], padding = 'SAME') + b_1

      # batch norm
      #batch_mean, batch_var = tf.nn.moments(conv_1, [0, 1, 2])
      #conv_1 = (conv_1 - batch_mean) / (tf.sqrt(batch_var) + 1e-10)

      # relu
      conv_1 = tf.nn.relu(conv_1)

      # max_pool
      #conv_1 = tf.nn.max_pool(conv_1, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = 'SAME')

      w_2 = self.weight_variable('w_2', [filter_size, filter_size, n_filters_1, n_filters_2])
      b_2 = self.bias_variable('b_2', [n_filters_2])

      # conv
      conv_2 = tf.nn.conv2d(conv_1, w_2, strides = [1, 2, 2, 1], padding = 'SAME') + b_2

      # batch norm
      batch_mean, batch_var = tf.nn.moments(conv_2, [0, 1, 2])
      conv_2 = (conv_2 - batch_mean) / (tf.sqrt(batch_var) + 1e-10)

      # relu
      conv_2 = tf.nn.relu(conv_2)

      # max_pool
      #conv_2 = tf.nn.max_pool(conv_2, ksize = [1, 2, 2, 1], strides = [1, 2, 2, 1], padding = 'SAME')

      conv_2_flat = tf.reshape(conv_2, [-1, 7 * 7 * n_filters_2])

      w_3 = self.weight_variable('w_3', [7 * 7 * n_filters_2, n_fc])
      b_3 = self.bias_variable('b_3', [n_fc])

      fc_1 = tf.nn.relu(tf.matmul(conv_2_flat, w_3) + b_3)

      # batch norm
      batch_mean, batch_var = tf.nn.moments(fc_1, [0])

      fc_1 = (fc_1 - batch_mean) / (tf.sqrt(batch_var) + 1e-10)

      # dropout
      #fc_1 = tf.nn.dropout(fc_1, keep_prob)

      feature = fc_1

    return feature

  def F_1(self, x, n_fc_1, n_fc_2, keep_prob, reuse = False):

    with tf.variable_scope('F_1', reuse = reuse):
      w_1 = self.weight_variable('w_1', [n_fc_1, n_fc_2])
      b_1 = self.bias_variable('b_1', [n_fc_2])

      fc_1 = tf.nn.relu(tf.matmul(x, w_1) + b_1)

      # batch norm
      batch_mean, batch_var = tf.nn.moments(fc_1, [0])
      fc_1 = (fc_1 - batch_mean) / (tf.sqrt(batch_var) + 1e-10)

      # dropout
      #fc_1 = tf.nn.dropout(fc_1, keep_prob)

      w_2 = self.weight_variable('w_2', [n_fc_2, 10])
      b_2 = self.bias_variable('b_2', [10])

      fc_2 = tf.matmul(fc_1, w_2) + b_2
      logits = fc_2

    return logits

  def F_2(self, x, n_fc_1, n_fc_2, keep_prob, reuse = False):

    with tf.variable_scope('F_2', reuse = reuse):
      w_1 = self.weight_variable('w_1', [n_fc_1, n_fc_2])
      b_1 = self.bias_variable('b_1', [n_fc_2])

      fc_1 = tf.nn.relu(tf.matmul(x, w_1) + b_1)

      # batch norm
      batch_mean, batch_var = tf.nn.moments(fc_1, [0])
      fc_1 = (fc_1 - batch_mean) / (tf.sqrt(batch_var) + 1e-10)

      # dropout
      #fc_1 = tf.nn.dropout(fc_1, keep_prob)

      w_2 = self.weight_variable('w_2', [n_fc_2, 10])
      b_2 = self.bias_variable('b_2', [10])

      fc_2 = tf.matmul(fc_1, w_2) + b_2
      logits = fc_2

    return logits

  def loss_cross_entropy(self, y, t):
    cross_entropy = - tf.reduce_mean(tf.reduce_sum(t * tf.log(tf.clip_by_value(y, 1e-10, 1.0)), axis = 1))
    return cross_entropy

  # Not used in this sample.
  def loss_entropy(self, p):
    entropy = - tf.reduce_mean(tf.reduce_sum(p * tf.log(tf.clip_by_value(p, 1e-10, 1.0)), axis = 1))
    return entropy

  # Not used in this sample. Returns the negative mutual information,
  # so that minimizing the loss maximizes the mutual information.
  def loss_mutual_info(self, p):
    p_ave = tf.reduce_mean(p, axis = 0)
    h_y = - tf.reduce_sum(p_ave * tf.log(p_ave + 1e-16))
    h_y_x = - tf.reduce_mean(tf.reduce_sum(p * tf.log(tf.clip_by_value(p, 1e-10, 1.0)), axis = 1))
    mutual_info = h_y - h_y_x
    return -mutual_info

  def accuracy(self, y, t):
    correct_preds = tf.equal(tf.argmax(y, axis = 1), tf.argmax(t, axis = 1))
    accuracy = tf.reduce_mean(tf.cast(correct_preds, tf.float32))
    return accuracy

  def training(self, loss, learning_rate, var_list):
    optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
    #optimizer = tf.train.GradientDescentOptimizer(learning_rate = learning_rate)
    train_step = optimizer.minimize(loss, var_list = var_list)
    return train_step

  def training_clipped(self, loss, learning_rate, clip_norm, var_list):
    optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)

    grads_and_vars = optimizer.compute_gradients(loss, var_list = var_list)
    clipped_grads_and_vars = [(tf.clip_by_norm(grad, clip_norm = clip_norm), \
                             var) for grad, var in grads_and_vars]
    train_step = optimizer.apply_gradients(clipped_grads_and_vars)

    return train_step

  # train 0 and 1
  def fit_0_1(self, images_train, labels_train, images_test, labels_test, \
              filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_1, \
              learning_rate, n_iter, batch_size, show_step, is_saving, model_path):

    tf.reset_default_graph()

    x = tf.placeholder(shape = [None, 28 * 28], dtype = tf.float32)
    y = tf.placeholder(shape = [None, 10], dtype = tf.float32)
    keep_prob = tf.placeholder(shape = (), dtype = tf.float32)

    feat = self.F_0(x, filter_size, n_filters_1, n_filters_2, n_fc_0, \
                      keep_prob, reuse = False)
    logits = self.F_1(feat, n_fc_0, n_fc_1, keep_prob, reuse = False)
    probs = tf.nn.softmax(logits)
    loss = self.loss_cross_entropy(probs, y)

    var_list_F_0 = tf.trainable_variables('F_0')
    var_list_F_1 = tf.trainable_variables('F_1')
    var_list = var_list_F_0 + var_list_F_1

    # Without Gradient Clipping
    train_step = self.training(loss, learning_rate, var_list)
    # With Gradient Clipping
    #train_step = self.training_clipped(loss, learning_rate, 0.1, var_list)

    acc =  self.accuracy(probs, y)

    init = tf.global_variables_initializer()
    saver = tf.train.Saver()

    with tf.Session() as sess:

      sess.run(init)

      history_loss_train = []
      history_loss_test = []

      history_acc_train = []
      history_acc_test = []

      for i in range(n_iter):
        # Train
        rand_index = np.random.choice(len(images_train), size = batch_size)
        x_batch = images_train[rand_index]
        y_batch = labels_train[rand_index]

        feed_dict = {x: x_batch, y: y_batch, keep_prob: 0.7}

        sess.run(train_step, feed_dict = feed_dict)

        temp_loss = sess.run(loss, feed_dict = feed_dict)
        temp_acc = sess.run(acc, feed_dict = feed_dict)

        history_loss_train.append(temp_loss)
        history_acc_train.append(temp_acc)

        if (i + 1) % show_step == 0:
          print ('-' * 15)
          print ('Iteration: ' + str(i + 1) + '  Loss: ' + str(temp_loss) + \
                '  Accuracy: ' + str(temp_acc))

        # Test
        rand_index = np.random.choice(len(images_test), size = batch_size)
        x_batch = images_test[rand_index]
        y_batch = labels_test[rand_index]

        feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}

        temp_loss = sess.run(loss, feed_dict = feed_dict)
        temp_acc = sess.run(acc, feed_dict = feed_dict)

        history_loss_test.append(temp_loss)
        history_acc_test.append(temp_acc)

      if is_saving:
        model_path = saver.save(sess, model_path)
        print ('done saving at ', model_path)

    print ('-'* 15)    
    fig = plt.figure(figsize = (10, 3))
    ax1 = fig.add_subplot(1, 2, 1)
    ax1.plot(range(n_iter), history_loss_train, 'b-', label = 'Train')
    ax1.plot(range(n_iter), history_loss_test, 'r--', label = 'Test')
    ax1.set_title('Loss')
    ax1.legend(loc = 'upper right')

    ax2 = fig.add_subplot(1, 2, 2)
    ax2.plot(range(n_iter), history_acc_train, 'b-', label = 'Train')
    ax2.plot(range(n_iter), history_acc_test, 'r--', label = 'Test')
    ax2.set_ylim(0.0, 1.0)
    ax2.set_title('Accuracy')
    ax2.legend(loc = 'lower right')

    plt.show()

  # train 0 and 1 first then 2  
  def fit_0_1_2(self, images_train_s, labels_train_s, images_test_s, labels_test_s, \
                images_train_t, labels_train_t, images_test_t, labels_test_t, \
                filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_1, n_fc_2, \
                learning_rate, n_iter, batch_size, show_step, is_saving, model_path):

    tf.reset_default_graph()

    x = tf.placeholder(shape = [None, 28 * 28], dtype = tf.float32)
    y = tf.placeholder(shape = [None, 10], dtype = tf.float32)
    keep_prob = tf.placeholder(shape = (), dtype = tf.float32)

    feat = self.F_0(x, filter_size, n_filters_1, n_filters_2, n_fc_0, \
                      keep_prob, reuse = False)

    logits_1 = self.F_1(feat, n_fc_0, n_fc_1, keep_prob, reuse = False)
    probs_1 = tf.nn.softmax(logits_1)
    loss_1 = self.loss_cross_entropy(probs_1, y)

    logits_2 = self.F_2(feat, n_fc_0, n_fc_2, keep_prob, reuse = False)
    probs_2 = tf.nn.softmax(logits_2)
    loss_2 = self.loss_cross_entropy(probs_2, y)

    var_list_0 = tf.trainable_variables('F_0')
    var_list_1 = tf.trainable_variables('F_1')
    var_list_2 = tf.trainable_variables('F_2')

    var_list_0_1 = var_list_0 + var_list_1
    #var_list_0_2 = var_list_0 + var_list_2

    # Without Gradient Clipping
    train_step_1 = self.training(loss_1, learning_rate, var_list_0_1)
    train_step_2 = self.training(loss_2, learning_rate, var_list_2)

    # With Gradient Clipping
    #train_step_1 = self.training_clipped(loss_1, learning_rate, 0.1, var_list_0_1)
    #train_step_2 = self.training_clipped(loss_2, learning_rate, 0.1, var_list_0_2)

    acc_1 =  self.accuracy(probs_1, y)
    acc_2 =  self.accuracy(probs_2, y)

    init = tf.global_variables_initializer()
    saver = tf.train.Saver()

    with tf.Session() as sess:

      sess.run(init)

      # for 0 and 1
      history_loss_train_1 = []
      history_loss_test_1 = []

      history_acc_train_1 = []
      history_acc_test_1 = []

      for i in range(n_iter):
        # Train
        rand_index = np.random.choice(len(images_train_s), size = batch_size)
        x_batch = images_train_s[rand_index]
        y_batch = labels_train_s[rand_index]

        feed_dict = {x: x_batch, y: y_batch, keep_prob: 0.7}

        sess.run(train_step_1, feed_dict = feed_dict)

        temp_loss = sess.run(loss_1, feed_dict = feed_dict)
        temp_acc = sess.run(acc_1, feed_dict = feed_dict)

        history_loss_train_1.append(temp_loss)
        history_acc_train_1.append(temp_acc)

        if (i + 1) % show_step == 0:
          print ('-' * 15)
          print ('Iteration: ' + str(i + 1) + '  Loss: ' + str(temp_loss) + \
                '  Accuracy: ' + str(temp_acc))

        # Test
        rand_index = np.random.choice(len(images_test_s), size = batch_size)
        x_batch = images_test_s[rand_index]
        y_batch = labels_test_s[rand_index]

        feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}

        temp_loss = sess.run(loss_1, feed_dict = feed_dict)
        temp_acc = sess.run(acc_1, feed_dict = feed_dict)

        history_loss_test_1.append(temp_loss)
        history_acc_test_1.append(temp_acc)

      if is_saving:
        model_path = saver.save(sess, model_path)
        print ('done saving at ', model_path)

      print ('-'* 15)    
      fig = plt.figure(figsize = (10, 3))
      ax1 = fig.add_subplot(1, 2, 1)
      ax1.plot(range(n_iter), history_loss_train_1, 'b-', label = 'Train')
      ax1.plot(range(n_iter), history_loss_test_1, 'r--', label = 'Test')
      ax1.set_title('Loss')
      ax1.legend(loc = 'upper right')

      ax2 = fig.add_subplot(1, 2, 2)
      ax2.plot(range(n_iter), history_acc_train_1, 'b-', label = 'Train')
      ax2.plot(range(n_iter), history_acc_test_1, 'r--', label = 'Test')
      ax2.set_ylim(0.0, 1.0)
      ax2.set_title('Accuracy')
      ax2.legend(loc = 'lower right')

      plt.show()

      # for 2  
      history_loss_train_2 = []
      history_loss_test_2 = []

      history_acc_train_2 = []
      history_acc_test_2 = []

      for i in range(n_iter):
        # Train
        rand_index = np.random.choice(len(images_train_t), size = batch_size)
        x_batch = images_train_t[rand_index]
        y_batch = labels_train_t[rand_index]

        feed_dict = {x: x_batch, y: y_batch, keep_prob: 0.7}

        sess.run(train_step_2, feed_dict = feed_dict)

        temp_loss = sess.run(loss_2, feed_dict = feed_dict)
        temp_acc = sess.run(acc_2, feed_dict = feed_dict)

        history_loss_train_2.append(temp_loss)
        history_acc_train_2.append(temp_acc)

        if (i + 1) % show_step == 0:
          print ('-' * 15)
          print ('Iteration: ' + str(i + 1) + '  Loss: ' + str(temp_loss) + \
                '  Accuracy: ' + str(temp_acc))

        # Test
        rand_index = np.random.choice(len(images_test_t), size = batch_size)
        x_batch = images_test_t[rand_index]
        y_batch = labels_test_t[rand_index]

        feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}

        temp_loss = sess.run(loss_2, feed_dict = feed_dict)
        temp_acc = sess.run(acc_2, feed_dict = feed_dict)

        history_loss_test_2.append(temp_loss)
        history_acc_test_2.append(temp_acc)

      if is_saving:
        model_path = saver.save(sess, model_path)
        print ('done saving at ', model_path)

      print ('-'* 15)    
      fig = plt.figure(figsize = (10, 3))
      ax1 = fig.add_subplot(1, 2, 1)
      ax1.plot(range(n_iter), history_loss_train_2, 'b-', label = 'Train')
      ax1.plot(range(n_iter), history_loss_test_2, 'r--', label = 'Test')
      ax1.set_title('Loss')
      ax1.legend(loc = 'upper right')

      ax2 = fig.add_subplot(1, 2, 2)
      ax2.plot(range(n_iter), history_acc_train_2, 'b-', label = 'Train')
      ax2.plot(range(n_iter), history_acc_test_2, 'r--', label = 'Test')
      ax2.set_ylim(0.0, 1.0)
      ax2.set_title('Accuracy')
      ax2.legend(loc = 'lower right')

      plt.show()

  def predict_1(self, images, labels, filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_1, model_path):

    x = tf.placeholder(shape = [None, 28 * 28], dtype = tf.float32)
    y = tf.placeholder(shape = [None, 10], dtype = tf.float32)
    keep_prob = tf.placeholder(shape = (), dtype = tf.float32)

    feat = self.F_0(x, filter_size, n_filters_1, n_filters_2, n_fc_0, \
                      keep_prob, reuse = True)

    logits = self.F_1(feat, n_fc_0, n_fc_1, keep_prob, reuse = True)
    probs = tf.nn.softmax(logits)
    acc =  self.accuracy(probs, y)

    saver = tf.train.Saver()

    with tf.Session() as sess:

      saver.restore(sess, model_path)

      feed_dict = {x: images, y: labels, keep_prob: 1.0}

      print('Accuracy_1: {:.4f}'.format(sess.run(acc, feed_dict = feed_dict)))

      return sess.run(probs, feed_dict = feed_dict)

  def predict_2(self, images, labels, filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_2, model_path):

    x = tf.placeholder(shape = [None, 28 * 28], dtype = tf.float32)
    y = tf.placeholder(shape = [None, 10], dtype = tf.float32)
    keep_prob = tf.placeholder(shape = (), dtype = tf.float32)

    feat = self.F_0(x, filter_size, n_filters_1, n_filters_2, n_fc_0, \
                      keep_prob, reuse = True)

    logits = self.F_2(feat, n_fc_0, n_fc_2, keep_prob, reuse = True)
    probs = tf.nn.softmax(logits)
    acc =  self.accuracy(probs, y)

    saver = tf.train.Saver()

    with tf.Session() as sess:

      saver.restore(sess, model_path)

      feed_dict = {x: images, y: labels, keep_prob: 1.0}

      print('Accuracy_2: {:.4f}'.format(sess.run(acc, feed_dict = feed_dict)))

      return sess.run(probs, feed_dict = feed_dict)
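
One caveat: the batch normalization inside F_0, F_1, and F_2 normalizes with the statistics of whichever batch is fed in (tf.nn.moments), at test time as well as at training time; no moving averages are kept. A minimal sketch of an alternative using tf.layers.batch_normalization (TensorFlow 1.x API); the is_training placeholder and batch_norm helper are hypothetical additions, not part of the sample above:

# Sketch: batch normalization with moving averages, as an alternative to
# the manual tf.nn.moments normalization used above.
is_training = tf.placeholder(shape = (), dtype = tf.bool)

def batch_norm(x, name):
  return tf.layers.batch_normalization(x, training = is_training, name = name)

# The moving averages are updated via the UPDATE_OPS collection, so the
# train step must be made to depend on them:
# update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
# with tf.control_dependencies(update_ops):
#   train_step = optimizer.minimize(loss, var_list = var_list)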

Parameters

da_4 = DA_4()

filter_size = 3
n_filters_1 = 16
n_filters_2 = 16
n_fc_0 = 128
n_fc_1 = 64
n_fc_2 = 64
learning_rate = 0.01
n_iter = 100
batch_size = 64
show_step = 50
# The saved model is restored later by predict_1 / predict_2.
is_saving = True
model_path = 'datalab/model'
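
fit_0_1 trains only F_0 and F_1 and is not exercised in this post; a minimal sketch of calling it on the source data alone, under the same parameters (is_saving is set to False here so the model saved by fit_0_1_2 below is not overwritten):

da_4.fit_0_1(images_train, labels_train, images_test, labels_test, \
             filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_1, \
             learning_rate, n_iter, batch_size, show_step, False, model_path)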

Output results 1

# Source domain: original MNIST
images_train_s = images_train
labels_train_s = labels_train
images_test_s = images_test
labels_test_s = labels_test

# Target domain: horizontally flipped MNIST (labels unchanged)
images_train_t = images_flip_train
labels_train_t = labels_train
images_test_t = images_flip_test
labels_test_t = labels_test

da_4.fit_0_1_2(images_train_s, labels_train_s, images_test_s, labels_test_s, \
               images_train_t, labels_train_t, images_test_t, labels_test_t, \
               filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_1, n_fc_2, \
               learning_rate, n_iter, batch_size, show_step, is_saving, model_path)

(Figure: stage 1 loss and accuracy curves, training F_0 and F_1 on the source domain)

(Figure: stage 2 loss and accuracy curves, training F_2 on the target domain)

Output results 2

indices = np.random.choice(len(images_flip_test), 32)
#images = images_test[indices]
images = images_flip_test[indices]
labels = labels_test[indices]

probs_1 = da_4.predict_1(images, labels, filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_1, model_path)
probs_2 = da_4.predict_2(images, labels, filter_size, n_filters_1, n_filters_2, n_fc_0, n_fc_2, model_path)

print ('-' * 15)
print ('Prediction_1')
print (np.argmax(probs_1, axis = 1))
print ('Prediction_2')
print (np.argmax(probs_2, axis = 1))
print ('True')
print (np.argmax(labels, axis = 1))
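
A small follow-up sketch, not in the original post, reusing probs_1, probs_2, and labels from above to measure how often the two heads agree on this batch:

# Sketch: agreement between the source head (F_1) and the target head (F_2),
# and each head's accuracy on this batch of flipped test images.
pred_1 = np.argmax(probs_1, axis = 1)
pred_2 = np.argmax(probs_2, axis = 1)
true = np.argmax(labels, axis = 1)
print ('Agreement:', np.mean(pred_1 == pred_2))
print ('Accuracy_1:', np.mean(pred_1 == true))
print ('Accuracy_2:', np.mean(pred_2 == true))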
