はじめに
どーも、のぶこふです。
今回は、CordaにおけるFlowSession(以下、Session)について解説します。
※Corda 4.5 時点の内容です
▼FlowSessionのAPIドキュメント
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/index.html
▼FlowSessionのドキュメント
https://docs.corda.net/docs/corda-os/4.5/flow-state-machines.html#flow-sessions
Sessionって、なんぞ?
Sessionは、通信等における開始から終了までを指すと思いますが、Cordaにおいても同様の意味合いを持ちます。
フロー間でメッセージを送ったり、カウンターフローに関する情報を照会するなどに使用されます。
一言で表すと、他ノードとの通信の開始から終了までのこと(下図、赤矢印)を、Sessionを呼びます。
FlowやState、Vaultといった概念(キーコンセプト)も大事ですが、Sessionを理解することで、Cordaの実装を一段階深められると思います。
Sessionの作り方
Sessionを取得するには、2つの方法があります。
- FlowLogic.initiateFlowを呼び出す
- InitiatedByフローへのコンストラクタパラメータとして受け取る(initiateFlowに対応するもので、返信に使用される)
initiateFlowを使用すると、引数に指定したPartyとSessionを作成します。(赤矢印)
そのSessionを使用してメッセージ等を送ると、受け手のノードのInitiatedByが付与されたClassが対応します。(青矢印)
Codeで表してみると、次のようになります。
@InitiatingFlow
@StartableByRPC
class Initiator(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val SessionNodeA = initiateFlow(node) // ---------------- 赤矢印
}
}
@InitiatedBy(Initiator::class)
class Responder(val counterPartySession: FlowSession // --------- 青矢印
) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
}
}
通信の仕方①ーSend/Receive
Sessionの取得方法は先の通りですが、これだけでは、使用することが出来ません。
まずは、最もシンプルなメッセージの送受信(Send/Receive)の例を見てみましょう。
▼Send
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/send.html
▼Receive
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/receive.html
- initiateFlowでSessionを作成する
- 「1.」で作成したSessionを使用して、メッセージを送る(Send)
- 受け手ノードのInitiatedByが付与されたClass内で、メッセージを受け取る(Receive)
次にCodeでも見てみます。
@InitiatingFlow
@StartableByRPC
class Initiator2(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // ---------------------------- 1.Session作成
session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
}
@InitiatedBy(Initiator2::class)
class Responder2(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val receive = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
}
}
【注意点】Send/Receiveの型を合わせる必要があります。
上記の例では、文字列を送り、文字列で受け取っています。
この型が合わないと、エラーが発生するので、注意してください。
イメージはJavaで言うところのメソッドの引数の型を合わせる。と言った感じです。OverloadはNGです。
通信の仕方②ーSendAndReceive/Receive+Send
FlowSessionには、SendAndReceive関数というのもあるので、そちらの使い方も見てみましょう。
▼SendAndReceive
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-flow-session/send-and-receive.html
- Send同様、Sessionを作成し、メッセージを送る(SendAndReceive)
- 受け手ノードのInitiatedByが付与されたClass内で、メッセージを受け取る(Receive)
- 2.の同クラス内で、メッセージを送る(Send)
- 1.のClass内で、メッセージを受け取る(SendAndReceiveの戻り値)
コードは次の通りです。
@InitiatingFlow
@StartableByRPC
class Initiator3(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // ------------------------------- 1.Session作成
val receicve = session.sendAndReceive<String>("hoge") // ---------- 2.文字列(String)を送る。受け取る型を指定している。
receive.unwrap { data -> println("[SEND AND RECEIVE] $data")} // -- 5.文字列(String)を受け取り、表示する
}
@InitiatedBy(Initiator3::class)
class Responder3(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val receive = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
counterPartySession.send("foo") // ----------------------------- 4.文字列(String)を送る(返す)
}
}
通信の仕方③ーSend+Send/Receive+Receive
次いで、複数のSendとReceiveがあるパターンです。
基本的には、今までの記述方法と変わりはありません。
【注意点】Send/Receiveの記述順は揃える必要があります。
つまり、1つ目のSendは、受け手ノードのInitiatedByが付与されたClass内の1つ目のReceiveで、2つ目のSendは2つ目のReceiveで受け取ろうとします。
この際、型があっていないと、エラーが発生します。
サンプルのコードです。
@InitiatingFlow
@StartableByRPC
class Initiator4(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // ---------------------------- 1.Session作成
session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
session.send(100) // ---------------------------------------- 4.数値(Int)を送る
}
@InitiatedBy(Initiator4::class)
class Responder4(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val receiveStr = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
val receiveInt = counterPartySession.receive<Int>() // --------- 5.数値(Int)を受け取る
}
}
複数のSendとReceiveでのその他のパターン
「1:1」で対応しないパターンを見てみます。
基本的にはNGなので、きちんと数と型を合わせる必要があります。
SendとReceiveの型が対応していない(エラー発生)
SendとReceiveの型が、それぞれ対応していないので、この実装ではエラーになってしまいます。
java.lang.IllegalArgumentException: Payload invalid
@InitiatingFlow
@StartableByRPC
class Initiator5(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // ---------------------------- 1.Session作成
session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
session.send(100) // ---------------------------------------- 4.数値(Int)を送る
}
@InitiatedBy(Initiator5::class)
class Responder5(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val receiveStr = counterPartySession.receive<Int>() // --------- 3.文字列(String)が送られてくるのに、数値(Int)を受け取ろうとしている
val receiveInt = counterPartySession.receive<String>() // ------ 5.数値(Int)が送られてくるのに、文字列(String)を受け取ろうとしている
}
}
Receiveの数が多い(エラー発生)
Receiveが多い場合もエラーが発生します。
net.corda.core.flows.UnexpectedFlowEndException: Tried to access ended session SessionId(toLong=4217002328289114723) with empty buffer
@InitiatingFlow
@StartableByRPC
class Initiator6(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // ---------------------------- 1.Session作成
session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
}
@InitiatedBy(Initiator6::class)
class Responder6(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val receiveStr = counterPartySession.receive<String>() // --------- 3.文字列(String)を受け取る
val receiveInt = counterPartySession.receive<Int>() // --------- 4.何も送られて来ないのに、数値(Int)を受け取ろうとしている
}
}
Sendの数が多い(問題なし)
しかし、Receiveが存在しない場合は、特にエラー等は発生しません。
下図のような実装は(好ましい・好ましくないという観点は置いておいて)、可能です。
@InitiatingFlow
@StartableByRPC
class Initiator7(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // ---------------------------- 1.Session作成
session.send("hoge") // ---------------------------------------- 2.文字列(String)を送る
session.send(100) // ---------------------------------------- 4.数値(Int)を送る(受け手無し)
}
@InitiatedBy(Initiator7::class)
class Responder7(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val receiveStr = counterPartySession.receive<String>() // ------ 3.文字列(String)を受け取る
}
}
通信の仕方④ーCollectSignaturesFlow/SignTransactionFlow、FinalityFlow/ReceiveFinalityFlow
Send/Receive以外の通信の方法も見てみます。
代表例としては、「CollectSignaturesFlow/SignTransactionFlow」と、「FinalityFlow/ReceiveFinalityFlow」だと思います。
▼CollectSignaturesFlow
トランザクションにおいて、相手(他ノード)の署名を集めるFlowです。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-collect-signatures-flow/index.html
▼SignTransactionFlow
CollectSignaturesFlowに対応するFlowで、署名を行います。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-sign-transaction-flow/index.html
▼FinalityFlow
指定されたトランザクションを検証し、Notaryへ送付します。問題がなければ、Vaultへ書き込みます。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-finality-flow/index.html
▼ReceiveFinalityFlow
FinalityFlowに対応するFlowで、最終的なトランザクションを受け取ります。
https://api.corda.net/api/corda-os/4.5/html/api/kotlin/corda/net.corda.core.flows/-receive-finality-flow/index.html
@InitiatingFlow
@StartableByRPC
class Initiator8(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // -------- 1.Session作成
// ーーー トランザクションの作成~検証 ここから ーーー
val notary = serviceHub.networkMapCache.notaryIdentities[0]
val output = TemplateState(data, listOf(ourIdentity, b, c))
val cmd = Command(TemplateContract.Commands.Action(), listOf(ourIdentity.owningKey, b.owningKey, c.owningKey))
val txBuilder = TransactionBuilder(notary)
.addCommand(cmd)
.addOutputState(output)
txBuilder.verify(serviceHub)
val signedTx = serviceHub.signInitialTransaction(txBuilder)
// ーーー トランザクションの作成~検証 ここまで ーーー
val fullySignedTx = subFlow(CollectSignaturesFlow(signedTx, setOf(session))) // --- 2.署名収集
subFlow(FinalityFlow(fullySignedTx, setOf(session))) // --------------------------- 4.トランザクションの確定
}
@InitiatedBy(Initiator8::class)
class Responder8(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val signTransactionFlow = object : SignTransactionFlow(counterPartySession) {// --- 3.署名実施
override fun checkTransaction(stx: SignedTransaction) = requireThat {}
}
val txId = subFlow(signTransactionFlow).id
subFlow(ReceiveFinalityFlow(counterPartySession, expectedTxId = txId)) // --- 5.確定したトランザクションの受け取り
}
}
そういえば、なぜSendが多い場合は問題が無いのか?
なぜSendが多い場合は問題が無いかと言うと、Responderの処理は初回のSendでキックされることで発火し、Receiveはメッセージを受信するまで一時停止するからです。
Receiveが存在しない、Sendの数よりも少ない場合は、一時停止が行われず、Responderの処理が終了します。
▼SendとReceiveが1:1で対応している場合
▼Receiveが存在しない場合
裏技?的な使い方としては、ResponderにReceiveを書かずSendを実行することで、Responderの処理を実施することも可能です。
@InitiatingFlow
@StartableByRPC
class Initiator8(private val node: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(node) // --- 1.Session作成
session.send("") // ------------------- 2.文字列(String)を送る(Responderの呼び出し)
}
@InitiatedBy(Initiator8::class)
class Responder8(val counterPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// Receiveを記述していないが、SendによりResponderがキックされるので、下記が実行される
println("foo") // -------------------- 3.実行される
}
}
ちなみに、通信の仕方④で紹介した各Flowも、内部的にはSend/Receiveを使用しています。
気になる方はぜひ見てみてください~~(R3の回し者ではありません)~~。
おわりに
今回は、あまり誰も教えてくれないと思われる、Session周りについて、解説してみました。
冒頭にもリンクは貼りましたが、ドキュメントに記載はあるものの、キーコンセプトからは外れているので、意外と理解してない方も多いのではないかなーと勝手に想像しています。
※私は、最近になって、ここらへんを完全に理解しました。
図で説明するとなると、やはり、なかなか技術がいるものですね。
これで伝わるものか・・・?
パット見で、あぁこんなもんなんだな。と思ってもらえたら幸いです。
長くなってしまいましたが、今回はここまでです。
ありがとうございました。