0
Help us understand the problem. What are the problem?

posted at

updated at

auカブコム証券のkabuステーションPUSH APIを受信してCSVファイルへ保存する

はじめに

前記事

  1. auカブコム証券のkabuステーションREST APIをcurlで叩く
  2. auカブコム証券のkabuステーションREST APIをjava(generated by the swagger code generator)で叩く
  3. auカブコム証券のkabuステーションREST APIの残高照会をcurlとjavaで叩く
  4. auカブコム証券のkabuステーションREST APIの残高照会から先物OPのdeltaを計算する
  5. auカブコム証券のkabuステーションREST APIのテスト用モックサーバーを作る
  6. auカブコム証券のkabuステーションREST APIの認証済TOKENをファイル管理する
  7. auカブコム証券のkabuステーションREST APIで自前のトレイル注文を作ってみる(情報収集編)
  8. auカブコム証券のkabuステーションREST APIで自前のトレイル注文を作ってみる(注文発注編)
  9. auカブコム証券のkabuステーションREST APIで自前のトレイル注文を作ってみる(ツール完成編)
  10. auカブコム証券のkabuステーションREST APIで自前のリピート注文を作ってみる(注文約定調査編)
  11. auカブコム証券のkabuステーションREST APIで自前のトレイル注文の改善(トレイル幅の動的拡張)
  12. auカブコム証券のkabuステーションREST APIで自前のリピート注文を作ってみる(設計、検証用実装編)
  13. auカブコム証券のチャートデータの日付を調整する
  14. auカブコム証券のkabuステーションPUSH APIのサンプル
  15. auカブコム証券のkabuステーションREST APIで自前のリピート注文を作ってみる(リファクタリング編)

14番目の記事で、PUSH APIの受信は出来るようになったので、必要な部分のみを切り出して、CSVファイルへ保存する。
Symbolに指定した銘柄以外も受信する可能性があるので、除外する。
ただし、価格を切り出しているが、価格が変わっていない場合でも、その都度出力する。

本番データが使えるようになってから出力結果を見て、サイズを落とす必要がある場合、価格が変わった場合のみ出力するように修正する。

環境設定

Global.cfgのSymbolの1銘柄のみ。
複数銘柄は後日ファイル名から検討する。

CSVファイル

保存先:\tmp\ChartData.csv

カラム定義

No Name Description Example
1 date 日時 2022-05-03 00:36:37
2 price 約定価格 26730.0

※日時は、"2022-05-03T00:36:37+09:00"から"T"と"+09:00"を取る

スレッド同期とバッファリング

おそらくWebSocketContainerが別スレッドで動作するので、コールバックonMessage()と、メインのexecute()と、スレッド同期を取る。
synchoronizedの範囲を狭くするため、addChartData()とgetAndClearChartData()の2メソッドのみとして、リストのコピーを返して、JSON文字列の解析の処理の間を待たせないようにする。

1秒に数十件を受信しそうなので、リストに追加して、メインのexecute()のスリープ時間でバッファフラッシュのタイミングを調整する。

追記:JSONデータ調査

保存してあった生JSONデータから、短時間で変更のあった項目を挙げる。

"TradingVolume":407256.0  ->  407261.0
"VWAP":26761.8904  ->  26761.8900
"TradingValue":1089894043000.0  ->  1089907408000.0
"BidQty":139.0  ->  170.0
"Sell1":{"Price":26730.0,"Qty":139.0,"Sign":"0101"}  ->  {"Price":26730.0,"Qty":170.0,"Sign":"0101"}
"Sell2":{"Price":26735.0,"Qty":252.0}  ->  {"Price":26735.0,"Qty":253.0}
"Sell3":{"Price":26740.0,"Qty":273.0}  ->  {"Price":26740.0,"Qty":276.0}
"Sell9":{"Price":26770.0,"Qty":749.0}  ->  {"Price":26770.0,"Qty":750.0}
"AskQty":52.0  ->  65.0
"Buy1":{"Price":26725.0,"Qty":52.0,"Sign":"0101"}  ->  {"Price":26725.0,"Qty":65.0,"Sign":"0101"}
"Buy5":{"Price":26705.0,"Qty":274.0}  ->  {"Price":26705.0,"Qty":275.0}
"Buy6":{"Price":26700.0,"Qty":318.0}  ->  {"Price":26700.0,"Qty":308.0}

