Symbol ノード間の通信
Symbol ノード間の通信は 7900 ポートで SSL ソケット通信をしています。3000 ポートの Rest ゲートウェイで通信はしていません。Rest ゲートウェイとそれに付随する WebSocket は外部アプリケーション用です。
なお余談ですが、7900 ポートは開けておいた方が無難です。数秒程度同期遅れや若干ですがフォーク耐性が下がります。Dual ノードで公開している場合、7900 が開いていないとブロックの書き込み完了まで待たなくなるのか、パーシャルの受け取りを拒否することがあります。
SSL ソケット通信
コードは以下です。変更することはないのでabstract
にして継承して使用します。
import { X509Certificate } from 'node:crypto'
import { readFileSync } from 'node:fs'
import tls, { ConnectionOptions } from 'node:tls'
/**
* SSLソケット
*/
export abstract class SslSocket {
/** コネクションオプション */
private connectionOptions: ConnectionOptions
/** X509証明書 */
protected x509Certificate: X509Certificate | undefined
/** ソケット */
private tlsSocket: tls.TLSSocket | undefined
/** Symbolヘッダーサイズ */
private readonly SYMBOL_HEADER_SIZE = 8
/**
* コンストラクタ
* @param caCertPath CA証明書パス
* @param nodeCertPath ノード証明書パス
* @param nodeKeyPath ノードキーパス
* @param host ホスト
* @param port ポート(デフォルト: 7900)
* @param timeout タイムアウト(デフォルト: 3000)
*/
constructor(
caCertPath: string,
nodeCertPath: string,
nodeKeyPath: string,
host: string,
port: number = 7900,
timeout: number = 3000
) {
this.connectionOptions = {
host,
port,
timeout,
cert: Buffer.concat([
readFileSync(nodeCertPath),
readFileSync(caCertPath),
]),
key: readFileSync(nodeKeyPath),
rejectUnauthorized: false,
}
}
/**
* ソケット接続
*/
protected async connect(): Promise<void> {
console.debug('socket connected.')
this.tlsSocket = await new Promise<tls.TLSSocket>((resolve) => {
const sock = tls.connect(this.connectionOptions, () => {
resolve(sock)
})
// SSL接続
sock.on(
'secureConnect',
() => (this.x509Certificate = sock.getPeerX509Certificate())
)
})
}
/**
* リクエスト
* @param packetType パケットタイプ
* @param payload ペイロード
* @param isResponse レスポンス有無
* @returns レスポンス
*/
protected async request(
packetType: number,
payload?: Uint8Array,
isResponse = true
): Promise<Uint8Array | undefined> {
// Symbolパケット生成
const payloadSize = payload ? payload.length : 0
const packetSize = this.SYMBOL_HEADER_SIZE + payloadSize
const symbolPacketBuffer = new ArrayBuffer(packetSize)
// Symbolヘッダー編集
const symbolHeader = new DataView(symbolPacketBuffer)
symbolHeader.setUint32(0, packetSize, true)
symbolHeader.setUint32(4, packetType, true)
// Symbolペイロード編集
if (payload) {
const symbolPayload = new Uint8Array(
symbolPacketBuffer,
this.SYMBOL_HEADER_SIZE,
payloadSize
)
symbolPayload.set(payload)
}
// 接続確認
if (!this.tlsSocket || this.tlsSocket.closed) await this.connect()
if (!this.tlsSocket) throw Error('Failed to connect socket.')
// Symbolパケット送信
this.tlsSocket.write(new Uint8Array(symbolPacketBuffer))
if (!isResponse) return // レスポンスなしの場合
return await this.receiver(this.tlsSocket, packetType) // レスポンスありの場合
}
/**
* 受信機
* @param packetType パケットタイプ
* @returns レスポンス
*/
private async receiver(
socket: tls.TLSSocket,
packetType: number
): Promise<Uint8Array | undefined> {
return new Promise<Uint8Array | undefined>((resolve, reject) => {
let responseSize = this.SYMBOL_HEADER_SIZE // ヘッダ分のサイズを前もって付与
let responseData: Uint8Array | undefined = undefined
// データ受信
socket.once('data', (data) => {
// レスポンスデータ(ヘッダ)取得
const nodeBufferView = Buffer.from(new Uint8Array(data).buffer)
// レスポンスサイズチェック
const responseDataSize = nodeBufferView.readUInt32LE(0)
if (responseDataSize === 0) {
socket.destroy()
reject('Received data is empty.')
}
// レスポンスパケットタイプチェック
const responsePacketType = nodeBufferView.readUInt32LE(4)
if (responsePacketType !== packetType) {
socket.destroy()
reject(
`Mismatch packet type: expect: ${packetType} actual: ${responsePacketType}`
)
}
// ヘッダが問題なければデータ部取得
socket.on('data', (data) => {
const tempResponseData = new Uint8Array(data)
responseSize += tempResponseData.length
if (!responseData) {
// 初回
responseData = tempResponseData
} else {
// 連結
const merged = new Uint8Array(
responseData.length + tempResponseData.length
)
merged.set(responseData)
merged.set(tempResponseData, responseData.length)
responseData = merged
}
if (responseDataSize <= responseSize) {
resolve(responseData)
}
})
})
// タイムアウト
socket.on('timeout', () => {
socket.destroy()
console.debug(`socket timeout: ${packetType}`)
reject('timeout')
})
// エラー
socket.on('error', (error) => {
socket.destroy()
console.debug(`socket error: ${packetType}`)
reject(error)
})
// 切断
socket.on('close', () => {
console.debug(`socket close: ${packetType}`)
})
})
}
/**
* ソケット切断
*/
protected close() {
if (this.tlsSocket && !this.tlsSocket.closed) {
this.tlsSocket.end(new Uint8Array())
}
}
}
接続
this.connectionOptions = {
host,
port,
timeout,
cert: Buffer.concat([readFileSync(nodeCertPath), readFileSync(caCertPath)]),
key: readFileSync(nodeKeyPath),
rejectUnauthorized: false,
};
this.tlsSocket = await new Promise<tls.TLSSocket>((resolve) => {
const sock = tls.connect(this.connectionOptions, () => {
resolve(sock);
});
});
ホストやポートを指定して接続します。SSL ソケット通信なので証明書を同時に付与して、接続できたらソケットが返ってきます。
自己証明書なのでrejectUnauthorized: false
を付けて検証しないようにします。
リクエスト
/**
* リクエスト
* @param packetType パケットタイプ
* @param payload ペイロード
* @param isResponse レスポンス有無
* @returns レスポンス
*/
protected async request(
packetType: number,
payload?: Uint8Array,
isResponse = true
): Promise<Uint8Array | undefined> {
// Symbolパケット生成
const payloadSize = payload ? payload.length : 0
const packetSize = this.SYMBOL_HEADER_SIZE + payloadSize
const symbolPacketBuffer = new ArrayBuffer(packetSize)
// Symbolヘッダー編集
const symbolHeader = new DataView(symbolPacketBuffer)
symbolHeader.setUint32(0, packetSize, true)
symbolHeader.setUint32(4, packetType, true)
// Symbolペイロード編集
if (payload) {
const symbolPayload = new Uint8Array(
symbolPacketBuffer,
this.SYMBOL_HEADER_SIZE,
payloadSize
)
symbolPayload.set(payload)
}
// 接続確認
if (!this.tlsSocket || this.tlsSocket.closed) await this.connect()
if (!this.tlsSocket) throw Error('Failed to connect socket.')
// Symbolパケット送信
this.tlsSocket.write(new Uint8Array(symbolPacketBuffer))
if (!isResponse) return // レスポンスなしの場合
return await this.receiver(this.tlsSocket, packetType) // レスポンスありの場合
}
リクエストは 8 バイトの Symbol ヘッダーと場合に応じてペイロードを付与して送信し、レスポンスがあるパケットタイプの場合は、レスポンスを解析するメソッドを呼び出します。
Symbol ヘッダーは以下の構成です。
項目 | 長さ |
---|---|
size | 4 バイト |
paketType | 4 バイト |
size
は、Symbol ヘッダーのみの場合は 8。ペイロードがある場合は、ペイロード長を加算します。
paketType
は、何を要求するかを示すモノで、client/catapult/src/catapult/ionet/PacketType.hに書かれています。
レスポンス
/**
* 受信機
* @param packetType パケットタイプ
* @returns レスポンス
*/
private async receiver(
socket: tls.TLSSocket,
packetType: number
): Promise<Uint8Array | undefined> {
return new Promise<Uint8Array | undefined>((resolve, reject) => {
let responseSize = this.SYMBOL_HEADER_SIZE // ヘッダ分のサイズを前もって付与
let responseData: Uint8Array | undefined = undefined
// SSL接続
socket.on(
'secureConnect',
() => (this.x509Certificate = socket.getPeerX509Certificate())
)
// データ受信
socket.once('data', (data) => {
// レスポンスデータ(ヘッダ)取得
const nodeBufferView = Buffer.from(new Uint8Array(data).buffer)
// レスポンスサイズチェック
const responseDataSize = nodeBufferView.readUInt32LE(0)
if (responseDataSize === 0) {
socket.destroy()
reject('Received data is empty.')
}
// レスポンスパケットタイプチェック
const responsePacketType = nodeBufferView.readUInt32LE(4)
if (responsePacketType !== packetType) {
socket.destroy()
reject(
`Mismatch packet type: expect: ${packetType} actual: ${responsePacketType}`
)
}
// ヘッダが問題なければデータ部取得
socket.on('data', (data) => {
const tempResponseData = new Uint8Array(data)
responseSize += tempResponseData.length
if (!responseData) {
// 初回
responseData = tempResponseData
} else {
// 連結
const merged = new Uint8Array(
responseData.length + tempResponseData.length
)
merged.set(responseData)
merged.set(tempResponseData, responseData.length)
responseData = merged
}
if (responseDataSize <= responseSize) {
resolve(responseData)
}
})
})
// タイムアウト
socket.on('timeout', () => {
socket.destroy()
console.debug(`socket timeout: ${packetType}`)
reject('timeout')
})
// エラー
socket.on('error', (error) => {
socket.destroy()
console.debug(`socket error: ${packetType}`)
reject(error)
})
// 切断
socket.on('close', () => {
console.debug(`socket close: ${packetType}`)
})
})
}
リクエストしたパケットタイプのデータが返ってきたかを判定し、リクエストしたデータであれば残りのデータを取得します。
基底クラスなので個別の処理は行わずに受信データをそのまま返すようにしています。
切断
/**
* ソケット切断
*/
protected close() {
if (this.tlsSocket && !this.tlsSocket.closed) {
this.tlsSocket.end(new Uint8Array())
}
}
空データを送信して送信終了したことをノードに知らせます。これでノード側から切断されます。
ノード側のタイミングで切断されるため、接続と切断を短い間隔で行うことは出来ません。
パケットの操作
パケットの操作を少し簡単にできるクラスを作成します。
export class PacketBuffer {
private _length: number
private _index: number
constructor(private readonly _buffer: Buffer) {
this._length = _buffer.length
this._index = 0
}
addOffset(addOffset: number): number {
this._index += addOffset
return this._index
}
readUInt8(addOffset: number = 0): number {
this.addOffset(addOffset)
const readData = this._buffer.readUInt8(this._index)
this._index += 1
return readData
}
readUInt16LE(addOffset: number = 0): number {
this.addOffset(addOffset)
const readData = this._buffer.readUInt16LE(this._index)
this._index += 2
return readData
}
readUInt32LE(addOffset: number = 0): number {
this.addOffset(addOffset)
const readData = this._buffer.readUInt32LE(this._index)
this._index += 4
return readData
}
readBigUInt64LE(addOffset: number = 0): bigint {
this.addOffset(addOffset)
const readData = this._buffer.readBigUInt64LE(this._index)
this._index += 8
return readData
}
readString(length: number, addOffset: number = 0): string {
this.addOffset(addOffset)
const readData = this._buffer.toString('utf8', this._index, this._index + length)
this._index += length
return readData
}
readHexString(length: number, addOffset: number = 0): string {
this.addOffset(addOffset)
const readData = this._buffer.toString('hex', this._index, this._index + length)
this._index += length
return readData
}
get index(): number {
return this._index
}
get length(): number {
return this._length
}
ノード時間の取得
では、実際に SSL ソケット通信を使ってノードからデータを取得してみましょう。
Rest で言うところの/node/time
です。
ノードへリクエスト
パケットタイプは0x120
で、ペイロードはありません。
import { NodeTime } from './models/NodeTime.js'
import { SslSocket } from './SslSocket.js'
export class Catapult extends SslSocket {
/** パケットタイプ */
private PacketType = {
TIME_SYNC_NETWORK_TIME: 0x1_20,
}
/**
* /node/time 同等の値を持つクラスを取得
* @returns NodeTime
*/
async getNodeTime() {
console.info('NodeTime')
let nodeTime: NodeTime | undefined
try {
// ピア問合せ
const socketData = await this.request(
this.PacketType.TIME_SYNC_NETWORK_TIME
)
// デシリアライズ
if (socketData) nodeTime = NodeTime.deserialize(socketData)
} catch (e) {
if (e instanceof Error) console.error(e.message)
else console.error(e)
} finally {
this.close()
}
return nodeTime
}
}
レスポンスデータの解析
シリアライズされたデータが返ってくるので、Rest で見慣れた形に直します。
項目 | 長さ |
---|---|
sendTimestamp | 8 バイト |
receiveTimestamp | 8 バイト |
export class NodeTime {
constructor(public sendTimestamp: string, public receiveTimestamp: string) {}
static deserialize(payload: Uint8Array) {
const nodeBufferView = Buffer.from(payload);
const sendTimestamp = nodeBufferView.readBigUInt64LE(0).toString();
const receiveTimestamp = nodeBufferView.readBigUInt64LE(8).toString();
return new NodeTime(sendTimestamp, receiveTimestamp);
}
toJson() {
return {
communicationTimestamps: {
sendTimestamp: this.sendTimestamp,
receiveTimestamp: this.receiveTimestamp,
},
};
}
}
メインの作成
実行するコードを作成します。
Catapult
クラスの第 4 引数は公開されたノードであればどこでも良いです。メインネット、テストネットも問いません。証明書は共用出来ます。
import { Catapult } from './Catapult.js'
const catapult = new Catapult(
'cert/ca.crt.pem',
'cert/node.crt.pem',
'cert/node.key.pem',
'localhost'
)
const nodeTime = await catapult.getNodeTime()
if (nodeTime) console.log(nodeTime)
実行
yarn tsx .\src\mainNodeTime.ts
実行すると以下の様にノード時間が出力されます。
NodeTime
socket connected.
NodeTime {
sendTimestamp: '115990352077',
receiveTimestamp: '115990352077'
}
socket close: 288
バックナンバー