8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

camel-scalaでオレオレルールのロードバランシングをする

Last updated at Posted at 2015-08-26

ざっくり

  • MQにメッセージを投げる時にロードバランスできるとよさ気じゃん?うぇぇい。
  • Camelを使ってルートのロードバランシングしてみよう。
  • RoundRobinとか楽勝だったからオレオレルールでロードバランスしてみよう。
  • custom load balancing 動かないよ。camel-scalaバグってるんじゃないか?
  • ごめん。やっぱりできた

準備運動がてらRoundRobin

loadbalance.roundrobinの式で、実行したいtoを列挙すればOK
これだけで、direct:loadbalance-roundrobinルートが起動するたびに、順繰りにworker:a, b, cが呼ばれる。

  "direct:loadbalance-roundrobin" ==> {
    loadbalance.roundrobin {
      -->("direct:worker-a", "mock:a")
      -->("direct:worker-b", "mock:b")
      -->("direct:worker-c", "mock:c")
    }
  }

 -->の引数を2個にしているのは、単体テスト書くときにmock:xに対してassertをかけるのに簡単にするため。
※テストコードはgit-hubへpush

重み付けしたい場合は、こんな感じにすれば良いよ。

-     loadbalance.roundrobin {
+     loadbalance.weighted(true, "2:1:1", ":") {

なら、オレオレルールのLoadBalandingも簡単でしょ!?

custom load balancing

先ほどのroundrobinの箇所をcustom("オレオレロードバランシングの名前")とすればOK。

ただ、ハマリ箇所があった。
先に答えを全文記述するならこんな感じ。

CustomLoadBalancingRouteBuilderTest.scala
package jp.den3umegumi.experimental.camel.route

import org.apache.camel.component.mock.MockEndpoint._
import org.apache.camel.impl.{DefaultCamelContext, JndiRegistry}
import org.apache.camel.processor.loadbalancer.LoadBalancerSupport
import org.apache.camel.scala.dsl.builder.{RouteBuilderSupport, ScalaRouteBuilder}
import org.apache.camel.test.junit4.CamelTestSupport
import org.apache.camel.{AsyncCallback, Exchange}
import org.junit.Test

class CustomLoadBalancingRouteBuilderTest extends CamelTestSupport with RouteBuilderSupport {

  val loadBalancingName = "customLoadBalance"

  override protected def createRegistry: JndiRegistry = {
    val reg = super.createRegistry
    // CustomLoadBalancingをセット
    reg.bind(loadBalancingName, new MyCustomLoadBalancing)
    reg
  }

  override def createRouteBuilder() = new ScalaRouteBuilder(new DefaultCamelContext()) {
    "direct:custom-load-balance" ==> {
      loadbalance.custom(loadBalancingName) {
        -->("mock:a")
        -->("mock:b")
        -->("mock:c")
      }
    }
  }

  @Test
  def test_customLoadBalancer {
    val mockA = getMockEndpoint("mock:a")
    val mockB = getMockEndpoint("mock:b")
    val mockC = getMockEndpoint("mock:c")

    val b1 = "one"
    mockA.expectedBodiesReceived(b1)
    // mock:b mock:cは呼ばれない
    expectsMessageCount(0, mockB, mockC)
    template.sendBodyAndHeader("direct:custom-load-balance", b1, "foo", "bar");
    assertMockEndpointsSatisfied
    mockA.reset

    val b2 = "two"
    mockB.expectedBodiesReceived(b2)
    // mock:a mock:cは呼ばれない
    expectsMessageCount(0, mockA, mockC)
    template.sendBodyAndHeader("direct:custom-load-balance", b2, "foo", "bar");
    assertMockEndpointsSatisfied
    mockB.reset

    val b3 = "three"
    mockC.expectedBodiesReceived(b3)
    // mock:a mock:bは呼ばれない
    expectsMessageCount(0, mockA, mockB)
    template.sendBodyAndHeader("direct:custom-load-balance", b3, "foo", "bar");
    assertMockEndpointsSatisfied
    mockC.reset

  }
}

class MyCustomLoadBalancing extends LoadBalancerSupport {

  override def process(e: Exchange, callback: AsyncCallback): Boolean = {
    Option(e.getIn.getBody(classOf[String])) match {
      case Some("one") => getProcessors.get(0).process(e)
      case Some("two") => getProcessors.get(1).process(e)
      case _ => getProcessors.get(2).process(e)
    }
    callback.done(true)
    true
  }
}

重要なのはこれ↓

  override protected def createRegistry: JndiRegistry = {
    val reg = super.createRegistry
    // CustomLoadBalancingをセット
    reg.bind(loadBlancingName, new MyCustomLoadBalancing)
    reg
  }

registoryにオレオレルールを記載した org.apache.camel.processor.loadbalancer.Loadbalancerを継承したインスタンスを登録してやる必要がある。

・・・って、・・・
これテストクラスなんですけど・・・

ということで、Mainから実行したい場合はこんな感じに書けば良い。

package jp.den3umegumi.experimental.camel.launch

import jp.den3umegumi.experimental.camel.route._
import org.apache.camel.impl.SimpleRegistry
import org.apache.camel.main.Main
import org.apache.camel.scala.dsl.builder.RouteBuilderSupport
import org.slf4j.LoggerFactory

object RouteBuilderLauncher extends App with RouteBuilderSupport {

  private val logger = LoggerFactory.getLogger(getClass)

  val main = new Main
  main.enableHangupSupport;
  // registoryにカスタムロードバランサーを追加
  main.bind("customLoadBalance", new LoadBalancingRule)
  main.addRouteBuilder(new LoadbalanceRouteBuilder)

  logger.debug("camel routing start")
  main.run
}

ハマった経緯(検索用)

JavaのSample

Camelの本家にあるjavaのサンプルだと以下のように直接オレオレルールを引数に渡せる。

from("direct:start")
    // using our custom load balancer
    .loadBalance(new MyLoadBalancer())
    .to("mock:x", "mock:y", "mock:z");

しかし、camel-scalaを用いた場合、インスタンスを引数にとれない。
引数に文字列を渡すという事は、クラスの完全修飾名かな?
と思いつつ、試行錯誤。

Contextに設定する。

camal-scalaのコードを追いかけると、オレオレルールのインスタンスは context.getRegistry().lookupByNameAndType(name, beanType);こんな感じでContextから取得している

ならば、RouteBuilderの基底クラスとなるScalaRouteBuilderのコンストラクタ引数にContextをとるから、そこで設定してあげればいいのかな。。

ということで、↓のようなコードを書いた。

  val registory = new SimpleRegistory()
  registory.put("customLoadBalance", new LoadBalancingRule)
  val ctx = new DefaultCamelContext(registory)
  main.addRouteBuilder(new LoadbalanceRouteBuilder(ctx))

けど、↓のような例外が出てしまって、Repository に Beanが登録されない模様。。

Caused by: org.apache.camel.NoSuchBeanException: No bean could be found in the registry for: customLoadBalance of type: org.apache.camel.processor.loadbalancer.LoadBalancer
	at org.apache.camel.util.CamelContextHelper.mandatoryLookup(CamelContextHelper.java:159)
	at org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition.createLoadBalancer(CustomLoadBalancerDefinition.java:59)
	at org.apache.camel.model.LoadBalancerDefinition.getLoadBalancer(LoadBalancerDefinition.java:90)
	at org.apache.camel.model.LoadBalancerDefinition.getLoadBalancer(LoadBalancerDefinition.java:67)
	at org.apache.camel.model.LoadBalanceDefinition.createProcessor(LoadBalanceDefinition.java:142)
	at org.apache.camel.model.ProcessorDefinition.makeProcessor(ProcessorDefinition.java:505)
	at org.apache.camel.model.ProcessorDefinition.addRoutes(ProcessorDefinition.java:217)
	at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:1025)

(たぶん、このページに辿り着く人の一部はこの例外分で検索かけてくる人なのかなと。。)

んで、さらにコードを読み進んで行くと先に説明した様な感じで書くことがわかった。

はい。めでたしめでたし。

次は、Failover辺りを調べたいかなー。

git-hubのコード

8
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?