Continuation of SVM experiment 2.
6. Preprocessing benchmark
Since I wanted to compare the time consumed by preprocessing in particular between the scikit-learn version and the Spark version, I wrote the two scripts so that they do almost exactly the same work.
6.1 Scripts used
Spark version
from pyspark import SparkContext
sc = SparkContext()
import time
from skimage import io
T0 = time.time()
img1 = io.imread('train_images/train_hh_00.jpg')
img2 = io.imread('train_images/train_hv_00.jpg')
img3 = io.imread('train_annotations/train_00.png')
T1 = time.time()
print('1: read 3image files', T1-T0)
img11 = img1.reshape([1, -1])
img21 = img2.reshape([1, -1])
img31 = img3.reshape([1, -1])
spimg1 = sc.parallelize(img11[0].tolist())
spimg2 = sc.parallelize(img21[0].tolist())
spimg3 = sc.parallelize(img31[0].tolist())
T2 = time.time()
print('2: import to spark RDD', T2-T1)
label = spimg3.map(lambda i: 1 if i>10 else 0)  # binarize the annotation: pixel value > 10 means positive
spimg11 = spimg1.zip(spimg2)
spimg31 = label.zip(spimg11)
train_x_pos = spimg31.filter(lambda i: i[0]==1)
train_x_neg = spimg31.filter(lambda i: i[0]==0)
len_pos = train_x_pos.count()
len_neg = train_x_neg.count()
sample_num = min(len_pos, len_neg)
if len_pos < len_neg:
    # undersample the majority class to balance the training set
    neg = sc.parallelize(train_x_neg.takeSample(False, sample_num))
    train_balance = train_x_pos.union(neg)
else:
    pos = sc.parallelize(train_x_pos.takeSample(False, sample_num))
    train_balance = pos.union(train_x_neg)
T3 = time.time()
print('3: extract in spark RDD', T3-T2)
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
labeled_train = train_balance.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))
# print(labeled_train.take(10))
# print(labeled_train.count())
T4 = time.time()
print('4: convert to LabeledPoint', T4-T3)
from pyspark.mllib.classification import SVMWithSGD, SVMModel
# print('sample size', sample_num*2)
labeled_train.persist()  # cache the RDD; SGD iterates over it repeatedly
model = SVMWithSGD.train(labeled_train, iterations=100)
# https://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machines-svms
labelsAndPreds = labeled_train.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(labeled_train.count())
print("Training Error = " + str(trainErr))
T5 = time.time()
print('5: Train 100 steps', T5-T4)
# Check the answers over the whole image
origimg = spimg31.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))
labelsAndPreds = origimg.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(origimg.count())
print("Training Error = " + str(trainErr))
T6 = time.time()
print('6: judge', T6-T5)
print('Total Time', T6-T0)
scikit-learn version
import numpy as np
import time
from skimage import io
T0 = time.time()
img1 = io.imread('train_images/train_hh_00.jpg')
img2 = io.imread('train_images/train_hv_00.jpg')
img3 = io.imread('train_annotations/train_00.png')
T1 = time.time()
print('1: read 3image files', T1-T0)
img11 = img1.reshape([1, -1])
img21 = img2.reshape([1, -1])
img31 = img3.reshape([1, -1])
label = np.where(img31[0]>10, 1, 0)
train = np.vstack((img11, img21)).transpose()
mask = label == 1
train_x_pos = train[mask]
train_x_neg = train[np.logical_not(mask)]
sample = min(len(train_x_pos), len(train_x_neg))
label_balance = [1 for i in range(sample)] + [0 for i in range(sample)]
print('shrink length to', len(label_balance))
if len(train_x_pos) < len(train_x_neg):
    np.random.shuffle(train_x_neg)
else:
    np.random.shuffle(train_x_pos)
train_balance = np.concatenate([train_x_pos[:sample], train_x_neg[:sample]])
labeled_train = np.hstack([train_balance, np.array(label_balance).reshape([len(label_balance),1])])
np.random.shuffle(labeled_train)
train_X = labeled_train[:,0:2]
train_Y = labeled_train[:,2].transpose()
train_X2 = train_X / 16.0
T3 = time.time()
print('3: extract by numpy', T3-T1)
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import sklearn.svm
svm = sklearn.svm.SVC(kernel='rbf', random_state=0, gamma=0.10, C=10.0)
M = len(train_X2)
L = 65536
offset = 0
y_pred = []
start_time = time.time()
while offset < M:
    #print('fitting', offset, '/', offset / M, time.time() - start_time, "sec")
    L2 = min(offset+L, M)
    #print(train_X[offset:L2], train_Y[offset:L2])
    # note: SVC has no incremental interface, so each fit() call here
    # retrains from scratch on the current chunk only
    svm.fit(train_X2[offset:L2], train_Y[offset:L2])
    _Y = svm.predict(train_X2[offset:L2])
    y_pred.append(_Y)
    offset += L
predicted = np.concatenate(y_pred).reshape(train_Y.shape)
n = np.count_nonzero(train_Y!=predicted)
print("Training Error = " + str(n / len(train_Y)))
T5 = time.time()
print('5: Train ', T5-T3)
# Check the answers over the whole image
test_x = train / 16.0
M = len(test_x)
L = 65536
offset = 0
start_time = time.time()
y_pred = []
withView = False  # set True to scatter-plot each chunk's predictions
if withView:
    import matplotlib.pyplot as plt  # only needed for the optional plots
while offset < M:
    #print('predicting', offset, '/', offset / M, time.time() - start_time, "sec")
    L2 = min(offset+L, M)
    _Y = svm.predict(test_x[offset:L2])
    y_pred.append(_Y)
    if withView:
        mask = _Y == 1  # positive predictions for the current chunk
        test_x_pos = test_x[offset:L2][mask]
        test_x_neg = test_x[offset:L2][np.logical_not(mask)]
        print('now plotting')
        plt.scatter(test_x_pos.transpose()[0], test_x_pos.transpose()[1])
        plt.scatter(test_x_neg.transpose()[0], test_x_neg.transpose()[1])
        plt.show()
    #print('Misclassified samples: %d' % (Y[offset:L2] != _Y).sum())
    #print('Accuracy: %.2f' % accuracy_score(Y[offset:L2], _Y))
    offset += L
predicted = np.concatenate(y_pred).reshape(label.shape)
n = np.count_nonzero(label!=predicted)
print("Training Error = " + str(n / len(label)))
T6 = time.time()
print('6: judge', T6-T5)
print('Total Time', T6-T0)
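An aside on the chunked fit above: sklearn.svm.SVC has no incremental-learning interface, so each svm.fit() call retrains from scratch on the current chunk and discards everything learned before it; only the last chunk's model survives into the judging phase. If genuinely incremental chunk-by-chunk training were wanted on the scikit side, a linear SVM via SGDClassifier.partial_fit would be the usual substitute. A minimal sketch, not what was benchmarked here, reusing train_X2 and train_Y from the script above:

import numpy as np
from sklearn.linear_model import SGDClassifier

# hinge loss approximates a linear SVM; partial_fit keeps updating one model
clf = SGDClassifier(loss='hinge', random_state=0)
M = len(train_X2)
L = 65536
offset = 0
while offset < M:
    L2 = min(offset + L, M)
    # classes must be supplied on (at least) the first partial_fit call
    clf.partial_fit(train_X2[offset:L2], train_Y[offset:L2], classes=np.array([0, 1]))
    offset += L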
6.2 Results
Compute environment: Mac (32 GB RAM, 2.6 GHz 6-core Core i7)
stage | description | spark (sec) | scikit (python) (sec)
---|---|---|---
1 | read 3 image files | 1.563331127166748 | 1.551009178161621
2 | import to spark RDD | 52.45361089706421 | 
3 | extract (in RDD / by numpy) | 161.4244680404663 | 99.94895792007446
4 | convert to LabeledPoint | 0.17891883850097656 | 
5 | Train 100 steps | 187.9373481273651 | 1317.0479888916016
6 | judge | 365.22955799102783 | 3557.4639291763306
 | Total Time | 768.7872350215912 | 4976.011885166168
The error rate during training was 0.4984871634902538 for the Spark version and 0.021082346770100804 for the scikit version.
Predicting over the entire image and checking the answers gave 0.9468502424514323 for the Spark version and 0.016081751924169278 for the scikit version.
The Spark version's error rate is far too high; it is almost certainly not learning anything meaningful.
For the SVM training step the two versions do not perform comparable amounts of computation, so comparing their times is not meaningful.
For the step that extracts the training pixels from the source images, the Spark version took 52.5 + 161.4 = 213.9 sec versus 99.9 sec for the scikit version (whose extraction is pure numpy), so porting this stage to Spark for parallel processing did not succeed in reducing the processing time.
Compute environment: AWS EC2 (c5.9xlarge; 72 GB RAM, 36 cores)
The same experiment was repeated on AWS EC2, using a c5.9xlarge instance (72 GB RAM, 36 cores).
stage | description | spark (sec) | scikit (python) (sec)
---|---|---|---
1 | read 3 image files | 1.5626544952392578 | 1.5453143119812012
2 | import to spark RDD | 36.799010276794434 | 
3 | extract (in RDD / by numpy) | 42.08454918861389 | 91.7525577545166
4 | convert to LabeledPoint | 0.1362450122833252 | 
5 | Train 100 steps | 118.87054777145386 | 1935.4138050079346
6 | judge | 68.1303505897522 | 5847.8766713142395
 | Total Time | 267.58335733413696 | 7876.588348388672
In this case, extracting from all image pixels on the RDD took 36.8 + 42.1 = 78.9 sec, shorter than the 91.8 sec for numpy alone.
Normalizing the scikit version's SVM training and judging times (1317 sec and 3557 sec) by their respective data sizes (9623644 vs 95685968 points):
> 1317/9623644
0.00013685044874893544
> 3557/95685968
0.00003717368465144231
so, per point, training took about 3.68 times as long as prediction.
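The same normalization in a few lines of Python, for completeness:

train_time, judge_time = 1317.0479888916016, 3557.4639291763306  # sec, Mac run
train_n, judge_n = 9623644, 95685968      # points trained on / points judged
per_point_train = train_time / train_n    # ~1.37e-4 sec per point
per_point_judge = judge_time / judge_n    # ~3.72e-5 sec per point
print(per_point_train / per_point_judge)  # ~3.68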
6.3 Differences in SVM parameters between the Spark and scikit versions
Default parameters of the Spark SVM:
iterations – The number of iterations. (default: 100)
convergenceTol – A condition which decides iteration termination. (default: 0.001)
Default parameters of the scikit-learn SVM:
max_iter : int, optional (default=-1)
    Hard limit on iterations within solver, or -1 for no limit.
tol : float, optional (default=1e-3)
    Tolerance for stopping criterion.
The tolerance is 0.001 (1e-3) in both, but the maximum number of iterations is 100 in the Spark SVM, while scikit-learn's default of -1 means unlimited.
This raises the suspicion that Spark is cutting the optimization off too early.
Unfortunately, the Spark SVM does not appear to offer an option for unlimited iterations.
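Both knobs can be set explicitly when needed. A minimal sketch, reusing labeled_train from the script in 6.1 (whether raising the iteration cap actually helps is what section 6.4 examines):

from pyspark.mllib.classification import SVMWithSGD
import sklearn.svm

# Spark: iterations is a hard cap; there is no value that means "unlimited"
model = SVMWithSGD.train(labeled_train, iterations=1000, convergenceTol=0.001)

# scikit-learn: max_iter=-1 removes the cap, so the solver stops on tol alone
svm = sklearn.svm.SVC(kernel='rbf', gamma=0.10, C=10.0, tol=1e-3, max_iter=-1)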
6.4 Results of accumulating iterations in the Spark version
I accumulated iterations as follows and tracked how the weight parameters and the error evolved.
sumi = 0
i = 10
for _i in range(8):
    T51 = time.time()
    if i == 10:
        model = SVMWithSGD.train(labeled_train, iterations=i)
    else:
        # resume from the previous weights; the cumulative count sumi doubles each pass
        model = SVMWithSGD.train(labeled_train, iterations=i, initialWeights=model.weights)
    T52 = time.time()
    labelsAndPreds = labeled_train.map(lambda p: (p.label, model.predict(p.features)))
    trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(labeled_train.count())
    t2 = labelsAndPreds.filter(lambda lp: 0 == lp[1]).count()  # points predicted as class 0
    T53 = time.time()
    sumi += i
    print(str(sumi)+' iters', T52 - T51, T53-T52, "Training Error = " + str(trainErr), t2, model)
    i = sumi
T5 = time.time()
print('5: Train '+str(sumi)+'steps', T5-T4)
# Check the answers over the whole image
origimg = spimg31.map(lambda i: LabeledPoint(i[0], Vectors.dense(i[1][0]/16.0, i[1][1]/16.0)))
labelsAndPreds = origimg.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(origimg.count())
print("Training Error = " + str(trainErr))
The results were as follows.
The training error stays at roughly 0.4985 throughout, but the parameters shown as weights= do drift slightly.
The number of cases classified as class 0 stays fixed at 14338.
Since the data contains pixels in the same value range labeled both 0 and 1, the model may have ended up as a classifier that only filters out the points that are unambiguously class 0 (see the sketch after the log below).
10iters 75.10275387763977 44.93026399612427 Training Error = 0.49851179033638404 14338 (weights=[0.821124114549579,0.5460179446798151], intercept=0.0)
20iters 18.078901052474976 45.5715069770813 Training Error = 0.49851179033638404 14338 (weights=[0.821124114549579,0.5460179446798151], intercept=0.0)
40iters 25.66639018058777 42.937142848968506 Training Error = 0.49851179033638404 14338 (weights=[0.7679792225008205,0.7147579082559438], intercept=0.0)
80iters 44.65957999229431 43.41538667678833 Training Error = 0.49851179033638404 14338 (weights=[0.6691135849368834,0.9089417111916064], intercept=0.0)
160iters 80.68873810768127 43.846691608428955 Training Error = 0.49851179033638404 14338 (weights=[0.5571197590497332,1.1270377433296916], intercept=0.0)
320iters 55.14544892311096 41.29570007324219 Training Error = 0.49851179033638404 14338 (weights=[0.4932743427011136,1.2653979904178942], intercept=0.0)
640iters 38.29077696800232 40.86665320396423 Training Error = 0.49851179033638404 14338 (weights=[0.4568406884303467,1.3549579858027803], intercept=0.0)
1280iters 29.07907772064209 45.410537004470825 Training Error = 0.49851179033638404 14338 (weights=[0.42603926445867824,1.414722787934614], intercept=0.0)
5: Train 1280steps 715.0053498744965
Training Error = 0.9468502424514323
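One plausible explanation for the fixed count of 14338, assuming mllib's default decision threshold of 0.0 (SVMModel.predict returns 1 when the margin w·x + intercept exceeds the threshold): with intercept 0 and both weights positive, the margin is nonnegative for these nonnegative pixel features, and it equals 0 only when both features are 0. So the only pixels that can ever be predicted as class 0 are the all-zero ones, and that set never changes no matter how the weights drift. A small check:

import numpy as np

# With w > 0 elementwise and intercept 0, margin >= 0 for nonnegative features;
# margin == 0 exactly for all-zero pixels, and only those fail margin > 0
w = np.array([0.821124114549579, 0.5460179446798151])  # weights from the 10-iter run
for x in ([0.0, 0.0], [1.0/16, 0.0], [3.0/16, 2.0/16]):
    margin = w.dot(np.array(x))
    print(x, int(margin > 0.0))  # prints 0 only for the all-zero pixel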