1月15日開催!現年収非公開で企業からスカウトをもらってみませんか?PR

転職ドラフトでリアルな市場価値を測る。レジュメをもとに、企業から年収とミッションが提示されます。

3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spring DI を使った Akka Actor 生成

Posted at

Spring Framework の DI(Dependency Injection) を使って Akka Actor を生成する方法です。生成イメージをつかみやすいように Java API で説明していますが、Scala でも同様です。

InjectedActorProducer

DI された Actor を生成するための akka.actor.IndirectActorProducer の実装クラスとして InjectedActorProducer を作成します。生成パラメータとして、ApplicationContext のインスタンスと生成したい Bean 名をコンストラクタ経由で内部に持っておき produce メソッドで Actor を生成します。

InjectedActorProducer.java
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

public class InjectedActorProducer implements IndirectActorProducer {
  private final ApplicationContext applicationContext;
  private final String actorName;

  public InjectedActorProducer(ApplicationContext applicationContext, String actorName) {
    this.applicationContext = applicationContext;
    this.actorName = actorName;
  }

  @Override
  public Actor produce() {
    return applicationContext.getBean(actorClass, Actor.class);
  }

  @Override
  public Class<? extends Actor> actorClass() {
    return (Class<? extends Actor>) applicationContext.getType(actorName);
  }
}

Props.createActor クラスを渡すのと同じように、IndirectActorProducer の実装クラスとそのコンストラクタ引数を渡すことで、Props を生成できます。

ActorRef myActor = getContext().actorOf(
    Props.create(InjectedActorProducer.class, applicationContext, "myActor"));

Props の内部では、最終的に IndirectActorProducer のインスタンスを保持しており、生成する Actor の情報を持っています。Props を生成する際に、Props.create の第一引数が Actor クラスなら IndirectActorProducer にラップしてインスタンス生成され、IndirectActorProducer の実装クラスなら、そのままインスタンス生成されます。

インスタンス生成のスコープは prototype とします。障害発生時に Actor を再生成する必要があるため、singleton で同じインスタンスを再利用するのは問題があります。

@Component
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class MyActor extends UntypedActor {
  ...
}

IndirectActorProducer を直接使うことで、Actor 生成をカスタマイズできることはわかりましたが、ApplicationContext のインスタンスを持ちまわる必要がありますので、このままでは現実的ではありません。

InjectedActorExtension

Akka Extension を使うと、ActorSystem ごとに一度だけロードされる共有リソースを持つことができます。共有リソースとなる Extension は akka.actor.Extension の実装クラスを作成するだけです。実装すべきメソッドはなく、単に Extension であることを示すだけのインターフェイスです。

InjectedActorImpl.java
import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;

public class InjectedActorImpl implements Extension {
  private volatile ApplicationContext applicationContext;

  synchronized public void initialize(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
  }

  public Props props(String actorName) {
    return Props.create(InjectedActorProducer.class, applicationContext, actorName);
  }
}

上記の InjectedActorImplApplicationContext をインスタンス変数 applicationContext に保持しておき、先に作成した InjectedActorProducer を経由して DI された Actor の Props を生成する Extension になります。

Extension は ActorSystem ごとにロードされる共有リソースのため、スレッドセーフである必要があります。この場合は、複数スレッドで applicationContext が共有されますので、差し替えは synchronized で行い volatile で保持します。

この Extension を生成するための ExtensionId<T extends Extension> の実装クラス InjectedActorExtension を作成します。

InjectedActorExtension.java
import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;

public class InjectedActorExtension extends AbstractExtensionId<InjectedActorImpl> {

  public static InjectedActorExtension Provider = new InjectedActorExtension();

  @Override
  public InjectedActorImpl createExtension(ExtendedActorSystem system) {
    return new InjectedActorImpl();
  }
}

AbstractExtensionId<T extends Extension> を継承することで定義します。Provider として自身のインスタンスを static に public スコープで保持していますが、それぞれの Actor から静的に Extension を得られるようにするためです。

ActorSystem を生成したら InjectedActorExtension.Provider よりロードされている Extension を取り出して applicationContext を渡して初期化しておきます。

ApplicationContext applicationContext = ...;
ActorSystem system = ActorSystem.create();
InjectedActorExtension.Provider.get(system).initialize(applicationContext);

初期化以降は Extension に applicationContext が保持されていますので、props メソッド経由で取得した Props から DI された Actor を取得できます。

public class Application extends UntypedActor {
  private ActorRef myActor = getContext().actorOf(
      InjectedActorExtension.Provider.get(getContext().system()).props("myActor"));
  ...
}

Putting It All Together

整理すると、以下のモジュールを作成することになります。

  1. InjectedActorProducer
  • DI された Actor を生成する IndirectActorProducer の実装クラス
  • 障害発生による再起動時に Actor は再生成されるため prototype スコープで生成
  1. InjectedActorImpl
  • ApplicationContext を保持する Extension の実装クラス
  • Extension は ActorSystem ごとに一度だけロードされる共有リソース
  1. InjectedActorExtension
  • ExtensionId<? extends Extension> の実装クラス
  • ActorSystem にロードする Extension を生成
  • InjectedActorExtension.Provider に自身のインスタンスを static に持つ

これらを利用した例です。

TranslationService.java
public interface TranslationService {
  String translate(String source);
}
TranslationServiceImpl.java
import java.util.Map;

public class TranslationServiceImpl implements TranslationService {
  private final Map<String, String> translationMap;

  public TranslationServiceImpl(Map<String, String> translationMap) {
    this.translationMap = translationMap;
  }

  @Override
  public String translate(String source) {
    return translationMap.get(source);
  }
}
Translator.java
import akka.actor.UntypedActor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public class Translator extends UntypedActor {

  private final TranslationService translationService;

  @Autowired
  public Translator(TranslationService translationService) {
    this.translationService = translationService;
  }

  @Override
  public void onReceive(Object msg) {
    String source = msg.toString();
    String translated = translationService.translate(source);
    getSender().tell(new Result(source, translated), getSelf());
  }

  public static class Result {
    public String source;
    public String translated;

    public Result(String source, String translated) {
      this.source = source;
      this.translated = translated;
    }

    @Override
    public String toString() {
      return source + " > " + translated;
    }
  }
}
Application.java
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Application extends UntypedActor {

  private LoggingAdapter logger = Logging.getLogger(getContext().system(), this);

  private ActorRef translator;

  @Override
  public void preStart() throws InterruptedException {
    translator = getContext().actorOf(InjectedActorExtension.Provider.get(
        getContext().system()).props("translator"));
    translator.tell("hello", getSelf());
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof Translator.Result) {
      logger.info(message.toString());
      getContext().stop(getSelf());       
      getContext().system().shutdown();
    } else {
      unhandled(message);
    }
  }
}
ApplicationConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@ComponentScan
public class ApplicationConfig {
  @Bean
  public TranslationService translationService() {
    Map<String, String> translationMap = new HashMap<String, String>();
    translationMap.put("hello", "こんにちは");
    return new TranslationServiceImpl(translationMap);
  }
}
Main.java
import akka.actor.ActorSystem;
import akka.actor.Props;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class Main {
  public static void main(String[] args) {
    AbstractApplicationContext applicationContext =
        new AnnotationConfigApplicationContext(ApplicationConfig.class);
    applicationContext.registerShutdownHook();

    ActorSystem system = ActorSystem.create();
    InjectedActorExtension.Provider.get(system).initialize(applicationContext);

    system.actorOf(Props.create(Application.class));
  }
}

Links

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?