Help us understand the problem. What is going on with this article?

【旧】Channel のススメ

【2020/02/25 追記】この記事はJuliaの古いバージョンに対応したものです。2019/12 時点の最新バージョンに対応した記事をリライトしています、そちらも参照してください→ Channel のススメ for Julia v1.3

この記事は、Julia Advent Calendar 2017 の11日目の記事です。

今回は Channel という機能について紹介したいと思います。
対象は、Julia v0.6.x/v0.5.x です1

この記事で言いたいことは、以下の2点です:

  • 今まで Task Iteration を使ってきた人は、Channel Iteration に乗り換えよう!
  • Channel で 並行(並列)処理と触れあおう!

Channel とは?

Channel とは、簡単にまとめると、以下のようなモノです:

  • 複数の Task(=コルーチン)間でデータをやり取りするモノ2

…簡単にまとめすぎて1行で済ませてしまった。

もう一言だけ言っておくと、これは外部パッケージではなく、Julia の標準機能(標準ライブラリに入っているもの)である、ということ。でもたぶん、認知度そんなに高くないんじゃないかな、と思って今回、紹介しようと思い立ったわけです。

具体的には、以下にサンプルを示しながら、できること・使い途を紹介していきたいと思います。

お題1:ズンドコキヨシ

まずは、少し前に流行った『ズンドコキヨシ』。
v0.6.x/v0.5.x どちらでも動作。

zundokochannel.jl
function zundokochannel()
    function _producer(channel::Channel{String})
        i = 1
        while i > 0
            zd, i = rand([("ズン", i + 1), ("ドコ", +(i<5))])
            put!(channel, zd)
        end
        put!(channel, "キ・ヨ・シ!")
        close(channel)
    end
    channel = Channel{String}(32)
    @schedule _producer(channel)
    channel
end

注意点としては、関数 zundokochannel() は実行しても標準出力には何も出力されない、ということ。
この関数の戻り値は Channel{String} 型です。それを例えば以下のように for 式に渡すと、本来の『ズンドコキヨシ』のように動作します:

julia> for cw in zundokochannel()
           print(cw)
       end
ドコズンズンズンドコドコドコドコズンドコドコドコズンズンズンズンズンドコキ・ヨ・シ!

ちゃんと『ズン』が4回(以上)続いた後の『ドコ』の後に『キ・ヨ・シ!』と表示されますね。

また collect() 関数に渡せば、"ズン", "ドコ", "キ・ヨ・シ!" からなる文字列のリスト(正確には1次元配列)が得られます:

julia> collect(zundokochannel())
14-element Array{String,1}:
 "ズン"    
 "ドコ"    
 "ズン"    
 "ズン"    
 "ドコ"    
 "ドコ"    
 "ズン"    
 "ドコ"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ドコ"    
 "キ・ヨ・シ!"

このように Channel を使うと「次々と算出した値を送出するストリーム」を実現できます。

解説

Channel のできること・使い途その1。「Iteration(Generator の代わり)」。

zundokochannel() 関数内で、別の関数 _producer() を定義していますが、それを直接呼び出す代わりに、12行目で @schedule _producer(channel) と記述しています。
@schedule は、以下に続く式を Task にラップして、内部的なタスクスケジューラーに登録するマクロ。Task とは、所謂 コルーチン で、一連の処理の中で中断・継続を実施することで複数のコルーチン間で協調動作ができます。その結果として、Task としてラップした処理の中では、メインルーチンの処理とは無関係に別のロジックを動かすことができ、必要に応じてその結果をメインルーチン(や他のコルーチン)に随時引き渡すこととかもできます。

_producer() 関数の中では、定義に従って『ズン』『ドコ』『キ・ヨ・シ!』を順に算出するロジックが記述されています。あとは「標準出力に出力する」とか「リストにどんどん追加する」という記述がない代わりに、put!(channel, zd) という式によって、Channel 経由で他のコルーチンにその値を引き渡しています。

それを利用する側は、同じ channel オブジェクトから、他のコルーチンで生成・送出された値を順次受け取り、それを処理することができます。Julia では Channel 型は Iterable なので、そのまま for に渡したり、引数にイテレータを要求する関数(collect のほかに filter とか take とか)に渡したりすることができます。

比較:Task Iteration

Julia には、上記と同じことを実現する別の方法が用意されています(ました)。
先ほど説明した Task を直接利用する方法です。

