12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

(ソースコードメモ)PyTorchでのCPU側並列処理

Last updated at Posted at 2019-10-24

#はじめに
PyTorchのCPU側の並列処理は、ATen/Parallelで主に行う。CPUの並列処理の概要も文書に記載されている。現状の並列処理設定を確認するには、以下のスクリプトで確認できる。(PyTorch 1.2から以下のスクリプトで確認できる。)

import torch
print(torch.__config__.parallel_info()) 

Google Colabで実行すると、以下のように表示される。この場合、OpenMP設定になっており、環境変数OMP_NUM_THREADSを設定すれば、CPUの負荷を制限できる。


ATen/Parallel:
	at::get_num_threads() : 1
	at::get_num_interop_threads() : 1
OpenMP 201511 (a.k.a. OpenMP 4.5)
	omp_get_max_threads() : 1
Intel(R) Math Kernel Library Version 2019.0.4 Product Build 20190411 for Intel(R) 64 architecture applications
	mkl_get_max_threads() : 1
Intel(R) MKL-DNN v0.20.5 (Git Hash 0125f28c61c1f822fd48570b4c1066f96fcb9b2e)
std::thread::hardware_concurrency() : 2
Environment variables:
	OMP_NUM_THREADS : [not set]
	MKL_NUM_THREADS : [not set]
ATen parallel backend: OpenMP

現在は、OpenMPのみからATen/Parallelをベースとしたparallel_for/parallel_reduceへの書き換えが進行中である。ここでは、PyTorchの新規処理とともに、かつてのOpenMPを使った並列処理について説明する。なお、PyTorch 1.3時点でのデフォルトは、OpenMPである
PyTorchの処理は、データ処理演算と、データロード(DataLoader)に分かれる。データ処理演算で使われるATen/Parallelは、Pythonより下の演算処理であるため、一つのプロセスが数百%となる。そして、データローダは、num_workersで指定した数を、別プロセスとして起動している。

#PyTorch独自関数について
at::parallel_for関数やat::parallel_reduce関数で並列処理が出来る。なお、この関数のバックエンドは、OpenMPだけでなく、NativeおよびThread Building Blocks(TBB)も提供されている。ただし、コンパイル時に設定するので、実行時に変更出来ない。通常はOpenMP、モバイル系はNativeを使う設定になる。
##at::parallel_forの利用例
parallel_forは、様々な関数で用いられている。例えば、Adaptive Max Poolingでは、以下のように定義されている。該当関数の引数は、begin, end, grain_size, fの4つであった。それぞれ、0, sizeD, 0, [&]が設定されている。ちなみに、[&]は、ラムダ式のキャプチャ記法(Lambda capture)である。これにより、関数の前で設定された変数へのアクセスが内部で可能になる。


  at::parallel_for(0, sizeD, 0, [&](int64_t start, int64_t end) {
    for (auto d = start; d < end; d++)
    {
      /* loop over output */
      int64_t oh, ow;
      for(oh = 0; oh < osizeH; oh++)
      {
        int istartH = start_index(oh, osizeH, isizeH);
        int iendH   = end_index(oh, osizeH, isizeH);
        int kH = iendH - istartH;

        for(ow = 0; ow < osizeW; ow++)
        {
          int istartW = start_index(ow, osizeW, isizeW);
          int iendW   = end_index(ow, osizeW, isizeW);
          int kW = iendW - istartW;

          /* local pointers */
          scalar_t *ip = input_p   + d*istrideD + istartH*istrideH + istartW*istrideW;
          scalar_t *op = output_p  + d*osizeH*osizeW + oh*osizeW + ow;
          int64_t *indp = ind_p   + d*osizeH*osizeW + oh*osizeW + ow;

          /* compute local max: */
          int64_t maxindex = -1;
          scalar_t maxval = -std::numeric_limits<float>::max();
          int ih, iw;
          for(ih = 0; ih < kH; ih++)
          {
            for(iw = 0; iw < kW; iw++)
            {
              scalar_t val = *(ip + ih*istrideH + iw*istrideW);
              if ((val > maxval) || std::isnan(val))
              {
                maxval = val;
                maxindex = (ih+istartH)*isizeW + (iw+istartW);
              }
            }
          }

          /* set output to local max */
          *op = maxval;

          /* store location of max */
          *indp = maxindex;
        }
      }
    }
  });

