1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

pymodbusコード解説(5) 非同期サーバー

1
Posted at

Dobot(ロボットコントローラー)とPC間でModbus TCP通信を行う場合、「どちらが主導権を持って相手にデータを要求するか」によって、サーバーとクライアントの役割が入れ替わります。
この点は、Modbusに慣れていないと特に混乱しやすいポイントです。
※ Modbusでは「接続されている側」ではなく、「リクエストを発行する側」がクライアントになります。

1. PC(またはPLC)からロボットを制御・監視する場合

一般的な運用形態であり、PC側のソフトウェア(DobotStudio Proや独自のアプリ)からロボットの状態を取得したり、動作を指示したりするケースです。

  • PC (外部デバイス): Modbusマスター(クライアント)
  • Dobot ロボットコントローラー: Modbusスレーブ(サーバー)
  • 通信の仕組み:
    • ロボットコントローラーは、外部デバイスからのアクセス用に内蔵スレーブを提供しており、ポート 502(map1)または 1502(map2)で待ち受けています。
    • PC側がクライアントとして接続を確立し、ロボット内のレジスタ(コイルや保持レジスタなど)に対して読み書きを行います。

2. ロボットのスクリプトから外部機器(PCやカメラ等)を制御する場合

ロボット側のLuaプログラム(スクリプト)やBlocklyで ModbusCreate 命令を使用し、外部にあるAI-PCやカメラ制御PCなどと同期を図るケースです。

  • Dobot ロボットコントローラー: Modbusマスター(クライアント)
  • PC (外部デバイス): Modbusスレーブ(サーバー)
  • 通信の仕組み:
    • スクリプト内で ModbusCreate(IP, port, slave_id) を実行することで、ロボット側が自発的に外部のサーバーへ接続を試みます。
    • この場合、PC側にはModbusスレーブ(サーバー)として動作するソフトウェアが起動している必要があります。

非同期サーバー

今回は、ロボットのスクリプトから外部機器(PCやカメラ等)を制御する場合を想定し、PC側にModbusスレーブ(サーバー)を立てるコードについて解説します。
※ Dobotからの周期ポーリングや複数要求に耐えるため、PC側は非同期サーバーとして実装するのが安全です。

この server_async.py は何をするコードか

一言で言うと:

PC上に「Modbusスレーブ(サーバー)」を立て、
外部のModbusクライアント(Dobot / PLC / テストクライアント)からの
読み書き要求に非同期で応答するための汎用サーバー

です。

つまりあなたの記事文脈では、

  • Dobot = Modbusクライアント
  • PC = Modbusサーバー(このコード)

という構成を実現するための 正解コードです。

server_async.py
#!/usr/bin/env python3
"""Pymodbus asynchronous Server Example.

An example of a multi threaded asynchronous server.

usage::

    server_async.py [-h] [--comm {tcp,udp,serial,tls}]
                    [--framer {ascii,rtu,socket,tls}]
                    [--log {critical,error,warning,info,debug}]
                    [--port PORT] [--store {sequential,sparse,factory,none}]
                    [--device_ids DEVICE_IDS]

    -h, --help
        show this help message and exit
    -c, --comm {tcp,udp,serial,tls}
        set communication, default is tcp
    -f, --framer {ascii,rtu,socket,tls}
        set framer, default depends on --comm
    -l, --log {critical,error,warning,info,debug}
        set log level, default is info
    -p, --port PORT
        set port
        set serial device baud rate
    --store {sequential,sparse,factory,none}
        set datastore type
    --device_ids DEVICE IDs
        set list of devices to respond to

The corresponding client can be started as:

    python3 client_sync.py

"""
import asyncio
import logging
import sys
from collections.abc import Callable
from typing import Any


try:
    import helper  # type: ignore[import-not-found]
except ImportError:
    print("*** ERROR --> THIS EXAMPLE needs the example directory, please see \n\
          https://pymodbus.readthedocs.io/en/latest/source/examples.html\n\
          for more information.")
    sys.exit(-1)

from pymodbus import ModbusDeviceIdentification
from pymodbus import __version__ as pymodbus_version
from pymodbus.datastore import (
    ModbusDeviceContext,
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSparseDataBlock,
)
from pymodbus.server import (
    StartAsyncSerialServer,
    StartAsyncTcpServer,
    StartAsyncTlsServer,
    StartAsyncUdpServer,
)


_logger = logging.getLogger(__file__)
_logger.setLevel(logging.INFO)


