CNNで高圧縮されたJPG画像のノイズを除去する学習モデルを作成したので公開。コードはGitHubにて公開中。
動作環境
- Ubuntu 16.04.3 LTS
- Python 3.5.2
- chainer 3.2
- numpy 1.13.3
- cupy 2.2
- opencv-python 3.4.0.12
結果
概要を説明する前に、結果を以下に示す。[Original]
が未圧縮、[Compression]
が高圧縮、[Restoration]
がDeep Learningで高圧縮を補正した画像。ちょっとぼやけているが、おおむねきれいに復元できている(と思う)。補足すると使用しているフォントは学習に利用していないし、漢字も学習させていない。
以下はネットワーク層の層を減らして、中間層の数も減らした簡易版。細部が少々雑かも?
実行速度
正式版と簡易版の実行時間の違いは以下の通り。
- CPU:core-i7 7700K
- GPU:GTX 1060 6G
項目 | CPU/GPU | 銀河鉄道の夜 | よだかの星 |
---|---|---|---|
正式版 | CPU | 287.75 s | 216.85 s |
GPU | 3.51 s | 2.45 s | |
簡易版 | CPU | 4.92 s | 3.92 s |
GPU | 0.51 s | 0.05 s |
簡易版がCPUでも問題ない速度が出ている。圧縮率がそこまで高くない場合や細部を気にしない場合は簡易版で十分。
概要
データセットの作成
フォント40種Xフォントサイズ3種の120種類のランダムな文字列が記載された画像を準備する。動作確認だけならFontDataフォルダのデータを利用する。冒頭の結果を再現するには全部入りを利用する必要あり。
処理の流れとしては、以下の通り。
- 画像の読み込み
- 画像の圧縮と分割(水増しあり)
- 画像のシャッフル
- 画像の保存
def main(args):
ch = IMG.getCh(args.channel)
imgs = [cv2.imread(name, ch) for name in args.jpeg]
x, _ = IMG.split(
IMG.rotate(IMG.encodeDecode(imgs, ch, args.quality)),
args.img_size,
args.round
)
y, _ = IMG.split(
IMG.rotate(imgs),
args.img_size,
args.round
)
shuffle = np.random.permutation(range(len(x)))
train_size = int(len(x) * args.train_per_all)
train_x = IMG.imgs2arr(x[shuffle[:train_size]])
train_y = IMG.imgs2arr(y[shuffle[:train_size]])
test_x = IMG.imgs2arr(x[shuffle[train_size:]])
test_y = IMG.imgs2arr(y[shuffle[train_size:]])
saveNPZ(train_x, train_y, 'train', args.out_path, args.img_size)
saveNPZ(test_x, test_y, 'test', args.out_path, args.img_size)
高圧縮JPG画像の作成 encodeDecode()
と画像の分割と水増し split()
、rotate()
OpenCVの機能を利用して画像を高圧縮して入力画像リストを生成する(詳細)。出力画像は高圧縮していないものをそのまま使用する。
入力画像リストx
と正解画像リストy
を生成するにあたって事前にrotate()
を利用して左右反転と上下反転画像を追加し、画像を水増する。次に画像の分割(詳細)を行う。img_size
は画像を分割するサイズでround
は丸める端数の数。これらは下記で生成している。
ch = IMG.getCh(args.channel)
imgs = [cv2.imread(name, ch) for name in args.jpeg]
x, _ = IMG.split(
IMG.rotate(IMG.encodeDecode(imgs, ch, args.quality)),
args.img_size,
args.round
)
y, _ = IMG.split(
IMG.rotate(imgs),
args.img_size,
args.round
)
データセットをシャッフルする
学習効率を上げる効果を期待して各画像リストをシャッフルする。また、学習用とテスト用に各画像リストを分割する。シャッフルもx
とy
で対応関係が崩れないように、共通のランダム配列をshuffle
として生成し、利用する。imgs2arr()
では画像をChainer用に変換(詳細)している。
shuffle = np.random.permutation(range(len(x)))
train_size = int(len(x) * args.train_per_all)
train_x = IMG.imgs2arr(x[shuffle[:train_size]])
train_y = IMG.imgs2arr(y[shuffle[:train_size]])
test_x = IMG.imgs2arr(x[shuffle[train_size:]])
test_y = IMG.imgs2arr(y[shuffle[train_size:]])
データセットの保存
NPZ形式でデータセットを保存する。保存先はout_path
で、train_[Size]x[Size]_[Num].npz
とtest_[Size]x[Size]_[Num].npz
がそこに保存される。
saveNPZ(train_x, train_y, 'train', args.out_path, args.img_size)
saveNPZ(test_x, test_y, 'test', args.out_path, args.img_size)
データセットの確認
Tools/npz2jpg.py
を使用することで作成したデータセットを確認できる。上段が圧縮画像(入力画像)で下段が無圧縮画像(正解画像)。フォントの小さい文字などはもはや原型をとどめていない。
学習
学習の実行
デフォルトで用意している簡易版データセットを利用する場合は以下を入力する。自作データセットの場合は./FontData/
をそのデータセットの場所に変更する。オプションはいろいろ設定できるようになっており、例えばGPUを使用する場合は-g 0
などとすると使える。
$ ./train.py -i ./FontData/
MNISTの学習サンプルを改変しているため、以下ではtrain.py
で改変時に修正した箇所を説明する。
データセットの読み込み getImgData()
getImgData()
はNPZファイルを読み込み、学習用とテスト用のデータを準備する。ChainerのTrainerで自分のデータセットを使用するためにtuple_dataset.TupleDataset(入力画像リスト, 正解画像リスト)
にデータを格納する。
また、今回使用するネットワーク層は出力画像が入力画像の2倍の大きさになっているため、正解画像のraw
をsize2x()
で2倍にしている。
np_arr = np.load(os.path.join(folder, l))
x = np.array(np_arr['x'], dtype=np.float32)
y = IMG.arr2x(np.array(np_arr['y'], dtype=np.float32))
train = tuple_dataset.TupleDataset(x, y)
ネットワーク層の設定
中間層のユニット数やネットワーク層の数、活性化関数や損失関数の設定をオプション引数で設定できるようにしておけばパラメータ調整が楽になる。また、今回は分類問題ではないので、compute_accuracyをFalse
にする。
actfun_1 = IMG.getActfun(args.actfun_1)
actfun_2 = IMG.getActfun(args.actfun_2)
model = L.Classifier(
JC(n_unit=args.unit, layer=args.layer_num, rate=args.shuffle_rate,
actfun_1=actfun_1, actfun_2=actfun_2, dropout=args.dropout,
view=args.only_check),
lossfun=IMG.getLossfun(args.lossfun)
)
model.compute_accuracy = False
ネットワーク層はDownSampleBlock()
とUpsampleBlock()
で構成されている(詳細)。以下はネットワーク層初期化部分の抜粋。
class JC(Chain):
def __init__(self, n_unit=128, n_out=1, rate=4,
layer=3, actfun_1=F.relu, actfun_2=F.sigmoid,
dropout=0.0, view=False):
unit1 = n_unit
unit2 = n_unit//2
unit4 = n_unit//4
super(JC, self).__init__()
with self.init_scope():
self.block1a = DownSanpleBlock(unit2, 3, 1, 1, actfun_1)
self.block1b = DownSanpleBlock(unit1, 5, 2, 2, actfun_1, dropout)
self.block1c = UpSampleBlock(unit1, unit4, 5, 1, 2, actfun_2)
:
:
:
self.blockNa = DownSanpleBlock(unit1, 3, 1, 1, actfun_1)
self.blockNb = DownSanpleBlock(unit1, 3, 1, 1, actfun_1, dropout)
self.blockNc = UpSampleBlock(rate**2, 1, 5, 1, 2, actfun_2, rate)
self.layer = layer
self.view = view
json形式でパラメータ情報を保存する
predict.pyでネットワーク層の設定を読み込むのを自動化するために、学習時のネットワーク層の設定を辞書で格納する。保存はtrainer.run()
の直前にjson形式で保存している。
model_param = {i: getattr(args, i) for i in dir(args) if not '_' in i[0]}
model_param['img_ch'] = train[0][0].shape[0]
ハイパーパラメータの試行を自動化
auto_train.sh
を使うことでハイパーパラメータの試行を自動化できる。デフォルトではバッチサイズを変更して順次実行する。オプション引数が正しいかどうかはauto_train.sh -c
で確認できる。
#!/bin/bash
:
:
:
FLAG_CHK=""
:
:
:
COUNT=1
echo -e "\n<< test ["${COUNT}"] >>\n"
./train.py -i FontData/ -o ./result/001/ -b 10 -e 30 $FLAG_CHK
./Tools/plot_diff.py ./result/001 --no
COUNT=$(( COUNT + 1 ))
echo -e "\n<< test ["${COUNT}"] >>\n"
./train.py -i FontData/ -o ./result/001/ -b 30 -e 30 $FLAG_CHK
./Tools/plot_diff.py ./result/001 --no
:
:
:
推論実行
学習が終了すると*.model
が作成されているのでこれを利用すれば推論実行が可能になる。オプション引数-f
でスナップショットの保存をしている場合は*.snapshot
でも実行可能。あと学習実行時に生成された*.json
ファイルと推論実行したい画像を選択する。
推論実行したい画像はpredict.py
内で画像圧縮するので未圧縮画像を選択すること。また、画像は複数枚入力できる。
$ ./predict.py [モデルまたはスナップショットのパス] [学習パラメータのパス] [画像] ...
モデルのパラメータ取得
プログラム実行時に入力したモデルパラメータをここで取得する。活性化関数やネットワーク層の中間層の数などはgetActFun()
で取得しておく。
import json
def getModelParam(path):
try:
with open(path, 'r') as f:
d = json.load(f)
except:
import traceback
traceback.print_exc()
print(F.fileFuncLine())
exit()
af1 = IMG.getActfun(d['actfun_1'])
af2 = IMG.getActfun(d['actfun_2'])
return d['unit'], d['img_ch'], d['layer_num'], d['shuffle_rate'], af1, af2
推論実行メイン部
全体像は以下。後でそれぞれを説明する。
def predict(model, args, img, ch, val):
org_size = img.shape
comp = IMG.encodeDecode([img], IMG.getCh(ch), args.quality)
comp, size = IMG.split(comp, args.img_size)
imgs = []
st = time.time()
for i in range(0, len(comp), args.batch):
x = IMG.imgs2arr(comp[i:i + args.batch], gpu=args.gpu)
y = model.predictor(x)
y = to_cpu(y.array)
y = IMG.arr2imgs(y, 1, args.img_size * 2)
imgs.extend(y)
buf = [np.vstack(imgs[i * size[0]: (i + 1) * size[0]])
for i in range(size[1])]
img = np.hstack(buf)
h = 0.5
half_size = (int(img.shape[1] * h), int(img.shape[0] * h))
flg = cv2.INTER_NEAREST
img = cv2.resize(img, half_size, flg)
img = img[:org_size[0], :org_size[1]]
return img
画像の圧縮と分割 encodeDecode()
、split()
基本的にはデータセット作成時と同じ。分割時に生成されるsize
は画像を縦横何分割したかが書かれてあり、あとで分割画像を元に戻すときに必要。
comp = IMG.encodeDecode([img], IMG.getCh(ch), args.quality)
comp, size = IMG.split(comp, args.img_size)
推論実行
分割した画像をバッチサイズごとに推論実行していく。推論実行前に画像をChainer形式に変換
し、推論実行後はChainer形式をOpenCV形式にしてimgs
に格納する。
imgs = []
for i in range(0, len(comp), args.batch):
x = IMG.imgs2arr(comp[i:i + args.batch], gpu=args.gpu)
y = model.predictor(x)
y = to_cpu(y.array)
y = IMG.arr2imgs(y, 1, args.img_size * 2)
imgs.extend(y)
画像の結合
推論実行された出力画像を分割時に取得したsize
で結合していく。また、出力画像は入力画像の2倍の大きさになっているのでcv2.resize()
で半分の大きさにする。
単に結合しただけでは分割前と画像サイズが異なる(img_size
の倍数で生成される)ため、predict()
の初期で確保しておいたorg_size
で画像サイズを調整して完成。
buf = [np.vstack(imgs[i * size[0]: (i + 1) * size[0]])
for i in range(size[1])]
img = np.hstack(buf)
h = 0.5
half_size = (int(img.shape[1] * h), int(img.shape[0] * h))
flg = cv2.INTER_NEAREST
img = cv2.resize(img, half_size, flg)
img = img[:org_size[0], :org_size[1]]