Spring IntegrationのTCP機能には、ソケットレベルのタイムアウト時間(SO_TIMEOUT)を指定することはできるのですが、これは接続相手から指定した時間データが送られてこないとタイムアウト扱いになります。
特に問題ないように思うかもしれませんが、例えばタイムアウト時間に「10秒」を指定した場合、接続完了から10秒間データのやり取りがないとタイムアウトになってしまいます。これは都度接続方式であれば問題にならない可能性はありますが、常時接続方式だと・・・問題になります。
本エントリーの記載は、NIO(Non-Blocking/IO)を使う前提になります。NIOじゃないと「やりたいこと」が実現できないので悪しからず。
やりたいこと
データを受信して1つのメッセージにデシリアライズが完了するまでのタイムアウト時間を指定できるようにしたい。
タイムアウト検知機能付きのデシリアライザ
タイムアウト検知機能付きのデシリアライザを作ってみました。
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.serializer.Deserializer;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class TimeoutableDeserializer<T> implements Deserializer<T>, InitializingBean, DisposableBean {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Deserializer<T> delegate;
private ScheduledExecutorService executor;
private long timeout = 30;
private TimeUnit timeoutUnit = TimeUnit.SECONDS;
private int monitoringThreadCorePoolSize = 10;
public TimeoutableDeserializer(Deserializer<T> delegate) {
this.delegate = delegate;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
public void setTimeoutUnit(TimeUnit timeoutUnit) {
this.timeoutUnit = timeoutUnit;
}
public void setMonitoringThreadCorePoolSize(int monitoringThreadCorePoolSize) {
this.monitoringThreadCorePoolSize = monitoringThreadCorePoolSize;
}
@Override
public T deserialize(InputStream inputStream) throws IOException {
AtomicReference<T> data = new AtomicReference<>();
Thread monitoringTargetThread = Thread.currentThread();
// 指定したタイムアウト時間を経過したらデシリアライズ処理を行なっているスレッドに割り込む(=タイムアウトで処理を中断させる)タスクを追加
ScheduledFuture<?> monitoringFuture = executor.schedule(() -> {
if (data.get() == null) {
monitoringTargetThread.interrupt();
}
}, timeout, timeoutUnit);
try {
// 具体的なデシリアライズ処理を行うデシリアライザへ処理を移譲
data.set(delegate.deserialize(inputStream));
} finally {
// タスクが未実行であればタスクをキャンセル
if (!monitoringFuture.isDone()) {
monitoringFuture.cancel(true);
}
if (data.get() == null) {
// データが未設定ということは、割り込みよってデシリアライズ処理が中断した(=つまりタイムアウトになった)
logger.warn("Detect a timeout. timeout={}ms interrupted={}", timeoutUnit.toMillis(timeout), monitoringTargetThread.isInterrupted());
} else {
// データが取得できた場合でもタイミングによっては割り込み状態になる可能性があるので割り込み状態をリセットする
if (Thread.interrupted()) {
logger.info("Reset interrupted status because it has been deserialized receiving data to one message.");
}
}
}
// 取得したデータを返却する
return data.get();
}
@Override
public void afterPropertiesSet() {
// タイムアウト監視タスク用のスレッドプールを生成
this.executor = Executors.newScheduledThreadPool(monitoringThreadCorePoolSize);
}
@Override
public void destroy() {
// タイムアウト監視タスク用のスレッドプールを停止
executor.shutdown();
}
}
まとめ
とりあえず・・・やりたいことはできそう。標準機能でできるよ〜とか、もっとスマートな方法あるよ〜とかあれば是非コメントを!!