1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Ratpack入門 (6) - Promise

Last updated at Posted at 2018-02-04

Ratpack入門シリーズ

  1. Ratpack入門 (1) - Ratpackとは
  2. Ratpack入門 (2) - アーキテクチャー
  3. Ratpack入門 (3) - hello world 詳解
  4. Ratpack入門 (4) - ルーティング & 静的コンテンツ
  5. Ratpack入門 (5) - Json & Registry
  6. Ratpack入門 (6) - Promise
  7. Ratpack入門 (7) - Guice & Spring
  8. Ratpack入門 (8) - セッション
  9. Ratpack入門 (9) - Thymeleaf

Promise

Ratpackは非ブロッキング・イベント駆動のライブラリーですので、各処理も非同期で書かれることが前提になります。Javaの非同期処理を下手に書くと大変なことになるのは、Java経験者であればよく知っていると思います。Ratpackは非同期処理を簡潔に記述するために、Promiseクラスを提供しています。イメージとしてはJavaScriptのPromiseに似ており、then()で処理が完了したときのコールバックを記述することができます。

Promiseを作る

IO処理はもっとも典型的なブロッキングな操作だと思います。Blockingユーティリティークラスを使用することで、簡単にPromiseを作ることができます。

chain.all( ctx -> {

    String query = ctx.getRequest().getQueryParams().get( "id" );

    Promise<String> result = Blocking.get( () -> {
        return Database.find( query );
    } );

    ctx.render( result );
} );

ここでDatabase.find()を架空のデータベースからデータを探す処理だと考えてください。Blocking.get()は、引数のクロージャーを非同期に実行し、その戻り値をPromiseにラップします。Context.render()には、Promiseを渡すこともできます。

戻り値のない操作にはop()を使います。OperationクラスはRatpackにおける戻り値のないPromiseです。

Blocking.op( () -> {
    String data = ctx.getRequest().getQueryParams().get( "data" );
    Database.persist( data );
} ).then( () -> {
    ctx.render( "OK" );
} );

まず、Blocking.op()内で架空データベースに情報を保存します。op()メソッドはこの処理のOperationを返します。続いてthen()でデータベースにデータを保存した後の処理を記述します。Context.render()を呼んで、OKというレスポンスを作成しています。

Promise.sync()

ファクトリーからPromiseを作成します。

Promise<String> result = Promise.sync( () -> "data" );
result.then( data -> {
    ctx.render( "OK" );
} );

Promise.async()

他の非同期ライブラリーと連携するときのために、Promise.async()スタティックファクトリーが用意されています。

Promise<String> result = Promise.async( downstream -> {
    downstream.success( "data" );
} );
result.then( data -> {
    ctx.render( "OK" );
} );

success()メソッドを呼ぶことで、処理が終了したことを伝えます。Promise.async()自体は、引数の処理を非同期に実行するわけではないことに注意してください。あくまで自身で(あるいはライブラリーで)非同期処理を記述する必要があります(そのため、公式の例ではThreadを作成してsuccess()を呼んでいます)。

Promiseの操作

then

Promiseの処理が完了したときに呼ばれるコールバックを指定します。コールバック内でContext.render()を呼びレスポンスを作成するのが、最もよくある処理だと思います。

注意すべきなのが、then()はコールバックが実行されることをアプリケーションに登録し、順次実行することです。次のコードを考えてください。

@Data class Obj {
    public int a;
    public int b;
    public int c;
}
Obj o = new Obj();
Promise.value( 1 ).then( o::setA );
Promise.value( 2 ).then( o::setB );
Promise.value( 3 ).then( o::setC );
Operation.of( () -> ctx.render( o.toString() ) ).then();

Promiseは非同期処理を表すので、一見するとo.toString()呼び出し時のoのフィールドはタイミングに依存してしまうように見えるかもしれません。しかしthen()の呼び出しは、Ratpackが登録順に順次実行することを保証しているので、o.toString()の値は常にObj(a=1, b=2, c=3)となります。とはいえこの動作は非直感的で分かりづらい動作なので、あまり使用はしないほうが良いと思います。

map

指定の関数を、Promiseの結果に適応したPromiseを作成します。ストリームなどのmapと同じですね。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .map( String::toUpperCase );
} ).getValue();

assertThat( result ).isEqualTo( "HOGE" );

blockingMap

mapとほぼ同じですが、実行をブロッキング処理用のスレッドで行います。map内の処理をBlocking.get()などでラップするイメージです。派生メソッドblockingOpがあります。

flatMap

Promiseの結果を、指定した関数が返すPromiseで置き換えます。RatpackにはデフォルトでPromiseを返す処理が多いので、使用頻度は意外と高めです。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .flatMap( v -> {
                      assertThat( v ).isEqualTo( "hoge" );
                      return Promise.value( "piyo" );
                  } );
} ).getValue();

assertThat( result ).isEqualTo( "piyo" );

mapIf

指定されたPredicateが正になる場合のみ、マップ関数を適用します。

mapError flatMapError

例外が発生した場合、その例外を引数とするマップ関数を適用した結果を返します。正常終了した場合とエラーが発生した場合の分岐を流暢に書けます。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .mapIf( s -> true, s -> { throw new RuntimeException();} )
                  .mapError( t -> "piyo" );
} ).getValue();

assertThat( result ).isEqualTo( "piyo" );

apply

呼び元のPromise自身を受け取り、Promiseを返す関数を引数にとります。いまいち使いどころがわかりませんが、処理をメソッドに分割したときの記述を簡潔にすることが目的のようです。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" ).apply( p -> {
        assertThat( p == Promise.value( "hoge" ) ).isTrue();
        return p.map( String::toUpperCase );
    } );
} ).getValue();

assertThat( result ).isEqualTo( "HOGE" );

around

Promiseの計算前後に処理を挿入します。それ自体は便利そうに見えますが、afterの結果をExecResultにラップしてあげなければならないため、無意味に冗長なコードになってしまう残念なメソッドです。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .around(
                          () -> "before",
                          ( before, r ) -> {
                              assertThat( before ).isEqualTo( "before" );
                              assertThat( r.getValue() ).isEqualTo( "hoge" );
                              return ExecResult.of( Result.success( "piyo" ) );
                          }
                  );
} ).getValue();

assertThat( result ).isEqualTo( "piyo" );

replace

Promiseを、別のPromiseで置き換えます。要するに、flatMap()の引数を取らないバージョンです。これも必要性がよくわからないメソッドですね。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .replace( Promise.value( "piyo" ) );
} ).getValue();

assertThat( result ).isEqualTo( "piyo" );

route

Predicatetrueの場合、指定されたコンシューマーを実行します。JavaDocを見る限り、データのバリデーションなどに使うことを意図しているようです……が、使い勝手はあまりよくない気がします。

ExecResult<String> result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .route( s -> false, System.out::println );
} );

assertThat( result.getValue() ).isEqualTo( "hoge" );
assertThat( result.isComplete() ).isFalse();

boolean completed = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .route( s -> true, System.out::println );
} ).isComplete();

assertThat( completed ).isTrue();

to

Promiseをほかの型に変換します。いまいち使いどころがなさそうに見えますが、外部ライブラリーの統合などで使用します。以下はRxRatpackの例です。

List<String> resultHolder = new ArrayList<>();
ExecHarness.runSingle( e -> {
    Promise.value( "hoge" )
           .to( RxRatpack::observe )
           .subscribe( s -> resultHolder.add( s ) );
} );

assertThat( resultHolder ).containsExactly( "hoge" );

next

Promiseの結果を引数に持つコンシューマーを引数に持ちます。戻り値のPromiseはもともとのPromiseと同じ結果を返します。nextOpなどの派生メソッドがあります。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .next( System.out::println );
} ).getValue();

assertThat( result ).isEqualTo( "hoge" );

right left

Promiseを別のPromiseと組み合わせ、PairPromiseとして返します。

Pair<String, String> result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .right( Promise.value( "piyo" ) );
} ).getValue();

assertThat( result.getLeft() ).isEqualTo( "hoge" );
assertThat( result.getRight() ).isEqualTo( "piyo" );

cache

Promiseの結果をキャッシュします。例外が発生した場合、その例外もキャッシュされます。
cacheResultIfなどの派生メソッドが存在します。

onError

エラー発生時の処理を記述します。エラー時のContext.render()を記述するのが主な使い方かと思います。例外のクラスを引数にとるもの、例外のコンシューマーを受け取るもの、Predicateで例外を選択するものなど、複数のパターンの引数を持つことができます。

close

Promiseが完了したり、例外が発生したりした場合、引数に指定されたAutoCloseableをクローズします。いまいち使いどころがわかりません。

retry

処理に失敗した場合、指定時間ののちリトライを試みます。外部APIの呼び出し時などに便利です。

String result = ExecHarness.yieldSingle( e -> {
    return Promise.value( "hoge" )
                  .retry( 3, Duration.ofSeconds( 1 ), ( i, t ) -> System.out.printf( "retry: %d%n", i ) );
} ).getValue();

assertThat( result ).isEqualTo( "hoge" );

time

Promiseの実行にかかった時間を返すコンシューマーを引数にとります。パフォーマンス測定が主な用途でしょうか。

スレッドとforkについて

Ratpackは非同期処理のために、Executionクラスであらわされる単位でPromiseを実行します。通常このExecutionを意識することはありませんが、複数のPromiseを並行で実行するためには、Executionfork()する必要があります。コンビニエンスメソッドとしてPromise.fork()が用意されており、簡単に別スレッドでPromiseを実行することができます。

下記コードはforkのJavaDocにある例を少し変更したものです。

CyclicBarrier b = new CyclicBarrier( 2 );

Pair<String, String> result = ExecHarness.yieldSingle( e -> {
    Promise<String> p1 = Promise.sync( () -> {
        b.await();
        return "hoge";
    } ).fork();
    Promise<String> p2 = Promise.sync( () -> {
        b.await();
        return "piyo";
    } ).fork();
    return p1.right( p2 );
} ).getValue();

assertThat( result.getLeft() ).isEqualTo( "hoge" );
assertThat( result.getRight() ).isEqualTo( "piyo" );

ここで、fork()の呼び出しを削除すると、p1およびp2は同じスレッドで順番に実行されるので、デッドロックが発生してしまいます。fork()で別スレッドを作成していれば、正常に動作します。

まとめ

RatpackのPromiseは、Javaが苦手とする非同期処理のサポートを実現します。とはいえ、癖が強い部分もあり、メソッドの数も多いため、きちんと活用しようとするのは難しい面もあります。あまり賢くやろうとしないのがコツかもしれません。RatpackにはRxJavaをサポートするためのRxRatpackモジュールがあります。使い慣れた非同期ライブラリーがある場合、それらを活用するのも手です。

個人的なおすすめ

  • よく使うのはmapflatMapくらいで、全てのメソッドが実用的なわけではない。
  • ブロッキング操作にはBlockingクラス、もしくはblocking...メソッドを使う。
  • 困ったらthenで頑張ればなんとかなる。
  • RxRatpackモジュールを適用し、RxJavaを活用する。
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?