Notes on an implementation of Autoencoder x K-means clustering

Point

  • Dimension reduction with an autoencoder
  • Clustering of the learned codes with K-means (a minimal end-to-end sketch follows below)
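
The two steps above can be sketched end to end. This is a hedged stand-in, not the article's method: scikit-learn's PCA takes the place of the autoencoder and scikit-learn's KMeans the place of tf.contrib, just to show the shape of the pipeline.

import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans

X = np.random.rand(1000, 28*28).astype(np.float32)              # stand-in for MNIST images
codes = PCA(n_components=128).fit_transform(X)                  # step 1: dimension reduction
clusters = KMeans(n_clusters=20, n_init=10).fit_predict(codes)  # step 2: clustering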

Data

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('datalab/mnist', one_hot = True)
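
Note that tensorflow.examples.tutorials was removed in later TensorFlow releases. As a hedged alternative, the same arrays can be built from tf.keras.datasets (the variable names below are illustrative, not from the original):

(train_x, train_y), (test_x, test_y) = tf.keras.datasets.mnist.load_data()
images_train = train_x.reshape(-1, 28*28).astype(np.float32) / 255.0
labels_train = np.eye(10, dtype = np.float32)[train_y]
images_test = test_x.reshape(-1, 28*28).astype(np.float32) / 255.0
labels_test = np.eye(10, dtype = np.float32)[test_y]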

Sample Code 1 (Autoencoder)

class Autoencoder():

  def __init__(self):
    pass

  def weight_variable(self, name, shape):
    initializer = tf.truncated_normal_initializer(mean = 0.0, stddev = 0.01, dtype = tf.float32)
    return tf.get_variable(name, shape, initializer = initializer)

  def bias_variable(self, name, shape):
    initializer = tf.constant_initializer(value = 0.0, dtype = tf.float32)
    return tf.get_variable(name, shape, initializer = initializer)

  def beta_variable(self, name, shape):
    initializer = tf.constant_initializer(value = 0.0, dtype = tf.float32)
    return tf.get_variable(name, shape, initializer = initializer)

  def gamma_variable(self, name, shape):
    initializer = tf.constant_initializer(value = 1.0, dtype = tf.float32)
    return tf.get_variable(name, shape, initializer = initializer)

  def inference(self, x, n_in, n_units_1, n_units_2):
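    # encoder: n_in -> n_units_1 -> n_units_2 (the code layer);
    # the decoder mirrors it using transposed (tied) encoder weights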
    with tf.variable_scope('enc_dec'):
      w_1 = self.weight_variable('w_1', [n_in, n_units_1])
      b_1 = self.bias_variable('b_1', [n_units_1])

      h_1 = tf.add(tf.matmul(x, w_1), b_1)

      # batch normalization (manual): normalize the pre-activations over the
      # batch, then apply the learned scale (gamma) and shift (beta)
      beta_1 = self.beta_variable('beta_1', [n_units_1])
      gamma_1 = self.gamma_variable('gamma_1', [n_units_1])

      mean_1, var_1 = tf.nn.moments(h_1, [0])
      h_1 = gamma_1 * (h_1 - mean_1) / tf.sqrt(var_1 + 1e-5) + beta_1

      bn_1 = tf.nn.sigmoid(h_1)

      w_2 = self.weight_variable('w_2', [n_units_1, n_units_2])
      b_2 = self.bias_variable('b_2', [n_units_2])

      code = tf.nn.sigmoid(tf.add(tf.matmul(bn_1, w_2), b_2))

      # decoder weights are tied to the encoder: w_3 = w_2^T
      w_3 = tf.transpose(w_2)
      b_3 = self.bias_variable('b_3', [n_units_1])

      h_3 = tf.add(tf.matmul(code, w_3), b_3)

      # batch normalization
      beta_2 = self.beta_variable('beta_2', [n_units_1])
      gamma_2 = self.gamma_variable('gamma_2', [n_units_1])

      mean_2, var_2 = tf.nn.moments(h_3, [0])
      h_3 = gamma_2 * (h_3 - mean_2) / tf.sqrt(var_2 + 1e-5) + beta_2

      bn_2 = tf.nn.sigmoid(h_3)

      # tied weights again: w_4 = w_1^T
      w_4 = tf.transpose(w_1)
      b_4 = self.bias_variable('b_4', [n_in])

      return tf.nn.sigmoid(tf.add(tf.matmul(bn_2, w_4), b_4))

  def loss(self, y, t):
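    # binary cross-entropy between the target t and the reconstruction y,
    # with clipping to avoid log(0)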
    return - tf.reduce_mean(tf.reduce_sum(t * tf.log(tf.clip_by_value(y, 1e-10, 1.0)) + \
                                          (1 - t) * tf.log(tf.clip_by_value(1 - y, 1e-10, 1.0)), axis = 1))
    # alternative: mean squared error
    #return tf.reduce_mean(tf.square(t - y))

  def training(self, loss, learning_rate):
    optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
    train_step = optimizer.minimize(loss)
    return train_step

  def training_clipped(self, loss, learning_rate, clip_norm):
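    # unused alternative to training(): clips each gradient's norm to clip_norm
    # before applying the Adam update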
    optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)

    grads_and_vars = optimizer.compute_gradients(loss)
    clipped_grads_and_vars = [(tf.clip_by_norm(grad, clip_norm = clip_norm), \
                             var) for grad, var in grads_and_vars]
    train_step = optimizer.apply_gradients(clipped_grads_and_vars)

    return train_step

  def fit(self, images_train, labels_train, images_test, labels_test, \
          n_in, n_units_1, n_units_2, \
          learning_rate, n_iter, batch_size, show_step, is_saving, model_path):

    tf.reset_default_graph()

    x = tf.placeholder(shape = [None, n_in], dtype = tf.float32)

    y = self.inference(x, n_in, n_units_1, n_units_2)
    loss = self.loss(y, x)
    train_step = self.training(loss, learning_rate)

    init = tf.global_variables_initializer()
    saver = tf.train.Saver()

    with tf.Session() as sess:

      sess.run(init)

      history_loss_train = []
      history_loss_test = []

      for i in range(n_iter):
        # Train
        rand_index = np.random.choice(len(images_train), size = batch_size)
        x_batch = images_train[rand_index]

        feed_dict = {x: x_batch}

        sess.run(train_step, feed_dict = feed_dict)

        temp_loss = sess.run(loss, feed_dict = feed_dict)
        history_loss_train.append(temp_loss)

        if (i + 1) % show_step == 0:
          print ('--------------------')
          print ('Iteration: ' + str(i + 1) + '  Loss: ' + str(temp_loss))

        # Test
        rand_index = np.random.choice(len(images_test), size = batch_size)
        x_batch = images_test[rand_index]

        feed_dict = {x: x_batch}

        temp_loss = sess.run(loss, feed_dict = feed_dict)
        history_loss_test.append(temp_loss)

      if is_saving:
        model_path = saver.save(sess, model_path)
        print ('--------------------')
        print ('done saving at ', model_path)

    fig = plt.figure(figsize = (10, 3))
    ax1 = fig.add_subplot(1, 2, 1)
    ax1.plot(range(n_iter), history_loss_train, 'b-', label = 'Train')
    ax1.plot(range(n_iter), history_loss_test, 'r--', label = 'Test')
    ax1.set_title('Loss')
    ax1.legend(loc = 'upper right')

    plt.show()    

  def reconstruct_image(self, x, n_in, n_units_1, n_units_2, model_path):
    with tf.variable_scope('enc_dec', reuse = True):
      w_1 = self.weight_variable('w_1', [n_in, n_units_1])
      b_1 = self.bias_variable('b_1', [n_units_1])

      h_1 = tf.add(tf.matmul(x, w_1), b_1)

      beta_1 = self.beta_variable('beta_1', [n_units_1])
      gamma_1 = self.gamma_variable('gamma_1', [n_units_1])

      mean_1, var_1 = tf.nn.moments(h_1, [0])
      h_1 = gamma_1 * (h_1 - mean_1) / tf.sqrt(var_1 + 1e-5) + beta_1

      bn_1 = tf.nn.sigmoid(h_1)

      w_2 = self.weight_variable('w_2', [n_units_1, n_units_2])
      b_2 = self.bias_variable('b_2', [n_units_2])

      code = tf.nn.sigmoid(tf.add(tf.matmul(bn_1, w_2), b_2))

      w_3 = tf.transpose(w_2)
      b_3 = self.bias_variable('b_3', [n_units_1])

      h_3 = tf.add(tf.matmul(code, w_3), b_3)

      beta_2 = self.beta_variable('beta_2', [n_units_1])
      gamma_2 = self.gamma_variable('gamma_2', [n_units_1])

      mean_2, var_2 = tf.nn.moments(h_3, [0])
      h_3 = gamma_2 * (h_3 - mean_2) / tf.sqrt(var_2 + 1e-5) + beta_2

      bn_2 = tf.nn.sigmoid(h_3)

      w_4 = tf.transpose(w_1)
      b_4 = self.bias_variable('b_4', [n_in])

      y = tf.nn.sigmoid(tf.add(tf.matmul(bn_2, w_4), b_4))

    saver = tf.train.Saver()

    with tf.Session() as sess:

      saver.restore(sess, model_path)

      return sess.run(y)

  def generate_code(self, x, n_in, n_units_1, n_units_2, model_path):
    with tf.variable_scope('enc_dec', reuse = True):
      w_1 = self.weight_variable('w_1', [n_in, n_units_1])
      b_1 = self.bias_variable('b_1', [n_units_1])

      h_1 = tf.add(tf.matmul(x, w_1), b_1)

      beta_1 = self.beta_variable('beta_1', [n_units_1])
      gamma_1 = self.gamma_variable('gamma_1', [n_units_1])

      mean_1, var_1 = tf.nn.moments(h_1, [0])
      h_1 = gamma_1 * (h_1 - mean_1) / tf.sqrt(var_1 + 1e-5) + beta_1

      bn_1 = tf.nn.sigmoid(h_1)

      w_2 = self.weight_variable('w_2', [n_units_1, n_units_2])
      b_2 = self.bias_variable('b_2', [n_units_2])

      code = tf.nn.sigmoid(tf.add(tf.matmul(bn_1, w_2), b_2))

    saver = tf.train.Saver()

    with tf.Session() as sess:

      saver.restore(sess, model_path)

      return sess.run(code)
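
For reference, the reconstruction loss implemented in loss() is the summed binary cross-entropy, averaged over the batch (the commented-out line is the mean-squared-error alternative):

$$
L(y, t) = -\frac{1}{N} \sum_{n=1}^{N} \sum_{i} \left[ t_{ni} \log y_{ni} + (1 - t_{ni}) \log (1 - y_{ni}) \right]
$$
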
ae = Autoencoder()

n_in = 28*28
n_units_1 = 256
n_units_2 = 128
learning_rate = 0.01
n_iter = 1000
batch_size = 100
show_step = 200
model_path = 'datalab/model'

images_train = mnist.train.images
labels_train = mnist.train.labels
images_test = mnist.test.images
labels_test = mnist.test.labels

is_saving = True

ae.fit(images_train, labels_train, images_test, labels_test, \
          n_in, n_units_1, n_units_2, \
          learning_rate, n_iter, batch_size, show_step, is_saving, model_path)

(Figure: training and test loss curves)

sample_size = 3
index = np.random.choice(10000, sample_size)
x = images_test[index]
labels = np.argmax(labels_test[index], axis = 1)

reconstructed = ae.reconstruct_image(x, n_in, n_units_1, n_units_2, model_path)

print (np.shape(x))
print (np.shape(reconstructed))

fig = plt.figure(figsize = (5, 3))

ax1 = fig.add_subplot(2, 2, 1)
ax1.imshow(np.reshape(x[0], [28, 28]), cmap = 'gray')
ax1.set_axis_off()
ax1.set_title('Input')

ax2 = fig.add_subplot(2, 2, 2)
ax2.imshow(np.reshape(reconstructed[0], [28, 28]), cmap = 'gray')
ax2.set_axis_off()
ax2.set_title('Reconstructed')

ax3 = fig.add_subplot(2, 2, 3)
ax3.imshow(np.reshape(x[1], [28, 28]), cmap = 'gray')
ax3.set_axis_off()
ax3.set_title('Input')

ax4 = fig.add_subplot(2, 2, 4)
ax4.imshow(np.reshape(reconstructed[1], [28, 28]), cmap = 'gray')
ax4.set_axis_off()
ax4.set_title('Reconstructed')

plt.show()

(Figure: input digits and their reconstructions)

sample_size = 1000
# note: the same 1,000 indices are reused for the train and test splits, so
# train samples are drawn from the first 10,000 training images only
index = np.random.choice(10000, sample_size)

x_train = images_train[index]
y_train = labels_train[index]
y_train_idx = np.argmax(y_train, axis = 1)
x_test = images_test[index]
y_test = labels_test[index]
y_test_idx = np.argmax(y_test, axis = 1)

codes_train = ae.generate_code(x_train, n_in, n_units_1, n_units_2, model_path)
codes_test = ae.generate_code(x_test, n_in, n_units_1, n_units_2, model_path)

Sample Code 2 (K-means)

class k_means():
  def __init__(self):
    pass
    
  def inference(self, x, n_clusters):
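    # wraps tf.contrib.factorization.KMeans (TF 1.x API; tf.contrib was removed in TF 2.x)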
    r_seed = np.random.randint(100)
    graph = tf.contrib.factorization.KMeans(x, num_clusters = n_clusters, \
                                           random_seed = r_seed)
    
    return graph.training_graph()

  def predict(self):
    pass
  
  def accuracy(self, cluster_idx, labels, lookup_table):
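    # map each sample's cluster index to that cluster's majority label via
    # lookup_table, then compare with the true labels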
    y = tf.nn.embedding_lookup(lookup_table, cluster_idx)
    correct_preds = tf.equal(tf.squeeze(y), tf.argmax(labels, axis = 1))
    accuracy = tf.reduce_mean(tf.cast(correct_preds, tf.float32))
    
    return accuracy
  
  def fit(self, input_train, labels_train, input_test, labels_test, \
          dim, n_clusters, n_classes, n_iter, show_step):
    
    tf.reset_default_graph()
    
    x = tf.placeholder(shape = [None, dim], dtype = tf.float32)
    t = tf.placeholder(shape = [None, n_classes], dtype = tf.int32)
          
    feed_dict_train = {x: input_train, t: labels_train}
    feed_dict_test = {x: input_test, t: labels_test}

    # note: on some TF 1.x versions training_graph() also returns
    # cluster_centers_var; adjust the unpacking below if needed
    all_scores, cluster_idx, scores, cluster_centers_initialized, \
        init_op, training_op = self.inference(x, n_clusters)
    
    average_score = tf.reduce_mean(scores)
    
    init_var = tf.global_variables_initializer()
    
    with tf.Session() as sess:
      
      sess.run(init_var)
      sess.run(init_op, feed_dict = feed_dict_train)      
            
      for i in range(n_iter):
        sess.run(training_op, feed_dict = feed_dict_train)
      
        if (i + 1) % show_step == 0:
          average_score_train = sess.run(average_score, feed_dict = feed_dict_train)
          average_score_test = sess.run(average_score, feed_dict = feed_dict_test)
          # cluster_idx is returned as a one-element tuple, hence the [0] below
          cluster_idx_train = sess.run(cluster_idx, feed_dict = feed_dict_train)
          cluster_idx_test = sess.run(cluster_idx, feed_dict = feed_dict_test)
          
          # per-cluster label histogram: map each cluster to its majority digit
          count = np.zeros(shape = [n_clusters, n_classes], dtype = np.int32)
          
          for j in range(len(labels_train)):
            count[cluster_idx_train[0][j]] += labels_train[j]
            
          lookup_table = np.reshape(np.argmax(count, axis = 1), [-1, 1])
          
          accuracy_train = sess.run(self.accuracy(cluster_idx_train[0], labels_train, lookup_table))
          accuracy_test = sess.run(self.accuracy(cluster_idx_test[0], labels_test, lookup_table))

          
          print ('--------------------')
          print (' Iteration: ' + str(i + 1))
          print (' Average score (train): {:.5f}        Average score (test): {:.5f}' \
                 .format(average_score_train, average_score_test))
          print (' Accuracy (train): {:.3f}            Accuracy (test): {:.3f}' \
                 .format(accuracy_train, accuracy_test))

          fig = plt.figure(figsize = (8, 3))

          ax = fig.add_subplot(1, 2, 1)
          # first two code dimensions, colored by cluster
          ax.scatter(input_train[:, 0], input_train[:, 1], c = cluster_idx_train[0], cmap = 'RdYlBu')  # Accent, RdYlBu
          ax.set_xlim(0.0, 1.0)
          ax.set_ylim(0.0, 1.0)
          ax.set_title('Train')

          ax = fig.add_subplot(1, 2, 2)
          ax.scatter(input_test[:, 0], input_test[:, 1], c = cluster_idx_test[0], cmap = 'RdYlBu')  # Accent, RdYlBu
          ax.set_xlim(0.0, 1.0)
          ax.set_ylim(0.0, 1.0)
          ax.set_title('Test')

          plt.show()

kmeans = k_means()

input_train_km = codes_train
labels_train_km = y_train.astype(np.int32)
input_test_km = codes_test
labels_test_km = y_test.astype(np.int32)

dim = 128
n_clusters = 20
n_classes = 10
n_iter = 50
show_step = 10

kmeans.fit(input_train_km, labels_train_km, input_test_km, labels_test_km, \
          dim, n_clusters, n_classes, n_iter, show_step)
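
As a sanity check on the tf.contrib pipeline, the same cluster-to-majority-label evaluation can be reproduced with scikit-learn (an assumption: scikit-learn is installed; this is not part of the original article):

from sklearn.cluster import KMeans

km = KMeans(n_clusters = n_clusters, n_init = 10, random_state = 0)
train_clusters = km.fit_predict(codes_train)
test_clusters = km.predict(codes_test)

# map each cluster to its majority digit, mirroring the lookup table above
lookup = np.zeros(n_clusters, dtype = np.int64)
for c in range(n_clusters):
    members = y_train_idx[train_clusters == c]
    if len(members) > 0:
        lookup[c] = np.bincount(members).argmax()

print('train accuracy:', (lookup[train_clusters] == y_train_idx).mean())
print('test accuracy:', (lookup[test_clusters] == y_test_idx).mean())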

(Figures: train/test scatter plots of the clustered codes, shown every 10 iterations)
