Synthesijer入門1からの続きです.入門1では,QuickSortのプログラムをJavaで記述してから,FPGAに実装するまでに必要なJavaプログラムの記述の変更とVerilog-HDLファイルの記述について触れました.入門2ではスレッドを用いた並列化によってQuickSortを高速化し,さらに性能を評価するためのカウンタを取り付けてみます.
目次
- マルチスレッド版ソートを論理回路として実装する
- 性能評価用カウンタを追加する
マルチスレッド版ソートを論理回路として実装する
先程までのプログラム(論理回路)は100個のデータをQuicksortのアルゴリズムに従って順番に値を比較し,並べ替えるプログラム(論理回路)でした.次はスレッドを利用した並列処理をソートに導入します.具体的には,入力データを50個の2つのグループに分け,それら50個のソートを2つのQuicksortで並列に実行させます.このままでは100個のデータのソートにならないため,2つのグループのソートデータをマージソート(mergesort)で1つのソート結果にまとめます.以下に,マージソートを含むSortTop.javaを示します.
public class SortTop extends Thread{
public static final int NUM = 100;
public static final int BLOCKSIZE = 50; // 100/2
// 実行状態を表すためのフラグを追加
public boolean start_flag;
public boolean error_flag;
public boolean done_flag;
public boolean merge_flag;
/* Randomクラスは論理回路に変換できない,かつ,
* 配列も動的にインスタンス化できないので,
* 配列valuesをフィールド変数として宣言する.
*/
private int [] values = new int[NUM];
/* QuickSortクラスのインスタンス化も静的にする */
private QuickSort2 qsort0 = new QuickSort2();
private QuickSort2 qsort1 = new QuickSort2();
public void run() {
/* フィールド変数の定義時の初期化はサポートされていないので,
* init()メソッド内で初期化の処理を追加する */
init();
/* qsortに配列データをセットする */
for( int i=0; i<BLOCKSIZE; i++) {
qsort0.setValue(i, values[i]);
qsort1.setValue(i, values[BLOCKSIZE+i] );
}
//System.out.println("Start sorting");
start_flag = true;
// QuickSortを2つ並列に実行開始
qsort0.start();
qsort1.start();
try {
qsort0.join();
qsort1.join();
} catch(Exception e) {
}
/* 2つのQuicksortのソート結果をマージソートして *
* 配列valuesに格納する.
*/
//System.out.println("Start mergesort.");
merge_flag = true;
mergeSort();
for( int i=0; i< values.length; i++) {
//System.out.println(values[i]);
if( i!=0 && (values[i-1]>values[i])) {
//System.out.println("Sort Error!!");
//System.exit(1);
error_flag = true;
}
}
//System.out.println("Done.");
done_flag = true;
}
/* 2つのQuicksortのソート結果をマージソートして *
* 配列valuesに格納する.
*/
private void mergeSort() {
int left=0, right=0;
int i=0;
int l_value=qsort0.getValue(0);
int r_value=qsort1.getValue(0);
for(; (left < BLOCKSIZE) && (right<BLOCKSIZE); i++) {
if( l_value < r_value ) {
values[i] = l_value;
left++;
l_value = qsort0.getValue(left);
} else {
values[i] = r_value;
right++;
r_value = qsort1.getValue(right);
}
}
while( left < BLOCKSIZE ) {
values[i++] = l_value;
left++;
l_value = qsort0.getValue(left);
}
while( right < BLOCKSIZE ) {
values[i++] = r_value;
right++;
r_value = qsort0.getValue(right);
}
}
/* フィールド変数の定義時の初期化はサポートされていないので,
* init()メソッド内で初期化の処理を追加する */
private void init() {
start_flag = false;
error_flag = false;
done_flag = false;
merge_flag = false;
values[0] = -580072087;
values[1] = -301572182;
(省略)
values[99] = -400747481;
}
}
上記では,staticな定数としてBLOCKSIZEを定義しています.このBLOCKSIZEは1つのQuicksortが担当するソートの個数を表しています.このBLOCKSIZEは今回は2並列で実装しているので,100/2=50にしていますが,並列数が増加するとこの個数は変化するので,つい,100/2と書きたくなってしまいます.ですが,そのような式で値を書いた場合,Synthesijerでは正しく論理回路を生成してくれませんので,気をつけて下さい.
また2つのクイックソートを実装したいので,2つのクイックソートをインスタンス変数(qsort0, qsort1)として定義しています.ここも出来れば配列を用いて実現したいところなのですが,実行時に動的にインスタンスを生成することができないため,配列を使用しないで実現する必要があります.その後は,qsort0.start(), qsort1.start()によってスレッドの実行を開始しています.これは論理回路になってもスレッドの振る舞いと同様に,ソートの終了を待たずに次の処理に進みます.スレッドの終了はqsort0.join()などでソートの終了を待っています.
Quicksortの実行が終了した後はマージソートを実行しています.マージソートはSortTopクラス内のメソッドとして実現しています.
次にQuickSort2.javaのソースコードを示します.
public class QuickSort2 extends Thread {
private static final int NUM = 50;
public int [] arr = new int [NUM];
private int [] left_stack = new int [NUM];
private int [] right_stack = new int [NUM];
public int getValue(int index) {
if( index < NUM )
return arr[index];
else
return 0;
}
public void setValue(int index, int data) {
arr[index] = data;
}
public void run() {
quickSort(0,50);
}
public void quickSort(int left, int right) {
int sp;
sp = 0;
left_stack[0] = 0;
right_stack[0] = arr.length-1;
sp = 1;
/* スタックが空になるまで繰り返す */
while( sp > 0 ) {
/* スタックPOP */
sp--;
left = left_stack[sp];
right = right_stack[sp];
int index = partition(left, right);
/* スタックPUSH */
if( left < index-1 ) {
left_stack[sp] = left;
right_stack[sp++] = index-1;
}
if( index < right ) {
left_stack[sp] = index;
right_stack[sp++] = right;
}
}
}
private int partition(int left, int right) {
int i = left, j = right;
int tmp;
//int pivot = arr[(left + right) / 2];
int pivot = arr[(left + right) >> 1];
while (i <= j) {
while (arr[i] < pivot)
i++;
while (arr[j] > pivot)
j--;
if (i <= j) {
tmp = arr[i];
arr[i] = arr[j];
arr[j] = tmp;
i++;
j--;
}
};
return i;
}
}
QuickSort2.javaの変更点は以下になります.
- スレッドとして実現したいのでThreadクラスを継承した.
- SortTop.javaのマージソート時に配列外アクセスの可能性があるので,getValueメソッドで,配列外アクセスの場合には0を返すように処理を変更した
- スレッドとして実行するために,runメソッドから処理の実行を開始するように変更した.
ここまで出来たら,後はSynthesijerでHDLを生成して,FPGA用の構成情報を作成して,実行して確認して下さい.一度,Synthesijerが正しく動作する論理回路を生成するJavaプログラムが書けると,改良は比較的,簡単にできると思います.
性能評価用カウンタを追加する
ここまでのステップでスレッドによる並列化をしたのですが,FPGAボードで実行しても一瞬で終わってしまって,並列化した効果が分かりません.効果を体感するためにデータ数を多くしても良いのですが,ここでは正確に性能が計れるようにカウンタを追加しましょう.つまり,ソートの開始と同時にcount.start()でカウントアップを始め,ソートが終わると同時にcount.stop()でカウンタを止め,それを表示することを考えます.そのため,最初にカウンタの記述をし,次にカウンタの値を7セグメント(略して7セグ)ディスプレイに表示するための7セグメントデコーダの記述をします.
性能評価用カウンタを記述する
以下に性能評価用カウンタCounter.javaのソースコードを示します.
import synthesijer.rt.*;
public class Counter{
private int count;
private boolean state;
public void init() {
count = 0;
state = false;
}
public void start() {
state = true;
}
public void stop() {
state = false;
}
public int getValue() {
return count;
}
@auto
private void run() {
/* カウントは3サイクルに1回プラスされる*/
while(state) {
count=count+3;
}
}
}
動作としては,startメソッドによりstateという状態変数をtrueにします.すると常に動作しているrunメソッドによりstateがtrueになるとカウントアップします.そして,stopメソッドによりstateの値をfalseにするとカウントアップは停止します.そしてカウンタ値の初期化はinitメソッドで,値の取得はgetValueメソッドで実現しています.ここで,カウントアップを+1ではなく+3にしていますが,その理由は上記の記述により生成された回路を回路シミュレーターで確認したところ,このカウントアップ操作が3サイクルに1度実行されていたためです.
このrunメソッドには@autoというアノテーションがついていますが,これはSynthesijerで常に動作するメソッドにつけるアノテーションです.このアノテーションを使用しているため,最初にsynthesijerのimport文を追加しています.
7セグデコーダを追加して,カウンタ値を表示する
7セグデコーダは最初に4ビットの値に対する7セグデコーダを作成します.以下に,4ビットの7セグデコーダを示します.
public class SevenSegmentDeCoder04 {
public char convert(char value) {
value = (char)(value & 0x0F);
switch(value) {
case 0: return 0x3F;
case 1: return 0x06;
case 2: return 0x5B;
case 3: return 0x4F;
case 4: return 0x66;
case 5: return 0x6D;
case 6: return 0x7D;
case 7: return 0x07;
case 8: return 0x7F;
case 9: return 0x6F;
case 10: return 0x77;
case 11: return 0x7C;
case 12: return 0x39;
case 13: return 0x5E;
case 14: return 0x79;
default: return 0x71;
}
}
}
上記ではconvertメソッドに渡された数値を対応する7セグメントディスプレイ用のビット列に変換しています.7セグメントディスプレイにおける数値とビット列のパターンはWikiのページを参考にしました.このデコーダではconvertメソッドの引数で渡された数値を8ビットのビット列にして返しています.
次に,上記の4ビットの7セグデコーダを使用して8ビットの7セグデコーダを作成しました.
public class SevenSegmentDeCoder08 {
private SevenSegmentDeCoder04 dec = new SevenSegmentDeCoder04();
public short convert(char value) {
short display;
char lower, upper;
char lower_display, upper_display;
lower = (char)(value & 0x0F);
upper = (char)((value >> 4) & 0x0F);
lower_display = dec.convert(lower);
upper_display = dec.convert(upper);
display = (short) (((short)upper_display<<8) | lower_display);
return display;
}
}
こちらは8ビットの数値を2つの7セグメントディスプレイに表示するための回路になります.7セグメントディスプレイ用のビット列が2つ必要ですので,convertメソッドの返り値は16ビットのshort型にしています.
以下同様に,16, 32ビットの数値を変換するクラスを作成しました.
public class SevenSegmentDeCoder16 {
private SevenSegmentDeCoder08 dec = new SevenSegmentDeCoder08();
public int convert(short value) {
int display;
char lower, upper;
short lower_display, upper_display;
lower = (char)(value & 0xFF);
upper = (char)((value >> 8) & 0xFF);
lower_display = dec.convert(lower);
upper_display = dec.convert(upper);
display = (int) (((int)upper_display<<16) | lower_display);
return display;
}
}
public class SevenSegmentDeCoder32 {
private SevenSegmentDeCoder16 dec = new SevenSegmentDeCoder16();
public long convert(int value) {
long display;
short lower, upper;
int lower_display, upper_display;
lower = (short)(value & 0xFFFF);
upper = (short)((value >> 16) & 0xFFFF);
lower_display = dec.convert(lower);
upper_display = dec.convert(upper);
display = (long) (((long)upper_display<<32) | lower_display);
return display;
}
}
SortTop.javaにカウンタと7セグデコーダを追加する
後はSortTop.javaにカウンタと7セグデコーダを追加すれば完成です.以下にSortTop.javaのソースコードを示します.
public class SortTop extends Thread{
public static final int NUM = 100;
public static final int BLOCKSIZE = 50; // 100/2
// 実行状態を表すためのフラグを追加
public boolean start_flag;
public boolean error_flag;
public boolean done_flag;
public boolean merge_flag;
// 7セグメントディスプレイ用の変数
public long seven;
/* Randomクラスは論理回路に変換できない,かつ,
* 配列も動的にインスタンス化できないので,
* 配列valuesをフィールド変数として宣言する.
*/
private int [] values = new int[NUM];
/* QuickSortクラスのインスタンス化も静的にする */
private QuickSort2 qsort0 = new QuickSort2();
private QuickSort2 qsort1 = new QuickSort2();
/* 性能評価用カウンタ */
Counter count = new Counter();
/* 7セグメントデコーダ */
SevenSegmentDecoder32 dec = new SevenSegmentDecoder32();
public void run() {
/* フィールド変数の定義時の初期化はサポートされていないので,
* init()メソッド内で初期化の処理を追加する */
init();
/* qsortに配列データをセットする */
for( int i=0; i<BLOCKSIZE; i++) {
qsort0.setValue(i, values[i]);
qsort1.setValue(i, values[BLOCKSIZE+i] );
}
//System.out.println("Start sorting");
start_flag = true;
//計測スタート
count.start();
// QuickSortを2つ並列に実行開始
qsort0.start();
qsort1.start();
try {
qsort0.join();
qsort1.join();
} catch(Exception e) {
}
/* 2つのQuicksortのソート結果をマージソートして *
* 配列valuesに格納する.
*/
//System.out.println("Start mergesort.");
merge_flag = true;
mergeSort();
// 計測停止
count.stop();
for( int i=0; i< values.length; i++) {
//System.out.println(values[i]);
if( i!=0 && (values[i-1]>values[i])) {
//System.out.println("Sort Error!!");
//System.exit(1);
error_flag = true;
while(true);
}
}
//System.out.println("Done.");
done_flag = true;
// 性能評価結果を7セグメントディスプレイに表示
seven = dec.convert(count.getValue());
}
/* 2つのQuicksortのソート結果をマージソートして *
* 配列valuesに格納する.
*/
private void mergeSort() {
int left=0, right=0;
int i=0;
int l_value=qsort0.getValue(0);
int r_value=qsort1.getValue(0);
for(; (left < BLOCKSIZE) && (right<BLOCKSIZE); i++) {
if( l_value < r_value ) {
values[i] = l_value;
left++;
l_value = qsort0.getValue(left);
} else {
values[i] = r_value;
right++;
r_value = qsort1.getValue(right);
}
}
while( left < BLOCKSIZE ) {
values[i++] = l_value;
left++;
l_value = qsort0.getValue(left);
}
while( right < BLOCKSIZE ) {
values[i++] = r_value;
right++;
r_value = qsort0.getValue(right);
}
}
/* フィールド変数の定義時の初期化はサポートされていないので,
* init()メソッド内で初期化の処理を追加する */
private void init() {
start_flag = false;
error_flag = false;
done_flag = false;
merge_flag = false;
count.init();
values[0] = -580072087;
values[1] = -301572182;
(省略)
values[99] = -400747481;
}
}
変更点はカウンタの初期化,スタート,ストップを追加し,最後に7セグメントディスプレイに表示するため,7セグデコーダ結果をsevenという変数に格納しています(変数sevenをFPGAボードの7セグメントディスプレイと接続します)
この記述に対応したtop.vも下記に示します.
module top(input CLOCK_50,
input [3:0] KEY,
output [7:0] LEDG,
output [17:0] LEDR,
output [6:0] HEX0, HEX1, HEX2, HEX3, HEX4, HEX5, HEX6, HEX7);
wire [63:0] seven;
assign HEX0 = ~seven[6:0];
assign HEX1 = ~seven[14:8];
assign HEX2 = ~seven[22:16];
assign HEX3 = ~seven[30:24];
assign HEX4 = ~seven[38:32];
assign HEX5 = ~seven[46:40];
assign HEX6 = ~seven[54:48];
assign HEX7 = ~seven[62:56];
SortTop sort(
.clk(CLOCK_50),
.reset(~KEY[0]),
.run_req(~KEY[1]),
.run_busy(LEDG[7]),
.start_flag_we(1'b0),
.start_flag_out(LEDG[0]),
.done_flag_we(1'b0),
.done_flag_out(LEDG[1]),
.merge_flag_we(1'b0),
.merge_flag_out(LEDG[2]),
.error_flag_we(1'b0),
.error_flag_out(LEDR[0]),
.seven_we(1'b0),
.seven_out(seven)
);
endmodule
7セグメントディスプレイ用の接続の記述が追加されている以外は大きな変更はありません.
これらの変更をした後,FPGAボードで実行してみますと7セグメントディスプレイに「3123」と表示されます.これは16進数表記なので,10進数にすると12579サイクルということが分かります.また詳細は省きますが,シングルスレッド版のソートもサイクル数を計測すると21984サイクルでした.以上から,確かに並列化の効果はありそうです.