トランザクション通知
トランザクション通知は以下の通りです。
項目 | マーカー |
---|---|
承認追加 | 61 |
未承認追加 | 75 |
未承認削除 | 72 |
1 バイトなので特にリトルエンディアンを意識する必要はありません。
// Single-byte topic markers for the transaction notifications, built from
// their hex representation. Being one byte long, no byte reversal is applied.
// Marker for "confirmed transaction added".
private confirmedAddedMarker = Buffer.from(utils.hexToUint8('61'))
// Marker for "unconfirmed transaction added".
private unconfirmedAddedMarker = Buffer.from(utils.hexToUint8('75'))
// Marker for "unconfirmed transaction removed".
private unconfirmedRemovedMarker = Buffer.from(utils.hexToUint8('72'))
また、マーカーにアドレスを追加することで、アドレスでフィルタすることが出来ます。
以下は、WebSocket と同じように `/` で区切ってアドレスを受け取るようにしています。
// Excerpt of subscribe(): parses a topic of the form "<name>" or
// "<name>/<address>" (the listing is cut off after the address handling).
subscribe = (topic: string): void => {
// Split on '/'; the first element is the topic name.
const topicNames = topic.split('/')
const tpc = topicNames[0]
// Stays empty when no address part was given.
let addr = new Uint8Array()
// When a second element exists, it is passed to Address and its bytes are kept.
if (topicNames.length >= 2) addr = new Address(topicNames[1]).bytes
全文
CatapultZeroMQ.ts
import { Hash256, utils } from 'symbol-sdk'
import { Address, models } from 'symbol-sdk/symbol'
import { Subscriber } from 'zeromq'
/**
 * Minimal ZeroMQ subscriber that connects to tcp://127.0.0.1:7902, subscribes
 * to block / finalized-block / transaction topics and prints every received
 * notification to the console as pretty-printed JSON.
 *
 * Usage: construct, call `subscribe()` for each topic of interest, then call
 * `start()` to begin receiving.
 */
export class CatapultZeroMQ {
  private readonly sock: Subscriber
  private readonly tcpAddress: string

  // 8-byte topic markers. The hex literal is byte-reversed before use
  // (presumably because the marker is a 64-bit value sent little-endian).
  private readonly blockMarker = Buffer.from(
    utils.hexToUint8('9FF2D8E480CA6A49').reverse()
  )
  private readonly finalizedBlockMarker = Buffer.from(
    utils.hexToUint8('4D4832A031CE7954').reverse()
  )

  // 1-byte topic markers for transaction notifications; an address may be
  // appended to them to filter by address (see subscribe()).
  private readonly confirmedAddedMarker = Buffer.from(utils.hexToUint8('61'))
  private readonly unconfirmedAddedMarker = Buffer.from(utils.hexToUint8('75'))
  private readonly unconfirmedRemovedMarker = Buffer.from(
    utils.hexToUint8('72')
  )

  /** Creates the subscriber socket and connects it to the local endpoint. */
  constructor() {
    this.sock = new Subscriber({
      connectTimeout: 3000,
      tcpKeepaliveInterval: 500,
      tcpKeepaliveCount: 500,
    })
    const host = '127.0.0.1'
    const port = 7902
    this.tcpAddress = `tcp://${host}:${port}`
    this.sock.connect(this.tcpAddress)
    console.info(`Connecting to ${this.tcpAddress}`)
  }

  /**
   * Receives messages forever and dispatches each one to its handler.
   * The returned promise never resolves; it rejects if receiving or
   * handling a message throws.
   */
  start = async (): Promise<void> => {
    while (true) this.analyzeZeroMq(await this.sock.receive())
  }

  /** Disconnects the socket from the endpoint it was connected to. */
  close = (): void => this.sock.disconnect(this.tcpAddress)

  /**
   * Subscribes to a topic given as "<name>" or "<name>/<address>".
   *
   * Supported names: block, finalizedBlock, confirmedAdded,
   * unconfirmedAdded, unconfirmedRemoved. For the three transaction topics
   * the address bytes (if any) are appended to the marker so that only
   * notifications for that address match. The address part is ignored for
   * the block topics.
   *
   * @param topic Topic name, optionally followed by '/' and an address.
   * @throws Error('Unknown topic.') when the name is not supported.
   */
  subscribe = (topic: string): void => {
    const topicNames = topic.split('/')
    const tpc = topicNames[0]
    let addr = new Uint8Array()
    if (topicNames.length >= 2) addr = new Address(topicNames[1]).bytes

    switch (tpc) {
      case 'block':
        this.execSubscribe(this.blockMarker)
        break
      case 'finalizedBlock':
        this.execSubscribe(this.finalizedBlockMarker)
        break
      case 'confirmedAdded':
        this.execSubscribe(Buffer.concat([this.confirmedAddedMarker, addr]))
        break
      case 'unconfirmedAdded':
        this.execSubscribe(Buffer.concat([this.unconfirmedAddedMarker, addr]))
        break
      case 'unconfirmedRemoved':
        this.execSubscribe(
          Buffer.concat([this.unconfirmedRemovedMarker, addr])
        )
        break
      default:
        throw Error('Unknown topic.')
    }
  }

  /**
   * Dispatches one multipart message based on its first frame (the topic).
   * Block topics are matched on the whole frame; transaction topics only on
   * the first byte, because the rest of the frame may carry an address.
   * Messages with a missing, empty or unrecognised topic are ignored.
   */
  private analyzeZeroMq = (receiveDatas: Buffer[]) => {
    const topic = receiveDatas[0]
    if (topic === undefined || topic.length === 0) return

    if (topic.equals(this.blockMarker)) {
      this.sendBlock(receiveDatas)
      return
    }
    if (topic.equals(this.finalizedBlockMarker)) {
      this.sendFinalizedBlock(receiveDatas)
      return
    }

    const marker = topic[0]
    if (marker === this.confirmedAddedMarker[0]) {
      this.sendConfirmedAdded(receiveDatas)
    } else if (marker === this.unconfirmedAddedMarker[0]) {
      this.sendUnconfirmedAdded(receiveDatas)
    } else if (marker === this.unconfirmedRemovedMarker[0]) {
      this.sendUnconfirmedRemoved(receiveDatas)
    }
  }

  /**
   * Prints a block notification.
   * Frames used: [1] block header, [2] entity hash, [3] generation hash.
   */
  private sendBlock = (receiveDatas: Buffer[]) => {
    const blockHeaderBuf = receiveDatas[1]
    const entityHashBuf = receiveDatas[2]
    const generationHashBuf = receiveDatas[3]
    // Overwrite the size field at the start of the buffer with the actual
    // size of the received buffer before deserializing. This mutates the
    // received frame in place.
    blockHeaderBuf.writeInt32LE(blockHeaderBuf.byteLength)
    const data = models.BlockFactory.deserialize(blockHeaderBuf) as models.Block
    const zqData = {
      topic: 'block',
      data: {
        block: data.toJson(),
        meta: {
          hash: new Hash256(entityHashBuf).toString(),
          generationHash: new Hash256(generationHashBuf).toString(),
        },
      },
    }
    this.print(zqData)
  }

  /** Prints a finalized-block notification. Frame used: [1] header. */
  private sendFinalizedBlock = (receiveDatas: Buffer[]) => {
    const finalizedBlockHeaderBuf = receiveDatas[1]
    const data = models.FinalizedBlockHeader.deserialize(
      finalizedBlockHeaderBuf
    ) as models.FinalizedBlockHeader
    const zqData = {
      topic: 'finalizedBlock',
      data: data.toJson(),
    }
    this.print(zqData)
  }

  /** Prints a confirmed-transaction-added notification. */
  private sendConfirmedAdded = (receiveDatas: Buffer[]) =>
    this.sendTransaction('confirmedAdded', receiveDatas)

  /** Prints an unconfirmed-transaction-added notification. */
  private sendUnconfirmedAdded = (receiveDatas: Buffer[]) =>
    this.sendTransaction('unconfirmedAdded', receiveDatas)

  /**
   * Shared implementation for the two "transaction added" notifications.
   * Frames used: [1] transaction, [2] hash, [3] merkle component hash,
   * [4] height.
   */
  private sendTransaction = (
    topic: 'confirmedAdded' | 'unconfirmedAdded',
    receiveDatas: Buffer[]
  ) => {
    const txBuf = receiveDatas[1]
    const hashBuf = receiveDatas[2]
    const merkleComponentHashBuf = receiveDatas[3]
    const heightBuf = receiveDatas[4]
    const data = models.TransactionFactory.deserialize(
      txBuf
    ) as models.Transaction
    const zqData = {
      topic,
      data: {
        transaction: data.toJson(),
        meta: {
          hash: new Hash256(hashBuf).toString(),
          merkleComponentHash: new Hash256(merkleComponentHashBuf).toString(),
          height: models.Height.deserialize(heightBuf).value.toString(),
        },
      },
    }
    this.print(zqData)
  }

  /** Prints an unconfirmed-transaction-removed notification. Frame used: [1] hash. */
  private sendUnconfirmedRemoved = (receiveDatas: Buffer[]) => {
    const hashBuf = receiveDatas[1]
    const zqData = {
      topic: 'unconfirmedRemoved',
      data: {
        meta: {
          hash: new Hash256(hashBuf).toString(),
        },
      },
    }
    this.print(zqData)
  }

  /**
   * Writes a notification to the console as indented JSON. Serializing
   * explicitly (instead of passing the object to console.log) keeps deeply
   * nested values such as the mosaics list from being collapsed by the
   * console's default inspection depth.
   */
  private print = (zqData: unknown) =>
    console.log(JSON.stringify(zqData, null, 2))

  /** Registers a raw topic prefix with the underlying socket. */
  private execSubscribe = (marker: Buffer) => this.sock.subscribe(marker)
}
実行
mainZqTran.ts
import { CatapultZeroMQ } from './CatapultZeroMQ.js'
// Topics to subscribe to, in order: each transaction topic once without a
// filter and once restricted to a single address.
const topics = [
  'confirmedAdded',
  'confirmedAdded/TBZN46UIU5BFLJI46VB4JTHHCE5EN2RFLR7NX3A',
  'unconfirmedAdded',
  'unconfirmedAdded/TBZN46UIU5BFLJI46VB4JTHHCE5EN2RFLR7NX3A',
  'unconfirmedRemoved',
  'unconfirmedRemoved/TBZN46UIU5BFLJI46VB4JTHHCE5EN2RFLR7NX3A',
]

const zeroMq = new CatapultZeroMQ()
for (const topic of topics) {
  zeroMq.subscribe(topic)
}

// Start the receive loop; notifications are printed as they arrive.
zeroMq.start()
実行後、トランザクションを投げると通知がコンソールに表示されます。
また、指定したアドレスからトランザクションを投げたり受け取ったりすると、フィルタした分の通知も表示されます。
{
"topic": "unconfirmedAdded",
"data": {
"transaction": {
"signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
"signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
"version": 1,
"network": 152,
"type": 16724,
"fee": "18656",
"deadline": "66319948926",
"recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
"mosaics": [
{
"mosaicId": "8268645399043017678",
"amount": "1234560"
}
],
"message": ""
},
"meta": {
"hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"height": "0"
}
}
}
{
"topic": "unconfirmedAdded",
"data": {
"transaction": {
"signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
"signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
"version": 1,
"network": 152,
"type": 16724,
"fee": "18656",
"deadline": "66319948926",
"recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
"mosaics": [
{
"mosaicId": "8268645399043017678",
"amount": "1234560"
}
],
"message": ""
},
"meta": {
"hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"height": "0"
}
}
}
{
"topic": "unconfirmedRemoved",
"data": {
"meta": {
"hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A"
}
}
}
{
"topic": "unconfirmedRemoved",
"data": {
"meta": {
"hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A"
}
}
}
{
"topic": "confirmedAdded",
"data": {
"transaction": {
"signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
"signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
"version": 1,
"network": 152,
"type": 16724,
"fee": "18656",
"deadline": "66319948926",
"recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
"mosaics": [
{
"mosaicId": "8268645399043017678",
"amount": "1234560"
}
],
"message": ""
},
"meta": {
"hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"height": "1947094"
}
}
}
{
"topic": "confirmedAdded",
"data": {
"transaction": {
"signature": "F59D5831A943F7FD38C53EB3B3F982E2AA727897761CCA4135B42B32BF090FAE94E688833F89CBF22810D5052AFD9A4AFF2581E387C95FE85876C7B883E7D70E",
"signerPublicKey": "B863173F9E4924CA5EB63BFC13689E27F603C29AD8C82615A10531329BF7A94E",
"version": 1,
"network": 152,
"type": 16724,
"fee": "18656",
"deadline": "66319948926",
"recipientAddress": "9852701708153EB74C71713F0AC7C56F5DE5670E06175954",
"mosaics": [
{
"mosaicId": "8268645399043017678",
"amount": "1234560"
}
],
"message": ""
},
"meta": {
"hash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"merkleComponentHash": "4B735E641662A00124C3068E177B41CEA23793783FC2A5160EE7F87F99ED157A",
"height": "1947094"
}
}
}
バックナンバー