1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

トラフィックジェネレータ Ixia-c を試してみた(第1回)

Last updated at Posted at 2025-04-29

ネットワークの動作確認をする際に、

  • 複数のIPアドレスを模擬してトラフィックを流したい
  • TCPやUDPのポート番号を複数使ってトラフィックを流したい
  • VLAN-IDを付与してトラフィックを流したい

といった疑似的なトラフィックを流すケースがあります。
そこで今回は、無料でも使える Ixia-c を使って、ネットワークシミュレーター環境でこれらを試してみました。

内容が少し長くなったため、以下の2回に分けて投稿します。

  • 第1回:同じIPセグメント間でトラフィックを流すケース
  • 第2回:ルーターを挟んで異なるIPセグメント間でトラフィックを流すケース

今回は第1回です。第2回の内容は以下となります。

Ixia-c とは

コンテナベースのネットワークテストツールで、Open Traffic Generator(OTG)APIに準拠したトラフィックジェネレーターおよびプロトコルエミュレーターです。

Ixia-cには無償で利用できるCommunity Editionがあり、今回のような単純なトラフィックジェネレータとしての確認なら無償版でも十分でした。
Community Editionと他の商用ライセンスとの違いは以下をご参照ください。
https://ixia-c.dev/licensing/

実施した環境

今回は以下の Examples の内容を参考に試してみました。

この構成では、サーバ内部での back-to-back 接続となるため、外部のネットワーク機器との接続確認ができません。そこで、外部のスイッチングハブ(SW-HUB)を経由した back-to-back 接続に変更して確認を行いました。

image.png

また、ixia-c への指示はotgcdnというCLIツールでの例になってますが、自分がやった限り送信時間を指定した設定ができなかったので、snappi という pythonモジュールを利用して確認しました。
各ソフトウェアのバージョンは以下の通りです。

  • Ubuntu 24.04.1 LTS
  • Python 3.12.3
  • pip 24.0
  • snappi Version: 1.28.2
  • Ixia-c
    • keng-controller 1.28.0-6
    • ixia-c-traffic-engine 1.8.0.245

セットアップ

以下のようなイメージで動作させるため、Ixia-c と snappi のセットアップを行います
image.png

サーバーのポート設定

以下の図のように2ポートからトラフィックが送受信できるように、今回は空きポートのens4ens5を利用する設定を事前に実施します
image.png

設定については、再起動後も残るようにnetplanens4ens5の設定を加えます。dhcpを無効と合わせてIPv6のリンクローカルを無効にします。
ens3は利用環境に合わせて変更してください。
以下のように/etc/netplan/99-custom.yamlを作成

/etc/netplan/99-custom.yaml
network:
  version: 2
  renderer: networkd

  ethernets:
    ens3:
      dhcp4: false
      addresses:
        - 192.168.1.101/24
      nameservers:
        addresses:
          - 8.8.8.8
        search: []
      routes:
      - to: default
        via: 192.168.1.1
    ens4:
      dhcp4: false
      dhcp6: false
      link-local: []       # IPv4/IPv6 のリンクローカルを無効化
    ens5:
      dhcp4: false
      dhcp6: false
      link-local: []       # IPv4/IPv6 のリンクローカルを無効化

上記ファイルを作成したら、以下の設定で反映されます。

netplan apply

netplan apply実行時の注意

/etc/netplan/99-custom.yaml で現在利用している環境から IP アドレス等を変更すると、ログインセッションが切れてしまう可能性があるため注意してください。
netplan get で現在の状態が .yaml ファイル形式で出力されるので、それを参考にしながら 99-custom.yaml を作成することをおすすめします。

Ixia-c のセットアップ

docker-compose でのセットアップになります。サーバーにDocker関連のインストールされていない場合は以下のリンクを参照にインストールしてください。
https://docs.docker.com/engine/install/ubuntu/#installation-methods

ディレクトリを作ってその中で各ファイルを作成します。

mkdir ixia-c
cd ixia-c/

compose.yaml の作成

以下のURLと内容から traffic_engine のIF名をens4ens5に変えてcompose.yamlを作成
https://github.com/open-traffic-generator/otg-examples/blob/main/docker-compose/b2b/compose.yml

