TL; DR;
何かしらの計算を複数のプロセスやスレッド間で分担して実行する場合に、全体で正しく処理を進行するためにはプロセス/スレッド間で値の受け渡しや調停が必要になります。
値の送受信には、同じメモリ空間を共有して互いが共通のメモリアドレス上の値を書き換える方式(共有メモリ)と、連携の必要なプロセス/スレッド間で値を送り合うメッセージパッシングとの、大きく分けて2つのアプローチが存在します。
年内にもう一本ぐらい記事を書きたい気分になったので今回はこれらをサンプルを交えて紹介していきます。
スレッド
別々に進行する処理の間でメモリを共有する例を考えてみます。
まず、単にメインスレッドとは別のスレッドをcreate()して各々が関数を実行する場合です。
# include <thread>
void echo(string strs) {
printf("%d", strs);
}
int main {
// スレッド毎に実行
std::thread th1(echo("hoge"));
std::thread th2(echo("fuga"));
// スレッド終了待ち受け
th1.join();
th2.join();
}
スレッド毎に独立した処理をさせる分にはこれで良いのですが、同時に処理を進行させる以上はスレッド間で値を共有する必要が出てきます。
次はスレッド毎に決められた数だけ互いにカウンタを更新する場合を考えてみます。
# include <stdio.h>
# include <thread>
int v = 0;
void increment() {
for (int i=1; i <= 100000; i++) {
v++;
}
}
int main() {
std::thread th1(increment);
std::thread th2(increment);
th1.join();
th2.join();
printf("%d", v);
}
(たいへん安直なサンプルなのはご容赦ください..)
成る程、2スレッドが100000回ずつ数え上げて最終的にv
は200000になる、と言いたいところですが現実はそうはなりません。
実際に最終的な値は決定的ではなく、実行状況次第で毎度異なります。
これを書いた人は各々のスレッドが行儀良く交互に値を書き換えることを期待しますが、実際にはとあるスレッドが値を読んで書き戻す間に、もう片方のスレッドが追い越したり、古い値で上書いてしまうことがあるからです。
mutex
これに対するアプローチとして、1つのスレッドが特定の値を触っている間は他のスレッドが読み書きできないよう値をロックしてみます。
# include <thread>
# include <mutex>
int v = 0;
std::mutex mtx;
void increment() {
for (int i=0;i<10000;i++) {
std::lock_guard<std:mutex> lock(mtx);
v++;
}
}
int main {
std::thread th1(increment);
std::thread th2(increment);
th1.join();
th2.join();
printf("%d", v);
}

これで正しくカウンタが書き換わりました。
しかしながら一つの値についてロックを獲得できるスレッドは一つなので、同じ値を読み書きするスレッドが多ければ多いほどロックの獲得待ちが発生します。
CAS (Compare-And-Swap)
もう一つのアプローチとしては、スレッドが値を書き戻す前に直前に読んだ値と等しいか確認し(compare)、等しい場合に書き戻す(swap)という方法が存在します。
# include <thread>
# include <mutex>
int v = 0;
int comp_and_swap(int* target, int expected, int desired) {
if (*target == expected) {
// 直前に読んだ値と等しければ書き換える
*target = desired;
return true;
}
return false;
}
void increment() {
for (int i=0;i<100000;i++) {
int desired = v + 1;
// 値の書き換えに成功するまでリトライする
while(!comp_and_swap(&v, v, desired));
}
}
int main() {
std::thread th1(increment);
std::thread th2(increment);
th1.join();
th2.join();
printf("%d", v);
}

こちらは値の書き換えに失敗したスレッドのみが再度値を読み直して処理を繰り返すので、複数のスレッドがロックの獲得待ちにならないという特徴があります。
しかしながら、今回は値の数え上げなので発生しませんが、直前に割り込んだスレッドが割り込まれたスレッドが最後に読んだ値と同じ値で書き戻した場合等に、割り込まれたスレッドは途中で値が書き換えられたことを検知できないという問題もあります。
CSP
goroutine
go言語のケースを見てみます。
go言語はgroutineと呼ばれる軽量スレッドを備えているので、OSのスレッドに比べ少ないメモリで生成することができます。
関数の呼び出し時にgo
を付け加えることでその関数は別のgoroutine上で実行することができます。
package main
import (
"fmt"
"time"
)
func echo(strs string) {
fmt.Println(strs)
}
func main() {
var strs string = "Hello World"
go echo(strs)
go echo(strs)
// 終了待ち受け
time.Sleep(time.Second)
}
channel
goroutineの間で値の共有が必要な場合はchannelを使用します。
channelは個別のgoroutineとは分離したキューのような機構で、ここに各々のgroutineが値を出し入れすることで値の受け渡しを行います。
package main
import ("fmt"
"sync")
func increment(ch chan int, wg *sync.WaitGroup) {
for i := 0; i < 100000; i++ {
// channelから値を取り出してインクリメントして入れ直す
v := <- ch
v++
ch <- v
}
wg.Done()
}
func main() {
wg := &sync.WaitGroup{}
var v int = 0
ch := make(chan int, 1)
ch <- v
wg.Add(1)
go increment(ch, wg)
wg.Add(1)
go increment(ch, wg)
// 終了待ち受け
wg.Wait()
fmt.Println(<- ch)
}

