spring
Akka

Spring DI を使った Akka Actor 生成

Spring Framework の DI(Dependency Injection) を使って Akka Actor を生成する方法です。生成イメージをつかみやすいように Java API で説明していますが、Scala でも同様です。

InjectedActorProducer

DI された Actor を生成するための akka.actor.IndirectActorProducer の実装クラスとして InjectedActorProducer を作成します。生成パラメータとして、ApplicationContext のインスタンスと生成したい Bean 名をコンストラクタ経由で内部に持っておき produce メソッドで Actor を生成します。

InjectedActorProducer.java
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

public class InjectedActorProducer implements IndirectActorProducer {
  private final ApplicationContext applicationContext;
  private final String actorName;

  public InjectedActorProducer(ApplicationContext applicationContext, String actorName) {
    this.applicationContext = applicationContext;
    this.actorName = actorName;
  }

  @Override
  public Actor produce() {
    return applicationContext.getBean(actorClass, Actor.class);
  }

  @Override
  public Class<? extends Actor> actorClass() {
    return (Class<? extends Actor>) applicationContext.getType(actorName);
  }
}

Props.createActor クラスを渡すのと同じように、IndirectActorProducer の実装クラスとそのコンストラクタ引数を渡すことで、Props を生成できます。

ActorRef myActor = getContext().actorOf(
    Props.create(InjectedActorProducer.class, applicationContext, "myActor"));

Props の内部では、最終的に IndirectActorProducer のインスタンスを保持しており、生成する Actor の情報を持っています。Props を生成する際に、Props.create の第一引数が Actor クラスなら IndirectActorProducer にラップしてインスタンス生成され、IndirectActorProducer の実装クラスなら、そのままインスタンス生成されます。

インスタンス生成のスコープは prototype とします。障害発生時に Actor を再生成する必要があるため、singleton で同じインスタンスを再利用するのは問題があります。

@Component
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class MyActor extends UntypedActor {
  ...
}

IndirectActorProducer を直接使うことで、Actor 生成をカスタマイズできることはわかりましたが、ApplicationContext のインスタンスを持ちまわる必要がありますので、このままでは現実的ではありません。

InjectedActorExtension

Akka Extension を使うと、ActorSystem ごとに一度だけロードされる共有リソースを持つことができます。共有リソースとなる Extension は akka.actor.Extension の実装クラスを作成するだけです。実装すべきメソッドはなく、単に Extension であることを示すだけのインターフェイスです。

InjectedActorImpl.java
import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;

public class InjectedActorImpl implements Extension {
  private volatile ApplicationContext applicationContext;

  synchronized public void initialize(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
  }

  public Props props(String actorName) {
    return Props.create(InjectedActorProducer.class, applicationContext, actorName);
  }
}

上記の InjectedActorImplApplicationContext をインスタンス変数 applicationContext に保持しておき、先に作成した InjectedActorProducer を経由して DI された Actor の Props を生成する Extension になります。

Extension は ActorSystem ごとにロードされる共有リソースのため、スレッドセーフである必要があります。この場合は、複数スレッドで applicationContext が共有されますので、差し替えは synchronized で行い volatile で保持します。

この Extension を生成するための ExtensionId<T extends Extension> の実装クラス InjectedActorExtension を作成します。

InjectedActorExtension.java
import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;

public class InjectedActorExtension extends AbstractExtensionId<InjectedActorImpl> {

  public static InjectedActorExtension Provider = new InjectedActorExtension();

  @Override
  public InjectedActorImpl createExtension(ExtendedActorSystem system) {
    return new InjectedActorImpl();
  }
}

AbstractExtensionId<T extends Extension> を継承することで定義します。Provider として自身のインスタンスを static に public スコープで保持していますが、それぞれの Actor から静的に Extension を得られるようにするためです。

ActorSystem を生成したら InjectedActorExtension.Provider よりロードされている Extension を取り出して applicationContext を渡して初期化しておきます。

ApplicationContext applicationContext = ...;
ActorSystem system = ActorSystem.create();
InjectedActorExtension.Provider.get(system).initialize(applicationContext);

初期化以降は Extension に applicationContext が保持されていますので、props メソッド経由で取得した Props から DI された Actor を取得できます。

public class Application extends UntypedActor {
  private ActorRef myActor = getContext().actorOf(
      InjectedActorExtension.Provider.get(getContext().system()).props("myActor"));
  ...
}

Putting It All Together

整理すると、以下のモジュールを作成することになります。

  1. InjectedActorProducer
    • DI された Actor を生成する IndirectActorProducer の実装クラス
    • 障害発生による再起動時に Actor は再生成されるため prototype スコープで生成
  2. InjectedActorImpl
    • ApplicationContext を保持する Extension の実装クラス
    • Extension は ActorSystem ごとに一度だけロードされる共有リソース
  3. InjectedActorExtension
    • ExtensionId<? extends Extension> の実装クラス
    • ActorSystem にロードする Extension を生成
    • InjectedActorExtension.Provider に自身のインスタンスを static に持つ

これらを利用した例です。

TranslationService.java
public interface TranslationService {
  String translate(String source);
}
TranslationServiceImpl.java
import java.util.Map;

public class TranslationServiceImpl implements TranslationService {
  private final Map<String, String> translationMap;

  public TranslationServiceImpl(Map<String, String> translationMap) {
    this.translationMap = translationMap;
  }

  @Override
  public String translate(String source) {
    return translationMap.get(source);
  }
}
Translator.java
import akka.actor.UntypedActor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class Translator extends UntypedActor {

  private final TranslationService translationService;

  @Autowired
  public Translator(TranslationService translationService) {
    this.translationService = translationService;
  }

  @Override
  public void onReceive(Object msg) {
    String source = msg.toString();
    String translated = translationService.translate(source);
    getSender().tell(new Result(source, translated), getSelf());
  }

  public static class Result {
    public String source;
    public String translated;

    public Result(String source, String translated) {
      this.source = source;
      this.translated = translated;
    }

    @Override
    public String toString() {
      return source + " > " + translated;
    }
  }
}
Application.java
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Application extends UntypedActor {

  private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);

  private ActorRef translator;

  @Override
  public void preStart() throws InterruptedException {
    translator = getContext().actorOf(InjectedActorExtension.Provider.get(
        getContext().system()).props("translator"));
    translator.tell("hello", getSelf());
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof Translator.Result) {
      logger.info(message.toString());
      getContext().stop(getSelf());       
      getContext().system().shutdown();
    } else {
      unhandled(message);
    }
  }
}
ApplicationConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@ComponentScan
public class ApplicationConfig {
  @Bean
  public TranslationService translationService() {
    Map<String, String> translationMap = new HashMap<String, String>();
    translationMap.put("hello", "こんにちは");
    return new TranslationServiceImpl(translationMap);
  }
}
Main.java
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class Main {
  public static void main(String[] args) {
    AbstractApplicationContext applicationContext =
        new AnnotationConfigApplicationContext(ApplicationConfig.class);
    applicationContext.registerShutdownHook();

    ActorSystem system = ActorSystem.create();
    InjectedActorExtension.Provider.get(system).initialize(applicationContext);

    system.actorOf(Props.create(Application.class));
  }
}

Links