パーシャルエクステンションを有効にする
Peer の設定のままだとパーシャルが受け取れないので、エクステンションを有効にします。
`extension.partialtransaction` を `true` にする。
resources/config-extensions-server.properties
[extensions]
# api extensions
extension.filespooling = true
extension.partialtransaction = true
`extension.partialtransaction` を有効にすると `peers-api.json` が必要になります。このファイルの中身は全て dual ノードである必要があります。`peers-p2p.json` が全て dual ノードなら、これのコピーで問題ないです。
cp resources/peers-p2p.json resources/peers-api.json
パーシャル通知
パーシャル通知は以下の通りです。
あと、アナウンス時にエラーがあった場合にステータスの通知もあります。
項目 | マーカー |
---|---|
パーシャル追加 | 70 |
パーシャル削除 | 71 |
署名 | 63 |
ステータス | 73 |
これも 1 バイトなので特にリトルエンディアンを意識する必要はなく、トランザクション通知と同じようにアドレスでフィルタすることが出来ます。
// 1-byte topic markers for the partial-transaction related notifications.
// Marker for "partial added" notifications (0x70).
private partialAddedMarker = Buffer.from(utils.hexToUint8('70'))
// Marker for "partial removed" notifications (0x71).
private partialRemovedMarker = Buffer.from(utils.hexToUint8('71'))
// Marker for "cosignature" notifications (0x63).
private cosignatureMarker = Buffer.from(utils.hexToUint8('63'))
// Marker for "status" notifications (0x73).
private statusMarker = Buffer.from(utils.hexToUint8('73'))
全文
CatapultZeroMQ.ts
import { Hash256, utils } from 'symbol-sdk'
import { Address, models } from 'symbol-sdk/symbol'
import { Subscriber } from 'zeromq'
/**
 * Minimal subscriber for the ZeroMQ publisher of a Catapult (Symbol) node.
 *
 * Connects to tcp://127.0.0.1:7902, subscribes to topics through their binary
 * markers and prints every recognised message to the console as JSON of the
 * form `{ topic, data }`.
 */
export class CatapultZeroMQ {
  private sock: Subscriber
  private tcpAddress: string

  // Block topics use 8-byte markers. The hex constants are byte-reversed
  // (little-endian) so that they match the bytes of the topic frame.
  private blockMarker = Buffer.from(
    utils.hexToUint8('9FF2D8E480CA6A49').reverse()
  )
  private finalizedBlockMarker = Buffer.from(
    utils.hexToUint8('4D4832A031CE7954').reverse()
  )

  // All remaining topics use a 1-byte marker that may be followed by address
  // bytes acting as a subscription filter.
  private confirmedAddedMarker = Buffer.from(utils.hexToUint8('61'))
  private unconfirmedAddedMarker = Buffer.from(utils.hexToUint8('75'))
  private unconfirmedRemovedMarker = Buffer.from(utils.hexToUint8('72'))
  private partialAddedMarker = Buffer.from(utils.hexToUint8('70'))
  private partialRemovedMarker = Buffer.from(utils.hexToUint8('71'))
  private cosignatureMarker = Buffer.from(utils.hexToUint8('63'))
  private statusMarker = Buffer.from(utils.hexToUint8('73'))

  /** Topic name -> 1-byte marker, for every topic that accepts an address filter. */
  private addressTopicMarkers = new Map<string, Buffer>([
    ['confirmedAdded', this.confirmedAddedMarker],
    ['unconfirmedAdded', this.unconfirmedAddedMarker],
    ['unconfirmedRemoved', this.unconfirmedRemovedMarker],
    ['partialAdded', this.partialAddedMarker],
    ['partialRemoved', this.partialRemovedMarker],
    ['cosignature', this.cosignatureMarker],
    ['status', this.statusMarker],
  ])

  /** Creates the subscriber socket and connects it to the local node. */
  constructor() {
    this.sock = new Subscriber({
      connectTimeout: 3000,
      tcpKeepaliveInterval: 500,
      tcpKeepaliveCount: 500,
    })
    const host = '127.0.0.1'
    const port = 7902
    this.tcpAddress = `tcp://${host}:${port}`
    this.sock.connect(this.tcpAddress)
    console.info(`Connecting to ${this.tcpAddress}`)
  }

  /**
   * Receives messages forever and prints each recognised one.
   *
   * The returned promise never resolves; it rejects when receiving fails or
   * when a message cannot be decoded.
   */
  start = async (): Promise<void> => {
    while (true) this.analyzeZeroMq(await this.sock.receive())
  }