追記:バッファリングされた最新の価格と同じ場合はスキップする

メモリ上のバッファの最新の価格(lastPrice)と同じ場合は、ファイル出力をスキップする。

追記:メッセージJSONをGsonでパース

検証用にメッセージJSONをGsonでパースしてみる。
io.swagger.client.model.BoardSuccessから気配を省いたapi.BoardBeanを作成する。
parseJson()でインスタンス化する。
先物は価格があれば当面はよいが、OPはGreeksの値が欲しいのでBoardBeanで受け取るようにする予定。

追記:時間外のデータのチェック

時間外だと

"Symbol":"167060019",
"SymbolName":"日経225mini 22/06",
"CurrentPrice":null,
"CurrentPriceTime":null,
"CurrentPriceChangeStatus":"",
"CurrentPriceStatus":null,

いまのソースだと、

String price = "null";
String date = null;

となる。
date.substringでNPEとなるが、priceが数値でないのも問題。

		// 価格を切り出す。時間外は"CurrentPrice":nullのため、"null"が返る。
		String price = StringUtil.parseString(message, "\"CurrentPrice\":", ",");
		if (price == null || "null".equals(price)) {
			return null;
		}

		// 日時を切り出す。時間外は"CurrentPriceTime":nullのため、見つからない。
		String date = StringUtil.parseString(message, "\"CurrentPriceTime\":\"", "\"");
		if (date == null) {
			return null;
		}

追記:15:10-16:30のデータ

-15:10
15:10-15:15 プレクロージング
16:15-16:30 プレオープニング
16:30-

2022-05-06 15:09:58,27075.0
2022-05-06 15:09:59,27070.0
2022-05-06 15:10:00,27075.0
2022-05-06 15:15:01,27105.0
2022-05-06 16:30:01,26995.0
2022-05-06 16:30:01,27000.0

追記:05:55-06:00のデータ

-05:55
05:55-06:00 プレクロージング

2022-05-07 05:54:51,26815.0
2022-05-07 05:54:59,26825.0
2022-05-07 06:00:01,26800.0

だからこっちには05:55のデータがないのか。
でも、05:55を例外的に扱うのか、前の05:50をコピーするのか、どちらでもよい気がしてきた。
変化がない閑散とした1日ならば、前の価格をコピーすることは普通の処理。

追記:1日のデータ件数

デバッグ用ログファイルを開きっぱなしにして、08:47~30:00までonMessage()の文字列を追記したところ、先物2022/06で466,094件受信した。ファイルサイズは731MB。

価格の変化があったものをCSVファイルに追記したら、08:47~30:00までで27,838件、789KB。

追記:メインスレッドのsleep間隔を10秒に変更

1秒ごとにバッファをフラッシュすると、バッファに5~6件で、実際に0~1件ファイル出力される。
参照する側は10秒遅れくらいでよいので、スリープ時間を1秒から10秒に延ばす。

追記:ファイル出力がエラーとなってもバッファがクリアされないように修正する

