5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Windows と TypeScript ではじめる Symbol 通信 02【ソケット通信】

Last updated at Posted at 2024-12-11

Symbol ノード間の通信

Symbol ノード間の通信は 7900 ポートで SSL ソケット通信をしています。3000 ポートの Rest ゲートウェイで通信はしていません。Rest ゲートウェイとそれに付随する WebSocket は外部アプリケーション用です。

なお余談ですが、7900 ポートは開けておいた方が無難です。数秒程度同期遅れや若干ですがフォーク耐性が下がります。Dual ノードで公開している場合、7900 が開いていないとブロックの書き込み完了まで待たなくなるのか、パーシャルの受け取りを拒否することがあります。

SSL ソケット通信

コードは以下です。変更することはないのでabstractにして継承して使用します。

src/SslSocket.ts
import { X509Certificate } from 'node:crypto'
import { readFileSync } from 'node:fs'
import tls, { ConnectionOptions } from 'node:tls'

/**
 * SSLソケット
 */
export abstract class SslSocket {
  /** コネクションオプション */
  private connectionOptions: ConnectionOptions

  /** X509証明書 */
  protected x509Certificate: X509Certificate | undefined

  /** ソケット */
  private tlsSocket: tls.TLSSocket | undefined

  /** Symbolヘッダーサイズ */
  private readonly SYMBOL_HEADER_SIZE = 8

  /**
   * コンストラクタ
   * @param caCertPath CA証明書パス
   * @param nodeCertPath ノード証明書パス
   * @param nodeKeyPath ノードキーパス
   * @param host ホスト
   * @param port ポート(デフォルト: 7900)
   * @param timeout タイムアウト(デフォルト: 3000)
   */
  constructor(
    caCertPath: string,
    nodeCertPath: string,
    nodeKeyPath: string,
    host: string,
    port: number = 7900,
    timeout: number = 3000
  ) {
    this.connectionOptions = {
      host,
      port,
      timeout,
      cert: Buffer.concat([
        readFileSync(nodeCertPath),
        readFileSync(caCertPath),
      ]),
      key: readFileSync(nodeKeyPath),
      rejectUnauthorized: false,
    }
  }

  /**
   * ソケット接続
   */
  protected async connect(): Promise<void> {
    console.debug('socket connected.')
    this.tlsSocket = await new Promise<tls.TLSSocket>((resolve) => {
      const sock = tls.connect(this.connectionOptions, () => {
        resolve(sock)
      })
      // SSL接続
      sock.on(
        'secureConnect',
        () => (this.x509Certificate = sock.getPeerX509Certificate())
      )
    })
  }

  /**
   * リクエスト
   * @param packetType パケットタイプ
   * @param payload ペイロード
   * @param isResponse レスポンス有無
   * @returns レスポンス
   */
  protected async request(
    packetType: number,
    payload?: Uint8Array,
    isResponse = true
  ): Promise<Uint8Array | undefined> {
    // Symbolパケット生成
    const payloadSize = payload ? payload.length : 0
    const packetSize = this.SYMBOL_HEADER_SIZE + payloadSize
    const symbolPacketBuffer = new ArrayBuffer(packetSize)
    // Symbolヘッダー編集
    const symbolHeader = new DataView(symbolPacketBuffer)
    symbolHeader.setUint32(0, packetSize, true)
    symbolHeader.setUint32(4, packetType, true)
    // Symbolペイロード編集
    if (payload) {
      const symbolPayload = new Uint8Array(
        symbolPacketBuffer,
        this.SYMBOL_HEADER_SIZE,
        payloadSize
      )
      symbolPayload.set(payload)
    }
    // 接続確認
    if (!this.tlsSocket || this.tlsSocket.closed) await this.connect()
    if (!this.tlsSocket) throw Error('Failed to connect socket.')
    // Symbolパケット送信
    this.tlsSocket.write(new Uint8Array(symbolPacketBuffer))
    if (!isResponse) return // レスポンスなしの場合
    return await this.receiver(this.tlsSocket, packetType) // レスポンスありの場合
  }

