influxDBはinfluxdata社が開発、オープンソースで提供している時系列データベースです。時系列データベースの中でトップの人気を保っています。
この記事ではinfluxDBを起動してyahoo financeから取得した株価データを投入するところまでを見てみます。
influxDBには使いやすいUIもありますが、ここではpythonクライアントを使って操作する方法を取ります。
influxDBインストール、初期設定
docker-compose.yml
ここではdocker-composeでinfluxdbを設定します。incluxdb_cliで設定をしています。
version: '3'
services:
influxdb:
container_name: influxdb
image: influxdb:latest
volumes:
- ./influxDB/influxdb2:/var/lib/influxdb2:rw
ports:
- "8086:8086"
volumes:
influxdb2:
起動
docker-compose up -d
で起動します。ブラウザから localhost:8086 へアクセスすると"Setup Initial User"という画面が表示されるので、ユーザー名、パスワード、組織名(以下ではmyorgとします)、buckets名(なんでも可)を入力し、"Continue"ボタンを押す。モードの選択画面が表示されるので、"Advanced"を押す。
influxDBのtokenを生成
influxDB UIの"Load Data"メニュー→"API TOKENS"と辿って"GENERATE API TOKEN"で"ALL ACCESS API TOKEN"を選択するとTOKENが生成されるので(生成されたカードをダブルクリックするとポップアップ表示され、クリップボードにコピー出来ます)、コピーしておきます。
株価の投入
下記pythonコードを実行。例としてFordとGEの株価をyahoo financeから取得してinfluxDBに投入してみます。
なお、Open/High/Low/Closeのデータがありますが、タイムスタンプをNasdaqのOpenの時間に合わせています(米国東部時間の9:30am)。
INFLUXDB_TOKEN = '(上記のAPI TOKENをここにコピペ)'
URL = "http://influxdb:8086" # "influxdb"はdocker-compose.ymlのcontainer_nameで指定した名前。
ORG = "myorg" # InfluxDB初期設定で指定した組織名
BUCKET = 'influxdb-test' # 任意のBUCKET名
import pandas as pd
import pandas_datareader.data as web
from datetime import datetime
from influxdb_client import InfluxDBClient, BucketRetentionRules, WriteOptions
from influxdb_client.client.write_api import PointSettings
# Bucketの生成
#https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/domain/bucket_retention_rules.py
with InfluxDBClient(url=URL, token=INFLUXDB_TOKEN) as client:
buckets_api = client.buckets_api()
retention_rules = BucketRetentionRules(type="expire", every_seconds=0, shard_group_duration_seconds=3600*24*365*10) # every_seconds = 0 means infinite
created_bucket = buckets_api.create_bucket(bucket_name=BUCKET,
retention_rules=retention_rules,
org=ORG)
print(created_bucket)
#株価をyahoo financeからダウンロード(例としてfordとGE)
start = datetime(1900,1,1)
end = datetime(2100,1,1)
ford = web.DataReader('F', 'yahoo', start=start, end=end)
GE = web.DataReader('GE', 'yahoo', start=start, end=end)
#後にGrafanaでのローソク足表示の際にすんなり認識される様に列名を変更
#また、調整済みcloseの値(Adj close)をcloseとする
ford.columns = ["high", "low", "open", "close.raw", "volume", "close"]
GE.columns = ["high", "low", "open", "close.raw", "volume", "close"]
#タイムスタンプを米国東部時間でのNasdaqとNYSEのOpen時間(午前9:30)に設定
ford.index = ford.index.tz_localize(tz='US/Eastern')+pd.DateOffset(hours=9.5)
ford.index = ford.index.tz_convert(tz='UTC')
GE.index = GE.index.tz_localize(tz='US/Eastern')+pd.DateOffset(hours=9.5)
GE.index = GE.index.tz_convert(tz='UTC')
# Bollinger Band, SMA(simple moving average)を計算して追加
import talib
timeperiod = 20
upper, middle, lower = talib.BBANDS(ford["close"], timeperiod=timeperiod)
SMA = talib.SMA(ford["close"], timeperiod=timeperiod)
ford = pd.concat([ford, upper.rename('Upper Bollinger Band'), middle.rename('Middle Bollinger Band'), lower.rename('Lower Bollinger Band'), SMA.rename('SMA')], axis=1)
upper, middle, lower = talib.BBANDS(GE["close"], timeperiod=timeperiod)
SMA = talib.SMA(GE["close"], timeperiod=timeperiod)
GE = pd.concat([GE, upper.rename('Upper Bollinger Band'), middle.rename('Middle Bollinger Band'), lower.rename('Lower Bollinger Band'), SMA.rename('SMA')], axis=1)
# 株価を投入
def write_dataframe(client, bucket, df, point_settings):
with client.write_api(write_options=WriteOptions(batch_size=1000, flush_interval=30_000, jitter_interval=10_000, retry_interval=30_000), point_settings=point_settings) as write_api:
write_api.write(bucket=bucket, record=df,
data_frame_tag_columns=['retrieved from', 'inject time', 'SYMBOL', 'name'],
data_frame_measurement_name="stock prices")
client = InfluxDBClient(url=URL, token=INFLUXDB_TOKEN, org=ORG, timeout=30_000)
# ford株価投入
point_settings = PointSettings(**{"retrieved from" : "yahoo finance", "inject time": str(datetime.now()), "SYMBOL" : "F", "NAME" : "ford", "transform" : "original"})
write_dataframe(client, BUCKET, ford, point_settings)
# GE株価投入
point_settings = PointSettings(**{"retrieved from" : "yahoo finance", "inject time": str(datetime.now()), "SYMBOL" : "GE", "NAME" : "GE", "transform" : "original"}})
write_dataframe(client, BUCKET, GE, point_settings)
確認
query_api = client.query_api()
query = 'from(bucket: "influxdb-test") \
|> range(start: -10y)\
|> filter(fn: (r) => r["_measurement"] == "stock prices")\
|> filter(fn: (r) => r["NAME"] == "ford")\
|> filter(fn: (r) => r["SYMBOL"] == "F")\
|> filter(fn: (r) => r["_field"] == "Open")\
|> last()'
result = query_api.query(org=ORG, query=query)
results = []
for table in result:
for record in table.records:
results.append((record.get_field(), record.get_value()))
print(results)
[('Open', 15.949999809265137)]
最大のハマりポイント
influxDB(というかほとんどの時系列データベース?)の主な用途はサーバー等の監視用途かと思います。なので、いろんなパラメータのdefault設定がその様な用途に最適化されています。
Bucketの生成時に設定するパラメータshard_group_duration_secondsは、投入したデータを分割保存する時の1単位の期間を表しますが、これがdefaultでは7日に相当する秒数となっています。株価のヒストリカルデータの様に数十年に渡るデータを保存したい場合、これではshard数が数十年/7日〜数千となって、データが数千個に細切れになって保存されてしまうことを意味します。最初これに気づかず、私の貧弱な環境では書き込み時にクラッシュしてしまいました。