3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ModbusスレーブをPCでエミュレートする

Posted at

はじめに

産業用装置のコントローラであるPLCから、PC上でデータを取得する方法として、Modbusが多く使われています。
この場合、PCがModbusマスタ、PLCがModbusスレーブとして動作します。
PCがModbusマスタとなるプログラム例はネット上に多く見つかりますが、逆にPCをModbusスレーブとして動作させるプログラム例は見つからなかったため、記事にしました。

対象とするケースは、ModbusでPLCからデータを取得するIoTゲートウェイを導入予定だが、正しくIoTゲートウェイが動作しデータを取得できるかのテストです。

image.png

PyModbusの導入

PC上でPythonスレーブをエミュレートするソフトウェアはいくつかありますが、今回はPythonパッケージであるPyModbusを使ってModbusスレーブを作成します。
PLCの動作をPythonを使って記述することで、PLCを搭載した装置自体の動作をエミュレート可能になります。

Pythonが実行できる環境で、以下のコマンドでインストールできます。

pip install pymodbus

Modbus/RTUにおけるModbusスレーブ

さて、早速プログラムを作ってみましょう。
今回はModbusマスタ端末(IoTゲートウェイ)とRS485接続する想定です。

import asyncio
import datetime
import pymodbus
from pymodbus.server import StartAsyncSerialServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext

async def run():
    store = ModbusSlaveContext(ir=ModbusSequentialDataBlock(0, [17] * 100))
    slaves = {
        0x01: store,
    }
    context = ModbusServerContext(slaves=slaves, single=False)
    await StartAsyncSerialServer(context=context, port="COM3", baudrate=19200, stopbits=1, bytesize=8, parity="N")
    
if __name__ == "__main__":
    pymodbus.pymodbus_apply_logging_config("DEBUG")
    asyncio.run(
        run(), debug=True
    )

以下、プログラムを解説します。

Modbusスレーブ端末のレジスタを定義します。
読み取り専用アナログ値レジスタである、入力レジスタirを、アドレス0から100ワード領域、17で初期化して確保します。

store = ModbusSlaveContext(ir=ModbusSequentialDataBlock(0, [17] * 100))

次に、スレーブ番号とレジスタを対応付けます。
ここでは、先ほど定義したレジスタを、スレーブ番号1に対応付けしています。

slaves = {
    0x01: store,
}

続いて、ModbusServerContextを初期化します。
Modbusスレーブは相手からの接続を待ち受ける側、いわゆるサーバとなります。

context = ModbusServerContext(slaves=slaves, single=False)

最後に、初期化したModbusServerContextをシリアル通信上で起動します。

await StartAsyncSerialServer(context=context, port="COM3", baudrate=19200, stopbits=1, bytesize=8, parity="N")

port="COM3"は、PCに接続したシリアル通信端末のポート番号を表しています。
Windowsの場合、COM◯◯(◯◯は番号)になると思います。
PCでRS485通信をするのに、USB・RS485コンバータを使う場合が多いと思いますが、PCにコンバータを接続すると、デバイスマネージャからCOMの番号を確認できます。

image.png

シリアル通信の設定は以下のとおりです。
RS485では複数の端末が同一のケーブル上にバス接続するため、接続したすべてのデバイスが同じ設定内容でなければなりません。
IoTゲートウェイも同じ設定か、確認が必要です。

  • baudrate=19200 ボーレート。いわゆる通信速度。速度を優先しないのであれば、19200bpsが一般的だと思います
  • stopbits=1 ストップビット。1か2を指定。1が多いと思います
  • bytesize=8 バイトサイズ。8か7を指定
  • parity="N" パリティ。誤り検出用ビットです。今回はN(パリティなし)を指定

IoTゲートウェイと通信

あるIoTゲートウェイと接続し、アドレス0から11ワード分を読み込む設定をしました。
すると、IoTゲートウェイからPLCへ毎秒データの要求をしているようで、ログがどんどん出力されます。
そのログの一部を切り取ってみました。