  /**
   * 受信機
   * @param packetType パケットタイプ
   * @returns レスポンス
   */
  private async receiver(
    socket: tls.TLSSocket,
    packetType: number
  ): Promise<Uint8Array | undefined> {
    return new Promise<Uint8Array | undefined>((resolve, reject) => {
      let responseSize = this.SYMBOL_HEADER_SIZE // ヘッダ分のサイズを前もって付与
      let responseData: Uint8Array | undefined = undefined

      // データ受信
      socket.once('data', (data) => {
        // レスポンスデータ(ヘッダ)取得
        const nodeBufferView = Buffer.from(new Uint8Array(data).buffer)
        // レスポンスサイズチェック
        const responseDataSize = nodeBufferView.readUInt32LE(0)
        if (responseDataSize === 0) {
          socket.destroy()
          reject('Received data is empty.')
        }
        // レスポンスパケットタイプチェック
        const responsePacketType = nodeBufferView.readUInt32LE(4)
        if (responsePacketType !== packetType) {
          socket.destroy()
          reject(
            `Mismatch packet type: expect: ${packetType} actual: ${responsePacketType}`
          )
        }
        // ヘッダが問題なければデータ部取得
        socket.on('data', (data) => {
          const tempResponseData = new Uint8Array(data)
          responseSize += tempResponseData.length
          if (!responseData) {
            // 初回
            responseData = tempResponseData
          } else {
            // 連結
            const merged = new Uint8Array(
              responseData.length + tempResponseData.length
            )
            merged.set(responseData)
            merged.set(tempResponseData, responseData.length)
            responseData = merged
          }
          if (responseDataSize <= responseSize) {
            resolve(responseData)
          }
        })
      })

      // タイムアウト
      socket.on('timeout', () => {
        socket.destroy()
        console.debug(`socket timeout: ${packetType}`)
        reject('timeout')
      })
      // エラー
      socket.on('error', (error) => {
        socket.destroy()
        console.debug(`socket error: ${packetType}`)
        reject(error)
      })
      // 切断
      socket.on('close', () => {
        console.debug(`socket close: ${packetType}`)
      })
    })
  }

  /**
   * ソケット切断
   */
  protected close() {
    if (this.tlsSocket && !this.tlsSocket.closed) {
      this.tlsSocket.end(new Uint8Array())
    }
  }
}

接続

this.connectionOptions = {
  host,
  port,
  timeout,
  cert: Buffer.concat([readFileSync(nodeCertPath), readFileSync(caCertPath)]),
  key: readFileSync(nodeKeyPath),
  rejectUnauthorized: false,
};

this.tlsSocket = await new Promise<tls.TLSSocket>((resolve) => {
  const sock = tls.connect(this.connectionOptions, () => {
    resolve(sock);
  });
});

ホストやポートを指定して接続します。SSL ソケット通信なので証明書を同時に付与して、接続できたらソケットが返ってきます。
自己証明書なのでrejectUnauthorized: falseを付けて検証しないようにします。

リクエスト

/**
 * リクエスト
 * @param packetType パケットタイプ
 * @param payload ペイロード
 * @param isResponse レスポンス有無
 * @returns レスポンス
 */
protected async request(
  packetType: number,
  payload?: Uint8Array,
  isResponse = true
): Promise<Uint8Array | undefined> {
  // Symbolパケット生成
  const payloadSize = payload ? payload.length : 0
  const packetSize = this.SYMBOL_HEADER_SIZE + payloadSize
  const symbolPacketBuffer = new ArrayBuffer(packetSize)
  // Symbolヘッダー編集
  const symbolHeader = new DataView(symbolPacketBuffer)
  symbolHeader.setUint32(0, packetSize, true)
  symbolHeader.setUint32(4, packetType, true)
  // Symbolペイロード編集
  if (payload) {
    const symbolPayload = new Uint8Array(
      symbolPacketBuffer,
      this.SYMBOL_HEADER_SIZE,
      payloadSize
    )
    symbolPayload.set(payload)
  }
  // 接続確認
  if (!this.tlsSocket || this.tlsSocket.closed) await this.connect()
  if (!this.tlsSocket) throw Error('Failed to connect socket.')
  // Symbolパケット送信
  this.tlsSocket.write(new Uint8Array(symbolPacketBuffer))
  if (!isResponse) return // レスポンスなしの場合
  return await this.receiver(this.tlsSocket, packetType) // レスポンスありの場合
}

