# Time Series Analysis
# NOTE: this module uses the TensorFlow 1.x graph API (tf.placeholder,
# tf.Session, tf.get_variable). Under TensorFlow 2.x it would need to run
# through tensorflow.compat.v1 with v2 behavior disabled.
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt


class TSA():
    def __init__(self):
        pass

    def weight_variable(self, name, shape):
        initializer = tf.truncated_normal_initializer(mean = 0.0, stddev = 0.01, dtype = tf.float32)
        return tf.get_variable(name, shape, initializer = initializer)

    def bias_variable(self, name, shape):
        initializer = tf.constant_initializer(value = 0.0, dtype = tf.float32)
        return tf.get_variable(name, shape, initializer = initializer)
    def get_zoneout_mask(self, zoneout_prob, shape):
        # Each entry of the mask is 1 with probability zoneout_prob.
        # A 1 means the corresponding unit keeps ("zones out" to) its
        # previous state; a 0 means it takes the newly computed state.
        zoneout_prob = tf.convert_to_tensor(zoneout_prob)
        random_tensor = zoneout_prob + tf.random_uniform(shape)
        zoneout_mask = tf.floor(random_tensor)
        return zoneout_mask
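    # The lstm() method below unrolls a single-layer LSTM for `length` steps
    # with zoneout (Krueger et al., 2017) applied to both the cell and hidden
    # states: c_t = m_c * c_{t-1} + (1 - m_c) * c~_t, and likewise for h_t.
    # The masks are sampled once per forward pass (per sess.run) and shared
    # across all time steps, not resampled per step. With zoneout_prob = 0.0
    # the masks are all zeros and zoneout is effectively disabled.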
    def lstm(self, x, length, n_in, n_units_l, n_predictions, batch_size, \
             forget_bias, zoneout_prob, reuse = False):
        x = tf.reshape(x, [-1, length, n_in])
        h = tf.zeros(shape = [batch_size, n_units_l], dtype = tf.float32)
        c = tf.zeros(shape = [batch_size, n_units_l], dtype = tf.float32)
        with tf.variable_scope('lstm', reuse = reuse):
            w_x = self.weight_variable('w_x', [n_in, n_units_l * 4])
            w_h = self.weight_variable('w_h', [n_units_l, n_units_l * 4])
            b = self.bias_variable('b', [n_units_l * 4])
            zoneout_mask_c = self.get_zoneout_mask(zoneout_prob, [n_units_l])
            zoneout_mask_complement_c = tf.ones(shape = [n_units_l], dtype = tf.float32) - zoneout_mask_c
            zoneout_mask_h = self.get_zoneout_mask(zoneout_prob, [n_units_l])
            zoneout_mask_complement_h = tf.ones(shape = [n_units_l], dtype = tf.float32) - zoneout_mask_h
            for t in range(length):
                # All four gate pre-activations in one matmul each for x and h.
                t_x = tf.matmul(x[:, t, :], w_x)
                t_h = tf.matmul(h, w_h)
                i, f, o, g = tf.split(t_x + t_h + b, 4, axis = 1)
                i = tf.nn.sigmoid(i)
                f = tf.nn.sigmoid(f + forget_bias)
                o = tf.nn.sigmoid(o)
                g = tf.nn.tanh(g)
                # Candidate states; h_temp must use the new candidate cell
                # state, not the previous one.
                c_temp = f * c + i * g
                h_temp = o * tf.nn.tanh(c_temp)
                # Zoneout: keep the previous state where the mask is 1.
                c = zoneout_mask_c * c + zoneout_mask_complement_c * c_temp
                h = zoneout_mask_h * h + zoneout_mask_complement_h * h_temp
            # Linear readout from the final hidden state.
            w_2 = self.weight_variable('w_2', [n_units_l, n_predictions])
            b_2 = self.bias_variable('b_2', [n_predictions])
            y = tf.matmul(h, w_2) + b_2
        return y
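    # ar() is a plain linear autoregressive baseline: the prediction is an
    # affine map of the last p observations in each input window.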
    def ar(self, x, length, p, n_predictions, reuse = False):
        x = x[:, length - p:]
        with tf.variable_scope('ar', reuse = reuse):
            w = self.weight_variable('w', [p, n_predictions])
            b = self.bias_variable('b', [n_predictions])
            y = tf.matmul(x, w) + b
        return y
    def fc(self, x, length, p, n_units_fc, n_predictions, keep_prob = 1.0, reuse = False):
        # Two-layer fully connected network on the last p observations.
        x = x[:, length - p:]
        with tf.variable_scope('fc', reuse = reuse):
            w_1 = self.weight_variable('w_1', [p, n_units_fc])
            b_1 = self.bias_variable('b_1', [n_units_fc])
            y = tf.matmul(x, w_1) + b_1
            # batch norm (optional, left disabled)
            #batch_mean, batch_var = tf.nn.moments(y, [0])
            #y = (y - batch_mean) / (tf.sqrt(batch_var) + 1e-10)
            # relu
            y = tf.nn.relu(y)
            # dropout (optional, left disabled)
            #y = tf.nn.dropout(y, keep_prob)
            w_2 = self.weight_variable('w_2', [n_units_fc, n_predictions])
            b_2 = self.bias_variable('b_2', [n_predictions])
            y = tf.matmul(y, w_2) + b_2
        return y
    def gradient_reversal(self, f, lam, n_units_f, batch_size):
        # Identity in the forward pass, gradient scaled by -lam in the
        # backward pass: forward, -lam * f + (f + lam * f) = f; backward,
        # only the -lam * f term is differentiated. The term outside
        # tf.stop_gradient must involve f itself, otherwise no gradient
        # flows back at all. (n_units_f and batch_size are unused.)
        return - lam * f + tf.stop_gradient(f + lam * f)
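    # Loss and metric helpers. loss_cross_entropy and loss_entropy both
    # expect probabilities (e.g. softmax outputs) and clip to 1e-10 to
    # avoid log(0).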
    def loss_mse(self, y, t):
        mse = tf.reduce_mean(tf.reduce_sum(tf.square(t - y), axis = 1))
        return mse

    def loss_cross_entropy(self, y, t):
        cross_entropy = - tf.reduce_mean(tf.reduce_sum(t * tf.log(tf.clip_by_value(y, 1e-10, 1.0)), axis = 1))
        return cross_entropy

    def loss_entropy(self, p):
        # Returns sum(p * log p), i.e. the *negative* entropy; minimizing
        # this quantity pushes the distribution toward higher entropy.
        entropy = tf.reduce_mean(tf.reduce_sum(p * tf.log(tf.clip_by_value(p, 1e-10, 1.0)), axis = 1))
        return entropy

    def accuracy(self, y, t):
        correct_preds = tf.equal(tf.argmax(y, axis = 1), tf.argmax(t, axis = 1))
        accuracy = tf.reduce_mean(tf.cast(correct_preds, tf.float32))
        return accuracy

    def accuracy_mae(self, y, t, n_predictions):
        # Mean absolute error per predicted step (despite the name, lower
        # is better).
        accuracy = tf.reduce_mean(tf.reduce_sum(tf.abs(y - t), axis = 1)) / n_predictions
        return accuracy
    def training(self, loss, learning_rate, var_list):
        optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
        train_step = optimizer.minimize(loss, var_list = var_list)
        return train_step

    def training_gd(self, loss, learning_rate, var_list):
        optimizer = tf.train.GradientDescentOptimizer(learning_rate = learning_rate)
        train_step = optimizer.minimize(loss, var_list = var_list)
        return train_step

    def training_clipped(self, loss, learning_rate, clip_norm, var_list):
        # Per-variable gradient norm clipping before the Adam update.
        optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
        grads_and_vars = optimizer.compute_gradients(loss, var_list = var_list)
        clipped_grads_and_vars = [(tf.clip_by_norm(grad, clip_norm = clip_norm), var) \
                                  for grad, var in grads_and_vars]
        train_step = optimizer.apply_gradients(clipped_grads_and_vars)
        return train_step
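    # The three fit_* methods below share the same training loop: sample a
    # random mini-batch, take an Adam step, record batch loss / MAE on both
    # the training and test sets, optionally save the model, then plot the
    # learning curves. They differ only in which graph (lstm / ar / fc) is
    # built and trained.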
    def fit_lstm(self, x_train, x_test, y_train, y_test, \
                 length, n_in, n_units_l, n_predictions, \
                 learning_rate, n_iter, batch_size, show_step, is_saving, model_path):
        tf.reset_default_graph()
        x = tf.placeholder(shape = [None, length], dtype = tf.float32)
        y = tf.placeholder(shape = [None, n_predictions], dtype = tf.float32)
        keep_prob = tf.placeholder(shape = [], dtype = tf.float32)
        preds = self.lstm(x, length, n_in, n_units_l, n_predictions, batch_size, 1.0, 0.0, reuse = False)
        loss = self.loss_mse(preds, y)
        var_list = tf.trainable_variables('lstm')
        train_step = self.training(loss, learning_rate, var_list)
        acc = self.accuracy_mae(preds, y, n_predictions)
        init = tf.global_variables_initializer()
        saver = tf.train.Saver()
        with tf.Session() as sess:
            sess.run(init)
            history_loss_train = []
            history_loss_test = []
            history_acc_train = []
            history_acc_test = []
            for i in range(n_iter):
                # Training
                rand_index = np.random.choice(len(x_train), size = batch_size)
                x_batch = x_train[rand_index]
                y_batch = y_train[rand_index]
                feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}
                sess.run(train_step, feed_dict = feed_dict)
                temp_loss = sess.run(loss, feed_dict = feed_dict)
                temp_acc = sess.run(acc, feed_dict = feed_dict)
                history_loss_train.append(temp_loss)
                history_acc_train.append(temp_acc)
                if (i + 1) % show_step == 0:
                    print('-' * 100)
                    print('Iteration: ' + str(i + 1) + ' Loss: ' + str(temp_loss) \
                          + ' Accuracy: ' + str(temp_acc))
                # Test
                rand_index = np.random.choice(len(x_test), size = batch_size)
                x_batch = x_test[rand_index]
                y_batch = y_test[rand_index]
                feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}
                temp_loss = sess.run(loss, feed_dict = feed_dict)
                temp_acc = sess.run(acc, feed_dict = feed_dict)
                history_loss_test.append(temp_loss)
                history_acc_test.append(temp_acc)
            if is_saving:
                model_path = saver.save(sess, model_path)
                print('-' * 100)
                print('done saving at ', model_path)
        print('-' * 100)
        fig = plt.figure(figsize = (10, 3))
        ax1 = fig.add_subplot(1, 2, 1)
        ax1.plot(range(n_iter), history_loss_train, 'b-', label = 'Training')
        ax1.plot(range(n_iter), history_loss_test, 'r-', label = 'Test')
        ax1.set_ylim(0.0, 1.0)
        ax1.set_title('Loss')
        ax1.legend(loc = 'upper right')
        ax2 = fig.add_subplot(1, 2, 2)
        ax2.plot(range(n_iter), history_acc_train, 'b-', label = 'Training')
        ax2.plot(range(n_iter), history_acc_test, 'r-', label = 'Test')
        ax2.set_ylim(0.0, 1.0)
        ax2.set_title('Accuracy')
        ax2.legend(loc = 'upper right')
        plt.show()
    def fit_ar(self, x_train, x_test, y_train, y_test, \
               length, p, n_predictions, \
               learning_rate, n_iter, batch_size, show_step, is_saving, model_path):
        tf.reset_default_graph()
        x = tf.placeholder(shape = [None, length], dtype = tf.float32)
        y = tf.placeholder(shape = [None, n_predictions], dtype = tf.float32)
        keep_prob = tf.placeholder(shape = [], dtype = tf.float32)
        preds = self.ar(x, length, p, n_predictions, reuse = False)
        loss = self.loss_mse(preds, y)
        var_list = tf.trainable_variables('ar')
        train_step = self.training(loss, learning_rate, var_list)
        acc = self.accuracy_mae(preds, y, n_predictions)
        init = tf.global_variables_initializer()
        saver = tf.train.Saver()
        with tf.Session() as sess:
            sess.run(init)
            history_loss_train = []
            history_loss_test = []
            history_acc_train = []
            history_acc_test = []
            for i in range(n_iter):
                # Training
                rand_index = np.random.choice(len(x_train), size = batch_size)
                x_batch = x_train[rand_index]
                y_batch = y_train[rand_index]
                feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}
                sess.run(train_step, feed_dict = feed_dict)
                temp_loss = sess.run(loss, feed_dict = feed_dict)
                temp_acc = sess.run(acc, feed_dict = feed_dict)
                history_loss_train.append(temp_loss)
                history_acc_train.append(temp_acc)
                if (i + 1) % show_step == 0:
                    print('-' * 100)
                    print('Iteration: ' + str(i + 1) + ' Loss: ' + str(temp_loss) \
                          + ' Accuracy: ' + str(temp_acc))
                # Test
                rand_index = np.random.choice(len(x_test), size = batch_size)
                x_batch = x_test[rand_index]
                y_batch = y_test[rand_index]
                feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}
                temp_loss = sess.run(loss, feed_dict = feed_dict)
                temp_acc = sess.run(acc, feed_dict = feed_dict)
                history_loss_test.append(temp_loss)
                history_acc_test.append(temp_acc)
            if is_saving:
                model_path = saver.save(sess, model_path)
                print('-' * 100)
                print('done saving at ', model_path)
        print('-' * 100)
        fig = plt.figure(figsize = (10, 3))
        ax1 = fig.add_subplot(1, 2, 1)
        ax1.plot(range(n_iter), history_loss_train, 'b-', label = 'Training')
        ax1.plot(range(n_iter), history_loss_test, 'r-', label = 'Test')
        ax1.set_ylim(0.0, 1.0)
        ax1.set_title('Loss')
        ax1.legend(loc = 'upper right')
        ax2 = fig.add_subplot(1, 2, 2)
        ax2.plot(range(n_iter), history_acc_train, 'b-', label = 'Training')
        ax2.plot(range(n_iter), history_acc_test, 'r-', label = 'Test')
        ax2.set_ylim(0.0, 1.0)
        ax2.set_title('Accuracy')
        ax2.legend(loc = 'upper right')
        plt.show()
    def fit_fc(self, x_train, x_test, y_train, y_test, \
               length, p, n_units_fc, n_predictions, \
               learning_rate, n_iter, batch_size, show_step, is_saving, model_path):
        tf.reset_default_graph()
        x = tf.placeholder(shape = [None, length], dtype = tf.float32)
        y = tf.placeholder(shape = [None, n_predictions], dtype = tf.float32)
        keep_prob = tf.placeholder(shape = [], dtype = tf.float32)
        preds = self.fc(x, length, p, n_units_fc, n_predictions, reuse = False)
        loss = self.loss_mse(preds, y)
        var_list = tf.trainable_variables('fc')
        train_step = self.training(loss, learning_rate, var_list)
        acc = self.accuracy_mae(preds, y, n_predictions)
        init = tf.global_variables_initializer()
        saver = tf.train.Saver()
        with tf.Session() as sess:
            sess.run(init)
            history_loss_train = []
            history_loss_test = []
            history_acc_train = []
            history_acc_test = []
            for i in range(n_iter):
                # Training
                rand_index = np.random.choice(len(x_train), size = batch_size)
                x_batch = x_train[rand_index]
                y_batch = y_train[rand_index]
                feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}
                sess.run(train_step, feed_dict = feed_dict)
                temp_loss = sess.run(loss, feed_dict = feed_dict)
                temp_acc = sess.run(acc, feed_dict = feed_dict)
                history_loss_train.append(temp_loss)
                history_acc_train.append(temp_acc)
                if (i + 1) % show_step == 0:
                    print('-' * 100)
                    print('Iteration: ' + str(i + 1) + ' Loss: ' + str(temp_loss) \
                          + ' Accuracy: ' + str(temp_acc))
                # Test
                rand_index = np.random.choice(len(x_test), size = batch_size)
                x_batch = x_test[rand_index]
                y_batch = y_test[rand_index]
                feed_dict = {x: x_batch, y: y_batch, keep_prob: 1.0}
                temp_loss = sess.run(loss, feed_dict = feed_dict)
                temp_acc = sess.run(acc, feed_dict = feed_dict)
                history_loss_test.append(temp_loss)
                history_acc_test.append(temp_acc)
            if is_saving:
                model_path = saver.save(sess, model_path)
                print('-' * 100)
                print('done saving at ', model_path)
        print('-' * 100)
        fig = plt.figure(figsize = (10, 3))
        ax1 = fig.add_subplot(1, 2, 1)
        ax1.plot(range(n_iter), history_loss_train, 'b-', label = 'Training')
        ax1.plot(range(n_iter), history_loss_test, 'r-', label = 'Test')
        ax1.set_ylim(0.0, 1.0)
        ax1.set_title('Loss')
        ax1.legend(loc = 'upper right')
        ax2 = fig.add_subplot(1, 2, 2)
        ax2.plot(range(n_iter), history_acc_train, 'b-', label = 'Training')
        ax2.plot(range(n_iter), history_acc_test, 'r-', label = 'Test')
        ax2.set_ylim(0.0, 1.0)
        ax2.set_title('Accuracy')
        ax2.legend(loc = 'upper right')
        plt.show()
    def predict_lstm(self, x_input, length, n_in, n_units_l, n_predictions, \
                     batch_size, model_path):
        # The predict_* methods do not reset the graph: they assume the graph
        # built by the corresponding fit_* call is still the default graph
        # (hence reuse = True) and restore the trained weights from disk.
        # x_input must have batch_size rows (the LSTM state shape is fixed).
        x = tf.placeholder(shape = [None, length], dtype = tf.float32)
        y = tf.placeholder(shape = [None, n_predictions], dtype = tf.float32)
        keep_prob = tf.placeholder(shape = [], dtype = tf.float32)
        preds = self.lstm(x, length, n_in, n_units_l, n_predictions, batch_size, 1.0, 0.0, reuse = True)
        saver = tf.train.Saver()
        with tf.Session() as sess:
            saver.restore(sess, model_path)
            feed_dict = {x: x_input, keep_prob: 1.0}
            prediction = sess.run(preds, feed_dict = feed_dict)
        return prediction

    def predict_ar(self, x_input, length, p, n_predictions, \
                   batch_size, model_path):
        x = tf.placeholder(shape = [None, length], dtype = tf.float32)
        y = tf.placeholder(shape = [None, n_predictions], dtype = tf.float32)
        keep_prob = tf.placeholder(shape = [], dtype = tf.float32)
        preds = self.ar(x, length, p, n_predictions, reuse = True)
        saver = tf.train.Saver()
        with tf.Session() as sess:
            saver.restore(sess, model_path)
            feed_dict = {x: x_input, keep_prob: 1.0}
            prediction = sess.run(preds, feed_dict = feed_dict)
        return prediction

    def predict_fc(self, x_input, length, p, n_units_fc, n_predictions, \
                   batch_size, model_path):
        x = tf.placeholder(shape = [None, length], dtype = tf.float32)
        y = tf.placeholder(shape = [None, n_predictions], dtype = tf.float32)
        keep_prob = tf.placeholder(shape = [], dtype = tf.float32)
        preds = self.fc(x, length, p, n_units_fc, n_predictions, reuse = True)
        saver = tf.train.Saver()
        with tf.Session() as sess:
            saver.restore(sess, model_path)
            feed_dict = {x: x_input, keep_prob: 1.0}
            prediction = sess.run(preds, feed_dict = feed_dict)
        return prediction
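
# ----------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). The synthetic
# series, sliding-window construction, and all hyperparameter values below
# are illustrative assumptions, not the author's settings.
if __name__ == '__main__':
    length, n_predictions = 30, 5
    # Toy series: a sampled sine wave.
    series = np.sin(np.linspace(0.0, 50.0, 1000)).astype(np.float32)
    # Slide a window of `length` inputs followed by `n_predictions` targets.
    n_windows = len(series) - length - n_predictions
    windows = np.array([series[i:i + length] for i in range(n_windows)])
    targets = np.array([series[i + length:i + length + n_predictions]
                        for i in range(n_windows)])
    # Chronological train/test split.
    n_train = int(0.8 * n_windows)
    x_train, x_test = windows[:n_train], windows[n_train:]
    y_train, y_test = targets[:n_train], targets[n_train:]
    tsa = TSA()
    tsa.fit_lstm(x_train, x_test, y_train, y_test,
                 length = length, n_in = 1, n_units_l = 32,
                 n_predictions = n_predictions, learning_rate = 1e-3,
                 n_iter = 500, batch_size = 64, show_step = 100,
                 is_saving = False, model_path = './model/tsa')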