3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonライブラリBACpypes3でBACnetデバイスと通信する

Posted at

BACnetとは

BACnetとは、ビルディングオートメーションで広く利用されている、産業用プロトコルです。
例えば、空調・衛生(水道)・防犯(ドアロック)・防災(火災や消火)などで利用されています。

ASHRAE(アメリカ暖房冷凍空調学会)が中心となって規格化していますが、日本の市場の特徴に合わせるため、電気設備学会が日本向けの仕様も作っています。

BACpypes3

BACnetをPythonで利用するには、BACpypes3というPythonライブラリを導入します。

BACpypesという「3」が末尾につかないライブラリも存在しますが、こちらはasyncioに対応しておらず、非同期IOが一般化した現在においては使いにくいため、async awaitにより並行処理を記述しやすいBACpypes3を今回利用します。

非同期IOに対応したBACpypes3の開発は開始したばかりのため、まだAPIなどは安定していませんし、未実装のメソッドがあります。
しかし、今更非同期IOに対応しないBACpypesを使うのは大変なので、BACpypes3を選択します。

デモサーバを起動

BACnetに対応したデバイスとして、電気設備のコントローラを構築するのは大変です。
そのため、パソコン上で動作するデモサーバを導入します。

なお、デモサーバを起動するパソコンは、BACpypes3を実行するパソコンとは別にしてください。

今回は、bacnet-toolsに同梱されている、bacservを利用します。
Windows向けの実行ファイルは下記のURLから入手できます。

ZIPファイルをダウンロードして解凍し、bacserv.exeを起動してください。

ただし、コマンドラインオプションを指定するため、PowerShellのようなターミナルを用います。
例えばPowerShellの場合、まずは以下のように、デモサーバで使うIPアドレスを指定します。
複数のネットワークインターフェースを有しているパソコンの場合、どのインターフェース上にデモサーバを起動すればよいか分からないためです。

> $Env:BACNET_IFACE = "192.168.0.184"

次に、device idを引数にしてデモサーバを起動します。
以下の例では、device idを123としています。

> .\bacserv.exe 123
BACnet Server Demo
BACnet Stack Version 1.3.4
BACnet Device ID: 123
Max APDU: 1476
BACnet Device Name: SimpleServer

BACpypes3をインストール

BACpypes3はpipで簡単にインストールできます。

> pip install bacpypes3

Pythonプログラムでインポートして実行すると、正常にインストールできたか分かります。

import bacpypes3
print('bacpypes3', bacpypes3.__version__)
実行結果
bacpypes3 0.0.86

ちなみに、私はVSCode上でJupyterNotebookを開いてプログラムを実行しています。
インタラクティブに実行結果を確認できるため、大変便利です。

BACnetデバイスを見つける

まずは、BACpypes3関連や非同期IOのパッケージをインポートします。

import asyncio

from bacpypes3.pdu import Address
from bacpypes3.primitivedata import ObjectIdentifier
from bacpypes3.basetypes import PropertyIdentifier
from bacpypes3.local.device import DeviceObject
from bacpypes3.ipv4.app import NormalApplication
from bacpypes3.pdu import Address, IPv4Address

次に、自分自身のデバイスを定義し、アプリケーションを起動します。

# make a device object
local_device = DeviceObject(
        objectIdentifier=("device", 1),
        objectName="test",
        vendorIdentifier=999,
    )
ipv4_address = IPv4Address("192.168.0.184/24", 47808)
app = NormalApplication(local_device, ipv4_address)

IPv4Address("192.168.0.184/24", 47808)で指定している"192.168.0.184/24"は、自身のパソコンのIPアドレスを指定しています。
BACnetデバイスを探す際にブロードキャストをするため、必ず/24のようなサブネットマスクの情報を追加しておいてください。

最後に、同じネットワーク内のBACnetデバイスを探す、who-isサービスを実行します。

# デバイスを探す
i_ams = await app.who_is()
for i_am in i_ams:
    device_address: Address = i_am.pduSource
    device_identifier: ObjectIdentifier = i_am.iAmDeviceIdentifier
    print(f"{device_identifier} @ {device_address}")