zundokotask.jl
function zundokotask()
    function _producer()
        i = 1
        while i > 0
            zd, i = rand([("ズン", i + 1), ("ドコ", +(i<5))])
            produce(zd)
        end
        produce("キ・ヨ・シ!")
    end
    @task _producer()
end

この関数の戻り値は Task です。for に渡しても collect() に渡しても全く同様の結果となります(以下は Julia v0.5.2 で for 式を実行した例です)。

julia> VERSION
v"0.5.2"

julia> for cw in zundokotask()
           print(cw)
       end
ズンドコズンズンドコズンドコドコズンズンズンドコズンドコドコズンズンズンズンズンドコキ・ヨ・シ!

produce() 関数は、コルーチンを一時中断して、別の(その Task を実行した)メインルーチン(または他のコルーチン)に値を引き渡す役割をします。つまり Channel を経由せずに直接やりとりしているわけです。『Python の yield と同じ働き』、と説明した方がピンとくる方もいらっしゃるかもしれません。

これを Task Iteration という言い方をします。
Channel を利用したイテレーションと比較すると、以下のようになります。

Channel Iteration Task Iteration
値の引き渡し put!(channel, 《値》) produce(《値》)
値の受け取り take!(channel) consume(task)
受け渡しの仕組み wait queue※1 タスク切り替え時※2
受け渡される値の型 型パラメータで指定されたもの
Channel{T}T
不定
Any 型)
記述量 (詳細後述) 短め?

※1 Channel の「wait queue」による値の受け渡し概要:

  1. take! は、指定した Channel に値がバッファされていたら、その先頭の値を取り出してすぐに戻る。そうでなければ、Channel に値が追加(どこかの Task で put!)されるまで wait する。
  2. put! は、指定した Channel のバッファに空きがあればそこに値を追加してすぐに戻る。そうでなければ、どこか(別の Task)で take! が実行されるまで wait する。
  3. 1., 2. を繰り返す(並行可)ことで、「どこかの Task で put! した値を別の Task の take! で順次取り出す」を実現。

※2 Task の「タスク切り替え時」における値の受け渡し概要3

  1. consume(task) が実行されたとき、実行中のタスク(時としてメインルーチン)の実行が一時中断し、指定した task に処理が移る。
  2. その task 内で produce(《値》) が実行されたとき、その task の実行が中断され、他のタスクに処理が移る。
  3. consume(task) を実行したタスクに処理が戻ったとき、produce(《値》) で渡された値を受け取り、処理が再開する。

上記の説明中に出てきた take!() 関数と consume() 関数は、今までのコードには(まだ)出てきていませんが、Channel/Task を for 式に渡したときに内部で使用されています。

なお。
Task Iteration は、Julia v0.6 では deprecated になりました4。実行すると以下のように大量の Warning が出ます(v0.6.1 で実行した結果)。

julia> VERSION
v"0.6.1"

julia> collect(zundokotask())
WARNING: Task Iteration is now deprecated. Use Channels for inter-task communication.  A for-loop on a Channel object is terminated by calling `close` on the object.
Stacktrace:
 [1] depwarn(::String, ::Symbol) at ./deprecated.jl:70
  :《略》
WARNING: consume is now deprecated. Use Channels for inter-task communication.
Stacktrace:
 [1] depwarn(::String, ::Symbol) at ./deprecated.jl:70
  :《略》
WARNING: produce is now deprecated. Use Channels for inter-task communication.
Stacktrace:
 [1] depwarn(::String, ::Symbol) at ./deprecated.jl:70
  :《略》
10-element Array{String,1}:
 "ドコ"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ズン"    
 "ドコ"    
 "キ・ヨ・シ!"

はい。Warning の内容に「Channel 使え」て出てますね。「Channelfor ループで使うなら、ちゃんと close() 呼べば止まるよ」と親切に教えてくれていますね。

ちなみにイテレーション目的での Channel 利用については、Julia v0.6 から以下のように(今までの Task Iteration と同じくらい)簡単に記述できるようになりました。

zundokochannel_v06.jl
function zundokochannel_v06()
    Channel(ctype=String, csize=32) do channel::Channel{String}
        i = 1
        while i > 0
            zd, i = rand([("ズン", i + 1), ("ドコ", +(i<5))])
            put!(channel, zd)
        end
        put!(channel, "キ・ヨ・シ!")
    end
end

