LoginSignup
0
0

More than 5 years have passed since last update.

GCP で tensorflow 遊び5

Posted at

GCP で tensorflow 遊び4の続き

そろそろ飽きてきたので、楽しい楽しい高速化遊びで遊ぶことにする。

実は gcp はディスクが遅い。
hdd なんか使うと 0.75 IOPS でどうだかいう話で( https://cloud.google.com/compute/docs/disks/performance?hl=ja ) read に 3400 秒くらいかかっていたのが ssd にして 310 秒くらいに丸められたので良しとしたのは 第1回 で書いた。
それでも 1RUN あたり 300秒とか消費されているので、これがどうにかならないかと弄ってみる。

pickle でキャッシュ

trainとtestのうち、trainはバッチを固定したくないのでshuffleして読んでいるのでキャッシュできない。
しかしtestはshuffleいらないよなーということでディレクトリを頭から読んでいるのでこれをキャッシュしてみる。

もともと read まわりはこんなコードだった。

def read_and_norm(line):
    l2 = line.split()
    t1 = time.time()
    img = io.imread(TARGETDIR+l2[0])
    t2 = time.time()
    a, b = img.min(), img.max()
    if (b-a > 0):
        img = (img - a) / (b - a)
        t3 = time.time()
        #print(l2[0],a,b)
        if MODE == 'TRAIN' and l2[1] == '0':
            L = [1, 0]
        else:
            L = [0, 1]
        return [[img, L], [t2-t1, t3-t2]]
    else:
        t3 = time.time()
        return [[], [t2-t1, t3-t2]]

lineはtrain_masterから読んできた行で、ファイル名とラベルが

train_255369.tif        0

のように入っている。元のtrain_masterから100行くらい切り出して、1行毎にこの関数でfile readしてなにか normalize して one hot 形式でラベル貼っつけて、ついでに経過時間のリストとセットで返している。

一旦これを、こんな形式に書き換える。

def read_and_norms(filenames):
    def fun1(line):
        l2 = line.split()
        t1 = time.time()
        img = io.imread(TARGETDIR+l2[0])
        t2 = time.time()
        if MODE == 'TRAIN' and l2[1] == '0':
            L = [1, 0]
        else:
            L = [0, 1]
        return [[img, L], [t2-t1]]
    imgs = map(fun1, filenames)
    def fun2(img0):
        img = img0[0][0]
        t2 = time.time()
        a, b = img.min(), img.max()
        if (b-a > 0):
            img = (img - a) / (b - a)
            t3 = time.time()
            return [[img, img0[0][1]], [img0[1][0], t3-t2]]
        else:
            t3 = time.time()
            return [[], [img0[1][0], t3-t2]]
    return map(fun2, imgs)

一瞬でわかりにくくなったが、今度は1バッチ分のtrain_masterの行配列が渡ってきて、これをmap関数で1行単位に分割して行の中を分割してreadしてラベル貼っつけて、mapなのでlistでreturnして、それを後続のmap関数でnormalize処理している。

retrunされるのは、画像とラベルのリストと経過時間二つのリストが束ねられたリストがファイル数分固められたリストである。

この、前段のmap関数の結果をpickleで保存しておいて次回以降は再利用する。

def read_and_norms(filenames, pknum):
    def fun1(line):
        l2 = line.split()
        t1 = time.time()
        img = io.imread(TARGETDIR+l2[0])
        t2 = time.time()
        if MODE == 'TRAIN' and l2[1] == '0':
            L = [1, 0]
        else:
            L = [0, 1]
        return [[img, L], [t2-t1]]
    imgs = []
    pkfile = TEST_PICKLE_DIR + 'pkl' + str(pknum) if pknum != False else ''
    if ((pknum!=False) and (os.path.exists(pkfile))):
        with open(pkfile, 'rb') as f:
            imgs = pickle.load(f)
    else:
        imgX = map(fun1, filenames)
        imgs = list(imgX)
        if ((pknum!=False) and (not os.path.exists(pkfile))):
            with open(pkfile, 'wb') as f:
                pickle.dump(imgs, f)
    def fun2(img0):
        img = img0[0][0]
        t2 = time.time()
        a, b = img.min(), img.max()
        if (b-a > 0):
            img = (img - a) / (b - a)
            t3 = time.time()
            return [[img, img0[0][1]], [img0[1][0], t3-t2]]
        else:
            t3 = time.time()
            return [[], [img0[1][0], t3-t2]]
    return map(fun2, imgs)

引数が増えているがこれは

        pknum = False if MODE == 'TRAIN' else batchnum
        imgs = read_and_norms(filenames_x, pknum)

としてTESTモードの時だけバッチ番号を渡している。
このバッチ番号から

    pkfile = TEST_PICKLE_DIR + 'pkl' + str(pknum) if pknum != False else ''

でファイル名を作っているのは特に説明することはなくて、そのあとのif文も

  1. pickleするつもりでいてファイルが存在していたらそこから読む
  2. さもなくば真面目に imread して、
    1. pickleするつもりでいてファイルが存在していなかったらpickleで保存する

というだけである。

こいつの面倒な点は、python3のmapはリストではなくiterableを後ろに回すので途中でpickle保存のために全部巻き取ると後ろのmap関数にiterableが届かなくなる点にある。
よって、pickle保存しようがしまいがそこから読もうが一旦リストimgsを経由して二つのmap関数を繋ぐ必要がある。

結果がどうなったか

file read/sec total elapsed time/sec
最初の形式 280.75 299.44
map関数2個に置き換え 281.47 289.51
pickleあり1RUN目 289.19 297.08
pickleあり2RUN目 100.18 107.26

map関数置き換えで特に悪さが出るでもなく。意図通りpickleありの2度目以降はcacheから読むので file open の回数が 1/100 になって 300秒から100秒への短縮に成功している。

multiprocess で read と compute の並行処理

pythonじゃなければ、 multi thread で read と compute を重ねるとか、ファイル100個の read を並行処理でぶん投げて promise で待つとかいろいろやりようがあるのだが、なにせ python は GIL で 1 thread しか同時に回らないのでどうしようも無い。

こういうのを process 並列にするとプロセス間通信が必要で、メモリコピーが発生して嫌だけど(multi threadだったら勝手に覗き込めばいい)他にないので multiprocessing module で並行処理してみる。
ただこの場合、 Queue オブジェクトを使ってコード上では何も考えずにデータの受け渡しができるのは大変えらい。

こんな関数を別プロセス上で動かす。

def read_fork(filenames, MODE, _fun1, TARGETDIR, TEST_PICKLE_DIR, q):
    _start_time = time.time()
    read_time = 0
    read_time1 = 0
    read_time2 = 0
    batchnum = 0
    while len(filenames)>0:
        filenames_x = filenames[:min(BATCHSIZE, len(filenames))]
        print("batch:", len(filenames_x), '/', len(filenames), "elapsed:", time.time() - _start_time, "sec")
        del filenames[:min(BATCHSIZE, len(filenames))]
        batchnum += 1
        if MODE == 'TRAIN':
            filenames_y = [l.split()[0] for l in filenames_x]
        else:
            filenames_y = filenames_x

        Train = []
        Label = []
        r0 = time.time()
        ### read images, labels
        pknum = False if MODE == 'TRAIN' else batchnum
        imgs = _fun1(filenames_x, pknum, MODE, TARGETDIR, TEST_PICKLE_DIR)
        for l in imgs:
            a, times = l
            if len(a)>0:
                Train.append(a[0])
                Label.append(a[1])
            read_time1 += times[0]
            read_time2 += times[1]
        read_time += time.time() - r0
        q.put([Train, Label, filenames_y])
    q.put(['END', read_time, read_time1, read_time2])

train_masterの全ファイル名をこのread_forkに渡して別プロセスで動かさせると、こいつは1バッチ100ファイルづつ読んでリストにしてQueueに突っ込んでメインプロセスに送り返してくる。
全ファイル読み終わると終了サインを含むリストを送ってくるので、メインプロセス側では読み込んだ画像データが来続ける限りCNN処理を繰り返す。

呼び出し側はこんな感じである。

    q = Queue()
    p = Process(target=read_fork, args=(filenames, MODE, read_and_norms, TARGETDIR, TEST_PICKLE_DIR, q))
    p.start()
    do_CNN = True
    while do_CNN:
        imgs = q.get()
        if imgs[0] != 'END':
            ## CNN
            Train = imgs[0]
            Label = imgs[1]
            filenames_y = imgs[2]

            ### ここに CNN 処理が入る

        else:
            do_CNN = False

read_and_norms は前述のpickleありmap2段の file read 関数だがこれも引数として渡して何事もなく別プロセスに渡る。

結果どうなったか

read/sec compute/sec elapsed time/sec
single process 331.91 459.56 791.98
multi process 390.22 534.85 548.88

(computeが若干変わっているのは無視して)read timeがどちらも300秒以上あるにも関わらず total elapsed timeは790秒が550秒に圧縮されている。
この分がreadとcomputeが実時間で重なっている分である。

参考までに、全script

TARGETDIR = 'train/'
BATCHSIZE = 100
MODE2 = 'CONT'
#MODE2 = 'RESET'
NUM_STEP = 50
SHUFFLE = True
SESSION_FILE = "./model.ckpt"
TEST_PICKLE_DIR = "../pkl/"

##############################################################
##
## tensorflow CNN structures
##

## CNN
## [32,32,7]pixel
## -> [3,3,7,16] kernel -> [32,32,16]pixel -> [16,16,16]pixel
## -> [3,3,16,32] kernel -> [16,16,32]pixel -> [8,8,32]pixel
## -> [3,3,32,32] kernel -> [8,8,32]pixel -> [4,4,32]pixel
## -> [128]FC
## -> 2

import tensorflow as tf

def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)

