はじめに
SpringBootでは非同期処理を簡単に使える仕組みが用意されています。 非同期処理をさせたいメソッドに @Async
をつけ、アプリケーション全体の設定として @EnableAsyc
をつけるだけで良いのですが、これはControllerだけではなく、他のステレオタイプでも実施できます。
これにより、1つのController+Serviceから複数のデータソース・複数環境のデータを結合して返せるようになります。
簡単な例として、データベースの検索結果を取得するRepository、もう一方はWebAPIからJSONで検索結果をもらうRepositoryから得られた結果を結合して返す機能を作るとします。
この2つを結合した結果を返す、一番単純で簡単な方法、2つのRepositoryの結果を順次呼び出して、その結果を返すことです。
データベースの検索を行うRepositoryをA、WebAPIを実行するRepositoryをBとして、
Aが結果を返すまでに1秒かかるとし、
Bが結果を返すまでに2秒かかるとします。
Aが終わったあとにBを順次呼び出す方法では、検索結果が得られるまでに A+Bの3秒かかります。
かたや、AとBが並列に動作すれば、検索結果が得られるまで、最大2秒で済むでしょう。これを簡単に実現できるのが、@Async
とCompletableFutureを組み合わせる方法です。
実装例
Service
ServiceクラスからRepositoryを呼ぶときに、並列で実行したい処理に対してCompletableFutureで受け取り、その後、結合したいタイミングで、CompletableFuture.allOf(並列処理した結果....).join() をします。
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.springframework.stereotype.Service;
import com.github.apz.springsample.dto.Item;
import com.github.apz.springsample.dto.Order;
import com.github.apz.springsample.dto.OrderedItem;
import com.github.apz.springsample.repository.ItemRepository;
import com.github.apz.springsample.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Service
@RequiredArgsConstructor
@Log4j2
public class MainService {
private final ItemRepository itemRepository;
private final OrderRepository orderRepository;
public List<OrderedItem> selectAllItems() throws InterruptedException, ExecutionException {
CompletableFuture<List<Item>> items = itemRepository.allItems();
CompletableFuture<List<Order>> orders = orderRepository.getRecentOrders();
CompletableFuture.allOf(items, orders).join();
log.info("MainService : merged.");
List<Item> itemList = items.get();
List<Order> orderList = orders.get();
// 結合処理をして返す
return getOrderedItem(itemList, orderList);
}
}
Repository
それぞれのRepositoryでは、@Asyncを付与して、CompletableFutureをラップした結果を返すようにします。
ItemRepositoryは1秒待機したあとデータベースに検索します(Thread.sleepを使って、仮想的に待機時間を作っています)
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Repository;
import com.github.apz.springsample.dto.Item;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Repository
@RequiredArgsConstructor
@Log4j2
public class ItemRepository {
private final SqlSessionTemplate sqlSessionTemplate;
@Async
public CompletableFuture<List<Item>> allItems() throws InterruptedException {
log.info("ItemRepository >>");
Thread.sleep(1000); // 1秒待機
// データベースから検索結果を取得
List<Item> result = sqlSessionTemplate.selectList("selectItems");
log.info("<< ItemRepository");
return CompletableFuture.completedFuture(result);
}
}
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Repository;
import com.github.apz.springsample.dto.Order;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
@Repository
@RequiredArgsConstructor
@Log4j2
public class OrderRepository {
@Async
public CompletableFuture<List<Order>> getRecentOrders() throws InterruptedException {
log.info("OrderRepository >>");
Thread.sleep(2000); // 2秒待機
List<Order> orders = getRecentOrderData();
log.info("<< OrderRepository");
return CompletableFuture.completedFuture(orders);
}
}
実際にこのServiceを実行してみると、
2018-07-22 22:03:33.613 INFO 11432 --- [ AsyncTask-2] c.g.a.s.repository.OrderRepository : OrderRepository >>
2018-07-22 22:03:33.613 INFO 11432 --- [ AsyncTask-1] c.g.a.s.repository.ItemRepository : ItemRepository >>
2018-07-22 22:03:34.615 INFO 11432 --- [ AsyncTask-1] c.g.a.s.repository.ItemRepository : << ItemRepository
2018-07-22 22:03:35.613 INFO 11432 --- [ AsyncTask-2] c.g.a.s.repository.OrderRepository : << OrderRepository
2018-07-22 22:03:35.613 INFO 11432 --- [nio-8080-exec-1] c.g.a.springsample.service.MainService : MainService : merged.
OrderRepositoryとItemRepositoryがほぼ同時に実行され、ItemRepositoryが1秒後、OrderedRepositoryが2秒後に結果を返し、MainServiceはすべてのReositoryが返事を返した時間、つまり2秒後にその後の処理を実行されました。
その他必要な設定
@Asyncを使うときは、SpringBootの設定にて、@EnableAsyncを付与します。
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class SpringAnyncServiceApplication {
public static void main(String[] args) {
SpringApplication.run(SpringAnyncServiceApplication.class, args);
}
}
また、CompletableFutureの同時スレッド数や上限を以下のように設定可能です。
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class AsyncConfiguration {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setQueueCapacity(2);
executor.setMaxPoolSize(40);
executor.setThreadNamePrefix("AsyncTask-");
executor.initialize();
return executor;
}
}
余談
データベースを使った検索をする際に、1つの巨大なSQLで実現せずに複数のSQLに分割して実行する場合にもこの手法は非常に有効ですが、データソースの同時接続数を多くしないとあまり意味がなく、データベースの負荷は当然高まります。
参考資料・参考文献
https://spring.io/guides/gs/async-method/
blog.ik.am : 誤解しがちなThreadPoolTaskExecutorの設定