def setup_server(description=None, context=None, cmdline=None):
    """Run server setup."""
    args = helper.get_commandline(server=True, description=description, cmdline=cmdline)
    if context:
        args.context = context
    datablock: Callable[[], Any]
    if not args.context:
        _logger.info("### Create datastore")
        # The datastores only respond to the addresses that are initialized
        # If you initialize a DataBlock to addresses of 0x00 to 0xFF, a request to
        # 0x100 will respond with an invalid address exception.
        # This is because many devices exhibit this kind of behavior (but not all)
        if args.store == "sequential":
            # Continuing, use a sequential block without gaps.
            datablock = lambda : ModbusSequentialDataBlock(0x00, [17] * 100)  # pylint: disable=unnecessary-lambda-assignment
        elif args.store == "sparse":
            # Continuing, or use a sparse DataBlock which can have gaps
            datablock = lambda : ModbusSparseDataBlock({0x00: 0, 0x05: 1})  # pylint: disable=unnecessary-lambda-assignment
        elif args.store == "factory":
            # Alternately, use the factory methods to initialize the DataBlocks
            # or simply do not pass them to have them initialized to 0x00 on the
            # full address range::
            datablock = lambda : ModbusSequentialDataBlock.create()  # pylint: disable=unnecessary-lambda-assignment,unnecessary-lambda

        if args.device_ids > 1:
            # The server then makes use of a server context that allows the server
            # to respond with different device contexts for different device ids.
            # By default it will return the same context for every device id supplied
            # (broadcast mode).
            # However, this can be overloaded by setting the single flag to False and
            # then supplying a dictionary of device id to context mapping::
            context = {}

            for device_id in range(args.device_ids):
                context[device_id] = ModbusDeviceContext(
                    di=datablock(),
                    co=datablock(),
                    hr=datablock(),
                    ir=datablock(),
                )

            single = False
        else:
            context = ModbusDeviceContext(
                di=datablock(), co=datablock(), hr=datablock(), ir=datablock()
            )
            single = True

        # Build data storage
        args.context = ModbusServerContext(devices=context, single=single)

    # ----------------------------------------------------------------------- #
    # initialize the server information
    # ----------------------------------------------------------------------- #
    # If you don't set this or any fields, they are defaulted to empty strings.
    # ----------------------------------------------------------------------- #
    args.identity = ModbusDeviceIdentification(
        info_name={
            "VendorName": "Pymodbus",
            "ProductCode": "PM",
            "VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
            "ProductName": "Pymodbus Server",
            "ModelName": "Pymodbus Server",
            "MajorMinorRevision": pymodbus_version,
        }
    )
    return args


async def run_async_server(args) -> None:
    """Run server."""
    txt = f"### start ASYNC server, listening on {args.port} - {args.comm}"
    _logger.info(txt)
    if args.comm == "tcp":
        address = (args.host if args.host else "", args.port if args.port else None)
        await StartAsyncTcpServer(
            context=args.context,  # Data storage
            identity=args.identity,  # server identify
            address=address,  # listen address
            # custom_functions=[],  # allow custom handling
            framer=args.framer,  # The framer strategy to use
            # ignore_missing_devices=True,  # ignore request to a missing device
            # broadcast_enable=False,  # treat device 0 as broadcast address,
            # timeout=1,  # waiting time for request to complete
        )
    elif args.comm == "udp":
        address = (
            args.host if args.host else "127.0.0.1",
            args.port if args.port else None,
        )
        await StartAsyncUdpServer(
            context=args.context,  # Data storage
            identity=args.identity,  # server identify
            address=address,  # listen address
            # custom_functions=[],  # allow custom handling
            framer=args.framer,  # The framer strategy to use
            # ignore_missing_devices=True,  # ignore request to a missing device
            # broadcast_enable=False,  # treat device id 0 as broadcast address,
            # timeout=1,  # waiting time for request to complete
        )
    elif args.comm == "serial":
        # socat -d -d PTY,link=/tmp/ptyp0,raw,echo=0,ispeed=9600
        #             PTY,link=/tmp/ttyp0,raw,echo=0,ospeed=9600
        await StartAsyncSerialServer(
            context=args.context,  # Data storage
            identity=args.identity,  # server identify
            # timeout=1,  # waiting time for request to complete
            port=args.port,  # serial port
            # custom_functions=[],  # allow custom handling
            framer=args.framer,  # The framer strategy to use
            # stopbits=1,  # The number of stop bits to use
            # bytesize=8,  # The bytesize of the serial messages
            # parity="N",  # Which kind of parity to use
            baudrate=args.baudrate,  # The baud rate to use for the serial device
            # handle_local_echo=False,  # Handle local echo of the USB-to-RS485 adaptor
            # ignore_missing_devices=True,  # ignore request to a missing device
            # broadcast_enable=False,  # treat device_id 0 as broadcast address,
        )
    elif args.comm == "tls":
        address = (args.host if args.host else "", args.port if args.port else None)
        await StartAsyncTlsServer(
            context=args.context,  # Data storage
            # port=port,  # on which port
            identity=args.identity,  # server identify
            # custom_functions=[],  # allow custom handling
            address=address,  # listen address
            framer=args.framer,  # The framer strategy to use
            certfile=helper.get_certificate(
                "crt"
            ),  # The cert file path for TLS (used if sslctx is None)
            # sslctx=sslctx,  # The SSLContext to use for TLS (default None and auto create)
            keyfile=helper.get_certificate(
                "key"
            ),  # The key file path for TLS (used if sslctx is None)
            # password="none",  # The password for for decrypting the private key file
            # ignore_missing_devices=True,  # ignore request to a missing device
            # broadcast_enable=False,  # treat device_id 0 as broadcast address,
            # timeout=1,  # waiting time for request to complete
        )


async def async_helper() -> None:
    """Combine setup and run."""
    _logger.info("Starting...")
    run_args = setup_server(description="Run asynchronous server.")
    await run_async_server(run_args)


if __name__ == "__main__":
    asyncio.run(async_helper(), debug=True)

以下では、実務(Dobot → PC サーバー)目線で整理して解説します。

全体構造(俯瞰)

このファイルは大きく分けて 4つの役割を持っています。

  1. コマンドライン引数の解析
  2. Modbusデータストア(レジスタ領域)の構築
  3. 通信方式別(TCP/UDP/Serial/TLS)のサーバー起動
  4. asyncio による非同期イベントループ起動

① コマンドライン対応(実務で地味に重要)

usage::

    server_async.py [-h] [--comm {tcp,udp,serial,tls}]
                    [--framer {ascii,rtu,socket,tls}]
                    [--port PORT]
                    [--store {sequential,sparse,factory,none}]
                    [--device_ids DEVICE_IDS]

ここが示しているのは:

  • TCP / UDP / Serial / TLS すべて対応
  • RTU / ASCII / Socket フレーマ対応
  • **レジスタの持ち方(連続 / 疎 / 工場出荷風)**を切替可能
  • 複数Slave IDにも対応

👉 「検証用サーバー」として非常に強力
👉 Dobotとの接続確認にも最適


② データストア構築(Modbusサーバーの心臓部)

ModbusDeviceContext(
    di=datablock(),
    co=datablock(),
    hr=datablock(),
    ir=datablock(),
)

ここで Modbusの4領域を定義しています。

略称 意味
di Discrete Inputs
co Coils
hr Holding Registers
ir Input Registers

sequential / sparse の違い

ModbusSequentialDataBlock(0x00, [17] * 100)
  • 0番地から 連続100レジスタ
  • PLC的な「普通のレジスタ配置」
ModbusSparseDataBlock({0x00: 0, 0x05: 1})
  • 飛び飛びアドレス
  • 実機に近い挙動(未定義アドレスは例外)

👉 Dobot連携では sequential が一番扱いやすい


③ device_ids(超重要)

if args.device_ids > 1:
    context = {}
    for device_id in range(args.device_ids):
        context[device_id] = ModbusDeviceContext(...)
    single = False

ここで何をしているかというと:

  • Slave ID(Unit ID)ごとに別のメモリ空間を持たせる
  • 複数ロボット/複数装置の同時接続を想定

通常のDobot連携では:

  • device_id = 1
  • single = True

👉 PLC的に「1台の装置」として振る舞う


④ Modbusデバイス情報(地味に重要)

args.identity = ModbusDeviceIdentification(
    info_name={
        "VendorName": "Pymodbus",
        "ProductName": "Pymodbus Server",
        "MajorMinorRevision": pymodbus_version,
    }
)

これは Function Code 0x2B / 0x0E
(Read Device Identification)で返されます。

  • クライアント側から
    「どんな装置か?」を問い合わせたときの応答

👉 デバッグ時に「ちゃんとつながっているか」確認しやすい


⑤ 通信方式ごとのサーバー起動

TCP(Dobot用途はここ)

await StartAsyncTcpServer(
    context=args.context,
    identity=args.identity,
    address=("", args.port),
    framer=args.framer,
)
  • デフォルトポート:502
  • Dobot の ModbusCreate(IP, 502, slave_id)完全対応

👉 この記事の主役


Serial / UDP / TLS

  • RS-485
  • 仮想COM
  • 暗号化Modbus

まで公式対応。
産業用途を本気で想定している設計です。


⑥ asyncio による非同期実行

if __name__ == "__main__":
    asyncio.run(async_helper(), debug=True)
  • サーバーは イベントループで常駐
  • 複数クライアントからの同時アクセスに耐える
  • Dobotが周期的にポーリングしても問題なし

👉 GUI・AI処理と同居させても安全


Dobot × このサーバーの関係(超重要まとめ)

役割 担当
Modbus Client Dobot(Lua / Blockly)
Modbus Server PC(この server_async.py)
接続方向 Dobot → PC
主導権 Dobot
用途 シャッタ指示 / 推論完了通知 / OKNG返却

参考資料:

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?