svmの実験1の続き
pixel size が等しい 1 channel 画像が3枚ある。
同じ pixel 位置に対して、1画像の画素値をラベル、残りの画像の画素値をまとめて特徴量ベクトルとして scikit-learn の svm でクラス分けしたところ、前処理、学習、判定いずれも処理時間が長かった。
同じことを spark でもやってみる。
4. 読み込みと抽出
4.1 python
本当は map 処理して list で扱うが、説明のため以下ではベタ書きする。
まず画像の読み込み。読み込まれたものは、1 channel 画像なので numpy 2D array になっている
(多チャンネルの場合は、チャンネル分の 画素値 list が 2D に並ぶ)。
即 reshape で1次元化する。
ラベルの場合は、値で 1 か 0 でクラス化した。
from skimage import io
img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])
img32 = img31[0]
label = np.where(img32>10, 1, 0)
同様にもう2枚も読み込み結合する。
img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])
img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])
train = np.vstack((img11, img21)).transpose()
以下の様な形状になっている。
>>> print(label)
[0 0 0 ... 0 0 0]
>>> print(train)
[[1 1]
[1 0]
[1 0]
...
[8 2]
[9 2]
[8 2]]
label配列をマスクにすることで、以下の様に簡単に抽出できる。
mask = label == 1
train_pos = train[mask]
train_neg = train[np.logical_not(mask)]
sample_num = min(len(train_pos), len(train_neg))
if len(train_pos) < len(train_neg):
np.random.shuffle(train_neg)
else:
np.random.shuffle(train_pos)
train_balance = np.concatenate([train_pos[:sample_num], train_neg[:sample_num]])
positive, negative の長い方をrandom shuffleしておいて短い方と同じ長さだけ切り出してきて結合。
このあと、結合したものも random shuffle したいので、 label とセットにするが、このラベルは長さがわかっているので手で作れる。
np.hstack() で積み重ねて結合するが、形状を合わせておかないといけないのでreshape([len(label_new),1])
がついている。
label_new = [1 for i in range(sample_num)] + [0 for i in range(sample_num)]
Y = np.hstack([train_balance, np.array(label_new).reshape([len(label_new),1])])
np.random.shuffle(Y)
(train_new, label_new) = (Y[:,0:2], Y[:,2].transpose())
こうなる
>>> print(len(train), len(train_new))
95685968 9623644
同種のことを Spark でやりたい。
4.2 spark
とりあえず、 spark に numpy ndarray を直接 RDD に変換するすべはない。
python list 以外は読み込まない仕様なので、1次元化した numpy array をさらに tolist()
で変換する。
その際、いくら numpy 上で1次元でも 1xN 行列のままなので、その [0]
を抽出してやる。
なお、あまり大きな配列を RDD にしようとするとメモリ不足で死ぬという本末転倒な罠がある。
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space
云々と、 OutOfMemoryError
だの Java heap space
だの言ってくる場合がこれで、この時はオプションで
--driver-memory 4g
などと大きめのメモリサイズを指定する(デフォルト値は 1g)。
よく見かけるオプションは --executor-memory
だがそっちでは駄目。
from skimage import io
img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])
img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])
img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])
spimg1 = sc.parallelize(img11[0].tolist())
spimg2 = sc.parallelize(img21[0].tolist())
spimg3 = sc.parallelize(img31[0].tolist())
label = spimg3.map(lambda i: 1 if i>10 else 0)
spimg11 = spimg1.zip(spimg2)
spimg31 = label.zip(spimg11)
train_x_pos = spimg31.filter(lambda i: i[0]==1)
train_x_neg = spimg31.filter(lambda i: i[0]==0)
何が作れているかというと
>>> spimg31.take(10)
2020-02-09 15:16:48,998 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
[(0, (1, 1)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 1)), (0, (1, 1))]
という入れ子のタプル。
カウントしてみると
>>> train_x_pos.count()
2020-02-09 15:05:07,812 WARN scheduler.TaskSetManager: Stage 0 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
4811822
>>> train_x_neg.count()
2020-02-09 15:05:47,690 WARN scheduler.TaskSetManager: Stage 1 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
90874146
とちゃんと取れている。残りも同様に
len_pos = train_x_pos.count()
len_neg = train_x_neg.count()
sample_num = min(len_pos, len_neg)
if len_pos < len_neg:
neg = sc.parallelize(train_x_neg.takeSample(False, sample_num))
train_balance = train_x_pos.union(neg)
else:
pos = sc.parallelize(tain_x_pos.takeSample(False, sample_num))
train_balance = pos.union(train_x_neg)
とすれば
>>> train_balance.count()
2020-02-09 15:43:56,483 WARN scheduler.TaskSetManager: Stage 9 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
9623644
と無事変換される。
4.3 消費時間
残念ながら手元の PC (memory 32G, 2.6GHz Core i7) では python コードで2.0秒、93.3秒、9.6秒かかる処理に対して(どの部分かは後述のコードを参照)、
sparkでは55.1秒、170.9秒、51.4秒で3倍程度遅かった。
4.4 使用コード
python
import time
import numpy as np
from skimage import io
T0 = time.time()
img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])
img32 = img31[0]
label = np.where(img32>10, 1, 0)
img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])
img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])
train = np.vstack((img11, img21)).transpose()
print(label)
print(train)
T1 = time.time()
mask = label == 1
train_pos = train[mask]
train_neg = train[np.logical_not(mask)]
sample_num = min(len(train_pos), len(train_neg))
if len(train_pos) < len(train_neg):
np.random.shuffle(train_neg)
else:
np.random.shuffle(train_pos)
train_balance = np.concatenate([train_pos[:sample_num], train_neg[:sample_num]])
label_new = [1 for i in range(sample_num)] + [0 for i in range(sample_num)]
Y = np.hstack([train_balance, np.array(label_new).reshape([len(label_new),1])])
T2 = time.time()
np.random.shuffle(Y)
(train_new, label_new) = (Y[:,0:2], Y[:,2].transpose())
print(label_new)
print(train_new)
print(len(train), len(train_new))
T3 = time.time()
print(T1-T0, T2-T1, T3-T2)
Spark
import time
from skimage import io
from pyspark import SparkContext
sc = SparkContext()
T0 = time.time()
img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])
img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])
img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])
spimg1 = sc.parallelize(img11[0].tolist())
spimg2 = sc.parallelize(img21[0].tolist())
spimg3 = sc.parallelize(img31[0].tolist())
T1 = time.time()
label = spimg3.map(lambda i: 1 if i>10 else 0)
spimg11 = spimg1.zip(spimg2)
spimg31 = label.zip(spimg11)
train_x_pos = spimg31.filter(lambda i: i[0]==1)
train_x_neg = spimg31.filter(lambda i: i[0]==0)
len_pos = train_x_pos.count()
len_neg = train_x_neg.count()
sample_num = min(len_pos, len_neg)
if len_pos < len_neg:
neg = sc.parallelize(train_x_neg.takeSample(False, sample_num))
train_balance = train_x_pos.union(neg)
else:
pos = sc.parallelize(train_x_pos.takeSample(False, sample_num))
train_balance = pos.union(train_x_neg)
T2 =time.time()
print(train_balance.take(10))
print(train_balance.count())
T3 = time.time()
print(T1-T0, T2-T1, T3-T2)
5. Spark SVM
Spark MLlib で SVM で学習させてみる。
使うのは
from pyspark.mllib.classification import SVMWithSGD
だが、その前にフォーマット変換が要る。
RDD で MLlib を使うには LabeledPoint に変換する必要があるらしい。さらに特徴量は dense Vector にする。
ついでにこの時点で 0~255 の画素値を [0, 1) にする、ところだが、値が極端に小さなところに偏っていたので /256 とすべきところを /16 としている。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
labeled_train = train_balance.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))
この、 LabeledPoint の RDD をインポートした SVMWithSGD にいきなり食わせれば model が得られて、こいつで predict できる。
あとはラベルと付き合わせれば正答率がわかる。
from pyspark.mllib.classification import SVMWithSGD
model = SVMWithSGD.train(labeled_train, iterations=100)
labelsAndPreds = labeled_train.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(labeled_train.count())
print("Training Error = " + str(trainErr))
実際やってみると
Training Error = 0.4984654461449322
で何も考えずに片方に全張りしたのと変わらない結果になった。
もうちょっとというところである。