LoginSignup
1
3

More than 5 years have passed since last update.

コードの変更なしにMQ経由の非同期処理に変更してみる

Last updated at Posted at 2017-02-05

やりたいこと

今回はこういった処理が点在してて、例えばこれを全てMQ経由にする場合の話です。

@Autowired
TooHeavyProcessor processor;

public void doSomething() {
  // do something..

  // too heavy process in async.
  async(() -> processor.process(...));
}

前準備

Rabbit MQ

Installing on Homebrew

$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq 

Spring Boot

Start with Spring Initializr

  • Lombok
  • AMQP
  • Web

Maven

依存性を追加。

pom.xml
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.5</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-collections4</artifactId>
  <version>4.1</version>
</dependency>

パラメータ名が必要になるのでコンパイラーオプションを追加。

pom.xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <compilerArgument>-parameters</compilerArgument>
    <fork>true</fork>
  </configuration>
</plugin>

大まかな変更内容

処理内容

変更前

[Heavy process]@any

変更後

[Heavy process]@any -> (queue) -> [Heavy process]@consumer

各クラス

インターフェース

TooHeavyProcessor.java
public interface TooHeavyProcessor {

  void action(User user, Item item, Date date);

  void process(String key, Date date);
}

実装クラス

デフォルト実装

このクラスをMQのProducer側で動いているのでConsumer側に持っていきたい。

DefaultTooHeavyProcessor.java
@Slf4j(topic = "結果")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {

  @Override
  public void action(User user, Item item, Date date) {
    // 実行結果を確かめるためにロギング
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    // 実行結果を確かめるためにロギング
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }

}

呼び出し自体をメッセージングしたい。
ということで、思いついたのがSpEL(Spring Expression Language)。

processor.action(user, item, date);

これを

producer
String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.add("user", user);
params.add("item", item);
params.add("date", date);

ELExpression message = new ELExpression(el, params);

// MQに送信
amqpMessageTemplate.convertAndSend("el.exchange", null, message);
consumer
public void receive(ELMessage message) {
  // SpELメッセージをパース
}

という形にしたらConsumer側で実行可能になるはず。

MQ送信クラス実装

TooHeavyProcessorが呼び出された際にDefaultTooHeavyProcessorを実行するのではなく、SpELを作ってそれをMQに送信するような実装クラスを作る。

呼び出された際に動的な挙動が求められるのでProxyで実装する。
Proxy作成方法は次のような方法。

Proxy
import org.springframework.util.ClassUtils;

private String beanName = "processor";
private String Class<?> type = TooHeavyProcessor.class;

@Autowired
private AmqpTemplate messageTemplate;

Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
  new Class[] { this.type }, (bean, method, args) -> {
    // SpEL生成
    // beanNameとmethodとargsからELExpressionを生成するユーティリティメソッド
    ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
    // MQに送信
    this.messageTemplate.convertAndSend("el.exchange", null, el);
    return null;
  });

これをBean登録したいので、FactoryBeanで生成する

MessagingTooHeavyProcessorProxyFactoryBean.java
@Component
public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {

  private final String beanName = "processor";
  private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;

  private TooHeavyProcessor proxy;

  @Autowired
  private AmqpTemplate messageTemplate;

  @Override
  public void afterPropertiesSet() {
    this.proxy = (TooHeavyProcessor) /* proxy作成 */;
  }

  @Override
  public TooHeavyProcessor getObject() {
    return this.proxy;
  }

  @Override
  public Class<?> getObjectType() {
    return this.type;
  }

  @Override
  public boolean isSingleton() {
    return true;
  }
}

MQ Receiverクラス

ELReceiver
@Component
public class ELReceiver {

  /** SpELを解析して実行するクラス */
  @Autowired
  private ELExecutor elExecutor;

  // ExchangeとQueueを定義してRabbit側に無ければ作成する
  @RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "el.queue", durable = "true"),
    exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
  public void receive(ELExpression el) {
    // SpELを解析して実行
    this.elExecutor.execute(el);
  }
}

SpELの解析と実行は公式のドキュメントに詳しく記載があるので割愛します。

StandardEvaluationContextの作り方だけ。

ELExecutor.java
@Autowired
private ApplicationContext appContext;

private StandardEvaluationContext createContext(Map<String, Object> args) {
  StandardEvaluationContext context = new StandardEvaluationContext();

  // 変数を登録
  args.forEach((key, value) -> context.setVariable(key, value));
  // Bean名からBeanを解決できるようにBeanResolverを登録
  context.setBeanResolver(
    (_self, beanName) -> this.appContext.getBean(beanName));

  return context;
}

この時、MessagingTooHeavyProcessorProxyFactoryBeanでBean名をprocessorとして指定しているので、ちゃんとBean名を合わせる。
それとプロファイルでBean名を切り替えたいからプロファイル指定しておく。

MessagingTooHeavyProcessorProxyFactoryBean.java
// "messaging"プロファイル指定時のみBean登録する
@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean
DefaultTooHeavyProcessor.java
// "default"プロファイル指定時のみBean登録する
@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor

動作確認準備

Web API

トリガー用にWeb APIの口を用意しておく。

ELDemoController.java
// Producer側だけ必要
@Profile("producer")
@RestController
public class ELDemoController {

  @Autowired
  private TooHeavyProcessor processor;

  @GetMapping("/action")
  public ActionResult action() {
    User user = this.randomUser();
    Item item = this.randomItem();
    Date date = this.randomDate();

    this.processor.action(user, item, this.randomDate());

    return new ActionResult(user, item, date);
  }

  @GetMapping("/process")
  public ProcessResult process() {
    String key = RandomStringUtils.randomAlphanumeric(10);
    Date date = this.randomDate();

    this.processor.process(key, date);

    return new ProcessResult(key, date);
  }

  // privateメソッド実装
}

ロギング

MessagingTooHeavyProcessorProxyFactoryBean.java
@Slf4j(topic = "送信側")
public class MessagingTooHeavyProcessorProxyFactoryBean {
  // ... //
  @Override
  public void afterPropertiesSet() {
    // ... //
    log.info("{}", StringUtils.toString(el));
    this.messageTemplate.convertAndSend("el.exchange", null, el);
  }
  // ... //
}
ELReceiver.java
// Consumer側だけ必要
@Profile("consumer")
@Slf4j(topic = "受信側")
public class ELReceiver {
  // ... //
  public void receive(ELExpression el) {
    log.info("{}", el);
    this.elExecutor.execute(el);
  }
  // ... //
}
DefaultTooHeavyProcessor.java
@Slf4j(topic = "結果")
public class DefaultTooHeavyProcessor {
  private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

  @Override
  public void action(User user, Item item, Date date) {
    log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
      FORMATTER.format(date));
  }

  @Override
  public void process(String key, Date date) {
    log.info("process for {} at {}", key, FORMATTER.format(date));
  }
}

アプリケーションプロパティ設定

application.yaml
spring:
  jackson:
    # 見やすいようにJSON日時のフォーマット
    date-format: yyyy-MM-dd HH:mm:ss.SSS

---
spring:
  profiles: producer
server:
    port: 8080

---
spring:
  profiles: consumer
server:
    port: 8181

動作確認

通常起動

最初の重たい処理をそのまま実行するだけの動作を確認する。

$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "39iYWmzDpn",
  "date": "2017-07-03 00:05:31.211"
}

起動したアプリケーションで次のログが出ます。

[結果] process for 39iYWmzDpn at 2017-07-03 00:05:31.211

これを次はプロファイルを変えてMQのConsumer側で実行させます。

MQ経由の処理をさせるための起動

RabbitMQサーバ起動

$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

管理画面確認

http://localhost:15672/にアクセス

スクリーンショット 2017-02-05 17.20.28.png

ExchangeとQueue確認

まだ何もない。

スクリーンショット 2017-02-05 17.22.51.png

スクリーンショット 2017-02-05 17.24.13.png

Receiver側のアプリケーション起動

$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"

ExchangeとQueue確認

Spring-Bootの場合はRabbitAdminのBean登録は自動でされるらしい。

Exchange
スクリーンショット 2017-02-05 17.38.00.png

Queue
スクリーンショット 2017-02-05 17.37.04.png

Consumer側のアプリケーション起動

次はdefaultではなくmessagingプロファイルで起動する。

$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"

確認

$ curl http://localhost:8080/process --silent | jq "."
{
  "key": "ar9VXblj5S",
  "date": "2017-08-27 11:55:29.319"
}
送信側のログ
[送信側] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}
受信側のログ
[受信側] ELExpression:{
  "expression" : "@processor.process(#key, #date)",
  "args" : {
    "key" : "ar9VXblj5S",
    "date" : "2017-08-27 11:55:29.319"
  }
}
[結果] process for ar9VXblj5S at 2017-08-27 11:55:29.319

まとめ

SpEL便利。
アプリケーションはGitHubに上げてます。

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3