LoginSignup
5
8

More than 5 years have passed since last update.

Bitcoinの信用取引ログをCloudSQLに自動保存する方法

Last updated at Posted at 2017-09-21

概要と目的

仮想通貨取引所の一つであるCoincheckで行なったbitcoin信用取引のログを、自動でCloudSQLに保存する方法です。Pythonで書きました。
仮想通貨取引所では多くの場合、ブラウザ上で確認できる取引履歴は直近のものに限られています。ところが、個人にしろ、法人にしろ、税務署への申告時には正確な収益が必要です。つまり、一年間の正確なデータが必要なのです。そこで、これまでエクセルシートに手打ちで行なっていた作業を自動化し、労働力を削減するために、さらには、税理士さんにより見やすい、より正確なデータをお渡しすることで大いに感謝されたいという優しさという名の承認欲求を満たすために作成しました。

前提条件

  • GCP利用開始手続き済み
    • Compute Engineのインスタンス作成済み
  • CoincheckにてAPIキーを作成済み
  • Coincheckにて信用取引を行なったことがある
  • Python2系環境構築済み(ローカルもしくはVMインスタンス上)

本題

CloudSQLのインスタンスを作成し、ComputeEngineから接続

プロキシを使うよりもIPアドレスを使ったほうが楽なので、今回はそちらを使用します。

MySQL クライアントを Compute Engine から接続する  |  Cloud SQL for MySQL  |  Google Cloud Platform
こちらにある約1分の動画が大変わかりやすいので、ぜひ参考にしてください。

私の場合、基本的にローカルで作業を行なっていますが、GoogleCloudShellを使っても全く同じです。
ローカルからの接続に関してはこちら( Google Cloud Platformをローカルから利用するための準備 - Qiita )をご参考に。これまた大変わかりやすいです。

$ mysql -h 'CloudSQLの外部IPアドレス' -u root -p'パスワード'

これで接続完了です。
後ほどdbに接続しますので、適当なものを一つ作ります。
(カラム:id, status, created_at, open_rate, side, amount, closed_at, closed_rate, pl)

mysql> create database db名;

Coincheckから取引履歴を取得

Private APIの利用にはHMAC-SHA256 hash形式でシークレットキーを使ったSIGNATUREを作成する必要があります。coincheckのサイトにリクエストのサンプルが載ってありますが、Pythonのサンプルは無いため、こちらに載せておきます。
(セキュリティに関する責任は一切負いません。)

coincheckApi.py
# -*- coding: utf-8 -*-
import json
import requests
import time
import hmac
import hashlib

