はじめに
私は金融ソリューション事業部にて証券系の業務に従事しています。この業界では株式・債権・FXなどの取引を行う際に、FIX (Financial Information eXchange) プロトコルがよく使われます。
今回はquickfix/j (以後 qfj と呼びます) という、オープンソースで開発されているJavaのFIXプロトコルエンジンについて紹介しようと思います。
ちなみに、quickfixにはほかも quickfix/Go (Go言語のライブラリ)や quickfix/n (.NETのライブラリ) などもあります。
FIXプロトコルについて
qfj の話に入る前に、実装するうえで最低限知っておいてほしいFIXプロトコルの仕様について簡単に説明します。
FIXについて知っているよ、という方は飛ばしていただいて大丈夫です。
FIXとは
Financial Information eXchange(FIX)プロトコルは、TCP/IP上で動く証券取引のリアルタイムにやり取りをするために開発されたメッセージ仕様です。
Initiator と Acceptor
FIXプロトコルでは、発注側 (Initiator) と受注側 (Acceptor) の2つの立場があります。例えば、証券会社-取引所の間でFIXプロトコルを使った通信をする場合は証券会社が発注側、取引所が受注側になります。
- Initiator: 注文を発注する側です。FIXプロトコルでは発注する側からログオンを送信し、注文の新規・訂正・取消等を送信します。
- Acceptor: 注文を受注する側です。FIXプロトコルではInitiatorから来た電文に対して、注文の新規受付(拒否)や約定等を送信します。
電文形式
FIXプロトコルの電文(メッセージ)は<Tag>=<Value> という形式をSOH文字 (文字コード0x01) でつなげたものになっています。
以下は SOH文字を | で置き換えた電文例です。
8=FIX.4.2 | 9=178 | 35=8 | 49=PHLX | 56=PERS | 52=20071123-05:30:00.000 | 11=ATOMNOCCC9990900 | 20=3 | 150=E | 39=E | 55=MSFT | 167=CS | 54=1 | 38=15 | 40=2 | 44=15 | 58=PHLX EQUITY TESTING | 59=0 | 47=C | 32=0 | 31=0 | 151=15 | 14=0 | 6=0 | 10=128 |
例えば、 52=20071123-05:30:00.000 は Tag-52(SendingTime) が 2007/11/23 5:30 であることを意味しています。
どのタグが何を意味するのかはAcceptor側が出しているFIX仕様書であったり、FIXmate といったリファレンスツール等を使うことで調べることができます。
管理電文・業務電文
FIXでは電文は2つに分類されます。
- 管理電文: ログオン・ログアウト電文やハートビート電文などの通信に関する電文を指します。
- 業務電文: 新規注文や約定などの取引に関する電文を指します。
メッセージの受信
さて、ここからはサンプルプログラムの実装を始めましょう。
受信クラス
まずは Application インターフェースを継承したクラスを作ります。
このクラスでは7つのイベントのメソッドをOverride できます。
- fromAdmin
- fromApp
- onCreate
- onLogin
- onLogout
- toAdmin
- toApp
また、特定のメッセージタイプだけ取り扱いたい場合はqfjにある MessageCrackerクラスを継承し、 onMessage メソッドを適切に実装することで取り扱いが可能です。
public class MyApplication extends MessageCracker implements Application {
private static final Logger logger = LoggerFactory.getLogger(AcceptorApp.class);
/* セッション構築時のイベント */
@Override
public void onCreate(SessionID sessionId) {
logger.info("onCreate: SessionId={}", sessionId);
}
/* ログオン時のイベント */
@Override
public void onLogon(SessionID sessionId) {
logger.info("onLogon: SessionId={}", sessionId);
}
/* ログアウト時のイベント */
@Override
public void onLogout(SessionID sessionId) {
logger.info("onLogout: SessionId={}", sessionId);
}
/* 管理電文の送信イベント */
@Override
public void toAdmin(Message message, SessionID sessionId) {
logger.info("toAdmin: Message={}, SessionId={}", message, sessionId);
}
/* 業務電文の送信イベント */
@Override
public void toApp(Message message, SessionID sessionId) {
logger.info("toApp: Message={}, SessionId={}", message, sessionId);
}
/* 管理電文の受信イベント */
@Override
public void fromAdmin(Message message, SessionID sessionId) {
logger.info("fromAdmin: Message={}, SessionId={}", message, sessionId);
}
/* 業務電文の受信イベント */
@Override
public void fromApp(Message message, SessionID sessionId) throws UnsupportedMessageType, FieldNotFound, IncorrectTagValue {
logger.info("fromApp: Message={}, SessionId={}", message, sessionId);
crack(message, sessionId);
}
/* NewOrderSingle (MsgType=D) を受信したときにコールされる. */
public void OnMessage(NewOrderSingle message, SessionID sesionId)
{
// 新規注文受付処理を書いたりする.
}
/* Handler アノテーションを付けることでメソッド名をonMessage以外に変更することも可能. */
@Handler
public void onOrderCancelMessage(OrderCancelRequest msg, SessionID sessionId)
{
// 注文取消処理を書いたりする.
}
}
メッセージの取扱
受信したメッセージから情報を取得するにはメッセージクラスから値を取得します。取得メソッドはいくつかあるので、読みやすいメソッドを使ってください。
public void read(NewOrderSingle message) throws FieldNotFound
{
String clOrdID;
clOrdID = message.get(new ClOrdID()).getValue(); // getメソッドから取得
clOrdID = message.getClOrdID().getValue(); // getter から取得
clOrdID = message.getString(ClOrdID.FIELD); // FIELD値(Tag番号)から取得
clOrdID = message.getField(new ClOrdID()).getValue(); // getFieldメソッドから取得
double qty; // 数値型フィールド
qty = message.get(new OrderQty()).getValue();
qty = message.getOrderQty().getValue();
qty = message.getDouble(OrderQty.FIELD);
qty = message.getField(new OrderQty()).getValue();
String clearingAccount;
// clearingAccount = message.get(new ClearingAccount()).getValue(); // そのMessageに未定義のフィールドはgetメソッドは使えない.
// clearingAccount = message.getClearingAccount().getValue();
clearingAccount = message.getString(ClearingAccount.FIELD); // コンパイルは通るが、メッセージに無ければ例外が飛ぶ.
clearingAccount = message.getField(new ClearingAccount()).getValue();
}
メッセージの送信
メッセージ作成
送信したいメッセージを作るには、メッセージクラスを作成し、そこに値を設定していきます。
public Message CreateNewOrderSingle()
{
NewOrderSingle msg = new NewOrderSingle();
msg.set(new ClOrdID("NSS09080001")); // Setメソッドで値を設定できる.
// もしくは必須フィールドはコンストラクタで指定可能.
msg = new NewOrderSingle(
new ClOrdID("NSS09080001"),
new Side(Side.BUY),
new TransactTime(LocalDateTime.now()),
new OrdType(OrdType.MARKET)
);
msg.set(new OrderQty(5000));
return msg;
}
送信処理
送信は何かクラスを継承する必要はなく、staticメソッドである Session.sendToTarget を呼ぶだけで送信ができます。
public void send(Message msg, SessionID sessionId)
{
try {
if (!Session.sendToTarget(msg, sessionId))
{
logger.error("Failed to send.");
}
logger.info("Successfully sending message.");
} catch (SessionNotFound e) {
logger.error("Session not found.", e);
}
}
Configuration
メッセージの送受信処理が書けたところで、実際に通信してみたいところですが、その前に接続先の設定について話します。
接続先 (host, port 以外にも CompID, SubID, LocationID タグの値なども含みます) はConfigファイルに書いておきます。
接続先以外にも接続開始・終了時間や再接続の間隔などの設定も可能です。
設定の詳細は以下に纏まっています。
Acceptor.cfg
# 全セッション共通
[DEFAULT]
ConnectionType=acceptor
StartTime=00:00:00
EndTime=23:59:59
HeartBtInt=30
SenderCompID=ACCEPTOR
TargetCompID=INITIATOR
FileLogPath=log/acceptor_log
FileStorePath=log/acceptor_message_log
# セッションごとの設定
[SESSION]
BeginString=FIX.4.4
SocketAcceptPort=57752
Initiator.cfg
# 全セッション共通
[DEFAULT]
ConnectionType=initiator
StartTime=00:00:00
EndTime=23:59:59
HeartBtInt=30
ReconnectInterval=60
SenderCompID=INITIATOR
TargetCompID=ACCEPTOR
FileLogPath=log/initiator_log
FileStorePath=log/initiator_message_log
# セッションごとの設定
[SESSION]
BeginString=FIX.4.4
SocketConnectHost=localhost
SocketConnectPort=57752
通信処理の開始
次はメッセージの送受信を動かしてみます。
Acceptor, Initiatorの順に動かすことでログオンメッセージ、しばらくするとハートビートメッセージが通信されるようになります。
Acceptor
Acceptor側は以下のように Acceptor.start を呼び出すことで、Initiatorからのログオンを受信できるようになります。
public void start(String[] args) throws ConfigError {
Application application = new MyApplication(); // 自作のApplication実装
SessionSettings settings = new SessionSettings("cfg/acceptor.cfg"); // configのあるパスを指定.
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
LogFactory logFactory = new FileLogFactory(settings);
MessageFactory messageFactory = new DefaultMessageFactory();
try
{
Acceptor acceptor = new SocketAcceptor(application, storeFactory, settings, logFactory, messageFactory);
acceptor.start();
}
catch(Exception e)
{
e.printStackTrace();
}
}
Initiator
Initiator側はAcceptorの開始後に Initiator.start を呼び出すことでログオン処理が開始されます。
public static void start(String[] args) throws ConfigError {
try
{
Application application = new InitiatorApp();
SessionSettings settings = new SessionSettings("cfg/initiator.cfg");
MessageStoreFactory storeFactory = new FileStoreFactory(settings);
LogFactory logFactory = new FileLogFactory(settings);
final DefaultMessageFactory defaultMessageFactory = new DefaultMessageFactory();
defaultMessageFactory.addFactory(FixVersions.BEGINSTRING_FIX44, quickfix.fix44.MessageFactory.class);
Initiator initiator = new SocketInitiator(application, storeFactory, settings, logFactory, defaultMessageFactory);
initiator.start();
// Mainスレッドが終了するとinitiatorも終わってしまいログアウトするので、実際には別スレッドで実行したりCountdownLatch等を使ってスレッドを止めておく.
Thread.sleep(300_000);
}
catch (Exception e)
{
e.printStackTrace();
}
}
サンプルコード
qfj のリポジトリ内にはInitiator, Acceptor それぞれのqjfの実装のサンプルコードがあります。
- Banzai: Initiator の実装
- Executor: Acceptor の実装
上記で色々と話ましたが、より具体的な使い方はこのサンプルコードを見るのがわかりやすいと思います。このサンプルコードを読むのに、この記事が役立てば幸いです。