リクエストは 8 バイトの Symbol ヘッダーと場合に応じてペイロードを付与して送信し、レスポンスがあるパケットタイプの場合は、レスポンスを解析するメソッドを呼び出します。

Symbol ヘッダーは以下の構成です。

項目 長さ
size 4 バイト
paketType 4 バイト

sizeは、Symbol ヘッダーのみの場合は 8。ペイロードがある場合は、ペイロード長を加算します。
paketTypeは、何を要求するかを示すモノで、client/catapult/src/catapult/ionet/PacketType.hに書かれています。

レスポンス

/**
 * 受信機
 * @param packetType パケットタイプ
 * @returns レスポンス
 */
private async receiver(
  socket: tls.TLSSocket,
  packetType: number
): Promise<Uint8Array | undefined> {
  return new Promise<Uint8Array | undefined>((resolve, reject) => {
    let responseSize = this.SYMBOL_HEADER_SIZE // ヘッダ分のサイズを前もって付与
    let responseData: Uint8Array | undefined = undefined

    // SSL接続
    socket.on(
      'secureConnect',
      () => (this.x509Certificate = socket.getPeerX509Certificate())
    )

    // データ受信
    socket.once('data', (data) => {
      // レスポンスデータ(ヘッダ)取得
      const nodeBufferView = Buffer.from(new Uint8Array(data).buffer)
      // レスポンスサイズチェック
      const responseDataSize = nodeBufferView.readUInt32LE(0)
      if (responseDataSize === 0) {
        socket.destroy()
        reject('Received data is empty.')
      }
      // レスポンスパケットタイプチェック
      const responsePacketType = nodeBufferView.readUInt32LE(4)
      if (responsePacketType !== packetType) {
        socket.destroy()
        reject(
          `Mismatch packet type: expect: ${packetType} actual: ${responsePacketType}`
        )
      }
      // ヘッダが問題なければデータ部取得
      socket.on('data', (data) => {
        const tempResponseData = new Uint8Array(data)
        responseSize += tempResponseData.length
        if (!responseData) {
          // 初回
          responseData = tempResponseData
        } else {
          // 連結
          const merged = new Uint8Array(
            responseData.length + tempResponseData.length
          )
          merged.set(responseData)
          merged.set(tempResponseData, responseData.length)
          responseData = merged
        }
        if (responseDataSize <= responseSize) {
          resolve(responseData)
        }
      })
    })

    // タイムアウト
    socket.on('timeout', () => {
      socket.destroy()
      console.debug(`socket timeout: ${packetType}`)
      reject('timeout')
    })
    // エラー
    socket.on('error', (error) => {
      socket.destroy()
      console.debug(`socket error: ${packetType}`)
      reject(error)
    })
    // 切断
    socket.on('close', () => {
      console.debug(`socket close: ${packetType}`)
    })
  })
}

リクエストしたパケットタイプのデータが返ってきたかを判定し、リクエストしたデータであれば残りのデータを取得します。
基底クラスなので個別の処理は行わずに受信データをそのまま返すようにしています。

切断

/**
 * ソケット切断
 */
protected close() {
  if (this.tlsSocket && !this.tlsSocket.closed) {
    this.tlsSocket.end(new Uint8Array())
  }
}

空データを送信して送信終了したことをノードに知らせます。これでノード側から切断されます。
ノード側のタイミングで切断されるため、接続と切断を短い間隔で行うことは出来ません。

パケットの操作

パケットの操作を少し簡単にできるクラスを作成します。

src/PacketBuffer.ts
export class PacketBuffer {
  private _length: number
  private _index: number

  constructor(private readonly _buffer: Buffer) {
    this._length = _buffer.length
    this._index = 0
  }

  addOffset(addOffset: number): number {
    this._index += addOffset
    return this._index
  }

