2
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

【ETL処理】第一弾:デイトレードの取引データを加工→蓄積する流れのまとめ

Last updated at Posted at 2021-11-28

概要

データ分析のインターンのおかげでETL処理を組むことができるようになりました。
今回はこの技術を用いて、為替のデイトレードで得た取引履歴の、データ加工と蓄積の流れを構築したいと思います。

できるようになること

  • AWSでETL処理が書けるようになる
  • データストレージの使い方が少しわかる

補足

今回はGlueやLambdaを用いない、データ分析の半自動化を目指しています。とはいえ、スクリプトさえできてしまえば自動化も簡単なので別記事でその方法を書こうと思います。

利用サービス

  • AWS S3
  • Amazon SageMaker
  • HighLow Australia (証券会社)

ETL処理フロー

データフローの全体像はこちらです。
90c4106b14b7c44cf7695b5985ccf523[1].png

S3のDataLakeというバケットに溜まった取引履歴をデータ整形して、DataWarehouseに蓄積します。(HighLow側にAPIがないので、DataLakeへのデータ蓄積は手動アップロードにします。)

ETL処理の前準備

ETL(抽出、変換、読み込み)処理を行う前に、そもそも抽出するデータがストレージに蓄積されていないといけません。本来のAWSのS3活用方法とは異なりますが、今回は気にせずデータのアップロードを手動で行います。

データ取得

HighLowオーストラリアという証券会社の、個人的な取引履歴を利用します。
707cf6d1499b688a4def13ae3b7271d1[1].png

写真の様に、[検索する]から数字の順番に取引履歴取得範囲を設定し、[ダウンロード]でCSV形式でダウンロードします。
取引データの配布等は行っていませんのでご了承ください。

分析頻度は週単位想定しているので、とりあえず1ヵ月を4週間に分けて4つのCSVファイルを取得しました。

S3へデータアップロード

AWSのS3では、

  • datalake-trading
  • datawarehouse-trading
    というバケットを用意しておいてください。

ここの、datalake-tradingというバケットに、[trading-history/]というプレフィックスを切り、その中に先ほど取得したCSVファイルをアップロードします。

2588c173c419f0d1023e82e1ec6b7c00[1].png

SageMakerでETL処理を書く

データの準備ができたことで、いよいよデータ中身を見ながら整形していきます。

コーディング環境の作成

AWSのSageMakerを開いて、ノートブックインスタンスを作成してください。
個人利用なので、インスタンスタイプはデフォルト最小の[ml.t2.medium]で十分だと思います。

ここからは作成したインスタンス上で、JupyterLabを利用してコーディングしていきます。

S3のデータを受け取るpythonスクリプト

JupyterLab上で、Python3を書いていきます。

  1. ライブラリダウンロード
  2. データ整形する対象のファイル名をS3から取得
  3. 対象のファイルをS3から取得しデータフレームとして読み込み

この流れでスクリプトを書きます。
なお、CSVファイルには日本語が含まれているので、encoding='Shift-jis"という引数を与えないと、データフレームとして読み込むことができませんので注意が必要です。

import pandas
import boto3
import io

s3_client = boto3.client('s3')
data_bucket_name='datalake-trading'

obj_list=s3_client.list_objects(Bucket=data_bucket_name)
file=[]
for contents in obj_list['Contents']:
    if "csv" in contents['Key']:
        file.append(contents['Key'])
    else:
        print("Not CSV:", contents['Key'])
file_data=file[-1] # 最新ファイルの取得
print("Target CSV:", file_data)

response = s3_client.get_object(Bucket=data_bucket_name, Key=file_data)
response_body = response["Body"].read()
temp_df = pandas.read_csv(io.BytesIO(response_body), header=0, delimiter=",", low_memory=False,encoding="shift-jis") 

スクリプトは今後の自動化を想定して、最新ファイルのデータ整形を行う仕様にしています。過去データをすべて取得する場合は、データフレームを週ごとに読み込んで結合する処理が別途必要です。

ちなみにs3へのアクセスモジュールはこちらを参考にしました。

データフレームを見る

データ分析の第一歩です。生データを目視して、分析に必要そうなデータを新たに作成したり、逆に不必要なデータを取り除いたりします。
c5760837cf7ec4dcc7a791e12e141306[1].png

個人的に分析したい内容は、

  • 証拠金増減の時系列グラフ
  • 各通貨の勝率
  • 手法別での勝率
  • High-Low別の勝率

です。もっと見たいところはあるけど一旦はこんな感じにします。
なので、これを満たすようなデータフレームを作成するところが目標です。

データ整形のpythonスクリプト

特徴量を作っていきます。

# 列名を英語表記に変える
temp_df.rename(columns={'取引番号':"trading_number", '取引原資産':'currency', 'オプションの種類':'option_type', 
                        '方向':'HighLow', ' 取引内容 ':'entry_point', 'ステータス ':'states', '購入':'Bet_amount', 'ペイアウト ':'payout', 
                        '判定レート ':'end_point', ' 取引時間 ':'entry_start', ' 終了時刻 ':'entry_end'}, inplace=True)
temp_df['Bet_amount'] = [int(Bet_amount.replace("", "").replace(",", "")) for Bet_amount in temp_df['Bet_amount']]
temp_df['payout'] = [int(payout.replace("---", "0")) if payout=="---" else int(payout.replace("", "").replace(",", "")) for payout in temp_df['payout'] ]
temp_df = temp_df.sort_values(['trading_number']).reset_index(drop=True) #日付の昇順に並び替え

# 1. 証拠金増減の時系列グラフに必要なデータ
temp_df['deposit'] = 0
for i in range(len(temp_df)):
    mergin = temp_df['payout'][i] - temp_df['Bet_amount'][i]
    if i != 0:
        temp_df['deposit'][i] = temp_df['deposit'][i-1] + mergin
    else:
        temp_df['deposit'][i] = 0

# 2. 各通貨の手法別の利益を出すのに必要なデータ
temp_df['total_check'] = [int(p-b) if p>b else p for (b, p) in zip(temp_df['Bet_amount'], temp_df['payout'])]

# 3. 勝率計算に必要なデータ
temp_df['win_lose'] = ["win" if check!=0 else "lose" for check in temp_df['total_check']]

# 4. 平均獲得Pipsを計算するのに必要なデータ
temp_df['pips'] = [(e - s) if e>s else (s-e) for (e,s) in zip(temp_df['end_point'], temp_df['entry_point'])]

datawarehouseへデータを送る

ファイル名を指定して、datawarehouseに整形後のCSVファイルをアップロードします。

# 取引期間の取得
start_date = temp_df["entry_start"][0].split(" ")[0].replace("/", "-")
last_date = temp_df["entry_end"][len(temp_df)-1].split(" ")[0].replace("/", "-")

# 保存するファイル名
file_name = start_date+"_"+last_date+".csv"

# csvファイルの作成
temp_df.to_csv(file_name)

# datawarehouseアップロード
s3 = boto3.resource('s3')
upload_bucket = s3.Bucket(upload_bucket_name)
upload_bucket.upload_file(file_name, file_name)

上記を実行し、S3:datawarehouse-tradingを確認します。
a46d9f28403b9cb2948d006b87248997[1].png

確かに、アップロードされていることを確認しました。
自動化する場合は、この記事で書いたコードを、AWS Glueに張り付けるだけです。

次回は、Glueで自動化 → BIツールで可視化する流れを書こうと思います。

最後に

Sagamakerのインスタンスは稼働時間に対してお金がかかります。
使わない時はインスタンスを停止しておくよう注意しましょう。

2
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?