C++で2次元配列の転置を行う処理について解説します。
前書き
入力を上書きせずに別の領域に出力する out-of-place 方式の実装です。
二次元配列の要素のサイズが1バイトの場合用のコードを載せています。2バイトや4バイトや8バイト向けの実装もほぼ同様のやり方で実現出来ます。
横着して Windows 向けのコードを一部使っていてプラットフォーム非依存な記述になっていません。
ナイーブな実装
template <typename T>
void transpose_naive(size_t columns, size_t rows,
const T* __restrict pSrc, ptrdiff_t srcLineStride,
T* __restrict pDst, ptrdiff_t dstLineStride) noexcept
{
for (size_t x = 0; x < columns; ++x) {
for (size_t y = 0; y < rows; ++y) {
pDst[(x * dstLineStride) + y] = pSrc[(y * srcLineStride) + x];
}
}
}
上記のナイーブな実装では2重ループを使用しています。外側が列方向のループで内側が行方向のループ、という建前ですが転置処理なので入力と出力では縦横の立場が逆転します。
内側のループでは入力の二次元配列の要素を読み取る方向が縦方向になっており1要素単位で読み書きしているのでキャッシュの利用がされにくく性能が出ません。ループの順番を入れ替えると入力の二次元配列の要素を読み取る方向が横方向になりますが、出力の二次元配列の要素を書き込む方向が縦方向になり結局性能が出ません。
1要素単位で読み書きしているのも性能が出ない原因ですが、結局メモリアクセスが問題になっています。
キャッシュ・ブロッキングによる最適化
template <typename T>
__forceinline
void transpose(size_t width, size_t height,
const T* __restrict pSrc, ptrdiff_t srcLineStride,
T* __restrict pDst, ptrdiff_t dstLineStride) noexcept
{
if (width == 0
|| height == 0
|| pSrc == nullptr
|| pDst == nullptr
|| srcLineStride < width
|| dstLineStride < height)
{
return;
}
if (width >= 16/sizeof(T) && height >= 16) {
size_t bw = std::min<size_t>(128u/sizeof(T), flp2(width));
size_t bh = std::min<size_t>(512u/sizeof(T), flp2(height));
size_t nw = width / bw;
size_t nh = height / bh;
for (size_t i = 0; i < nh; ++i) {
for (size_t j = 0; j < nw; ++j) {
auto pSrc2 = &pSrc[i*bh*srcLineStride + j*bw];
auto pDst2 = &pDst[j*bw*dstLineStride + i*bh];
constexpr size_t n = 16;
for (size_t l = 0; l < bh/n; ++l) {
for (size_t k = 0; k < bw/n; ++k) {
auto pSrc3 = &pSrc2[l*n*srcLineStride + k*n];
auto pDst3 = &pDst2[k*n*dstLineStride + l*n];
transpose_1b_16x16(pSrc3, srcLineStride, pDst3, dstLineStride);
}
}
}
}
size_t widthRemain = width % bw;
size_t heightRemain = height % bh;
size_t width2 = bw * nw;
size_t height2 = bh * nh;
if (widthRemain) {
transpose(widthRemain, height2, pSrc + width2, srcLineStride, pDst + (width2 * dstLineStride), dstLineStride);
}
if (heightRemain) {
transpose(width, heightRemain, pSrc + height2 * srcLineStride, srcLineStride, pDst + height2, dstLineStride);
}
}
else {
transpose_naive(width, height, pSrc, srcLineStride, pDst, dstLineStride);
}
}
上記の実装では入力の二次元配列を16×16要素の領域毎に分けて transpose_1b_16x16
関数を呼び出しています。本来は2重ループのところをループを分割して4重ループにしているのはメモリアクセスの局所性を高める対策です。
flp2
関数は引数に渡した値以下の最も大きい2の累乗値を返す処理です。
// https://stackoverflow.com/a/2681094
__forceinline
uint32_t flp2(uint32_t x)
{
x = x | (x >> 1);
x = x | (x >> 2);
x = x | (x >> 4);
x = x | (x >> 8);
x = x | (x >> 16);
return x - (x >> 1);
}
AVX2 による最適化
AVX2を使って最適化する事でスカラー実装と比べてインストラクション数を削減する事が出来ます。
最内周で呼び出している transpose_1b_16x16
関数の実装は下記のとおりです。
__forceinline
void transpose_1b_16x16(const uint8_t* __restrict pSrc, ptrdiff_t srcLineStride,
uint8_t* __restrict pDst, ptrdiff_t dstLineStride) noexcept
{
__m256i a0 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 8 * srcLineStride), (__m128i const*)(pSrc + 0 * srcLineStride));
__m256i a1 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 9 * srcLineStride), (__m128i const*)(pSrc + 1 * srcLineStride));
__m256i a2 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 10 * srcLineStride), (__m128i const*)(pSrc + 2 * srcLineStride));
__m256i a3 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 11 * srcLineStride), (__m128i const*)(pSrc + 3 * srcLineStride));
__m256i a4 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 12 * srcLineStride), (__m128i const*)(pSrc + 4 * srcLineStride));
__m256i a5 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 13 * srcLineStride), (__m128i const*)(pSrc + 5 * srcLineStride));
__m256i a6 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 14 * srcLineStride), (__m128i const*)(pSrc + 6 * srcLineStride));
__m256i a7 = _mm256_loadu2_m128i((__m128i const*)(pSrc + 15 * srcLineStride), (__m128i const*)(pSrc + 7 * srcLineStride));
__m256i b0 = _mm256_unpacklo_epi8(a0, a1);
__m256i b1 = _mm256_unpacklo_epi8(a2, a3);
__m256i b2 = _mm256_unpacklo_epi8(a4, a5);
__m256i b3 = _mm256_unpacklo_epi8(a6, a7);
__m256i b4 = _mm256_unpackhi_epi8(a0, a1);
__m256i b5 = _mm256_unpackhi_epi8(a2, a3);
__m256i b6 = _mm256_unpackhi_epi8(a4, a5);
__m256i b7 = _mm256_unpackhi_epi8(a6, a7);
a0 = _mm256_unpacklo_epi16(b0, b1);
a1 = _mm256_unpacklo_epi16(b2, b3);
a2 = _mm256_unpackhi_epi16(b0, b1);
a3 = _mm256_unpackhi_epi16(b2, b3);
a4 = _mm256_unpacklo_epi16(b4, b5);
a5 = _mm256_unpacklo_epi16(b6, b7);
a6 = _mm256_unpackhi_epi16(b4, b5);
a7 = _mm256_unpackhi_epi16(b6, b7);
b0 = _mm256_unpacklo_epi32(a0, a1);
b1 = _mm256_unpackhi_epi32(a0, a1);
b2 = _mm256_unpacklo_epi32(a2, a3);
b3 = _mm256_unpackhi_epi32(a2, a3);
b4 = _mm256_unpacklo_epi32(a4, a5);
b5 = _mm256_unpackhi_epi32(a4, a5);
b6 = _mm256_unpacklo_epi32(a6, a7);
b7 = _mm256_unpackhi_epi32(a6, a7);
a0 = _mm256_permute4x64_epi64(b0, _MM_SHUFFLE(3, 1, 2, 0));
a1 = _mm256_permute4x64_epi64(b1, _MM_SHUFFLE(3, 1, 2, 0));
a2 = _mm256_permute4x64_epi64(b2, _MM_SHUFFLE(3, 1, 2, 0));
a3 = _mm256_permute4x64_epi64(b3, _MM_SHUFFLE(3, 1, 2, 0));
a4 = _mm256_permute4x64_epi64(b4, _MM_SHUFFLE(3, 1, 2, 0));
a5 = _mm256_permute4x64_epi64(b5, _MM_SHUFFLE(3, 1, 2, 0));
a6 = _mm256_permute4x64_epi64(b6, _MM_SHUFFLE(3, 1, 2, 0));
a7 = _mm256_permute4x64_epi64(b7, _MM_SHUFFLE(3, 1, 2, 0));
_mm256_storeu2_m128i((__m128i*)(pDst + 1 * dstLineStride), (__m128i*)(pDst + 0 * dstLineStride), a0);
_mm256_storeu2_m128i((__m128i*)(pDst + 3 * dstLineStride), (__m128i*)(pDst + 2 * dstLineStride), a1);
_mm256_storeu2_m128i((__m128i*)(pDst + 5 * dstLineStride), (__m128i*)(pDst + 4 * dstLineStride), a2);
_mm256_storeu2_m128i((__m128i*)(pDst + 7 * dstLineStride), (__m128i*)(pDst + 6 * dstLineStride), a3);
_mm256_storeu2_m128i((__m128i*)(pDst + 9 * dstLineStride), (__m128i*)(pDst + 8 * dstLineStride), a4);
_mm256_storeu2_m128i((__m128i*)(pDst + 11 * dstLineStride), (__m128i*)(pDst + 10 * dstLineStride), a5);
_mm256_storeu2_m128i((__m128i*)(pDst + 13 * dstLineStride), (__m128i*)(pDst + 12 * dstLineStride), a6);
_mm256_storeu2_m128i((__m128i*)(pDst + 15 * dstLineStride), (__m128i*)(pDst + 14 * dstLineStride), a7);
}
ここでは 16×16要素のサイズが 256 バイトで 256bit レジスタ8本分に収まることを利用しています。処理の単位を大きくして 32×32要素の領域にするとレジスタスピルが起きる為、サイズを抑えてこれぐらいにした方が良いようです。
マルチスレッドによる最適化
transpose
関数の最も外側のループをスレッドで分割します。
std::vector<size_t> v(nh);
std::iota(v.begin(), v.end(), 0);
std::for_each(std::execution::par, v.begin(), v.end(), [=](size_t i) {
//for (size_t i = 0; i < nh; ++i) {
for (size_t j = 0; j < nw; ++j) {
auto pSrc2 = &pSrc[i*bh*srcLineStride + j*bw];
auto pDst2 = &pDst[j*bw*dstLineStride + i*bh];
constexpr size_t n = 16;
for (size_t l = 0; l < bh/n; ++l) {
for (size_t k = 0; k < bw/n; ++k) {
auto pSrc3 = &pSrc2[l*n*srcLineStride + k*n];
auto pDst3 = &pDst2[k*n*dstLineStride + l*n];
transpose_1b_16x16(pSrc3, srcLineStride, pDst3, dstLineStride);
}
}
}
});
//}
Large Page Support (Huge Page) の有効化による最適化
TLB(Translation Lookaside Buffer)の浪費を抑制する為に Large-Page-Support を有効化します。
Windows 10 での有効化手順
- 管理ツールのローカルセキュリティポリシーのローカルポリシーのユーザー権利の割り当ての中のメモリ内のページのロックに Administrators グループを追加します。
- 追加後にPCを再起動します。
- Large-Page-Supportを使用するプログラムを管理者権限で動作させて確認します。
実行時間が安定するように対策
他プロセスの影響を抑えるために下記の Windows API の呼び出しを行う対策を入れました。
SetPriorityClass(GetCurrentProcess(), REALTIME_PRIORITY_CLASS);
SetThreadPriority(GetCurrentThread(), 31);
Turbo Boost を無効化したりすると良いのかもしれませんが、それはやってません。
main 関数
#include <stdint.h>
#include <assert.h>
#include <cmath>
#include "transpose.h"
#include "timer.h"
#if (_WIN32 || _WIN64)
#include <Windows.h>
#include <tchar.h>
#endif
int main()
{
size_t wh = 256;
size_t height = wh;
size_t rep = 7;
size_t padding = 128;
size_t bufferSize = height * std::pow(2, rep) + padding;
bufferSize *= bufferSize;
#if (_WIN32 || _WIN64)
HANDLE hToken;
SetPriorityClass(GetCurrentProcess(), REALTIME_PRIORITY_CLASS);
SetThreadPriority(GetCurrentThread(), 31);
OpenProcessToken(GetCurrentProcess(), TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY, &hToken);
TOKEN_PRIVILEGES tp;
LookupPrivilegeValue(NULL, _T("SeLockMemoryPrivilege"), &tp.Privileges[0].Luid);
tp.PrivilegeCount = 1;
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED;
BOOL status = AdjustTokenPrivileges(hToken, FALSE, &tp, 0, (PTOKEN_PRIVILEGES)NULL, 0);
DWORD err = GetLastError();
CloseHandle(hToken);
long long unsigned largePageMinimumSize = 0;
DWORD type = MEM_RESERVE | MEM_COMMIT | MEM_TOP_DOWN;
if (err == ERROR_SUCCESS) {
largePageMinimumSize = (long long unsigned)GetLargePageMinimum();
printf("Large Page Minimum Size: %llu\n", largePageMinimumSize);
bufferSize = (bufferSize + (largePageMinimumSize-1)) & ~(largePageMinimumSize-1);
type |= MEM_LARGE_PAGES;
}
else {
printf("Failed to acquire SeLockmemoryPrivilege. Large Page Support unavailable.\n");
}
uint8_t* pSrc = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
uint8_t* pDst1 = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
uint8_t* pDst2 = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
uint8_t* pDst3 = (uint8_t*)VirtualAlloc(NULL, bufferSize, type, PAGE_READWRITE);
#else
std::vector<uint8_t> srcBuffer(bufferSize);
std::vector<uint8_t> dstBuffer1(bufferSize);
std::vector<uint8_t> dstBuffer2(bufferSize);
std::vector<uint8_t> dstBuffer3(bufferSize);
uint8_t* pSrc = &srcBuffer[0];
uint8_t* pDst1 = &dstBuffer1[0];
uint8_t* pDst2 = &dstBuffer2[0];
uint8_t* pDst3 = &dstBuffer3[0];
#endif
Timer t;
for (size_t j = 0; j < rep; ++j, height *= 2) {
size_t width = wh;
for (size_t i = 0; i < rep; ++i, width *= 2) {
size_t srcLineStride = width + padding;
size_t dstLineStride = height + padding;
for (size_t y = 0; y < height; ++y) {
for (size_t x = 0; x < width; ++x) {
auto v = (uint8_t)(y * dstLineStride + x);
pSrc[y * srcLineStride + x] = v;
}
}
t.Start();
transpose_naive(width, height, pSrc, srcLineStride, pDst2, dstLineStride);
auto timeTransposeNaive = t.ElapsedMicroSecond();
t.Start();
transpose_1b(width, height, pSrc, srcLineStride, pDst1, dstLineStride);
auto timeTranspose = t.ElapsedMicroSecond();
t.Start();
{
const __m256i* __restrict src = (const __m256i* __restrict)pSrc;
__m256i* __restrict dst = (__m256i* __restrict)pDst3;
const size_t n256 = (srcLineStride * height + 255) / 256;
for (size_t i=0; i<n256; ++i) {
__m256i v0 = _mm256_stream_load_si256(src++);
__m256i v1 = _mm256_stream_load_si256(src++);
__m256i v2 = _mm256_stream_load_si256(src++);
__m256i v3 = _mm256_stream_load_si256(src++);
__m256i v4 = _mm256_stream_load_si256(src++);
__m256i v5 = _mm256_stream_load_si256(src++);
__m256i v6 = _mm256_stream_load_si256(src++);
__m256i v7 = _mm256_stream_load_si256(src++);
_mm256_stream_si256(dst++, v0);
_mm256_stream_si256(dst++, v1);
_mm256_stream_si256(dst++, v2);
_mm256_stream_si256(dst++, v3);
_mm256_stream_si256(dst++, v4);
_mm256_stream_si256(dst++, v5);
_mm256_stream_si256(dst++, v6);
_mm256_stream_si256(dst++, v7);
}
}
auto timeCopy = t.ElapsedMicroSecond();
printf("%5zu x %5zu | %7zu | %7zu | %7zu\n", width, height, timeTransposeNaive, timeTranspose, timeCopy);
#if 1
// verify
for (size_t i = 0; i < width; ++i) {
if (memcmp(pDst1+i*dstLineStride, pDst2+i*dstLineStride, height) != 0) {
__debugbreak();
break;
}
}
#endif
}
}
#if (_WIN32 || _WIN64)
::VirtualFree(pSrc, NULL, MEM_RELEASE);
::VirtualFree(pDst1, NULL, MEM_RELEASE);
::VirtualFree(pDst2, NULL, MEM_RELEASE);
::VirtualFree(pDst3, NULL, MEM_RELEASE);
#endif
return 0;
}
下記が実行結果です。処理時間の単位はマイクロ秒です。
ナイーブ実装はサイズが大きくなると処理時間の伸びが大きいですが最適化版ではそれがある程度抑制されている事が確認出来ます。
縦横サイズ | ナイーブ | 最適化 | メモリコピー |
---|---|---|---|
256 x 256 | 39 | 34 | 4 |
512 x 256 | 120 | 25 | 9 |
1024 x 256 | 252 | 129 | 46 |
2048 x 256 | 566 | 156 | 32 |
4096 x 256 | 963 | 407 | 145 |
8192 x 256 | 2147 | 779 | 428 |
16384 x 256 | 3418 | 1784 | 1167 |
256 x 512 | 122 | 207 | 46 |
512 x 512 | 206 | 153 | 59 |
1024 x 512 | 491 | 159 | 51 |
2048 x 512 | 838 | 346 | 169 |
4096 x 512 | 2001 | 547 | 215 |
8192 x 512 | 3682 | 1607 | 1295 |
16384 x 512 | 8589 | 1493 | 975 |
256 x 1024 | 191 | 77 | 68 |
512 x 1024 | 607 | 152 | 73 |
1024 x 1024 | 1032 | 312 | 166 |
2048 x 1024 | 1718 | 895 | 611 |
4096 x 1024 | 4407 | 958 | 662 |
8192 x 1024 | 7766 | 2161 | 1823 |
16384 x 1024 | 14435 | 2401 | 3644 |
256 x 2048 | 917 | 157 | 96 |
512 x 2048 | 1751 | 329 | 129 |
1024 x 2048 | 3085 | 428 | 240 |
2048 x 2048 | 3433 | 1239 | 789 |
4096 x 2048 | 7504 | 1251 | 856 |
8192 x 2048 | 13741 | 2883 | 2134 |
16384 x 2048 | 24339 | 4625 | 3510 |
256 x 4096 | 2172 | 295 | 125 |
512 x 4096 | 4200 | 264 | 155 |
1024 x 4096 | 8651 | 1052 | 777 |
2048 x 4096 | 18210 | 1204 | 799 |
4096 x 4096 | 29321 | 2349 | 1754 |
8192 x 4096 | 53653 | 4268 | 3465 |
16384 x 4096 | 110591 | 8377 | 6690 |
256 x 8192 | 4312 | 655 | 789 |
512 x 8192 | 9982 | 548 | 384 |
1024 x 8192 | 18635 | 1478 | 2088 |
2048 x 8192 | 38330 | 2465 | 1714 |
4096 x 8192 | 54441 | 4421 | 3291 |
8192 x 8192 | 106142 | 8906 | 6756 |
16384 x 8192 | 214352 | 17524 | 13453 |
256 x 16384 | 8815 | 596 | 691 |
512 x 16384 | 22776 | 1155 | 997 |
1024 x 16384 | 38767 | 2335 | 2010 |
2048 x 16384 | 93267 | 4357 | 3656 |
4096 x 16384 | 107821 | 8519 | 7303 |
8192 x 16384 | 215015 | 17251 | 14307 |
16384 x 16384 | 460911 | 34051 | 26138 |
数字ばっかりなのでグラフ化した方が良いですね。
計測に使用したPCに搭載されているCPUは、Core i7-6700HQ @ 2.60 GHz です。実行の度に少し処理時間がばらついているのであまり正確な基準にはなりません。目安程度です。
処理時間計測クラス
Windows API を使用しています。
#pragma once
#include <windows.h>
class Timer final {
private:
LARGE_INTEGER start_;
public:
Timer() { Start(); }
void Start() { QueryPerformanceCounter(&start_); }
static __int64 GetFrequency() {
LARGE_INTEGER freq;
QueryPerformanceFrequency(&freq);
return freq.QuadPart;
}
__int64 Elapsed() const {
LARGE_INTEGER now;
QueryPerformanceCounter(&now);
return now.QuadPart - start_.QuadPart;
}
double ElapsedSecond() const {
return Elapsed() / (double) GetFrequency();
}
size_t ElapsedMilliSecond() const {
return Elapsed() * 1000 / GetFrequency();
}
size_t ElapsedMicroSecond() const {
return Elapsed() * 1000 * 1000 / GetFrequency();
}
};
まとめ
ナイーブな実装に比べるとそこそこ速いです。が、単純なメモリコピーと比較するとまだまだ処理時間が掛かっているので最適な実装からは程遠いようです…。