nem / symbol Advent Calendar 2024

Day 19

Getting Started with Symbol Communication on Windows and TypeScript 09 [ZeroMQ 3]

Posted at 2024-12-18

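Enabling the partial transaction extension

With the Peer configuration left as it is, the node cannot receive partial transactions, so the extension has to be enabled.

Set extension.partialtransaction to true.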

resources/config-extensions-server.properties
[extensions]

# api extensions
extension.filespooling = true
extension.partialtransaction = true

Enabling extension.partialtransaction makes a peers-api.json file mandatory. Every node listed in this file must be a dual node. If every node in peers-p2p.json is a dual node, a copy of that file works fine.

cp resources/peers-p2p.json resources/peers-api.json
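Before copying, it may be worth checking that peers-p2p.json really contains only dual nodes. The following is a minimal sketch, assuming each entry under knownPeers carries a comma-separated metadata.roles string and that a dual node is one advertising both the Peer and Api roles. The helper name findNonDualPeers and the sample data are hypothetical, and the file layout should be confirmed against your own node's files.

```typescript
// Assumed shape of a peers file; only the fields used here are typed.
interface PeerEntry {
  metadata: { name?: string; roles: string }
}

interface PeersFile {
  knownPeers: PeerEntry[]
}

// Hypothetical helper: returns the entries that do NOT advertise both roles.
function findNonDualPeers(peers: PeersFile): PeerEntry[] {
  return peers.knownPeers.filter((peer) => {
    const roles = peer.metadata.roles
      .split(',')
      .map((role) => role.trim().toLowerCase())
    const isDual = roles.indexOf('peer') >= 0 && roles.indexOf('api') >= 0
    return !isDual
  })
}

// Made-up sample data standing in for the parsed contents of peers-p2p.json.
const samplePeers: PeersFile = {
  knownPeers: [
    { metadata: { name: 'dual-node', roles: 'Peer,Api' } },
    { metadata: { name: 'peer-only', roles: 'Peer' } },
    { metadata: { name: 'dual-voting', roles: 'Peer,Api,Voting' } },
  ],
}

const nonDualPeers = findNonDualPeers(samplePeers)
console.log(nonDualPeers.map((peer) => peer.metadata.name))
```

If the returned list is empty, the plain copy shown above should be enough. Otherwise the listed entries would need to be removed from peers-api.json.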

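Partial notifications

The partial notifications are listed below.
In addition, a status notification is sent when an error occurs while announcing a transaction.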

Item             Marker (hex)
Partial added    70
Partial removed  71
Cosignature      63
Status           73

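These markers are also a single byte each, so there is no need to think about little-endian byte order, and, just like the transaction notifications, they can be filtered by address.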

private partialAddedMarker = Buffer.from(utils.hexToUint8('70'))
private partialRemovedMarker = Buffer.from(utils.hexToUint8('71'))
private cosignatureMarker = Buffer.from(utils.hexToUint8('63'))
private statusMarker = Buffer.from(utils.hexToUint8('73'))
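To make the address filter concrete, here is a small dependency-free sketch of how a subscription topic can be assembled: the 1-byte marker, optionally followed by the decoded address bytes. In the class below this decoding is done by `new Address(...).bytes` from symbol-sdk. The hand-written `decodeBase32` and the `buildTopic` helper are illustrative assumptions only and do not validate the address checksum.

```typescript
const BASE32_ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ234567'

// Marker bytes taken from the table above.
const MARKERS = {
  partialAdded: 0x70,
  partialRemoved: 0x71,
  cosignature: 0x63,
  status: 0x73,
} as const

type PartialTopic = keyof typeof MARKERS

// Plain base32 decoding; leftover bits that do not fill a byte are dropped.
function decodeBase32(text: string): Uint8Array {
  const out: number[] = []
  let bits = 0
  let value = 0
  for (let i = 0; i < text.length; i++) {
    const index = BASE32_ALPHABET.indexOf(text.charAt(i))
    if (index < 0) throw new Error(`Invalid base32 character: ${text.charAt(i)}`)
    value = (value << 5) | index
    bits += 5
    if (bits >= 8) {
      bits -= 8
      out.push((value >>> bits) & 0xff)
      value &= (1 << bits) - 1
    }
  }
  return new Uint8Array(out)
}

// Hypothetical helper: marker byte, then the address bytes if a filter is given.
function buildTopic(topicName: PartialTopic, address?: string): Uint8Array {
  const addressBytes = address ? decodeBase32(address) : new Uint8Array(0)
  const topic = new Uint8Array(1 + addressBytes.length)
  topic[0] = MARKERS[topicName]
  topic.set(addressBytes, 1)
  return topic
}

const unfilteredTopic = buildTopic('partialAdded')
const filteredTopic = buildTopic(
  'partialAdded',
  'TD4ITRLU6Q4YESG4ZSJD4YU6QL5T4VRWMHW4XIQ'
)
console.log(unfilteredTopic.length, filteredTopic.length) // 1 25
```

Because a ZeroMQ subscription matches by prefix, the 1-byte topic matches every message of that kind, while the 25-byte topic matches only messages for that address.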

Full source

CatapultZeroMQ.ts
import { Hash256, utils } from 'symbol-sdk'
import { Address, models } from 'symbol-sdk/symbol'
import { Subscriber } from 'zeromq'

export class CatapultZeroMQ {
  private sock: Subscriber
  private tcpAddress: string

  private blockMarker = Buffer.from(
    utils.hexToUint8('9FF2D8E480CA6A49').reverse()
  )
  private finalizedBlockMarker = Buffer.from(
    utils.hexToUint8('4D4832A031CE7954').reverse()
  )
  private confirmedAddedMarker = Buffer.from(utils.hexToUint8('61'))
  private unconfirmedAddedMarker = Buffer.from(utils.hexToUint8('75'))
  private unconfirmedRemovedMarker = Buffer.from(utils.hexToUint8('72'))
  private partialAddedMarker = Buffer.from(utils.hexToUint8('70'))
  private partialRemovedMarker = Buffer.from(utils.hexToUint8('71'))
  private cosignatureMarker = Buffer.from(utils.hexToUint8('63'))
  private statusMarker = Buffer.from(utils.hexToUint8('73'))

  constructor() {
    this.sock = new Subscriber({
      connectTimeout: 3000,
      tcpKeepaliveInterval: 500,
      tcpKeepaliveCount: 500,
    })
    const host = '127.0.0.1'
    const port = 7902
    this.tcpAddress = `tcp://${host}:${port}`
    this.sock.connect(this.tcpAddress)
    console.info(`Connecting to ${this.tcpAddress}`)
  }

  start = async (): Promise<void> => {
    while (true) this.analyzeZeroMq(await this.sock.receive())
  }

  close = (): void => this.sock.disconnect(this.tcpAddress)

  subscribe = (topic: string): void => {
    const topicNames = topic.split('/')
    const tpc = topicNames[0]
    let addr = new Uint8Array()
    if (topicNames.length >= 2) addr = new Address(topicNames[1]).bytes

    if (tpc === 'block') {
      this.execSubscribe(this.blockMarker)
    } else if (tpc === 'finalizedBlock') {
      this.execSubscribe(this.finalizedBlockMarker)
    } else if (tpc === 'confirmedAdded') {
      this.execSubscribe(Buffer.concat([this.confirmedAddedMarker, addr]))
    } else if (tpc === 'unconfirmedAdded') {
      this.execSubscribe(Buffer.concat([this.unconfirmedAddedMarker, addr]))
    } else if (tpc === 'unconfirmedRemoved') {
      this.execSubscribe(Buffer.concat([this.unconfirmedRemovedMarker, addr]))
    } else if (tpc === 'partialAdded') {
      this.execSubscribe(Buffer.concat([this.partialAddedMarker, addr]))
    } else if (tpc === 'partialRemoved') {
      this.execSubscribe(Buffer.concat([this.partialRemovedMarker, addr]))
    } else if (tpc === 'cosignature') {
      this.execSubscribe(Buffer.concat([this.cosignatureMarker, addr]))
    } else if (tpc === 'status') {
      this.execSubscribe(Buffer.concat([this.statusMarker, addr]))
    } else {
      throw Error('Unknown topic.')
    }
  }

  private analyzeZeroMq = (receiveDatas: Buffer[]) => {
    const topic = receiveDatas[0]
    if (topic.equals(this.blockMarker)) {
      this.sendBlock(receiveDatas)
    } else if (topic.equals(this.finalizedBlockMarker)) {
      this.sendFinalizedBlock(receiveDatas)
    } else if (topic.readInt8() === this.confirmedAddedMarker.readInt8()) {
      this.sendConfirmedAdded(receiveDatas)
    } else if (topic.readInt8() === this.unconfirmedAddedMarker.readInt8()) {
      this.sendUnconfirmedAdded(receiveDatas)
    } else if (topic.readInt8() === this.unconfirmedRemovedMarker.readInt8()) {
      this.sendUnconfirmedRemoved(receiveDatas)
    } else if (topic.readInt8() === this.partialAddedMarker.readInt8()) {
      this.sendPartialAdded(receiveDatas)
    } else if (topic.readInt8() === this.partialRemovedMarker.readInt8()) {
      this.sendPartialRemoved(receiveDatas)
    } else if (topic.readInt8() === this.cosignatureMarker.readInt8()) {
      this.sendCosignature(receiveDatas)
    } else if (topic.readInt8() === this.statusMarker.readInt8()) {
      this.sendStatus(receiveDatas)
    }
  }

  private sendBlock = (receiveDatas: Buffer[]) => {
    const blockHeaderBuf = receiveDatas[1]
    const entityHashBuf = receiveDatas[2]
    const generationHashBuf = receiveDatas[3]
    blockHeaderBuf.writeInt32LE(blockHeaderBuf.byteLength) // Overwrite the size field at the start with the actual buffer size
    const data = models.BlockFactory.deserialize(blockHeaderBuf) as models.Block
    const zqData = {
      topic: 'block',
      data: {
        block: data.toJson(),
        meta: {
          hash: new Hash256(entityHashBuf).toString(),
          generationHash: new Hash256(generationHashBuf).toString(),
        },
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  private sendFinalizedBlock = (receiveDatas: Buffer[]) => {
    const finalizedBlockHeaderBuf = receiveDatas[1]
    const data = models.FinalizedBlockHeader.deserialize(
      finalizedBlockHeaderBuf
    ) as models.FinalizedBlockHeader
    const zqData = {
      topic: 'finalizedBlock',
      data: data.toJson(),
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  private sendConfirmedAdded = (receiveDatas: Buffer[]) => {
    const txBuf = receiveDatas[1]
    const hashBuf = receiveDatas[2]
    const merkleComponentHashBuf = receiveDatas[3]
    const heightBuf = receiveDatas[4]
    const data = models.TransactionFactory.deserialize(
      txBuf
    ) as models.Transaction
    const zqData = {
      topic: 'confirmedAdded',
      data: {
        transaction: data.toJson(),
        meta: {
          hash: new Hash256(hashBuf).toString(),
          merkleComponentHash: new Hash256(merkleComponentHashBuf).toString(),
          height: models.Height.deserialize(heightBuf).value.toString(),
        },
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  private sendUnconfirmedAdded = (receiveDatas: Buffer[]) => {
    const txBuf = receiveDatas[1]
    const hashBuf = receiveDatas[2]
    const merkleComponentHashBuf = receiveDatas[3]
    const heightBuf = receiveDatas[4]
    const data = models.TransactionFactory.deserialize(
      txBuf
    ) as models.Transaction
    const zqData = {
      topic: 'unconfirmedAdded',
      data: {
        transaction: data.toJson(),
        meta: {
          hash: new Hash256(hashBuf).toString(),
          merkleComponentHash: new Hash256(merkleComponentHashBuf).toString(),
          height: models.Height.deserialize(heightBuf).value.toString(),
        },
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  private sendUnconfirmedRemoved = (receiveDatas: Buffer[]) => {
    const hashBuf = receiveDatas[1]
    const zqData = {
      topic: 'unconfirmedRemoved',
      data: {
        meta: {
          hash: new Hash256(hashBuf).toString(),
        },
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  /**
   * Print a notification (partial added)
   * @param receiveDatas Data received from ZeroMQ
   */
  private sendPartialAdded = (receiveDatas: Buffer[]) => {
    const txBuf = receiveDatas[1]
    const hashBuf = receiveDatas[2]
    const merkleComponentHashBuf = receiveDatas[3]
    const heightBuf = receiveDatas[4]
    const data = models.TransactionFactory.deserialize(
      txBuf
    ) as models.Transaction
    const zqData = {
      topic: 'partialAdded',
      data: {
        transaction: data.toJson(),
        meta: {
          hash: new Hash256(hashBuf).toString(),
          merkleComponentHash: new Hash256(merkleComponentHashBuf).toString(),
          height: models.Height.deserialize(heightBuf).value.toString(),
        },
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  /**
   * Print a notification (partial removed)
   * @param receiveDatas Data received from ZeroMQ
   */
  private sendPartialRemoved = (receiveDatas: Buffer[]) => {
    const hashBuf = receiveDatas[1]
    const zqData = {
      topic: 'partialRemoved',
      data: {
        meta: {
          hash: new Hash256(hashBuf).toString(),
        },
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  /**
   * Print a notification (cosignature)
   * @param receiveDatas Data received from ZeroMQ
   */
  private sendCosignature = (receiveDatas: Buffer[]) => {
    const cosignatureBuf = receiveDatas[1]
    const data = models.Cosignature.deserialize(
      cosignatureBuf
    ) as models.Cosignature
    const zqData = {
      topic: 'cosignature',
      data: data.toJson(),
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  /**
   * Print a notification (status)
   * @param receiveDatas Data received from ZeroMQ
   */
  private sendStatus = (receiveDatas: Buffer[]) => {
    const hashBuf = receiveDatas[1].subarray(0, 32)
    const deadlineBuf = receiveDatas[1].subarray(32, 40)
    const codeBuf = receiveDatas[1].subarray(40, 44)
    const zqData = {
      topic: 'status',
      data: {
        hash: new Hash256(hashBuf).toString(),
        code: codeBuf.readUint32LE(),
        deadline: deadlineBuf.readBigUInt64LE().toString(),
      },
    }
    console.log(JSON.stringify(zqData, null, 2))
  }

  private execSubscribe = (marker: Buffer) => this.sock.subscribe(marker)
}
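The sendStatus method in the listing above slices the status frame at fixed offsets: 32 bytes of hash, 8 bytes of deadline, and 4 bytes of status code, with the numbers in little-endian order. As a rough, dependency-free sketch of that same layout, the hypothetical parseStatusPayload below uses a DataView and adds a length check. The demo payload is made up, and the deadline is read as a plain number, which assumes it stays below 2^53 (the listing uses BigInt instead).

```typescript
interface StatusNotification {
  hash: string
  deadline: string
  code: number
}

// Hypothetical helper mirroring the offsets used by sendStatus.
function parseStatusPayload(payload: Uint8Array): StatusNotification {
  if (payload.length < 44) {
    throw new Error(`Status payload too short: ${payload.length} bytes`)
  }
  const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength)

  // Bytes 0..31: transaction hash, rendered as uppercase hex.
  let hash = ''
  for (let i = 0; i < 32; i++) {
    hash += ('0' + payload[i].toString(16)).slice(-2)
  }

  // Bytes 32..39: deadline as a little-endian 64-bit unsigned integer,
  // combined from two 32-bit halves (assumes the value fits in 2^53).
  const low = view.getUint32(32, true)
  const high = view.getUint32(36, true)
  const deadline = high * 0x100000000 + low

  // Bytes 40..43: status code as a little-endian 32-bit unsigned integer.
  const code = view.getUint32(40, true)

  return { hash: hash.toUpperCase(), deadline: String(deadline), code }
}

// Made-up payload: hash bytes all 0xAB, deadline 1000, arbitrary demo code.
const demoPayload = new Uint8Array(44)
for (let i = 0; i < 32; i++) demoPayload[i] = 0xab
const demoView = new DataView(demoPayload.buffer)
demoView.setUint32(32, 1000, true)
demoView.setUint32(36, 0, true)
demoView.setUint32(40, 0x80430003, true)

const parsedStatus = parseStatusPayload(demoPayload)
console.log(parsedStatus.deadline, parsedStatus.code.toString(16)) // 1000 80430003
```

Since a Node.js Buffer is a Uint8Array, a frame such as receiveDatas[1] could be passed to a helper like this directly.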

Running

mainZqPartial.ts
import { CatapultZeroMQ } from './CatapultZeroMQ.js'

const zeroMq = new CatapultZeroMQ()
zeroMq.subscribe('partialAdded')
zeroMq.subscribe('partialAdded/TD4ITRLU6Q4YESG4ZSJD4YU6QL5T4VRWMHW4XIQ')
zeroMq.subscribe('partialRemoved')
zeroMq.subscribe('partialRemoved/TD4ITRLU6Q4YESG4ZSJD4YU6QL5T4VRWMHW4XIQ')
zeroMq.subscribe('cosignature')
zeroMq.subscribe('cosignature/TD4ITRLU6Q4YESG4ZSJD4YU6QL5T4VRWMHW4XIQ')
zeroMq.subscribe('status')
zeroMq.subscribe('status/TD4ITRLU6Q4YESG4ZSJD4YU6QL5T4VRWMHW4XIQ')
zeroMq.start()

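Once it is running, announce a partial transaction (announcing from a multisig account is the easy way) and the notifications appear in the console.
Likewise, when the specified address sends or receives a partial transaction, the notifications matching the address filter are shown as well.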

Back numbers
