Edited at

コードの変更なしにMQ経由の非同期処理に変更してみる

More than 1 year has passed since last update.


やりたいこと

今回はこういった処理が点在してて、例えばこれを全てMQ経由にする場合の話です。

@Autowired

TooHeavyProcessor processor;

public void doSomething() {
// do something..

// too heavy process in async.
async(() -> processor.process(...));
}


前準備


Rabbit MQ

Installing on Homebrew

$ brew update

$ brew install rabbitmq
$ brew services start rabbitmq


Spring Boot

Start with Spring Initializr


  • Lombok

  • AMQP

  • Web


Maven

依存性を追加。


pom.xml

<dependency>

<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>

パラメータ名が必要になるのでコンパイラーオプションを追加。


pom.xml

<plugin>

<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgument>-parameters</compilerArgument>
<fork>true</fork>
</configuration>
</plugin>


大まかな変更内容


処理内容


変更前

[Heavy process]@any


変更後

[Heavy process]@any -> (queue) -> [Heavy process]@consumer


各クラス


インターフェース


TooHeavyProcessor.java

public interface TooHeavyProcessor {

void action(User user, Item item, Date date);

void process(String key, Date date);
}



実装クラス


デフォルト実装

このクラスをMQのProducer側で動いているのでConsumer側に持っていきたい。


DefaultTooHeavyProcessor.java

@Slf4j(topic = "結果")

public class DefaultTooHeavyProcessor implements TooHeavyProcessor {

@Override
public void action(User user, Item item, Date date) {
// 実行結果を確かめるためにロギング
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}

@Override
public void process(String key, Date date) {
// 実行結果を確かめるためにロギング
log.info("process for {} at {}", key, FORMATTER.format(date));
}

}


呼び出し自体をメッセージングしたい。

ということで、思いついたのがSpEL(Spring Expression Language)。

processor.action(user, item, date);

これを


producer

String el = "@processor.action(#user, #item, #date)";

Map<String, Object> params = new LinkedHashMap<>();
params.add("user", user);
params.add("item", item);
params.add("date", date);

ELExpression message = new ELExpression(el, params);

// MQに送信
amqpMessageTemplate.convertAndSend("el.exchange", null, message);



consumer

public void receive(ELMessage message) {

// SpELメッセージをパース
}

という形にしたらConsumer側で実行可能になるはず。


MQ送信クラス実装

TooHeavyProcessorが呼び出された際にDefaultTooHeavyProcessorを実行するのではなく、SpELを作ってそれをMQに送信するような実装クラスを作る。

呼び出された際に動的な挙動が求められるのでProxyで実装する。

Proxy作成方法は次のような方法。


Proxy

import org.springframework.util.ClassUtils;

private String beanName = "processor";
private String Class<?> type = TooHeavyProcessor.class;

@Autowired
private AmqpTemplate messageTemplate;

Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
new Class[] { this.type }, (bean, method, args) -> {
// SpEL生成
// beanNameとmethodとargsからELExpressionを生成するユーティリティメソッド
ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
// MQに送信
this.messageTemplate.convertAndSend("el.exchange", null, el);
return null;
});


これをBean登録したいので、FactoryBeanで生成する


MessagingTooHeavyProcessorProxyFactoryBean.java

@Component

public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {

private final String beanName = "processor";
private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;

private TooHeavyProcessor proxy;

@Autowired
private AmqpTemplate messageTemplate;

@Override
public void afterPropertiesSet() {
this.proxy = (TooHeavyProcessor) /* proxy作成 */;
}

@Override
public TooHeavyProcessor getObject() {
return this.proxy;
}

@Override
public Class<?> getObjectType() {
return this.type;
}

@Override
public boolean isSingleton() {
return true;
}
}



MQ Receiverクラス


ELReceiver

@Component

public class ELReceiver {

/** SpELを解析して実行するクラス */
@Autowired
private ELExecutor elExecutor;

// ExchangeとQueueを定義してRabbit側に無ければ作成する
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "el.queue", durable = "true"),
exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
public void receive(ELExpression el) {
// SpELを解析して実行
this.elExecutor.execute(el);
}
}


SpELの解析と実行は公式のドキュメントに詳しく記載があるので割愛します。

StandardEvaluationContextの作り方だけ。


ELExecutor.java

@Autowired

private ApplicationContext appContext;

private StandardEvaluationContext createContext(Map<String, Object> args) {
StandardEvaluationContext context = new StandardEvaluationContext();

// 変数を登録
args.forEach((key, value) -> context.setVariable(key, value));
// Bean名からBeanを解決できるようにBeanResolverを登録
context.setBeanResolver(
(_self, beanName) -> this.appContext.getBean(beanName));

return context;
}


この時、MessagingTooHeavyProcessorProxyFactoryBeanでBean名をprocessorとして指定しているので、ちゃんとBean名を合わせる。

それとプロファイルでBean名を切り替えたいからプロファイル指定しておく。


MessagingTooHeavyProcessorProxyFactoryBean.java

// "messaging"プロファイル指定時のみBean登録する

@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean


DefaultTooHeavyProcessor.java

// "default"プロファイル指定時のみBean登録する

@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor


動作確認準備


Web API

トリガー用にWeb APIの口を用意しておく。


ELDemoController.java

// Producer側だけ必要

@Profile("producer")
@RestController
public class ELDemoController {

@Autowired
private TooHeavyProcessor processor;

@GetMapping("/action")
public ActionResult action() {
User user = this.randomUser();
Item item = this.randomItem();
Date date = this.randomDate();

this.processor.action(user, item, this.randomDate());

return new ActionResult(user, item, date);
}

@GetMapping("/process")
public ProcessResult process() {
String key = RandomStringUtils.randomAlphanumeric(10);
Date date = this.randomDate();

this.processor.process(key, date);

return new ProcessResult(key, date);
}

// privateメソッド実装
}



ロギング


MessagingTooHeavyProcessorProxyFactoryBean.java

@Slf4j(topic = "送信側")

public class MessagingTooHeavyProcessorProxyFactoryBean {
// ... //
@Override
public void afterPropertiesSet() {
// ... //
log.info("{}", StringUtils.toString(el));
this.messageTemplate.convertAndSend("el.exchange", null, el);
}
// ... //
}


ELReceiver.java

// Consumer側だけ必要

@Profile("consumer")
@Slf4j(topic = "受信側")
public class ELReceiver {
// ... //
public void receive(ELExpression el) {
log.info("{}", el);
this.elExecutor.execute(el);
}
// ... //
}


DefaultTooHeavyProcessor.java

@Slf4j(topic = "結果")

public class DefaultTooHeavyProcessor {
private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");

@Override
public void action(User user, Item item, Date date) {
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}

@Override
public void process(String key, Date date) {
log.info("process for {} at {}", key, FORMATTER.format(date));
}
}



アプリケーションプロパティ設定


application.yaml

spring:

jackson:
# 見やすいようにJSON日時のフォーマット
date-format: yyyy-MM-dd HH:mm:ss.SSS

---
spring:
profiles: producer
server:
port: 8080

---
spring:
profiles: consumer
server:
port: 8181



動作確認


通常起動

最初の重たい処理をそのまま実行するだけの動作を確認する。

$ mvn spring-boot:run -Dspring.profiles.active="default,producer"

$ curl http://localhost:8080/process --silent | jq "."

{
"key": "39iYWmzDpn",
"date": "2017-07-03 00:05:31.211"
}

起動したアプリケーションで次のログが出ます。

[結果] process for 39iYWmzDpn at 2017-07-03 00:05:31.211

これを次はプロファイルを変えてMQのConsumer側で実行させます。


MQ経由の処理をさせるための起動


RabbitMQサーバ起動

$ brew services start rabbitmq

==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)


管理画面確認

http://localhost:15672/にアクセス

スクリーンショット 2017-02-05 17.20.28.png


ExchangeとQueue確認

まだ何もない。

スクリーンショット 2017-02-05 17.22.51.png

スクリーンショット 2017-02-05 17.24.13.png


Receiver側のアプリケーション起動

$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"


ExchangeとQueue確認

Spring-Bootの場合はRabbitAdminのBean登録は自動でされるらしい。

Exchange

スクリーンショット 2017-02-05 17.38.00.png

Queue

スクリーンショット 2017-02-05 17.37.04.png


Consumer側のアプリケーション起動

次はdefaultではなくmessagingプロファイルで起動する。

$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"


確認

$ curl http://localhost:8080/process --silent | jq "."

{
"key": "ar9VXblj5S",
"date": "2017-08-27 11:55:29.319"
}


送信側のログ

[送信側] ELExpression:{

"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}


受信側のログ

[受信側] ELExpression:{

"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}
[結果] process for ar9VXblj5S at 2017-08-27 11:55:29.319


まとめ

SpEL便利。

アプリケーションはGitHubに上げてます。