はじめに
こんにちは!今回はおんどとり WebStorage APIを活用した第2弾として、取得したデータをAzure CosmosDBに保存する方法をご紹介します。なぜCosmosDBなのかって?それは、単純に使ってみたかったからです!
PythonプログラムでAPIからデータを取得する方法については、前回の記事をご参照ください。
Azure Cosmos DBの作成
Azure Cosmos DBを使ってデータベースを作成する手順を以下に示します。この手順では、Azureポータルを使ってデータベースを準備します。
Azure Cosmos DB アカウントの作成
-
Azureポータルにサインイン:
-
portal.azure.com
にアクセスし、Microsoftアカウントでサインインします。
-
-
リソースの作成:
- 左上の「+ リソースの作成」をクリックします。
- 「Azure Cosmos DB」を検索し、表示された結果から「Azure Cosmos DB」を選択します。
-
アカウントの設定:
- APIオプションの選択画面で「コア (SQL) - 推奨」を選択し、「作成」をクリックします。
- 以下の情報を入力します:
- サブスクリプション: 使用するAzureサブスクリプションを選択します。
- リソースグループ: 既存のリソースグループを選択するか、「新規作成」で新しいリソースグループを作成します。
-
アカウント名: 一意の名前を入力します(例:
mycosmosdb123
)。 - 場所: ユーザーに最も近いリージョンを選択します。
- 容量モード: 「プロビジョニングされたスループット」を選択します。
-
確認と作成:
- 入力内容を確認し、「確認および作成」をクリックします。
- 「作成」をクリックしてデプロイを開始します。デプロイが完了するまで数分かかることがあります。
データベースとコンテナーの作成
-
Azure Cosmos DB アカウントに移動:
- デプロイが完了したら、「リソースに移動」をクリックして、Azure Cosmos DBアカウントのページに移動します。
-
データ エクスプローラーの使用:
- 左側のメニューから「データ エクスプローラー」を選択します。
- 「新しいデータベース」をクリックし、データベース名を入力します(例:
mydatabase
)。
-
コンテナーの作成:
- データベース内で「新しいコンテナー」をクリックします。
- コンテナー名を入力し、パーティションキーを設定します(例:
/remote-serial
)。 - スループットを設定し、「OK」をクリックしてコンテナーを作成します。
これで、Azure Cosmos DBにデータを格納する準備が整いました。次はPythonからデータを保存してみましょう。
補足:パーティションキーとは
Azure Cosmos DBにおけるパーティションキーは、データを分散して格納するための重要な要素です。パーティションキーを適切に選択することで、データのスケールアウトが効率的になり、パフォーマンスが向上します。以下に、パーティションキーについて詳しく説明します。
-
役割: パーティションキーは、データを異なるパーティションに分散するためのキーです。これにより、データのスケーラビリティとパフォーマンスが向上します。
-
選定のポイント:
- データが均等に分散されるようにする。
- クエリで頻繁に使用されるフィールドを選ぶと効率的です。
- 一度設定したパーティションキーは変更できないため、慎重に選ぶ必要があります。
-
命名例:
/remote-serial
各用語のメモ
項目 | 命名例 | 説明 |
---|---|---|
アカウント | mycompany-cosmosdb1 |
企業名やプロジェクト名を含めた一意の名前 |
データベース | sensorDataDB |
複数のコンテナを含む論理的な単位。セキュリティやスループットの管理が可能。 |
コンテナ | temperatureReadings |
コンテナは実際にデータが格納される場所であり、データのスケールアウトの単位 |
パーティションキー | /remote-serial |
データを均等に分散するためのキー |
この例では、/remote-serial
をパーティションキーとして使用しています。これは各リモートデバイスのシリアル番号に基づいてデータを分散させるため、均等な分散が期待できます。
Pythonコード
1. 前回のおさらい
APIからデータを取得してデータフレームに変換するまでの処理です。
import requests
import os
import pandas as pd
import json
from dotenv import load_dotenv
from pprint import pprint
from datetime import datetime, timedelta
# データフレームの表示で改行を防ぐために、出力幅を広げる
pd.set_option('display.width', 1000)
# 環境変数をセット
load_dotenv(dotenv_path='./ondotori.env')
# 環境変数から情報を取得
API_KEY = os.getenv('API_KEY')
LOGIN_ID = os.getenv('LOGIN_ID')
LOGIN_PASS = os.getenv('LOGIN_PASS')
REMOTE_SERIAL = os.getenv('REMOTE_SERIAL')
BASE_SERIAL = os.getenv('BASE_SERIAL')
### データを取得してデータフレーム形式へ変換 ###
# 現在の時刻と24時間前の時刻をUあNIXタイムスタンプで取得
now = datetime.now()
unixtime_to = int(now.timestamp())
unixtime_from = int((now - timedelta(days=1)).timestamp())
# APIエンドポイントとヘッダー
url = "https://api.webstorage.jp/v1/devices/data-rtr500"
headers = {
"Content-Type": "application/json",
"X-HTTP-Method-Override": "GET"
}
# リクエストボディ
payload = {
"api-key": API_KEY,
"login-id": LOGIN_ID,
"login-pass": LOGIN_PASS,
"remote-serial": REMOTE_SERIAL,
"base-serial": BASE_SERIAL,
"unixtime-from": unixtime_from,
"unixtime-to": unixtime_to,
"type": "json"
}
# APIリクエスト
response = requests.post(url, headers=headers, json=payload)
# レスポンスの確認
if response.status_code == 200:
data = response.json()
# データフレームの作成
df = pd.DataFrame(data['data'])
df['remote-serial'] = data['remote-serial']
df['base-serial'] = data['base-serial']
df = df[['remote-serial', 'base-serial', 'data-id', 'unixtime', 'ch1', 'ch2']]
df.columns = ['remote_serial', 'base_serial', 'data_id', 'time', 'temperature', 'humidity']
# UNIXタイムを人間が読める形式に変換
# df['time'] = pd.to_datetime(df['time'], unit='s')
df['time'] = pd.to_datetime(df['time'].astype(float), unit='s')
# 'id'列の追加
df['id'] = df['remote_serial'] + '-' + df['base_serial'] + '-' + df['data_id'].astype(str) + '-' + df['time'].dt.strftime('%Y%m%d%H%M%S')
# CSVファイルに出力する場合
# df.to_csv('data.csv', index=False)
# print("CSVファイルにデータを出力しました。")
print(df)
else:
print(f"エラーが発生しました: {response.status_code}")
2. cosmosDBへデータを保存する処理
from azure.cosmos import CosmosClient, PartitionKey
# Azure Cosmos DBの接続情報
COSMOS_ENDPOINT = os.getenv('COSMOS_URL')
COSMOS_KEY = os.getenv('COSMOS_KEY')
DATABASE_NAME = os.getenv('DATABASE_NAME')
CONTAINER_NAME = os.getenv('CONTAINER_NAME')
# Cosmos DBクライアントの作成
client = CosmosClient(COSMOS_ENDPOINT, COSMOS_KEY)
database = client.get_database_client(DATABASE_NAME)
container = database.get_container_client(CONTAINER_NAME)
# データをCosmos DBに保存
for _, row in df.iterrows():
item = {
'id': row['id'],
'remote_serial': row['remote_serial'],
'base_serial': row['base_serial'],
'data_id': row['data_id'],
'time': row['time'].isoformat(),
'temperature': row['temperature'],
'humidity': row['humidity']
}
container.upsert_item(item)
print("Data has been successfully saved to Azure Cosmos DB.")
CosmosDBへの接続情報は、おんどとりWebStorageと同様にenvファイルに記載してください。
補足: Azure Cosmos DBのURLとキーはAzureポータルで確認できます。以下手順です:
- Azureポータルにサインインし(portal.azure.com)、Microsoftアカウントでログイン。
- 左側メニューから「すべてのリソース」を選び、目的のCosmos DBアカウントを選択。
- 左メニューから「キー」を選び、URI(エンドポイントURL)とPRIMARY KEY(プライマリキー)を見ることができます。これら情報でCosmos DBへ接続可能です。
補足: Upsertメソッドについて
upsertメソッドは、新規ドキュメント作成または既存ドキュメント更新という動作です。具体的には:
- 上書きされる場合:同じidとパーティションキー持つドキュメント存在時、そのドキュメント更新(上書き)。
- 別々値として保持:ドキュメント存在しない場合、新規ドキュメントとして追加。
- 具体例:remote-serialがパーティションキーでdata-idがidの場合、それら値既存時には上書きされます。
3. 保存したデータの確認
保存データの確認
try:
# データのカウント
query = "SELECT VALUE COUNT(1) FROM c"
count = list(container.query_items(query=query, enable_cross_partition_query=True))
print(f"Total documents in the container: {count[0]}")
# 直近のデータ1000件を取得
query = "SELECT * FROM c ORDER BY c.time DESC OFFSET 0 LIMIT 1000"
items = list(container.query_items(query=query, enable_cross_partition_query=True))
# データフレームに保存
df = pd.DataFrame(items)
print(df)
except exceptions.CosmosHttpResponseError as e:
print(f"An error occurred: {e.message}")
次回
今回はDB保存まで行いました。次回は応用編をご紹介予定です。お楽しみに!