後のactorモデルとの違いとしては、channelはプロセス/スレッド毎に結びついている訳ではないので、1つのchannelに対し複数のgoroutineが連携することもあれば、1つgoroutineで複数のchannelを受け持つというように、n:mの柔軟な組み合わせが出来ることが挙げられると思います。
Actor
メッセージパッシングにはCSPとは別に、Actorというモデルがあります。
間にchannelというレイヤを介してプロセス同士が連携するCSPとは異なり、Actorはプロセスに名前をつけて互いに値を送り合います。
Actorモデルを採用している言語にはScalaやErlang、Rustがありますが、今回はelixirの例を取り上げてみます。
(ElixirもといErlangは自身のメッセージパッシングの方式をactorと言い切っていないような気がしますが...)。
elixirもgoと同様、独立したメモリ空間を持つ独自の軽量プロセスを簡単に生成することができます。
spawn/3
の引数に実行したい関数を渡すことで生成した軽量プロセス上で処理を実行することができます。
# 標準出力するだけの何か
defmodule Hello do
def echo(msg) do
IO.puts msg
end
end
# 軽量プロセス上で実行
# spawn(モジュール名, 関数名, 引数)
spawn(Hello, :echo, ["hoge"])
spawn(Hello, :echo, ["fuga"])
ではプロセス間の値の受け渡しはどうするかというと、値を送りたいプロセスを直接指定してメッセージを送信します。
self/1
やspawn
で自分や生成先のプロセスのIDが返るので、値を送る時はメッセージの中に送信先や返信先のプロセスのIDも含めて送信します。
値を受け取る側はreceiveを使って他プロセスからのメッセージの待ち受けを行います。
defmodule Counter do
def increment(count, parent) do
receive do
{:incr, n, target } ->
_increment(:incr, count, n, target, parent)
# プロセス存続のため再帰
increment(count - 1, parent)
{:finish, n, target } ->
_increment(:finish, count, n, target, parent)
increment(count - 1, parent)
{:shutdown} ->
exit(:normal)
_ ->
exit(:normal)
end
end
# カウントアップ
defp _increment(:incr, _, n, target, _), do: send target, {:incr, n + 1, self()}
# カウントアップ終わり
defp _increment(:incr, 0, n, target, _), do: send target, {:finish, n, self()}
defp _increment(:finish, 0, n, _, parent), do: send parent, {:finish, n, self()}
defp _increment(:finish, _, n, target, _), do: send target, {:finish, n + 1, self()}
end
n = 0 # 軽量プロセス間のカウンタ
child1 = spawn(Counter, :increment, [1000000, self()])
child2 = spawn(Counter, :increment, [1000000, self()])
# 子プロセスに値を渡す
send child1, {:incr, n, child2}
# 結果待ち受け
receive do
{:finish, n, _} ->
IO.puts n
end
send child1, {:shutdown}
send child2, {:shutdown}
(え、こんな野暮ったい書き方はしない?)
軽量プロセスの間にchannelというレイヤが挟まるgoとは異なり、プロセス間で直接値の受け渡しをするイメージが近いです。
また、値を受け取る時にパターンマッチングを利用することで、選択的にメッセージを受信したり内容以下で処理を分けることができます。
サンプルでは生成したプロセスIDを持ち回ってメッセージを送っていますが、elixirはプロセス生成時に名前を登録することができるので、実際は名前を宛先にしてメッセージを送るケースが多いかと思います。
更に互い名前を登録している限りでは、別のホスト上で起動しているプロセスに対してネットワーク越しにメッセージを送受信することもできます。
参考