はじめに
半年ほど前から株を始めたけど,自分の資産額がどのように推移してるかを全く管理してなかったので,自動でまとめるプログラムを作ったので公開したいと思います.(実行環境はRaspberry Piです)
証券会社HPから資産額をスクレイピングする
僕はSBI証券を使っているので,以下はSBI証券での資産額の取得法です.他の証券会社でもこの部分を変更すれば使えると思います(知らんけど).
ホーム画面から現金残高,ポートフォリオ画面から時価評価額(国内株と投資信託)を取得します.
ホーム画面の現金残高
ココってところに金額が書かれているからセレニウムで取ってくる.このくらいサポートしてくれるAPIがあればいいんですが,ないので泥臭くHTMLをパースして取得します.
時価評価額の取得
ココってところをクリックするとcsvファイルがダウンロードされます.時価評価額をcsvファイルを解析して(csvファイルの最後の行に書いてある)取得します.このくらいサポートしてくれるAPIがあればいいんですが...
Googleシートへの自動書き込み
資産推移をGoogleSpreadSheetで管理することにしました.
実行方法
必要なライブラリをインストールした後,
python3 main.py
で実行.はじめにusernameとpasswordを入力すればあとは定時実行してくれます.(バグがある可能性はある)
詰まったところ
- seleniumでファイルのダウンロード先の指定
- Google SheetでAPIから書き込むときはCredentialを作るときにSCOPEを追加する必要があること
更新
流石にファイルにusernameとpasswordを書くのはアウトなので,ちょっとマシにしました.
あと,米株のPLも取得できるようにしました.
実装ではschedライブラリを使って定時実行に対応.
できたもの
以下ができたコードです.
ディレクトリ構造
.
├── data
│ └── 2020-11-01.csv
├── .credentials.json
├── .token.pkl
├── main.py
├── modules.py
└── worker.pid
モジュール
import os
import pickle
from datetime import datetime
import re
from getpass import getpass
import yaml
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
### constants
DOWNLOAD_PATH = os.path.abspath("data")
# google api
CRED_TOKEN = ".token.pkl"
GSHEET_ID = # Google Sheet のID
# webdriver
driver = None
class User(object):
def __init__(self):
self.username = input("Username: ")
self.password = getpass("Password: ")
def show(self):
print(f"Username: {self.username}")
print(f"Password: {self.password}")
def setup_webdriver(headless=True):
global driver
options = Options()
options.headless = headless
options.add_experimental_option("prefs", {"download.default_directory" : DOWNLOAD_PATH})
driver = webdriver.Chrome(options=options)
def quit_webdriver():
global driver
driver.quit()
def login_to_sbi(user):
url = "https://www.sbisec.co.jp/ETGate"
driver.get(url)
username_form = driver.find_element_by_id("user_input")\
.find_element_by_name("user_id")
username_form.send_keys(user.username)
password_form = driver.find_element_by_id("password_input")\
.find_element_by_name("user_password")
password_form.send_keys(user.password)
login_button = driver.find_element_by_name("ACT_login")
login_button.click()
def get_cash_amount_jp():
driver.get("https://site3.sbisec.co.jp/ETGate/?_ControlID=WPLEThmR001Control&_PageID=DefaultPID&_DataStoreID=DSWPLEThmR001Control&_ActionID=DefaultAID&getFlg=on")
cash = driver.find_element_by_class_name("tp-table-01")\
.find_element_by_class_name("tp-td-01")\
.find_element_by_tag_name("span")
return cash.text
def get_market_value_jp(timestamp):
driver.get("https://site3.sbisec.co.jp/ETGate/?_ControlID=WPLETpfR001Control&_PageID=WPLETpfR001Rlst10&_DataStoreID=DSWPLETpfR001Control&_SeqNo=1597374217450_default_task_466_DefaultPID_DefaultAID&_ActionID=csvdl&ref_from=1&ref_to=50&getFlg=on")
asset_file = f"{DOWNLOAD_PATH}/{timestamp}.csv"
os.rename(f"{DOWNLOAD_PATH}/New_file.csv", asset_file)
with open(asset_file, "r", encoding="shift_jis") as f:
content = f.read()
ll = content.split("\n")[-2].split(",")
return ll[0]
def get_profit_loss_jp(timestamp):
# timestamp = datetime.now().strftime('%Y-%m-%d')
cash = get_cash_amount_jp()
cash = float(re.sub(",", "", cash))
value = get_market_value_jp(timestamp)
value = float(re.sub(",", "", value))
profit_loss = round(cash + value, 2)
return profit_loss
def get_profit_loss_us():
driver.get("https://www.sbisec.co.jp/ETGate/?OutSide=on&_ControlID=WPLETsmR001Control&_DataStoreID=DSWPLETsmR001Control&sw_page=Foreign&cat1=home&cat2=none&sw_param1=GB&getFlg=on")
driver.get("https://global.sbisec.co.jp/Fpts/czk/profitAndLoss/moveProfitAndLoss")
td = driver.find_element_by_class_name("tblMod01.bSep")\
.find_element_by_tag_name("tbody")\
.find_element_by_tag_name("tr")\
.find_elements_by_tag_name("td")[-1]
val_us, val_jp = td.text.split("\n")
def format_val(val):
r = re.search("(\d|,|\.)*", val)
if r is not None:
return val[r.start(): r.end()]
else:
return "0"
return format_val(val_us), format_val(val_jp)
def get_credential():
creds = None
SCOPES = ['https://www.googleapis.com/auth/spreadsheets.readonly',
"https://www.googleapis.com/auth/spreadsheets"]
# The file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists(CRED_TOKEN):
with open(CRED_TOKEN, 'rb') as token:
creds = pickle.load(token)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'.credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open(CRED_TOKEN, 'wb') as token:
pickle.dump(creds, token)
return creds
def update_gsheet(gsheet_range, vals):
creds = get_credential()
service = build('sheets', 'v4', credentials=creds)
body = {"values": [vals]}
# Call the Sheets API
resp = service.spreadsheets()\
.values()\
.append(spreadsheetId=GSHEET_ID,
range=gsheet_range,
valueInputOption="USER_ENTERED",
insertDataOption="INSERT_ROWS",
body=body)\
.execute()
def update(user):
setup_webdriver()
login_to_sbi(user)
timestamp = datetime.now().strftime('%Y-%m-%d')
pl_jp = get_profit_loss_jp(timestamp)
pl_us_d, pl_us_y = get_profit_loss_us()
update_gsheet("sbi!A1", [timestamp, pl_jp, pl_us_d, pl_us_y])
quit_webdriver()
実行するpythonファイル
import os
import sys
import sched
import time
from datetime import datetime, timedelta
from modules import User, update
def next_sunday():
"""return next sunday at 16:00 in unix time
"""
dt = datetime.now().date()
while dt.weekday() != 6: # weekday = 6 at Sunday
dt += timedelta(days=1)
sunday = datetime(dt.year, dt.month, dt.day, hour=16)
return time.mktime(sunday.timetuple())
def dump_pid():
with open("./worker.pid", "w") as f:
f.write(str(os.getpid()))
def main():
user = User()
user.show()
# set schedules
sc = sched.scheduler(time.time, time.sleep)
pid = os.fork()
if pid > 0:
pass
elif pid == 0:
dump_pid()
while True:
if sc.empty():
ut_sunday = next_sunday()
sc.enterabs(ut_sunday, 2, update, argument=(user,))
time.sleep(3600)
sc.run()
if __name__ == '__main__':
main()