LoginSignup
2
2

More than 5 years have passed since last update.

C++による二次元配列の転置処理について

Last updated at Posted at 2019-05-01

C++で2次元配列の転置を行う処理について解説します。

前書き

入力を上書きせずに別の領域に出力する out-of-place 方式の実装です。

二次元配列の要素のサイズが1バイトの場合用のコードを載せています。2バイトや4バイトや8バイト向けの実装もほぼ同様のやり方で実現出来ます。

横着して Windows 向けのコードを一部使っていてプラットフォーム非依存な記述になっていません。

ナイーブな実装

template <typename T>
void transpose_naive(size_t columns, size_t rows,
                     const T* __restrict pSrc, ptrdiff_t srcLineStride,
                     T* __restrict pDst, ptrdiff_t dstLineStride) noexcept
{
  for (size_t x = 0; x < columns; ++x) {
    for (size_t y = 0; y < rows; ++y) {
      pDst[(x * dstLineStride) + y] = pSrc[(y * srcLineStride) + x];
    }
  }
}

上記のナイーブな実装では2重ループを使用しています。外側が列方向のループで内側が行方向のループ、という建前ですが転置処理なので入力と出力では縦横の立場が逆転します。

内側のループでは入力の二次元配列の要素を読み取る方向が縦方向になっており1要素単位で読み書きしているのでキャッシュの利用がされにくく性能が出ません。ループの順番を入れ替えると入力の二次元配列の要素を読み取る方向が横方向になりますが、出力の二次元配列の要素を書き込む方向が縦方向になり結局性能が出ません。

1要素単位で読み書きしているのも性能が出ない原因ですが、結局メモリアクセスが問題になっています。

キャッシュ・ブロッキングによる最適化

template <typename T>
__forceinline
void transpose(size_t width, size_t height,
               const T* __restrict pSrc, ptrdiff_t srcLineStride,
               T* __restrict pDst, ptrdiff_t dstLineStride) noexcept
{
  if (width == 0
    || height == 0
    || pSrc == nullptr
    || pDst == nullptr
    || srcLineStride < width
    || dstLineStride < height)
  {
    return;
  }

  if (width >= 16/sizeof(T) && height >= 16) {
    size_t bw = std::min<size_t>(128u/sizeof(T), flp2(width));
    size_t bh = std::min<size_t>(512u/sizeof(T), flp2(height));
    size_t nw = width / bw;
    size_t nh = height / bh;
    for (size_t i = 0; i < nh; ++i) {
      for (size_t j = 0; j < nw; ++j) {
        auto pSrc2 = &pSrc[i*bh*srcLineStride + j*bw];
        auto pDst2 = &pDst[j*bw*dstLineStride + i*bh];
        constexpr size_t n = 16;
        for (size_t l = 0; l < bh/n; ++l) {
          for (size_t k = 0; k < bw/n; ++k) {
            auto pSrc3 = &pSrc2[l*n*srcLineStride + k*n];
            auto pDst3 = &pDst2[k*n*dstLineStride + l*n];
            transpose_1b_16x16(pSrc3, srcLineStride, pDst3, dstLineStride);
          }
        }
      }
    }
    size_t widthRemain = width % bw;
    size_t heightRemain = height % bh;
    size_t width2 = bw * nw;
    size_t height2 = bh * nh;
    if (widthRemain) {
      transpose(widthRemain, height2, pSrc + width2, srcLineStride, pDst + (width2 * dstLineStride), dstLineStride);
    }
    if (heightRemain) {
      transpose(width, heightRemain, pSrc + height2 * srcLineStride, srcLineStride, pDst + height2, dstLineStride);
    }
  }
  else {
    transpose_naive(width, height, pSrc, srcLineStride, pDst, dstLineStride);
  }
}

上記の実装では入力の二次元配列を16×16要素の領域毎に分けて transpose_1b_16x16 関数を呼び出しています。本来は2重ループのところをループを分割して4重ループにしているのはメモリアクセスの局所性を高める対策です。

flp2 関数は引数に渡した値以下の最も大きい2の累乗値を返す処理です。

