はじめに
GuavaライブラリはJavaにおけるコーディングを簡単化するための様々なユーティリティを提供するライブラリである.Googleの多くのJavaベースのプロジェクトで使用されているらしく、apacheのcommonsライブラリと似た機能を提供する.
このページは、自分でGuavaライブラリを使ってみて使い方が分かったものから随時まとめる備忘録である.書き始めとして、日本語や英語のブログを見てもなかなか見つからないcom.google.common.util.concurrentパッケージについてまとめる.確かに面倒だし、かなり大規模な実装プランがないと手を出さないかも知れないけど折角だからまとめてほしかったと他のブログを読んで思ったので、自分でまとめてみる.
concurrentパッケージは、スレッド制御に関する便利機能を提供するパッケージである.スレッドを直接制御するのではなく各種ExecutorServiceを作ることで様々なスレッド制御のテンプレートパターンを活用することができる.すべてを紹介することはできないが、ここでは、ListenableFutureについて紹介する(他のも使ったら、随時更新する予定である)
ListenableFutureクラス
ListenableFutureは、従来のjava.util.concurrentのFutureクラスの拡張で、Futureは非同期計算(処理)の結果を扱うためのクラスである.計算が完了したかどうかのチェック、完了までの待機、計算結果の取得などを行うためのメソッドが用意されている.ListenableFutureは、計算が完了した後に、その結果を用いて次の処理を行うListenerを登録できるFutureクラスの拡張である.
上記のjavacodegeeksで紹介されている例を簡略化した(urlのコンテンツへのアクセスは行わない)コード例を以下に示す(長いのでmain関数の中身のみ書くことにする)
URL[] topSites = null;
try {
topSites = new URL[] {new URL("http://www.google.com")};
} catch (MalformedURLException e1) {
e1.printStackTrace();
}
logger.debug("in main");
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
for (final URL siteUrl : topSites) {
final ListenableFuture<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
logger.debug("in call()");
Thread.sleep(100);
return siteUrl.toString();
}
});
future.addListener(new Runnable() {
@Override
public void run() {
try {
final String contents = future.get();
logger.debug("listener[1]: {}", contents);
Thread.sleep(100);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} finally {
logger.debug("job completed.");
}
}
}, pool);
future.addListener(new Runnable() {
@Override
public void run() {
try {
final String contents = future.get();
logger.debug("listener[2]: {}", contents);
Thread.sleep(100);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} finally {
logger.debug("job completed.");
}
}
}, pool);
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdownNow();
コード全体は以下を参照。
コードの処理内容を以下にまとめる.
- urlの配列を作る
- urlを文字列化して100ms待つCallableインタフェースの登録、実行を行い、ListenableFutureを得る
- 得られたListenableFutureに対して結果文字列を出力して、100ms待つ処理を行うListenerを2つ登録する
- すべての処理が終わるまで待って、終了する
このソースの実行結果を以下に示す.
14:55:58.286 [main] DEBUG concurrent.App - in main
14:55:58.300 [pool-1-thread-1] DEBUG concurrent.App - in call()
14:55:58.401 [pool-1-thread-2] DEBUG concurrent.App - listener[1]: http://www.google.com
14:55:58.401 [pool-1-thread-3] DEBUG concurrent.App - listener[2]: http://www.google.com
14:55:58.502 [pool-1-thread-3] DEBUG concurrent.App - job completed.
14:55:58.502 [pool-1-thread-2] DEBUG concurrent.App - job completed.
結果の読み方は、左から、実行された時間、スレッド名、デバッグモード、呼ばれたクラス、出力文字列となっている.実行時間とスレッド名を見るとcallが呼ばれて、約100msec後に登録された2つのリスナーがcallメソッドが実行されたスレッドとはそれぞれ別のスレッドで同時実行されていることが確認できる.Listenerが同時実行されたのは、ListenableFutureのaddListenerメソッドの第二引数にスレッドプールのExecutorServiceを登録しているからであり、例えば、以下のようにMoreExecutors.sameThreadExecutor()(callメソッドと同じスレッドを使うよう)に切り替えると
future.addListener(new Runnable() {
@Override
public void run() {
try {
final String contents = future.get();
logger.debug("listener[2]: {}", contents);
Thread.sleep(100);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} finally {
logger.debug("job completed.");
}
}
}, MoreExecutors.sameThreadExecutor());
結果は以下のようになる.
15:14:42.126 [main] DEBUG concurrent.App - in main
15:14:42.141 [pool-1-thread-1] DEBUG concurrent.App - in call()
15:14:42.242 [pool-1-thread-1] DEBUG concurrent.App - listener[1]: http://www.google.com
15:14:42.343 [pool-1-thread-1] DEBUG concurrent.App - job completed.
15:14:42.343 [pool-1-thread-1] DEBUG concurrent.App - listener[2]: http://www.google.com
15:14:42.444 [pool-1-thread-1] DEBUG concurrent.App - job completed.
同じく実行時間とスレッド名に着目しながら見ると、callメソッドで使われたスレッドと同じスレッドを用いて、2つのListenerメソッドが順次実行されていることが読み取れる.
このように、ListenableFutureを使うと、パイプ処理のように実行結果を受けた処理をつなげる設計ができ、ExecutorServiceを変更することでシーケンシャルに実行するのか、並列処理するのかなどの柔軟な制御を行うことができる.
各種実装の解説
ここでは、コードの内容について簡単に説明する.
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ExecutorServiceに、スレッドプールの中のスレッドを利用するようにするExecutorServiceを設定している.ListeningExecutorServiceは、ListenableFutureを返すExecutorServiceの実装で、MoreExecutors.listeningDecorator()で,java.util.concurrent.Executorsで作成したスレッドプールを使うExecutorServiceをラッピングしている.
final ListenableFuture<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
logger.debug("in call()");
Thread.sleep(100);
return siteUrl.toString();
}
});
Callableインタフェースの実装とその実行を指示し、その未来の計算結果として、ListenableFutureインスタンスを取得している.
future.addListener(new Runnable() {
@Override
public void run() {
try {
final String contents = future.get();
logger.debug("listener[1]: {}", contents);
Thread.sleep(100);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} finally {
logger.debug("job completed.");
}
}
}, pool);
得られたFutureクラスに、Runnableの実装を登録している.Listenerというけど中身はRunnableの実装である.runメソッド中では、futureから計算結果を取得し、その後の処理が続く.
ListenableScheduledFutureクラス
ListenableFutureをスケジューラで使いたい場合のコードが以下。ListeningExecutorServiceの代わりに、ListeningScheduledExecutorServiceが、ListenableFutureの代わりに
ListenableScheduledFutureがそれぞれ用意されている。以下のコードは自分の書いたコードをシンプル化したものだけど、まだテストしていないからバグがあるかも・・
public class DataClowler implements Callable<String> {
protected static Logger logger = LoggerFactory.getLogger(DataClowler.class);
protected ListenableScheduledFuture future = null;
protected ListeningScheduledExecutorService scheduler = null;
protected ListeningExecutorService pool = null;
protected static final int threadPoolNum = 10;
protected long delay = 0;
protected String uri = null;
public DataClowler(String url) {
this.scheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadPoolNum));
this.pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(threadPoolNum));
this.uri = url;
this.delay = 2000; // msec
}
public String call () {
try {
String resource = Resources.toString(new URL(this.uri), Charsets.UTF_8);
future = scheduler.schedule(this, this.delay, TimeUnit.MILLISECONDS);
future.addListener(new Echo(future), this.pool);
return resource;
} catch (Exception e) {
logger.error("Error: {}", e);
return null;
}
}
/**
* タスクを開始(再開)する。
*/
public void start() {
future = scheduler.schedule(this, this.delay, TimeUnit.MILLISECONDS);
future.addListener(new DataClowler(core, future), this.pool);
}
/**
* schedulerに登録したタスクを停止する。
*/
public void stop() {
if (future != null) future.cancel(true);
}
/**
* schedulerをシャットダウンする
*/
public void shutdown() {
scheduler.shutdownNow();
}
}
上述のコードのEchoクラスの実装は以下の通り。ListenableFutureのコード例と違い、独立して定義する場合、futureオブジェクトをコンストラクタに渡しておくと良い。
public class Echo implements Runnable {
protected ListenableScheduledFuture future = null;
public Echo(ListenableScheduledFuture future) {
this.future = future;
}
@Override
public void run() {
try {
final String contents = (String)future.get();
logger.debug("listener[1]: {}", contents);
Thread.sleep(100);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} finally {
logger.debug("job completed.");
}
}
}
以上、ScheduledExecutorServiceと連携する場合の実装を紹介した。できれば、scheduleAtFixedRate()関数と一緒に使いたいところだけど、今のところ、
Callableインタフェースを登録できないので、使えない模様。画竜点睛を欠く感じで今後の実装に期待したい。