Spring Framework の DI(Dependency Injection) を使って Akka Actor を生成する方法です。生成イメージをつかみやすいように Java API で説明していますが、Scala でも同様です。
InjectedActorProducer
DI された Actor を生成するための akka.actor.IndirectActorProducer
の実装クラスとして InjectedActorProducer
を作成します。生成パラメータとして、ApplicationContext のインスタンスと生成したい Bean 名をコンストラクタ経由で内部に持っておき produce
メソッドで Actor を生成します。
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;
public class InjectedActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorName;
public InjectedActorProducer(ApplicationContext applicationContext, String actorName) {
this.applicationContext = applicationContext;
this.actorName = actorName;
}
@Override
public Actor produce() {
return applicationContext.getBean(actorClass, Actor.class);
}
@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorName);
}
}
Props.create
で Actor
クラスを渡すのと同じように、IndirectActorProducer
の実装クラスとそのコンストラクタ引数を渡すことで、Props
を生成できます。
ActorRef myActor = getContext().actorOf(
Props.create(InjectedActorProducer.class, applicationContext, "myActor"));
Props
の内部では、最終的に IndirectActorProducer
のインスタンスを保持しており、生成する Actor の情報を持っています。Props
を生成する際に、Props.create
の第一引数が Actor
クラスなら IndirectActorProducer
にラップしてインスタンス生成され、IndirectActorProducer
の実装クラスなら、そのままインスタンス生成されます。
インスタンス生成のスコープは prototype
とします。障害発生時に Actor を再生成する必要があるため、singleton
で同じインスタンスを再利用するのは問題があります。
@Component
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class MyActor extends UntypedActor {
...
}
IndirectActorProducer
を直接使うことで、Actor 生成をカスタマイズできることはわかりましたが、ApplicationContext
のインスタンスを持ちまわる必要がありますので、このままでは現実的ではありません。
InjectedActorExtension
Akka Extension を使うと、ActorSystem ごとに一度だけロードされる共有リソースを持つことができます。共有リソースとなる Extension は akka.actor.Extension
の実装クラスを作成するだけです。実装すべきメソッドはなく、単に Extension であることを示すだけのインターフェイスです。
import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
public class InjectedActorImpl implements Extension {
private volatile ApplicationContext applicationContext;
synchronized public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public Props props(String actorName) {
return Props.create(InjectedActorProducer.class, applicationContext, actorName);
}
}
上記の InjectedActorImpl
は ApplicationContext
をインスタンス変数 applicationContext
に保持しておき、先に作成した InjectedActorProducer
を経由して DI された Actor の Props
を生成する Extension になります。
Extension は ActorSystem ごとにロードされる共有リソースのため、スレッドセーフである必要があります。この場合は、複数スレッドで applicationContext
が共有されますので、差し替えは synchronized で行い volatile で保持します。
この Extension を生成するための ExtensionId<T extends Extension>
の実装クラス InjectedActorExtension
を作成します。
import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
public class InjectedActorExtension extends AbstractExtensionId<InjectedActorImpl> {
public static InjectedActorExtension Provider = new InjectedActorExtension();
@Override
public InjectedActorImpl createExtension(ExtendedActorSystem system) {
return new InjectedActorImpl();
}
}
AbstractExtensionId<T extends Extension>
を継承することで定義します。Provider
として自身のインスタンスを static に public スコープで保持していますが、それぞれの Actor から静的に Extension を得られるようにするためです。
ActorSystem
を生成したら InjectedActorExtension.Provider
よりロードされている Extension を取り出して applicationContext を渡して初期化しておきます。
ApplicationContext applicationContext = ...;
ActorSystem system = ActorSystem.create();
InjectedActorExtension.Provider.get(system).initialize(applicationContext);
初期化以降は Extension に applicationContext が保持されていますので、props
メソッド経由で取得した Props
から DI された Actor を取得できます。
public class Application extends UntypedActor {
private ActorRef myActor = getContext().actorOf(
InjectedActorExtension.Provider.get(getContext().system()).props("myActor"));
...
}
Putting It All Together
整理すると、以下のモジュールを作成することになります。
InjectedActorProducer
- DI された Actor を生成する
IndirectActorProducer
の実装クラス - 障害発生による再起動時に Actor は再生成されるため prototype スコープで生成
InjectedActorImpl
-
ApplicationContext
を保持する Extension の実装クラス - Extension は
ActorSystem
ごとに一度だけロードされる共有リソース
InjectedActorExtension
-
ExtensionId<? extends Extension>
の実装クラス -
ActorSystem
にロードする Extension を生成 -
InjectedActorExtension.Provider
に自身のインスタンスを static に持つ
これらを利用した例です。
public interface TranslationService {
String translate(String source);
}
import java.util.Map;
public class TranslationServiceImpl implements TranslationService {
private final Map<String, String> translationMap;
public TranslationServiceImpl(Map<String, String> translationMap) {
this.translationMap = translationMap;
}
@Override
public String translate(String source) {
return translationMap.get(source);
}
}
import akka.actor.UntypedActor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class Translator extends UntypedActor {
private final TranslationService translationService;
@Autowired
public Translator(TranslationService translationService) {
this.translationService = translationService;
}
@Override
public void onReceive(Object msg) {
String source = msg.toString();
String translated = translationService.translate(source);
getSender().tell(new Result(source, translated), getSelf());
}
public static class Result {
public String source;
public String translated;
public Result(String source, String translated) {
this.source = source;
this.translated = translated;
}
@Override
public String toString() {
return source + " > " + translated;
}
}
}
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class Application extends UntypedActor {
private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
private ActorRef translator;
@Override
public void preStart() throws InterruptedException {
translator = getContext().actorOf(InjectedActorExtension.Provider.get(
getContext().system()).props("translator"));
translator.tell("hello", getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof Translator.Result) {
logger.info(message.toString());
getContext().stop(getSelf());
getContext().system().shutdown();
} else {
unhandled(message);
}
}
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@ComponentScan
public class ApplicationConfig {
@Bean
public TranslationService translationService() {
Map<String, String> translationMap = new HashMap<String, String>();
translationMap.put("hello", "こんにちは");
return new TranslationServiceImpl(translationMap);
}
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
public class Main {
public static void main(String[] args) {
AbstractApplicationContext applicationContext =
new AnnotationConfigApplicationContext(ApplicationConfig.class);
applicationContext.registerShutdownHook();
ActorSystem system = ActorSystem.create();
InjectedActorExtension.Provider.get(system).initialize(applicationContext);
system.actorOf(Props.create(Application.class));
}
}