この記事は Tech-Circle Hands on Advent Calendar 2016 の 9 日目の記事です。
昨日は、@ykoga さんによる 10さいからはじめる「お絵かきロボット」づくり でした。
※ Akka Camel は Akka 2.5.x で非推奨になります
(2016/12/12 追記)
@xuwei_k さんからコメントを頂いて気づきました (ありがとうございます!)
Akka 2.5.x から Camel のコンポーネントを使いたい場合は、Akka Camel ではなく、Alpakka (Akka Streams) の Camel integration の利用が推奨されるようになります。
この記事を公開する 2 時間前にマージされてました (´・ω・`)
おそらく、Akka Streams の Back Pressure の機構を使うことで、より安全に Camel のコンポーネントを使えるようにしようということだと思います。(私見)
Alpakka (Akka Streams) での Camel のサポートについては、この記事の「Akka Streams で Camel を使えるか?」で少しだけ書いています。
はじめに
Akka のドキュメントを眺めていると、akka-camel
という見慣れないモジュールがあったので、少し調べてみました。
Akka Camel は Apache Camel のコンポーネントを Akka で扱うための拡張です。
この拡張を使うと、様々な外部システムとアクターでデータがやりとりできるようになります。
Apache Camel?
Apache Camel は、様々な外部システムとの連携を容易にするフレームワークです。様々なプロトコルや API をサポートするためのコンポーネントの一覧が ここ にあり、例えば、HTTP や Slack の Incomming Webhook に対応したコンポーネントがあります。ざっと一覧を数えると 約240種類 のプロトコルや API がサポートされているようです。(2016/12/09 時点)
基本的な概念や、(Akka と組み合わせない) Camel の使い方は下記の記事で詳しく解説されています。
Akka Camel の概要
Akka Camel の基本的な概念は、とりあえずこの 3 つを理解すれば大丈夫です。
CamelMessage
Consumer
Producer
CamelMessage
CamelMessage
は外部とのやりとりに使うメッセージです。メッセージの本文にあたる body
と、補足情報にあたる復数の header
を持ちます。HTTP のリクエストであれば、body
にはリクエストボディ、header
には HTTP メソッドやコンテンツタイプ などが入ります。
Consumer
メッセージの消費者。つまり、メッセージを外部から受信するアクターです。
Producer
直訳すると生産者ですが、実際にはこのアクター内でメッセージを作るコードは書かないので、メッセージの提供者とイメージしたほうがわかりやすいです。
メッセージを外部に送信するアクターです。
Hello World!
簡単なプログラムを書いてみます。
まずは、akka-camel
の依存を追加します。
libraryDependencies += "com.typesafe.akka" %% "akka-camel" % "2.4.14"
ここではお手軽に、標準入力/標準出力を扱う Camel のコンポーネントを使ってみます。
libraryDependencies += "org.apache.camel" % "camel-stream" % "2.18.0"
Consumer
外部からメッセージを受け取る、Consumer
のアクターを実装します。
標準入力からメッセージを受け取り、ログに出力するアクターを実装してみます。
class StdInConsumer extends Consumer with ActorLogging {
override def endpointUri = "stream:in"
override def receive = {
case msg: CamelMessage =>
log.info("Hello, {}!", msg.body)
}
}
Consumer
を継承すると、Camel のコンポーネントからメッセージを受け取るアクターになります。
endpointUri
をオーバーライドして、メッセージの入力元のエンドポイントを指定します。
エンドポイントの URI の書き方は Apache Camel のコンポーネントのドキュメントに書かれていて、コンポーネントの一覧 からリンクされています。ここで扱う Stream コンポーネントの URI の詳細は ここ で確認できます。
メッセージの受け取りは、通常のアクターと同じように receive
メソッドを定義します。
コンポーネントからは CamelMessage
というオブジェクトでメッセージが送られてきます。
このオブジェクトは、メッセージの header
と body
を持っており、body
に標準入力へ入力された文字列が入っています。
アクターの初期化は、通常のアクターと比べて、ひと手間必要です。
val system = ActorSystem()
val camel = CamelExtension(system)
val stdInConsumer = system.actorOf(Props[StdInConsumer])
camel.activationFutureFor(stdInConsumer)(
timeout = 10 seconds, executor = system.dispatcher
)
actorOf
でアクターを作るところまでは同じですが、CamelExtension
の activationFutureFor
を使ってアクターで定義したエンドポイントをアクティベートする必要があります。
では、動かしてみます。
$ sbt run
...
negokaz # 何か文字列を入力
INFO c.e.StdInConsumer: Hello, negokaz!
起動後に、何か文字列を入力するとログに Consumer
の receive
で定義したログが表示されます。
Producer
メッセージを外部に送信する、Producer
のアクターを作ってみます。
標準出力へデータを出力するアクターを作ってみます。
class StdOutProducer extends Producer with ActorLogging {
override def endpointUri = "stream:out"
}
Producer
を継承し、Consumer
と同じように endpointUri
をオーバーライドします。
あれ、receive
は? と一瞬思うのですが、Producer
では定義できなくなっていて、このアクターが受信したメッセージは全てエンドポイントに送信されます。なので、送信したくないメッセージのフィルターなどは、このアクターに送る前にしておく必要があります。
ただし、送信するメッセージの変換はできるようになっています。
class StdOutProducer extends Producer with ActorLogging {
override def endpointUri = "stream:out"
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: String => msg.toUpperCase
case msg => msg
}
}
transformOutgoingMessage
をオーバーライドすることで、変換処理を組み込めます。
この例では、文字列のメッセージが来た場合は大文字に変換したものをエンドポイントへ送信します。
さきほど作った StdInConsumer
と繋げてみましょう。
ログを出す代わりに StdOutProducer
へ文字列を送ります。
class StdInConsumer(stdOutProducer: ActorRef) extends Consumer with ActorLogging {
override def endpointUri = "stream:in"
override def receive = {
case msg: CamelMessage =>
stdOutProducer ! s"Hello, ${msg.body}!"
}
}
val system = ActorSystem()
val camel = CamelExtension(system)
val stdOutProducer = system.actorOf(Props[StdOutProducer])
camel.activationFutureFor(stdOutProducer)(
timeout = 10 seconds, executor = system.dispatcher
)
val stdInConsumer = system.actorOf(Props(classOf[StdInConsumer], stdOutProducer))
camel.activationFutureFor(stdInConsumer)(
timeout = 10 seconds, executor = system.dispatcher
)
できました。でもここで起動して、文字を入力するとコンソールが大変なことになります。
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
HELLO, NULL!
.
.
.
一体何が起きたのでしょうか?
実は、Producer
にメッセージを送ると、そのレスポンスとして CamelMessage
が返ってくるようになっています。(Akka Camel デフォルトの振る舞い)
StdInConsumer
で、StdOutProducer
のレスポンスの CamelMessage
を受けてメッセージ送り、またレスポンスを受けて… という感じで無限ループが発生します。
StdOutProducer
がレスポンスを返さないようにするには、Producer
の oneway
メソッドをオーバーライドして true
にします。
class StdOutProducer extends Producer with ActorLogging {
override def endpointUri = "stream:out"
override def oneway = true
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: String => msg.toUpperCase
case msg => msg
}
}
もしくは、Oneway
トレイトをミックスインしても同じ効果があります。
class StdOutProducer extends Producer with Oneway with ActorLogging {
override def endpointUri = "stream:out"
override def transformOutgoingMessage(msg: Any) = msg match {
case msg: String => msg.toUpperCase
case msg => msg
}
}
ざっと Consumer
と Producer
の使い方を確認できました。
ここで使ったソースコードは negokaz/akka-camel-sample - GitHub で公開しています。
注意点
全てのエンドポイントが Consumer
と Producer
になれるわけではない
Consumer
/Producer
では endpointUri
を指定しますが、エンドポイントには Consumer
に指定できないエンドポイントや、Producer
に指定できないエンドポイントが存在します。
例えば、Mail Component の smtp://
は Producer
のエンドポイントに指定できますが、Consumer
のエンドポイントには指定できないため、Consumer
に指定されていると実行時エラーが発生します。
Netty Component のエンドポイントのように、Consumer
と Producer
の両方に指定できるものもあります。
どちらに指定可能か、は各コンポーネントのドキュメントの説明を読む必要があります。一覧になってると便利だと思うのですが、見当たりませんでした…
preStart
をオーバーライドするときは super.preStart()
を呼ぶ
Consumer
や Produer
を継承したアクターで preStart
をオーバーライドする場合は super.preStart()
を呼ぶ必要があります。
Consumer
と Producer
の preStart()
には、エンドポイントをアクティベートするための処理が定義されており、オーバーライドしてこの処理が呼び出されないと、メッセージのやりとりができなくなります。
override def preStart(): Unit = {
println("Starting...")
super.preStart()
}
Akka Streams で Camel を使えるか?
Camel は連続的にメッセージを処理するので、ストリーム処理のためのフレームワークと言っていいと思いますが、今のところ公式で Akka Streams から Camel のコンポーネントを使うための仕組みは提供されていません。
現在、Akka Streams で Camel のように様々なプロトコルや API を扱えるようにすることを目指した Alpakka というプロジェクトが動いています。 (Akka Blog)
Alpakka の issue で Akka Streams から Camel のコンポーネントを使うための議論は行われています。
実験的な実装は既に存在し、下記のライブラリを使うと Akka Streams で Camel のコンポーネントを使うことは一応できるようです。(Back Pressure のサポートが不完全そう?)
さいごに
akka-camel
を使うと、Camel のコンポーネントを Akka から簡単に扱えることが確認できました。
多種多様なプロトコルや API が Camel でサポートされているので、Akka Camel を使うことで Akka で実装したシステムを様々な外部システムと簡単につなげられます。
ただ、エンドポイントで何らかの障害(ネットワーク障害など)があった場合に、Consumer
や Producer
がどう振る舞うのかなど、まだ不明なところがあるので、使う機会があれば更に詳しく検証してみたいと思います。
明日 10 日目は、山と VR に情熱を燃やす decchi さんです。お楽しみに!