記述量的にも Task Iteration と同じくらい短いですし、名前付き引数 ctype をきちんと設定すれば、(Channel Iteration の特徴の1つである)列挙される値の型も指定できる(Channel(ctype=T) do ~ end の戻り値は Channel{T} 型になる)ので、最適化の恩恵も受けやすくなります。
あと目立たないですが Point としては、close(channel) を記述していないこと。これ、記述し忘れではなく、不要なのです。裏で走る Task が完走(=処理が完了)したら、自動的に close される仕組みになっています。close し忘れる心配なし!

要するに、Deprecated となった Task Iteration の代替策を用意するだけでなく、それをなるべく簡単に記述できるようにして、ユーザに移行を促しているわけですね。
Julia v0.6.x をお使いの型は、今すぐ Task Iteration から Channel Iteration に乗り換えましょう!

お題2:早いもの勝ち

"julialang" というキーワードで、ネットで検索することを考えます。
検索エンジンはいくつかありますが、それら複数にリクエストを投げてそのうち「一番最初に返ってきた結果」のみ採用する、というシチュエーションを考えてみましょう。

↓こんな感じのコードで実現できます(要:外部パッケージ Requests5 ):

search_race.jl
# Pkg.add("Requests")
using Requests

function do_search(results::Channel{Tuple{String, Float64, HttpCommon.Response}}, engine::String, url::String)
    res, exec_time = @timed get(url)
    if isopen(results)
        put!(results, (engine, exec_time, res))
    end
end

function main()
    results = Channel{Tuple{String, Float64, HttpCommon.Response}}(3)

    searches = [
        "Yahoo"=>"https://search.yahoo.co.jp/search?p=julialang", 
        "Google"=>"https://www.google.co.jp/search?q=julialang", 
        "Bing"=>"https://www.bing.com/search?q=julialang"]
    for (engine, url) = searches
        @schedule do_search(results, engine, url)
    end

    res0 = take!(results)
    close(results)

    println(res0...)
end

main()
# => Google0.464624012Response(200 OK, 16 headers, 50836 bytes in body)
# => Bing0.400164612Response(200 OK, 13 headers, 93005 bytes in body)
# ※実行する度に変わる

実行例:

julia> main()
Google0.618542127Response(200 OK, 16 headers, 50820 bytes in body)

julia> main()
Google0.437615018Response(200 OK, 16 headers, 50854 bytes in body)

julia> main()
Yahoo0.524059646Response(200 OK, 14 headers, 9101 bytes in body)

julia> main()
Google0.466880201Response(200 OK, 16 headers, 50847 bytes in body)

julia> main()
Bing0.39620631Response(200 OK, 13 headers, 90889 bytes in body)

環境によって結果は変わってきますが、手元の環境だとだいたい0.5秒前後で結果が返ってきました。
あと、Google が割と優秀ですが、たまに Yahoo や Bing の方が先に結果が帰って来ることがあるみたいですね。

解説

Channel のできること・使い途その2。「複数 Task で並行(並列)処理した結果のやり取り」。

do_search() 関数は、実際にリクエストを発行する処理をします。またレスポンスが返ってくるまでにどれだけ時間がかかったかも測定しています(res, exec_time = @timed get(url))。
そしてその結果を channel に追加して終了します。追加する前に isopen(channel) というチェックを入れて、「channel がすでに close していたら処理をしない」ようにしています(これがないと、2番目以降に返ってきたものに対して「Channel はすでにクローズされているよ」という旨のエラーが出ます)。

それをまた @scheduleTask にラップして呼んでいます。つまり別タスクで(並行して)リクエストが発行されます。

その後 take!() 関数で、その最初の結果を取得します。これはすぐに結果が返ってくるとは限りません。まだ Channel results が空ならば(=3つの検索リクエストのいずれも結果が返ってきていなければ)、返ってくるまで wait が発生します。そして取得できたら、Channel resultsclose して結果を出力しています。先ほども言ったように、2番目・3番目の結果は無視されます。

この場合は厳密には「並列 (Parallel) 処理」ではありません(たぶん)ですが、結果として、少なくとも「Yahoo 検索」「Google 検索」「Bing 検索」の3つのリクエストが実質上 同時に 発行されて、一番早く結果が返ってきたものだけ が処理されていることは分かると思います。
このように、複数の処理が(少なくとも論理的に)同時に走り、結果やイベント発生などのタイミングが順不同(任意)となるような処理のことを 並行 (Concurrent) 処理 と呼びます6

