4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

nem / symbolAdvent Calendar 2024

Day 17

Windows と TypeScript ではじめる Symbol 通信 07【ZeroMQ1】

Last updated at Posted at 2024-12-16

broker と server の起動

同期データの削除

同期したデータがある場合は、勿体ないですが削除してください。

rm -rf data/*

設定の変更

resources/config-node.properties
enableAutoSyncCleanup = false
resources/config-extensions-broker.properties
extension.mongo = false
resources/config-extensions-server.properties
extension.filespooling = true

起動

bash
catapult.broker .
bash
catapult.server .

TypeScript プロジェクト

ZeroMQ インストール

TypeScript 用の ZeroMQ モジュールが必要なのでインストールします。

yarn add zeromq

ZeroMQ へ接続

以下の様な感じで ZeroMQ へ接続できます。
セキュリティ上 7902 ポートは開いていないので、各自用意したノードが必要になります。

import { Subscriber } from 'zeromq'
const sock = new Subscriber({
  connectTimeout: 3000,
  tcpKeepaliveInterval: 500,
  tcpKeepaliveCount: 500,
})
const host = '127.0.0.1'
const port = 7902
const tcpAddress = `tcp://${host}:${port}`
const sock.connect(tcpAddress)

欲しい情報毎にマーカーが設定されていて、それを登録してノード側に変化があったら通知が来る用になります。WebSocket みたいだって?これは WebSocket の大元です。

ブロック通知

新ブロック通知のマーカーは9FF2D8E480CA6A49、ファイナライズブロック通知のマーカーは4D4832A031CE7954です。

項目 マーカー
新ブロック 9FF2D8E480CA6A49
ファイナライズブロック 4D4832A031CE7954
ドロップ 5C20D68AEE25B0B0

WebSocket 前提で作ったので、メソッド名がsendBlock等になっていますが気にしないでください。
ドロップは、プチフォークしたときに通知されるモノと思われます。使うタイミングは、ほぼ無いと思うので省略します。

Symbol-SDK v3.2.3 で追加されたtoJsonメソッドを使用しています。

src/CatapultZeroMQ.ts
import { Hash256, utils } from 'symbol-sdk'
import { Address, models } from 'symbol-sdk/symbol'
import { Subscriber } from 'zeromq'

export class CatapultZeroMQ {
  private sock: Subscriber
  private tcpAddress: string

  private blockMarker = Buffer.from(
    utils.hexToUint8('9FF2D8E480CA6A49').reverse()
  )
  private finalizedBlockMarker = Buffer.from(
    utils.hexToUint8('4D4832A031CE7954').reverse()
  )

  constructor() {
    this.sock = new Subscriber({
      connectTimeout: 3000,
      tcpKeepaliveInterval: 500,
      tcpKeepaliveCount: 500,
    })
    const host = '127.0.0.1'
    const port = 7902
    this.tcpAddress = `tcp://${host}:${port}`
    this.sock.connect(this.tcpAddress)
    console.info(`Connecting to ${this.tcpAddress}`)
  }

  start = async (): Promise<void> => {
    while (true) this.analyzeZeroMq(await this.sock.receive())
  }

  close = (): void => this.sock.disconnect(this.tcpAddress)

  subscribe = (topic: string): void => {
    const topicNames = topic.split('/')
    const tpc = topicNames[0]
    let addr = new Uint8Array()
    if (topicNames.length >= 2) addr = new Address(topicNames[1]).bytes

    if (tpc === 'block') {
      this.execSubscribe(this.blockMarker)
    } else if (tpc === 'finalizedBlock') {
      this.execSubscribe(this.finalizedBlockMarker)
    } else {
      throw Error('Unknown topic.')
    }
  }

  private analyzeZeroMq = (receiveDatas: Buffer[]) => {
    const topic = receiveDatas[0]
    if (topic.equals(this.blockMarker)) this.sendBlock(receiveDatas)
    else if (topic.equals(this.finalizedBlockMarker))
      this.sendFinalizedBlock(receiveDatas)
  }

  private sendBlock = (receiveDatas: Buffer[]) => {
    const blockHeaderBuf = receiveDatas[1]
    const entityHashBuf = receiveDatas[2]
    const generationHashBuf = receiveDatas[3]
    blockHeaderBuf.writeInt32LE(blockHeaderBuf.byteLength) // 先頭にあるサイズを実サイズに変更する
    const data = models.BlockFactory.deserialize(blockHeaderBuf) as models.Block
    const zqData = {
      topic: 'block',
      data: {
        block: data.toJson(),
        meta: {
          hash: new Hash256(entityHashBuf).toString(),
          generationHash: new Hash256(generationHashBuf).toString(),
        },
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  private sendFinalizedBlock = (receiveDatas: Buffer[]) => {
    const finalizedBlockHeaderBuf = receiveDatas[1]
    const data = models.FinalizedBlockHeader.deserialize(
      finalizedBlockHeaderBuf
    ) as models.FinalizedBlockHeader
    const zqData = {
      topic: 'finalizedBlock',
      data: data.toJson(),
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  private execSubscribe = (marker: Buffer) => this.sock.subscribe(marker)
}

新ブロック情報は、ブロックデータなのですが、トランザクションは付与されておらず、先頭のブロックサイズと実際のサイズが異なるので、sdk で処理するために修正しています。

実行

mainZqBlock.ts
import { CatapultZeroMQ } from './CatapultZeroMQ.js'

const zeroMq = new CatapultZeroMQ()
zeroMq.subscribe('block')
zeroMq.subscribe('finalizedBlock')
zeroMq.start()

同期中の場合は、次から次に通知が来ると思います。
finalizedBlock が出力されるまで 15 ~ 20 分ほど待つと思います。

> yarn tsx .\src\mainZqBlock.ts
Connecting to tcp://127.0.0.1:7902
{
  "topic": "block",
  "data": {
    "block": {
      "signature": "A39193788BA2B6830951BF9D383DB6159E6C865B2F550415CAB36D72F943E00D0C3D77BE0F6C095D45A9B29588D25BD9D57DBD5243A65138F62F251C498E0806",
      "signerPublicKey": "B8C83E30D74736E90D3318551998016446CF68C98D0932B688FD1463AAE6D125",
      "version": 1,
      "network": 152,
      "type": 33091,
      "height": "1942900",
      "timestamp": "66181087271",
      "difficulty": "10013867935565",
      "generationHashProof": {
        "gamma": "401BA2A0A8057CFEC78AAABF0A41CD817DD4690FC7DE58EAF20DCE4217728FF4",
        "verificationHash": "82BA527E90CF1E480CD3CDBE81E6660C",
        "scalar": "BAE2BFE024A79002AE54684063FC946997F29DA55B92FD0171C6988376018209"
      },
      "previousBlockHash": "648FF05BD5CC74C9BDD81CC9E4566A338DCF811A011903508FE11B443676676E",
      "transactionsHash": "E6BB03B0F7A00E547462AC8C08E62659B40312D79C9D8775487B78416615A1C1",
      "receiptsHash": "78417303C53636FE8FDFD82B2205EA5EEDD05A26E1E36AEE86792141CD56A570",
      "stateHash": "C6767F94950E260DF67DCA80BDE1E984330560CF946865253F1C295B9D20354B",
      "beneficiaryAddress": "987CCA1A46DDD5D1C4341EA0332FEC10EAF05FA99E53A705",
      "feeMultiplier": 100,
      "transactions": []
    },
    "meta": {
      "hash": "E12B41A78D7DB9779FB6BC7858F025156A0A2F06922C3794E9D7B64128087765",
      "generationHash": "D566353F909F0C6EA10FC9FA0FF3CD5418E13AE5CE89FB2E51E2AED012F18330"
    }
  }
}
{
  "topic": "finalizedBlock",
  "data": {
    "round": {
      "epoch": 2700,
      "point": 22
    },
    "height": "1942904",
    "hash": "A0048B3AFD80EC798C1D125F5E477D841A23A4276983B8E1341C07FBEBD297BE"
  }
}

バックナンバー

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?