Does the name come from 家紋 (kamon, "family crest" in Japanese)?
In this LT (lightning talk):
- Sorry, I will be speaking in Japanese
- Sample code is on GitHub
- Questions & comments are welcome
Who am I
- Name: uryyyyyyy(Koki Shibata)
- Company: Opt, Inc.
- Skills: Spark, TypeScript, Kubernetes, React Native, etc.
Contents
- What is Kamon?
- simple plugin for Kamon
- collect metrics data
- report metrics data
- more practical plugin
- collect akka-actor mailbox-size
- report to google stackdriver
What is Kamon?
Kamon is a monitoring toolkit for applications running on the JVM. It gives you Metrics, Tracing and Context Propagation APIs without locking you to any specific vendor.
In short: a useful toolkit for JVM applications.
Why measure?
If you want your system to be
- faster
- higher in throughput
the first thing you have to do is measure!
We need a cycle like the one below.
← Running ←
↓ ↑
Measuring Improvement
↓ ↑
→ find bottleneck →
Kamon is a pluggable tool
- collect metrics
  - System metrics (CPU, memory, file system, etc.)
  - Akka (active-actors, mailbox-size, etc.)
- report collected metrics
and so on.
(For now, though, how to create a plugin is undocumented.)
create simple plugin
create simple collector
You should
- implement an action that collects metrics
- register that action with a scheduler
import java.time.Duration
import java.util.concurrent.{ScheduledFuture, TimeUnit}
import kamon.Kamon
import org.slf4j.LoggerFactory

// 1: the action that records metrics
class MyMetricsCollector {
  private val logger = LoggerFactory.getLogger(classOf[MyMetricsCollector])
  val hist1 = Kamon.histogram("my-metrics.hist1")
  val counter1 = Kamon.counter("my-metrics.counter1")
  val sampler1 = Kamon.rangeSampler("my-metrics.sampler1")

  def update() = {
    logger.info("MyMetrics record")
    hist1.record(10)
    counter1.increment(1)
    counter1.increment(1)
    sampler1.increment(2)
  }
}

// 2: run the action on Kamon's scheduler
object MyCollector {
  private var scheduledCollection: ScheduledFuture[_] = null

  def startCollecting() = {
    val myMetrics = new MyMetricsCollector()
    val updaterSchedule = new Runnable {
      override def run(): Unit = myMetrics.update()
    }
    scheduledCollection = Kamon.scheduler().scheduleAtFixedRate(
      updaterSchedule,
      Duration.ofSeconds(1).toMillis,
      Duration.ofSeconds(1).toMillis,
      TimeUnit.MILLISECONDS // collect data every second
    )
  }

  def stopCollecting(): Boolean = {
    val b = scheduledCollection.cancel(false)
    scheduledCollection = null
    b
  }
}

object Main extends App {
  MyCollector.startCollecting()
  // run the code you want to measure here
  MyCollector.stopCollecting()
}
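The start/stop lifecycle of MyCollector can be tried without Kamon. The following is only a minimal sketch under assumptions: TickCounter and PeriodicCollector are hypothetical names, a plain JDK ScheduledExecutorService stands in for Kamon.scheduler(), and Option replaces the null field so that calling stop() before start() is harmless.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the metrics class: it only counts how often update() ran.
final class TickCounter {
  private val ticks = new AtomicLong(0)
  def update(): Unit = ticks.incrementAndGet()
  def count: Long = ticks.get()
}

// Same start/stop lifecycle as MyCollector, but the scheduler is passed in
// (an assumption, so the sketch runs without Kamon) and Option replaces null.
final class PeriodicCollector(scheduler: ScheduledExecutorService, periodMillis: Long) {
  private var scheduled: Option[ScheduledFuture[_]] = None

  // Starts the periodic action; a second call while running is ignored.
  def start(action: () => Unit): Unit = synchronized {
    if (scheduled.isEmpty) {
      val task = new Runnable { override def run(): Unit = action() }
      scheduled = Some(
        scheduler.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS)
      )
    }
  }

  // Returns true only if a running collection was actually cancelled.
  def stop(): Boolean = synchronized {
    val cancelled = scheduled.exists(_.cancel(false))
    scheduled = None
    cancelled
  }
}
```

With Kamon on the classpath, the same class could presumably be given Kamon.scheduler() instead of a JDK executor, since the slide code calls scheduleAtFixedRate on it in the same way.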
create simple reporter
- extend MetricReporter
- register the custom reporter in the .conf file
import com.typesafe.config.Config
import kamon.MetricReporter
import kamon.metric.PeriodSnapshot
import org.slf4j.LoggerFactory

// 1: the reporter, called with a snapshot of every metric once per period
class MyReporter extends MetricReporter {
  private val logger = LoggerFactory.getLogger(classOf[MyReporter])

  override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
    logger.info("reportTickSnapshot")
    snapshot.metrics.counters.foreach(metric => {
      logger.info(s"name: ${metric.name}, value: ${metric.value}")
    })
    snapshot.metrics.histograms.foreach(metric => {
      logger.info(s"name: ${metric.name}, value-sum: ${metric.distribution.sum}")
    })
    snapshot.metrics.rangeSamplers.foreach(metric => {
      logger.info(s"name: ${metric.name}, value-max: ${metric.distribution.max}")
    })
  }

  override def start(): Unit = {
    logger.info("MyReporter start")
  }

  override def stop(): Unit = {
    logger.info("MyReporter stop")
  }

  override def reconfigure(config: Config): Unit = {
    logger.info("MyReporter reconfigure")
  }
}

// 2: the .conf file that tells Kamon to load the reporter
kamon {
  reporters = ["com.github.uryyyyyyy.kamon.simple.reporter.MyReporter"]
}
run simple plugin
You can see the custom metrics and their values in the log.
2018-03-17 13:11:51,479 [INFO] - Loaded metric reporter [com.github.uryyyyyyy.kamon.simple.reporter.MyReporter]
2018-03-17 13:11:51,479 [INFO] - MyReporter start
2018-03-17 13:11:51,486 [INFO] - startCollecting done
2018-03-17 13:11:52,006 [INFO] - reportTickSnapshot
2018-03-17 13:11:52,486 [INFO] - MyMetrics record
2018-03-17 13:11:53,008 [INFO] - reportTickSnapshot
2018-03-17 13:11:53,008 [INFO] - name: my-metrics.counter1, value: 2
2018-03-17 13:11:53,010 [INFO] - name: my-metrics.hist1, value-sum: 10
2018-03-17 13:11:53,010 [INFO] - name: my-metrics.sampler1, value-max: 2
2018-03-17 13:11:53,491 [INFO] - MyMetrics record
2018-03-17 13:11:54,004 [INFO] - reportTickSnapshot
2018-03-17 13:11:54,004 [INFO] - name: my-metrics.counter1, value: 2
2018-03-17 13:11:54,004 [INFO] - name: my-metrics.hist1, value-sum: 10
2018-03-17 13:11:54,004 [INFO] - name: my-metrics.sampler1, value-max: 4
2018-03-17 13:11:54,491 [INFO] - MyMetrics record
2018-03-17 13:11:55,006 [INFO] - reportTickSnapshot
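In the log above the counter reports 2 in every period, while the range sampler's max grows from 2 to 4. The toy model below is one way to read that difference. It is a sketch that mimics only the behaviour visible in the log, not Kamon's real implementation, and ToyCounter, ToyRangeSampler and SnapshotDemo are hypothetical names.

```scala
// Toy models only: they mimic the snapshot behaviour visible in the log,
// not Kamon's real implementation.

// A counter reports what was added during the period, then starts again from zero.
final class ToyCounter {
  private var value = 0L
  def increment(times: Long): Unit = value += times
  def snapshot(): Long = { val v = value; value = 0L; v }
}

// A range sampler keeps its current value across periods and reports the
// highest value seen during the period.
final class ToyRangeSampler {
  private var current = 0L
  private var max = 0L
  def increment(times: Long): Unit = { current += times; max = math.max(max, current) }
  def decrement(times: Long): Unit = current -= times
  def snapshotMax(): Long = { val m = max; max = current; m }
}

object SnapshotDemo {
  // Replays the update() calls from the collector once per period and
  // returns (counter value, sampler max) for each period.
  def run(periods: Int): Seq[(Long, Long)] = {
    val counter = new ToyCounter
    val sampler = new ToyRangeSampler
    (1 to periods).map { _ =>
      counter.increment(1)
      counter.increment(1)
      sampler.increment(2)
      (counter.snapshot(), sampler.snapshotMax())
    }
  }
}
```

Under this model the first two periods give (2, 2) and (2, 4), which matches the log: the sampler is never decremented in update(), so its value keeps climbing.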
more practical plugin
collect akka-actor mailbox-size
Suppose you want to measure the akka-actor mailbox size.
How should we do it?
Use AspectJ, and call a custom operation whenever a message is added or processed.
// This is tricky, but we want to access Akka's internal API (hence the akka.* package)
package akka.uryyyyyyy.kamon.aspectj

import akka.actor.{ActorCell, Cell}
import kamon.Kamon
import org.aspectj.lang.annotation._

@Aspect
class AkkaActorInstrumentation {

  // A message is being added to the actor's mailbox
  @Pointcut("execution(* akka.actor.ActorCell.sendMessage(*)) && this(cell) && args(*)")
  def sendMessageToActor(cell: Cell): Unit = {}

  @Before("sendMessageToActor(cell)")
  def beforeSendMessageToActor(cell: Cell): Unit = {
    val mb = cell.asInstanceOf[ActorInstrumentationBox].mailBoxSizeValue
    mb.increment()
  }

  // A message is being processed by the actor
  @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && this(cell) && args(*)")
  def invokingActorCell(cell: ActorCell): Unit = {}

  @After("invokingActorCell(cell)")
  def afterInvokingActorCell(cell: ActorCell): Unit = {
    val mb = cell.asInstanceOf[ActorInstrumentationBox].mailBoxSizeValue
    mb.decrement()
  }
}

// Mixes ActorInstrumentationBox into every ActorCell so the casts above succeed
@Aspect
class MetricsIntoActorCellsMixin {
  @DeclareMixin("akka.actor.ActorCell")
  def mixinActorCellMetricsToActorCell: ActorInstrumentationBox = ActorInstrumentationBox()
}

trait ActorInstrumentationBox {
  def mailBoxSizeValue: MailBoxSizeValue
}

object ActorInstrumentationBox {
  def apply(): ActorInstrumentationBox = new ActorInstrumentationBox() {
    val mb = new MailBoxSizeValue()
    override def mailBoxSizeValue: MailBoxSizeValue = mb
  }
}

class MailBoxSizeValue() {
  val mailboxSize = Kamon.rangeSampler("akka.actor.mailbox-size")

  def increment(): Unit = {
    mailboxSize.increment()
  }

  def decrement(): Unit = {
    mailboxSize.decrement()
  }
}
This sample code comes from kamon-akka.
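The bookkeeping that the two advices perform can be shown without AspectJ or Akka. The sketch below is illustrative only: InstrumentedMailbox is a hypothetical class with its own queue, whereas the aspect above hooks into Akka's real ActorCell. It increments before a message is queued and decrements after the handler has run, in the same order as the @Before and @After advices.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for an actor mailbox. Akka's real mailbox is internal,
// which is why the slide code weaves in aspects instead.
final class InstrumentedMailbox[A <: AnyRef] {
  private val queue = new ConcurrentLinkedQueue[A]()
  private val size = new AtomicLong(0)

  // Mirrors the @Before advice on sendMessage: count the message before it is queued.
  def send(message: A): Unit = {
    size.incrementAndGet()
    queue.add(message)
  }

  // Mirrors the @After advice on invoke: the count drops once the handler has
  // finished, even if the handler throws. Returns false when the queue is empty.
  def processNext(handler: A => Unit): Boolean =
    Option(queue.poll()) match {
      case Some(message) =>
        try handler(message) finally size.decrementAndGet()
        true
      case None => false
    }

  def currentSize: Long = size.get()
}
```

Because the decrement happens after processing, a message that is currently being handled still counts towards the size, which is also how the aspect above behaves.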
report to google stackdriver
I wrote and published this library.
The whole library is only one file.
class StackdriverAPIMetricsSender extends MetricReporter {
  private val logger = LoggerFactory.getLogger(classOf[StackdriverAPIMetricsSender])
  private val config = Kamon.config().getConfig("kamon.stackdriver")
  private val projectID = config.getString("project-id")
  logger.info(s"project-id -> ${projectID}")
  private val monitoredResourceType = config.getString("monitored-resource-type")
  logger.info(s"monitored-resource-type -> ${monitoredResourceType}")

  // Excerpt: metricServiceClient, createTimeSeries, histgramTimeSeries and
  // rangeSamplerTimeSeries are not shown on this slide.
  override def reportPeriodSnapshot(snapshot: PeriodSnapshot): Unit = {
    val now = ZonedDateTime.now(ZoneOffset.UTC)
    val nowTimestamp = Timestamp.newBuilder()
      .setSeconds(now.toEpochSecond)
      .setNanos(now.getNano)
      .build()

    val counterTimeSeries = snapshot.metrics.counters.map(metric => {
      // TODO: should this use the metric's unit?
      createTimeSeries(s"kamon.${metric.name}", MetricKind.GAUGE, metric.tags, metric.value, nowTimestamp)
    })
    val gaugeTimeSeries = snapshot.metrics.gauges.map(metric => {
      // TODO: should this use the metric's unit?
      createTimeSeries(s"kamon.${metric.name}", MetricKind.GAUGE, metric.tags, metric.value, nowTimestamp)
    })

    // The API rejects requests with too many time series, so send them in batches of 200
    val timeSeriesListList = (counterTimeSeries ++ gaugeTimeSeries ++ histgramTimeSeries ++ rangeSamplerTimeSeries).grouped(200)
    try {
      timeSeriesListList.foreach(_timeSeriesList => {
        val createTimeSeriesRequest = CreateTimeSeriesRequest.newBuilder()
          .addAllTimeSeries(_timeSeriesList.toList.asJava)
          .setName(s"projects/${projectID}")
          .build()
        metricServiceClient.createTimeSeries(createTimeSeriesRequest)
      })
    } catch {
      case e: Exception => logger.warn("stackdriver request failed, some metrics may have been dropped: {}", e.getMessage)
    }
  }
}
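The batching step can be looked at on its own. The following is a minimal sketch, not the library's code: BatchSender and sendInBatches are hypothetical names, and the send function stands in for the Stackdriver client call. It differs from the reporter above in one design choice: each batch is wrapped separately, so one failed request does not stop the remaining batches from being sent.

```scala
import scala.util.{Failure, Success, Try}

object BatchSender {
  // Splits items into batches of at most batchSize and hands each batch to send.
  // A failing batch is counted and the remaining batches are still attempted.
  // Returns the number of batches that failed.
  def sendInBatches[A](items: Seq[A], batchSize: Int)(send: Seq[A] => Unit): Int =
    items.grouped(batchSize).count { batch =>
      Try(send(batch)) match {
        case Success(_) => false
        case Failure(_) => true
      }
    }
}
```

For example, 450 items with a batch size of 200 are sent as three batches of 200, 200 and 50 items. Whether to keep going after a failure or to stop early is a trade-off; the reporter above stops at the first failed request and logs a warning.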
It's very easy, isn't it?