OpenMPについて

OpenMPは、fork-joinモデルにより、並列化している。コンパイラディレクティブである。以下で示す#pragma ompで処理をコンパイラに指示しバイナリを生成する。gccの場合、実体の処理はgcc同梱のlibgompでおこなっている。
さて、OpenMPは、PyTorchでは、ATenの中のat::parallel_forのバックエンドや、従前のTHTensor直で用いている。

##OpenMPのフラグをオンにする
gccコンパイル時に、-fopenmpで宣言を行う。このフラグにより、#pragma ompの宣言が可能になる。

##OpenMPの使い方
ここでは、

  • OpenMPの設定
  • OpenMPの直接呼出し(コーディング)

について説明する。

###環境変数の設定
デフォルトでは、コア数(もしくはハイパースレッドを考慮してx2)が指定される。しかし、必要なら、OMP_NUM_THREADSで設定することが出来る。

###ソースコードでの記述
#pragma omp以下で定義できるパラメータは、libgompのマニュアルの10 The libgomp ABIに記載されている。また、OpenMPのマニュアルのparallel構造体にも仕様として記載されている。

parallel

以下のように並列処理を記述する


 #pragma omp parallel
  {
    body;
  }

for

[parallel for]
(https://gcc.gnu.org/onlinedocs/libgomp/Implementing-FOR-construct.html#Implementing-FOR-construct)による記載もできる。forで示されたループごとに、複数の計算資源に渡す。以下のコードの場合、`ub - lb`個に分けられ処理が行われる。


#pragma omp parallel for
  for (i = lb; i <= ub; i++)
    body;

private

変数の共有可否の指定をprivate等で行う。ここでは、変数kがスレッド毎に別々に利用されることを示す。
この為、このループの外側で定義された変数(k)と、#pragma omp内の変数は関係しない。


#pragma omp parallel for private(k)
    for (k = 0; k < r_->size(0)*r_->size(1); k++) {
    {
      scalar_t* ptr_output = output_data + k*nOutputCols*nOutputRows;
      int64_t l;
      for (l = 0; l < nOutputRows*nOutputCols; l++)
        ptr_output[l] = 0.0;
    }
  }
参考資料

critical

同一名称(ここだとblasgemm)のクリティカルセクションに入るのを防ぐ宣言

#pragma omp critical(blasgemm)
   body:

#####参考資料

OpenMPの利用例

at::parallel_forのバックエンドで使われている。なお、divupは、ATen/Parallel.hで定義されている。このコードでわかるように、スレッド数に設定した範囲で処理を分配して実行する。


template <class F>
inline void parallel_for(
    const int64_t begin,
    const int64_t end,
    const int64_t grain_size,
    const F& f) {
  TORCH_CHECK(grain_size >= 0);
  if (begin >= end) {
    return;
  }
#ifdef _OPENMP
  std::atomic_flag err_flag = ATOMIC_FLAG_INIT;
  std::exception_ptr eptr;
#pragma omp parallel if (!omp_in_parallel() && ((end - begin) >= grain_size))
  {
    int64_t num_threads = omp_get_num_threads();
    int64_t tid = omp_get_thread_num();
    int64_t chunk_size = divup((end - begin), num_threads);
    int64_t begin_tid = begin + tid * chunk_size;
    if (begin_tid < end) {
      try {
        f(begin_tid, std::min(end, chunk_size + begin_tid));
      } catch (...) {
        if (!err_flag.test_and_set()) {
          eptr = std::current_exception();
        }
      }
    }
  }
  if (eptr) {
    std::rethrow_exception(eptr);
  }
#else
  f(begin, end);
#endif
}

#参考資料

PyTorch

OpenMP

C++

12
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?