  readUInt8(addOffset: number = 0): number {
    this.addOffset(addOffset)
    const readData = this._buffer.readUInt8(this._index)
    this._index += 1
    return readData
  }

  readUInt16LE(addOffset: number = 0): number {
    this.addOffset(addOffset)
    const readData = this._buffer.readUInt16LE(this._index)
    this._index += 2
    return readData
  }

  readUInt32LE(addOffset: number = 0): number {
    this.addOffset(addOffset)
    const readData = this._buffer.readUInt32LE(this._index)
    this._index += 4
    return readData
  }

  readBigUInt64LE(addOffset: number = 0): bigint {
    this.addOffset(addOffset)
    const readData = this._buffer.readBigUInt64LE(this._index)
    this._index += 8
    return readData
  }

  readString(length: number, addOffset: number = 0): string {
    this.addOffset(addOffset)
    const readData = this._buffer.toString('utf8', this._index, this._index + length)
    this._index += length
    return readData
  }

  readHexString(length: number, addOffset: number = 0): string {
    this.addOffset(addOffset)
    const readData = this._buffer.toString('hex', this._index, this._index + length)
    this._index += length
    return readData
  }

  get index(): number {
    return this._index
  }

  get length(): number {
    return this._length
  }

ノード時間の取得

では、実際に SSL ソケット通信を使ってノードからデータを取得してみましょう。
Rest で言うところの/node/timeです。

ノードへリクエスト

パケットタイプは0x120で、ペイロードはありません。

Catapult.ts
import { NodeTime } from './models/NodeTime.js'
import { SslSocket } from './SslSocket.js'

export class Catapult extends SslSocket {
  /** パケットタイプ */
  private PacketType = {
    TIME_SYNC_NETWORK_TIME: 0x1_20,
  }

  /**
   * /node/time 同等の値を持つクラスを取得
   * @returns NodeTime
   */
  async getNodeTime() {
    console.info('NodeTime')
    let nodeTime: NodeTime | undefined
    try {
      // ピア問合せ
      const socketData = await this.request(
        this.PacketType.TIME_SYNC_NETWORK_TIME
      )
      // デシリアライズ
      if (socketData) nodeTime = NodeTime.deserialize(socketData)
    } catch (e) {
      if (e instanceof Error) console.error(e.message)
      else console.error(e)
    } finally {
      this.close()
    }
    return nodeTime
  }
}

レスポンスデータの解析

シリアライズされたデータが返ってくるので、Rest で見慣れた形に直します。

項目 長さ
sendTimestamp 8 バイト
receiveTimestamp 8 バイト
src/models/NodeTime.ts
export class NodeTime {
  constructor(public sendTimestamp: string, public receiveTimestamp: string) {}

  static deserialize(payload: Uint8Array) {
    const nodeBufferView = Buffer.from(payload);
    const sendTimestamp = nodeBufferView.readBigUInt64LE(0).toString();
    const receiveTimestamp = nodeBufferView.readBigUInt64LE(8).toString();
    return new NodeTime(sendTimestamp, receiveTimestamp);
  }

  toJson() {
    return {
      communicationTimestamps: {
        sendTimestamp: this.sendTimestamp,
        receiveTimestamp: this.receiveTimestamp,
      },
    };
  }
}

メインの作成

実行するコードを作成します。
Catapultクラスの第 4 引数は公開されたノードであればどこでも良いです。メインネット、テストネットも問いません。証明書は共用出来ます。

src/mainNodeTime.ts
import { Catapult } from './Catapult.js'

const catapult = new Catapult(
  'cert/ca.crt.pem',
  'cert/node.crt.pem',
  'cert/node.key.pem',
  'localhost'
)

const nodeTime = await catapult.getNodeTime()
if (nodeTime) console.log(nodeTime)

実行

yarn tsx .\src\mainNodeTime.ts

実行すると以下の様にノード時間が出力されます。

NodeTime
socket connected.
NodeTime {
  sendTimestamp: '115990352077',
  receiveTimestamp: '115990352077'
}
socket close: 288

バックナンバー

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?