リアルタイム分析の実装例
1. IoTデバイス監視のリアルタイム分析
import time
import random
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
import threading
class RealTimeIoTMonitor:
def __init__(self, url, token, org, bucket):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
self.bucket = bucket
self.stop_monitoring = False
def simulate_device_data(self, device_id):
"""デバイスのシミュレーションデータを生成and書き込み"""
while not self.stop_monitoring:
# 異常検知のためのランダムな変動を含むデータ生成
temperature = round(random.uniform(20, 50), 2)
humidity = round(random.uniform(30, 70), 2)
power_consumption = round(random.uniform(50, 200), 2)
point = (
Point("iot_device_metrics")
.tag("device_id", device_id)
.field("temperature", temperature)
.field("humidity", humidity)
.field("power_consumption", power_consumption)
)
self.write_api.write(bucket=self.bucket, record=point)
time.sleep(1) # 1秒ごとにデータ生成
def detect_anomalies(self):
"""リアルタイム異常検知クエリ"""
while not self.stop_monitoring:
query = f'''
from(bucket:"{self.bucket}")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "iot_device_metrics")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) =>
r.temperature > 40 or
r.power_consumption > 150
)
'''
result = self.query_api.query(query=query)
for table in result:
for record in table.records:
print("🚨 異常検知:")
print(f"デバイスID: {record.values.get('device_id')}")
print(f"温度: {record.values.get('temperature')}°C")
print(f"電力消費: {record.values.get('power_consumption')}W")
print("---")
time.sleep(10) # 10秒ごとに異常検知
def start_monitoring(self, device_id):
"""モニタリングスレッド開始"""
data_thread = threading.Thread(target=self.simulate_device_data, args=(device_id,))
anomaly_thread = threading.Thread(target=self.detect_anomalies)
data_thread.start()
anomaly_thread.start()
return data_thread, anomaly_thread
def stop(self):
"""モニタリング停止"""
self.stop_monitoring = True
def main():
monitor = RealTimeIoTMonitor(
url="http://localhost:8086",
token="your-secret-token",
org="your-org",
bucket="iot_monitoring"
)
# デバイス1のモニタリング開始
data_thread, anomaly_thread = monitor.start_monitoring("device_001")
try:
# 長時間実行をシミュレート
while True:
time.sleep(60)
except KeyboardInterrupt:
monitor.stop()
data_thread.join()
anomaly_thread.join()
if __name__ == "__main__":
main()
2. 金融市場リアルタイム分析
import time
import random
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
import threading
class RealTimeMarketAnalyzer:
def __init__(self, url, token, org, bucket):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.query_api = self.client.query_api()
self.bucket = bucket
self.stop_monitoring = False
def simulate_stock_data(self, symbols):
"""株価データのシミュレーションと書き込み"""
while not self.stop_monitoring:
for symbol in symbols:
price = round(random.uniform(50, 500), 2)
volume = random.randint(1000, 100000)
volatility = round(random.uniform(0.1, 5.0), 2)
point = (
Point("stock_market")
.tag("symbol", symbol)
.field("price", price)
.field("volume", volume)
.field("volatility", volatility)
)
self.write_api.write(bucket=self.bucket, record=point)
time.sleep(5) # 5秒ごとにデータ更新
def market_trend_analysis(self):
"""リアルタイム市場トレンド分析"""
while not self.stop_monitoring:
query = f'''
from(bucket:"{self.bucket}")
|> range(start: -15m)
|> filter(fn: (r) => r._measurement == "stock_market")
|> group(columns: ["symbol"])
|> mean(column: "price")
|> sort(columns: ["_value"], desc: true)
|> limit(n: 3)
'''
result = self.query_api.query(query=query)
print("🔥 トップ3シンボル:")
for table in result:
for record in table.records:
print(f"シンボル: {record.values.get('symbol')}")
print(f"平均価格: {record.values.get('_value')}")
time.sleep(30) # 30秒ごとに分析
def start_monitoring(self, symbols):
"""モニタリングスレッド開始"""
data_thread = threading.Thread(target=self.simulate_stock_data, args=(symbols,))
analysis_thread = threading.Thread(target=self.market_trend_analysis)
data_thread.start()
analysis_thread.start()
return data_thread, analysis_thread
def stop(self):
"""モニタリング停止"""
self.stop_monitoring = True
def main():
analyzer = RealTimeMarketAnalyzer(
url="http://localhost:8086",
token="your-secret-token",
org="your-org",
bucket="market_analysis"
)
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
data_thread, analysis_thread = analyzer.start_monitoring(symbols)
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
analyzer.stop()
data_thread.join()
analysis_thread.join()
if __name__ == "__main__":
main()
リアルタイム分析の主要な特徴
1. マルチスレッディング
- データ生成スレッド
- リアルタイム分析スレッド
2. フレキシブルなクエリ
- Flux言語による高度なデータ処理
- リアルタイムでの集計、フィルタリング
3. 異常検知と傾向分析
- 閾値ベースの異常検出
- 動的な市場トレンド分析
注意点
- 実際の運用では、より堅牢なエラーハンドリング
- 本番環境では実データソースの使用
- セキュリティとパフォーマンスの最適化
これらのコード例は、InfluxDBを使用したリアルタイム分析の基本的な実装方法を示しています。IoTデバイス監視と金融市場分析のシナリオを通じて、時系列データのリアルタイム処理と分析の可能性を探ります。