// https://stackoverflow.com/a/2681094
__forceinline
uint32_t flp2(uint32_t x)
{
  x = x | (x >> 1);
  x = x | (x >> 2);
  x = x | (x >> 4);
  x = x | (x >> 8);
  x = x | (x >> 16);
  return x - (x >> 1);
}

AVX2 による最適化

AVX2を使って最適化する事でスカラー実装と比べてインストラクション数を削減する事が出来ます。

最内周で呼び出している transpose_1b_16x16 関数の実装は下記のとおりです。

__forceinline
void transpose_1b_16x16(const uint8_t* __restrict pSrc, ptrdiff_t srcLineStride,
                        uint8_t* __restrict pDst, ptrdiff_t dstLineStride) noexcept
{
  __m256i a0 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 8 * srcLineStride), (__m128i const*)(pSrc + 0 * srcLineStride));
  __m256i a1 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 9 * srcLineStride), (__m128i const*)(pSrc + 1 * srcLineStride));
  __m256i a2 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 10 * srcLineStride), (__m128i const*)(pSrc + 2 * srcLineStride));
  __m256i a3 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 11 * srcLineStride), (__m128i const*)(pSrc + 3 * srcLineStride));
  __m256i a4 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 12 * srcLineStride), (__m128i const*)(pSrc + 4 * srcLineStride));
  __m256i a5 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 13 * srcLineStride), (__m128i const*)(pSrc + 5 * srcLineStride));
  __m256i a6 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 14 * srcLineStride), (__m128i const*)(pSrc + 6 * srcLineStride));
  __m256i a7 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 15 * srcLineStride), (__m128i const*)(pSrc + 7 * srcLineStride));
  __m256i b0 = _mm256_unpacklo_epi8(a0, a1);
  __m256i b1 = _mm256_unpacklo_epi8(a2, a3);
  __m256i b2 = _mm256_unpacklo_epi8(a4, a5);
  __m256i b3 = _mm256_unpacklo_epi8(a6, a7);
  __m256i b4 = _mm256_unpackhi_epi8(a0, a1);
  __m256i b5 = _mm256_unpackhi_epi8(a2, a3);
  __m256i b6 = _mm256_unpackhi_epi8(a4, a5);
  __m256i b7 = _mm256_unpackhi_epi8(a6, a7);
  a0 = _mm256_unpacklo_epi16(b0, b1);
  a1 = _mm256_unpacklo_epi16(b2, b3);
  a2 = _mm256_unpackhi_epi16(b0, b1);
  a3 = _mm256_unpackhi_epi16(b2, b3);
  a4 = _mm256_unpacklo_epi16(b4, b5);
  a5 = _mm256_unpacklo_epi16(b6, b7);
  a6 = _mm256_unpackhi_epi16(b4, b5);
  a7 = _mm256_unpackhi_epi16(b6, b7);
  b0 = _mm256_unpacklo_epi32(a0, a1);
  b1 = _mm256_unpackhi_epi32(a0, a1);
  b2 = _mm256_unpacklo_epi32(a2, a3);
  b3 = _mm256_unpackhi_epi32(a2, a3);
  b4 = _mm256_unpacklo_epi32(a4, a5);
  b5 = _mm256_unpackhi_epi32(a4, a5);
  b6 = _mm256_unpacklo_epi32(a6, a7);
  b7 = _mm256_unpackhi_epi32(a6, a7);
  a0 = _mm256_permute4x64_epi64(b0, _MM_SHUFFLE(3, 1, 2, 0));
  a1 = _mm256_permute4x64_epi64(b1, _MM_SHUFFLE(3, 1, 2, 0));
  a2 = _mm256_permute4x64_epi64(b2, _MM_SHUFFLE(3, 1, 2, 0));
  a3 = _mm256_permute4x64_epi64(b3, _MM_SHUFFLE(3, 1, 2, 0));
  a4 = _mm256_permute4x64_epi64(b4, _MM_SHUFFLE(3, 1, 2, 0));
  a5 = _mm256_permute4x64_epi64(b5, _MM_SHUFFLE(3, 1, 2, 0));
  a6 = _mm256_permute4x64_epi64(b6, _MM_SHUFFLE(3, 1, 2, 0));
  a7 = _mm256_permute4x64_epi64(b7, _MM_SHUFFLE(3, 1, 2, 0));
  _mm256_storeu2_m128i((__m128i*)(pDst + 1 * dstLineStride), (__m128i*)(pDst + 0 * dstLineStride), a0);
  _mm256_storeu2_m128i((__m128i*)(pDst + 3 * dstLineStride), (__m128i*)(pDst + 2 * dstLineStride), a1);
  _mm256_storeu2_m128i((__m128i*)(pDst + 5 * dstLineStride), (__m128i*)(pDst + 4 * dstLineStride), a2);
  _mm256_storeu2_m128i((__m128i*)(pDst + 7 * dstLineStride), (__m128i*)(pDst + 6 * dstLineStride), a3);
  _mm256_storeu2_m128i((__m128i*)(pDst + 9 * dstLineStride), (__m128i*)(pDst + 8 * dstLineStride), a4);
  _mm256_storeu2_m128i((__m128i*)(pDst + 11 * dstLineStride), (__m128i*)(pDst + 10 * dstLineStride), a5);
  _mm256_storeu2_m128i((__m128i*)(pDst + 13 * dstLineStride), (__m128i*)(pDst + 12 * dstLineStride), a6);
  _mm256_storeu2_m128i((__m128i*)(pDst + 15 * dstLineStride), (__m128i*)(pDst + 14 * dstLineStride), a7);
}

