初投稿です。最近話題の仮想通貨のことを調べていたら、取引所にAPIがあって驚いています。しかも公開情報として利用できるんですね。これがすごい。
この投稿では、仮想通貨の取引所で公開されているストリーミングAPIを使い、配信されてくるレートや板情報、直近の取引値や取引量を、C#を用いて記録するプログラムを作ります。
#ストリーミングAPIで公開されている情報
例えばフィスコ仮想通貨取引所(金融情報で有名なフィスコ)のAPIドキュメントを見ると、ストリーミングAPIの項目に
{
"asks":[[30000.0, 0.1],[30010.0, 0.2],...],
"bids":[[29500.0, 0.5],[29300.0, 0.1],...],
"trades":[{"currenty_pair":"btc_jpy","trade_type":"ask","price":30001,"tid":123,"amount":0.02,"date":1427879761},{}...],
"timestamp":"2015-04-01 18:16:01.739990",
"last_price:{"action":"ask","price":30001},
"currency_pair":"btc_jpy"
}
レートだけでなく、板情報や、いくらでどれだけの数量の取引が成立した等のトランザクションデータも含まれています。これが簡単に利用できるのはすごい。神。特に日本株だとこういうフリーで利用できるAPIなんて聞いたことありません(あったら教えて欲しいレベル)。人工市場という、実データに依存せずに、純粋にシミュレーションを通じて市場をモデリング化しデータを解析する研究ジャンルが発達してきたことを踏まえると、なかなかこれは感慨深いものがあります。
同様のAPIは、ZaifやCoinCheckなどでもありますが、今回はフィスコのAPIを使います。
#プログラムの設計
Webサービスではないので、データだけ収集してあとで煮るなり焼くなりするアプローチで行きます。
- ストリーミングAPIはWebSocketで配信されているので、そのデータ(JSON形式)を受信(WebSocket4Netを使用)
- 生JSONを保存すると容量が大変なことになるので、一定時間のJSONをzipファイルとして圧縮して格納(gzip等より容量が縮む圧縮アルゴリズムはあるが、今回はそこまでするほどでもないので、扱いやすいzip形式で)
- zipに格納するまでの間、メモリー内に受信したデータをキューとして蓄積
- WebSocketが勝手に切断されることがあるので、切断されたら再接続する機能もつける
- 基本的にWebSocketの受信イベントがメインスレッドでくる保証がないので、スレッドセーフになるように気をつける(**[重要]**ファイルIOやキュー操作、FormでGUIをつける場合はそのコントロール操作など)
実装はVisualStudio 2017 Communityで行いました。
#注意事項
- 鼻ほじりながら趣味で作ったものなのでサポートはしません。不具合があったら自分で直してください。
- 得られたデータを用いていかなる損害を被ったとしても作者は補償しません。
#実装とコード
NuGetからWebSocket4Netをインストールしておきます。JSONパースを同時に行う場合は、Json.NETなどのJSONパーサーも用意しておくとよいです(この例では使いません)。
ソースは次の3ファイルに分かれています。これらを同一ディレクトリに配置します。
- Program.cs(エントリーポイント)
- ReceivedData.cs(受信したデータのオブジェクト)
- Sockets.cs(複数のWebSocketを管理し、Zip圧縮、切断時の再接続を行うメイン部分)
プロジェクト名は「CryptoCurrencyDataMiner」としました。データを保存しているだけなので、本来の意味のデータマイニングではありませんが、仮想通貨のマイニングとかけてこの名前にしてみました。次に各ソースを書きます。
##(1)Program.cs
using System;
namespace CryptoCurrencyDataMiner
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Press s to start / Press q to exit after starts");
//サーバー接続開始
if (Console.ReadLine() == "s")
{
Sockets.OpenAll();
}
//終了待機
while(true)
{
var str = Console.ReadLine();
if (str == "q") break;
}
//サーバー接続終了
Sockets.CloseAll();
}
}
}
##(2)ReceivedData.cs
using System;
namespace CryptoCurrencyDataMiner
{
public class ReceivedData
{
public string LocalTime { get; private set; }
public string Data { get; private set; }
public string Filename { get; private set; }
public ReceivedData(string data)
{
var date = DateTime.Now.ToString("o");
//厳密にやるならdataをパースしてサーバータイムで
LocalTime = date;
Filename = date.Replace(":", "_") + ".txt";
Data = data;
}
}
}
保存するテキストファイル名はマシンのローカル時間をISO 8601表記し、ファイル名では使えないセミコロンをアンダーバーに置き換えたものになります。厳密にやるなら、配信されてきたサーバー時間のタイムスタンプを使ってもいいかもしれない。
##(3)Sockets.cs
今回はbtc_jpy(ビットコイン対円)とmona_jpy(モナコイン対円)の2つのペアを同時に取得します。拡張は_tickersの配列をいじってください。
using System;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
using System.IO;
using System.IO.Compression;
using WebSocket4Net;
namespace CryptoCurrencyDataMiner
{
public static class Sockets
{
private static WebSocket[] _sockets;
private static Timer _autoReconnectTimer = null;
private static int _autoRecconectInterval = 60 * 1000;//自動再接続の間隔(ミリ秒)
private static ConcurrentQueue<ReceivedData>[] _messageQueue;//一時的にメッセージを格納しておくキュー
private static int _zipCreateInterval = 300;//Zipを生成する間隔(秒)
private static string[] _tickers = new string[] { "btc_jpy", "mona_jpy" };//ティッカー一覧
static Sockets()
{
var n = _tickers.Length;
_sockets = new WebSocket[n];
_messageQueue = new ConcurrentQueue<ReceivedData>[n];
for(var i=0; i<n; i++)
{
_sockets[i] = new WebSocket("wss://ws.fcce.jp:8888/stream?currency_pair=" + _tickers[i]);
_messageQueue[i] = new ConcurrentQueue<ReceivedData>();
//同一スレッドから呼ばれる保証がないのでスレッドセーフなコレクションを使うこと
//イベントハンドラ
_sockets[i].Opened += OnOpened(_tickers[i]);
_sockets[i].MessageReceived += OnMessageReceived(i, _tickers[i]);
_sockets[i].Error += OnError(_tickers[i]);
}
}
//サーバー接続完了時
private static EventHandler OnOpened(string ticker)
{
return delegate (object s, EventArgs e)
{
var str = string.Format("({0})Succeed to connect server", ticker);
ErrorLog(str);
};
}
//メッセージ受信時
private static EventHandler<MessageReceivedEventArgs> OnMessageReceived(int index, string ticker)
{
var queue = _messageQueue[index];
var previousTime = DateTime.Now;
return delegate (object s, MessageReceivedEventArgs e)
{
var item = new ReceivedData(e.Message);
queue.Enqueue(item);
Console.WriteLine("{0}:[{1}]MessageReceived ({2}KB)", item.LocalTime, ticker, item.Data.Length / 1024.0);
Console.WriteLine(e.Message);//長ったらしかったらここを消す
//一定間隔で保存
if ((DateTime.Now - previousTime).TotalSeconds >= _zipCreateInterval)
{
var dir = Environment.CurrentDirectory + "\\" + ticker;
if (!Directory.Exists(dir)) Directory.CreateDirectory(dir);
if (queue.Count == 0) return;
//Zip作成
var zippath = "[" + ticker + "]" + DateTime.Now.ToString("o").Replace(":", "_") + ".zip";
using (var zipToOpen = new FileStream(dir + "\\" + zippath, FileMode.Create))
using (var archive = new ZipArchive(zipToOpen, ZipArchiveMode.Update))
{
while (queue.Count != 0)
{
ReceivedData data;
if (!queue.TryDequeue(out data))
{
Thread.Sleep(500);//成功するまでTry
continue;
}
var txtEntry = archive.CreateEntry(data.Filename);
using (var writer = new StreamWriter(txtEntry.Open()))
{
writer.WriteLine(data.Data);
}
}
}
//前回記録時間の更新
previousTime = DateTime.Now;
}
};
}
//エラー時
private static EventHandler<SuperSocket.ClientEngine.ErrorEventArgs> OnError(string ticker)
{
return delegate (object s, SuperSocket.ClientEngine.ErrorEventArgs e)
{
var errorStr = string.Format("({0}){1}", ticker, e.Exception.ToString());
ErrorLog(errorStr);
};
}
//エラーログメソッド
static object _errorLogLock = new Object();
private static void ErrorLog(string errorStr)
{
var str = string.Format("[{0}]{1}", DateTime.Now.ToString(), errorStr);
Console.WriteLine(str);
//TextWriter.Synchronizedだけだとエラー発生するっぽいので自前ロック
lock (_errorLogLock)
{
using (var sw = new StreamWriter(Environment.CurrentDirectory + "\\error.log", true, Encoding.UTF8))
using (var writerSync = TextWriter.Synchronized(sw))
{
sw.WriteLine(str);
}
}
}
//再接続(テスト版)
private static void AutoReconnectCallBack(object args)
{
var i = 0;
foreach (var ws in _sockets)
{
if (ws.State == WebSocketState.Closed)
{
//再接続
var str = string.Format("({0})Try to reconnect", _tickers[i]);
ErrorLog(str);
ws.Open();
}
i++;
}
}
//一括接続
public static void OpenAll()
{
foreach (var ws in _sockets) ws.Open();
//自動再接続をオン
_autoReconnectTimer = new Timer(AutoReconnectCallBack);
_autoReconnectTimer.Change(_autoRecconectInterval, _autoRecconectInterval);
}
//一括切断
public static void CloseAll()
{
foreach (var ws in _sockets) ws.Close();
//自動再接続をオフ
if (_autoReconnectTimer != null) _autoReconnectTimer.Change(Timeout.Infinite, Timeout.Infinite);
}
}
}
メッセージキューの部分は、普通のQueueではなくスレッドセーフなConcurrentQueueを使います。イベントハンドラはJavaScriptのクロージャっぽくおしゃれに書いてみました。
メッセージ受信時に、キューにEnqueueし、_zipCreateIntervalの間隔(この設定では5分間隔)でキューにたまったデータをZipに吐き出します。Zipファイル名はテキストファイルと同様に、格納時のマシンのローカル時間をISO 8601表記し、セミコロンをアンダーバーに置き換えたものですが、先頭にティッカー名をつけました。ConcurrentQueueではDequeueではなくTryDequeueになるのが特徴で、Dequeueに失敗した場合は一定時間(500ms)後にまたDequeueするようにしました。
エラー発生時はErrorLogメソッドで、カレントディレクトリに「error.log」ファイルが作られますが、ここもスレッドセーフになるようにします。TextWriter.Synchronizedだけだと確かに書き込み部分はスレッドセーフになるのですが、StreamWriterでエラーを起こす(ここで衝突のエラーを出している)ので、自前のlockをかけました。厳密にやりたいなら、log4net等のライブラリを使ったほうがいいかもしれません。
再接続部分は、OnErrorでopenをかけてもいいのですが、例えばノートPCで動かしてネット接続そのものが切れたときのように、リトライが1回だけだと直らないケースがあります。そのため、別に接続状態をチェックするようなタイマーを用意し、定期的にチェックさせStateがClosedになっていた場合は再接続をトライするようにしました。試しに、ネットワークケーブルをぶち抜いて2~3分して入れ直したところ、ちゃんと再接続されました。このタイマーは60秒間隔で設定しています。
追記:5分間隔でも1日300個近いzipができるので、長く使いたければ日や月単位でフォルダ分けしたほうがいいです。ぜひ改造してみてください。
#必要容量について
btc-jpyの場合、おおよそ1秒間隔で4KBのJSONが送られてくるので生JSONをそのまま保存すると4×60×60×24=337.5MBの容量が必要です。ただし、Zip圧縮で容量が1/6ぐらいになるので計算上は56.25MBとなります(この値は取引の活発さやレートの配信頻度によるので、今後もっと取引が活発になれば容量は増える可能性があります)。
実際、**2018年1月現在、Zaifのデータで30時間放置してみたところ、btc-jpyでは57.7MB(46.2MB/d)、mona-jpyでは21.7MB(17.4MB/d)**だったので、そこまで保存するのが非現実的というわけではありません。マイナーなペアだったらもっと容量は少なくなります。
今のところそこまで本気で収集する予定はないので、使っていないノートPCで動かしていますが、もし本格的にやるなら言語書き換えてVPSなんかで動かそうかなーと思案中。
では皆さんよい仮想通貨ライフを~~