The name probably comes from 家紋 (kamon, "family crest" in Japanese).

In this LT (lightning talk):

  • Sorry, I will be speaking in Japanese.
  • The sample code is on GitHub.
  • Questions and comments are welcome.

Who am I

  • Name: uryyyyyyy(Koki Shibata)
  • Company: Opt, Inc.
  • Skills: Spark, TypeScript, Kubernetes, React Native, etc.

Contents

  • What is Kamon?
  • simple plugin for Kamon
    • collect metrics data
    • report metrics data
  • more practical plugin
    • collect akka-actor mailbox-size
    • report to google stackdriver

What is Kamon?

http://kamon.io/documentation/1.x/get-started/

Kamon is a monitoring toolkit for applications running on the JVM. It gives you Metrics, Tracing and Context Propagation APIs without locking you to any specific vendor.

In short, it is a useful toolkit for JVM applications.


Why measure?

If you want your system to

  • run faster
  • handle more throughput

the first thing you have to do is measure!

We need a cycle like the one below.

      ←──────────      Running      ←──────────
    ↓                                           ↑
Measuring                                  Improvement
    ↓                                           ↑
      ──────────→  Find bottleneck  ──────────→

Kamon is a pluggable tool

and so on.

(However, how to create a plugin is currently undocumented.)


Create a simple plugin


Create a simple collector

You should

  1. implement an action that collects metrics
  2. schedule the action
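// 1

import kamon.Kamon
import org.slf4j.LoggerFactory

class MyMetrics {

  private val logger = LoggerFactory.getLogger(classOf[MyMetrics])

  val hist1    = Kamon.histogram("my-metrics.hist1")
  val counter1 = Kamon.counter("my-metrics.counter1")
  val sampler1 = Kamon.rangeSampler("my-metrics.sampler1")

  // Records one round of sample values; called periodically by the scheduler
  def update(): Unit = {
    logger.info("MyMetrics record")

    hist1.record(10)
    counter1.increment(1)
    counter1.increment(1)
    sampler1.increment(2)
  }
}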
// 2

import java.time.Duration
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import kamon.Kamon

object MyCollector {

  private var scheduledCollection: ScheduledFuture[_] = null

  def startCollecting(): Unit = {
    val myMetrics = new MyMetrics()
    val updaterSchedule = new Runnable {
      override def run(): Unit = myMetrics.update()
    }
    // collect data every second, after an initial delay of one second
    scheduledCollection = Kamon.scheduler().scheduleAtFixedRate(
      updaterSchedule,
      Duration.ofSeconds(1).toMillis,
      Duration.ofSeconds(1).toMillis,
      TimeUnit.MILLISECONDS
    )
  }

  // Cancels the scheduled task; returns true if it was cancelled
  def stopCollecting(): Boolean = {
    val b = scheduledCollection.cancel(false)
    scheduledCollection = null
    b
  }
}


object Main extends App {
  MyCollector.startCollecting()
  // measuring
  MyCollector.stopCollecting()
}

source code


Create a simple reporter

  1. extend MetricReporter
  2. register the custom reporter in the .conf file
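//1

import com.typesafe.config.Config
import kamon.MetricReporter
import kamon.metric.PeriodSnapshot
import org.slf4j.LoggerFactory

class MyReporter extends MetricReporter {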

  private val logger = LoggerFactory.getLogger(classOf[MyReporter])

  override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
    logger.info("reportTickSnapshot")
    snapshot.metrics.counters.foreach(metric => {
      logger.info(s"name: ${metric.name}, value: ${metric.value}")
    })
    snapshot.metrics.histograms.foreach(metric => {
      logger.info(s"name: ${metric.name}, value-sum: ${metric.distribution.sum}")
    })
    snapshot.metrics.rangeSamplers.foreach(metric => {
      logger.info(s"name: ${metric.name}, value-max: ${metric.distribution.max}")
    })
  }

  override def start(): Unit = {
    logger.info("MyReporter start")
  }

  override def stop(): Unit = {
    logger.info("MyReporter stop")
  }

  override def reconfigure(config: Config): Unit = {
    logger.info("MyReporter reconfigure")
  }
}
//2

kamon {
  reporters = ["com.github.uryyyyyyy.kamon.simple.reporter.MyReporter"]
}

source code


Run the simple plugin

You can see the custom metrics and their values in the log.

2018-03-17 13:11:51,479 [INFO] - Loaded metric reporter [com.github.uryyyyyyy.kamon.simple.reporter.MyReporter]
2018-03-17 13:11:51,479 [INFO] - MyReporter start
2018-03-17 13:11:51,486 [INFO] - startCollecting done
2018-03-17 13:11:52,006 [INFO] - reportTickSnapshot
2018-03-17 13:11:52,486 [INFO] - MyMetrics record
2018-03-17 13:11:53,008 [INFO] - reportTickSnapshot
2018-03-17 13:11:53,008 [INFO] - name: my-metrics.counter1, value: 2
2018-03-17 13:11:53,010 [INFO] - name: my-metrics.hist1, value-sum: 10
2018-03-17 13:11:53,010 [INFO] - name: my-metrics.sampler1, value-max: 2
2018-03-17 13:11:53,491 [INFO] - MyMetrics record
2018-03-17 13:11:54,004 [INFO] - reportTickSnapshot
2018-03-17 13:11:54,004 [INFO] - name: my-metrics.counter1, value: 2
2018-03-17 13:11:54,004 [INFO] - name: my-metrics.hist1, value-sum: 10
2018-03-17 13:11:54,004 [INFO] - name: my-metrics.sampler1, value-max: 4
2018-03-17 13:11:54,491 [INFO] - MyMetrics record
2018-03-17 13:11:55,006 [INFO] - reportTickSnapshot
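
In the log above, counter1 reports 2 in every period, while the max of sampler1 grows from 2 to 4. One way to read this: a counter reports only the increments of the current period, while a range sampler keeps its current value across periods. The sketch below is a simplified model of that behavior, not Kamon's real implementation; ModelCounter, ModelRangeSampler and InstrumentModelDemo are hypothetical names made up for this illustration.

```scala
// Simplified model only (assumption); Kamon's real instruments are more involved.
final class ModelCounter {
  private var count = 0L

  def increment(times: Long): Unit = count += times

  // Returns the count of this period and resets it for the next one.
  def snapshot(): Long = {
    val c = count
    count = 0L
    c
  }
}

final class ModelRangeSampler {
  private var current = 0L
  private var max = 0L

  def increment(times: Long): Unit = {
    current += times
    if (current > max) max = current
  }

  def decrement(times: Long): Unit = current -= times

  // Returns the max seen in this period; the current value carries over.
  def snapshotMax(): Long = {
    val m = max
    max = current
    m
  }
}

object InstrumentModelDemo {
  // Simulates `periods` reporting periods with one update() per period and
  // returns (counter value, sampler max) for each period.
  def run(periods: Int): List[(Long, Long)] = {
    val counter = new ModelCounter
    val sampler = new ModelRangeSampler
    List.fill(periods) {
      counter.increment(1)
      counter.increment(1)
      sampler.increment(2)
      (counter.snapshot(), sampler.snapshotMax())
    }
  }
}
```

InstrumentModelDemo.run(2) gives List((2, 2), (2, 4)), the same pattern as the counter1 and sampler1 lines in the log.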

More practical plugin


Collect akka-actor mailbox-size

What if you want to measure the akka-actor mailbox size?
How should we do it?

Use AspectJ to run a custom operation whenever a message is added or processed.

// This is a bit hacky: the package lives under `akka` because we want to access Akka's internal API
package akka.uryyyyyyy.kamon.aspectj

import akka.actor.{ActorCell, Cell}
import kamon.Kamon
import org.aspectj.lang.annotation._

@Aspect
class AkkaActorInstrumentation {

  @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(*)")
  def sendMessageToActor(cell: Cell): Unit = {}

  @Before("sendMessageToActor(cell)")
  def beforeSendMessageToActor(cell: Cell): Unit = {
    val mb = cell.asInstanceOf[ActorInstrumentationBox].mailBoxSizeValue
    mb.increment()
  }

  @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(*)")
  def invokingActorCell(cell: ActorCell): Unit = {}

  @After("invokingActorCell(cell)")
  def afterInvokingActorCell(cell: ActorCell): Unit = {
    val mb = cell.asInstanceOf[ActorInstrumentationBox].mailBoxSizeValue
    mb.decrement()
  }
}

@Aspect
class MetricsIntoActorCellsMixin {
  @DeclareMixin("akka.actor.ActorCell")
  def mixinActorCellMetricsToActorCell: ActorInstrumentationBox = ActorInstrumentationBox()
}

trait ActorInstrumentationBox {
  def mailBoxSizeValue: MailBoxSizeValue
}

object ActorInstrumentationBox {
  def apply(): ActorInstrumentationBox = new ActorInstrumentationBox(){
    var mb = new MailBoxSizeValue()
    override def mailBoxSizeValue: MailBoxSizeValue = mb
  }
}

class MailBoxSizeValue(){
  val mailboxSize = Kamon.rangeSampler("akka.actor.mailbox-size")

  def increment(): Unit ={
    mailboxSize.increment()
  }

  def decrement(): Unit ={
    mailboxSize.decrement()
  }
}

source code

This sample code is based on kamon-akka.
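
To see the counting idea without AspectJ, here is a minimal sketch: increment when a message is enqueued, decrement after it has been processed. CountingMailbox is a hypothetical stand-in written for this illustration; it is not an Akka or Kamon class, and a plain AtomicLong takes the place of the range sampler.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for an actor mailbox (assumption, not an Akka class).
final class CountingMailbox[A] {
  private val queue = new ConcurrentLinkedQueue[A]()
  private val size = new AtomicLong(0L)

  // Plays the role of the @Before advice on sendMessage: count on enqueue.
  def send(message: A): Unit = {
    size.incrementAndGet()
    queue.add(message)
  }

  // Plays the role of the @After advice on invoke: decrement once the
  // message has been handled, even if the handler throws.
  // Returns false when there was nothing to process.
  def processNext(handler: A => Unit): Boolean =
    Option(queue.poll()) match {
      case Some(message) =>
        try handler(message)
        finally size.decrementAndGet()
        true
      case None => false
    }

  def currentSize: Long = size.get()
}
```

With AspectJ, the same two hooks are woven into ActorCell from the outside, so the actors themselves do not need to change.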


Report to Google Stackdriver

I wrote and published a library for this.

kamon-stackdriver

The library's code is only one file.

class StackdriverAPIMetricsSender extends MetricReporter {
  private val logger = LoggerFactory.getLogger(classOf[StackdriverAPIMetricsSender])
  private val config = Kamon.config().getConfig("kamon.stackdriver")
  private val projectID = config.getString("project-id")
  logger.info(s"project-id -> ${projectID}")
  private val monitoredResourceType = config.getString("monitored-resource-type")
  logger.info(s"monitored-resource-type -> ${monitoredResourceType}")

  override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
    val now = ZonedDateTime.now(ZoneOffset.UTC)
    val nowTimestamp = Timestamp.newBuilder()
      .setSeconds(now.toEpochSecond)
      .setNanos(now.getNano)
      .build()

    val counterTimeSeries = snapshot.metrics.counters.map(metric => {
      //TODO should use unit?
      createTimeSeries(s"kamon.${metric.name}", MetricKind.GAUGE, metric.tags, metric.value, nowTimestamp)
    })
    val gaugeTimeSeries = snapshot.metrics.gauges.map(metric => {
      //TODO should use unit?
      createTimeSeries(s"kamon.${metric.name}", MetricKind.GAUGE, metric.tags, metric.value, nowTimestamp)
    })

    // the API rejects requests that contain too many time series, so send them in batches of 200
    val timeSeriesListList = (counterTimeSeries ++ gaugeTimeSeries ++ histgramTimeSeries ++ rangeSamplerTimeSeries).grouped(200)

    try {
      timeSeriesListList.foreach(_timeSeriesList => {
        val createTimeSeriesRequest = CreateTimeSeriesRequest.newBuilder()
          .addAllTimeSeries(_timeSeriesList.toList.asJava)
          .setName(s"projects/${projectID}")
          .build()
        metricServiceClient.createTimeSeries(createTimeSeriesRequest)
      })
    } catch {
      case e: Exception => logger.warn("stackdriver request failed, some metrics may have been dropped: {}", e.getMessage)
    }
  }
}
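
The batching step can be tried on its own. In the excerpt above, one try wraps all batches, so the first failed request also skips the remaining batches. The sketch below is a variation in which each batch is attempted independently; BatchingSketch and the send parameter are hypothetical names for illustration, not part of kamon-stackdriver or the Stackdriver client.

```scala
import scala.util.Try

object BatchingSketch {
  // Splits items into batches of at most batchSize elements, calls send for
  // each batch, and returns how many batches failed. A failing batch does not
  // stop the remaining batches. (Try only catches non-fatal exceptions.)
  def sendInBatches[A](items: Seq[A], batchSize: Int)(send: Seq[A] => Unit): Int =
    items.grouped(batchSize).count(batch => Try(send(batch)).isFailure)
}
```

For example, 450 items with a batch size of 200 are sent as batches of 200, 200 and 50.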

It's very easy, isn't it?


Come on in to the Kamon world!