ここでは 16×16要素のサイズが 256 バイトで 256bit レジスタ8本分に収まることを利用しています。処理の単位を大きくして 32×32要素の領域にするとレジスタスピルが起きる為、サイズを抑えてこれぐらいにした方が良いようです。

マルチスレッドによる最適化

transpose 関数の最も外側のループをスレッドで分割します。

    std::vector<size_t> v(nh);
    std::iota(v.begin(), v.end(), 0);
    std::for_each(std::execution::par, v.begin(), v.end(), [=](size_t i) {
    //for (size_t i = 0; i < nh; ++i) {
      for (size_t j = 0; j < nw; ++j) {
        auto pSrc2 = &pSrc[i*bh*srcLineStride + j*bw];
        auto pDst2 = &pDst[j*bw*dstLineStride + i*bh];
        constexpr size_t n = 16;
        for (size_t l = 0; l < bh/n; ++l) {
          for (size_t k = 0; k < bw/n; ++k) {
            auto pSrc3 = &pSrc2[l*n*srcLineStride + k*n];
            auto pDst3 = &pDst2[k*n*dstLineStride + l*n];
            transpose_1b_16x16(pSrc3, srcLineStride, pDst3, dstLineStride);
          }
        }
      }
    });
    //}

Large Page Support (Huge Page) の有効化による最適化

TLB(Translation Lookaside Buffer)の浪費を抑制する為に Large-Page-Support を有効化します。

Windows 10 での有効化手順

  1. 管理ツールのローカルセキュリティポリシーのローカルポリシーのユーザー権利の割り当ての中のメモリ内のページのロックに Administrators グループを追加します。
  2. 追加後にPCを再起動します。
  3. Large-Page-Supportを使用するプログラムを管理者権限で動作させて確認します。

実行時間が安定するように対策

他プロセスの影響を抑えるために下記の Windows API の呼び出しを行う対策を入れました。

SetPriorityClass(GetCurrentProcess(), REALTIME_PRIORITY_CLASS);
SetThreadPriority(GetCurrentThread(), 31);

Turbo Boost を無効化したりすると良いのかもしれませんが、それはやってません。

main 関数

#include <stdint.h>
#include <assert.h>
#include <cmath>

#include "transpose.h"
#include "timer.h"

#if (_WIN32 || _WIN64)
#include <Windows.h>
#include <tchar.h>
#endif