def conv2d(x, W):
    return tf.nn.conv2d(x, W, strides=[1,1,1,1], padding='SAME')

def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1,2,2,1], strides=[1,2,2,1], padding='SAME')

x = tf.placeholder(tf.float32, shape=[None, 32,32,7])
y_ = tf.placeholder(tf.float32, shape=[None,2])

#x_image = x / 256.0
x_image = x

## 1st layer
W_conv1 = weight_variable([3,3,7,16])
b_conv1 = bias_variable([16])

h_conv1 = tf.nn.relu(conv2d(x_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)

## 2nd layer
W_conv2 = weight_variable([3,3,16,32])
b_conv2 = bias_variable([32])

h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)

## 3rd layer
W_conv3 = weight_variable([3,3,32,32])
b_conv3 = bias_variable([32])

h_conv3 = tf.nn.relu(conv2d(h_pool2, W_conv3) + b_conv3)
h_pool3 = max_pool_2x2(h_conv3)

## 4th, full-connection
W_fc1 = weight_variable([4 * 4 * 32, 128])
b_fc1 = bias_variable([128])

h_pool2_flat = tf.reshape(h_pool3, [-1, 4*4*32])
h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)

keep_prob = tf.placeholder(tf.float32)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

## final,
W_fc2 = weight_variable([128,2])
b_fc2 = bias_variable([2])

