ネットワークの動作確認をする際に、
- 複数のIPアドレスを模擬してトラフィックを流したい
- TCPやUDPのポート番号を複数使ってトラフィックを流したい
- VLAN-IDを付与してトラフィックを流したい
といった疑似的なトラフィックを流すケースがあります。
そこで今回は、無料でも使える Ixia-c を使って、ネットワークシミュレーター環境でこれらを試してみました。
内容が少し長くなったため、以下の2回に分けて投稿します。
- 第1回:同じIPセグメント間でトラフィックを流すケース
- 第2回:ルーターを挟んで異なるIPセグメント間でトラフィックを流すケース
今回は第1回です。第2回の内容は以下となります。
Ixia-c とは
コンテナベースのネットワークテストツールで、Open Traffic Generator(OTG)APIに準拠したトラフィックジェネレーターおよびプロトコルエミュレーターです。
Ixia-cには無償で利用できるCommunity Editionがあり、今回のような単純なトラフィックジェネレータとしての確認なら無償版でも十分でした。
Community Editionと他の商用ライセンスとの違いは以下をご参照ください。
https://ixia-c.dev/licensing/
実施した環境
今回は以下の Examples の内容を参考に試してみました。
この構成では、サーバ内部での back-to-back 接続となるため、外部のネットワーク機器との接続確認ができません。そこで、外部のスイッチングハブ(SW-HUB)を経由した back-to-back 接続に変更して確認を行いました。
また、ixia-c への指示はotgcdnというCLIツールでの例になってますが、自分がやった限り送信時間を指定した設定ができなかったので、snappi という pythonモジュールを利用して確認しました。
各ソフトウェアのバージョンは以下の通りです。
- Ubuntu 24.04.1 LTS
- Python 3.12.3
- pip 24.0
- snappi Version: 1.28.2
- Ixia-c
- keng-controller 1.28.0-6
- ixia-c-traffic-engine 1.8.0.245
セットアップ
以下のようなイメージで動作させるため、Ixia-c と snappi のセットアップを行います
サーバーのポート設定
以下の図のように2ポートからトラフィックが送受信できるように、今回は空きポートのens4
とens5
を利用する設定を事前に実施します
設定については、再起動後も残るようにnetplan
にens4
とens5
の設定を加えます。dhcpを無効と合わせてIPv6のリンクローカルを無効にします。
ens3
は利用環境に合わせて変更してください。
以下のように/etc/netplan/99-custom.yaml
を作成
network:
version: 2
renderer: networkd
ethernets:
ens3:
dhcp4: false
addresses:
- 192.168.1.101/24
nameservers:
addresses:
- 8.8.8.8
search: []
routes:
- to: default
via: 192.168.1.1
ens4:
dhcp4: false
dhcp6: false
link-local: [] # IPv4/IPv6 のリンクローカルを無効化
ens5:
dhcp4: false
dhcp6: false
link-local: [] # IPv4/IPv6 のリンクローカルを無効化
上記ファイルを作成したら、以下の設定で反映されます。
netplan apply
Ixia-c のセットアップ
docker-compose でのセットアップになります。サーバーにDocker関連のインストールされていない場合は以下のリンクを参照にインストールしてください。
https://docs.docker.com/engine/install/ubuntu/#installation-methods
ディレクトリを作ってその中で各ファイルを作成します。
mkdir ixia-c
cd ixia-c/
compose.yaml の作成
以下のURLと内容から traffic_engine のIF名をens4
とens5
に変えてcompose.yaml
を作成
https://github.com/open-traffic-generator/otg-examples/blob/main/docker-compose/b2b/compose.yml
services:
controller:
image: ghcr.io/open-traffic-generator/keng-controller:1.28.0-6
container_name: ixia-c-controller
command: --accept-eula --http-port 8443
network_mode: "host"
# restart: always # サーバー再起動でも自動で起動させたい場合はコメント外す
traffic_engine_1:
image: ghcr.io/open-traffic-generator/ixia-c-traffic-engine:1.8.0.245
container_name: ixia-c-traffic_engine_1
network_mode: "host"
# restart: always # サーバー再起動でも自動で起動させたい場合はコメント外す
privileged: true
environment:
- OPT_LISTEN_PORT=5555
- ARG_IFACE_LIST=virtual@af_packet,ens4 # サーバーのIF名に合わせて変更
- OPT_NO_HUGEPAGES=Yes
traffic_engine_2:
image: ghcr.io/open-traffic-generator/ixia-c-traffic-engine:1.8.0.245
container_name: ixia-c-traffic_engine_2
network_mode: "host"
# restart: always # サーバー再起動でも自動で起動させたい場合はコメント外す
privileged: true
environment:
- OPT_LISTEN_PORT=5556
- ARG_IFACE_LIST=virtual@af_packet,ens5 # サーバーのIF名に合わせて変更
- OPT_NO_HUGEPAGES=Yes
Docker Composeでビルド&起動
以下のコマンドで Ixia-c のビルドおよび起動が行われます。
docker compose up -d
初回実行時には、Docker イメージのダウンロードとビルドが行われるため、少し時間がかかりますが、2回目以降はダウンロードはスキップされ、すぐに起動されます。
(compose.yaml
にrestart: always
のコメントを外すとサーバーの再起動時などは自動で起動されます)
snappi のセットアップ
pip のインストール
pip でのインストールになるので、pip 環境がなければ事前にインストール。
apt-get install pip
Pythonの環境がない場合も、こちらで合わせてインストールされます。
snappi のインストール
以下の設定でインストールされます。
pip install --upgrade snappi --break-system-packages
これでセットアップは完了です。
トラフィックを流してみる
いろんなパターンでトラフィックを流してみます。流す条件や受信状態の確認などはPythonのスクリプトで定義していきます。
ChatGPTに相談しながら、2ポート同時に流し、送受信の総パケット数とリアルタイムのppsを表示するようにしてみました。
1.単純なIP+UDP通信(パケット数を指定)
まずはじめに以下の条件でトラフィックを流してみます
- 送信レート : 3 pps
- フレームサイズ : 123 byte
- 送信パケット数 : 30 パケット送信で停止
- MACアドレス :
00:00:00:00:00:aa
と00:00:00:00:00:bb
をお互いの送信元・宛先にする - IPアドレス :
192.168.99.1
と192.168.99.2
をお互いの送信元・宛先にする - UDPをつけて、
50001
と60001
をお互いの送信元・宛先ポートにする
以下のPythonスクリプトを作成
traffic-1-1flow-30packets.py
import snappi # snappiライブラリをインポート(OTG APIを使うため)
import time # 時間管理用の標準ライブラリをインポート
# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)
# 新しいコンフィグを作成
cfg = api.config()
# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]
#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name] # 受信先ポートをp2に設定
flow1.duration.fixed_packets.packets = 30 # 送信パケット数で測定を止める
flow1.rate.pps = 3 # 送信速度:パケット/秒
flow1.size.fixed = 123 # パケットサイズ
flow1.metrics.enable = True # メトリクス(統計情報)を有効化
flow1.metrics.loss = False # パケット損失計測は無効
flow1.metrics.timestamps = False # タイムスタンプ計測は無効
# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()
eth1.src.value = '00:00:00:00:00:aa' # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb' # 宛先MACアドレス
ip1.src.value = '192.168.99.1' # 送信元IPアドレス
ip1.dst.value = '192.168.99.2' # 宛先IPアドレス
udp1.src_port.value = 50001 # 送信元UDPポート
udp1.dst_port.value = 60001 # 宛先UDPポート
#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]
flow2.duration.fixed_packets.packets = 30 # 送信パケット数で測定を止める
flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False
# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()
eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001
#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)
# トラフィック送信を開始する関数
def start_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
api.set_control_state(ctrl)
# トラフィック送信を停止する関数
def stop_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
api.set_control_state(ctrl)
# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name] # 対象フローを指定
# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')
# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
while True:
metrics = api.get_metrics(mreq) # メトリクスを取得
# 前回表示した行数分、カーソルを上に戻して上書き
if prev_lines:
print(f"\033[{prev_lines}A", end='')
# 出力内容を構築
output_lines = []
#output_lines.append(f"Time: {time.strftime('%H:%M:%S')}") # 現在時刻
for fm in metrics.flow_metrics:
output_lines.append(
f"Flow {fm.name}: "
f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
)
# 出力&行数カウント
for line in output_lines:
print(line)
prev_lines = len(output_lines)
time.sleep(1) # 1秒ごとに更新
except KeyboardInterrupt:
# Ctrl+Cで停止時の処理
print('Stopping traffic...')
stop_traffic()
print('Traffic stopped.')
作成したスクリプトをpython3 ./traffic-1-1flow-30packets.py
で実行するとトラフィックを送信開始され、送受信のフレーム数とトラフィック量(pps)が表示されます。
# python3 ./traffic-1-1flow-30packets.py
2025-xx-xx xx:xx:55 [snappi.snappi] [WARNING] Certificate verification is disabled
--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---
Flow p1-p2: tx_frames=10 (2pps), rx_frames=10 (3pps)
Flow p2-p1: tx_frames=10 (2pps), rx_frames=10 (2pps)
パケットキャプチャを行うことで、設定どおりのパケットが送信されていることを確認できます。
2.単純なIP+UDP通信(測定時間を指定)
測定を完了させる条件をパケット数ではなく時間にする場合は以下のようにfixed_packets.packets
からfixed_seconds.seconds
に変更することで対応できます。
- flow1.duration.fixed_packets.packets = 30 # 送信パケット数で測定を止める
+ flow1.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
- flow2.duration.fixed_packets.packets = 30 # 送信パケット数で測定を止める
+ flow2.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
以下のPythonスクリプトを作成
traffic-2-1flow-10sec.py
import snappi # snappiライブラリをインポート(OTG APIを使うため)
import time # 時間管理用の標準ライブラリをインポート
# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)
# 新しいコンフィグを作成
cfg = api.config()
# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]
#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name] # 受信先ポートをp2に設定
flow1.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow1.rate.pps = 3 # 送信速度:パケット/秒
flow1.size.fixed = 123 # パケットサイズ
flow1.metrics.enable = True # メトリクス(統計情報)を有効化
flow1.metrics.loss = False # パケット損失計測は無効
flow1.metrics.timestamps = False # タイムスタンプ計測は無効
# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()
eth1.src.value = '00:00:00:00:00:aa' # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb' # 宛先MACアドレス
ip1.src.value = '192.168.99.1' # 送信元IPアドレス
ip1.dst.value = '192.168.99.2' # 宛先IPアドレス
udp1.src_port.value = 50001 # 送信元UDPポート
udp1.dst_port.value = 60001 # 宛先UDPポート
#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]
flow2.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False
# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()
eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001
#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)
# トラフィック送信を開始する関数
def start_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
api.set_control_state(ctrl)
# トラフィック送信を停止する関数
def stop_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
api.set_control_state(ctrl)
# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name] # 対象フローを指定
# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')
# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
while True:
metrics = api.get_metrics(mreq) # メトリクスを取得
# 前回表示した行数分、カーソルを上に戻して上書き
if prev_lines:
print(f"\033[{prev_lines}A", end='')
# 出力内容を構築
output_lines = []
#output_lines.append(f"Time: {time.strftime('%H:%M:%S')}") # 現在時刻
for fm in metrics.flow_metrics:
output_lines.append(
f"Flow {fm.name}: "
f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
)
# 出力&行数カウント
for line in output_lines:
print(line)
prev_lines = len(output_lines)
time.sleep(1) # 1秒ごとに更新
except KeyboardInterrupt:
# Ctrl+Cで停止時の処理
print('Stopping traffic...')
stop_traffic()
print('Traffic stopped.')
3.送信元IPアドレスを複数にして通信
次に片方のトラフィックの送信元のIPアドレスを10個にして流してみます。
以下のとおりvalue
の部分をincrement
に変更することでパラメータを増加させることができます。
- ip1.src.value = '192.168.99.1' # 送信元IPアドレス
+ ip1.src.increment.start = '192.168.99.1' # 送信元IPアドレス、スタートするアドレス
+ ip1.src.increment.step = '0.0.0.1' # 送信元IPアドレス、増加量を指定(いくつずつ増や すか)
+ ip1.src.increment.count = 10 # 送信元IPアドレス、いくつのアドレスを生成するか (繰り返す数)を指定
以下のPythonスクリプトを作成
traffic-3-ipv4_10flow-10sec.py
import snappi # snappiライブラリをインポート(OTG APIを使うため)
import time # 時間管理用の標準ライブラリをインポート
# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)
# 新しいコンフィグを作成
cfg = api.config()
# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]
#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name] # 受信先ポートをp2に設定
flow1.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow1.rate.pps = 3 # 送信速度:パケット/秒
flow1.size.fixed = 123 # パケットサイズ
flow1.metrics.enable = True # メトリクス(統計情報)を有効化
flow1.metrics.loss = False # パケット損失計測は無効
flow1.metrics.timestamps = False # タイムスタンプ計測は無効
# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()
eth1.src.value = '00:00:00:00:00:aa' # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb' # 宛先MACアドレス
ip1.src.increment.start = '192.168.99.1' # 送信元IPアドレス、スタートするアドレス
ip1.src.increment.step = '0.0.0.1' # 送信元IPアドレス、増加量を指定(いくつずつ増やすか)
ip1.src.increment.count = 10 # 送信元IPアドレス、いくつのアドレスを生成するか(繰り返す数)を指定
ip1.dst.value = '192.168.99.2' # 宛先IPアドレス
udp1.src_port.value = 50001 # 送信元UDPポート
udp1.dst_port.value = 60001 # 宛先UDPポート
#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]
flow2.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False
# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()
eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001
#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)
# トラフィック送信を開始する関数
def start_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
api.set_control_state(ctrl)
# トラフィック送信を停止する関数
def stop_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
api.set_control_state(ctrl)
# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name] # 対象フローを指定
# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')
# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
while True:
metrics = api.get_metrics(mreq) # メトリクスを取得
# 前回表示した行数分、カーソルを上に戻して上書き
if prev_lines:
print(f"\033[{prev_lines}A", end='')
# 出力内容を構築
output_lines = []
#output_lines.append(f"Time: {time.strftime('%H:%M:%S')}") # 現在時刻
for fm in metrics.flow_metrics:
output_lines.append(
f"Flow {fm.name}: "
f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
)
# 出力&行数カウント
for line in output_lines:
print(line)
prev_lines = len(output_lines)
time.sleep(1) # 1秒ごとに更新
except KeyboardInterrupt:
# Ctrl+Cで停止時の処理
print('Stopping traffic...')
stop_traffic()
print('Traffic stopped.')
以下でパケットキャプチャしながら実行
python3 ./traffic-3-ipv4_10flow-10sec.py
パケットキャプチャ結果をみると送信元が「192.168.99.1」~「192.168.99.10」になっていることが確認できます。
(確認しやすいように送信元MACアドレスでフィルタ後の画面キャプチャにしてます)
4.UDPの送信元ポートを複数にて通信
次はさらにUDPの送信元ポート番号を10個にして流してみます。
IPの時と同じようにvalueの部分をincrementに変更することでパラメータを増加させることができます。
+ udp1.src_port.value = 50001 # 送信元UDPポート
- udp1.src_port.increment.start = 50001 # 送信元UDPポート、スタートするポート番号
- udp1.src_port.increment.step = 1 # 送信元UDPポート、増加量を指定(いくつずつ増やすか)
- udp1.src_port.increment.count = 10 # 送信元UDPポート、いくつのアドレスを生成するか(繰り返す数)を指定
以下のPythonスクリプトを作成
traffic-4-udp_10flow-10sec.py
import snappi # snappiライブラリをインポート(OTG APIを使うため)
import time # 時間管理用の標準ライブラリをインポート
# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)
# 新しいコンフィグを作成
cfg = api.config()
# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]
#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name] # 受信先ポートをp2に設定
flow1.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow1.rate.pps = 3 # 送信速度:パケット/秒
flow1.size.fixed = 123 # パケットサイズ
flow1.metrics.enable = True # メトリクス(統計情報)を有効化
flow1.metrics.loss = False # パケット損失計測は無効
flow1.metrics.timestamps = False # タイムスタンプ計測は無効
# パケットの構造定義
eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()
eth1.src.value = '00:00:00:00:00:aa' # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb' # 宛先MACアドレス
ip1.src.increment.start = '192.168.99.1' # 送信元IPアドレス、スタートするアドレス
ip1.src.increment.step = '0.0.0.1' # 送信元IPアドレス、増加量を指定(いくつずつ増やすか)
ip1.src.increment.count = 10 # 送信元IPアドレス、いくつのアドレスを生成するか(繰り返す数)を指定
ip1.dst.value = '192.168.99.2' # 宛先IPアドレス
udp1.src_port.increment.start = 50001 # 送信元UDPポート、スタートするポート番号
udp1.src_port.increment.step = 1 # 送信元UDPポート、増加量を指定(いくつずつ増やすか)
udp1.src_port.increment.count = 10 # 送信元UDPポート、いくつのアドレスを生成するか(繰り返す数)を指定
udp1.dst_port.value = 60001 # 宛先UDPポート
#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]
flow2.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False
# パケット構造(flow1と逆方向の通信)
eth2, ip2, udp2 = flow2.packet.ethernet().ipv4().udp()
eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001
#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)
# トラフィック送信を開始する関数
def start_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
api.set_control_state(ctrl)
# トラフィック送信を停止する関数
def stop_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
api.set_control_state(ctrl)
# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name] # 対象フローを指定
# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')
# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
while True:
metrics = api.get_metrics(mreq) # メトリクスを取得
# 前回表示した行数分、カーソルを上に戻して上書き
if prev_lines:
print(f"\033[{prev_lines}A", end='')
# 出力内容を構築
output_lines = []
#output_lines.append(f"Time: {time.strftime('%H:%M:%S')}") # 現在時刻
for fm in metrics.flow_metrics:
output_lines.append(
f"Flow {fm.name}: "
f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
)
# 出力&行数カウント
for line in output_lines:
print(line)
prev_lines = len(output_lines)
time.sleep(1) # 1秒ごとに更新
except KeyboardInterrupt:
# Ctrl+Cで停止時の処理
print('Stopping traffic...')
stop_traffic()
print('Traffic stopped.')
パケットキャプチャ結果をみると送信元UDPポート番号が「50001」~「50010」になっていることが確認できます。
5.VLANタグをつけて通信
次はさらにVLANを付けて流してみます。
以下のようにvlan
のパラメーターを追加することでVLANタグをつけて送信できます。今回は「VLAN-ID: 100」と「プライオリティ(CoS): 3」を設定。
- eth1, ip1, udp1 = flow1.packet.ethernet().ipv4().udp()
+ eth1, vlan1, ip1, udp1 = flow1.packet.ethernet().vlan().ipv4().udp()
+ vlan1.id.value = 100 # VLAN (VLAN-ID)
+ vlan1.priority.value = 3 # VLAN (Priority:CoS)
以下のPythonスクリプトを作成
traffic-5-vlan-10sec.py
import snappi # snappiライブラリをインポート(OTG APIを使うため)
import time # 時間管理用の標準ライブラリをインポート
# OTG サーバ(Ixia-c CE)への接続を初期化(証明書検証は無効)
api = snappi.api(location='https://localhost:8443', verify=False)
# 新しいコンフィグを作成
cfg = api.config()
# ポート定義:ポートp1とp2をローカルの特定ポート番号に割り当てる
p1 = cfg.ports.port(name='p1', location='localhost:5555')[-1]
p2 = cfg.ports.port(name='p2', location='localhost:5556')[-1]
#---------------------------
# フロー1を作成(p1→p2の通信)
#---------------------------
flow1 = cfg.flows.flow(name='p1-p2')[-1]
flow1.tx_rx.port.tx_name = p1.name # 送信元ポートをp1に設定
flow1.tx_rx.port.rx_names = [p2.name] # 受信先ポートをp2に設定
flow1.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow1.rate.pps = 3 # 送信速度:パケット/秒
flow1.size.fixed = 123 # パケットサイズ
flow1.metrics.enable = True # メトリクス(統計情報)を有効化
flow1.metrics.loss = False # パケット損失計測は無効
flow1.metrics.timestamps = False # タイムスタンプ計測は無効
# パケットの構造定義
eth1, vlan1, ip1, udp1 = flow1.packet.ethernet().vlan().ipv4().udp()
eth1.src.value = '00:00:00:00:00:aa' # 送信元MACアドレス
eth1.dst.value = '00:00:00:00:00:bb' # 宛先MACアドレス
vlan1.id.value = 100 # VLAN (VLAN-ID)
vlan1.priority.value = 3 # VLAN (Priority:CoS)
ip1.src.increment.start = '192.168.99.1' # 送信元IPアドレス、スタートするアドレス
ip1.src.increment.step = '0.0.0.1' # 送信元IPアドレス、増加量を指定(いくつずつ増やすか)
ip1.src.increment.count = 10 # 送信元IPアドレス、いくつのアドレスを生成するか(繰り返す数)を指定
ip1.dst.value = '192.168.99.2' # 宛先IPアドレス
udp1.src_port.increment.start = 50001 # 送信元UDPポート、スタートするポート番号
udp1.src_port.increment.step = 1 # 送信元UDPポート、増加量を指定(いくつずつ増やすか)
udp1.src_port.increment.count = 10 # 送信元UDPポート、いくつのアドレスを生成するか(繰り返す数)を指定
udp1.dst_port.value = 60001 # 宛先UDPポート
#---------------------------
# フロー2を作成(p2→p1の通信)
#---------------------------
flow2 = cfg.flows.flow(name='p2-p1')[-1]
flow2.tx_rx.port.tx_name = p2.name
flow2.tx_rx.port.rx_names = [p1.name]
flow2.duration.fixed_seconds.seconds = 10 # 送信時間で測定を止める
flow2.rate.pps = 3
flow2.size.fixed = 123
flow2.metrics.enable = True
flow2.metrics.loss = False
flow2.metrics.timestamps = False
# パケット構造(flow1と逆方向の通信)
eth2, vlan2, ip2, udp2 = flow2.packet.ethernet().vlan().ipv4().udp()
eth2.src.value = '00:00:00:00:00:bb'
eth2.dst.value = '00:00:00:00:00:aa'
vlan2.id.value = 100
vlan2.priority.value = 3
ip2.src.value = '192.168.99.2'
ip2.dst.value = '192.168.99.1'
udp2.src_port.value = 60001
udp2.dst_port.value = 50001
#---------------------------
# コンフィグ内容をOTGサーバに送信して適用
#---------------------------
api.set_config(cfg)
# トラフィック送信を開始する関数
def start_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.START
api.set_control_state(ctrl)
# トラフィック送信を停止する関数
def stop_traffic():
ctrl = api.control_state()
ctrl.choice = ctrl.TRAFFIC
ctrl.traffic.choice = ctrl.traffic.FLOW_TRANSMIT
ctrl.traffic.flow_transmit.state = ctrl.traffic.flow_transmit.STOP
api.set_control_state(ctrl)
# メトリクス取得のためのリクエストオブジェクトを作成
mreq = api.metrics_request()
mreq.flow.flow_names = [flow1.name, flow2.name] # 対象フローを指定
# トラフィックの送信を開始
start_traffic()
print('--- Traffic started. Press Ctrl+C to stop; metrics will update below. ---')
# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)# コンソールにリアルタイムでメトリクスを表示(上書き更新方式)
prev_lines = 0
try:
while True:
metrics = api.get_metrics(mreq) # メトリクスを取得
# 前回表示した行数分、カーソルを上に戻して上書き
if prev_lines:
print(f"\033[{prev_lines}A", end='')
# 出力内容を構築
output_lines = []
#output_lines.append(f"Time: {time.strftime('%H:%M:%S')}") # 現在時刻
for fm in metrics.flow_metrics:
output_lines.append(
f"Flow {fm.name}: "
f"tx_frames={fm.frames_tx} ({fm.frames_tx_rate:.0f}pps), "
f"rx_frames={fm.frames_rx} ({fm.frames_rx_rate:.0f}pps)"
)
# 出力&行数カウント
for line in output_lines:
print(line)
prev_lines = len(output_lines)
time.sleep(1) # 1秒ごとに更新
except KeyboardInterrupt:
# Ctrl+Cで停止時の処理
print('Stopping traffic...')
stop_traffic()
print('Traffic stopped.')
パケットキャプチャ結果をみるとVLANタグがついていることと、その中身も「VLAN-ID: 100」と「プライオリティ(CoS): 3」 になっていることが確認できます。
最後に
今回はここまでです。
L2スイッチなどにトラフィックを流す場合は、今回紹介した方法で通信できると思います。
ただし、ルーターやL3スイッチを間に挟んでトラフィックを流す場合は、ARP解決が必要になります。
次回はその点について説明します。
第2回の内容は以下となります。