Dobot(ロボットコントローラー)とPC間でModbus TCP通信を行う場合、「どちらが主導権を持って相手にデータを要求するか」によって、サーバーとクライアントの役割が入れ替わります。
この点は、Modbusに慣れていないと特に混乱しやすいポイントです。
※ Modbusでは「接続されている側」ではなく、「リクエストを発行する側」がクライアントになります。
1. PC(またはPLC)からロボットを制御・監視する場合
一般的な運用形態であり、PC側のソフトウェア(DobotStudio Proや独自のアプリ)からロボットの状態を取得したり、動作を指示したりするケースです。
- PC (外部デバイス): Modbusマスター(クライアント)
- Dobot ロボットコントローラー: Modbusスレーブ(サーバー)
-
通信の仕組み:
- ロボットコントローラーは、外部デバイスからのアクセス用に内蔵スレーブを提供しており、ポート 502(map1)または 1502(map2)で待ち受けています。
- PC側がクライアントとして接続を確立し、ロボット内のレジスタ(コイルや保持レジスタなど)に対して読み書きを行います。
2. ロボットのスクリプトから外部機器(PCやカメラ等)を制御する場合
ロボット側のLuaプログラム(スクリプト)やBlocklyで ModbusCreate 命令を使用し、外部にあるAI-PCやカメラ制御PCなどと同期を図るケースです。
- Dobot ロボットコントローラー: Modbusマスター(クライアント)
- PC (外部デバイス): Modbusスレーブ(サーバー)
-
通信の仕組み:
- スクリプト内で
ModbusCreate(IP, port, slave_id)を実行することで、ロボット側が自発的に外部のサーバーへ接続を試みます。 - この場合、PC側にはModbusスレーブ(サーバー)として動作するソフトウェアが起動している必要があります。
- スクリプト内で
非同期サーバー
今回は、ロボットのスクリプトから外部機器(PCやカメラ等)を制御する場合を想定し、PC側にModbusスレーブ(サーバー)を立てるコードについて解説します。
※ Dobotからの周期ポーリングや複数要求に耐えるため、PC側は非同期サーバーとして実装するのが安全です。
この server_async.py は何をするコードか
一言で言うと:
PC上に「Modbusスレーブ(サーバー)」を立て、
外部のModbusクライアント(Dobot / PLC / テストクライアント)からの
読み書き要求に非同期で応答するための汎用サーバー
です。
つまりあなたの記事文脈では、
- Dobot = Modbusクライアント
- PC = Modbusサーバー(このコード)
という構成を実現するための 正解コードです。
#!/usr/bin/env python3
"""Pymodbus asynchronous Server Example.
An example of a multi threaded asynchronous server.
usage::
server_async.py [-h] [--comm {tcp,udp,serial,tls}]
[--framer {ascii,rtu,socket,tls}]
[--log {critical,error,warning,info,debug}]
[--port PORT] [--store {sequential,sparse,factory,none}]
[--device_ids DEVICE_IDS]
-h, --help
show this help message and exit
-c, --comm {tcp,udp,serial,tls}
set communication, default is tcp
-f, --framer {ascii,rtu,socket,tls}
set framer, default depends on --comm
-l, --log {critical,error,warning,info,debug}
set log level, default is info
-p, --port PORT
set port
set serial device baud rate
--store {sequential,sparse,factory,none}
set datastore type
--device_ids DEVICE IDs
set list of devices to respond to
The corresponding client can be started as:
python3 client_sync.py
"""
import asyncio
import logging
import sys
from collections.abc import Callable
from typing import Any
try:
import helper # type: ignore[import-not-found]
except ImportError:
print("*** ERROR --> THIS EXAMPLE needs the example directory, please see \n\
https://pymodbus.readthedocs.io/en/latest/source/examples.html\n\
for more information.")
sys.exit(-1)
from pymodbus import ModbusDeviceIdentification
from pymodbus import __version__ as pymodbus_version
from pymodbus.datastore import (
ModbusDeviceContext,
ModbusSequentialDataBlock,
ModbusServerContext,
ModbusSparseDataBlock,
)
from pymodbus.server import (
StartAsyncSerialServer,
StartAsyncTcpServer,
StartAsyncTlsServer,
StartAsyncUdpServer,
)
_logger = logging.getLogger(__file__)
_logger.setLevel(logging.INFO)
def setup_server(description=None, context=None, cmdline=None):
"""Run server setup."""
args = helper.get_commandline(server=True, description=description, cmdline=cmdline)
if context:
args.context = context
datablock: Callable[[], Any]
if not args.context:
_logger.info("### Create datastore")
# The datastores only respond to the addresses that are initialized
# If you initialize a DataBlock to addresses of 0x00 to 0xFF, a request to
# 0x100 will respond with an invalid address exception.
# This is because many devices exhibit this kind of behavior (but not all)
if args.store == "sequential":
# Continuing, use a sequential block without gaps.
datablock = lambda : ModbusSequentialDataBlock(0x00, [17] * 100) # pylint: disable=unnecessary-lambda-assignment
elif args.store == "sparse":
# Continuing, or use a sparse DataBlock which can have gaps
datablock = lambda : ModbusSparseDataBlock({0x00: 0, 0x05: 1}) # pylint: disable=unnecessary-lambda-assignment
elif args.store == "factory":
# Alternately, use the factory methods to initialize the DataBlocks
# or simply do not pass them to have them initialized to 0x00 on the
# full address range::
datablock = lambda : ModbusSequentialDataBlock.create() # pylint: disable=unnecessary-lambda-assignment,unnecessary-lambda
if args.device_ids > 1:
# The server then makes use of a server context that allows the server
# to respond with different device contexts for different device ids.
# By default it will return the same context for every device id supplied
# (broadcast mode).
# However, this can be overloaded by setting the single flag to False and
# then supplying a dictionary of device id to context mapping::
context = {}
for device_id in range(args.device_ids):
context[device_id] = ModbusDeviceContext(
di=datablock(),
co=datablock(),
hr=datablock(),
ir=datablock(),
)
single = False
else:
context = ModbusDeviceContext(
di=datablock(), co=datablock(), hr=datablock(), ir=datablock()
)
single = True
# Build data storage
args.context = ModbusServerContext(devices=context, single=single)
# ----------------------------------------------------------------------- #
# initialize the server information
# ----------------------------------------------------------------------- #
# If you don't set this or any fields, they are defaulted to empty strings.
# ----------------------------------------------------------------------- #
args.identity = ModbusDeviceIdentification(
info_name={
"VendorName": "Pymodbus",
"ProductCode": "PM",
"VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
"ProductName": "Pymodbus Server",
"ModelName": "Pymodbus Server",
"MajorMinorRevision": pymodbus_version,
}
)
return args
async def run_async_server(args) -> None:
"""Run server."""
txt = f"### start ASYNC server, listening on {args.port} - {args.comm}"
_logger.info(txt)
if args.comm == "tcp":
address = (args.host if args.host else "", args.port if args.port else None)
await StartAsyncTcpServer(
context=args.context, # Data storage
identity=args.identity, # server identify
address=address, # listen address
# custom_functions=[], # allow custom handling
framer=args.framer, # The framer strategy to use
# ignore_missing_devices=True, # ignore request to a missing device
# broadcast_enable=False, # treat device 0 as broadcast address,
# timeout=1, # waiting time for request to complete
)
elif args.comm == "udp":
address = (
args.host if args.host else "127.0.0.1",
args.port if args.port else None,
)
await StartAsyncUdpServer(
context=args.context, # Data storage
identity=args.identity, # server identify
address=address, # listen address
# custom_functions=[], # allow custom handling
framer=args.framer, # The framer strategy to use
# ignore_missing_devices=True, # ignore request to a missing device
# broadcast_enable=False, # treat device id 0 as broadcast address,
# timeout=1, # waiting time for request to complete
)
elif args.comm == "serial":
# socat -d -d PTY,link=/tmp/ptyp0,raw,echo=0,ispeed=9600
# PTY,link=/tmp/ttyp0,raw,echo=0,ospeed=9600
await StartAsyncSerialServer(
context=args.context, # Data storage
identity=args.identity, # server identify
# timeout=1, # waiting time for request to complete
port=args.port, # serial port
# custom_functions=[], # allow custom handling
framer=args.framer, # The framer strategy to use
# stopbits=1, # The number of stop bits to use
# bytesize=8, # The bytesize of the serial messages
# parity="N", # Which kind of parity to use
baudrate=args.baudrate, # The baud rate to use for the serial device
# handle_local_echo=False, # Handle local echo of the USB-to-RS485 adaptor
# ignore_missing_devices=True, # ignore request to a missing device
# broadcast_enable=False, # treat device_id 0 as broadcast address,
)
elif args.comm == "tls":
address = (args.host if args.host else "", args.port if args.port else None)
await StartAsyncTlsServer(
context=args.context, # Data storage
# port=port, # on which port
identity=args.identity, # server identify
# custom_functions=[], # allow custom handling
address=address, # listen address
framer=args.framer, # The framer strategy to use
certfile=helper.get_certificate(
"crt"
), # The cert file path for TLS (used if sslctx is None)
# sslctx=sslctx, # The SSLContext to use for TLS (default None and auto create)
keyfile=helper.get_certificate(
"key"
), # The key file path for TLS (used if sslctx is None)
# password="none", # The password for for decrypting the private key file
# ignore_missing_devices=True, # ignore request to a missing device
# broadcast_enable=False, # treat device_id 0 as broadcast address,
# timeout=1, # waiting time for request to complete
)
async def async_helper() -> None:
"""Combine setup and run."""
_logger.info("Starting...")
run_args = setup_server(description="Run asynchronous server.")
await run_async_server(run_args)
if __name__ == "__main__":
asyncio.run(async_helper(), debug=True)
以下では、実務(Dobot → PC サーバー)目線で整理して解説します。
全体構造(俯瞰)
このファイルは大きく分けて 4つの役割を持っています。
- コマンドライン引数の解析
- Modbusデータストア(レジスタ領域)の構築
- 通信方式別(TCP/UDP/Serial/TLS)のサーバー起動
- asyncio による非同期イベントループ起動
① コマンドライン対応(実務で地味に重要)
usage::
server_async.py [-h] [--comm {tcp,udp,serial,tls}]
[--framer {ascii,rtu,socket,tls}]
[--port PORT]
[--store {sequential,sparse,factory,none}]
[--device_ids DEVICE_IDS]
ここが示しているのは:
- TCP / UDP / Serial / TLS すべて対応
- RTU / ASCII / Socket フレーマ対応
- **レジスタの持ち方(連続 / 疎 / 工場出荷風)**を切替可能
- 複数Slave IDにも対応
👉 「検証用サーバー」として非常に強力
👉 Dobotとの接続確認にも最適
② データストア構築(Modbusサーバーの心臓部)
ModbusDeviceContext(
di=datablock(),
co=datablock(),
hr=datablock(),
ir=datablock(),
)
ここで Modbusの4領域を定義しています。
| 略称 | 意味 |
|---|---|
| di | Discrete Inputs |
| co | Coils |
| hr | Holding Registers |
| ir | Input Registers |
sequential / sparse の違い
ModbusSequentialDataBlock(0x00, [17] * 100)
- 0番地から 連続100レジスタ
- PLC的な「普通のレジスタ配置」
ModbusSparseDataBlock({0x00: 0, 0x05: 1})
- 飛び飛びアドレス
- 実機に近い挙動(未定義アドレスは例外)
👉 Dobot連携では sequential が一番扱いやすい
③ device_ids(超重要)
if args.device_ids > 1:
context = {}
for device_id in range(args.device_ids):
context[device_id] = ModbusDeviceContext(...)
single = False
ここで何をしているかというと:
- Slave ID(Unit ID)ごとに別のメモリ空間を持たせる
- 複数ロボット/複数装置の同時接続を想定
通常のDobot連携では:
device_id = 1single = True
👉 PLC的に「1台の装置」として振る舞う
④ Modbusデバイス情報(地味に重要)
args.identity = ModbusDeviceIdentification(
info_name={
"VendorName": "Pymodbus",
"ProductName": "Pymodbus Server",
"MajorMinorRevision": pymodbus_version,
}
)
これは Function Code 0x2B / 0x0E
(Read Device Identification)で返されます。
- クライアント側から
「どんな装置か?」を問い合わせたときの応答
👉 デバッグ時に「ちゃんとつながっているか」確認しやすい
⑤ 通信方式ごとのサーバー起動
TCP(Dobot用途はここ)
await StartAsyncTcpServer(
context=args.context,
identity=args.identity,
address=("", args.port),
framer=args.framer,
)
- デフォルトポート:502
- Dobot の
ModbusCreate(IP, 502, slave_id)と 完全対応
👉 この記事の主役
Serial / UDP / TLS
- RS-485
- 仮想COM
- 暗号化Modbus
まで公式対応。
産業用途を本気で想定している設計です。
⑥ asyncio による非同期実行
if __name__ == "__main__":
asyncio.run(async_helper(), debug=True)
- サーバーは イベントループで常駐
- 複数クライアントからの同時アクセスに耐える
- Dobotが周期的にポーリングしても問題なし
👉 GUI・AI処理と同居させても安全
Dobot × このサーバーの関係(超重要まとめ)
| 役割 | 担当 |
|---|---|
| Modbus Client | Dobot(Lua / Blockly) |
| Modbus Server | PC(この server_async.py) |
| 接続方向 | Dobot → PC |
| 主導権 | Dobot |
| 用途 | シャッタ指示 / 推論完了通知 / OKNG返却 |
参考資料: