13
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ブロックチェーン NEM Symbolで初めてrxjsに出会った人へ

Last updated at Posted at 2019-12-26

symbol-sdkで開発をする上で避けられないのがrxjs。
熟練NEMアプリ開発者でも、なかなかうまく使いこなせないとの声を聞きます(私
2020年ブロックチェーンはいよいよ書き込みから読み込みも重要な要素となる時代に突入します。
NEM界隈での悲鳴は将来のブロックチェーン業界の悲鳴と受け止めることができます。
rxjsで記述すると、データを受信するたびに処理を発生させたい場合や、データの操作や抽出を一括して行いたい場合に非常に便利です。

今のうちにReactiveなストリームに慣れておきましょう。

従来のプログラムの書き方

まずは典型的な処理の流れを従来のプログラムの書き方で例示してみます。


let result = {};
let items = account.getInfo();
for(let item of items){

    if( item.amount > 0){
        result[item.name] = item.amount;
    }
}
console.log(result);

getInfoという関数で情報を取得してforでループを回し、if文を用いて条件に合致するものだけresultに詰め込んで、最後に出力するといった流れです。どんなプログラマでも一回は経験する王道の実装パターンですね。

rxjsの書き方

これをrxjsで書いてみます。


account.getInfo()
.pipe(
    mergeMap(item => item),
    filter(item => item.amount > 0)
)
.subscribe(
    result => console.log(result);
)

大きく分けてpipe部とsubscribeに分かれています。
pipeで取得できた情報をmergeMapやfilterというオペレータ関数で抽出・加工していきます。
最後にsubscribe部分に流れてきたデータを出力します。
一番最初にして最大の難関mergeMapがあります。mergeという名前に惑わされてはいけません。確かに、複数のデータを取得する場合は、それらのデータの挙動に注意する必要がありますが、nem2-sdkで利用する場合は単体での利用が多いです。特に配列形式でデータが取れる場合にmergeMapを利用します。

mergeMap
取得したデータ:[,,,]
次のフィルタに渡すデータ:[],[],[]

次にくるfilterやsubscribeで処理したい内容が、配列の一つ一つにそれぞれ適用させたい場合に、受け取った配列データを一つ一つの独立したデータに分解して配列のサイズ数だけ後続の処理を発生させることができます。mergeMapはほとんどその用途でのみ使用します。

出来るだけ従来のプログラムに合わせたいということであれば以下のような書き方もできます。

account.getInfo()
.subscribe(
    items => {
        let result = {};
        for(let item of items){
            if( item.amount > 0){
                result[item.name] = item.amount;
            }
        }
        console.log(result);
    };
)

単純な処理であればこれで十分かもしれません。ただ、取得したデータを条件にさらに情報を取りに行きたいときなどは処理がとても複雑になります。ちょっと極端な例ですが、こんな感じ。

account.getInfo()
.subscribe(
    items => {
        let result = {};
        for(let item of items){
            if( item.amount > 0){

                result[item.name] = item.amount;

                address.getInfo(item)
                .subscribe(
                    addresses => {
                        for(let address of addresses){
                            if(address.name == item.name){
                                result[item.address] = address.value();
                            }
                        }
                    }
                );
            }
        }
        console.log(result);
    };
)

これが上手に設計された rxjsを利用すると、以下のように書けるかもしれません。


account.getInfo()
.pipe(
    mergeMap(item => item),
    filter(item => item.amount > 0),
    map(_ => address.getInfo(_) )
)
.subscribe(
    result => console.log(result);
)

これは、subscribeが結合可能な特徴をうまく利用しています。
結合しない場合は以下のように書くこともできます。

account.getInfo()
.pipe(
    mergeMap(item => item),
    filter(item => item.amount > 0),
)
.subscribe(
    _ => {
        address.getInfo(_)
        .subscribe(result => consoel.log(result))
    }
)

subscribeが入れ子になってしまっていますね。
またpipeを分岐することも可能です。
filterの条件によって異なる出力をするようにしてみましょう。

let getAccountInfo = account.getInfo()
.pipe(
    mergeMap(item => item)
);

//出力1
getAccountInfo
.pipe(filter(item => item.amount > 0 && item.amount < 100))
.subscribe(result => console.log(result));

//出力2
getAccountInfo
.pipe(filter(item => item.amount >= 100))
.subscribe(result => console.log(result));

このように共通部分と独自部分に分けて実装することが可能です。
処理が複雑になってくると、不具合を起こした場合に原因を特定するのが困難になります。
その場合は処理を書き換えてデバッグコンソールで流れを追いやすくしましょう。

account.getInfo()
.pipe(
    mergeMap(item => {
        console.log(item); //デバッグ出力
        return item;
    }),
    filter(item => {
        if(item.amount > 0){
            return true;
        }
    }),
    map(_ => {
        console.log(_); //デバッグ出力
        return address.getInfo(_);
    })
)
.subscribe(
    result => {
       console.log(result);
    }
)

公式によると100以上のoperatorがあるようですが、私がよく使う関数は
mergeMap map filter zip toArray of の6つだけです。ぜひマスターしましょう!

ネットワークへの通知と承認のリスニング

ネットワークの通知とリスナーの登録を同時に行うパターンです。
リスナー内では通知時に取得済みのハッシュ値で照合することで意図したトランザクションを取得できるようにフィルターを利用します。


    txHttp
    .announce(signedTx)
    .subscribe(x => {
        console.log(NODE + "/transaction/" + signedTx.hash +"/status");
    }, err => console.error(err));

    listener
    .confirmed(alice.address)
    .pipe(
        ro.filter((tx) => {
            return tx.transactionInfo !== undefined && tx.transactionInfo.hash === signedTx.hash
        }),
    );
複数情報の同時取得

zipを利用して、複数のObservableクラスを同時に処理します。
以下のサンプルではaliceが署名権を持つアカウントの一覧を取得し、それらのネームスペースと有効期限を取得します。異なる関数でしか取得できないのでzipでまとめてアクセスし、subscribeでは配列として取得します。


multisigHttp.getMultisigAccountInfo(alice.address)
.pipe(
    ro.mergeMap(_=>{
        return  _.multisigAccounts
    }),
    ro.map(_=>_.address),
    ro.toArray(),
    ro.mergeMap(_ => {

        return  r.zip(
            nsHttp.getAccountsNames(_),
            nsHttp.getNamespacesFromAccounts(_)
        );
    }),
)
.subscribe(x => {
    $("#table").append("<tr>"
        +"<td>"+ x[0][i].names[0].name + "</td>"
        +"<td>"+ x[0][i].address.address + "</td>"
        +"<td>"+ x[1][i].endHeight.toString() + "</td>"
        + "</tr>"
    );

}, err => console.error(err));

また、pipeで処理を続けて行くうちに情報を付加させて次に情報を渡していきたい場面が出てくると思います。その場合もzipが活用できます。

return  r.zip(
            nsHttp.getAccountsNames(_),
            Observable.create(obs => obs.next("is_confirmed"))
);

Observableクラスを自作し、nextで次のオペレーターやsubscribeにフラグ情報を流してあげましょう。

ボンデッドトランザクションの署名


accountHttp.getAccountPartialTransactions(alice.address)
.pipe(
    ro.mergeMap(_ => _),
    ro.filter(_ => !_.signedByAccount(alice.publicAccount)), //未署名のもののみ抽出
    ro.map(_ => alice.signCosignatureTransaction(nem.CosignatureTransaction.create(_)), //署名
    ro.mergeMap(_ => txHttp.announceAggregateBondedCosignature(_)) //アナウンス
)
.subscribe(_ => {
},
    err => console.error(err)
);

それでも rxjsを多用したくない場合

以下のような書き方で、値が取得されるまで処理を止めておくことができます。

(async() =>{

    accountInfo = await account.getInfo().toPromise();

})();

awaitを使用する場合、そのロジックは asyncで囲われている必要があります。
上記はjsロード時にasyncとして処理する部分を準備する書き方です。
function 全体を asyncにする場合は以下のようにします。

async function getInfo(){
    info = await repo.search(...).toPromise(); 
}

他にも以下のような宣言の仕方があります。

listener.webSocket.onclose = async function(){
	await listenerKeepOpening(wsEndpoint);
}

listener.newBlock().subscribe(async block=>{
	await getNewInfo(block);
});

逃げの一手と思われるかもしれませんが、情報取得する間に処理が止まってしまうことがあまり問題とならない場合、リスナーのように次々にデータが流れこんでこない場合は、変数内の値の変化が一目瞭然なので無理にrxjsを使用せずtoPromise()で処理を同期化させるのも有効です。

13
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?