  /** Disconnects the socket from the endpoint connected in the constructor. */
  close = (): void => this.sock.disconnect(this.tcpAddress)

  /**
   * Subscribes to a topic.
   *
   * @param topic Topic name, optionally followed by `/` and an address
   *   (e.g. `partialAdded/<address>`). The address is appended to the marker
   *   as a filter for every topic except `block` and `finalizedBlock`, which
   *   ignore it.
   * @throws Error('Unknown topic.') when the topic name is not recognised.
   */
  subscribe = (topic: string): void => {
    const [name, addressText] = topic.split('/')
    // The address is decoded before the topic name is checked, so a malformed
    // address is reported even for topics that do not use it.
    const addressBytes =
      addressText !== undefined
        ? new Address(addressText).bytes
        : new Uint8Array()

    if (name === 'block') {
      this.execSubscribe(this.blockMarker)
      return
    }
    if (name === 'finalizedBlock') {
      this.execSubscribe(this.finalizedBlockMarker)
      return
    }

    const marker = this.addressTopicMarkers.get(name ?? '')
    if (marker === undefined) throw Error('Unknown topic.')
    this.execSubscribe(Buffer.concat([marker, addressBytes]))
  }

  /**
   * Routes one multipart message to the printer matching its topic frame.
   * Messages whose topic is not recognised are ignored.
   *
   * @param receiveDatas Frames of the received message; frame 0 is the topic.
   */
  private analyzeZeroMq = (receiveDatas: Buffer[]): void => {
    const topic = receiveDatas[0]
    // Without a topic byte there is nothing to route on; treat it like any
    // other unknown topic instead of failing on the read.
    if (topic === undefined || topic.length === 0) return

    // Block topics are matched on the full 8-byte marker.
    if (topic.equals(this.blockMarker)) {
      this.printBlock(receiveDatas)
      return
    }
    if (topic.equals(this.finalizedBlockMarker)) {
      this.printFinalizedBlock(receiveDatas)
      return
    }

    // The other topics are identified by their first byte only, because the
    // rest of the topic frame may carry address bytes.
    const firstByte = topic[0]
    for (const [name, marker] of this.addressTopicMarkers) {
      if (marker[0] === firstByte) {
        this.printAddressTopic(name, receiveDatas)
        return
      }
    }
  }

  /**
   * Prints a message belonging to one of the 1-byte-marker topics.
   *
   * @param name Topic name as registered in `addressTopicMarkers`.
   * @param receiveDatas Frames of the received message.
   */
  private printAddressTopic(name: string, receiveDatas: Buffer[]): void {
    switch (name) {
      case 'confirmedAdded':
      case 'unconfirmedAdded':
      case 'partialAdded':
        this.printTransactionAdded(name, receiveDatas)
        break
      case 'unconfirmedRemoved':
      case 'partialRemoved':
        this.printTransactionRemoved(name, receiveDatas)
        break
      case 'cosignature':
        this.printCosignature(receiveDatas)
        break
      case 'status':
        this.printStatus(receiveDatas)
        break
    }
  }

  /**
   * Returns one frame of a message.
   *
   * @param receiveDatas Frames of the received message.
   * @param index Index of the wanted frame.
   * @param topic Topic name, used only for the error message.
   * @throws Error when the frame is absent.
   */
  private frame(receiveDatas: Buffer[], index: number, topic: string): Buffer {
    const part = receiveDatas[index]
    if (part === undefined) {
      throw new Error(`Malformed '${topic}' message: frame ${index} is missing.`)
    }
    return part
  }

  /** Writes `{ topic, data }` to the console as indented JSON. */
  private print(topic: string, data: unknown): void {
    console.log(JSON.stringify({ topic, data }, null, 2))
  }

  /**
   * Prints a block notification.
   * Frames: 1 = block header, 2 = entity hash, 3 = generation hash.
   */
  private printBlock(receiveDatas: Buffer[]): void {
    const headerBuf = this.frame(receiveDatas, 1, 'block')
    const entityHashBuf = this.frame(receiveDatas, 2, 'block')
    const generationHashBuf = this.frame(receiveDatas, 3, 'block')
    // Overwrite the size field at the start of the payload with the actual
    // buffer length before deserializing (presumably the embedded size does
    // not match the published frame - TODO confirm).
    headerBuf.writeInt32LE(headerBuf.byteLength)
    const block = models.BlockFactory.deserialize(headerBuf) as models.Block
    this.print('block', {
      block: block.toJson(),
      meta: {
        hash: new Hash256(entityHashBuf).toString(),
        generationHash: new Hash256(generationHashBuf).toString(),
      },
    })
  }