int main()
{
  size_t wh = 256;
  size_t height = wh;
  size_t rep = 7;
  size_t padding = 128;
  size_t bufferSize = height * std::pow(2, rep) + padding;
  bufferSize *= bufferSize;

#if (_WIN32 || _WIN64)
  HANDLE hToken;
  SetPriorityClass(GetCurrentProcess(), REALTIME_PRIORITY_CLASS);
  SetThreadPriority(GetCurrentThread(), 31);
  OpenProcessToken(GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, &hToken);
  TOKEN_PRIVILEGES tp;
  LookupPrivilegeValue(NULL, _T("SeLockMemoryPrivilege"), &tp.Privileges[0].Luid);
  tp.PrivilegeCount = 1;
  tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED;
  BOOL status = AdjustTokenPrivileges(hToken, FALSE, &tp, 0, (PTOKEN_PRIVILEGES)NULL, 0);
  DWORD err = GetLastError();
  CloseHandle(hToken);
  long long unsigned largePageMinimumSize = 0;
  DWORD type = MEM_RESERVE | MEM_COMMIT | MEM_TOP_DOWN;
  if (err == ERROR_SUCCESS) {
    largePageMinimumSize = (long long unsigned)GetLargePageMinimum();
    printf("Large Page Minimum Size: %llu\n", largePageMinimumSize);
    bufferSize = (bufferSize + (largePageMinimumSize-1)) & ~(largePageMinimumSize-1);
    type |= MEM_LARGE_PAGES;
  }
  else {
    printf("Failed to acquire SeLockmemoryPrivilege. Large Page Support unavailable.\n");
  }
  uint8_t* pSrc = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
  uint8_t* pDst1 = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
  uint8_t* pDst2 = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
  uint8_t* pDst3 = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
#else
  std::vector<uint8_t> srcBuffer(bufferSize);
  std::vector<uint8_t> dstBuffer1(bufferSize);
  std::vector<uint8_t> dstBuffer2(bufferSize);
  std::vector<uint8_t> dstBuffer3(bufferSize);
  uint8_t* pSrc = &srcBuffer[0];
  uint8_t* pDst1 = &dstBuffer1[0];
  uint8_t* pDst2 = &dstBuffer2[0];
  uint8_t* pDst3 = &dstBuffer3[0];
#endif

  Timer t;
  for (size_t j = 0; j < rep; ++j, height *= 2) {
    size_t width = wh;
    for (size_t i = 0; i < rep; ++i, width *= 2) {
      size_t srcLineStride = width + padding;
      size_t dstLineStride = height + padding;
      for (size_t y = 0; y < height; ++y) {
        for (size_t x = 0; x < width; ++x) {
          auto v = (uint8_t)(y * dstLineStride + x);
          pSrc[y * srcLineStride + x] = v;
        }
      }

      t.Start();
      transpose_naive(width, height, pSrc, srcLineStride, pDst2, dstLineStride);
      auto timeTransposeNaive = t.ElapsedMicroSecond();

      t.Start();
      transpose_1b(width, height, pSrc, srcLineStride, pDst1, dstLineStride);
      auto timeTranspose = t.ElapsedMicroSecond();

      t.Start();
      {
        const __m256i* __restrict src = (const __m256i* __restrict)pSrc;
        __m256i* __restrict dst = (__m256i* __restrict)pDst3;
        const size_t n256 = (srcLineStride * height + 255) / 256;
        for (size_t i=0; i<n256; ++i) {
          __m256i v0 = _mm256_stream_load_si256(src++);
          __m256i v1 = _mm256_stream_load_si256(src++);
          __m256i v2 = _mm256_stream_load_si256(src++);
          __m256i v3 = _mm256_stream_load_si256(src++);
          __m256i v4 = _mm256_stream_load_si256(src++);
          __m256i v5 = _mm256_stream_load_si256(src++);
          __m256i v6 = _mm256_stream_load_si256(src++);
          __m256i v7 = _mm256_stream_load_si256(src++);
          _mm256_stream_si256(dst++, v0);
          _mm256_stream_si256(dst++, v1);
          _mm256_stream_si256(dst++, v2);
          _mm256_stream_si256(dst++, v3);
          _mm256_stream_si256(dst++, v4);
          _mm256_stream_si256(dst++, v5);
          _mm256_stream_si256(dst++, v6);
          _mm256_stream_si256(dst++, v7);
        }
      }
      auto timeCopy = t.ElapsedMicroSecond();

      printf("%5zu x %5zu | %7zu | %7zu | %7zu\n", width, height, timeTransposeNaive, timeTranspose, timeCopy);

#if 1
      // verify
      for (size_t i = 0; i < width; ++i) {
        if (memcmp(pDst1+i*dstLineStride, pDst2+i*dstLineStride, height) != 0) {
          __debugbreak();
          break;
        }
      }
#endif
    }
  }

#if (_WIN32 || _WIN64)
  ::VirtualFree(pSrc, NULL, MEM_RELEASE);
  ::VirtualFree(pDst1, NULL, MEM_RELEASE);
  ::VirtualFree(pDst2, NULL, MEM_RELEASE);
  ::VirtualFree(pDst3, NULL, MEM_RELEASE);
#endif

  return 0;
}

