1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Tomcatにおけるマルチテナント対応非同期処理の実装

Last updated at Posted at 2020-03-16

Tomcatにおけるマルチテナント対応非同期処理の実装

この記事は、「Tomcatにおける非同期処理の実装」の一部分です。

はじめに

Tomcatにおける非同期処理の実装」では、@Asyncとスレッドプールの組み合わせで、非同期タスクの流動制御ができることを紹介しました。しかし、@Asyncとスレッドプールは一対一の対応であるため、マルチテナントには対応できないわけです。

マルチテナントとは、1つのアプリケーションのインスタンスで、複数のお客様向けにサービスを提供する仕組みです。たとえば、次のようにテナントIDを指定した場合に、テナント「t1」と「t2」を異なるスレッドプールに割り当て、データストアの異なるスキーマへアクセスするというふうに、テナントごとに異なる資源・資産を利用するようなイメージです。

- 「GET http://localhost/async?tenantId=t1」
- 「GET http://localhost/async?tenantId=t2」

図4.0 マルチテナント対応のHTTPリクエスト

そこで、マルチテナント対応の非同期処理を実装する方法を考えてみます。また、Spring Frameworkの管理から外れる部分があり、スレッドプールのライフサイクル管理を自前で実装する必要があることも紹介します。

スレッドプール管理クラスを追加

マルチテナント対応のスレッドプールを管理するThreadPoolTaskExecutorsクラスを作成します。

このクラスでは、テナントIDとスレッドプールをkey-valueとしたMap型をフィールドで保持し、getExecutor()ではテナントIDに対応したスレッドプールを返しています。ところが、このクラスをBeanとして登録しても、Spring Frameworkはスレッドプールを内包していることを認識するわけではありません。

そこで、自前でスレッドプールのシャットダウンを行うために、@PreDestroyを付加したdestroy()を追加しています。Tomcat通常終了時に@PreDestroyのメソッドがコールバックされるため、そのタイミングでスレッドプールをシャットダウンさせるわけです。ただし、@Configurationクラスの作成で述べたように、このシャットダウンでもキューに滞留しているタスクが破棄される点に注意してください。

public class ThreadPoolTaskExecutors {
	private Map<String, ThreadPoolTaskExecutor> map = new HashMap<>();

	public ThreadPoolTaskExecutors(ThreadPoolSettings settings) {
		//テナントごとにスレッドプールを生成
		map.put("Tenant1", newExecutor("Tenant1", settings));
		map.put("Tenant2", newExecutor("Tenant2", settings));
		map.put("Tenant3", newExecutor("Tenant3", settings));
	}

	//スレッドプールを生成
	private static ThreadPoolTaskExecutor newExecutor(String tenantId, ThreadPoolSettings settings) {
		final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(settings.getCorePoolSize());
		executor.setQueueCapacity(settings.getQueueCapacity());
		executor.setMaxPoolSize(settings.getMaxPoolSize());
		executor.setKeepAliveSeconds(settings.getKeepAliveSeconds());
		executor.setThreadNamePrefix("Executor2-"+tenantId+"-");
		executor.setDaemon(false);
		executor.initialize();
		return executor;
	}

	//テナントIDに対応したスレッドプールを返す
	public ThreadPoolTaskExecutor getExecutor(String tenantId) {
		return map.get(tenantId);
	}

	//スレッドプールをシャットダウンさせる
	@PreDestroy
	private void destroy() {
		this.map.values().forEach(ThreadPoolTaskExecutor::shutdown);
	}
}

図4.1 ThreadPoolTaskExecutorsクラスの内容

@Configurationクラスの改修

前述のSampleConfigクラスに、下図のようにtaskExecutors2()を追加します。
このメソッドでは@Beanを利用して、マルチテナント用スレッドプール管理オブジェクトをBeanとして登録します。

public class SampleConfig {
		:
	@Bean
	public ThreadPoolTaskExecutors taskExecutors2() {
		return new ThreadPoolTaskExecutors(threadPoolSettings);
	}

図4.2 SampleConfig#taskExecutors2()の内容

@Serviceクラスの改修

前述のSampleServiceクラスに、下図のようにcmd2()を追加します。
cmd()と異なり、@Asyncは付けず、復帰値はFutureではなくStringのオブジェクトです。

public class SampleService {
		:
	public String cmd2(final String[] command) {
		final int exitValue = run(command);
		return "exitValue="+exitValue;
	}
		:
}

図4.3 SampleService#cmd2()の内容

@Controllerクラスの改修

前述のSampleControllerクラスを改修します。
まず、taskExecutors2フィールドを追加します。@AutowiredがあるのでSampleConfigクラスで登録したBeanが自動的に注入されます。

public class SampleController {
		:
	@Autowired
	private ThreadPoolTaskExecutors taskExecutors2;
		:
}

図4.4.1 SampleService#taskExecutors2の内容

次に、SampleServiceのcmd2()を呼び出すメソッドを追加します。
このメソッドの処理の流れはcmd()メソッドと同じですが、スレッドプールを取得して明に非同期呼び出しを行っている点で異なります。

public class SampleController {
		:
	@RequestMapping(value="/cmd2", method=RequestMethod.GET)
	@ResponseBody
	public String cmd2(@RequestParam(value="id", required=true)final String id) {
		final String tenantId = String.format("%s#%03d", id, counter.getAndIncrement());

		//ThreadPoolを取得
		final ThreadPoolTaskExecutor executor = taskExecutors2.getExecutor(id);
		if(executor == null) {
			return "Invalid id: id="+id+"\n";
		}

		try{
			final Future<String> future = executor.submit(()->{
				return sampleService.cmd2(COMMAND); //Serviceを呼び出す
			});

			synchronized(this.futures) {
				this.futures.put(tenantId, future);
			}
			return "Accepted: id="+tenantId+"\n";
		}catch(RejectedExecutionException e) {
			final CompletableFuture<String> future = new CompletableFuture<>();
			future.complete("Rejected");
			synchronized(this.futures) {
				this.futures.put(tenantId, future);
			}
			return "Rejected: id="+tenantId+"\n";
		}
	}

図4.4.2 SampleService#cmd2()の内容

動作確認

ローカルでTomcatコンテナを起動し、次のようにコマンドを実行して動作確認できます。

$ curl -X GET http://localhost:8080/cmd2?id=Tenant1
Accepted: id=Tenant1#001

$ curl -X GET http://localhost:8080/cmd2?id=Tenant1
Accepted: id=Tenant1#001

$ curl -X GET http://localhost:8080/cmd2?id=Tenant1
Accepted: id=Tenant1#001
$ curl -X GET http://localhost:8080/cmd2?id=Tenant1

$ curl -X GET http://localhost:8080/cmd2?id=Tenant1
Rejected: id=Tenant1#001

$ curl -X GET http://localhost:8080/cmd2?id=Tenant1
Rejected: id=Tenant1#001

# スレッドプールが「maxPoolSize=2」、「queueCapacity=2」のとき
# 10秒強待ってから/statusをGETすると
# 実行が終了したタスクが2個、実行中のタスクが2個、Rejectされたタスクが2個
# であることが出力される
$ curl -X GET http://localhost:8080/status
Tenant1#001: exitValue=0
Tenant1#002: Running
Tenant1#003: Running
Tenant1#004: exitValue=0
Tenant1#005: Rejected
Tenant1#006: Rejected

# Tomcatコンテナを通常終了させる
$ curl -X POST http://localhost:8080/shutdown

図4.5 マルチテナント対応非同期タスクの動作確認

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?