DEBUG logging:103 recv: 0x1 0x4 0x0 0x0 0x0 0xb 0xb1 0xcd old_data:  addr=None
DEBUG logging:103 Handling data: 0x1 0x4 0x0 0x0 0x0 0xb 0xb1 0xcd
DEBUG logging:103 Processing: 0x1 0x4 0x0 0x0 0x0 0xb 0xb1 0xcd
DEBUG logging:103 Getting Frame - 0x4 0x0 0x0 0x0 0xb
DEBUG logging:103 Factory Request[ReadInputRegistersRequest': 4]
DEBUG logging:103 Frame advanced, resetting header!!
DEBUG logging:103 validate: fc-[4] address-1: count-11
DEBUG logging:103 getValues: fc-[4] address-1: count-11
DEBUG logging:103 send: 0x1 0x4 0x16 0x0 0x11 0x0 0x11 0x0 0x11 0x0 0x11 
0x0 0x11 0x0 0x11 0x0 0x11 0x0 0x11 0x0 0x11 0x0 0x11 0x0 0x11 0x81 0x19

ログの1行目を見ると、「recv」とあり、シリアル通信を受信しています。

それを、「Handling data」とあるように、受信したデータを処理します。

データの先頭1バイトが0x01となるので、ユニット番号1を対象とした送信を表します。
Pythonでプログラムしたように、このエミュレータはユニット番号1をエミュレートしているため、自身への送信と受け止め、処理を続けます。

「Factory Request[ReadInputRegistersRequest': 4]」とあり、「0x04」をファクションコードとして、入力レジスタの読み込みと解釈しました。

「getValues」では、ModbusSequentialDataBlockに対してデータの要求をしています。

最後、「send」で、11ワード分のデータをシリアル通信で送信しています。

このように、もしModbus通信で問題が発生した場合は、ログを見ることで解決のヒントを得ることができます。

データを定期的に更新する

PythonでエミュレートしたデータをIoTゲートウェイで取得することはできましたが、データが変化しないため、正しく定期的にPLCにアクセスしているか分かりません。
そこで、レジスタの内容を定期的に変更してみます。

今回設定するPLCのレジスタの説明です。
以下のように、現在日時と起動後経過秒数を出力するようにしました。
一応1秒ごとに更新しますが、無限ループで1秒スリープを追加しているだけなので、厳密に1秒を刻むわけではありません。

アドレス 項目 内容
0 year 現在日時の年(数値4桁)
1 month 現在日時の月(1~12)
2 day 現在日時の日(1~31)
3 hour 現在日時の時(0~23)
4 minute 現在日時の分(0~59)
5 second 現在日時の秒(0~59)
6 seconds since startup lower 起動後の経過秒数(下位)
7 seconds since startup higher 起動後の経過秒数(上位)
8~ blank 17で固定

以上のレジスタのModbusスレーブをエミュレートするプログラムは以下です。

import asyncio
import datetime
import pymodbus
from pymodbus.server import StartAsyncSerialServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext

async def update_datablock(store):
    step = 0
    while True:
        now = datetime.datetime.now()
        values = [0] * 8
        values[0] = now.year
        values[1] = now.month
        values[2] = now.day
        values[3] = now.hour
        values[4] = now.minute
        values[5] = now.second
        values[6] = int(step) & 0xff
        values[7] = (int(step) >> 16) & 0xff
        store.setValues(4, 0, values)
        step += 1
        await asyncio.sleep(1.0)

async def run():
    store = ModbusSlaveContext(ir=ModbusSequentialDataBlock(0, [17] * 100))
    slaves = {
        0x01: store,
    }
    context = ModbusServerContext(slaves=slaves, single=False)

    task = asyncio.create_task(update_datablock(store=store))
    task.set_name("datetime updating task")
    await StartAsyncSerialServer(context=context, port="COM3", baudrate=19200, stopbits=1, bytesize=8, parity="N")
    task.cancel()
    
if __name__ == "__main__":
    pymodbus.pymodbus_apply_logging_config("DEBUG")
    asyncio.run(
        run(), debug=True
    )

pymodbusは非同期IOで並行処理を実現しています。
pymodbusのサーバで処理をブロックしている間に並行してレジスタの内容を書き換える処理を実行するため、以下のプログラムを追加します。

task = asyncio.create_task(update_datablock(store=store))

これにより、update_datablockメソッドが並列して実行されます。
update_datablockメソッドの内容は以下のとおりです。

async def update_datablock(store):
    step = 0
    while True:
        now = datetime.datetime.now()
        values = [0] * 8
        values[0] = now.year
        values[1] = now.month
        values[2] = now.day
        values[3] = now.hour
        values[4] = now.minute
        values[5] = now.second
        values[6] = int(step) & 0xff
        values[7] = (int(step) >> 16) & 0xff
        store.setValues(4, 0, values)
        step += 1
        await asyncio.sleep(1.0)

現在日時を取得して配列に年月や日時を配列に書き込みします。
次に、レジスタに値を書き込み、1秒間スリープします。
この処理を無限ループします。

複数のModbusスレーブをエミュレート

以下の環境を想定し、複数のModbusスレーブを制御してみます。

  • 水タンクがあり、水タンクには徐々に水が入っていき、水位が上昇します
  • 水タンクの水位を監視し、現在の水位をModbus出力する装置があります(スレーブ#1)
  • 水タンクのポンプを動作させ、排水を行える装置があります(スレーブ#2)

これらの条件でタンクを制御できるIoTゲートウェイのテストを実施してみます。

image.png

水位計測PLC(スレーブ#1)の入力レジスタの内容は以下のとおりです。

アドレス 項目 内容
0 water level 現在の水位[mm]

ポンプ制御PLC(スレーブ#2)の保持レジスタの内容は以下のとおりです。
IoTゲートウェイからレジスタの内容を書き換え可能な保持レジスタとしています。

アドレス 項目 内容
0 pump power 0:off 1:on

このシミュレーションを実現するプログラムは以下のとおりです。

water-tank-sim.py
import asyncio
import pymodbus
from pymodbus.server import StartAsyncSerialServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext

async def update_datablock(slave1: ModbusSlaveContext, slave2: ModbusSlaveContext):
    count = 0
    print('start')
    while True:
        # PLCのデータを取得
        level = slave1.getValues(4, 0, 1)[0]
        pump = slave2.getValues(3, 0, 1)[0]
        # ポンプ制御PLCの保持レジスタ0が1なら、
        # ポンプで排水(-10mm/0.1sec)
        if pump == 1:
            level = level - 10
        # タンクに自然流入(+1mm/0.1sec)
        level = level + 1
        # 水位の下限は0mm、上限は1000mm
        level = min(1000, max(0, level))
        # 更新した水位センサのデータをPLCに設定
        slave1.setValues(4, 0, [level])
        # カウントをインクリメント
        count = count + 1
        # 1秒ごとに水位を出力
        if count % 10 == 0:
            print("water level:", level, "mm,", "pump:", pump)
        # 0.1秒スリープ
        await asyncio.sleep(0.1)

async def run():
    slave1 = ModbusSlaveContext(ir=ModbusSequentialDataBlock(0, [17, 500]))
    slave2 = ModbusSlaveContext(hr=ModbusSequentialDataBlock(0, [17, 0]))
    slaves = {
        0x01: slave1,
        0x02: slave2,
    }
    context = ModbusServerContext(slaves=slaves, single=False)

    task = asyncio.create_task(update_datablock(slave1, slave2))
    task.set_name("datetime updating task")
    await StartAsyncSerialServer(context=context, port="COM3", baudrate=19200, stopbits=1, bytesize=8, parity="N")
    task.cancel()

    
if __name__ == "__main__":
    #pymodbus.pymodbus_apply_logging_config("DEBUG") #デバッグ時にコメントアウトする
    asyncio.run(
        run(), debug=True
    )

プログラムを解説します。

今回、Modbusスレーブは2つ使います。
そのため、slaves変数に2つのスレーブを設定しています。
データブロックに、[17, 500]と、先頭に17を追加しているのは、ブランクのためです。
なぜかgetValuesメソッドなどで2番目の数値にアクセスするので、1つずらすためです。

slave1 = ModbusSlaveContext(ir=ModbusSequentialDataBlock(0, [17, 500]))
slave2 = ModbusSlaveContext(hr=ModbusSequentialDataBlock(0, [17, 0]))
slaves = {
    0x01: slave1,
    0x02: slave2,
}

update_datablockメソッドは、コード中のコメントのとおりです。

このプログラムを実行し、IoTゲートウェイからModbusスレーブ#2を操作し、ポンプをON、OFFした結果は以下です。
ポンプ動作により水位が変化していることが分かります。

water level: 510 mm, pump: 0 <-自然流入により水位が徐々に上昇
water level: 520 mm, pump: 0
water level: 530 mm, pump: 0
water level: 540 mm, pump: 0
water level: 550 mm, pump: 0
water level: 560 mm, pump: 0
water level: 570 mm, pump: 0
water level: 580 mm, pump: 0
water level: 590 mm, pump: 0
water level: 600 mm, pump: 0
water level: 560 mm, pump: 1 <-ポンプ始動、以降水位が下がり始める
water level: 470 mm, pump: 1
water level: 380 mm, pump: 1
water level: 290 mm, pump: 1
water level: 200 mm, pump: 1
water level: 110 mm, pump: 1
water level: 40 mm, pump: 0 <-ポンプ停止、再び水位が徐々に上昇
water level: 50 mm, pump: 0
water level: 60 mm, pump: 0
water level: 70 mm, pump: 0
water level: 80 mm, pump: 0
water level: 90 mm, pump: 0

まとめ

PythonでModbusスレーブをエミュレートすることで、IoTゲートウェイなどのModbusマスタの動作を簡単にテストすることができます。
例えば、「タンクの水位の管理」のような設定においても、ポンプPLCが動作したら水位を減らすなどを、Pythonプログラム上で記述する事ができます。

ぜひ、Industrial IoTにご活用ください。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?