下記が実行結果です。処理時間の単位はマイクロ秒です。
ナイーブ実装はサイズが大きくなると処理時間の伸びが大きいですが最適化版ではそれがある程度抑制されている事が確認出来ます。

縦横サイズ ナイーブ 最適化 メモリコピー
256 x 256 39 34 4
512 x 256 120 25 9
1024 x 256 252 129 46
2048 x 256 566 156 32
4096 x 256 963 407 145
8192 x 256 2147 779 428
16384 x 256 3418 1784 1167
256 x 512 122 207 46
512 x 512 206 153 59
1024 x 512 491 159 51
2048 x 512 838 346 169
4096 x 512 2001 547 215
8192 x 512 3682 1607 1295
16384 x 512 8589 1493 975
256 x 1024 191 77 68
512 x 1024 607 152 73
1024 x 1024 1032 312 166
2048 x 1024 1718 895 611
4096 x 1024 4407 958 662
8192 x 1024 7766 2161 1823
16384 x 1024 14435 2401 3644
256 x 2048 917 157 96
512 x 2048 1751 329 129
1024 x 2048 3085 428 240
2048 x 2048 3433 1239 789
4096 x 2048 7504 1251 856
8192 x 2048 13741 2883 2134
16384 x 2048 24339 4625 3510
256 x 4096 2172 295 125
512 x 4096 4200 264 155
1024 x 4096 8651 1052 777
2048 x 4096 18210 1204 799
4096 x 4096 29321 2349 1754
8192 x 4096 53653 4268 3465
16384 x 4096 110591 8377 6690
256 x 8192 4312 655 789
512 x 8192 9982 548 384
1024 x 8192 18635 1478 2088
2048 x 8192 38330 2465 1714
4096 x 8192 54441 4421 3291
8192 x 8192 106142 8906 6756
16384 x 8192 214352 17524 13453
256 x 16384 8815 596 691
512 x 16384 22776 1155 997
1024 x 16384 38767 2335 2010
2048 x 16384 93267 4357 3656
4096 x 16384 107821 8519 7303
8192 x 16384 215015 17251 14307
16384 x 16384 460911 34051 26138

数字ばっかりなのでグラフ化した方が良いですね。

計測に使用したPCに搭載されているCPUは、Core i7-6700HQ @ 2.60 GHz です。実行の度に少し処理時間がばらついているのであまり正確な基準にはなりません。目安程度です。

処理時間計測クラス

Windows API を使用しています。

#pragma once
#include <windows.h>

class Timer final {
private:
  LARGE_INTEGER start_;
public:
  Timer() { Start(); }
  void Start() { QueryPerformanceCounter(&start_); }
  static __int64 GetFrequency() {
    LARGE_INTEGER freq;
    QueryPerformanceFrequency(&freq);
    return freq.QuadPart;
  }
  __int64 Elapsed() const {
    LARGE_INTEGER now;
    QueryPerformanceCounter(&now);
    return now.QuadPart - start_.QuadPart;
  }
  double ElapsedSecond() const {
    return Elapsed() / (double) GetFrequency();
  }
  size_t ElapsedMilliSecond() const {
    return Elapsed() * 1000 / GetFrequency();
  }
  size_t ElapsedMicroSecond() const {
    return Elapsed() * 1000 * 1000 / GetFrequency();
  }
};

まとめ

ナイーブな実装に比べるとそこそこ速いです。が、単純なメモリコピーと比較するとまだまだ処理時間が掛かっているので最適な実装からは程遠いようです…。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2