3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

nem / symbolAdvent Calendar 2024

Day 18

Windows と TypeScript ではじめる Symbol 通信 08【ZeroMQ2】

Last updated at Posted at 2024-12-17

トランザクション通知

トランザクション通知は以下の通りです。

項目 マーカー
承認追加 61
未承認追加 75
未承認削除 72

1 バイトなので特にリトルエンディアンを意識する必要はありません。

// One-byte topic markers for the three transaction notifications; each is built from a single hex byte.
private confirmedAddedMarker = Buffer.from(utils.hexToUint8('61')) // confirmed added
private unconfirmedAddedMarker = Buffer.from(utils.hexToUint8('75')) // unconfirmed added
private unconfirmedRemovedMarker = Buffer.from(utils.hexToUint8('72')) // unconfirmed removed

また、マーカーにアドレスを追加することで、アドレスでフィルタすることが出来ます。
以下では、WebSocket と同じように `/` で区切ってアドレスを受け取るようにしています。

  subscribe = (topic: string): void => {
    const topicNames = topic.split('/')
    const tpc = topicNames[0]
    let addr = new Uint8Array()
    if (topicNames.length >= 2) addr = new Address(topicNames[1]).bytes

全文

CatapultZeroMQ.ts
import { Hash256, utils } from 'symbol-sdk'
import { Address, models } from 'symbol-sdk/symbol'
import { Subscriber } from 'zeromq'

/**
 * Minimal subscriber for a Catapult (Symbol) node's ZeroMQ publisher.
 *
 * Connects to `tcp://127.0.0.1:7902` on construction, lets the caller register
 * topic subscriptions via {@link CatapultZeroMQ.subscribe}, and prints every
 * received notification to the console as pretty-printed JSON.
 */
export class CatapultZeroMQ {
  private readonly sock: Subscriber
  private readonly tcpAddress: string

  // 8-byte topic markers. The hex constants are byte-reversed before use
  // (i.e. the wire representation is the little-endian form of the constant).
  private readonly blockMarker = Buffer.from(
    utils.hexToUint8('9FF2D8E480CA6A49').reverse()
  )
  private readonly finalizedBlockMarker = Buffer.from(
    utils.hexToUint8('4D4832A031CE7954').reverse()
  )
  // 1-byte topic markers for transaction notifications (no byte order to consider).
  private readonly confirmedAddedMarker = Buffer.from(utils.hexToUint8('61'))
  private readonly unconfirmedAddedMarker = Buffer.from(utils.hexToUint8('75'))
  private readonly unconfirmedRemovedMarker = Buffer.from(
    utils.hexToUint8('72')
  )

  /** Creates the subscriber socket and starts connecting to the local node. */
  constructor() {
    this.sock = new Subscriber({
      connectTimeout: 3000,
      tcpKeepaliveInterval: 500,
      tcpKeepaliveCount: 500,
    })
    const host = '127.0.0.1'
    const port = 7902
    this.tcpAddress = `tcp://${host}:${port}`
    this.sock.connect(this.tcpAddress)
    console.info(`Connecting to ${this.tcpAddress}`)
  }

  /**
   * Receives messages forever and dispatches each one to its handler.
   *
   * The returned promise never resolves; it rejects if `receive()` or a
   * handler throws, so callers should attach a rejection handler.
   */
  start = async (): Promise<void> => {
    while (true) {
      this.analyzeZeroMq(await this.sock.receive())
    }
  }

  /** Disconnects the socket from the endpoint it was connected to. */
  close = (): void => this.sock.disconnect(this.tcpAddress)

  /**
   * Subscribes to a topic, optionally filtered by address.
   *
   * @param topic - `block`, `finalizedBlock`, `confirmedAdded`,
   *   `unconfirmedAdded` or `unconfirmedRemoved`. The transaction topics accept
   *   an address after a slash (`confirmedAdded/<address>`); its bytes are
   *   appended to the marker so the subscription prefix only matches that address.
   * @throws Error when the topic name is not one of the names above.
   */
  subscribe = (topic: string): void => {
    const [tpc, addrText] = topic.split('/')
    // An empty filter leaves the bare marker, which matches every address.
    const addr: Uint8Array =
      addrText !== undefined ? new Address(addrText).bytes : new Uint8Array()

    switch (tpc) {
      case 'block':
        this.execSubscribe(this.blockMarker)
        break
      case 'finalizedBlock':
        this.execSubscribe(this.finalizedBlockMarker)
        break
      case 'confirmedAdded':
        this.execSubscribe(Buffer.concat([this.confirmedAddedMarker, addr]))
        break
      case 'unconfirmedAdded':
        this.execSubscribe(Buffer.concat([this.unconfirmedAddedMarker, addr]))
        break
      case 'unconfirmedRemoved':
        this.execSubscribe(
          Buffer.concat([this.unconfirmedRemovedMarker, addr])
        )
        break
      default:
        throw new Error('Unknown topic.')
    }
  }

  /**
   * Routes one multipart message to the handler matching its topic frame.
   * Block topics are compared as whole markers; transaction topics are
   * identified by their first byte only, since the rest of the topic frame may
   * carry an address. Messages with an unrecognized topic are ignored.
   */
  private analyzeZeroMq = (receiveDatas: Buffer[]): void => {
    const topic = receiveDatas[0]
    // Without a topic byte there is nothing to dispatch on; reading byte 0 of
    // an empty frame would otherwise throw a RangeError.
    if (topic === undefined || topic.length === 0) return

    if (topic.equals(this.blockMarker)) {
      this.sendBlock(receiveDatas)
      return
    }
    if (topic.equals(this.finalizedBlockMarker)) {
      this.sendFinalizedBlock(receiveDatas)
      return
    }

    const marker = topic[0]
    if (marker === this.confirmedAddedMarker[0]) {
      this.sendConfirmedAdded(receiveDatas)
    } else if (marker === this.unconfirmedAddedMarker[0]) {
      this.sendUnconfirmedAdded(receiveDatas)
    } else if (marker === this.unconfirmedRemovedMarker[0]) {
      this.sendUnconfirmedRemoved(receiveDatas)
    }
  }

  /** Prints a block notification (frames: topic, header, entity hash, generation hash). */
  private sendBlock = (receiveDatas: Buffer[]): void => {
    const blockHeaderBuf = receiveDatas[1]
    const entityHashBuf = receiveDatas[2]
    const generationHashBuf = receiveDatas[3]
    // Overwrite the size field at the start of the header with the actual
    // byte length of the received frame before deserializing (mutates the frame).
    blockHeaderBuf.writeInt32LE(blockHeaderBuf.byteLength)
    const data = models.BlockFactory.deserialize(blockHeaderBuf) as models.Block
    this.print({
      topic: 'block',
      data: {
        block: data.toJson(),
        meta: {
          hash: new Hash256(entityHashBuf).toString(),
          generationHash: new Hash256(generationHashBuf).toString(),
        },
      },
    })
  }

  /** Prints a finalized-block notification (frames: topic, finalized block header). */
  private sendFinalizedBlock = (receiveDatas: Buffer[]): void => {
    const finalizedBlockHeaderBuf = receiveDatas[1]
    const data = models.FinalizedBlockHeader.deserialize(
      finalizedBlockHeaderBuf
    ) as models.FinalizedBlockHeader
    this.print({
      topic: 'finalizedBlock',
      data: data.toJson(),
    })
  }

  /** Prints a confirmed-transaction notification. */
  private sendConfirmedAdded = (receiveDatas: Buffer[]): void =>
    this.sendTransactionAdded('confirmedAdded', receiveDatas)

  /** Prints an unconfirmed-transaction notification. */
  private sendUnconfirmedAdded = (receiveDatas: Buffer[]): void =>
    this.sendTransactionAdded('unconfirmedAdded', receiveDatas)

  /**
   * Shared body of the two "transaction added" handlers
   * (frames: topic, transaction, hash, merkle component hash, height).
   */
  private sendTransactionAdded = (
    topic: 'confirmedAdded' | 'unconfirmedAdded',
    receiveDatas: Buffer[]
  ): void => {
    const txBuf = receiveDatas[1]
    const hashBuf = receiveDatas[2]
    const merkleComponentHashBuf = receiveDatas[3]
    const heightBuf = receiveDatas[4]
    const data = models.TransactionFactory.deserialize(
      txBuf
    ) as models.Transaction
    this.print({
      topic,
      data: {
        transaction: data.toJson(),
        meta: {
          hash: new Hash256(hashBuf).toString(),
          merkleComponentHash: new Hash256(merkleComponentHashBuf).toString(),
          height: models.Height.deserialize(heightBuf).value.toString(),
        },
      },
    })
  }

  /** Prints an unconfirmed-removed notification (frames: topic, transaction hash). */
  private sendUnconfirmedRemoved = (receiveDatas: Buffer[]): void => {
    const hashBuf = receiveDatas[1]
    this.print({
      topic: 'unconfirmedRemoved',
      data: {
        meta: {
          hash: new Hash256(hashBuf).toString(),
        },
      },
    })
  }

  /**
   * Writes a notification as indented JSON. Serializing explicitly keeps all
   * handlers consistent and avoids console.log's default inspection depth,
   * which would collapse nested values such as the mosaics array.
   */
  private print = (payload: unknown): void =>
    console.log(JSON.stringify(payload, null, 2))

  /** Registers a raw topic prefix on the underlying subscriber socket. */
  private execSubscribe = (marker: Buffer): void => this.sock.subscribe(marker)
}

実行

mainZqTran.ts
import { CatapultZeroMQ } from './CatapultZeroMQ.js'

// Subscribe to every transaction topic twice: once without a filter and once
// restricted to a single address.
const zeroMq = new CatapultZeroMQ()
zeroMq.subscribe('confirmedAdded')
zeroMq.subscribe('confirmedAdded/TBZN46UIU5BFLJI46VB4JTHHCE5EN2RFLR7NX3A')
zeroMq.subscribe('unconfirmedAdded')
zeroMq.subscribe('unconfirmedAdded/TBZN46UIU5BFLJI46VB4JTHHCE5EN2RFLR7NX3A')
zeroMq.subscribe('unconfirmedRemoved')
zeroMq.subscribe('unconfirmedRemoved/TBZN46UIU5BFLJI46VB4JTHHCE5EN2RFLR7NX3A')

// start() only settles by rejecting (receive or handler failure); report that
// instead of leaving an unhandled promise rejection.
zeroMq.start().catch((err: unknown) => {
  console.error(err)
  process.exitCode = 1
})

実行後、トランザクションを投げると通知がコンソールに表示されます。
また、指定したアドレスからトランザクションを投げたり受け取ったりすると、フィルタした分の通知も表示されます。

{
  "topic": "unconfirmedAdded",
  "data": {
    "transaction": {
      "signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
      "signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
      "version": 1,
      "network": 152,
      "type": 16724,
      "fee": "18656",
      "deadline": "66319948926",
      "recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
      "mosaics": [
        {
          "mosaicId": "8268645399043017678",
          "amount": "1234560"
        }
      ],
      "message": ""
    },
    "meta": {
      "hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "height": "0"
    }
  }
}
{
  "topic": "unconfirmedAdded",
  "data": {
    "transaction": {
      "signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
      "signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
      "version": 1,
      "network": 152,
      "type": 16724,
      "fee": "18656",
      "deadline": "66319948926",
      "recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
      "mosaics": [
        {
          "mosaicId": "8268645399043017678",
          "amount": "1234560"
        }
      ],
      "message": ""
    },
    "meta": {
      "hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "height": "0"
    }
  }
}
{
  "topic": "unconfirmedRemoved",
  "data": {
    "meta": {
      "hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A"
    }
  }
}
{
  "topic": "unconfirmedRemoved",
  "data": {
    "meta": {
      "hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A"
    }
  }
}
{
  "topic": "confirmedAdded",
  "data": {
    "transaction": {
      "signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
      "signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
      "version": 1,
      "network": 152,
      "type": 16724,
      "fee": "18656",
      "deadline": "66319948926",
      "recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
      "mosaics": [
        {
          "mosaicId": "8268645399043017678",
          "amount": "1234560"
        }
      ],
      "message": ""
    },
    "meta": {
      "hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "height": "1947094"
    }
  }
}
{
  "topic": "confirmedAdded",
  "data": {
    "transaction": {
      "signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
      "signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
      "version": 1,
      "network": 152,
      "type": 16724,
      "fee": "18656",
      "deadline": "66319948926",
      "recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
      "mosaics": [
        {
          "mosaicId": "8268645399043017678",
          "amount": "1234560"
        }
      ],
      "message": ""
    },
    "meta": {
      "hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
      "height": "1947094"
    }
  }
}

バックナンバー

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?