Help us understand the problem. What is going on with this article?

【Corda】【初心者向け】FlowSessionを理解しよう

はじめに

どーも、のぶこふです。

今回は、CordaにおけるFlowSession(以下、Session)について解説します。
※Corda 4.5 時点の内容です

▼FlowSessionのAPIドキュメント
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/index.html

▼FlowSessionのドキュメント
https://docs.corda.net/docs/corda-os/4.5/flow-state-machines.html#flow-sessions

Sessionって、なんぞ?

Sessionは、通信等における開始から終了までを指すと思いますが、Cordaにおいても同様の意味合いを持ちます。
フロー間でメッセージを送ったり、カウンターフローに関する情報を照会するなどに使用されます。

一言で表すと、他ノードとの通信の開始から終了までのこと(下図、赤矢印)を、Sessionを呼びます。
image.png

FlowやState、Vaultといった概念(キーコンセプト)も大事ですが、Sessionを理解することで、Cordaの実装を一段階深められると思います。

Sessionの作り方

Sessionを取得するには、2つの方法があります。

  1. FlowLogic.initiateFlowを呼び出す
  2. InitiatedByフローへのコンストラクタパラメータとして受け取る(initiateFlowに対応するもので、返信に使用される)

image.png

initiateFlowを使用すると、引数に指定したPartyとSessionを作成します。(赤矢印)
そのSessionを使用してメッセージ等を送ると、受け手のノードのInitiatedByが付与されたClassが対応します。(青矢印)

Codeで表してみると、次のようになります。

@InitiatingFlow
@StartableByRPC
class Initiator(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val SessionNodeA = initiateFlow(node) // ---------------- 赤矢印
    }
}

@InitiatedBy(Initiator::class) 
class Responder(val counterPartySession: FlowSession // --------- 青矢印
    ) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
    }
}

通信の仕方①ーSend/Receive

Sessionの取得方法は先の通りですが、これだけでは、使用することが出来ません。
まずは、最もシンプルなメッセージの送受信(Send/Receive)の例を見てみましょう。

▼Send
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/send.html
▼Receive
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/receive.html

  1. initiateFlowでSessionを作成する
  2. 「1.」で作成したSessionを使用して、メッセージを送る(Send)
  3. 受け手ノードのInitiatedByが付与されたClass内で、メッセージを受け取る(Receive)

image.png

次にCodeでも見てみます。

@InitiatingFlow
@StartableByRPC
class Initiator2(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // ---------------------------- 1.Session作成
        session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
}

@InitiatedBy(Initiator2::class)
class Responder2(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val receive = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
    }
}

【注意点】Send/Receiveの型を合わせる必要があります。
上記の例では、文字列を送り、文字列で受け取っています。
この型が合わないと、エラーが発生するので、注意してください。
イメージはJavaで言うところのメソッドの引数の型を合わせる。と言った感じです。OverloadはNGです。

通信の仕方②ーSendAndReceive/Receive+Send

FlowSessionには、SendAndReceive関数というのもあるので、そちらの使い方も見てみましょう。

▼SendAndReceive
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/send-and-receive.html

  1. Send同様、Sessionを作成し、メッセージを送る(SendAndReceive)
  2. 受け手ノードのInitiatedByが付与されたClass内で、メッセージを受け取る(Receive)
  3. 2.の同クラス内で、メッセージを送る(Send)
  4. 1.のClass内で、メッセージを受け取る(SendAndReceiveの戻り値)

image.png

コードは次の通りです。

@InitiatingFlow
@StartableByRPC
class Initiator3(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // ------------------------------- 1.Session作成
        val receicve = session.sendAndReceive<String>("hoge") // ---------- 2.文字列(String)を送る。受け取る型を指定している。
        receive.unwrap { data -> println("[SEND AND RECEIVE] $data")} // -- 5.文字列(String)を受け取り、表示する
}

@InitiatedBy(Initiator3::class)
class Responder3(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val receive = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
        counterPartySession.send("foo") // ----------------------------- 4.文字列(String)を送る(返す)
    }
}

通信の仕方③ーSend+Send/Receive+Receive

次いで、複数のSendとReceiveがあるパターンです。
基本的には、今までの記述方法と変わりはありません。

【注意点】Send/Receiveの記述順は揃える必要があります。
つまり、1つ目のSendは、受け手ノードのInitiatedByが付与されたClass内の1つ目のReceiveで、2つ目のSendは2つ目のReceiveで受け取ろうとします。
この際、型があっていないと、エラーが発生します。

image.png

サンプルのコードです。

@InitiatingFlow
@StartableByRPC
class Initiator4(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // ---------------------------- 1.Session作成
        session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
        session.send(100)    // ---------------------------------------- 4.数値(Int)を送る
}

@InitiatedBy(Initiator4::class)
class Responder4(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val receiveStr = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
        val receiveInt = counterPartySession.receive<Int>()    // --------- 5.数値(Int)を受け取る
    }
}

複数のSendとReceiveでのその他のパターン

「1:1」で対応しないパターンを見てみます。
基本的にはNGなので、きちんと数と型を合わせる必要があります。

SendとReceiveの型が対応していない(エラー発生)

SendとReceiveの型が、それぞれ対応していないので、この実装ではエラーになってしまいます。

java.lang.IllegalArgumentException: Payload invalid

image.png

@InitiatingFlow
@StartableByRPC
class Initiator5(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // ---------------------------- 1.Session作成
        session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
        session.send(100)    // ---------------------------------------- 4.数値(Int)を送る
}

@InitiatedBy(Initiator5::class)
class Responder5(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val receiveStr = counterPartySession.receive<Int>() // --------- 3.文字列(String)が送られてくるのに、数値(Int)を受け取ろうとしている
        val receiveInt = counterPartySession.receive<String>() // ------ 5.数値(Int)が送られてくるのに、文字列(String)を受け取ろうとしている
    }
}

