Python3

お手軽にデータ処理を高速化するライブラリ

はじめに

この記事では、機械学習などで膨大なデータ処理する際、
お手軽に高速化するための、便利なライブラリについてまとめます。

なお、機械学習などでは、numpy, pandasを使うことが多いと思います。
ここでは書きませんが、これらは書き方次第で高速化が可能です。

もしデータ処理に時間がかかっているなら、
先にnumpy, pandasの書き方を
見直すべきです。というより、見直さないと、便利なライブラリを使っても、満足いく結果が得られないと思います。

ライブラリ

メモ化

関数の入力と戻り値の対応をメモリに保存し、処理を高速化します。

functools.lru_cache

計算を行う関数calcを何度も実行します。

import time
import random

def calc(x):
    y = 0
    for _ in range(100):
        y += x + x^2
    return y

# 実行
s = time.time()
for _ in range(10000):
    calc(random.randint(0, 10))
print(time.time() - s)

この関数calcにfunctools.lru_cacheというデコレータを付与するだけで、メモ化できます。

import time
import random
from functools import lru_cache

@lru_cache(maxsize=1024)
def calc(x):
    y = 0
    for _ in range(100):
        y += x + x^2
    return y

s = time.time()

# 実行
for _ in range(10000):
    calc(random.randint(0, 10))

print(time.time() - s)

print(calc.cache_info()) # cache状況の表示
calc.cache_clear()       # cacheのクリア
print(calc.cache_info()) # cacheの状況の表示

この処理では、おおよそ、5倍程度早くなります。

  • 不要になったらcacheをクリアしておきましょう。
  • maxsizeは、直近の呼び出しをどこまで記録するかです。Noneの場合は無制限なので注意。2の累乗が好ましいらしい。

numpyの実行を高速化

numba.jit

PythonのコードをJIT(just-in-time)コンパイルすることで高速化します。

numpyの計算calcを実行します。あえて、forを回す必要がある処理にしておきます。

import numpy as np
import time

def calc(arr):
    result = 0
    for i in range(arr.shape[0]):
        for j in range(arr.shape[1]):
            result += arr[i, j]
    return result

s = time.time()
calc(np.random.rand(10000, 1000))
print(time.time() - s)

この関数calcにnumba.jitというデコレータを付与するだけで、jitコンパイルされます。

import numpy as np
import time
from numba import jit

@jit
def calc(arr):
    result = 0
    for i in range(arr.shape[0]):
        for j in range(arr.shape[1]):
            result += arr[i, j]
    return result

s = time.time()
calc(np.random.rand(10000, 1000))
print(time.time() - s)

この処理では、おおよそ、10倍程度早くなります。

  • pandasは無効なので、一旦numpyにして使いましょう。

並列処理

複数のcpuで並列処理を行うことで高速化します。

joblib.Parallel

import numpy as np
import time

def sleep(arr):
    for _ in range(arr.shape[0]):    
        time.sleep(0.01)
    return arr

s = time.time()
sleep(np.random.rand(100, 10000))
print(time.time() - s)

引数を並列数に応じて分割した上で、それぞれを並列処理します。

import numpy as np
from joblib import Parallel, delayed, cpu_count
import time

def sleep(arr):
    for _ in range(arr.shape[0]):    
        time.sleep(0.01)
    print(arr.shape)
    return arr

def split_arr(arr, n):
    for _arr in np.array_split(arr, n):
        yield _arr

s = time.time()

cpus = cpu_count()
print(cpus)

results = Parallel(n_jobs=cpus)(
    [delayed(sleep)(x) for x in split_arr(np.random.rand(100, 10000), cpus)]
)

y = np.vstack(results)
print(y.shape)
print(time.time() - s)

この処理では、cpuを4つ使っており、おおよそ、4倍程度早くなります。

以上

他に何か見つかったら追加していこうと思います。