SVM and Spark experiment 2

A continuation of the earlier article "svmの実験2" (SVM experiment 2).

6. Preprocessing benchmark

I wanted to compare the time consumed by the preprocessing in particular between the scikit-learn version and the Spark version, so I wrote the two scripts to do essentially the same work.

6.1 Scripts used

Spark version

from pyspark import SparkContext
sc = SparkContext()
import time
from skimage import io

T0 = time.time()

img1 = io.imread('train_images/train_hh_00.jpg')
img2 = io.imread('train_images/train_hv_00.jpg')
img3 = io.imread('train_annotations/train_00.png')

T1 = time.time()
print('1: read 3image files', T1-T0)

img11 = img1.reshape([1, -1])
img21 = img2.reshape([1, -1])
img31 = img3.reshape([1, -1])

# load each flattened image into an RDD, one element per pixel
spimg1 = sc.parallelize(img11[0].tolist())
spimg2 = sc.parallelize(img21[0].tolist())
spimg3 = sc.parallelize(img31[0].tolist())

T2 = time.time()
print('2: import to spark RDD', T2-T1)

# threshold the annotation image: pixel value > 10 -> positive class (1)
label = spimg3.map(lambda i: 1 if i > 10 else 0)
# pair the two image channels (hh, hv), then attach the label: (label, (hh, hv))
spimg11 = spimg1.zip(spimg2)
spimg31 = label.zip(spimg11)
train_x_pos = spimg31.filter(lambda i: i[0] == 1)
train_x_neg = spimg31.filter(lambda i: i[0] == 0)

len_pos = train_x_pos.count()
len_neg = train_x_neg.count()
# balance the classes by downsampling the larger one to the size of the smaller
sample_num = min(len_pos, len_neg)
if len_pos < len_neg:
    neg = sc.parallelize(train_x_neg.takeSample(False, sample_num))
    train_balance = train_x_pos.union(neg)
else:
    pos = sc.parallelize(train_x_pos.takeSample(False, sample_num))
    train_balance = pos.union(train_x_neg)

T3 = time.time()
print('3: extract in spark RDD', T3-T2)


from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
labeled_train = train_balance.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))

# print(labeled_train.take(10))
# print(labeled_train.count())
T4 = time.time()
print('4: convert to LabeledPoint', T4-T3)

from pyspark.mllib.classification import SVMWithSGD, SVMModel

# print('sample size', sample_num*2)
labeled_train.persist()
model = SVMWithSGD.train(labeled_train, iterations=100)

# https://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machines-svms
labelsAndPreds = labeled_train.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(labeled_train.count())
print("Training Error = " + str(trainErr))

T5 = time.time()
print('5: Train 100 steps', T5-T4)

# evaluate against the full image (all pixels)
origimg = spimg31.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))
labelsAndPreds = origimg.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(origimg.count())
print("Training Error = " + str(trainErr))

T6 = time.time()
print('6: judge', T6-T5)

print('Total Time', T6-T0)

scikit-learn version

import numpy as np
import time
from skimage import io
import matplotlib.pyplot as plt  # needed only for the optional plotting branch (withView) below

T0 = time.time()

img1 = io.imread('train_images/train_hh_00.jpg')
img2 = io.imread('train_images/train_hv_00.jpg')
img3 = io.imread('train_annotations/train_00.png')

T1 = time.time()
print('1: read 3image files', T1-T0)

img11 = img1.reshape([1, -1])
img21 = img2.reshape([1, -1])
img31 = img3.reshape([1, -1])

# threshold the annotation: pixel value > 10 -> positive class (1)
label = np.where(img31[0] > 10, 1, 0)
# stack the two image channels into an (N, 2) feature array
train = np.vstack((img11, img21)).transpose()

mask = label == 1
train_x_pos = train[mask]
train_x_neg = train[np.logical_not(mask)]
# balance the classes: use the size of the smaller class for both
sample = min(len(train_x_pos), len(train_x_neg))
label_balance = [1 for i in range(sample)] + [0 for i in range(sample)]
print('shrink length to', len(label_balance))
if len(train_x_pos) < len(train_x_neg):
    np.random.shuffle(train_x_neg)
else:
    np.random.shuffle(train_x_pos)

train_balance = np.concatenate([train_x_pos[:sample], train_x_neg[:sample]])

labeled_train = np.hstack([train_balance, np.array(label_balance).reshape([len(label_balance),1])])
np.random.shuffle(labeled_train)

train_X = labeled_train[:,0:2]
train_Y = labeled_train[:,2].transpose()
train_X2 = train_X / 16.0

T3 = time.time()
print('3: extract by numpy', T3-T1)

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import sklearn.svm

svm = sklearn.svm.SVC(kernel='rbf', random_state=0, gamma=0.10, C=10.0)

M = len(train_X2)
L = 65536
offset = 0
y_pred = []
start_time = time.time()
while offset < M:
    #print('fitting', offset, '/', offset / M, time.time() - start_time, "sec")
    L2 = min(offset+L, M)
    #print(train_X[offset:L2], train_Y[offset:L2])
    svm.fit(train_X2[offset:L2], train_Y[offset:L2])
    _Y = svm.predict(train_X2[offset:L2])
    y_pred.append(_Y)
    offset += L

predicted = np.concatenate(y_pred).reshape(train_Y.shape)
n = np.count_nonzero(train_Y!=predicted)
print("Training Error = " + str(n / len(train_Y)))
T5 = time.time()
print('5: Train ', T5-T3)


# evaluate against the full image (all pixels)
test_x = train / 16.0
M = len(test_x)
L = 65536
offset = 0
start_time = time.time()
y_pred = []
withView = False
while offset < M:
    #print('prediciting', offset, '/', offset / M, time.time() - start_time, "sec")
    L2 = min(offset+L, M)
    _Y = svm.predict(test_x[offset:L2])
    y_pred.append(_Y)
    if withView:
        # plot the positive/negative predictions for the current chunk
        mask = _Y == 1
        test_x_pos = test_x[offset:L2][mask]
        test_x_neg = test_x[offset:L2][np.logical_not(mask)]
        print('now plotting')
        plt.scatter(test_x_pos.transpose()[0], test_x_pos.transpose()[1])
        plt.scatter(test_x_neg.transpose()[0], test_x_neg.transpose()[1])
        plt.show()

    #print('Misclassified samples: %d' % (Y[offset:L2] != _Y).sum())
    #print('Accuracy: %.2f' % accuracy_score(Y[offset:L2], _Y))
    offset += L

predicted = np.concatenate(y_pred).reshape(label.shape)
n = np.count_nonzero(label!=predicted)
print("Training Error = " + str(n / len(label)))

T6 = time.time()
print('6: judge', T6-T5)

print('Total Time', T6-T0)

6.2 Results

Compute environment: Mac (32 GB RAM, 2.6 GHz Core i7, 6 cores)

stage                       spark (sec)           scikit/python (sec)
1 read 3image files         1.563331127166748     1.551009178161621
2 import to spark RDD       52.45361089706421     -
3 extract in spark RDD      161.4244680404663     99.94895792007446
4 convert to LabeledPoint   0.17891883850097656   -
5 Train 100 steps           187.9373481273651     1317.0479888916016
6 judge                     365.22955799102783    3557.4639291763306
Total Time                  768.7872350215912     4976.011885166168

The training error rate was 0.4984871634902538 for the Spark version and 0.021082346770100804 for the scikit-learn version.
When each trained model was then used to predict over the entire image and checked against the annotation, the error rate was 0.9468502424514323 for the Spark version and 0.016081751924169278 for the scikit-learn version.
The Spark version's error rate is so high that it has most likely not learned anything meaningful.

For the SVM training stage, the number of computations is not matched between the two versions, so comparing wall-clock time there is meaningless.
For extracting the training pixels from the original images, however, the Spark version took 52.5 + 161.4 = 213.9 sec, while the scikit-learn version (where this extraction is pure numpy) took 99.9 sec, so porting this step to Spark for parallel processing did not reduce the processing time.

Compute environment: AWS EC2 (c5.9xlarge; 72 GB RAM, 36 cores)

The same experiment was run on AWS EC2; the instance used was a c5.9xlarge with 72 GB RAM and 36 CPU cores.

stage                       spark (sec)           scikit/python (sec)
1 read 3image files         1.5626544952392578    1.5453143119812012
2 import to spark RDD       36.799010276794434    -
3 extract in spark RDD      42.08454918861389     91.7525577545166
4 convert to LabeledPoint   0.1362450122833252    -
5 Train 100 steps           118.87054777145386    1935.4138050079346
6 judge                     68.1303505897522      5847.8766713142395
Total Time                  267.58335733413696    7876.588348388672

In this case, extracting from all the image pixels on the RDD took 36.8 + 42.1 = 78.9 sec, which is shorter than the 91.8 sec taken by numpy alone.

Also, normalizing the scikit-learn version's SVM training and prediction times from the Mac run, 1317 sec and 3557 sec, by the respective data sizes of 9623644 and 95685968 points:

> 1317/9623644
0.00013685044874893544
> 3557/95685968
0.00003717368465144231

so, per data point, training took roughly 3.68 times as long as prediction (1.3685e-4 / 3.7174e-5 ≈ 3.68).

6.3 Differences in SVM parameters between the Spark and scikit-learn versions

Default parameters of the Spark SVM:

iterations – The number of iterations. (default: 100)
convergenceTol – A condition which decides iteration termination. (default: 0.001)

Default parameters of the scikit-learn SVM:

max_iter : int, optional (default=-1)
    Hard limit on iterations within solver, or -1 for no limit.
tol : float, optional (default=1e-3)
    Tolerance for stopping criterion.

The tolerance is 0.001 (1e-3) in both, but the maximum number of iterations defaults to 100 in the Spark SVM, while in scikit-learn it is -1, i.e. unlimited.
This raises the suspicion that Spark is terminating training too early.

Unfortunately, however, the Spark SVM does not appear to offer an option to make the iteration limit unlimited.
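
For reference, here is a minimal sketch (not part of the original scripts) of where these parameters are passed in each library; the only knob available on the Spark side is to raise iterations explicitly, since there is no "unlimited" setting:

from pyspark.mllib.classification import SVMWithSGD
import sklearn.svm

# Spark: iterations and convergenceTol are keyword arguments of train()
# (labeled_train is the RDD of LabeledPoint built in section 6.1).
model = SVMWithSGD.train(labeled_train, iterations=1000, convergenceTol=0.001)

# scikit-learn: max_iter and tol are constructor arguments of SVC
# (the other parameters match the script in section 6.1).
svm = sklearn.svm.SVC(kernel='rbf', random_state=0, gamma=0.10, C=10.0,
                      max_iter=-1, tol=1e-3)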

6.4 Results of accumulating iterations in the Spark version

Iterations were accumulated as follows to track how the weight parameters and the error evolve.

sumi = 0   # total number of iterations accumulated so far
i = 10     # iterations to run in the next pass (totals become 10, 20, 40, ..., 1280)
for _i in range(8):
    T51 = time.time()
    if i == 10:
        # first pass: train from scratch
        model = SVMWithSGD.train(labeled_train, iterations=i)
    else:
        # later passes: warm-start from the previous model's weights
        model = SVMWithSGD.train(labeled_train, iterations=i, initialWeights=model.weights)

    T52 = time.time()
    labelsAndPreds = labeled_train.map(lambda p: (p.label, model.predict(p.features)) )
    trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(labeled_train.count())
    t2 = labelsAndPreds.filter(lambda lp: 0 == lp[1]).count()  # number of points predicted as class 0
    T53 = time.time()
    sumi += i
    print(str(sumi)+' iters', T52 - T51, T53-T52, "Training Error = " + str(trainErr), t2, model)
    i = sumi

T5 = time.time()
print('5: Train '+str(sumi)+'steps', T5-T4)


# evaluate against the full image (all pixels)
origimg = spimg31.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))
labelsAndPreds = origimg.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(origimg.count())
print("Training Error = " + str(trainErr))

The results are shown below.
The training error, 0.49849028081254876, does not change, but the parameters printed as weights= do drift slightly.
The number of points predicted as class 0 also stays fixed at 14338.
Since the data can carry either label 0 or 1 within the same range of pixel values, the model may have ended up as a classifier that only screens out the points that are unambiguously class 0.

10iters 75.10275387763977 44.93026399612427 Training Error = 0.49851179033638404 14338 (weights=[0.821124114549579,0.5460179446798151], intercept=0.0)
20iters 18.078901052474976 45.5715069770813 Training Error = 0.49851179033638404 14338 (weights=[0.821124114549579,0.5460179446798151], intercept=0.0)
40iters 25.66639018058777 42.937142848968506 Training Error = 0.49851179033638404 14338 (weights=[0.7679792225008205,0.7147579082559438], intercept=0.0)
80iters 44.65957999229431 43.41538667678833 Training Error = 0.49851179033638404 14338 (weights=[0.6691135849368834,0.9089417111916064], intercept=0.0)
160iters 80.68873810768127 43.846691608428955 Training Error = 0.49851179033638404 14338 (weights=[0.5571197590497332,1.1270377433296916], intercept=0.0)
320iters 55.14544892311096 41.29570007324219 Training Error = 0.49851179033638404 14338 (weights=[0.4932743427011136,1.2653979904178942], intercept=0.0)
640iters 38.29077696800232 40.86665320396423 Training Error = 0.49851179033638404 14338 (weights=[0.4568406884303467,1.3549579858027803], intercept=0.0)
1280iters 29.07907772064209 45.410537004470825 Training Error = 0.49851179033638404 14338 (weights=[0.42603926445867824,1.414722787934614], intercept=0.0)
5: Train 1280steps 715.0053498744965
Training Error = 0.9468502424514323

fig-1: Evolution of the weights parameters as iterations accumulate in the Spark SVM.
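
One way to probe the ambiguity hypothesis above would be to count how many (hh, hv) pixel-value pairs occur with both labels; the following is a rough numpy sketch (not part of the original experiment) that reuses the train and label arrays built in the scikit-learn script of section 6.1:

import numpy as np

# train: (N, 2) array of (hh, hv) pixel values, label: (N,) array of 0/1,
# as constructed in the scikit-learn script of section 6.1.
triples = np.concatenate([train, label.reshape(-1, 1)], axis=1)
uniq_triples = np.unique(triples, axis=0)          # unique (hh, hv, label) rows

# an (hh, hv) pair that is still duplicated after dropping the label column
# occurs in the data with both label 0 and label 1
values, counts = np.unique(uniq_triples[:, :2], axis=0, return_counts=True)
ambiguous = np.count_nonzero(counts > 1)
print('distinct (hh, hv) values:', len(values))
print('values observed with both labels:', ambiguous)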