今のソースでは、ファイル出力時にエラーとなると、getAndClearChartData()時点でバッファをクリアしているので、消えてしまう。

	private void writeChartData() {
		List<String> bufList = getAndClearChartData();
		if (bufList.size() > 0) {
			int writeCnt = 0;
			try (PrintWriter pw = FileUtil.writer(DB_FILEPATH, FileUtil.UTF8, true)) {
				for (String s : bufList) {

そこで、isExistsChartData()を追加し、バッファのデータ有無を調べてから、ファイルオープンし、getAndClearChartData()で全データ取得とクリアを行う。

	private void writeChartData() {
		if (isExistsChartData()) {
			try (PrintWriter pw = FileUtil.writer(DB_FILEPATH, FileUtil.UTF8, true)) {
				int writeCnt = 0;
				List<String> bufList = getAndClearChartData();
				for (String s : bufList) {

isExistsChartData()がfalseの場合、getAndClearChartData()がsize() == 0のときと変わらない。
isExistsChartData()がtrueの場合、getAndClearChartData()までの短い時間に別スレッドから追加されても、処理対象の件数が増えるだけで影響はない。

事前にファイルを排他ロックするツールを実行しておくと、エラーとなる。
ロック解除すると、バッファのデータが抜けていないことを確認する(下記のテストではソース上に3件追加した状態で実行)。

MainChartData.writeChartData(): ERROR java.io.FileNotFoundException: \tmp\ChartData.csv (プロセスはファイルにアクセスできません。別のプロセスが使用中です。)
MainChartData.writeChartData(): bufList.size=3, writeCnt=2

追記:TradingVolumeを追加する

今後、onMessageの生JSONデータは捨てるので、売買高もCSVファイルに追加する。

カラム定義

No Name Description Example
1 date 日時 2022-05-03 00:36:37
2 price 約定価格 26730.0
3 volume 売買高 407256.0

追記:価格でなく出来高が変化したときに保存する。

当初、価格しか保存しなかったので、価格が変化されたときのみCSVファイルに保存していたが、売買高を追加したので、同じ価格でも売買高が変化されたときに保存する。

2022-05-17 01:52:32,26475.0,384463.0
2022-05-17 01:52:34,26470.0,384481.0
2022-05-17 01:52:36,26470.0,384485.0  ※この行が追加
2022-05-17 01:52:48,26475.0,384488.0

追記:シャットダウンフックを登録して、バッファをフラッシュする。

10秒ごとにバッファされたメッセージの処理を行うため、30:02以降に止めることになるが、Ctrl+Cで強制終了すると、バッファに溜まったメッセージが捨てられるため、シャットダウンフックを登録して、溜まっているバッファの処理を最後に行う。

12:23:05に次の処理が実行される前の12:23:04にCtrl+Cを押したログ。[1]と[17]はスレッドID。

2022/05/17 12:22:55.964 [1] MainChartData.writeChartData(): bufList.size=31, writeCnt=1
2022/05/17 12:23:04.205 [17] onClose:TyrusSession{uri=ws://localhost:18080/kabusapi/websocket, id='dba4bd0a- 
2022/05/17 12:23:04.212 [1] MainChartData.writeChartData(): bufList.size=47, writeCnt=9
2022/05/17 12:23:04.214 [1] MainChartData.execute: Done
2022/05/17 12:23:04.214 [17] ShutdownHook: Done

なお、onOpenやonMessageは[15]と[16]のスレッドが上がっていた。

2022/05/17 12:20:55.728 [15] onOpen:TyrusSession{uri=ws://localhost:18080/kabusapi/websocket, id='dba4bd0a-
2022/05/17 12:20:56.025 [16] onMessge:{"ClearingPrice":26480.0000,
2022/05/17 12:20:56.868 [15] onMessge:{"ClearingPrice":26480.0000,
2022/05/17 12:20:56.868 [16] onMessge:{"ClearingPrice":26480.0000,

追記:ChartData.lockを使ってチャートデータの排他制御する。

他のプロセスが安全にChartData.csvをRead/Writeできるように、ChartData.lockを使って排他制御する。

追記:カラム1の日時を、CurrentPriceTime(現値時刻)から、TradingVolumeTime(売買高時刻)に変更する。

もともと現値しか保存していなかったので、現値時刻を保存していたが、売買高を追加したので、売買高時刻に変更する。

githubソース

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
0
Help us understand the problem. What are the problem?