そして Channel は、そのような各 Task が(非同期に)並行処理されている中で、「最初の結果が返ってきたら~」と言ったような(同期を取った)処理を可能にしているのです。
これを Channel の代わりに普通の配列でやろうとした場合を考えてみてください。複数 Task 間でメモリ空間が共有されていれば、どのタスクからも配列に値の追加等はきっとできるでしょう、けれども「いつどのタスクが値を入れたのか」は分かりません。中身を覗いて見てもまだどのタスクも結果を格納していないかもしれません。それを「値が格納されるまで待つ」という動作もしてくれる、それが Channel同期処理 です。

ちなみにここに挙げた main() 関数を以下のように書き換えると、結果は全て返ってきますが、その出力が(かかった時間に応じて)時間差で順次表示されます。

search_race_main2.jl
function main2()
    results = Channel{Tuple{String, Float64, HttpCommon.Response}}(3)

    searches = [
        "Yahoo"=>"https://search.yahoo.co.jp/search?p=julialang", 
        "Google"=>"https://www.google.co.jp/search?q=julialang", 
        "Bing"=>"https://www.bing.com/search?q=julialang"]
    for (engine, url) = searches
        @schedule do_search(results, engine, url)
    end

    res1 = take!(results)
    println(res1...)

    res2 = take!(results)
    println(res2...)

    res3 = take!(results)
    println(res3...)

    close(results)
end
julia> main2()
Google0.498960819Response(200 OK, 16 headers, 50815 bytes in body)
Bing0.612970655Response(200 OK, 13 headers, 90978 bytes in body)
Yahoo1.447274849Response(200 OK, 14 headers, 9105 bytes in body)

応用(案)

そもそも、先の Request 処理がなぜ並行に動くのかというと。
一言で言えば、ネットワーク処理だから。リクエストを投げた後、レスポンスが帰ってくるまでの時間、CPUは遊んでいる(もし他に何も処理がなければ完全なidle状態な)わけです。そのすきに別のリクエストを投げたり、別の処理を走らせることは理論的に可能。Julia の場合、それが『別 Task』なら実際に実現できてしまう、ということです7

これは I/O 制御時も同様。ファイルを読み込む処理自体を並行処理にしておけば、裏 Task で少しずつファイルを読みながら、本 Task(メインルーチン等)で別の作業ができます8
これを利用すれば、大量または大容量のファイルから少しずつ内容を読み取りながら少しずつ提供する「バッチプロデューサ」が実現できそうですね。ただこの場合、おそらく『マルチプロセス』にしないと実際には思ったほどのパフォーマンスが出ないかも。Julia ならそれも実現できる(その時は Channel ではなく RemoteChannel というものを利用することになります)んですけれど、それはまた別のお話。またいつか時間があるときにでも、実用例と共に示せたらと思います。

参考


  1. Channel そのものは v0.4.x からある機能で、v0.5用のコードはある程度動作する模様です(完全には確認していません)。 

  2. この機能は Julia としては後発で、他の言語、例えば go には言語機能として channel が存在します(ちなみに Julia の Task にあたるのが goroutine ですね)(というかそもそも go からの影響で導入されたモノかもしれない)。 

  3. ちなみに Channel でも、バッファサイズ(Channel 生成時の引数で指定)を 0 にする(Unbufferd Channel と言います)と、produce()/consume() を利用した Task の「タスク切り替え時値受け渡し」と同様な動作になります(同等ではない)。 

  4. 経緯等の詳細は GitHub のプルリク deprecate produce, consume and task iteration #19841 を参照。要するに「Channel がある今、put!/take! の方が軽いしこっちで事足りるよね!」てことですかね。 

  5. コードの1行目にコメントアウトしてありますが、このコードを実行するには REPL 上であらかじめ Pkg.add("Requests") を実行して Requests パッケージをインストールしておく必要があります。 

  6. 言葉の定義が文献によって曖昧だったりするのですが、私は「並行=論理的(きちんと実装されていれば本当に同時に処理が走ることもある)」「並列=物理的(本当に同時に処理が走る)」という使い分け(つまり「並行 (concurrent) の方がより広い概念」)を支持します。 

  7. 詳しく言うと、ある Task で Request を投げてから Response が帰ってくるまでの間、その Task は wait 状態になり、その間に別 Task に処理が切り替わる、という仕組みですね。 

  8. こちらも同様、ファイルを read している時の wait 状態(I/O wait)の時に別 Task の処理を実行できる、という仕組み。 

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした