0
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ファイナンス分野で時系列データベースinfluxDBを使う

Last updated at Posted at 2022-08-15

influxDBはinfluxdata社が開発、オープンソースで提供している時系列データベースです。時系列データベースの中でトップの人気を保っています。
この記事ではinfluxDBを起動してyahoo financeから取得した株価データを投入するところまでを見てみます。

influxDBには使いやすいUIもありますが、ここではpythonクライアントを使って操作する方法を取ります。

influxDBインストール、初期設定

docker-compose.yml

ここではdocker-composeでinfluxdbを設定します。incluxdb_cliで設定をしています。

version: '3'

services:
  influxdb:
    container_name: influxdb
    image: influxdb:latest
    volumes:
      - ./influxDB/influxdb2:/var/lib/influxdb2:rw
    ports:
      - "8086:8086"

volumes:
  influxdb2:

起動

docker-compose up -d

で起動します。ブラウザから localhost:8086 へアクセスすると"Setup Initial User"という画面が表示されるので、ユーザー名、パスワード、組織名(以下ではmyorgとします)、buckets名(なんでも可)を入力し、"Continue"ボタンを押す。モードの選択画面が表示されるので、"Advanced"を押す。

influxDBのtokenを生成

influxDB UIの"Load Data"メニュー→"API TOKENS"と辿って"GENERATE API TOKEN"で"ALL ACCESS API TOKEN"を選択するとTOKENが生成されるので(生成されたカードをダブルクリックするとポップアップ表示され、クリップボードにコピー出来ます)、コピーしておきます。

株価の投入

下記pythonコードを実行。例としてFordとGEの株価をyahoo financeから取得してinfluxDBに投入してみます。
なお、Open/High/Low/Closeのデータがありますが、タイムスタンプをNasdaqのOpenの時間に合わせています(米国東部時間の9:30am)。

inject_stock_prices.py
INFLUXDB_TOKEN = '(上記のAPI TOKENをここにコピペ)'
URL    = "http://influxdb:8086" # "influxdb"はdocker-compose.ymlのcontainer_nameで指定した名前。
ORG    = "myorg"                # InfluxDB初期設定で指定した組織名
BUCKET = 'influxdb-test'        # 任意のBUCKET名

import pandas as pd
import pandas_datareader.data as web
from datetime import datetime

from influxdb_client import InfluxDBClient, BucketRetentionRules, WriteOptions
from influxdb_client.client.write_api import PointSettings

# Bucketの生成
#https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/domain/bucket_retention_rules.py

with InfluxDBClient(url=URL, token=INFLUXDB_TOKEN) as client:
    buckets_api = client.buckets_api()
    retention_rules = BucketRetentionRules(type="expire", every_seconds=0, shard_group_duration_seconds=3600*24*365*10) # every_seconds = 0 means infinite
    created_bucket = buckets_api.create_bucket(bucket_name=BUCKET,
                                               retention_rules=retention_rules,
                                               org=ORG)
    print(created_bucket)

#株価をyahoo financeからダウンロード(例としてfordとGE)

start = datetime(1900,1,1)
end  = datetime(2100,1,1)

ford = web.DataReader('F', 'yahoo', start=start, end=end)
GE = web.DataReader('GE', 'yahoo', start=start, end=end)

#後にGrafanaでのローソク足表示の際にすんなり認識される様に列名を変更
#また、調整済みcloseの値(Adj close)をcloseとする

ford.columns = ["high", "low", "open", "close.raw", "volume", "close"]
GE.columns =  ["high", "low", "open", "close.raw", "volume", "close"]

#タイムスタンプを米国東部時間でのNasdaqとNYSEのOpen時間(午前9:30)に設定

ford.index = ford.index.tz_localize(tz='US/Eastern')+pd.DateOffset(hours=9.5)
ford.index = ford.index.tz_convert(tz='UTC')

GE.index = GE.index.tz_localize(tz='US/Eastern')+pd.DateOffset(hours=9.5)
GE.index = GE.index.tz_convert(tz='UTC')

# Bollinger Band, SMA(simple moving average)を計算して追加
import talib

timeperiod = 20

upper, middle, lower = talib.BBANDS(ford["close"], timeperiod=timeperiod)
SMA = talib.SMA(ford["close"], timeperiod=timeperiod)
ford = pd.concat([ford, upper.rename('Upper Bollinger Band'), middle.rename('Middle Bollinger Band'), lower.rename('Lower Bollinger Band'), SMA.rename('SMA')], axis=1)

upper, middle, lower = talib.BBANDS(GE["close"], timeperiod=timeperiod)
SMA = talib.SMA(GE["close"], timeperiod=timeperiod)
GE = pd.concat([GE, upper.rename('Upper Bollinger Band'), middle.rename('Middle Bollinger Band'), lower.rename('Lower Bollinger Band'), SMA.rename('SMA')], axis=1)

# 株価を投入

def write_dataframe(client, bucket, df, point_settings):
    
    with client.write_api(write_options=WriteOptions(batch_size=1000, flush_interval=30_000, jitter_interval=10_000, retry_interval=30_000), point_settings=point_settings) as write_api:

        write_api.write(bucket=bucket, record=df,
            data_frame_tag_columns=['retrieved from', 'inject time', 'SYMBOL', 'name'],
            data_frame_measurement_name="stock prices")

client = InfluxDBClient(url=URL, token=INFLUXDB_TOKEN, org=ORG, timeout=30_000)

# ford株価投入
point_settings = PointSettings(**{"retrieved from" : "yahoo finance", "inject time": str(datetime.now()), "SYMBOL" : "F", "NAME" : "ford", "transform" : "original"})
write_dataframe(client, BUCKET, ford, point_settings)

# GE株価投入
point_settings = PointSettings(**{"retrieved from" : "yahoo finance", "inject time": str(datetime.now()), "SYMBOL" : "GE", "NAME" : "GE", "transform" : "original"}})
write_dataframe(client, BUCKET, GE, point_settings)

確認

query_api = client.query_api()

query = 'from(bucket: "influxdb-test") \
  |> range(start: -10y)\
  |> filter(fn: (r) => r["_measurement"] == "stock prices")\
  |> filter(fn: (r) => r["NAME"] == "ford")\
  |> filter(fn: (r) => r["SYMBOL"] == "F")\
  |> filter(fn: (r) => r["_field"] == "Open")\
  |> last()'

result = query_api.query(org=ORG, query=query)

results = []
for table in result:
  for record in table.records:
    results.append((record.get_field(), record.get_value()))

print(results)

[('Open', 15.949999809265137)]

最大のハマりポイント

influxDB(というかほとんどの時系列データベース?)の主な用途はサーバー等の監視用途かと思います。なので、いろんなパラメータのdefault設定がその様な用途に最適化されています。

Bucketの生成時に設定するパラメータshard_group_duration_secondsは、投入したデータを分割保存する時の1単位の期間を表しますが、これがdefaultでは7日に相当する秒数となっています。株価のヒストリカルデータの様に数十年に渡るデータを保存したい場合、これではshard数が数十年/7日〜数千となって、データが数千個に細切れになって保存されてしまうことを意味します。最初これに気づかず、私の貧弱な環境では書き込み時にクラッシュしてしまいました。

参考

0
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?