Receiveの数が多い(エラー発生)

Receiveが多い場合もエラーが発生します。

net.corda.core.flows.UnexpectedFlowEndException: Tried to access ended session SessionId(toLong=4217002328289114723) with empty buffer

image.png

@InitiatingFlow
@StartableByRPC
class Initiator6(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // ---------------------------- 1.Session作成
        session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
}

@InitiatedBy(Initiator6::class)
class Responder6(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val receiveStr = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
        val receiveInt = counterPartySession.receive<Int>()    // --------- 4.何も送られて来ないのに、数値(Int)を受け取ろうとしている
    }
}

Sendの数が多い(問題なし)

しかし、Receiveが存在しない場合は、特にエラー等は発生しません。
下図のような実装は(好ましい・好ましくないという観点は置いておいて)、可能です。

image.png

@InitiatingFlow
@StartableByRPC
class Initiator7(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // ---------------------------- 1.Session作成
        session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
        session.send(100)    // ---------------------------------------- 4.数値(Int)を送る(受け手無し)
}

@InitiatedBy(Initiator7::class)
class Responder7(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val receiveStr = counterPartySession.receive<String>() // ------ 3.文字列(String)を受け取る
    }
}

通信の仕方④ーCollectSignaturesFlow/SignTransactionFlow、FinalityFlow/ReceiveFinalityFlow

Send/Receive以外の通信の方法も見てみます。
代表例としては、「CollectSignaturesFlow/SignTransactionFlow」と、「FinalityFlow/ReceiveFinalityFlow」だと思います。

▼CollectSignaturesFlow
トランザクションにおいて、相手(他ノード)の署名を集めるFlowです。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-collect-signatures-flow/index.html

▼SignTransactionFlow
CollectSignaturesFlowに対応するFlowで、署名を行います。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-sign-transaction-flow/index.html

▼FinalityFlow
指定されたトランザクションを検証し、Notaryへ送付します。問題がなければ、Vaultへ書き込みます。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-finality-flow/index.html

▼ReceiveFinalityFlow
FinalityFlowに対応するFlowで、最終的なトランザクションを受け取ります。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-receive-finality-flow/index.html

image.png

@InitiatingFlow
@StartableByRPC
class Initiator8(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // -------- 1.Session作成
        // ーーー トランザクションの作成~検証 ここから ーーー
        val notary = serviceHub.networkMapCache.notaryIdentities[0]
        val output = TemplateState(data, listOf(ourIdentity, b, c))
        val cmd = Command(TemplateContract.Commands.Action(), listOf(ourIdentity.owningKey, b.owningKey, c.owningKey))
        val txBuilder = TransactionBuilder(notary)
                .addCommand(cmd)
                .addOutputState(output)
        txBuilder.verify(serviceHub)
        val signedTx = serviceHub.signInitialTransaction(txBuilder)
        // ーーー トランザクションの作成~検証 ここまで ーーー
        val fullySignedTx = subFlow(CollectSignaturesFlow(signedTx, setOf(session))) // --- 2.署名収集
        subFlow(FinalityFlow(fullySignedTx, setOf(session))) // --------------------------- 4.トランザクションの確定
}

@InitiatedBy(Initiator8::class)
class Responder8(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val signTransactionFlow = object : SignTransactionFlow(counterPartySession) {// --- 3.署名実施
            override fun checkTransaction(stx: SignedTransaction) = requireThat {}
        }
        val txId = subFlow(signTransactionFlow).id
        subFlow(ReceiveFinalityFlow(counterPartySession, expectedTxId = txId))       // --- 5.確定したトランザクションの受け取り
    }
}

そういえば、なぜSendが多い場合は問題が無いのか?

なぜSendが多い場合は問題が無いかと言うと、Responderの処理は初回のSendでキックされることで発火し、Receiveはメッセージを受信するまで一時停止するからです。
Receiveが存在しない、Sendの数よりも少ない場合は、一時停止が行われず、Responderの処理が終了します。

▼SendとReceiveが1:1で対応している場合
image.png
▼Receiveが存在しない場合
image.png

裏技?的な使い方としては、ResponderにReceiveを書かずSendを実行することで、Responderの処理を実施することも可能です。

@InitiatingFlow
@StartableByRPC
class Initiator8(private val node: Party) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        val session = initiateFlow(node) // --- 1.Session作成
        session.send("") // ------------------- 2.文字列(String)を送る(Responderの呼び出し)
}

@InitiatedBy(Initiator8::class)
class Responder8(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
    @Suspendable
    override fun call() {
        // Receiveを記述していないが、SendによりResponderがキックされるので、下記が実行される
        println("foo") // -------------------- 3.実行される
    }
}

ちなみに、通信の仕方④で紹介した各Flowも、内部的にはSend/Receiveを使用しています。
気になる方はぜひ見てみてください(R3の回し者ではありません)

おわりに

今回は、あまり誰も教えてくれないと思われる、Session周りについて、解説してみました。
冒頭にもリンクは貼りましたが、ドキュメントに記載はあるものの、キーコンセプトからは外れているので、意外と理解してない方も多いのではないかなーと勝手に想像しています。
※私は、最近になって、ここらへんを完全に理解しました。

図で説明するとなると、やはり、なかなか技術がいるものですね。
これで伝わるものか・・・?
パット見で、あぁこんなもんなんだな。と思ってもらえたら幸いです。

長くなってしまいましたが、今回はここまでです。
ありがとうございました。

nobkovskii
Qiitaでは主にブロックチェーン(HLF、Corda)の事を書いていきます。 ※投稿内容は個人の見解であり、所属する組織の公式見解ではありません
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away