class ApiCall:
    def __init__(self,api_key,api_secret,api_endpoint):
        self.api_key = api_key
        self.api_secret = api_secret
        self.api_endpoint = api_endpoint

    def private_get_api(self, path):
        timestamp = str(int(float(time.time())*100))
        text = timestamp + self.api_endpoint + path
        sign = hmac.new(bytes(self.api_secret.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
        request_data=requests.get(
            self.api_endpoint+path
            ,headers = {
                'ACCESS-KEY': self.api_key,
                'ACCESS-NONCE': timestamp,
                'ACCESS-SIGNATURE': sign,
                'Content-Type': 'application/json'
            })
        return request_data

別ファイルに履歴取得プログラムを作成。

sample.py
# -*- coding: utf-8 -*-
import coincheckApi
import json
import requests
import time
import hmac
import hashlib
import csv
import os
import sys
import math

coincheck_key = 'アクセスキー'
coincheck_secret = 'シークレットキー'
coincheck_endpoint = 'https://coincheck.com'

coincheck_api = coincheckApi.ApiCall(coincheck_key, coincheck_secret, coincheck_endpoint)

def leverage_history():
    result = coincheck_api.private_get_api_params('/api/exchange/leverage/positions').json()
    return result['data']

print(leverage_history())

試しに出力してみましょう。

$ python sample.py >> sample.txt
sample.txt
{'success': True, 'pagination': {'limit': 10, 'order': 'desc', 'starting_after': None, 'ending_before': None}, 'data': [
{'id': 1583225, 'pair': 'btc_jpy', 'status': 'open', 'created_at': '2017-09-19T12:45:36.000Z', 'closed_at': None, 'open_rate': '444179.0', 'closed_rate': None, 'amount': '2.0', 'all_amount': '2.0', 'side': 'buy', 'pl': '-3572.3428', 'new_order': {'id': 252514674, 'side': 'buy', 'rate': None, 'amount': '2.0', 'pending_amount': '0', 'status': 'complete', 'created_at': '2017-09-19T12:45:35.000Z'}, 'close_orders': []},
{'id': 1575326, 'status': 'closed', 'created_at': '2017-09-19T05:36:33.000Z', 'closed_at': '2017-09-19T06:04:32.000Z', 'open_rate': '429000.0', 'closed_rate': '436599.0', 'amount': '0.0', 'all_amount': '2.0', 'side': 'buy', 'pl': '14855.4986',
'new_order': {'id': 251714094, 'side': 'buy', 'rate': None, 'amount': '2.0', 'pending_amount': '0', 'status': 'complete'},'close_orders': [{'id': 251777433, 'side': 'sell', 'rate': None, 'amount': '2.0', 'pending_amount': '0', 'status': 'complete'}]}]}

上記の形式で取引履歴を取得することができました。
ちなみに、上から一つ目がまだポジションをクローズしていないもの、二つ目がすでにポジションをクローズしたものです。

Cloud SQLへの保存

まずはMySQLdbモジュールをインストールします。
CloudSQLへpythonで接続する場合は、このモジュールを使いましょう。
Googleも推奨しております。
ただし、python3には対応しておりませんのでご注意を。

$ pip install MySQL-Python

もし、python2系を使っているにもかかわらずインストールに苦戦した場合はこちらを一度ご覧ください。

インストールできたらCloudSQLへpythonで接続します。
最初の接続時に作ったdbに接続。

sample.py
import MySQLdb

connection = MySQLdb.connect(host='CloudSQLのIPアドレス', user='root', passwd='パスワード', db='db名', charset='utf8')
connection.autocommit(True)
cursor = connection.cursor()

SQLへクエリを投げる方法は以下の通りです。

sample.py
cursor.execute("クエリ")  #変数なし
cursor.execute("クエリ", (変数,))  #変数一個(カンマに注意)
cursor.execute("クエリ", (変数1,変数2, ... ,変数n)  #変数複数

取引履歴の各値をクエリの変数にするとこんな感じ。

sample.py
coincheck_id = history[0]['id']
status = history[0]['status']
created_at = history[0]['created_at']
open_rate = history[0]['open_rate']
side = history[0]['side']
amount = history[0]['all_amount']


#変数はデータ型に関係なく全て%sで
query = "INSERT INTO table名 (id, status, created_at, open_rate, side, amount) VALUES (%s, %s, %s, %s, %s, %s)"

cursor.execute(query, (coincheck_id, status, created_at, open_rate, side, amount))

Cloud SQLを自動で更新

自動で更新、とは言ったものの、VMインスタンスで10分おきにcronでプログラムを回すだけです。
設計としては、一定間隔で履歴を取得し、更新(新たな取引、もしくはopenからcloseへの変更)があればinsertもしくはupdateする形です。

冒頭でも述べたように、取引所のAPIから得られる履歴は有限です。
私の場合、多ければ数時間で50回ほど取引を行うこともありますので、昨日の履歴が追えないなんてことはざらにあります。その上で、open、closedが入り乱れますので、10分おきという短い間隔で回しております。
insertとupdateの判別に関しては、以下のコードを見ていただけたら(文字にするより見たほうが早い)と思います。上述した分も含めたものを載せておきます。

sample.py

# -*- coding: utf-8 -*-
import coincheckApi
import json
import requests
import time
import hmac
import hashlib
import csv
import os
import sys
import math
import MySQLdb

connection = MySQLdb.connect(host='CloudSQLのIPアドレス', user='root', passwd='パスワード', db='db名', charset='utf8')
connection.autocommit(True)
cursor = connection.cursor()

coincheck_key = 'アクセスキー'
coincheck_secret = 'シークレットキー'
coincheck_endpoint = 'https://coincheck.com'

coincheck_api = coincheckApi.ApiCall(coincheck_key, coincheck_secret, coincheck_endpoint)

# 信用取引の履歴取得
def leverage_history():
    result = coincheck_api.private_get_api_params('/api/exchange/leverage/positions').json()
    return result['data']

# SQLから1が返ってきたら既存のポジション
def new_or_not(coincheck_id):
    query = "SELECT * FROM leverage_history where id=%s"
    result = cursor.execute(query, (coincheck_id,))
    return result

# SQLのstatus(openかclosedか)確認
def open_or_closed(coincheck_id):
    query = "SELECT status FROM leverage_history where id=%s"
    cursor.execute(query, (coincheck_id,))
    return cursor.fetchall()


history = leverage_history()

for data in history:
    # 新規id
    if new_or_not(data['id']) == 0:
        # まだopenの場合
        if data['status'] == 'open':
            # 特定のデータを挿入
            coincheck_id = data['id']
            status = data['status']
            created_at = data['created_at']
            open_rate = data['open_rate']
            side = data['side']
            amount = data['all_amount']
            new_insert_query = "INSERT INTO leverage_history (id, status, created_at, open_rate, side, amount) VALUES (%s, %s, %s, %s, %s, %s)"
            cursor.execute(new_insert_query, (coincheck_id, status, created_at, open_rate, side, amount))
        # すでにcloseしている
        elif data['status'] == 'closed':
            # 全データを挿入
            coincheck_id = data['id']
            status = data['status']
            created_at = data['created_at']
            open_rate = data['open_rate']
            side = data['side']
            amount = data['all_amount']
            closed_at = data['closed_at']
            closed_rate = data['closed_rate']
            pl = data['pl']
            new_insert_query = "INSERT INTO leverage_history (id, status, created_at, open_rate, side, amount, closed_at, closed_rate, pl) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
            cursor.execute(new_insert_query, (coincheck_id, status, created_at, open_rate, side, amount, closed_at, closed_rate, pl))
        else:
            pass

    # 既存id
    else:
        # dbにclose情報が入っていなくて、すでにcloseされている場合
        if(data['status'] == 'closed' and open_or_closed(data['id'])[0][0] == "open"):
            coincheck_id = data['id']
            status = data['status']
            closed_at = data['closed_at']
            closed_rate = data['closed_rate']
            pl = data['pl']
            update_query = "UPDATE leverage_history SET status=(%s), closed_at=(%s), closed_rate=(%s), pl=(%s) WHERE id=(%s)"
            cursor.execute(update_query, (status, closed_at, closed_rate, pl, coincheck_id))
        else:
            pass

dbの様子。

2017-09-20 4.06.14.png

所感

業務削減につながり、同僚たちは私の知らないところで泣いて喜んでいるのかと思いきや、他の全取引所でもこれ(正確にはもうちょっと色々複雑にしてます)と同じようなものを作らなければならなくなりました。
そんなことより、GCPはやはり大変使いやすいし、仮想通貨の取引ログは絶対に取るべき。CloudSQLにデータをためていると、datastudioやdatalabを使ってすぐに可視化できるので、そこまで作れば税理士さんの立場は疎か、自分で納税額をきちんと算出できるのでは無いでしょうか。
Qiita初投稿でした。

5
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
8