1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

今回は、フリーの Python ベースシミュレーションフレームワーク OptiCommPy をご紹介します。OptiCommPy は光ファイバー通信システムの物理層シミュレーションを容易に行うためのライブラリで、研究や学習目的の用途に適しています。

OptiCommPy の特徴として、M-PAM、M-QAM、M-PSK、OOK など複数のデジタル変調方式に対応しており、IM-DD からコヒーレント光通信システムまで幅広い方式のシミュレーションが可能です。また、光送信器や光増幅、非線形伝搬、受信器などのモデルを備えており、光ファイバー伝送の物理現象を忠実に再現できます。

シグナル処理面でも優れた機能を提供しており、信号の再サンプリング、マッチドフィルタ、クロックリカバリ、分散補償(EDC)やキャリア位相復調など、コヒーレント受信における標準的な DSP ブロックを含んでいます。これらの機能によって、実際の受信アルゴリズムを模した評価を行うことができます。

今回作成する事例

そこで今回は以下の図にあるようなコヒーレント光通信システムをOptiCommPyでシミュレーションしてみます。
スクリーンショット 2026-01-13 151247.png
この図の構成をOptiCommPyを用いて実施する場合、各素子とOptiCommPyの各モジュールとの対応は下記の表になります。
画像1.png

Rx DSPブロック内の「機能別」対応(図のDSPを分解)すると、
画像2.png
となります。今回はテストということで、試しにシミュレーションしてみます。
レーザについては雑音無し、スペクトルの広がりなし、
変調器についても帯域無制限の条件、
受光器についてはparamPD.B = paramTx.Rsで帯域をきめて、今回はparamTx.Rs = 32e9 Hzとしています。
これは受光器(PD+TIA)の電気帯域制限としています。

スクリプト全体

以下がスクリプトになります。
このスクリプトでは,まず送信側のシンボルレートとして paramTx.Rs = 32e9 が設定されており,これは 32 GBd を意味します。変調方式は paramTx.M = 4 かつ constType = "psk" なので QPSK(4-PSK)であり,1シンボルあたりに 2 bit の情報を運びます。さらに paramTx.nPolModes = 2 が指定されているため,信号は偏波多重(PDM)されており,2偏波同時に伝送されます。

送信波形の生成時には,1シンボルあたり 8 サンプルを用いる設定になっており(paramTx.SpS = 8),その結果,送信側のサンプリング周波数は
Fs = Rs × SpS = 32 GHz × 8 = 256 GSa/s
となります。この高いサンプリングレートは,RRC パルス整形や分散などの物理効果を連続時間に近い形で正確に表現するためのものです。

受信側では,整合フィルタ(MF)を通した後に信号を間引きし,1シンボルあたり 2 サンプルの表現に落としています(SpSout = 2)。このときのサンプリング周波数は
2 × Rs = 64 GSa/s
となり,この 2 Sa/sym の信号が EDC や後段 DSP の入力になります。2 Sa/sym にしているのは,タイミングずれや分数遅延に対する余裕を持たせつつ,DSP の計算量を現実的な範囲に抑えるためです。

以上を踏まえると,ビットレートについては,QPSK なので 1 偏波あたりのビットレートは
32 GBd × 2 = 64 Gb/s
となり,これを 2 偏波で送信しているため,1 チャネルあたりの総ビットレートは 128 Gb/s になります。

# -*- coding: utf-8 -*-
"""
Created on Tue Jan 13 13:36:55 2026

@author: babatake
"""

# -*- coding: utf-8 -*-
"""
Created on Tue Jan 13 10:32:46 2026
@author: babatake
"""

import numpy as np
import numbers

from optic.utils import parameters
from optic.models.tx import simpleWDMTx
from optic.models.channels import linearFiberChannel
from optic.models.devices import pdmCoherentReceiver, basicLaserModel, edfa  # ★追加

from optic.dsp.core import pulseShape, firFilter, decimate, pnorm
from optic.dsp.equalization import edc, mimoAdaptEqualizer
from optic.dsp.carrierRecovery import cpr
from optic.comm.metrics import fastBERcalc

import optic.models.devices as dev

# ------------------------------------------------------------
# Monkey patch: make pbs robust against non-numeric theta
# ------------------------------------------------------------
_pbs_orig = dev.pbs

def _to_float_theta(x, default=0.0):
    if isinstance(x, numbers.Real):
        return float(x)
    if isinstance(x, complex) and abs(x.imag) < 1e-15:
        return float(x.real)
    for key in ("polRotation", "theta", "thetaSig", "thetasig", "θ", "θsig"):
        if hasattr(x, key):
            v = getattr(x, key)
            if isinstance(v, numbers.Real):
                return float(v)
    return float(default)

def pbs_safe(E, θ=0.0):
    θ = _to_float_theta(θ, default=0.0)
    return _pbs_orig(E, θ=θ)

dev.pbs = pbs_safe
# ------------------------------------------------------------

def _xcorr_lag_fft(a, b, max_lag):
    N = len(a)
    fa = np.fft.fft(a, n=2*N)
    fb = np.fft.fft(b, n=2*N)
    c = np.fft.ifft(fa * np.conj(fb))
    c = np.concatenate([c[-max_lag:], c[:max_lag+1]])
    lags = np.arange(-max_lag, max_lag+1)
    k = np.argmax(np.abs(c))
    peak = float(np.abs(c[k]) / (np.linalg.norm(a)*np.linalg.norm(b) + 1e-15))
    return int(lags[k]), peak

def reference_check_fast(y2sps, d1sps, name="", Nwin_sym=50000, max_lag=4000):
    print("\n==============================")
    print(f" Reference check (FAST): {name}")
    print("==============================")

    Nwin = min(Nwin_sym, len(d1sps), len(y2sps)//2)
    d = pnorm(d1sps[:Nwin, :])

    cand = {
        "even (0::2)": pnorm(y2sps[0::2, :][:Nwin, :]),
        "odd  (1::2)": pnorm(y2sps[1::2, :][:Nwin, :]),
    }

    best = None
    for phase, y in cand.items():
        for swap in (False, True):
            yy = y[:, ::-1] if swap else y
            for conj in (False, True):
                yyy = np.conj(yy) if conj else yy
                for sx in (1.0, -1.0):
                    for sy in (1.0, -1.0):
                        ytest = yyy.copy()
                        ytest[:, 0] *= sx
                        ytest[:, 1] *= sy

                        lagX, peakX = _xcorr_lag_fft(ytest[:, 0], d[:, 0], max_lag=max_lag)
                        lagY, peakY = _xcorr_lag_fft(ytest[:, 1], d[:, 1], max_lag=max_lag)
                        lag = int(np.round((lagX + lagY) / 2))

                        if lag >= 0:
                            ya = ytest[lag:, :]
                            da = d[:len(ya), :]
                        else:
                            da = d[-lag:, :]
                            ya = ytest[:len(da), :]

                        mse = float(np.mean(np.abs(ya - da)**2) / (np.mean(np.abs(da)**2) + 1e-15))
                        if (best is None) or (mse < best[0]):
                            best = (mse, phase, lag, swap, conj, sx, sy, lagX, lagY, peakX, peakY)

    mse, phase, lag, swap, conj, sx, sy, lagX, lagY, peakX, peakY = best
    print(f"[BEST] phase = {phase}")
    print(f"[BEST] swap={swap}, conj={conj}, sign=({int(sx):+d},{int(sy):+d})")
    print(f"[BEST] lag(avg)={lag}  (lagX={lagX}, lagY={lagY})")
    print(f"[BEST] peak corr (X,Y)=({peakX:.4f},{peakY:.4f})")
    print(f"[BEST] norm MSE={mse:.4e}")
    return best

def apply_best_mapping(sigRx_2sps, symbTx, best):
    mse, phase, lag, swap, conj, sx, sy, *_ = best

    if "even" in phase:
        x1 = pnorm(sigRx_2sps[0::2, :])
    else:
        x1 = pnorm(sigRx_2sps[1::2, :])

    if swap:
        x1 = x1[:, ::-1]
    if conj:
        x1 = np.conj(x1)
    x1[:, 0] *= sx
    x1[:, 1] *= sy

    d1 = pnorm(symbTx)

    if lag >= 0:
        x1a = x1[lag:, :]
        d1a = d1[:len(x1a), :]
    else:
        d1a = d1[-lag:, :]
        x1a = x1[:len(d1a), :]

    return x1a, d1a

def main():
    # -----------------------------
    # 0) Tx parameters
    # -----------------------------
    paramTx = parameters()
    paramTx.M = 4
    paramTx.constType = "psk"
    paramTx.Rs = 32e9
    paramTx.SpS = 8
    paramTx.pulseType = "rrc"
    paramTx.nFilterTaps = 1024
    paramTx.pulseRollOff = 0.1
    paramTx.powerPerChannel = -2   # dBm
    paramTx.nChannels = 1
    paramTx.wdmGridSpacing = 50e9
    paramTx.Fc = 193.1e12
    paramTx.laserLinewidth = 0
    paramTx.nPolModes = 2
    paramTx.nBits = int(np.log2(paramTx.M) * 200_000)
    paramTx.seed = 123

    sigTx, symbTxWDM, paramTx = simpleWDMTx(paramTx)
    symbTx = symbTxWDM[:, :, 0]

    Fs = float(paramTx.Rs * paramTx.SpS)
    print("symbTxWDM.shape =", symbTxWDM.shape)
    print("sigTx.shape     =", sigTx.shape)
    print("symbTx.shape    =", symbTx.shape)
    print(f"Tx: Rs={paramTx.Rs/1e9:.2f} GBd, SpS={paramTx.SpS}, Fs={Fs/1e9:.2f} GSa/s")

    # -----------------------------
    # 1) Linear fiber channel
    # -----------------------------
    paramCh = parameters()
    paramCh.L = 80
    paramCh.alpha = 0.2
    paramCh.D = 16
    paramCh.Fc = paramTx.Fc
    paramCh.Fs = Fs

    sigRxOpt = linearFiberChannel(sigTx, paramCh)

    # -----------------------------
    # ★ 1.5) EDFA (ASE added here)
    #   - 80 km @0.2 dB/km => 16 dB loss, so set G=16 dB to compensate (example)
    # -----------------------------
    paramEDFA = parameters()
    paramEDFA.G = float(paramCh.alpha * paramCh.L)  # 16 dB (loss compensation)
    paramEDFA.NF = 5.0                              # [dB] noise figure (tune this)
    paramEDFA.Fc = paramTx.Fc
    paramEDFA.Fs = Fs
    paramEDFA.seed = 2026
    sigRxOpt = edfa(sigRxOpt, paramEDFA)            # ★ASE付与 :contentReference[oaicite:1]{index=1}

    # -----------------------------
    # 2) LO + coherent receiver FE
    # -----------------------------
    paramLO = parameters()
    paramLO.P = 10
    paramLO.lw = 0
    paramLO.RIN_var = 0
    paramLO.Ns = len(sigRxOpt)
    paramLO.Fs = Fs
    paramLO.seed = 789
    paramLO.freqShift = 0
    sigLO = basicLaserModel(paramLO)

    paramFE = parameters()
    paramFE.Fs = Fs
    paramFE.polRotation = float(np.pi/2)  # 90 deg
    paramFE.pdl = 0.0
    paramFE.polDelay = float(1e-12)       # fractional delay (note: may require timing recovery)

    paramPD = parameters()
    paramPD.B = paramTx.Rs
    paramPD.Fs = Fs
    paramPD.ideal = True
    paramPD.seed = 1011

    sigRx = pdmCoherentReceiver(sigRxOpt, sigLO, paramFE, paramPD)

    # -----------------------------
    # 3) Rx DSP: MF -> decimate to 2 Sa/sym -> EDC
    # -----------------------------
    paramPS = parameters()
    paramPS.SpS = paramTx.SpS
    paramPS.nFilterTaps = paramTx.nFilterTaps
    paramPS.rollOff = paramTx.pulseRollOff
    paramPS.pulseType = paramTx.pulseType
    pulse = pulseShape(paramPS)

    sigRx = firFilter(pulse, sigRx)

    paramDec = parameters()
    paramDec.SpSin = paramTx.SpS
    paramDec.SpSout = 2
    sigRx_2sps = decimate(sigRx, paramDec)

    paramEDC = parameters()
    paramEDC.L = paramCh.L
    paramEDC.D = paramCh.D
    paramEDC.Fc = paramCh.Fc
    paramEDC.Rs = paramTx.Rs
    paramEDC.Fs = 2 * paramTx.Rs
    sigRx_2sps = edc(sigRx_2sps, paramEDC)

    # -----------------------------
    # 4) Reference check -> apply best mapping -> x1a,d1a
    # -----------------------------
    best = reference_check_fast(sigRx_2sps, symbTx, name="After EDC (2 Sa/sym)",
                                Nwin_sym=50000, max_lag=4000)
    x1a, d1a = apply_best_mapping(sigRx_2sps, symbTx, best)

    # -----------------------------
    # 5) 2x2 MIMO equalizer: CMA -> DD-LMS (SpS=1)
    # -----------------------------
    paramEq = parameters()
    paramEq.nTaps = 35
    paramEq.SpS = 1
    paramEq.numIter = 2
    paramEq.storeCoeff = False
    paramEq.M = paramTx.M
    paramEq.constType = "psk"
    paramEq.prgsBar = False
    paramEq.alg = ["cma", "dd-lms"]
    paramEq.mu = [5e-3, 5e-4]
    paramEq.L = [int(0.2 * d1a.shape[0]), int(0.8 * d1a.shape[0])]

    yEQ = mimoAdaptEqualizer(x1a, paramEq, d1a)

    # -----------------------------
    # 6) CPR (QPSK)
    # -----------------------------
    paramCPR = parameters()
    paramCPR.alg = "viterbi"
    paramCPR.M = paramTx.M
    paramCPR.constType = "psk"
    paramCPR.shapingFactor = 0
    paramCPR.N = 35
    paramCPR.B = 64
    paramCPR.returnPhases = False
    paramCPR.Ts = 1 / paramTx.Rs

    yCPR = cpr(yEQ, paramCPR)

    # -----------------------------
    # 7) BER / SER / SNR
    # -----------------------------
    discard = 5000
    N = min(len(yCPR), len(d1a))
    ind = np.arange(discard, N - discard)

    BER, SER, SNR = fastBERcalc(yCPR[ind, :], d1a[ind, :], paramTx.M, "psk")

    print("=== WITH EDFA ASE (CMA -> CPR) ===")
    print("      pol.X      pol.Y")
    print(f" SER: {SER[0]:.2e},  {SER[1]:.2e}")
    print(f" BER: {BER[0]:.2e},  {BER[1]:.2e}")
    print(f" SNR: {SNR[0]:.2f} dB,  {SNR[1]:.2f} dB")

if __name__ == "__main__":
    main()

実行結果

こちらが実行結果になります。理想条件で計算しているので、ビットエラーレート(BER)は0となっています。今後はレーザ光源に雑音を加えたり、帯域制限をかけたりしてシミュレーションを試す予定です。

WITH EDFA ASE (CMA -> CPR)

Metric pol.X pol.Y
SER 0.00e+00 0.00e+00
BER 0.00e+00 0.00e+00
SNR [dB] 29.71 29.71

本記事では OptiCommPy を用いて、DP-QPSK コヒーレント光通信システムのエンドツーエンドシミュレーションを行いました。
送信・伝送・EDFA による ASE 付加・受信・DSP(等化、位相回復)までを Python だけで一貫して扱える点が特徴で、
本スクリプトを基に、送信器や受光器の帯域制限、OSNR 劣化、DSP アルゴリズムの影響を段階的に評価できます。
また、外部ツールで得た周波数応答や回路モデルを組み込むことで、より実機に近い検討へ拡張可能です。
OptiCommPy は、理論検証から設計初期の検討までを効率よく行うための実践的な光通信シミュレーション基盤なので、光通信ステムやDSPの設計ツールとして有用です。

※本記事は筆者個人の見解であり、所属組織の公式見解を示すものではありません。

問い合わせ

光学シミュレーションソフトの導入や技術相談、
設計解析委託をお考えの方はサイバネットシステムにお問合せください。

光学ソリューションサイトについては以下の公式サイトを参照:
👉 光学ソリューションサイト(サイバネット)

光学分野のエンジニアリングサービスについては以下の公式サイトを参照:
👉 光学エンジニアリングサービス(サイバネット)

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?