やりたいこと
今回はこういった処理が点在してて、例えばこれを全てMQ経由にする場合の話です。
@Autowired
TooHeavyProcessor processor;
public void doSomething() {
// do something..
// too heavy process in async.
async(() -> processor.process(...));
}
前準備
Rabbit MQ
$ brew update
$ brew install rabbitmq
$ brew services start rabbitmq
Spring Boot
- Lombok
- AMQP
- Web
Maven
依存性を追加。
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
</dependency>
パラメータ名が必要になるのでコンパイラーオプションを追加。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgument>-parameters</compilerArgument>
<fork>true</fork>
</configuration>
</plugin>
大まかな変更内容
処理内容
変更前
[Heavy process]@any
変更後
[Heavy process]@any -> (queue) -> [Heavy process]@consumer
各クラス
インターフェース
public interface TooHeavyProcessor {
void action(User user, Item item, Date date);
void process(String key, Date date);
}
実装クラス
デフォルト実装
このクラスをMQのProducer側で動いているのでConsumer側に持っていきたい。
@Slf4j(topic = "結果")
public class DefaultTooHeavyProcessor implements TooHeavyProcessor {
@Override
public void action(User user, Item item, Date date) {
// 実行結果を確かめるためにロギング
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}
@Override
public void process(String key, Date date) {
// 実行結果を確かめるためにロギング
log.info("process for {} at {}", key, FORMATTER.format(date));
}
}
呼び出し自体をメッセージングしたい。
ということで、思いついたのがSpEL(Spring Expression Language)。
processor.action(user, item, date);
これを
String el = "@processor.action(#user, #item, #date)";
Map<String, Object> params = new LinkedHashMap<>();
params.add("user", user);
params.add("item", item);
params.add("date", date);
ELExpression message = new ELExpression(el, params);
// MQに送信
amqpMessageTemplate.convertAndSend("el.exchange", null, message);
public void receive(ELMessage message) {
// SpELメッセージをパース
}
という形にしたらConsumer側で実行可能になるはず。
MQ送信クラス実装
TooHeavyProcessor
が呼び出された際にDefaultTooHeavyProcessor
を実行するのではなく、SpELを作ってそれをMQに送信するような実装クラスを作る。
呼び出された際に動的な挙動が求められるのでProxy
で実装する。
Proxy
作成方法は次のような方法。
import org.springframework.util.ClassUtils;
private String beanName = "processor";
private String Class<?> type = TooHeavyProcessor.class;
@Autowired
private AmqpTemplate messageTemplate;
Proxy.newProxyInstance(ClassUtils.getDefaultClassLoader(),
new Class[] { this.type }, (bean, method, args) -> {
// SpEL生成
// beanNameとmethodとargsからELExpressionを生成するユーティリティメソッド
ELExpression el = SpelUtils.createELExpression(this.beanName, method, args);
// MQに送信
this.messageTemplate.convertAndSend("el.exchange", null, el);
return null;
});
これをBean
登録したいので、FactoryBean
で生成する
@Component
public class MessagingTooHeavyProcessorProxyFactoryBean implements FactoryBean<TooHeavyProcessor>, InitializingBean {
private final String beanName = "processor";
private final Class<TooHeavyProcessor> type = TooHeavyProcessor.class;
private TooHeavyProcessor proxy;
@Autowired
private AmqpTemplate messageTemplate;
@Override
public void afterPropertiesSet() {
this.proxy = (TooHeavyProcessor) /* proxy作成 */;
}
@Override
public TooHeavyProcessor getObject() {
return this.proxy;
}
@Override
public Class<?> getObjectType() {
return this.type;
}
@Override
public boolean isSingleton() {
return true;
}
}
MQ Receiverクラス
@Component
public class ELReceiver {
/** SpELを解析して実行するクラス */
@Autowired
private ELExecutor elExecutor;
// ExchangeとQueueを定義してRabbit側に無ければ作成する
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "el.queue", durable = "true"),
exchange = @Exchange(value = "el.exchange", type = ExchangeTypes.FANOUT, durable = "true")))
public void receive(ELExpression el) {
// SpELを解析して実行
this.elExecutor.execute(el);
}
}
SpELの解析と実行は公式のドキュメントに詳しく記載があるので割愛します。
StandardEvaluationContext
の作り方だけ。
@Autowired
private ApplicationContext appContext;
private StandardEvaluationContext createContext(Map<String, Object> args) {
StandardEvaluationContext context = new StandardEvaluationContext();
// 変数を登録
args.forEach((key, value) -> context.setVariable(key, value));
// Bean名からBeanを解決できるようにBeanResolverを登録
context.setBeanResolver(
(_self, beanName) -> this.appContext.getBean(beanName));
return context;
}
この時、MessagingTooHeavyProcessorProxyFactoryBean
でBean名をprocessor
として指定しているので、ちゃんとBean名を合わせる。
それとプロファイルでBean名を切り替えたいからプロファイル指定しておく。
// "messaging"プロファイル指定時のみBean登録する
@Profile("messaging")
@Component("processor")
public class MessagingTooHeavyProcessorProxyFactoryBean
// "default"プロファイル指定時のみBean登録する
@Profile("default")
@Component("processor")
public class DefaultTooHeavyProcessor
動作確認準備
Web API
トリガー用にWeb APIの口を用意しておく。
// Producer側だけ必要
@Profile("producer")
@RestController
public class ELDemoController {
@Autowired
private TooHeavyProcessor processor;
@GetMapping("/action")
public ActionResult action() {
User user = this.randomUser();
Item item = this.randomItem();
Date date = this.randomDate();
this.processor.action(user, item, this.randomDate());
return new ActionResult(user, item, date);
}
@GetMapping("/process")
public ProcessResult process() {
String key = RandomStringUtils.randomAlphanumeric(10);
Date date = this.randomDate();
this.processor.process(key, date);
return new ProcessResult(key, date);
}
// privateメソッド実装
}
ロギング
@Slf4j(topic = "送信側")
public class MessagingTooHeavyProcessorProxyFactoryBean {
// ... //
@Override
public void afterPropertiesSet() {
// ... //
log.info("{}", StringUtils.toString(el));
this.messageTemplate.convertAndSend("el.exchange", null, el);
}
// ... //
}
// Consumer側だけ必要
@Profile("consumer")
@Slf4j(topic = "受信側")
public class ELReceiver {
// ... //
public void receive(ELExpression el) {
log.info("{}", el);
this.elExecutor.execute(el);
}
// ... //
}
@Slf4j(topic = "結果")
public class DefaultTooHeavyProcessor {
private static final FastDateFormat FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void action(User user, Item item, Date date) {
log.info("{} actions to {} at {}!!", StringUtils.toString(user), StringUtils.toString(item),
FORMATTER.format(date));
}
@Override
public void process(String key, Date date) {
log.info("process for {} at {}", key, FORMATTER.format(date));
}
}
アプリケーションプロパティ設定
spring:
jackson:
# 見やすいようにJSON日時のフォーマット
date-format: yyyy-MM-dd HH:mm:ss.SSS
---
spring:
profiles: producer
server:
port: 8080
---
spring:
profiles: consumer
server:
port: 8181
動作確認
通常起動
最初の重たい処理をそのまま実行するだけの動作を確認する。
$ mvn spring-boot:run -Dspring.profiles.active="default,producer"
$ curl http://localhost:8080/process --silent | jq "."
{
"key": "39iYWmzDpn",
"date": "2017-07-03 00:05:31.211"
}
起動したアプリケーションで次のログが出ます。
[結果] process for 39iYWmzDpn at 2017-07-03 00:05:31.211
これを次はプロファイルを変えてMQのConsumer側で実行させます。
MQ経由の処理をさせるための起動
RabbitMQサーバ起動
$ brew services start rabbitmq
==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)
管理画面確認
http://localhost:15672/
にアクセス
ExchangeとQueue確認
まだ何もない。
Receiver側のアプリケーション起動
$ mvn spring-boot:run -Dspring.profiles.active="consumer,default"
ExchangeとQueue確認
Spring-Bootの場合はRabbitAdmin
のBean登録は自動でされるらしい。
Consumer側のアプリケーション起動
次はdefault
ではなくmessaging
プロファイルで起動する。
$ mvn spring-boot:run -Dspring.profiles.active="messaging,producer"
確認
$ curl http://localhost:8080/process --silent | jq "."
{
"key": "ar9VXblj5S",
"date": "2017-08-27 11:55:29.319"
}
[送信側] ELExpression:{
"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}
[受信側] ELExpression:{
"expression" : "@processor.process(#key, #date)",
"args" : {
"key" : "ar9VXblj5S",
"date" : "2017-08-27 11:55:29.319"
}
}
[結果] process for ar9VXblj5S at 2017-08-27 11:55:29.319
まとめ
SpEL便利。
アプリケーションはGitHubに上げてます。