compose.yaml
services:
  controller:
    image: ghcr.io/open-traffic-generator/keng-controller:1.28.0-6
    container_name: ixia-c-controller
    command: --accept-eula --http-port 8443
    network_mode: "host"
#    restart: always      # サーバー再起動でも自動で起動させたい場合はコメント外す
  traffic_engine_1:
    image: ghcr.io/open-traffic-generator/ixia-c-traffic-engine:1.8.0.245
    container_name: ixia-c-traffic_engine_1
    network_mode: "host"
#    restart: always      # サーバー再起動でも自動で起動させたい場合はコメント外す
    privileged: true
    environment:
      - OPT_LISTEN_PORT=5555
      - ARG_IFACE_LIST=virtual@af_packet,ens4  # サーバーのIF名に合わせて変更
      - OPT_NO_HUGEPAGES=Yes
  traffic_engine_2:
    image: ghcr.io/open-traffic-generator/ixia-c-traffic-engine:1.8.0.245
    container_name: ixia-c-traffic_engine_2
    network_mode: "host"
#    restart: always      # サーバー再起動でも自動で起動させたい場合はコメント外す
    privileged: true
    environment:
      - OPT_LISTEN_PORT=5556
      - ARG_IFACE_LIST=virtual@af_packet,ens5  # サーバーのIF名に合わせて変更
      - OPT_NO_HUGEPAGES=Yes

Docker Composeでビルド&起動

以下のコマンドで Ixia-c のビルドおよび起動が行われます。

docker compose up -d

初回実行時には、Docker イメージのダウンロードとビルドが行われるため、少し時間がかかりますが、2回目以降はダウンロードはスキップされ、すぐに起動されます。
(compose.yamlrestart: alwaysのコメントを外すとサーバーの再起動時などは自動で起動されます)

snappi のセットアップ

pip のインストール

pip でのインストールになるので、pip 環境がなければ事前にインストール。

apt-get install pip

Pythonの環境がない場合も、こちらで合わせてインストールされます。

snappi のインストール

以下の設定でインストールされます。

pip install --upgrade snappi --break-system-packages

これでセットアップは完了です。

トラフィックを流してみる

いろんなパターンでトラフィックを流してみます。流す条件や受信状態の確認などはPythonのスクリプトで定義していきます。
ChatGPTに相談しながら、2ポート同時に流し、送受信の総パケット数とリアルタイムのppsを表示するようにしてみました。

1.単純なIP+UDP通信(パケット数を指定)

まずはじめに以下の条件でトラフィックを流してみます

  • 送信レート : 3 pps
  • フレームサイズ : 123 byte
  • 送信パケット数 : 30 パケット送信で停止
  • MACアドレス : 00:00:00:00:00:aa00:00:00:00:00:bbをお互いの送信元・宛先にする
  • IPアドレス : 192.168.99.1192.168.99.2をお互いの送信元・宛先にする
  • UDPをつけて、5000160001をお互いの送信元・宛先ポートにする

以下のPythonスクリプトを作成

traffic-1-1flow-30packets.py
traffic-1-1flow-30packets.py
import snappi  # snappiライブラリをインポート(OTG APIを使うため)
import time    # 時間管理用の標準ライブラリをインポート

# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)

# 新しいコンフィグを作成
cfg = api.config()

# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]


#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name                   # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name]                # 受信先ポートをp2に設定

flow1.duration.fixed_packets.packets = 30            # 送信パケット数で測定を止める

flow1.rate.pps = 3                                   # 送信速度:パケット/秒
flow1.size.fixed = 123                               # パケットサイズ
flow1.metrics.enable = True                          # メトリクス(統計情報)を有効化
flow1.metrics.loss = False                           # パケット損失計測は無効
flow1.metrics.timestamps = False                     # タイムスタンプ計測は無効

# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()

eth1.src.value = '00:00:00:00:00:aa'                 # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb'                 # 宛先MACアドレス
ip1.src.value = '192.168.99.1'                       # 送信元IPアドレス
ip1.dst.value = '192.168.99.2'                       # 宛先IPアドレス
udp1.src_port.value = 50001                          # 送信元UDPポート
udp1.dst_port.value = 60001                          # 宛先UDPポート

#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]

flow2.duration.fixed_packets.packets = 30            # 送信パケット数で測定を止める

flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False

# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()

eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001

#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)

# トラフィック送信を開始する関数
def start_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
    api.set_control_state(ctrl)

# トラフィック送信を停止する関数
def stop_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
    api.set_control_state(ctrl)

# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name]  # 対象フローを指定

# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')

# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
    while True:
        metrics = api.get_metrics(mreq)  # メトリクスを取得

        # 前回表示した行数分、カーソルを上に戻して上書き
        if prev_lines:
            print(f"\033[{prev_lines}A", end='')

        # 出力内容を構築
        output_lines = []
        #output_lines.append(f"Time: {time.strftime('%H:%M:%S')}")  # 現在時刻
        for fm in metrics.flow_metrics:
            output_lines.append(
                f"Flow {fm.name}: "
                f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
                f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
            )

        # 出力&行数カウント
        for line in output_lines:
            print(line)
        prev_lines = len(output_lines)

        time.sleep(1)  # 1秒ごとに更新

except KeyboardInterrupt:
    # Ctrl+Cで停止時の処理
    print('Stopping traffic...')
    stop_traffic()
    print('Traffic stopped.')

作成したスクリプトをpython3 ./traffic-1-1flow-30packets.pyで実行するとトラフィックを送信開始され、送受信のフレーム数とトラフィック量(pps)が表示されます。

# python3 ./traffic-1-1flow-30packets.py
2025-xx-xx xx:xx:55 [snappi.snappi] [WARNING] Certificate verification is disabled
--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---
Flow p1-p2: tx_frames=10 (2pps), rx_frames=10 (3pps)
Flow p2-p1: tx_frames=10 (2pps), rx_frames=10 (2pps)

パケットキャプチャを行うことで、設定どおりのパケットが送信されていることを確認できます。
image.png

2.単純なIP+UDP通信(測定時間を指定)

測定を完了させる条件をパケット数ではなく時間にする場合は以下のようにfixed_packets.packetsからfixed_seconds.secondsに変更することで対応できます。

- flow1.duration.fixed_packets.packets = 30            # 送信パケット数で測定を止める
+ flow1.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

- flow2.duration.fixed_packets.packets = 30            # 送信パケット数で測定を止める
+ flow2.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

以下のPythonスクリプトを作成

traffic-2-1flow-10sec.py
traffic-2-1flow-10sec.py
import snappi  # snappiライブラリをインポート(OTG APIを使うため)
import time    # 時間管理用の標準ライブラリをインポート

# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)

# 新しいコンフィグを作成
cfg = api.config()

# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]


#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name                   # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name]                # 受信先ポートをp2に設定

flow1.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow1.rate.pps = 3                                   # 送信速度:パケット/秒
flow1.size.fixed = 123                               # パケットサイズ
flow1.metrics.enable = True                          # メトリクス(統計情報)を有効化
flow1.metrics.loss = False                           # パケット損失計測は無効
flow1.metrics.timestamps = False                     # タイムスタンプ計測は無効

# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()

eth1.src.value = '00:00:00:00:00:aa'                 # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb'                 # 宛先MACアドレス
ip1.src.value = '192.168.99.1'                       # 送信元IPアドレス
ip1.dst.value = '192.168.99.2'                       # 宛先IPアドレス
udp1.src_port.value = 50001                          # 送信元UDPポート
udp1.dst_port.value = 60001                          # 宛先UDPポート

#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]

flow2.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False

# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()

eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001

#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)

# トラフィック送信を開始する関数
def start_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
    api.set_control_state(ctrl)

# トラフィック送信を停止する関数
def stop_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
    api.set_control_state(ctrl)

# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name]  # 対象フローを指定

# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')

# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
    while True:
        metrics = api.get_metrics(mreq)  # メトリクスを取得

        # 前回表示した行数分、カーソルを上に戻して上書き
        if prev_lines:
            print(f"\033[{prev_lines}A", end='')

        # 出力内容を構築
        output_lines = []
        #output_lines.append(f"Time: {time.strftime('%H:%M:%S')}")  # 現在時刻
        for fm in metrics.flow_metrics:
            output_lines.append(
                f"Flow {fm.name}: "
                f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
                f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
            )

        # 出力&行数カウント
        for line in output_lines:
            print(line)
        prev_lines = len(output_lines)

        time.sleep(1)  # 1秒ごとに更新

except KeyboardInterrupt:
    # Ctrl+Cで停止時の処理
    print('Stopping traffic...')
    stop_traffic()
    print('Traffic stopped.')

3.送信元IPアドレスを複数にして通信

次に片方のトラフィックの送信元のIPアドレスを10個にして流してみます。
以下のとおりvalueの部分をincrementに変更することでパラメータを増加させることができます。

- ip1.src.value = '192.168.99.1'            # 送信元IPアドレス
+ ip1.src.increment.start = '192.168.99.1'  # 送信元IPアドレス、スタートするアドレス
+ ip1.src.increment.step  = '0.0.0.1'       # 送信元IPアドレス、増加量を指定(いくつずつ増や すか)
+ ip1.src.increment.count = 10              # 送信元IPアドレス、いくつのアドレスを生成するか (繰り返す数)を指定

以下のPythonスクリプトを作成

traffic-3-ipv4_10flow-10sec.py
traffic-3-ipv4_10flow-10sec.py
import snappi  # snappiライブラリをインポート(OTG APIを使うため)
import time    # 時間管理用の標準ライブラリをインポート

# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)

# 新しいコンフィグを作成
cfg = api.config()

# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]


#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name                   # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name]                # 受信先ポートをp2に設定

flow1.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow1.rate.pps = 3                                   # 送信速度:パケット/秒
flow1.size.fixed = 123                               # パケットサイズ
flow1.metrics.enable = True                          # メトリクス(統計情報)を有効化
flow1.metrics.loss = False                           # パケット損失計測は無効
flow1.metrics.timestamps = False                     # タイムスタンプ計測は無効

# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()

eth1.src.value = '00:00:00:00:00:aa'                 # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb'                 # 宛先MACアドレス

ip1.src.increment.start = '192.168.99.1'             # 送信元IPアドレス、スタートするアドレス
ip1.src.increment.step  = '0.0.0.1'                  # 送信元IPアドレス、増加量を指定(いくつずつ増やすか)
ip1.src.increment.count = 10                         # 送信元IPアドレス、いくつのアドレスを生成するか(繰り返す数)を指定

ip1.dst.value = '192.168.99.2'                       # 宛先IPアドレス
udp1.src_port.value = 50001                          # 送信元UDPポート
udp1.dst_port.value = 60001                          # 宛先UDPポート

#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]

flow2.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False

# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()

eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001

#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)

# トラフィック送信を開始する関数
def start_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
    api.set_control_state(ctrl)

# トラフィック送信を停止する関数
def stop_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
    api.set_control_state(ctrl)

# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name]  # 対象フローを指定

# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')

# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
    while True:
        metrics = api.get_metrics(mreq)  # メトリクスを取得

        # 前回表示した行数分、カーソルを上に戻して上書き
        if prev_lines:
            print(f"\033[{prev_lines}A", end='')

        # 出力内容を構築
        output_lines = []
        #output_lines.append(f"Time: {time.strftime('%H:%M:%S')}")  # 現在時刻
        for fm in metrics.flow_metrics:
            output_lines.append(
                f"Flow {fm.name}: "
                f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
                f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
            )

        # 出力&行数カウント
        for line in output_lines:
            print(line)
        prev_lines = len(output_lines)

        time.sleep(1)  # 1秒ごとに更新

except KeyboardInterrupt:
    # Ctrl+Cで停止時の処理
    print('Stopping traffic...')
    stop_traffic()
    print('Traffic stopped.')

以下でパケットキャプチャしながら実行

python3 ./traffic-3-ipv4_10flow-10sec.py

パケットキャプチャ結果をみると送信元が「192.168.99.1」~「192.168.99.10」になっていることが確認できます。
image.png
(確認しやすいように送信元MACアドレスでフィルタ後の画面キャプチャにしてます)

4.UDPの送信元ポートを複数にて通信

次はさらにUDPの送信元ポート番号を10個にして流してみます。
IPの時と同じようにvalueの部分をincrementに変更することでパラメータを増加させることができます。

+ udp1.src_port.value = 50001            # 送信元UDPポート
- udp1.src_port.increment.start = 50001  # 送信元UDPポート、スタートするポート番号
- udp1.src_port.increment.step = 1       # 送信元UDPポート、増加量を指定(いくつずつ増やすか)
- udp1.src_port.increment.count = 10     # 送信元UDPポート、いくつのアドレスを生成するか(繰り返す数)を指定

以下のPythonスクリプトを作成

traffic-4-udp_10flow-10sec.py
traffic-4-udp_10flow-10sec.py
import snappi  # snappiライブラリをインポート(OTG APIを使うため)
import time    # 時間管理用の標準ライブラリをインポート

# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)

# 新しいコンフィグを作成
cfg = api.config()

# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]


#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name                   # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name]                # 受信先ポートをp2に設定

flow1.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow1.rate.pps = 3                                   # 送信速度:パケット/秒
flow1.size.fixed = 123                               # パケットサイズ
flow1.metrics.enable = True                          # メトリクス(統計情報)を有効化
flow1.metrics.loss = False                           # パケット損失計測は無効
flow1.metrics.timestamps = False                     # タイムスタンプ計測は無効

# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()

eth1.src.value = '00:00:00:00:00:aa'                 # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb'                 # 宛先MACアドレス

ip1.src.increment.start = '192.168.99.1'             # 送信元IPアドレス、スタートするアドレス
ip1.src.increment.step  = '0.0.0.1'                  # 送信元IPアドレス、増加量を指定(いくつずつ増やすか)
ip1.src.increment.count = 10                         # 送信元IPアドレス、いくつのアドレスを生成するか(繰り返す数)を指定

ip1.dst.value = '192.168.99.2'                       # 宛先IPアドレス

udp1.src_port.increment.start = 50001                # 送信元UDPポート、スタートするポート番号
udp1.src_port.increment.step = 1                     # 送信元UDPポート、増加量を指定(いくつずつ増やすか)
udp1.src_port.increment.count = 10                   # 送信元UDPポート、いくつのアドレスを生成するか(繰り返す数)を指定

udp1.dst_port.value = 60001                          # 宛先UDPポート

#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]

flow2.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False

# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()

eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001

#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)

# トラフィック送信を開始する関数
def start_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
    api.set_control_state(ctrl)

# トラフィック送信を停止する関数
def stop_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
    api.set_control_state(ctrl)

# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name]  # 対象フローを指定

# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')

# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
    while True:
        metrics = api.get_metrics(mreq)  # メトリクスを取得

        # 前回表示した行数分、カーソルを上に戻して上書き
        if prev_lines:
            print(f"\033[{prev_lines}A", end='')

        # 出力内容を構築
        output_lines = []
        #output_lines.append(f"Time: {time.strftime('%H:%M:%S')}")  # 現在時刻
        for fm in metrics.flow_metrics:
            output_lines.append(
                f"Flow {fm.name}: "
                f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
                f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
            )

        # 出力&行数カウント
        for line in output_lines:
            print(line)
        prev_lines = len(output_lines)

        time.sleep(1)  # 1秒ごとに更新

except KeyboardInterrupt:
    # Ctrl+Cで停止時の処理
    print('Stopping traffic...')
    stop_traffic()
    print('Traffic stopped.')

パケットキャプチャ結果をみると送信元UDPポート番号が「50001」~「50010」になっていることが確認できます。
image.png

5.VLANタグをつけて通信

次はさらにVLANを付けて流してみます。
以下のようにvlanのパラメーターを追加することでVLANタグをつけて送信できます。今回は「VLAN-ID: 100」と「プライオリティ(CoS): 3」を設定。

- eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()
+ eth1, vlan1, ip1, udp1 = flow1.packet.ethernet().vlan().ipv4().udp()
+ vlan1.id.value = 100                                 # VLAN (VLAN-ID)
+ vlan1.priority.value = 3                             # VLAN (Priority:CoS)

以下のPythonスクリプトを作成

traffic-5-vlan-10sec.py
traffic-5-vlan-10sec.py
import snappi  # snappiライブラリをインポート(OTG APIを使うため)
import time    # 時間管理用の標準ライブラリをインポート

# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)

# 新しいコンフィグを作成
cfg = api.config()

# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]


#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name                   # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name]                # 受信先ポートをp2に設定

flow1.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow1.rate.pps = 3                                   # 送信速度:パケット/秒
flow1.size.fixed = 123                               # パケットサイズ
flow1.metrics.enable = True                          # メトリクス(統計情報)を有効化
flow1.metrics.loss = False                           # パケット損失計測は無効
flow1.metrics.timestamps = False                     # タイムスタンプ計測は無効

# パケットの構造定義
eth1, vlan1, ip1, udp1 = flow1.packet.ethernet().vlan().ipv4().udp()

eth1.src.value = '00:00:00:00:00:aa'                 # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb'                 # 宛先MACアドレス

vlan1.id.value = 100                                 # VLAN (VLAN-ID)
vlan1.priority.value = 3                             # VLAN (Priority:CoS)

ip1.src.increment.start = '192.168.99.1'             # 送信元IPアドレス、スタートするアドレス
ip1.src.increment.step  = '0.0.0.1'                  # 送信元IPアドレス、増加量を指定(いくつずつ増やすか)
ip1.src.increment.count = 10                         # 送信元IPアドレス、いくつのアドレスを生成するか(繰り返す数)を指定

ip1.dst.value = '192.168.99.2'                       # 宛先IPアドレス

udp1.src_port.increment.start = 50001                # 送信元UDPポート、スタートするポート番号
udp1.src_port.increment.step = 1                     # 送信元UDPポート、増加量を指定(いくつずつ増やすか)
udp1.src_port.increment.count = 10                   # 送信元UDPポート、いくつのアドレスを生成するか(繰り返す数)を指定

udp1.dst_port.value = 60001                          # 宛先UDPポート

#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]

flow2.duration.fixed_seconds.seconds = 10            # 送信時間で測定を止める

flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False

# パケット構造(flow1と逆方向の通信)
eth2, vlan2, ip2, udp2 = flow2.packet.ethernet().vlan().ipv4().udp()

eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
vlan2.id.value = 100
vlan2.priority.value = 3
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001

#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)

# トラフィック送信を開始する関数
def start_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
    api.set_control_state(ctrl)

# トラフィック送信を停止する関数
def stop_traffic():
    ctrl = api.control_state()
    ctrl.choice = ctrl.TRAFFIC
    ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
    ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
    api.set_control_state(ctrl)

# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name]  # 対象フローを指定

# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')

# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
    while True:
        metrics = api.get_metrics(mreq)  # メトリクスを取得

        # 前回表示した行数分、カーソルを上に戻して上書き
        if prev_lines:
            print(f"\033[{prev_lines}A", end='')

        # 出力内容を構築
        output_lines = []
        #output_lines.append(f"Time: {time.strftime('%H:%M:%S')}")  # 現在時刻
        for fm in metrics.flow_metrics:
            output_lines.append(
                f"Flow {fm.name}: "
                f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
                f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
            )

        # 出力&行数カウント
        for line in output_lines:
            print(line)
        prev_lines = len(output_lines)

        time.sleep(1)  # 1秒ごとに更新

except KeyboardInterrupt:
    # Ctrl+Cで停止時の処理
    print('Stopping traffic...')
    stop_traffic()
    print('Traffic stopped.')

パケットキャプチャ結果をみるとVLANタグがついていることと、その中身も「VLAN-ID: 100」と「プライオリティ(CoS): 3」 になっていることが確認できます。
image.png

最後に

今回はここまでです。
L2スイッチなどにトラフィックを流す場合は、今回紹介した方法で通信できると思います。
ただし、ルーターやL3スイッチを間に挟んでトラフィックを流す場合は、ARP解決が必要になります。
次回はその点について説明します。

第2回の内容は以下となります。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?