15
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AsyncとasyncとAsync - F#の非同期な事情

Posted at

F#はMicrosoftのDon Syme氏が開発したプログラミング言語で、命令型、オブジェクト指向、関数型の要素を併せ持つマルチパラダイム言語と位置付けられています。実装はオープンソースのMITライセンスのもと、githubのリポジトリで公開されています。

実行環境は.NET Frameworkおよび.NET CoreやMonoといったマルチプラットフォームに対応していますが、サポート状況については各プラットフォームごとの情報を参照してください(「F# の概要」など)。

なお、ここで対象とするF#のバージョンは4.1(FSharp.Compiler.Tools 10.0.1)とします。

非同期な事情

非同期処理というのは、実行中の処理とは別に新しく実行される処理で、それぞれが(見かけ上)同時に動作するもののことです。.NETでは、このような処理を行う方法がいくつか存在しています。それぞれがどのようなものか確認していきましょう。

Asyncメソッド(※勝手に命名)

.NETでは、メソッド名にAsyncがつけられたものがたくさんあります。ここではこれらを__Asyncメソッド__と呼ぶことにします。このAsyncメソッドは非同期で実行されることを表しているのですが、その方法はさまざまです。

イベント処理でAsync

たとえばWebClientクラスには、非同期でWebからの読み込みを行うDownloadStringAsyncというメソッドがあります。これは本来のプログラムの処理と並行してWebからの読み込みを行い、DownloadStringCompletedイベントが発火したときに読み込まれた文字列を利用した処理をできるようにするものです。

このメソッドでATOMフィードを読み込むには、先にDownloadStringCompletedのイベント処理を設定し、そのあとDownloadStringAsyncを実行します。読み込まれたフィードはイベント処理となる関数の引数(DownloadStringCompletedEventArgsクラス)に存在するResultプロパティにあります。

WebClientでATOMフィードを読み込む(イベント処理)
// ATOMフィードを読み込むサイト
let uri = new System.Uri("https://qiita.com/mokuba/feed")
// ATOMフィードを読み込むWebClientインスタンス
let client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
// ATOMフィードか読み込まれたら表示する
client.DownloadStringCompleted.Add(fun args -> printfn "%s" args.Result)
// ATOMフィードの読み込み開始
client.DownloadStringAsync(uri)
printfn "Downloading ..."

上のプログラムでは、ATOMフィードが読み込まれた後の処理が先に設定され、そのあとに読み込み(サーバーへのリクエスト)が開始されます。そして"Downloading ..."が表示された後に読み込まれたフィードが表示されます。

このように、イベント処理の場合は先にイベント発火後の処理を設定し、そのあとにイベントが発火する可能性のある処理が開始されます。そしてその処理が開始された後も、もともと実行していた処理("Downloading ..."の表示など)が継続して実行されます。

TaskでAsync

非同期処理にはTaskクラスによるものもあります。WebClientクラスのDownloadStringTaskAsyncメソッドは戻り値がTask<string>オブジェクトです。これはstringを戻り値とする非同期処理を表します。メソッドが実行された時点で非同期処理は開始されています。結果を得られた時の処理はContinueWithメソッドでを設定し、結果はResultプロパティで得られます。

WebClientでATOMフィードを読み込む(Task)
let uri     = new System.Uri("https://qiita.com/mokuba/feed")
let client  = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
// ATOMフィードを読み込むTask<string>オブジェクト(Taskは開始済み)
let task    = client.DownloadStringTaskAsync(uri)
// 結果を得られた時の処理
task.ContinueWith(System.Action<System.Threading.Tasks.Task<string>>(fun t -> printfn "%s" t.Result))
printfn "Downloading ..."

上のプログラムでは一番下の"Downloading ..."が表示されてからContinueWithメソッドで設定した処理が実行されます。このような処理の流れはイベント処理とよく似ています。

型拡張されたAsyncメソッド

Asyncメソッドには、F#の型拡張によって独自に定義されたメソッドがあります。これらも他のAsyncメソッドと同様に実行できます。

WebClient.AsyncDownloadString : System.Uri -> Async<string>

Webサイトからのダウンロードを非同期処理で行います。WebClientにはもともとDownloadStringAsyncというメソッドがありますが、こちらはDownloadStringCompletedイベントでダウンロード後の処理を行うものです。
AsyncDownloadStringメソッドは戻り値がAsync<string>なので、async { ... }内で使いやすくなっています。

WebClient.AsyncDownloadStringによるATOMフィードのダウンロード
async {
  let uri = new System.Uri("https://qiita.com/mokuba/feed")
  use wc  = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  return! wc.AsyncDownloadString(uri)
} |> Async.RunSynchronously

WebRequest.AsyncGetResponse : unit -> Async<System.Net.WebResponse>

こちらはWebサイトからのレスポンスを表すSystem.Net.WebResponseを得る処理を非同期で行うメソッドです。戻り値がAsync<System.Net.WebResponse>であり、async { .. }内で使いやすくなっています。もともとSystem.Net.WebRequestクラスにGetResponseAsyncというメソッドがありますが、こちらは戻り値がTask<System.Net.WebResponse>です。

WebRequest.AsyncGetResponseによるATOMフィードのダウンロード
async {
  let req  = System.Net.HttpWebRequest.Create(
               new System.Uri("https://qiita.com/mokuba/feed"))
  use! res = req.AsyncGetResponse()
  use  sr  = new System.IO.StreamReader(res.GetResponseStream())
  return sr.ReadToEnd()
} |> Async.RunSynchronously

Stream.AsyncRead

Streamから非同期で読み込みを行います。このメソッドはbyte [] * ?int * ?int -> Async<int>int -> Async<byte []>という2つの型があります。前者はあらかじめ用意したbyte[]に読み込めた長さが得られます。読み込みが完了すると値は0になります。

第2引数と第3引数はデータをbyte[]に書き込む際の始点と長さを表します。これらは省略すると始点が0、長さがbyte[]の長さとなります。後者は引数で指定した長さだけ読み込んだbyte []を得られます。ただし読み込むデータの容量が引数より少ないとSystem.IO.EndOfStreamExceptionという例外が発生します。

Stream.AsyncReadによるATOMフィードのダウンロード
async {
  // フィード読み込み用
  let  buff        = Array.zeroCreate<byte> 4096  // バッファ
  use  content     = new System.IO.MemoryStream()  // レスポンスのエンコーディング用
  // フィードを読み込むリクエスト
  let  req         = System.Net.WebRequest.Create(
                       new System.Uri("https://qiita.com/mokuba/feed"))
  // レスポンスを待つ
  use! res         = req.AsyncGetResponse()
  use  stream      = res.GetResponseStream()
  // レスポンスの読み込み
  let  mutable len = 1
  while len > 0 do
    // データを読み込んだらlenは0より大きくなる
    len <- Async.RunSynchronously <| stream.AsyncRead buff
    content.Write(buff, 0, len)
  // 読み込んだレスポンスを文字列化して返す
  return System.Text.Encoding.UTF8.GetString <| content.ToArray()
} |> Async.RunSynchronously

Stream.AsyncWrite : byte [] * ?int * ?int -> Async<unit>

Streamに対して非同期で書き込みを行います。戻り値はAsync<unit>なのでdo!で実行できます。第1引数が書き込むデータを持つbyte []、第2引数はbyte[]にデータを書き込む時の始点、第3引数は書き込む長さです。第2引数以降を省略すると始点が0、長さがbyte[]の長さとなります。

Stream.AsyncWriteによるATOMフィードの書き込み
async {
  // フィード読み込み用
  let  buff       = Array.zeroCreate<byte> 4096  // バッファ
  use  content    = new System.IO.MemoryStream()  // レスポンスのエンコーディング用
  // フィードを読み込むリクエスト
  let  req        = System.Net.WebRequest.Create(
                      new System.Uri("https://qiita.com/mokuba/feed"))
  // レスポンスを待つ
  use! res        = req.AsyncGetResponse()
  use  stream     = res.GetResponseStream()
  // レスポンスの読み込み
  let mutable len = 1
  while len > 0 do
    // データを読み込んだらlenは0より大きくなる
    len <- Async.RunSynchronously <| stream.AsyncRead buff
    do! content.AsyncWrite(buff, 0, len)  // do!で処理を行う
  // 読み込んだレスポンスを文字列化して返す
  return System.Text.Encoding.UTF8.GetString <| content.ToArray()
} |> Async.RunSynchronously

asyncブロック

このほかにもF#独自のものとしてasync { .. }で表すasyncブロックもあります。ブロックはAsync<'T>型となり、'T型の値を戻す非同期処理として定義されます。ブロック内の処理を実行するには次項で紹介するAsyncモジュールの関数を使います。

asyncブロックの例
async {
  let uri    = new System.Uri("https://qiita.com/mokuba/feed")
  use client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  let! feed  = client.AsyncDownloadString(uri)
  printfn "%s" feed
} |> Async.Start  // Asyncモジュールの関数(非同期で実行)

asyncブロックでは、その内部だけで使えるlet!, use!, do!, return!というキーワードがあります。これらはそれぞれlet, use, do, returnを非同期処理に対応させたもので、処理の終了を待ち、結果を得られればそれを次の処理で利用できるようにします。

let!

非同期処理で得られた結果を変数に設定します。

use!によるリソース確保の例
async {
  // ...
  // フィードを非同期でダウンロード
  let! feed  = client.AsyncDownloadString(uri)
  // ...
}

use!

非同期処理で確保されたリソースを設定します。

use!によるリソース確保の例
async {
  // ...
  let  req = System.Net.WebRequest.Create(
               new System.Uri("https://qiita.com/mokuba/feed"))
  // 非同期でレスポンスを得る
  use! res = req.AsyncGetResponse()
  // ...
}

do!

非同期処理を実行し、それが終了してから次の処理を進めます。

do!による処理の例
async {
  let  buff    = Array.zeroCreate<byte> 4096
  use  content = new System.IO.MemoryStream()
  // ...
  use  stream  = res.GetResponseStream()
  let! len     = stream.AsyncRead buff
  // ストリームに非同期で書き込み
  do! content.AsyncWrite(buff, 0, len)
  // ...
}

return!

非同期処理の結果をasyncブロックの戻り値とします。戻り値の型がAsync<'T>型の'Tになります。

return!による戻り値の例
async {
  use client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  // ダウンロードした結果をasyncブロックの戻り値とする
  return! client.AsyncDownloadString(new System.Uri("https://qiita.com/mokuba/feed"))
} |> Async.RunSynchronously |> printfn "%s"  // ダウンロードしたフィードを表示

Asyncモジュールの関数

asyncブロックの処理やAsyncメソッドを実行するために使われるのがAsyncモジュールの関数です。これらはF#独自で定義されているものです。ここではそのいくつかを紹介します。

Async.Start: Async<unit> * ?CancellationToken -> unit

戻り値のないasyncブロック(Async<unit>)をThreadPoolにより非同期で実行します。第2引数(省略可)にCancellationTokenを設定すると、CancellationTokenSourceCancelメソッドメソッドで処理をキャンセルできます。キャンセルされるとSystem.OperationCanceledExceptionが発生します。

Async.Startによる非同期処理の実行
// CancellationTokenでキャンセル可能に
let cts = new CancellationTokenSource()
Async.Start(async { ... }, cts.Token)
// キャンセルさせずに実行
async { ... } |> Async.Start

Async.StartImmediate : Async<unit> * ?CancellationToken -> unit

戻り値のないasyncブロックをすぐ非同期で実行します。第2引数(省略可)にはCancellationTokenも設定できます。

Async.Startによる非同期処理の実行
// CancellationTokenでキャンセル可能に
let cts = new CancellationTokenSource()
Async.StartImmediate(async { ... }, cts.Token)
// キャンセルさせずに実行
async { ... } |> Async.StartImmediate

Async.StartChild : Async<'T> * ?int -> Async<Async<'T>>

ある非同期処理内でasync { ... }の子ブロックを定義します。1回目のlet!で子ブロックを定義し、2回目のlet!で子ブロックが実行されます。親の非同期処理がキャンセルされると子ブロックの処理もキャンセルされます。第2引数(省略可)にはタイムアウト(ミリ秒)を設定できます。

Async.StartChildによる子ブロックの処理
// 非同期処理のキャンセル用
let cts = new System.Threading.CancellationTokenSource()
// 実行される非同期処理
let work = async {
  // キャンセルされたときの処理
  use! cancel = Async.OnCancel(fun() ->
                  System.Console.WriteLine("Cancel"))
  let rec fib n = if n > 2L then fib(n - 1L) + fib(n - 2L) else 1L
  for n in 10L..5L..100L do
    // 新たに開始される非同期処理(一緒にキャンセルされる)
    let! child  = Async.StartChild <| async { return fib n }
    let! result = child
    System.Console.WriteLine("fib {0} = {1}", n, result)
}
// 非同期処理を開始
Async.Start(work, cts.Token)
System.Threading.Thread.Sleep 300
// 非同期処理をキャンセル
cts.Cancel()

(* 結果
fib 10 = 55
fib 15 = 610
fib 20 = 6765
fib 25 = 75025
fib 30 = 832040
fib 35 = 9227465
Cancel
*)

Async.RunSynchronously: Async<'T> * ?int * ?CancellationToken -> 'T

戻り値のあるasyncブロックを実行し、終了を待って結果を得ます。結果が得られるまでは次の処理に進みません。引数はasyncブロックのほか、第2引数以降(省略可)にはタイムアウト(ミリ秒)、CancellationTokenも設定できます。タイムアウトになるとSystem.TimeoutExceptionが発生します。

Async.RunSynchronouslyによる非同期処理の実行
// タイムアウトなし
async {
  use client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  return! client.AsyncDownloadString(new System.Uri("https://qiita.com/mokuba/feed"))
} |> Async.RunSynchronously
// タイムアウトあり
Async.RunSynchronously (async {
  use client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  return! client.AsyncDownloadString(new System.Uri("https://qiita.com/mokuba/feed"))
}, 8000)

Async.AwaitEvent : IEvent<'Del,'T> * ?(unit -> unit) -> Async<'T>

イベント処理を非同期処理(Async<'T>型)に変換し、イベントが発火したらイベントハンドラの引数(~Args型)を得られるようにします。第2引数(省略可)はキャンセル時に実行する関数です。

Async.AwaitEventによるイベント処理
async {
  // ATOMフィードをダウンロード
  let client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  client.DownloadStringAsync(new System.Uri("https://qiita.com/mokuba/feed"))
  // DownloadStringCompletedイベントの発火を待って結果(args)を得る
  let! args  = Async.AwaitEvent <| client.DownloadStringCompleted
  return args.Result  // ダウンロードしたフィード
} |> Async.RunSynchronously |> printfn "%s"  // フィードを表示

Async.AwaitTask : Task<'T> -> Async<'T>

Task<'T>Async<'T>に変換し、asyncブロックやAsyncモジュールの関数で処理できるようにします。

Async.AwaitTaskによるTask<'T>からAsync<'T>への変換
async {
  // ATOMフィードをダウンロード
  let uri    = new System.Uri("https://qiita.com/mokuba/feed")
  use client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  // ダウンロードした内容をasyncブロックの戻り値とする
  return! Async.AwaitTask <| client.DownloadStringTaskAsync(uri)
} |> Async.RunSynchronously |> printfn "%s"  // フィードを表示

Async.AwaitIAsyncResult : IAsyncResult * ?int -> Async<bool>

.NETの非同期プログラミング モデル (APM)で使われるBegin~End~の両メソッドを仲介するIAsyncResultの処理が終わるまで待ちます。これによりBegin~メソッドの次にEnd~メソッドを実行する処理を実現できます。第2引数(省略可)にはタイムアウト(ミリ秒)を設定できます。戻り値は処理が完了したらtrueとなります。

Async.AwaitIAsyncResultによる非同期処理
open System.Net
open System.IO

async {
  // ATOMフィードにアクセス
  let  req    = WebRequest.Create("https://qiita.com/mokuba/feed")
  let  ar     = req.BeginGetResponse(null, null)
  // ar.IsCompletedの値がfalseからtrueになるまで待つ
  let! _      = Async.AwaitIAsyncResult ar
  use  stream = req.EndGetResponse(ar).GetResponseStream()
  // フィードのレスポンスを得る
  use  res    = new StreamReader(stream)
  return res.ReadToEnd()
} |> Async.RunSynchronously |> printfn "%s"  // フィードを表示

Async.FromBeginEnd : (AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * ?(unit -> unit) -> Async<'T> など複数

.NETの非同期プログラミング モデル (APM)で使われるBeginRead, EndReadなど対になるBegin~, End~メソッドを1つの非同期処理として扱います。Async.FromBeginEndの引数の数はBegin~メソッドの引数の数に対応しています。

Async.FromBeginEndによる非同期処理
async {
  // ATOMフィードにアクセス
  let req    = WebRequest.Create("https://qiita.com/mokuba/feed")
  // BeginGetResponse, EndGetResponseを1つの非同期処理として実行(WebResponseを得る)
  let! wr    = Async.FromBeginEnd(req.BeginGetResponse, req.EndGetResponse)
  use stream = wr.GetResponseStream()
  use res    = new StreamReader(stream)
  return res.ReadToEnd()  // フィードをダウンロード
} |> Async.RunSynchronously |> printfn "%s"  // フィードを表示

Async.AwaitWaitHandle : WaitHandle * ?int -> Async<bool>

WaitHandleによる非同期処理を実現します。.NETの標準APIでは、WaitHandleを継承するSemaphore, Mutex, EventWaitHandleとこれを継承するAutoResetEvent, ManualResetEventが対応します。第2引数(省略可)にはタイムアウト(ミリ秒)を設定できます。

Async.AwaitWaitHandleとSemaphoreによる並行処理の制御
// 同時にダウンロードできるフィードの数をSemaphoreで制御
let smp = new System.Threading.Semaphore(0, 3)
smp.Release(3)
// ダウンロードするフィードのタグ
let tags = ["fsharp"; ".net"; "asp.net"; "xamarin"; "azure"]
tags
|> List.map(fun word -> async {
     let uri   = new System.Uri(sprintf "https://qiita.com/tags/%s/feed" word)
     use wc    = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
     // セマフォ待ち
     let! _    = Async.AwaitWaitHandle smp
     stdout.WriteLine("タグ「{0}」のフィードをダウンロード中", word)
     // フィードのダウンロード
     let! feed = wc.AsyncDownloadString(uri)
     // セマフォ解放
     let _     = smp.Release()
     return feed
   })
|> Async.Parallel  // 並列処理を可能にする
|> Async.RunSynchronously  // 処理を実行(結果はフィードのリスト)

Async.CancelDefaultToken : unit -> unit

CancellationTokenが設定されずに実行された非同期処理をキャンセルします。キャンセルされた処理ではSystem.OperationCanceledExceptionが発生します。このとき使われるCancellationTokenAsync.DefaultCancellationTokenもしくはAsync.CancellationTokenで得られます。

Async.CancelDefaultTokenによる非同期処理のキャンセル
// 時間のかかる(かも?)処理
let rec fib n = if n > 2L then fib(n - 1L) + fib(n - 2L) else 1L
async {
  fib 50L |> printfn "%d"
} |> Async.Start  // デフォルトのCancellationToken(DefaultToken)を使用
// 途中でキャンセル
async {
  do! Async.Sleep 2000  // 2秒待つ
  Async.CancelDefaultToken()  // 非同期処理のキャンセル
} |> Async.Start

Async.OnCancel : (unit -> unit) -> Async<IDisposable>

非同期処理がキャンセルされた後の処理を設定します。これにより、キャンセルされた処理ごとにそれぞれの後処理を設定できます。キャンセルされる前に実行が開始された処理は継続し、その処理が終わったところで完全にキャンセルされます。

Async.OnCancelによる非同期処理のキャンセル
// 実行される非同期処理
let work = async {
  // キャンセルされたときの処理
  use! cancel    = Async.OnCancel(fun() ->
                     stdout.WriteLine("Cancel"))
  // 時間がかかりそうな処理
  let  rec fib n = if n > 2L then fib(n - 1L) + fib(n - 2L) else 1L
  for n in 10L..5L..100L do
    stdout.WriteLine("fib {0} = {1}", n, fib n)  // fib nをStartChildで実行するとこちらも途中でキャンセルされる
}
// 非同期処理を開始
Async.Start work
System.Threading.Thread.Sleep 300
// 非同期処理をキャンセル
Async.CancelDefaultToken()

(* 結果
fib 10 = 55
fib 15 = 610
fib 20 = 6765
fib 25 = 75025
fib 30 = 832040
fib 35 = 9227465
Cancel
fib 40 = 102334155  // キャンセルされる前に開始されていた処理
*)

Async.TryCancalled : Async<'T> * (OperationCanceledException -> unit) -> Async<'T>

第1引数の非同期処理がキャンセルされたときに第2引数の処理を行うよう設定します。処理がキャンセル済みかどうかはCancellationToken.IsCancellationRequestedプロパティの値(bool)で判断できます。

Async.TryCancelledにキャンセル後の処理を設定
open System
open System.Net
open System.Text

Async.TryCancelled(
  async {
    use  client = new WebClient(Encoding = Encoding.UTF8)
    let  uri    = new Uri("https://qiita.com/mokuba/feed")
    let! feed   = client.AsyncDownloadString(uri)
    let  token  = Async.DefaultCancellationToken
    if (token.IsCancellationRequested) then ()  // キャンセルされたか
    else printfn "%s" feed
  }, (fun ex -> eprintfn "%s" ex.Message))  // キャンセル後の処理
|> Async.Start
System.Threading.Thread.Sleep 300
Async.CancelDefaultToken()

Async.Catch : Async<'T> -> Async<Choice<'T, exn>>

asyncブロックを実行した結果に応じた処理を行うよう設定します。結果を表す型はChoice<'T1, 'T2>です。これはアクティブパターンで使われる型と同じで、Choice1Of2 of 'T1Choice2Of2 of 'T2という2つの項目を持つ判別共用体です。前者は例外が発生せずに'T1型の結果を得られたことを表し、後者は'T2型の例外が発生して結果を得られなかったことを表します。これらを元にmatchで処理を振り分けていきます。

Async.Catchによる処理の振り分け
open System
open System.Net
open System.Text

// フィードを受信(結果はChoice型)
let result = async {
               use client = new WebClient(Encoding = Encoding.UTF8)
               let uri    = new Uri("https://qiita.com/mokuba/feed")
               return! client.DownloadStringTaskAsync(uri) |> Async.AwaitTask
             } |> Async.Catch |> Async.RunSynchronously

match result with  // 結果に応じた処理の振り分け
| Choice1Of2 feed -> printfn  "%s" feed        // 正常に結果を得られたとき
| Choice2Of2 ex   -> eprintfn "%s" ex.Message  // 例外が発生したとき

Async.StartWithContinuations : Async<'T> * ('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) * ?CancellationToken -> unit

第1引数の処理の結果に応じて第2引数以降のどの処理を行うかが決まります。引数には実行する非同期処理、結果を得られたときの処理、例外が発生したときの処理、非同期処理をキャンセルされた時の処理が設定されます。

Async.StartWithContinuationsによる処理の振り分け
Async.StartWithContinuations (async {
  use client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  // raise <| new System.Exception("例外発生!")
  // Async.CancelDefaultToken()
  return! client.AsyncDownloadString(new System.Uri("https://qiita.com/mokuba/feed"))
}, (fun feed -> printfn "%s" feed)  // 正常に結果を得られたときの処理(フィードの表示)
,  (fun ex   -> eprintfn "ex %s" ex.Message)  // 例外が発生したときの処理(ex 例外発生!)
,  (fun ocex -> eprintfn "ocex %s" ocex.Message))  // キャンセルされたときの処理(ocex 操作は取り消されました。)

Async.FromContinuations : (('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>

直前までの結果に基づいて、どの処理を行うかを振り分けます。引数の関数には、正常処理(引き渡たされた結果の処理)、例外処理、キャンセル後の処理を行う関数が自動的に設定されます。

Async.FromContinuationsによる処理の振り分け
async {
  // ATOMフィードをダウンロード
  let uri    = new System.Uri("https://qiita.com/mokuba/feed")
  use client = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
  // 処理の結果に応じて振り分け
  return! Async.FromContinuations (fun (rf, ef, cf) ->
  // 引数
  // rf: 正常終了時に実行する関数
  // ef: 例外発生時に実行する関数
  // cf: キャンセルされたときに実行する関数
    try
      let feed = client.DownloadString(uri)
      // raise <| new System.OperationCanceledException("キャンセル!")
      // raise <| new System.Exception("例外発生!")
      rf(feed)  // 正常に終了し、戻り値を得る
    with
    | :? System.OperationCanceledException as ocex ->
           eprintfn "cf(ocex)"
           cf(ocex)  // キャンセルされたとき
    | ex ->
           eprintfn "ef(ex)"
           ef(ex)  // 例外が発生したとき
  )
} |> Async.RunSynchronously

Async.Parallel : seq<Async<'T>> -> Async<'T []>

複数の非同期処理を一斉に実行できるようにします。これを実行しただけでは処理は開始されません。処理を開始するにはAsync.RunSynchronouslyなど処理を開始するメソッドを実行してください。

引数はseq[Async<'T>]で非同期処理のコレクションですが、戻り値がAsync<'T []>つまり'Tの配列となりますので、型の違いに注意してください。また、多くの処理を同時に実行するとマシンやネットワークなどに大きな負荷がかかる場合がありますので、それらを踏まえた上で処理の内容を検討してください。

Async.Parallelによる並列処理
// フィードごとのタグ
let tags = ["fsharp"; ".net"; "asp.net"; "xamarin"; "azure"]
// タグごとのフィードを同時にダウンロード
tags
|> List.map(fun word -> async {
    // Async.Parallelで一斉に実行できるようasync { .. }で非同期処理とする
     let uri = new System.Uri(sprintf "https://qiita.com/tags/%s/feed" word)
     use wc  = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
     return! wc.AsyncDownloadString(uri)
   })
|> Async.Parallel  // フィードを一斉にダウンロードできるようにする
|> Async.RunSynchronously  // ダウンロードした結果はフィード(文字列)の配列

lock : ('a -> (unit -> 'b) -> 'b) when 'a : not struct

lockは特定のオブジェクトを排他的に処理する関数です。複数の並列処理で共有するデータの更新など、同時に複数からのアクセスを避けたい処理を実行するときに利用します。

lockによる排他制御
let tags          = ["fsharp"; ".net"; "asp.net"; "xamarin"; "azure"]
let lockObj       = obj()    // lock用オブジェクト(ダウンロード数のカウント処理)
let mutable count = 0  // ダウンロードしたフィードの数
tags
|> List.map(fun word -> async {
     // ATOMフィードのダウンロード
     let  uri  = new System.Uri(sprintf "https://qiita.com/tags/%s/feed" word)
     use  wc   = new System.Net.WebClient(Encoding = System.Text.Encoding.UTF8)
     let! feed = wc.AsyncDownloadString(uri)
     // ダウンロードできたフィードの数を表示(lock)
     lock lockObj (fun() ->
       count <- count + 1
       System.Console.WriteLine("{0} / {1} ダウンロード完了",
                                count, List.length tags))
     // ダウンロードしたフィード
     return feed
   })
|> Async.Parallel
|> Async.RunSynchronously

ここまでご覧いただいた方々の参考になればと思います。

15
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?