私の環境での実行結果は以下です。

実行例
device,123 @ 192.168.0.223

デバイスの情報を取得

最後に見つけたデバイスのアドレスとIDが、それぞれdevice_address、device_identifierに格納されています。
これらに加え、DeviceObjectのプロパティ名を引数で指定することで、プロパティの値を取得できます。
今回は"object-name"を取得します。

# デバイスの情報をもらう
device_description: str = await app.read_property(
    device_address, device_identifier, "object-name")
device_description
出力例
'SimpleServer'

データを取得

BACnetデバイスのオブジェクトをすべて列挙します。

object_list = await app.read_property(
            device_address, device_identifier, "object-list"
        )
object_list
出力例
[(<ObjectType: device>, 123),
 (<ObjectType: network-port>, 1),
 (<ObjectType: analog-input>, 0),
 (<ObjectType: analog-input>, 1),
 (<ObjectType: analog-input>, 2),
...
 (<ObjectType: accumulator>, 59),
 (<ObjectType: accumulator>, 60),
 (<ObjectType: accumulator>, 61),
 (<ObjectType: accumulator>, 62),
 (<ObjectType: accumulator>, 63)]

様々なオブジェクトをこのデバイスは持っていますが、今回はアナログ入力の0番を取得してみます。

以下のプログラムは、複数の値を読み込むサービスを実行する例です。
analog-input:0から、現在の値と、値の信頼性を取得します。

object_identifier = ObjectIdentifier("analog-input:0")
response = await app.read_property_multiple(
                device_address,
                [object_identifier, [PropertyIdentifier.presentValue, PropertyIdentifier.reliability]]
            )
response
実行例
[((<ObjectType: analog-input>, 0),
  <PropertyIdentifier: present-value>,
  None,
  0.0),
 ((<ObjectType: analog-input>, 0),
  <PropertyIdentifier: reliability>,
  None,
  <Reliability: no-fault-detected>)]

この実行結果から、値は「0.0」で、信頼性は「問題なし」ということが分かります。

データを設定

write_propertyメソッドを使うことで、オブジェクトに対して値を設定することもできます。
値を設定するため、analog-output:1を対象のオブジェクトとして指定します。
今回は、10を書き込んでみます。

まずは、書き込む前に、現在の値を取得します。

present_value = await app.read_property(
    device_address,
    ObjectIdentifier("analog-output:1"),
    PropertyIdentifier.presentValue
)
present_value
出力例
0.0

次に、値を設定します。

await app.write_property(
    device_address,
    ObjectIdentifier("analog-output:1"),
    PropertyIdentifier.presentValue,
    10.0
)

設定後に、再度、現在値を取得します。

present_value = await app.read_property(
    device_address,
    ObjectIdentifier("analog-output:1"),
    PropertyIdentifier.presentValue
)
present_value
出力例
10.0

以上の結果から、正しく値の設定ができていることが分かりました。

データの変化を通知(COV)

BACnetとは、値を監視する仕組みとしてCOV(Change Of Value)があります。

以下のプログラムは、analog-value:1オブジェクトの値を30秒間の中で変更があれば通知する設定です。
通知があるまで処理がブロックされるため、デモサーバへの値の書き込みは別のツールから行います。
count < 10とループ条件を指定することで、10回変化が発生すれば、停止するようにしています。

async with app.change_of_value(
                device_address,
                ObjectIdentifier("analog-value:1"),
                lifetime=30
            ) as scm:
    count = 0
    while count < 10:
        property_identifier, property_value = await scm.get_value()
        print(f"{property_identifier} {property_value}")
        count = count + 1
実行例
present-value 10.0
status-flags 
present-value 1.0
status-flags 
present-value 10.0
status-flags 
present-value 1.0
status-flags 
present-value 10.0
status-flags 

Yet Another BACnet Explorerなどの別のツールからanalog-value:1オブジェクトの値を変更したところ、COVの仕組みにより値の変更が通知されました。

アプリケーションを終了する

忘れずに終了する処理も実行しましょう。

app.close()
3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?