LoginSignup
1
0

More than 3 years have passed since last update.

svmとsparkの実験1

Posted at

svmの実験1の続き

pixel size が等しい 1 channel 画像が3枚ある。

同じ pixel 位置に対して、1画像の画素値をラベル、残りの画像の画素値をまとめて特徴量ベクトルとして scikit-learn の svm でクラス分けしたところ、前処理、学習、判定いずれも処理時間が長かった。
同じことを spark でもやってみる。

4. 読み込みと抽出

4.1 python

本当は map 処理して list で扱うが、説明のため以下ではベタ書きする。

まず画像の読み込み。読み込まれたものは、1 channel 画像なので numpy 2D array になっている
(多チャンネルの場合は、チャンネル分の 画素値 list が 2D に並ぶ)。
即 reshape で1次元化する。
ラベルの場合は、値で 1 か 0 でクラス化した。

from skimage import io
img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])
img32 = img31[0]
label = np.where(img32>10, 1, 0)

同様にもう2枚も読み込み結合する。

img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])

img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])

train = np.vstack((img11, img21)).transpose()

以下の様な形状になっている。

>>> print(label)
[0 0 0 ... 0 0 0]
>>> print(train)
[[1 1]
 [1 0]
 [1 0]
 ...
 [8 2]
 [9 2]
 [8 2]]

label配列をマスクにすることで、以下の様に簡単に抽出できる。

mask = label == 1
train_pos = train[mask]
train_neg = train[np.logical_not(mask)]
sample_num = min(len(train_pos), len(train_neg))
if len(train_pos) < len(train_neg):
    np.random.shuffle(train_neg)
else:
    np.random.shuffle(train_pos)
train_balance = np.concatenate([train_pos[:sample_num], train_neg[:sample_num]])

positive, negative の長い方をrandom shuffleしておいて短い方と同じ長さだけ切り出してきて結合。
このあと、結合したものも random shuffle したいので、 label とセットにするが、このラベルは長さがわかっているので手で作れる。
np.hstack() で積み重ねて結合するが、形状を合わせておかないといけないのでreshape([len(label_new),1])がついている。

label_new = [1 for i in range(sample_num)] + [0 for i in range(sample_num)]
Y = np.hstack([train_balance, np.array(label_new).reshape([len(label_new),1])])
np.random.shuffle(Y)
(train_new, label_new) = (Y[:,0:2], Y[:,2].transpose())

こうなる

>>> print(len(train), len(train_new))
95685968 9623644

同種のことを Spark でやりたい。

4.2 spark

とりあえず、 spark に numpy ndarray を直接 RDD に変換するすべはない。
python list 以外は読み込まない仕様なので、1次元化した numpy array をさらに tolist() で変換する。
その際、いくら numpy 上で1次元でも 1xN 行列のままなので、その [0] を抽出してやる。

なお、あまり大きな配列を RDD にしようとするとメモリ不足で死ぬという本末転倒な罠がある。

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile.
: java.lang.OutOfMemoryError: Java heap space

云々と、 OutOfMemoryError だの Java heap space だの言ってくる場合がこれで、この時はオプションで
--driver-memory 4g などと大きめのメモリサイズを指定する(デフォルト値は 1g)。
よく見かけるオプションは --executor-memory だがそっちでは駄目。

from skimage import io
img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])

img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])

img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])

spimg1 = sc.parallelize(img11[0].tolist())
spimg2 = sc.parallelize(img21[0].tolist())
spimg3 = sc.parallelize(img31[0].tolist())

label = spimg3.map(lambda i: 1 if i>10 else 0)
spimg11 = spimg1.zip(spimg2)
spimg31 = label.zip(spimg11)
train_x_pos = spimg31.filter(lambda i: i[0]==1)
train_x_neg = spimg31.filter(lambda i: i[0]==0)

何が作れているかというと

>>> spimg31.take(10)
2020-02-09 15:16:48,998 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
[(0, (1, 1)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 0)), (0, (1, 1)), (0, (1, 1))]

という入れ子のタプル。

カウントしてみると

>>> train_x_pos.count()
2020-02-09 15:05:07,812 WARN scheduler.TaskSetManager: Stage 0 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
4811822
>>> train_x_neg.count()
2020-02-09 15:05:47,690 WARN scheduler.TaskSetManager: Stage 1 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
90874146

とちゃんと取れている。残りも同様に

len_pos = train_x_pos.count()
len_neg = train_x_neg.count()
sample_num = min(len_pos, len_neg)
if len_pos < len_neg:
    neg = sc.parallelize(train_x_neg.takeSample(False, sample_num))
    train_balance = train_x_pos.union(neg)
else:
    pos = sc.parallelize(tain_x_pos.takeSample(False, sample_num))
    train_balance = pos.union(train_x_neg)

とすれば

>>> train_balance.count()
2020-02-09 15:43:56,483 WARN scheduler.TaskSetManager: Stage 9 contains a task of very large size (47186 KB). The maximum recommended task size is 100 KB.
9623644

と無事変換される。

4.3 消費時間

残念ながら手元の PC (memory 32G, 2.6GHz Core i7) では python コードで2.0秒、93.3秒、9.6秒かかる処理に対して(どの部分かは後述のコードを参照)、
sparkでは55.1秒、170.9秒、51.4秒で3倍程度遅かった。

4.4 使用コード

python

import time
import numpy as np
from skimage import io

T0 = time.time()
img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])
img32 = img31[0]
label = np.where(img32>10, 1, 0)

img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])

img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])

train = np.vstack((img11, img21)).transpose()

print(label)
print(train)

T1 = time.time()
mask = label == 1
train_pos = train[mask]
train_neg = train[np.logical_not(mask)]
sample_num = min(len(train_pos), len(train_neg))

if len(train_pos) < len(train_neg):
    np.random.shuffle(train_neg)
else:
    np.random.shuffle(train_pos)
train_balance = np.concatenate([train_pos[:sample_num], train_neg[:sample_num]])

label_new = [1 for i in range(sample_num)] + [0 for i in range(sample_num)]
Y = np.hstack([train_balance, np.array(label_new).reshape([len(label_new),1])])

T2 = time.time()
np.random.shuffle(Y)

(train_new, label_new) = (Y[:,0:2], Y[:,2].transpose())

print(label_new)
print(train_new)

print(len(train), len(train_new))

T3 = time.time()
print(T1-T0, T2-T1, T3-T2)

Spark

import time
from skimage import io
from pyspark import SparkContext
sc = SparkContext()

T0 = time.time()

img3 = io.imread('train_annotations/train_00.png')
img31 = img3.reshape([1, -1])

img1 = io.imread('train_images/train_hh_00.jpg')
img11 = img1.reshape([1, -1])

img2 = io.imread('train_images/train_hv_00.jpg')
img21 = img2.reshape([1, -1])

spimg1 = sc.parallelize(img11[0].tolist())
spimg2 = sc.parallelize(img21[0].tolist())
spimg3 = sc.parallelize(img31[0].tolist())

T1 = time.time()
label = spimg3.map(lambda i: 1 if i>10 else 0)
spimg11 = spimg1.zip(spimg2)
spimg31 = label.zip(spimg11)
train_x_pos = spimg31.filter(lambda i: i[0]==1)
train_x_neg = spimg31.filter(lambda i: i[0]==0)

len_pos = train_x_pos.count()
len_neg = train_x_neg.count()
sample_num = min(len_pos, len_neg)
if len_pos < len_neg:
    neg = sc.parallelize(train_x_neg.takeSample(False, sample_num))
    train_balance = train_x_pos.union(neg)
else:
    pos = sc.parallelize(train_x_pos.takeSample(False, sample_num))
    train_balance = pos.union(train_x_neg)

T2 =time.time()
print(train_balance.take(10))
print(train_balance.count())

T3 = time.time()

print(T1-T0, T2-T1, T3-T2)

5. Spark SVM

Spark MLlib で SVM で学習させてみる。
使うのは

from pyspark.mllib.classification import SVMWithSGD

だが、その前にフォーマット変換が要る。
RDD で MLlib を使うには LabeledPoint に変換する必要があるらしい。さらに特徴量は dense Vector にする。
ついでにこの時点で 0~255 の画素値を [0, 1) にする、ところだが、値が極端に小さなところに偏っていたので /256 とすべきところを /16 としている。

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
labeled_train = train_balance.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))

この、 LabeledPoint の RDD をインポートした SVMWithSGD にいきなり食わせれば model が得られて、こいつで predict できる。
あとはラベルと付き合わせれば正答率がわかる。

from pyspark.mllib.classification import SVMWithSGD
model = SVMWithSGD.train(labeled_train, iterations=100)
labelsAndPreds = labeled_train.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(labeled_train.count())
print("Training Error = " + str(trainErr))

実際やってみると

Training Error = 0.4984654461449322

で何も考えずに片方に全張りしたのと変わらない結果になった。
もうちょっとというところである。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0