broker と server の起動
同期データの削除
同期したデータがある場合は、勿体ないですが削除してください。
rm -rf data/*
設定の変更
enableAutoSyncCleanup = false
extension.mongo = false
extension.filespooling = true
起動
catapult.broker .
catapult.server .
TypeScript プロジェクト
ZeroMQ インストール
TypeScript 用の ZeroMQ モジュールが必要なのでインストールします。
yarn add zeromq
ZeroMQ へ接続
以下の様な感じで ZeroMQ へ接続できます。
セキュリティ上 7902 ポートは開いていないので、各自用意したノードが必要になります。
import { Subscriber } from 'zeromq'
const sock = new Subscriber({
connectTimeout: 3000,
tcpKeepaliveInterval: 500,
tcpKeepaliveCount: 500,
})
const host = '127.0.0.1'
const port = 7902
const tcpAddress = `tcp://${host}:${port}`
const sock.connect(tcpAddress)
欲しい情報毎にマーカーが設定されていて、それを登録してノード側に変化があったら通知が来る用になります。WebSocket みたいだって?これは WebSocket の大元です。
ブロック通知
新ブロック通知のマーカーは9FF2D8E480CA6A49
、ファイナライズブロック通知のマーカーは4D4832A031CE7954
です。
項目 | マーカー |
---|---|
新ブロック | 9FF2D8E480CA6A49 |
ファイナライズブロック | 4D4832A031CE7954 |
ドロップ | 5C20D68AEE25B0B0 |
WebSocket 前提で作ったので、メソッド名がsendBlock
等になっていますが気にしないでください。
ドロップは、プチフォークしたときに通知されるモノと思われます。使うタイミングは、ほぼ無いと思うので省略します。
Symbol-SDK v3.2.3 で追加されたtoJson
メソッドを使用しています。
import { Hash256, utils } from 'symbol-sdk'
import { Address, models } from 'symbol-sdk/symbol'
import { Subscriber } from 'zeromq'
export class CatapultZeroMQ {
private sock: Subscriber
private tcpAddress: string
private blockMarker = Buffer.from(
utils.hexToUint8('9FF2D8E480CA6A49').reverse()
)
private finalizedBlockMarker = Buffer.from(
utils.hexToUint8('4D4832A031CE7954').reverse()
)
constructor() {
this.sock = new Subscriber({
connectTimeout: 3000,
tcpKeepaliveInterval: 500,
tcpKeepaliveCount: 500,
})
const host = '127.0.0.1'
const port = 7902
this.tcpAddress = `tcp://${host}:${port}`
this.sock.connect(this.tcpAddress)
console.info(`Connecting to ${this.tcpAddress}`)
}
start = async (): Promise<void> => {
while (true) this.analyzeZeroMq(await this.sock.receive())
}
close = (): void => this.sock.disconnect(this.tcpAddress)
subscribe = (topic: string): void => {
const topicNames = topic.split('/')
const tpc = topicNames[0]
let addr = new Uint8Array()
if (topicNames.length >= 2) addr = new Address(topicNames[1]).bytes
if (tpc === 'block') {
this.execSubscribe(this.blockMarker)
} else if (tpc === 'finalizedBlock') {
this.execSubscribe(this.finalizedBlockMarker)
} else {
throw Error('Unknown topic.')
}
}
private analyzeZeroMq = (receiveDatas: Buffer[]) => {
const topic = receiveDatas[0]
if (topic.equals(this.blockMarker)) this.sendBlock(receiveDatas)
else if (topic.equals(this.finalizedBlockMarker))
this.sendFinalizedBlock(receiveDatas)
}
private sendBlock = (receiveDatas: Buffer[]) => {
const blockHeaderBuf = receiveDatas[1]
const entityHashBuf = receiveDatas[2]
const generationHashBuf = receiveDatas[3]
blockHeaderBuf.writeInt32LE(blockHeaderBuf.byteLength) // 先頭にあるサイズを実サイズに変更する
const data = models.BlockFactory.deserialize(blockHeaderBuf) as models.Block
const zqData = {
topic: 'block',
data: {
block: data.toJson(),
meta: {
hash: new Hash256(entityHashBuf).toString(),
generationHash: new Hash256(generationHashBuf).toString(),
},
},
}
console.log(JSON.stringify(zqData, null, 2))
}
private sendFinalizedBlock = (receiveDatas: Buffer[]) => {
const finalizedBlockHeaderBuf = receiveDatas[1]
const data = models.FinalizedBlockHeader.deserialize(
finalizedBlockHeaderBuf
) as models.FinalizedBlockHeader
const zqData = {
topic: 'finalizedBlock',
data: data.toJson(),
}
console.log(JSON.stringify(zqData, null, 2))
}
private execSubscribe = (marker: Buffer) => this.sock.subscribe(marker)
}
新ブロック情報は、ブロックデータなのですが、トランザクションは付与されておらず、先頭のブロックサイズと実際のサイズが異なるので、sdk で処理するために修正しています。
実行
import { CatapultZeroMQ } from './CatapultZeroMQ.js'
const zeroMq = new CatapultZeroMQ()
zeroMq.subscribe('block')
zeroMq.subscribe('finalizedBlock')
zeroMq.start()
同期中の場合は、次から次に通知が来ると思います。
finalizedBlock が出力されるまで 15 ~ 20 分ほど待つと思います。
> yarn tsx .\src\mainZqBlock.ts
Connecting to tcp://127.0.0.1:7902
{
"topic": "block",
"data": {
"block": {
"signature": "A39193788BA2B6830951BF9D383DB6159E6C865B2F550415CAB36D72F943E00D0C3D77BE0F6C095D45A9B29588D25BD9D57DBD5243A65138F62F251C498E0806",
"signerPublicKey": "B8C83E30D74736E90D3318551998016446CF68C98D0932B688FD1463AAE6D125",
"version": 1,
"network": 152,
"type": 33091,
"height": "1942900",
"timestamp": "66181087271",
"difficulty": "10013867935565",
"generationHashProof": {
"gamma": "401BA2A0A8057CFEC78AAABF0A41CD817DD4690FC7DE58EAF20DCE4217728FF4",
"verificationHash": "82BA527E90CF1E480CD3CDBE81E6660C",
"scalar": "BAE2BFE024A79002AE54684063FC946997F29DA55B92FD0171C6988376018209"
},
"previousBlockHash": "648FF05BD5CC74C9BDD81CC9E4566A338DCF811A011903508FE11B443676676E",
"transactionsHash": "E6BB03B0F7A00E547462AC8C08E62659B40312D79C9D8775487B78416615A1C1",
"receiptsHash": "78417303C53636FE8FDFD82B2205EA5EEDD05A26E1E36AEE86792141CD56A570",
"stateHash": "C6767F94950E260DF67DCA80BDE1E984330560CF946865253F1C295B9D20354B",
"beneficiaryAddress": "987CCA1A46DDD5D1C4341EA0332FEC10EAF05FA99E53A705",
"feeMultiplier": 100,
"transactions": []
},
"meta": {
"hash": "E12B41A78D7DB9779FB6BC7858F025156A0A2F06922C3794E9D7B64128087765",
"generationHash": "D566353F909F0C6EA10FC9FA0FF3CD5418E13AE5CE89FB2E51E2AED012F18330"
}
}
}
{
"topic": "finalizedBlock",
"data": {
"round": {
"epoch": 2700,
"point": 22
},
"height": "1942904",
"hash": "A0048B3AFD80EC798C1D125F5E477D841A23A4276983B8E1341C07FBEBD297BE"
}
}
バックナンバー