そろそろ飽きてきたので、楽しい楽しい高速化遊びで遊ぶことにする。
実は gcp はディスクが遅い。
hdd なんか使うと 0.75 IOPS でどうだかいう話で( https://cloud.google.com/compute/docs/disks/performance?hl=ja ) read に 3400 秒くらいかかっていたのが ssd にして 310 秒くらいに丸められたので良しとしたのは 第1回 で書いた。
それでも 1RUN あたり 300秒とか消費されているので、これがどうにかならないかと弄ってみる。
pickle でキャッシュ
trainとtestのうち、trainはバッチを固定したくないのでshuffleして読んでいるのでキャッシュできない。
しかしtestはshuffleいらないよなーということでディレクトリを頭から読んでいるのでこれをキャッシュしてみる。
もともと read まわりはこんなコードだった。
def read_and_norm(line):
l2 = line.split()
t1 = time.time()
img = io.imread(TARGETDIR+l2[0])
t2 = time.time()
a, b = img.min(), img.max()
if (b-a > 0):
img = (img - a) / (b - a)
t3 = time.time()
#print(l2[0],a,b)
if MODE == 'TRAIN' and l2[1] == '0':
L = [1, 0]
else:
L = [0, 1]
return [[img, L], [t2-t1, t3-t2]]
else:
t3 = time.time()
return [[], [t2-t1, t3-t2]]
line
はtrain_masterから読んできた行で、ファイル名とラベルが
train_255369.tif 0
のように入っている。元のtrain_masterから100行くらい切り出して、1行毎にこの関数でfile readしてなにか normalize して one hot 形式でラベル貼っつけて、ついでに経過時間のリストとセットで返している。
一旦これを、こんな形式に書き換える。
def read_and_norms(filenames):
def fun1(line):
l2 = line.split()
t1 = time.time()
img = io.imread(TARGETDIR+l2[0])
t2 = time.time()
if MODE == 'TRAIN' and l2[1] == '0':
L = [1, 0]
else:
L = [0, 1]
return [[img, L], [t2-t1]]
imgs = map(fun1, filenames)
def fun2(img0):
img = img0[0][0]
t2 = time.time()
a, b = img.min(), img.max()
if (b-a > 0):
img = (img - a) / (b - a)
t3 = time.time()
return [[img, img0[0][1]], [img0[1][0], t3-t2]]
else:
t3 = time.time()
return [[], [img0[1][0], t3-t2]]
return map(fun2, imgs)
一瞬でわかりにくくなったが、今度は1バッチ分のtrain_masterの行配列が渡ってきて、これをmap関数で1行単位に分割して行の中を分割してreadしてラベル貼っつけて、mapなのでlistでreturnして、それを後続のmap関数でnormalize処理している。
retrunされるのは、画像とラベルのリストと経過時間二つのリストが束ねられたリストがファイル数分固められたリストである。
この、前段のmap関数の結果をpickleで保存しておいて次回以降は再利用する。
def read_and_norms(filenames, pknum):
def fun1(line):
l2 = line.split()
t1 = time.time()
img = io.imread(TARGETDIR+l2[0])
t2 = time.time()
if MODE == 'TRAIN' and l2[1] == '0':
L = [1, 0]
else:
L = [0, 1]
return [[img, L], [t2-t1]]
imgs = []
pkfile = TEST_PICKLE_DIR + 'pkl' + str(pknum) if pknum != False else ''
if ((pknum!=False) and (os.path.exists(pkfile))):
with open(pkfile, 'rb') as f:
imgs = pickle.load(f)
else:
imgX = map(fun1, filenames)
imgs = list(imgX)
if ((pknum!=False) and (not os.path.exists(pkfile))):
with open(pkfile, 'wb') as f:
pickle.dump(imgs, f)
def fun2(img0):
img = img0[0][0]
t2 = time.time()
a, b = img.min(), img.max()
if (b-a > 0):
img = (img - a) / (b - a)
t3 = time.time()
return [[img, img0[0][1]], [img0[1][0], t3-t2]]
else:
t3 = time.time()
return [[], [img0[1][0], t3-t2]]
return map(fun2, imgs)
引数が増えているがこれは
pknum = False if MODE == 'TRAIN' else batchnum
imgs = read_and_norms(filenames_x, pknum)
としてTESTモードの時だけバッチ番号を渡している。
このバッチ番号から
pkfile = TEST_PICKLE_DIR + 'pkl' + str(pknum) if pknum != False else ''
でファイル名を作っているのは特に説明することはなくて、そのあとのif文も
- pickleするつもりでいてファイルが存在していたらそこから読む
- さもなくば真面目に imread して、
- pickleするつもりでいてファイルが存在していなかったらpickleで保存する
というだけである。
こいつの面倒な点は、python3のmapはリストではなくiterableを後ろに回すので途中でpickle保存のために全部巻き取ると後ろのmap関数にiterableが届かなくなる点にある。
よって、pickle保存しようがしまいがそこから読もうが一旦リストimgs
を経由して二つのmap関数を繋ぐ必要がある。
結果がどうなったか
file read/sec | total elapsed time/sec | |
---|---|---|
最初の形式 | 280.75 | 299.44 |
map関数2個に置き換え | 281.47 | 289.51 |
pickleあり1RUN目 | 289.19 | 297.08 |
pickleあり2RUN目 | 100.18 | 107.26 |
map関数置き換えで特に悪さが出るでもなく。意図通りpickleありの2度目以降はcacheから読むので file open の回数が 1/100 になって 300秒から100秒への短縮に成功している。
multiprocess で read と compute の並行処理
pythonじゃなければ、 multi thread で read と compute を重ねるとか、ファイル100個の read を並行処理でぶん投げて promise で待つとかいろいろやりようがあるのだが、なにせ python は GIL で 1 thread しか同時に回らないのでどうしようも無い。
こういうのを process 並列にするとプロセス間通信が必要で、メモリコピーが発生して嫌だけど(multi threadだったら勝手に覗き込めばいい)他にないので multiprocessing module で並行処理してみる。
ただこの場合、 Queue オブジェクトを使ってコード上では何も考えずにデータの受け渡しができるのは大変えらい。
こんな関数を別プロセス上で動かす。
def read_fork(filenames, MODE, _fun1, TARGETDIR, TEST_PICKLE_DIR, q):
_start_time = time.time()
read_time = 0
read_time1 = 0
read_time2 = 0
batchnum = 0
while len(filenames)>0:
filenames_x = filenames[:min(BATCHSIZE, len(filenames))]
print("batch:", len(filenames_x), '/', len(filenames), "elapsed:", time.time() - _start_time, "sec")
del filenames[:min(BATCHSIZE, len(filenames))]
batchnum += 1
if MODE == 'TRAIN':
filenames_y = [l.split()[0] for l in filenames_x]
else:
filenames_y = filenames_x
Train = []
Label = []
r0 = time.time()
### read images, labels
pknum = False if MODE == 'TRAIN' else batchnum
imgs = _fun1(filenames_x, pknum, MODE, TARGETDIR, TEST_PICKLE_DIR)
for l in imgs:
a, times = l
if len(a)>0:
Train.append(a[0])
Label.append(a[1])
read_time1 += times[0]
read_time2 += times[1]
read_time += time.time() - r0
q.put([Train, Label, filenames_y])
q.put(['END', read_time, read_time1, read_time2])
train_masterの全ファイル名をこのread_fork
に渡して別プロセスで動かさせると、こいつは1バッチ100ファイルづつ読んでリストにしてQueueに突っ込んでメインプロセスに送り返してくる。
全ファイル読み終わると終了サインを含むリストを送ってくるので、メインプロセス側では読み込んだ画像データが来続ける限りCNN処理を繰り返す。
呼び出し側はこんな感じである。
q = Queue()
p = Process(target=read_fork, args=(filenames, MODE, read_and_norms, TARGETDIR, TEST_PICKLE_DIR, q))
p.start()
do_CNN = True
while do_CNN:
imgs = q.get()
if imgs[0] != 'END':
## CNN
Train = imgs[0]
Label = imgs[1]
filenames_y = imgs[2]
### ここに CNN 処理が入る
else:
do_CNN = False
read_and_norms
は前述のpickleありmap2段の file read 関数だがこれも引数として渡して何事もなく別プロセスに渡る。
結果どうなったか
read/sec | compute/sec | elapsed time/sec | |
---|---|---|---|
single process | 331.91 | 459.56 | 791.98 |
multi process | 390.22 | 534.85 | 548.88 |
(computeが若干変わっているのは無視して)read timeがどちらも300秒以上あるにも関わらず total elapsed timeは790秒が550秒に圧縮されている。
この分がreadとcomputeが実時間で重なっている分である。
参考までに、全script
TARGETDIR = 'train/'
BATCHSIZE = 100
MODE2 = 'CONT'
#MODE2 = 'RESET'
NUM_STEP = 50
SHUFFLE = True
SESSION_FILE = "./model.ckpt"
TEST_PICKLE_DIR = "../pkl/"
##############################################################
##
## tensorflow CNN structures
##
## CNN
## [32,32,7]pixel
## -> [3,3,7,16] kernel -> [32,32,16]pixel -> [16,16,16]pixel
## -> [3,3,16,32] kernel -> [16,16,32]pixel -> [8,8,32]pixel
## -> [3,3,32,32] kernel -> [8,8,32]pixel -> [4,4,32]pixel
## -> [128]FC
## -> 2
import tensorflow as tf
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=0.1)
return tf.Variable(initial)
def bias_variable(shape):
initial = tf.constant(0.1, shape=shape)
return tf.Variable(initial)
def conv2d(x, W):
return tf.nn.conv2d(x, W, strides=[1,1,1,1], padding='SAME')
def max_pool_2x2(x):
return tf.nn.max_pool(x, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')
x = tf.placeholder(tf.float32, shape=[None, 32,32,7])
y_ = tf.placeholder(tf.float32, shape=[None,2])
#x_image = x / 256.0
x_image = x
## 1st layer
W_conv1 = weight_variable([3,3,7,16])
b_conv1 = bias_variable([16])
h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)
## 2nd layer
W_conv2 = weight_variable([3,3,16,32])
b_conv2 = bias_variable([32])
h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)
## 3rd layer
W_conv3 = weight_variable([3,3,32,32])
b_conv3 = bias_variable([32])
h_conv3 = tf.nn.relu(conv2d(h_pool2, W_conv3) + b_conv3)
h_pool3 = max_pool_2x2(h_conv3)
## 4th, full-connection
W_fc1 = weight_variable([4 * 4 * 32, 128])
b_fc1 = bias_variable([128])
h_pool2_flat = tf.reshape(h_pool3, [-1, 4*4*32])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
keep_prob = tf.placeholder(tf.float32)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)
## final,
W_fc2 = weight_variable([128,2])
b_fc2 = bias_variable([2])
y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
#residual = y_ - y_conv
residual2 = tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y_conv)
cross_entropy = tf.reduce_mean(residual2)
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
##############################################################
##
## start script main
##
import random
import sys, os
import time
import datetime
from skimage import io
import pickle
from multiprocessing import Process, Queue
filenames = []
if os.path.isfile(sys.argv[1]):
MODE = 'TRAIN'
f = open(sys.argv[1])
filenames = f.readlines()
f.close()
filenames.pop(0)
if SHUFFLE:
random.shuffle(filenames)
elif os.path.isdir(sys.argv[1]):
MODE = 'TEST'
filenames = os.listdir(sys.argv[1])
def read_and_norms(filenames, pknum, MODE, TARGETDIR, TEST_PICKLE_DIR):
def fun1(line):
l2 = line.split()
t1 = time.time()
img = io.imread(TARGETDIR+l2[0])
t2 = time.time()
if MODE == 'TRAIN' and l2[1] == '0':
L = [1, 0]
else:
L = [0, 1]
return [[img, L], [t2-t1]]
imgs = []
pkfile = TEST_PICKLE_DIR + 'pkl' + str(pknum) if pknum != False else ''
if ((pknum!=False) and (os.path.exists(pkfile))):
with open(pkfile, 'rb') as f:
imgs = pickle.load(f)
else:
imgX = map(fun1, filenames)
imgs = list(imgX)
if ((pknum!=False) and (not os.path.exists(pkfile))):
with open(pkfile, 'wb') as f:
pickle.dump(imgs, f)
def fun2(img0):
img = img0[0][0]
t2 = time.time()
a, b = img.min(), img.max()
if (b-a > 0):
img = (img - a) / (b - a)
t3 = time.time()
return [[img, img0[0][1]], [img0[1][0], t3-t2]]
else:
t3 = time.time()
return [[], [img0[1][0], t3-t2]]
return map(fun2, imgs)
def read_fork(filenames, MODE, _fun1, TARGETDIR, TEST_PICKLE_DIR, q):
_start_time = time.time()
read_time = 0
read_time1 = 0
read_time2 = 0
batchnum = 0
while len(filenames)>0:
filenames_x = filenames[:min(BATCHSIZE, len(filenames))]
print("batch:", len(filenames_x), '/', len(filenames), "elapsed:", time.time() - _start_time, "sec")
del filenames[:min(BATCHSIZE, len(filenames))]
batchnum += 1
if MODE == 'TRAIN':
filenames_y = [l.split()[0] for l in filenames_x]
else:
filenames_y = filenames_x
Train = []
Label = []
r0 = time.time()
### read images, labels
pknum = False if MODE == 'TRAIN' else batchnum
imgs = _fun1(filenames_x, pknum, MODE, TARGETDIR, TEST_PICKLE_DIR)
for l in imgs:
a, times = l
if len(a)>0:
Train.append(a[0])
Label.append(a[1])
read_time1 += times[0]
read_time2 += times[1]
read_time += time.time() - r0
q.put([Train, Label, filenames_y])
q.put(['END', read_time, read_time1, read_time2])
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver()
start_time = time.time()
if MODE2 != 'RESET':
if os.path.exists(SESSION_FILE + ".meta"):
saver.restore(sess, SESSION_FILE)
if MODE != 'TRAIN':
TARGETDIR = sys.argv[1] + '/'
compute_time = 0
q = Queue()
p = Process(target=read_fork, args=(filenames, MODE, read_and_norms, TARGETDIR, TEST_PICKLE_DIR, q))
p.start()
do_CNN = True
while do_CNN:
imgs = q.get()
if imgs[0] != 'END':
## CNN
Train = imgs[0]
Label = imgs[1]
filenames_y = imgs[2]
#print(Train)
#print(Train[0])
#print(Train[0].dtype, Train[0].shape)
#print(Label)
Label0 = [[0, 1] for i in Train]
c0 = time.time()
if MODE == 'TRAIN':
for i in range(NUM_STEP):
train_step.run(feed_dict={x: Train, y_: Label, keep_prob: 0.5})
if i % 10 == 9:
train_accuracy = accuracy.eval(feed_dict={
x: Train, y_: Label, keep_prob: 1.0})
print('step %d, training accuracy %g' % (i+1, train_accuracy))
#print(residual2.eval(feed_dict={
# x: Train, y_: Label, keep_prob: 1.0}))
#print(y_conv.eval(feed_dict={
# x: Train, y_: Label, keep_prob: 1.0}))
print(residual2.eval(feed_dict={
x: Train, y_: Label, keep_prob: 1.0}))
answer = correct_prediction.eval(feed_dict={
x: Train, y_: Label0, keep_prob: 1.0})
if MODE == 'TRAIN':
for i in range(len(answer)):
print(filenames_y[i], 0 if answer[i]==False else 1, Label[i][1])
else:
for i in range(len(answer)):
print(filenames_y[i], 0 if answer[i]==False else 1)
compute_time += time.time() - c0
else:
do_CNN = False
read_time = imgs[1]
read_time1 = imgs[2]
read_time2 = imgs[3]
p.join()
if MODE == 'TRAIN':
saver.save(sess, SESSION_FILE)
print(datetime.datetime.now())
print('elapsed time', time.time() - start_time)
print('read', read_time, 'compute', compute_time)
print('read', read_time1, read_time2)