0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

InfluxDBによるリアルタイム分析:実装と応用

Posted at

リアルタイム分析の実装例

1. IoTデバイス監視のリアルタイム分析

import time
import random
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
import threading

class RealTimeIoTMonitor:
    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = bucket
        self.stop_monitoring = False

    def simulate_device_data(self, device_id):
        """デバイスのシミュレーションデータを生成and書き込み"""
        while not self.stop_monitoring:
            # 異常検知のためのランダムな変動を含むデータ生成
            temperature = round(random.uniform(20, 50), 2)
            humidity = round(random.uniform(30, 70), 2)
            power_consumption = round(random.uniform(50, 200), 2)

            point = (
                Point("iot_device_metrics")
                .tag("device_id", device_id)
                .field("temperature", temperature)
                .field("humidity", humidity)
                .field("power_consumption", power_consumption)
            )
            
            self.write_api.write(bucket=self.bucket, record=point)
            time.sleep(1)  # 1秒ごとにデータ生成

    def detect_anomalies(self):
        """リアルタイム異常検知クエリ"""
        while not self.stop_monitoring:
            query = f'''
            from(bucket:"{self.bucket}")
                |> range(start: -5m)
                |> filter(fn: (r) => r._measurement == "iot_device_metrics")
                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                |> filter(fn: (r) => 
                    r.temperature > 40 or 
                    r.power_consumption > 150
                )
            '''
            
            result = self.query_api.query(query=query)
            
            for table in result:
                for record in table.records:
                    print("🚨 異常検知:")
                    print(f"デバイスID: {record.values.get('device_id')}")
                    print(f"温度: {record.values.get('temperature')}°C")
                    print(f"電力消費: {record.values.get('power_consumption')}W")
                    print("---")
            
            time.sleep(10)  # 10秒ごとに異常検知

    def start_monitoring(self, device_id):
        """モニタリングスレッド開始"""
        data_thread = threading.Thread(target=self.simulate_device_data, args=(device_id,))
        anomaly_thread = threading.Thread(target=self.detect_anomalies)
        
        data_thread.start()
        anomaly_thread.start()
        
        return data_thread, anomaly_thread

    def stop(self):
        """モニタリング停止"""
        self.stop_monitoring = True

def main():
    monitor = RealTimeIoTMonitor(
        url="http://localhost:8086",
        token="your-secret-token",
        org="your-org",
        bucket="iot_monitoring"
    )

    # デバイス1のモニタリング開始
    data_thread, anomaly_thread = monitor.start_monitoring("device_001")

    try:
        # 長時間実行をシミュレート
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        monitor.stop()
        data_thread.join()
        anomaly_thread.join()

if __name__ == "__main__":
    main()

2. 金融市場リアルタイム分析

import time
import random
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
import threading

class RealTimeMarketAnalyzer:
    def __init__(self, url, token, org, bucket):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.query_api = self.client.query_api()
        self.bucket = bucket
        self.stop_monitoring = False

    def simulate_stock_data(self, symbols):
        """株価データのシミュレーションと書き込み"""
        while not self.stop_monitoring:
            for symbol in symbols:
                price = round(random.uniform(50, 500), 2)
                volume = random.randint(1000, 100000)
                volatility = round(random.uniform(0.1, 5.0), 2)

                point = (
                    Point("stock_market")
                    .tag("symbol", symbol)
                    .field("price", price)
                    .field("volume", volume)
                    .field("volatility", volatility)
                )
                
                self.write_api.write(bucket=self.bucket, record=point)
            
            time.sleep(5)  # 5秒ごとにデータ更新

    def market_trend_analysis(self):
        """リアルタイム市場トレンド分析"""
        while not self.stop_monitoring:
            query = f'''
            from(bucket:"{self.bucket}")
                |> range(start: -15m)
                |> filter(fn: (r) => r._measurement == "stock_market")
                |> group(columns: ["symbol"])
                |> mean(column: "price")
                |> sort(columns: ["_value"], desc: true)
                |> limit(n: 3)
            '''
            
            result = self.query_api.query(query=query)
            
            print("🔥 トップ3シンボル:")
            for table in result:
                for record in table.records:
                    print(f"シンボル: {record.values.get('symbol')}")
                    print(f"平均価格: {record.values.get('_value')}")
            
            time.sleep(30)  # 30秒ごとに分析

    def start_monitoring(self, symbols):
        """モニタリングスレッド開始"""
        data_thread = threading.Thread(target=self.simulate_stock_data, args=(symbols,))
        analysis_thread = threading.Thread(target=self.market_trend_analysis)
        
        data_thread.start()
        analysis_thread.start()
        
        return data_thread, analysis_thread

    def stop(self):
        """モニタリング停止"""
        self.stop_monitoring = True

def main():
    analyzer = RealTimeMarketAnalyzer(
        url="http://localhost:8086",
        token="your-secret-token",
        org="your-org",
        bucket="market_analysis"
    )

    symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
    data_thread, analysis_thread = analyzer.start_monitoring(symbols)

    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        analyzer.stop()
        data_thread.join()
        analysis_thread.join()

if __name__ == "__main__":
    main()

リアルタイム分析の主要な特徴

1. マルチスレッディング

  • データ生成スレッド
  • リアルタイム分析スレッド

2. フレキシブルなクエリ

  • Flux言語による高度なデータ処理
  • リアルタイムでの集計、フィルタリング

3. 異常検知と傾向分析

  • 閾値ベースの異常検出
  • 動的な市場トレンド分析

注意点

  • 実際の運用では、より堅牢なエラーハンドリング
  • 本番環境では実データソースの使用
  • セキュリティとパフォーマンスの最適化

これらのコード例は、InfluxDBを使用したリアルタイム分析の基本的な実装方法を示しています。IoTデバイス監視と金融市場分析のシナリオを通じて、時系列データのリアルタイム処理と分析の可能性を探ります。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?