ざっくり
- MQにメッセージを投げる時にロードバランスできるとよさ気じゃん?うぇぇい。
- Camelを使ってルートのロードバランシングしてみよう。
- RoundRobinとか楽勝だったからオレオレルールでロードバランスしてみよう。
- custom load balancing 動かないよ。camel-scalaバグってるんじゃないか?
- ごめん。やっぱりできた
準備運動がてらRoundRobin
loadbalance.roundrobinの式で、実行したいtoを列挙すればOK
これだけで、direct:loadbalance-roundrobinルートが起動するたびに、順繰りにworker:a, b, cが呼ばれる。
"direct:loadbalance-roundrobin" ==> {
loadbalance.roundrobin {
-->("direct:worker-a", "mock:a")
-->("direct:worker-b", "mock:b")
-->("direct:worker-c", "mock:c")
}
}
-->
の引数を2個にしているのは、単体テスト書くときにmock:x
に対してassertをかけるのに簡単にするため。
※テストコードはgit-hubへpush
重み付けしたい場合は、こんな感じにすれば良いよ。
- loadbalance.roundrobin {
+ loadbalance.weighted(true, "2:1:1", ":") {
なら、オレオレルールのLoadBalandingも簡単でしょ!?
custom load balancing
先ほどのroundrobin
の箇所をcustom("オレオレロードバランシングの名前")
とすればOK。
ただ、ハマリ箇所があった。
先に答えを全文記述するならこんな感じ。
package jp.den3umegumi.experimental.camel.route
import org.apache.camel.component.mock.MockEndpoint._
import org.apache.camel.impl.{DefaultCamelContext, JndiRegistry}
import org.apache.camel.processor.loadbalancer.LoadBalancerSupport
import org.apache.camel.scala.dsl.builder.{RouteBuilderSupport, ScalaRouteBuilder}
import org.apache.camel.test.junit4.CamelTestSupport
import org.apache.camel.{AsyncCallback, Exchange}
import org.junit.Test
class CustomLoadBalancingRouteBuilderTest extends CamelTestSupport with RouteBuilderSupport {
val loadBalancingName = "customLoadBalance"
override protected def createRegistry: JndiRegistry = {
val reg = super.createRegistry
// CustomLoadBalancingをセット
reg.bind(loadBalancingName, new MyCustomLoadBalancing)
reg
}
override def createRouteBuilder() = new ScalaRouteBuilder(new DefaultCamelContext()) {
"direct:custom-load-balance" ==> {
loadbalance.custom(loadBalancingName) {
-->("mock:a")
-->("mock:b")
-->("mock:c")
}
}
}
@Test
def test_customLoadBalancer {
val mockA = getMockEndpoint("mock:a")
val mockB = getMockEndpoint("mock:b")
val mockC = getMockEndpoint("mock:c")
val b1 = "one"
mockA.expectedBodiesReceived(b1)
// mock:b mock:cは呼ばれない
expectsMessageCount(0, mockB, mockC)
template.sendBodyAndHeader("direct:custom-load-balance", b1, "foo", "bar");
assertMockEndpointsSatisfied
mockA.reset
val b2 = "two"
mockB.expectedBodiesReceived(b2)
// mock:a mock:cは呼ばれない
expectsMessageCount(0, mockA, mockC)
template.sendBodyAndHeader("direct:custom-load-balance", b2, "foo", "bar");
assertMockEndpointsSatisfied
mockB.reset
val b3 = "three"
mockC.expectedBodiesReceived(b3)
// mock:a mock:bは呼ばれない
expectsMessageCount(0, mockA, mockB)
template.sendBodyAndHeader("direct:custom-load-balance", b3, "foo", "bar");
assertMockEndpointsSatisfied
mockC.reset
}
}
class MyCustomLoadBalancing extends LoadBalancerSupport {
override def process(e: Exchange, callback: AsyncCallback): Boolean = {
Option(e.getIn.getBody(classOf[String])) match {
case Some("one") => getProcessors.get(0).process(e)
case Some("two") => getProcessors.get(1).process(e)
case _ => getProcessors.get(2).process(e)
}
callback.done(true)
true
}
}
重要なのはこれ↓
override protected def createRegistry: JndiRegistry = {
val reg = super.createRegistry
// CustomLoadBalancingをセット
reg.bind(loadBlancingName, new MyCustomLoadBalancing)
reg
}
registoryにオレオレルールを記載した org.apache.camel.processor.loadbalancer.Loadbalancer
を継承したインスタンスを登録してやる必要がある。
・・・って、・・・
これテストクラスなんですけど・・・
ということで、Mainから実行したい場合はこんな感じに書けば良い。
package jp.den3umegumi.experimental.camel.launch
import jp.den3umegumi.experimental.camel.route._
import org.apache.camel.impl.SimpleRegistry
import org.apache.camel.main.Main
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
import org.slf4j.LoggerFactory
object RouteBuilderLauncher extends App with RouteBuilderSupport {
private val logger = LoggerFactory.getLogger(getClass)
val main = new Main
main.enableHangupSupport;
// registoryにカスタムロードバランサーを追加
main.bind("customLoadBalance", new LoadBalancingRule)
main.addRouteBuilder(new LoadbalanceRouteBuilder)
logger.debug("camel routing start")
main.run
}
ハマった経緯(検索用)
JavaのSample
Camelの本家にあるjavaのサンプルだと以下のように直接オレオレルールを引数に渡せる。
from("direct:start")
// using our custom load balancer
.loadBalance(new MyLoadBalancer())
.to("mock:x", "mock:y", "mock:z");
しかし、camel-scalaを用いた場合、インスタンスを引数にとれない。
引数に文字列を渡すという事は、クラスの完全修飾名かな?
と思いつつ、試行錯誤。
Contextに設定する。
camal-scalaのコードを追いかけると、オレオレルールのインスタンスは context.getRegistry().lookupByNameAndType(name, beanType);
こんな感じでContextから取得している
ならば、RouteBuilderの基底クラスとなるScalaRouteBuilder
のコンストラクタ引数にContextをとるから、そこで設定してあげればいいのかな。。
ということで、↓のようなコードを書いた。
val registory = new SimpleRegistory()
registory.put("customLoadBalance", new LoadBalancingRule)
val ctx = new DefaultCamelContext(registory)
main.addRouteBuilder(new LoadbalanceRouteBuilder(ctx))
けど、↓のような例外が出てしまって、Repository に Beanが登録されない模様。。
Caused by: org.apache.camel.NoSuchBeanException: No bean could be found in the registry for: customLoadBalance of type: org.apache.camel.processor.loadbalancer.LoadBalancer
at org.apache.camel.util.CamelContextHelper.mandatoryLookup(CamelContextHelper.java:159)
at org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition.createLoadBalancer(CustomLoadBalancerDefinition.java:59)
at org.apache.camel.model.LoadBalancerDefinition.getLoadBalancer(LoadBalancerDefinition.java:90)
at org.apache.camel.model.LoadBalancerDefinition.getLoadBalancer(LoadBalancerDefinition.java:67)
at org.apache.camel.model.LoadBalanceDefinition.createProcessor(LoadBalanceDefinition.java:142)
at org.apache.camel.model.ProcessorDefinition.makeProcessor(ProcessorDefinition.java:505)
at org.apache.camel.model.ProcessorDefinition.addRoutes(ProcessorDefinition.java:217)
at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1025)
(たぶん、このページに辿り着く人の一部はこの例外分で検索かけてくる人なのかなと。。)
んで、さらにコードを読み進んで行くと先に説明した様な感じで書くことがわかった。
はい。めでたしめでたし。
次は、Failover辺りを調べたいかなー。
git-hubのコード