y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

#residual = y_ - y_conv
residual2 = tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=y_conv)

cross_entropy = tf.reduce_mean(residual2)
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
correct_prediction = tf.equal(tf.argmax(y_conv, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

##############################################################
##
## start script main
##

import random
import sys, os
import time
import datetime
from skimage import io
import pickle
from multiprocessing import Process, Queue

filenames = []

if os.path.isfile(sys.argv[1]):
    MODE = 'TRAIN'
    f = open(sys.argv[1])
    filenames = f.readlines()
    f.close()
    filenames.pop(0)
    if SHUFFLE:
        random.shuffle(filenames)
elif os.path.isdir(sys.argv[1]):
    MODE = 'TEST'
    filenames = os.listdir(sys.argv[1])

def read_and_norms(filenames, pknum, MODE, TARGETDIR, TEST_PICKLE_DIR):
    def fun1(line):
        l2 = line.split()
        t1 = time.time()
        img = io.imread(TARGETDIR+l2[0])
        t2 = time.time()
        if MODE == 'TRAIN' and l2[1] == '0':
            L = [1, 0]
        else:
            L = [0, 1]
        return [[img, L], [t2-t1]]
    imgs = []
    pkfile = TEST_PICKLE_DIR + 'pkl' + str(pknum) if pknum != False else ''
    if ((pknum!=False) and (os.path.exists(pkfile))):
        with open(pkfile, 'rb') as f:
            imgs = pickle.load(f)
    else:
        imgX = map(fun1, filenames)
        imgs = list(imgX)
        if ((pknum!=False) and (not os.path.exists(pkfile))):
            with open(pkfile, 'wb') as f:
                pickle.dump(imgs, f)
    def fun2(img0):
        img = img0[0][0]
        t2 = time.time()
        a, b = img.min(), img.max()
        if (b-a > 0):
            img = (img - a) / (b - a)
            t3 = time.time()
            return [[img, img0[0][1]], [img0[1][0], t3-t2]]
        else:
            t3 = time.time()
            return [[], [img0[1][0], t3-t2]]
    return map(fun2, imgs)

def read_fork(filenames, MODE, _fun1, TARGETDIR, TEST_PICKLE_DIR, q):
    _start_time = time.time()
    read_time = 0
    read_time1 = 0
    read_time2 = 0
    batchnum = 0
    while len(filenames)>0:
        filenames_x = filenames[:min(BATCHSIZE, len(filenames))]
        print("batch:", len(filenames_x), '/', len(filenames), "elapsed:", time.time() - _start_time, "sec")
        del filenames[:min(BATCHSIZE, len(filenames))]
        batchnum += 1
        if MODE == 'TRAIN':
            filenames_y = [l.split()[0] for l in filenames_x]
        else:
            filenames_y = filenames_x

        Train = []
        Label = []
        r0 = time.time()
        ### read images, labels
        pknum = False if MODE == 'TRAIN' else batchnum
        imgs = _fun1(filenames_x, pknum, MODE, TARGETDIR, TEST_PICKLE_DIR)
        for l in imgs:
            a, times = l
            if len(a)>0:
                Train.append(a[0])
                Label.append(a[1])
            read_time1 += times[0]
            read_time2 += times[1]
        read_time += time.time() - r0
        q.put([Train, Label, filenames_y])
    q.put(['END', read_time, read_time1, read_time2])



with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    saver = tf.train.Saver()
    start_time = time.time()
    if MODE2 != 'RESET':
        if os.path.exists(SESSION_FILE + ".meta"):
            saver.restore(sess, SESSION_FILE)

    if MODE != 'TRAIN':
        TARGETDIR = sys.argv[1] + '/'

    compute_time = 0
    q = Queue()
    p = Process(target=read_fork, args=(filenames, MODE, read_and_norms, TARGETDIR, TEST_PICKLE_DIR, q))
    p.start()
    do_CNN = True
    while do_CNN:
        imgs = q.get()
        if imgs[0] != 'END':
            ## CNN
            Train = imgs[0]
            Label = imgs[1]
            filenames_y = imgs[2]

            #print(Train)
            #print(Train[0])
            #print(Train[0].dtype, Train[0].shape)
            #print(Label)

            Label0 = [[0, 1] for i in Train]
            c0 = time.time()

            if MODE == 'TRAIN':
                for i in range(NUM_STEP):
                    train_step.run(feed_dict={x: Train, y_: Label, keep_prob: 0.5})
                    if i % 10 == 9:
                        train_accuracy = accuracy.eval(feed_dict={
                            x: Train, y_: Label, keep_prob: 1.0})
                        print('step %d, training accuracy %g' % (i+1, train_accuracy))
                        #print(residual2.eval(feed_dict={
                        #    x: Train, y_: Label, keep_prob: 1.0}))

                #print(y_conv.eval(feed_dict={
                #    x: Train, y_: Label, keep_prob: 1.0}))
                print(residual2.eval(feed_dict={
                    x: Train, y_: Label, keep_prob: 1.0}))

            answer = correct_prediction.eval(feed_dict={
                x: Train, y_: Label0, keep_prob: 1.0})
            if MODE == 'TRAIN':
                for i in range(len(answer)):
                    print(filenames_y[i], 0 if answer[i]==False else 1, Label[i][1])
            else:
                for i in range(len(answer)):
                    print(filenames_y[i], 0 if answer[i]==False else 1)
            compute_time += time.time() - c0
        else:
            do_CNN = False
            read_time = imgs[1]
            read_time1 = imgs[2]
            read_time2 = imgs[3]
    p.join()


    if MODE == 'TRAIN':
        saver.save(sess, SESSION_FILE)

    print(datetime.datetime.now())
    print('elapsed time', time.time() - start_time)
    print('read', read_time, 'compute', compute_time)
    print('read', read_time1, read_time2)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0