Edited at

Akka Camel で外部システムとアクターを手軽につなぐ

More than 1 year has passed since last update.

この記事は Tech-Circle Hands on Advent Calendar 2016 の 9 日目の記事です。

昨日は、@ykoga さんによる 10さいからはじめる「お絵かきロボット」づくり でした。



※ Akka Camel は Akka 2.5.x で非推奨になります

(2016/12/12 追記)

@xuwei_k さんからコメントを頂いて気づきました (ありがとうございます!)

Akka 2.5.x から Camel のコンポーネントを使いたい場合は、Akka Camel ではなく、Alpakka (Akka Streams) の Camel integration の利用が推奨されるようになります。

この記事を公開する 2 時間前にマージされてました (´・ω・`)

おそらく、Akka Streams の Back Pressure の機構を使うことで、より安全に Camel のコンポーネントを使えるようにしようということだと思います。(私見)

Alpakka (Akka Streams) での Camel のサポートについては、この記事の「Akka Streams で Camel を使えるか?」で少しだけ書いています。


はじめに

Akka のドキュメントを眺めていると、akka-camelという見慣れないモジュールがあったので、少し調べてみました。

Akka CamelApache Camel のコンポーネントを Akka で扱うための拡張です。

この拡張を使うと、様々な外部システムとアクターでデータがやりとりできるようになります。


Apache Camel?

Apache Camel は、様々な外部システムとの連携を容易にするフレームワークです。様々なプロトコルや API をサポートするためのコンポーネントの一覧が ここ にあり、例えば、HTTPSlack の Incomming Webhook に対応したコンポーネントがあります。ざっと一覧を数えると 約240種類 のプロトコルや API がサポートされているようです。(2016/12/09 時点)

基本的な概念や、(Akka と組み合わせない) Camel の使い方は下記の記事で詳しく解説されています。


Akka Camel の概要

Akka Camel の基本的な概念は、とりあえずこの 3 つを理解すれば大丈夫です。


  • CamelMessage

  • Consumer

  • Producer


CamelMessage

CamelMessage は外部とのやりとりに使うメッセージです。メッセージの本文にあたる body と、補足情報にあたる復数の header を持ちます。HTTP のリクエストであれば、body にはリクエストボディ、header には HTTP メソッドやコンテンツタイプ などが入ります。


Consumer

メッセージの消費者。つまり、メッセージを外部から受信するアクターです。


Producer

直訳すると生産者ですが、実際にはこのアクター内でメッセージを作るコードは書かないので、メッセージの提供者とイメージしたほうがわかりやすいです。

メッセージを外部に送信するアクターです。


Hello World!

簡単なプログラムを書いてみます。

まずは、akka-camel の依存を追加します。


build.sbt

libraryDependencies += "com.typesafe.akka" %% "akka-camel" % "2.4.14"


ここではお手軽に、標準入力/標準出力を扱う Camel のコンポーネントを使ってみます。


build.sbt

libraryDependencies += "org.apache.camel" % "camel-stream" % "2.18.0"



Consumer

外部からメッセージを受け取る、Consumer のアクターを実装します。

標準入力からメッセージを受け取り、ログに出力するアクターを実装してみます。


StdInConsumer.scala

class StdInConsumer extends Consumer with ActorLogging {

override def endpointUri = "stream:in"

override def receive = {

case msg: CamelMessage =>
log.info("Hello, {}!", msg.body)
}
}


Consumer を継承すると、Camel のコンポーネントからメッセージを受け取るアクターになります。

endpointUri をオーバーライドして、メッセージの入力元のエンドポイントを指定します。

エンドポイントの URI の書き方は Apache Camel のコンポーネントのドキュメントに書かれていて、コンポーネントの一覧 からリンクされています。ここで扱う Stream コンポーネントの URI の詳細は ここ で確認できます。

メッセージの受け取りは、通常のアクターと同じように receive メソッドを定義します。

コンポーネントからは CamelMessage というオブジェクトでメッセージが送られてきます。

このオブジェクトは、メッセージの headerbody を持っており、body に標準入力へ入力された文字列が入っています。

アクターの初期化は、通常のアクターと比べて、ひと手間必要です。


ApplictionMain.scala

val system = ActorSystem()

val camel = CamelExtension(system)

val stdInConsumer = system.actorOf(Props[StdInConsumer])

camel.activationFutureFor(stdInConsumer)(
timeout = 10 seconds, executor = system.dispatcher
)


actorOf でアクターを作るところまでは同じですが、CamelExtensionactivationFutureFor を使ってアクターで定義したエンドポイントをアクティベートする必要があります。

では、動かしてみます。

$ sbt run

...
negokaz # 何か文字列を入力
INFO c.e.StdInConsumer: Hello, negokaz!

起動後に、何か文字列を入力するとログに Consumerreceive で定義したログが表示されます。


Producer

メッセージを外部に送信する、Producer のアクターを作ってみます。

標準出力へデータを出力するアクターを作ってみます。


StdOutProducer.scala

class StdOutProducer extends Producer with ActorLogging {

override def endpointUri = "stream:out"
}

Producer を継承し、Consumer と同じように endpointUri をオーバーライドします。

あれ、receive は? と一瞬思うのですが、Producer では定義できなくなっていて、このアクターが受信したメッセージは全てエンドポイントに送信されます。なので、送信したくないメッセージのフィルターなどは、このアクターに送る前にしておく必要があります。

ただし、送信するメッセージの変換はできるようになっています。

class StdOutProducer extends Producer with ActorLogging {

override def endpointUri = "stream:out"

override def transformOutgoingMessage(msg: Any) = msg match {
case msg: String => msg.toUpperCase
case msg => msg
}
}

transformOutgoingMessage をオーバーライドすることで、変換処理を組み込めます。

この例では、文字列のメッセージが来た場合は大文字に変換したものをエンドポイントへ送信します。

さきほど作った StdInConsumer と繋げてみましょう。

ログを出す代わりに StdOutProducer へ文字列を送ります。


StdInConsumer.class

class StdInConsumer(stdOutProducer: ActorRef) extends Consumer with ActorLogging {

override def endpointUri = "stream:in"

override def receive = {

case msg: CamelMessage =>
stdOutProducer ! s"Hello, ${msg.body}!"
}
}



ApplicationMain.scala

val system = ActorSystem()

val camel = CamelExtension(system)

val stdOutProducer = system.actorOf(Props[StdOutProducer])

camel.activationFutureFor(stdOutProducer)(
timeout = 10 seconds, executor = system.dispatcher
)

val stdInConsumer = system.actorOf(Props(classOf[StdInConsumer], stdOutProducer))

camel.activationFutureFor(stdInConsumer)(
timeout = 10 seconds, executor = system.dispatcher
)


できました。でもここで起動して、文字を入力するとコンソールが大変なことになります

HELLO, NULL!

HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
.
.
.

一体何が起きたのでしょうか?

実は、Producer にメッセージを送ると、そのレスポンスとして CamelMessage が返ってくるようになっています。(Akka Camel デフォルトの振る舞い)

StdInConsumer で、StdOutProducer のレスポンスの CamelMessage を受けてメッセージ送り、またレスポンスを受けて… という感じで無限ループが発生します。

StdOutProducer がレスポンスを返さないようにするには、Produceroneway メソッドをオーバーライドして true にします。


StdOutProducer.scala

class StdOutProducer extends Producer with ActorLogging {

override def endpointUri = "stream:out"

override def oneway = true

override def transformOutgoingMessage(msg: Any) = msg match {
case msg: String => msg.toUpperCase
case msg => msg
}
}


もしくは、Oneway トレイトをミックスインしても同じ効果があります。


StdOutProducer.scala

class StdOutProducer extends Producer with Oneway with ActorLogging {

override def endpointUri = "stream:out"

override def transformOutgoingMessage(msg: Any) = msg match {
case msg: String => msg.toUpperCase
case msg => msg
}
}


ざっと ConsumerProducer の使い方を確認できました。

ここで使ったソースコードは negokaz/akka-camel-sample - GitHub で公開しています。


注意点


全てのエンドポイントが ConsumerProducer になれるわけではない

Consumer/Producer では endpointUri を指定しますが、エンドポイントには Consumer に指定できないエンドポイントや、Producer に指定できないエンドポイントが存在します。

例えば、Mail Componentsmtp://Producer のエンドポイントに指定できますが、Consumer のエンドポイントには指定できないため、Consumer に指定されていると実行時エラーが発生します。

Netty Component のエンドポイントのように、ConsumerProducer の両方に指定できるものもあります。

どちらに指定可能か、は各コンポーネントのドキュメントの説明を読む必要があります。一覧になってると便利だと思うのですが、見当たりませんでした…


preStart をオーバーライドするときは super.preStart() を呼ぶ

ConsumerProduer を継承したアクターで preStart をオーバーライドする場合は super.preStart() を呼ぶ必要があります。

ConsumerProducerpreStart() には、エンドポイントをアクティベートするための処理が定義されており、オーバーライドしてこの処理が呼び出されないと、メッセージのやりとりができなくなります。

override def preStart(): Unit = {

println("Starting...")
super.preStart()
}


Akka Streams で Camel を使えるか?

Camel は連続的にメッセージを処理するので、ストリーム処理のためのフレームワークと言っていいと思いますが、今のところ公式で Akka Streams から Camel のコンポーネントを使うための仕組みは提供されていません。

現在、Akka Streams で Camel のように様々なプロトコルや API を扱えるようにすることを目指した Alpakka というプロジェクトが動いています。 (Akka Blog)

Alpakka の issue で Akka Streams から Camel のコンポーネントを使うための議論は行われています。

実験的な実装は既に存在し、下記のライブラリを使うと Akka Streams で Camel のコンポーネントを使うことは一応できるようです。(Back Pressure のサポートが不完全そう?)


さいごに

akka-camel を使うと、Camel のコンポーネントを Akka から簡単に扱えることが確認できました。

多種多様なプロトコルや API が Camel でサポートされているので、Akka Camel を使うことで Akka で実装したシステムを様々な外部システムと簡単につなげられます。

ただ、エンドポイントで何らかの障害(ネットワーク障害など)があった場合に、ConsumerProducer がどう振る舞うのかなど、まだ不明なところがあるので、使う機会があれば更に詳しく検証してみたいと思います。

明日 10 日目は、山と VR に情熱を燃やす decchi さんです。お楽しみに!