0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ORiN3を使ってデバイスにアクセスしてみる - python編

Last updated at Posted at 2025-05-14

ORiN3を使ってデバイスにアクセスしてみる - python編

はじめに

今回は、pythonでORiN3にアクセスしてみようと思います。

1. 前準備

プロキシ設定

もしネットワークがプロキシ経由になっている場合、APTコマンドもプロキシ設定が必要です。
以下の設定ファイルを確認してください。

$ sudo nano /etc/apt/apt.conf.d/95proxies

設定例:

Acquire::http::Proxy "http://<プロキシのIP>:<ポート>";
Acquire::https::Proxy "http://<プロキシのIP>:<ポート>";

プロキシを使用していない場合、この手順はスキップできます。

Ubuntuのバージョンなど

以下の環境で試してみた結果を記事に書いています。

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 24.04.1 LTS
Release:        24.04
Codename:       noble

2. pythonパッケージの設定

Ubuntu24の環境であれば最初からpython3.12が含まれていると思います。

$ python3 --version
Python 3.12.3

なのでまずはpipのインストールから。

$ sudo apt install python3-pip
$ pip --version
pip 24.0 from /usr/lib/python3/dist-packages/pip (python 3.12)

次にvenvをインストールしてpythonの仮想環境をセットアップします。

$ mkdir orin3env
$ sudo apt install python3.12-venv
$ python3 -m venv ~/orin3env/venv
$ source ~/orin3env/venv/bin/activate

必要なpythonパッケージを設定していきます。

(venv) $ pip install orin3-message-client
(venv) $ pip show orin3-message-client
Name: orin3-message-client
Version: 1.0.232
Summary: Package for a gRPC client connecting to ORiN3 Remote Engine / ORiN3 Provider.
Home-page:
Author: densowave
Author-email: contact.iot@denso-wave.com
License:
Location: /home/jara/orin3env/venv/lib/python3.12/site-packages
Requires: grpcio, grpcio-tools
Required-by:

3. プロバイダの設定

以下の記事を参考にしていただきORiN3プロバイダーをインストールしてみてください。
今回はMockプロバイダー(version.1.1.0)を使用します。

4. サンプルプログラムを使ったデータアクセス

これで準備は完了です。
以下のコードを実行すればTrue/Falseの値が10回出力されるはずです。

import asyncio
import time
import uuid
from orin3_provider_client.v1.client_creator import ClientCreator
from orin3_provider_client.v1.common import ORiN3ValueType
from orin3_remote_engine_client.v1.remote_engine_client import ProtocolType, ProviderEndpoint, RemoteEngineClient, TelemetryOption, LogLevel

async def main():
    # Connect to Remote Engine
    async with RemoteEngineClient("127.0.0.1", 7103) as remote_engine:
        # Launch a Mock Provider
        provider_endpoints = [ProviderEndpoint(ProtocolType.HTTP, "127.0.0.1", 0)]
        telemetry_endpoints = []
        telemetry_attributes = {}
        telemetry_option = TelemetryOption(telemetry_attributes, True, telemetry_endpoints)
        extensions = {}
        result = await remote_engine.wakeup_provider_async(
            id="643D12C8-DCFC-476C-AA15-E8CA004F48E8",
            version="1.1.0",
            thread_safe_mode=True,
            endpoints=provider_endpoints,
            log_level=LogLevel.INFORMATION,
            telemetry_option=telemetry_option,
            extension=extensions)

        provider_information = result.provider_information
        endpoints = provider_information.endpoints
        endpoint = endpoints[0]

        async with await ClientCreator.attach_root_object_async(endpoint.ip_address, endpoint.port) as root_object:
            print(f"root_object.name: {root_object.name}")
            print(f"root_object.option: {root_object.option}")
            print(f"root_object.created_date_time: {str(root_object.created_date_time)}")
            print(f"root_object.orin3object_type: {str(root_object.orin3object_type)}")
            print(f"root_object.id: {str(uuid.UUID(bytes=root_object.id))}")
            
            # Create a controller
            controller_name = "GeneralPurposeController"
            controller_type = "ORiN3.Provider.ORiNConsortium.Mock.O3Object.Controller.GeneralPurposeController, ORiN3.Provider.ORiNConsortium.Mock"
            controller_option = '{"@Version":"1.1.0"}'
            controller = await ClientCreator.create_controller_async(root_object, controller_name, controller_type, controller_option)
            await controller.connect_async()

            # Create a variable
            variable_name = "PulseWaveBoolVariable"
            variable_type = "ORiN3.Provider.ORiNConsortium.Mock.O3Object.Variable.PulseWaveBoolVariable, ORiN3.Provider.ORiNConsortium.Mock"
            variable_option = '{"@Version":"1.1.0"}'
            variable = await ClientCreator.create_variable_async(controller, variable_name, variable_type, variable_option, ORiN3ValueType.ORIN3_BOOL)

            # GetValue
            for i in range(10):
                value = await variable.get_value_async()
                print(value)
                await asyncio.sleep(0.5)

            # Shutdown Provider
            await variable.delete_async()
            await controller.disconnect_async()
            await controller.delete_async()
            await root_object.shutdown_async()

if __name__ == '__main__':
    asyncio.run(main())

実行結果:

(venv) $ python3 accessToOrin3.py
root_object.name: Root
root_object.option: 
root_object.created_date_time: 2025-05-15 08:14:10.824719
root_object.orin3object_type: 0
root_object.id: f3ccd00f-fd91-3e49-b924-eaecab9ffd0c
False
False
True
True
False
False
True
True
False
False

関連情報(公式)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?