Posted at

Spring DI を使った Akka Actor 生成

More than 1 year has passed since last update.

Spring Framework の DI(Dependency Injection) を使って Akka Actor を生成する方法です。生成イメージをつかみやすいように Java API で説明していますが、Scala でも同様です。


InjectedActorProducer

DI された Actor を生成するための akka.actor.IndirectActorProducer の実装クラスとして InjectedActorProducer を作成します。生成パラメータとして、ApplicationContext のインスタンスと生成したい Bean 名をコンストラクタ経由で内部に持っておき produce メソッドで Actor を生成します。


InjectedActorProducer.java

import akka.actor.Actor;

import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

public class InjectedActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorName;

public InjectedActorProducer(ApplicationContext applicationContext, String actorName) {
this.applicationContext = applicationContext;
this.actorName = actorName;
}

@Override
public Actor produce() {
return applicationContext.getBean(actorClass, Actor.class);
}

@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorName);
}
}


Props.createActor クラスを渡すのと同じように、IndirectActorProducer の実装クラスとそのコンストラクタ引数を渡すことで、Props を生成できます。

ActorRef myActor = getContext().actorOf(

Props.create(InjectedActorProducer.class, applicationContext, "myActor"));

Props の内部では、最終的に IndirectActorProducer のインスタンスを保持しており、生成する Actor の情報を持っています。Props を生成する際に、Props.create の第一引数が Actor クラスなら IndirectActorProducer にラップしてインスタンス生成され、IndirectActorProducer の実装クラスなら、そのままインスタンス生成されます。

インスタンス生成のスコープは prototype とします。障害発生時に Actor を再生成する必要があるため、singleton で同じインスタンスを再利用するのは問題があります。

@Component

@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class MyActor extends UntypedActor {
...
}

IndirectActorProducer を直接使うことで、Actor 生成をカスタマイズできることはわかりましたが、ApplicationContext のインスタンスを持ちまわる必要がありますので、このままでは現実的ではありません。


InjectedActorExtension

Akka Extension を使うと、ActorSystem ごとに一度だけロードされる共有リソースを持つことができます。共有リソースとなる Extension は akka.actor.Extension の実装クラスを作成するだけです。実装すべきメソッドはなく、単に Extension であることを示すだけのインターフェイスです。


InjectedActorImpl.java

import akka.actor.Extension;

import akka.actor.Props;
import org.springframework.context.ApplicationContext;

public class InjectedActorImpl implements Extension {
private volatile ApplicationContext applicationContext;

synchronized public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

public Props props(String actorName) {
return Props.create(InjectedActorProducer.class, applicationContext, actorName);
}
}


上記の InjectedActorImplApplicationContext をインスタンス変数 applicationContext に保持しておき、先に作成した InjectedActorProducer を経由して DI された Actor の Props を生成する Extension になります。

Extension は ActorSystem ごとにロードされる共有リソースのため、スレッドセーフである必要があります。この場合は、複数スレッドで applicationContext が共有されますので、差し替えは synchronized で行い volatile で保持します。

この Extension を生成するための ExtensionId<T extends Extension> の実装クラス InjectedActorExtension を作成します。


InjectedActorExtension.java

import akka.actor.AbstractExtensionId;

import akka.actor.ExtendedActorSystem;

public class InjectedActorExtension extends AbstractExtensionId<InjectedActorImpl> {

public static InjectedActorExtension Provider = new InjectedActorExtension();

@Override
public InjectedActorImpl createExtension(ExtendedActorSystem system) {
return new InjectedActorImpl();
}
}


AbstractExtensionId<T extends Extension> を継承することで定義します。Provider として自身のインスタンスを static に public スコープで保持していますが、それぞれの Actor から静的に Extension を得られるようにするためです。

ActorSystem を生成したら InjectedActorExtension.Provider よりロードされている Extension を取り出して applicationContext を渡して初期化しておきます。

ApplicationContext applicationContext = ...;

ActorSystem system = ActorSystem.create();
InjectedActorExtension.Provider.get(system).initialize(applicationContext);

初期化以降は Extension に applicationContext が保持されていますので、props メソッド経由で取得した Props から DI された Actor を取得できます。

public class Application extends UntypedActor {

private ActorRef myActor = getContext().actorOf(
InjectedActorExtension.Provider.get(getContext().system()).props("myActor"));
...
}


Putting It All Together

整理すると、以下のモジュールを作成することになります。



  1. InjectedActorProducer


    • DI された Actor を生成する IndirectActorProducer の実装クラス

    • 障害発生による再起動時に Actor は再生成されるため prototype スコープで生成




  2. InjectedActorImpl



    • ApplicationContext を保持する Extension の実装クラス

    • Extension は ActorSystem ごとに一度だけロードされる共有リソース




  3. InjectedActorExtension



    • ExtensionId<? extends Extension> の実装クラス


    • ActorSystem にロードする Extension を生成


    • InjectedActorExtension.Provider に自身のインスタンスを static に持つ



これらを利用した例です。


TranslationService.java

public interface TranslationService {

String translate(String source);
}


TranslationServiceImpl.java

import java.util.Map;

public class TranslationServiceImpl implements TranslationService {
private final Map<String, String> translationMap;

public TranslationServiceImpl(Map<String, String> translationMap) {
this.translationMap = translationMap;
}

@Override
public String translate(String source) {
return translationMap.get(source);
}
}



Translator.java

import akka.actor.UntypedActor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class Translator extends UntypedActor {

private final TranslationService translationService;

@Autowired
public Translator(TranslationService translationService) {
this.translationService = translationService;
}

@Override
public void onReceive(Object msg) {
String source = msg.toString();
String translated = translationService.translate(source);
getSender().tell(new Result(source, translated), getSelf());
}

public static class Result {
public String source;
public String translated;

public Result(String source, String translated) {
this.source = source;
this.translated = translated;
}

@Override
public String toString() {
return source + " > " + translated;
}
}
}



Application.java

import akka.actor.ActorRef;

import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Application extends UntypedActor {

private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);

private ActorRef translator;

@Override
public void preStart() throws InterruptedException {
translator = getContext().actorOf(InjectedActorExtension.Provider.get(
getContext().system()).props("translator"));
translator.tell("hello", getSelf());
}

@Override
public void onReceive(Object message) {
if (message instanceof Translator.Result) {
logger.info(message.toString());
getContext().stop(getSelf());
getContext().system().shutdown();
} else {
unhandled(message);
}
}
}



ApplicationConfig.java

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@ComponentScan
public class ApplicationConfig {
@Bean
public TranslationService translationService() {
Map<String, String> translationMap = new HashMap<String, String>();
translationMap.put("hello", "こんにちは");
return new TranslationServiceImpl(translationMap);
}
}



Main.java

import akka.actor.ActorSystem;

import akka.actor.Props;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class Main {
public static void main(String[] args) {
AbstractApplicationContext applicationContext =
new AnnotationConfigApplicationContext(ApplicationConfig.class);
applicationContext.registerShutdownHook();

ActorSystem system = ActorSystem.create();
InjectedActorExtension.Provider.get(system).initialize(applicationContext);

system.actorOf(Props.create(Application.class));
}
}



Links