  /** Prints a finalized-block notification (frame 1 = finalized block header). */
  private printFinalizedBlock(receiveDatas: Buffer[]): void {
    const headerBuf = this.frame(receiveDatas, 1, 'finalizedBlock')
    const header = models.FinalizedBlockHeader.deserialize(
      headerBuf
    ) as models.FinalizedBlockHeader
    this.print('finalizedBlock', header.toJson())
  }

  /**
   * Prints a "transaction added" notification (confirmed, unconfirmed or
   * partial).
   * Frames: 1 = transaction, 2 = hash, 3 = merkle component hash, 4 = height.
   *
   * @param topic Topic name to report in the output.
   * @param receiveDatas Frames of the received message.
   */
  private printTransactionAdded(topic: string, receiveDatas: Buffer[]): void {
    const txBuf = this.frame(receiveDatas, 1, topic)
    const hashBuf = this.frame(receiveDatas, 2, topic)
    const merkleComponentHashBuf = this.frame(receiveDatas, 3, topic)
    const heightBuf = this.frame(receiveDatas, 4, topic)
    const transaction = models.TransactionFactory.deserialize(
      txBuf
    ) as models.Transaction
    this.print(topic, {
      transaction: transaction.toJson(),
      meta: {
        hash: new Hash256(hashBuf).toString(),
        merkleComponentHash: new Hash256(merkleComponentHashBuf).toString(),
        height: models.Height.deserialize(heightBuf).value.toString(),
      },
    })
  }

  /**
   * Prints a "transaction removed" notification (unconfirmed or partial).
   * Frame 1 carries the transaction hash.
   *
   * @param topic Topic name to report in the output.
   * @param receiveDatas Frames of the received message.
   */
  private printTransactionRemoved(topic: string, receiveDatas: Buffer[]): void {
    const hashBuf = this.frame(receiveDatas, 1, topic)
    this.print(topic, {
      meta: {
        hash: new Hash256(hashBuf).toString(),
      },
    })
  }

  /** Prints a cosignature notification (frame 1 = serialized cosignature). */
  private printCosignature(receiveDatas: Buffer[]): void {
    const cosignatureBuf = this.frame(receiveDatas, 1, 'cosignature')
    const cosignature = models.Cosignature.deserialize(
      cosignatureBuf
    ) as models.Cosignature
    this.print('cosignature', cosignature.toJson())
  }

  /**
   * Prints a status notification.
   * Frame 1 layout: bytes 0-31 hash, 32-39 deadline (uint64 LE),
   * 40-43 status code (uint32 LE).
   */
  private printStatus(receiveDatas: Buffer[]): void {
    const payload = this.frame(receiveDatas, 1, 'status')
    this.print('status', {
      hash: new Hash256(payload.subarray(0, 32)).toString(),
      code: payload.subarray(40, 44).readUInt32LE(),
      deadline: payload.subarray(32, 40).readBigUInt64LE().toString(),
    })
  }

  /** Registers a raw topic prefix on the socket. */
  private execSubscribe = (marker: Buffer) => this.sock.subscribe(marker)
}
実行
mainZqPartial.ts
import { CatapultZeroMQ } from './CatapultZeroMQ.js'
const zeroMq = new CatapultZeroMQ()

// Address used for the address-filtered variant of every subscription.
const address = 'TD4ITRLU6Q4YESG4ZSJD4YU6QL5T4VRWMHW4XIQ'

// Subscribe to each partial-related topic twice: first unfiltered, then
// filtered by the address above.
for (const topic of ['partialAdded', 'partialRemoved', 'cosignature', 'status']) {
  zeroMq.subscribe(topic)
  zeroMq.subscribe(`${topic}/${address}`)
}

// start() runs an endless receive loop, so its promise can only settle by
// rejecting. Handle that explicitly instead of leaving a floating promise.
zeroMq.start().catch((error: unknown) => {
  console.error('ZeroMQ receive loop stopped:', error)
  zeroMq.close()
})
実行後、パーシャルを投げると(マルチシグアカウントから投げると楽です)通知がコンソールに表示されます。
また、指定したアドレスからパーシャルを投げたり受け取ったりすると、フィルタした分の通知も表示されます。
バックナンバー