23
Help us understand the problem. What are the problem?

More than 3 years have passed since last update.

posted at

SpringBootの@AsyncとCompletableFutureを組み合わせて並列ロジックを作る

はじめに

SpringBootでは非同期処理を簡単に使える仕組みが用意されています。 非同期処理をさせたいメソッドに @Async をつけ、アプリケーション全体の設定として @EnableAsyc をつけるだけで良いのですが、これはControllerだけではなく、他のステレオタイプでも実施できます。

これにより、1つのController+Serviceから複数のデータソース・複数環境のデータを結合して返せるようになります。

簡単な例として、データベースの検索結果を取得するRepository、もう一方はWebAPIからJSONで検索結果をもらうRepositoryから得られた結果を結合して返す機能を作るとします。
この2つを結合した結果を返す、一番単純で簡単な方法、2つのRepositoryの結果を順次呼び出して、その結果を返すことです。

データベースの検索を行うRepositoryをA、WebAPIを実行するRepositoryをBとして、

Aが結果を返すまでに1秒かかるとし、
Bが結果を返すまでに2秒かかるとします。

Aが終わったあとにBを順次呼び出す方法では、検索結果が得られるまでに A+Bの3秒かかります。

かたや、AとBが並列に動作すれば、検索結果が得られるまで、最大2秒で済むでしょう。これを簡単に実現できるのが、@AsyncとCompletableFutureを組み合わせる方法です。

実装例

Service

ServiceクラスからRepositoryを呼ぶときに、並列で実行したい処理に対してCompletableFutureで受け取り、その後、結合したいタイミングで、CompletableFuture.allOf(並列処理した結果....).join() をします。

MainService.java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.springframework.stereotype.Service;

import com.github.apz.springsample.dto.Item;
import com.github.apz.springsample.dto.Order;
import com.github.apz.springsample.dto.OrderedItem;
import com.github.apz.springsample.repository.ItemRepository;
import com.github.apz.springsample.repository.OrderRepository;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Service
@RequiredArgsConstructor
@Log4j2
public class MainService {
    private final ItemRepository itemRepository;
    private final OrderRepository orderRepository;

    public List<OrderedItem> selectAllItems() throws InterruptedException, ExecutionException {
        CompletableFuture<List<Item>> items = itemRepository.allItems();
        CompletableFuture<List<Order>> orders = orderRepository.getRecentOrders();

        CompletableFuture.allOf(items, orders).join();
        log.info("MainService : merged.");

        List<Item> itemList = items.get();
        List<Order> orderList = orders.get();

        // 結合処理をして返す
        return getOrderedItem(itemList, orderList);
    }
}

Repository

それぞれのRepositoryでは、@Asyncを付与して、CompletableFutureをラップした結果を返すようにします。

ItemRepositoryは1秒待機したあとデータベースに検索します(Thread.sleepを使って、仮想的に待機時間を作っています)

ItemRepository.java
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Repository;

import com.github.apz.springsample.dto.Item;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Repository
@RequiredArgsConstructor
@Log4j2
public class ItemRepository {
    private final SqlSessionTemplate sqlSessionTemplate;

    @Async
    public CompletableFuture<List<Item>> allItems() throws InterruptedException {
        log.info("ItemRepository >>");
        Thread.sleep(1000);  // 1秒待機

        // データベースから検索結果を取得
        List<Item> result = sqlSessionTemplate.selectList("selectItems");
        log.info("<< ItemRepository");
        return CompletableFuture.completedFuture(result);

    }
}
OrderRepository.java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Repository;

import com.github.apz.springsample.dto.Order;

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@Repository
@RequiredArgsConstructor
@Log4j2
public class OrderRepository {
    @Async
    public CompletableFuture<List<Order>> getRecentOrders() throws InterruptedException {
        log.info("OrderRepository >>");
        Thread.sleep(2000);  // 2秒待機
        List<Order> orders = getRecentOrderData();
        log.info("<< OrderRepository");
        return CompletableFuture.completedFuture(orders);
    }
}

実際にこのServiceを実行してみると、

2018-07-22 22:03:33.613  INFO 11432 --- [    AsyncTask-2] c.g.a.s.repository.OrderRepository       : OrderRepository >>
2018-07-22 22:03:33.613  INFO 11432 --- [    AsyncTask-1] c.g.a.s.repository.ItemRepository        : ItemRepository >>
2018-07-22 22:03:34.615  INFO 11432 --- [    AsyncTask-1] c.g.a.s.repository.ItemRepository        : << ItemRepository
2018-07-22 22:03:35.613  INFO 11432 --- [    AsyncTask-2] c.g.a.s.repository.OrderRepository       : << OrderRepository
2018-07-22 22:03:35.613  INFO 11432 --- [nio-8080-exec-1] c.g.a.springsample.service.MainService   : MainService : merged.

OrderRepositoryとItemRepositoryがほぼ同時に実行され、ItemRepositoryが1秒後、OrderedRepositoryが2秒後に結果を返し、MainServiceはすべてのReositoryが返事を返した時間、つまり2秒後にその後の処理を実行されました。

その他必要な設定

@Asyncを使うときは、SpringBootの設定にて、@EnableAsyncを付与します。

SpringAnyncServiceApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class SpringAnyncServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringAnyncServiceApplication.class, args);
    }
}

また、CompletableFutureの同時スレッド数や上限を以下のように設定可能です。

AsyncConfiguration.java
import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class AsyncConfiguration {
    @Bean
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setQueueCapacity(2);
        executor.setMaxPoolSize(40);
        executor.setThreadNamePrefix("AsyncTask-");
        executor.initialize();
        return executor;
    }
}

余談

データベースを使った検索をする際に、1つの巨大なSQLで実現せずに複数のSQLに分割して実行する場合にもこの手法は非常に有効ですが、データソースの同時接続数を多くしないとあまり意味がなく、データベースの負荷は当然高まります。

参考資料・参考文献

https://spring.io/guides/gs/async-method/
blog.ik.am : 誤解しがちなThreadPoolTaskExecutorの設定

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
23
Help us understand the problem. What are the problem?