
Trying Apache Storm 1.0.0: Native Streaming Window API

Posted at 2016-05-01

This is the third post in my series on trying out Apache Storm 1.0.0. The earlier posts are:

Trying Apache Storm 1.0.0: Distributed Cache API
Trying Apache Storm 1.0.0: Storm UI

This time I try the Native Streaming Window API.

Starting the topology

SlidingWindowTopology is provided as a sample, so I'll use that.

$ bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.SlidingWindowTopology
Running: /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.0 -Dstorm.log.dir=/opt/apache-storm-1.0.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.0/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.0/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.0/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.0/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.0/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.0/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.0/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.0/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.0/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.0/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.0/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.0/lib/storm-core-1.0.0.jar:/opt/apache-storm-1.0.0/lib/storm-rename-hack-1.0.0.jar:examples/storm-starter/storm-starter-topologies-1.0.0.jar:/opt/storm/conf:/opt/apache-storm-1.0.0/bin -Dstorm.jar=examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.SlidingWindowTopology
3712 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:host.name=192.168.10.100
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.version=1.8.0_66
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.class.path=/opt/apache-storm-1.0.0/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.0/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.0/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.0/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.0/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.0/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.0/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.0/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.0/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.0/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.0/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.0/lib/storm-core-1.0.0.jar:/opt/apache-storm-1.0.0/lib/storm-rename-hack-1.0.0.jar:examples/storm-starter/storm-starter-topologies-1.0.0.jar:/opt/storm/conf:/opt/apache-storm-1.0.0/bin
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.io.tmpdir=/var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.compiler=<NA>
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.name=Mac OS X
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.arch=x86_64
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.version=10.11.4
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.name=KojiIshida
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.home=/Users/KojiIshida
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.dir=/opt/apache-storm-1.0.0
3736 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/bd026224-c71e-4395-b96f-9ab3551b5c60/version-2 snapdir /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/bd026224-c71e-4395-b96f-9ab3551b5c60/version-2
3760 [main] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2000
3766 [main] INFO  o.a.s.zookeeper - Starting inprocess zookeeper at port 2000 and dir /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T//bd026224-c71e-4395-b96f-9ab3551b5c60
3944 [main] INFO  o.a.s.d.nimbus - Starting Nimbus with conf {"topology.builtin.metrics.bucket.size.secs" 60, "nimbus.childopts" "-Xmx1024m", "ui.filter.params" nil, "storm.cluster.mode" "local", "storm.messaging.netty.client_worker_threads" 1, "logviewer.max.per.worker.logs.size.mb" 2048, "supervisor.run.worker.as.user" false, "topology.max.task.parallelism" nil, "topology.priority" 29, "zmq.threads" 1, "storm.group.mapping.service" "org.apache.storm.security.auth.ShellBasedGroupsMapping", "transactional.zookeeper.root" "/transactional", "topology.sleep.spout.wait.strategy.time.ms" 1, "scheduler.display.resource" false, "topology.max.replication.wait.time.sec" 60, "drpc.invocations.port" 3773, "supervisor.localizer.cache.target.size.mb" 10240, "topology.multilang.serializer" "org.apache.storm.multilang.JsonSerializer", "storm.messaging.netty.server_worker_threads" 1, "nimbus.blobstore.class" "org.apache.storm.blobstore.LocalFsBlobStore", "resource.aware.scheduler.eviction.strategy" "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy", "topology.max.error.report.per.interval" 5, "storm.thrift.transport" "org.apache.storm.security.auth.SimpleTransportPlugin", "zmq.hwm" 0, "storm.group.mapping.service.params" nil, "worker.profiler.enabled" false, "storm.principal.tolocal" "org.apache.storm.security.auth.DefaultPrincipalToLocal", "supervisor.worker.shutdown.sleep.secs" 1, "pacemaker.host" "localhost", "storm.zookeeper.retry.times" 5, "ui.actions.enabled" true, "zmq.linger.millis" 0, "supervisor.enable" true, "topology.stats.sample.rate" 0.05, "storm.messaging.netty.min_wait_ms" 100, "worker.log.level.reset.poll.secs" 30, "storm.zookeeper.port" 2000, "supervisor.heartbeat.frequency.secs" 5, "topology.enable.message.timeouts" true, "supervisor.cpu.capacity" 400.0, "drpc.worker.threads" 64, "supervisor.blobstore.download.thread.count" 5, "drpc.queue.size" 128, "topology.backpressure.enable" true, "supervisor.blobstore.class" 
"org.apache.storm.blobstore.NimbusBlobStore", "storm.blobstore.inputstream.buffer.size.bytes" 65536, "topology.shellbolt.max.pending" 100, "drpc.https.keystore.password" "", "nimbus.code.sync.freq.secs" 120, "logviewer.port" 8000, "topology.scheduler.strategy" "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy", "topology.executor.send.buffer.size" 1024, "resource.aware.scheduler.priority.strategy" "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy", "pacemaker.auth.method" "NONE", "storm.daemon.metrics.reporter.plugins" ["org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"], "topology.worker.logwriter.childopts" "-Xmx64m", "topology.spout.wait.strategy" "org.apache.storm.spout.SleepSpoutWaitStrategy", "ui.host" "0.0.0.0", "storm.nimbus.retry.interval.millis" 2000, "nimbus.inbox.jar.expiration.secs" 3600, "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.acker.executors" nil, "topology.fall.back.on.java.serialization" true, "topology.eventlogger.executors" 0, "supervisor.localizer.cleanup.interval.ms" 600000, "storm.zookeeper.servers" ["localhost"], "nimbus.thrift.threads" 64, "logviewer.cleanup.age.mins" 10080, "topology.worker.childopts" nil, "topology.classpath" nil, "supervisor.monitor.frequency.secs" 3, "nimbus.credential.renewers.freq.secs" 600, "topology.skip.missing.kryo.registrations" true, "drpc.authorizer.acl.filename" "drpc-auth-acl.yaml", "pacemaker.kerberos.users" [], "storm.group.mapping.service.cache.duration.secs" 120, "topology.testing.always.try.serialize" false, "nimbus.monitor.freq.secs" 10, "storm.health.check.timeout.ms" 5000, "supervisor.supervisors" [], "topology.tasks" nil, "topology.bolts.outgoing.overflow.buffer.enable" false, "storm.messaging.netty.socket.backlog" 500, "topology.workers" 1, "pacemaker.base.threads" 10, "storm.local.dir" "/var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T//db374078-fd57-4aeb-b7df-537f90929bc4", 
"topology.disable.loadaware" false, "worker.childopts" "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump", "storm.auth.simple-white-list.users" [], "topology.disruptor.batch.timeout.millis" 1, "topology.message.timeout.secs" 30, "topology.state.synchronization.timeout.secs" 60, "topology.tuple.serializer" "org.apache.storm.serialization.types.ListDelegateSerializer", "supervisor.supervisors.commands" [], "nimbus.blobstore.expiration.secs" 600, "logviewer.childopts" "-Xmx128m", "topology.environment" nil, "topology.debug" false, "topology.disruptor.batch.size" 100, "storm.messaging.netty.max_retries" 300, "ui.childopts" "-Xmx768m", "storm.network.topography.plugin" "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping", "storm.zookeeper.session.timeout" 20000, "drpc.childopts" "-Xmx768m", "drpc.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.connection.timeout" 15000, "storm.zookeeper.auth.user" nil, "storm.meta.serialization.delegate" "org.apache.storm.serialization.GzipThriftSerializationDelegate", "topology.max.spout.pending" nil, "storm.codedistributor.class" "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor", "nimbus.supervisor.timeout.secs" 60, "nimbus.task.timeout.secs" 30, "drpc.port" 3772, "pacemaker.max.threads" 50, "storm.zookeeper.retry.intervalceiling.millis" 30000, "nimbus.thrift.port" 6627, "storm.auth.simple-acl.admins" [], "topology.component.cpu.pcore.percent" 10.0, "supervisor.memory.capacity.mb" 3072.0, "storm.nimbus.retry.times" 5, "supervisor.worker.start.timeout.secs" 120, "storm.zookeeper.retry.interval" 1000, "logs.users" nil, "worker.profiler.command" "flight.bash", "transactional.zookeeper.port" nil, "drpc.max_buffer_size" 1048576, "pacemaker.thread.timeout" 10, 
"task.credentials.poll.secs" 30, "blobstore.superuser" "KojiIshida", "drpc.https.keystore.type" "JKS", "topology.worker.receiver.thread.count" 1, "topology.state.checkpoint.interval.ms" 1000, "supervisor.slots.ports" [6700 6701 6702 6703], "topology.transfer.buffer.size" 1024, "storm.health.check.dir" "healthchecks", "topology.worker.shared.thread.pool.size" 4, "drpc.authorizer.acl.strict" false, "nimbus.file.copy.expiration.secs" 600, "worker.profiler.childopts" "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder", "topology.executor.receive.buffer.size" 1024, "backpressure.disruptor.low.watermark" 0.4, "nimbus.task.launch.secs" 120, "storm.local.mode.zmq" false, "storm.messaging.netty.buffer_size" 5242880, "storm.cluster.state.store" "org.apache.storm.cluster_state.zookeeper_state_factory", "worker.heartbeat.frequency.secs" 1, "storm.log4j2.conf.dir" "log4j2", "ui.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.root" "/storm", "topology.tick.tuple.freq.secs" nil, "drpc.https.port" -1, "storm.workers.artifacts.dir" "workers-artifacts", "supervisor.blobstore.download.max_retries" 3, "task.refresh.poll.secs" 10, "storm.exhibitor.port" 8080, "task.heartbeat.frequency.secs" 3, "pacemaker.port" 6699, "storm.messaging.netty.max_wait_ms" 1000, "nimbus.impersonation.authorizer" "org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer", "topology.component.resources.offheap.memory.mb" 0.0, "drpc.http.port" 3774, "topology.error.throttle.interval.secs" 10, "storm.messaging.transport" "org.apache.storm.messaging.netty.Context", "storm.messaging.netty.authentication" false, "topology.component.resources.onheap.memory.mb" 128.0, "topology.kryo.factory" "org.apache.storm.serialization.DefaultKryoFactory", "worker.gc.childopts" "", "nimbus.topology.validator" "org.apache.storm.nimbus.DefaultTopologyValidator", "nimbus.seeds" ["0.0.0.0"], "nimbus.queue.size" 100000, "nimbus.cleanup.inbox.freq.secs" 600, 
"storm.blobstore.replication.factor" 3, "worker.heap.memory.mb" 768, "logviewer.max.sum.worker.logs.size.mb" 4096, "pacemaker.childopts" "-Xmx1024m", "ui.users" nil, "transactional.zookeeper.servers" nil, "supervisor.worker.timeout.secs" 30, "storm.zookeeper.auth.password" nil, "client.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "supervisor.childopts" "-Xmx256m", "topology.worker.max.heap.size.mb" 768.0, "backpressure.disruptor.high.watermark" 0.9, "ui.filter" nil, "ui.header.buffer.bytes" 4096, "topology.min.replication.count" 1, "topology.disruptor.wait.timeout.millis" 1000, "storm.nimbus.retry.intervalceiling.millis" 60000, "topology.trident.batch.emit.interval.millis" 50, "storm.auth.simple-acl.users" [], "drpc.invocations.threads" 64, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "ui.port" 8080, "storm.exhibitor.poll.uripath" "/exhibitor/v1/cluster/list", "storm.messaging.netty.transfer.batch.size" 262144, "logviewer.appender.name" "A1", "nimbus.thrift.max_buffer_size" 1048576, "storm.auth.simple-acl.users.commands" [], "drpc.request.timeout.secs" 600}
3948 [main] INFO  o.a.s.d.nimbus - Using default scheduler
3999 [main] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:host.name=192.168.10.100
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.version=1.8.0_66
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.vendor=Oracle Corporation
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.class.path=/opt/apache-storm-1.0.0/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.0/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.0/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.0/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.0/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.0/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.0/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.0/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.0/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.0/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.0/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.0/lib/storm-core-1.0.0.jar:/opt/apache-storm-1.0.0/lib/storm-rename-hack-1.0.0.jar:examples/storm-starter/storm-starter-topologies-1.0.0.jar:/opt/storm/conf:/opt/apache-storm-1.0.0/bin
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.compiler=<NA>
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:os.name=Mac OS X
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:os.arch=x86_64
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:os.version=10.11.4
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:user.name=KojiIshida
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:user.home=/Users/KojiIshida
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:user.dir=/opt/apache-storm-1.0.0
4005 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@77c3c037
4031 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
4047 [main] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4048 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@52226e57
4050 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
4055 [main] INFO  o.a.s.b.FileBlobStoreImpl - Creating new blob store based in /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/db374078-fd57-4aeb-b7df-537f90929bc4/blobs
4104 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
4106 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60125
4108 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
4110 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60126
4111 [main] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4112 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60125
4113 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@68b734a8
4114 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60126
4115 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.p.FileTxnLog - Creating new log file: log.1
4117 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
4118 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
4118 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:60127
4119 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:60127
4153 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154488b21500000 with negotiated timeout 20000 for client /127.0.0.1:60125
4153 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x154488b21500000, negotiated timeout = 20000
4155 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154488b21500001 with negotiated timeout 20000 for client /127.0.0.1:60126
4155 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x154488b21500001, negotiated timeout = 20000
4157 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154488b21500002 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:60127
4157 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x154488b21500002, negotiated timeout = 20000
4158 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
4158 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
4158 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
4177 [main-EventThread] INFO  o.a.s.zookeeper - Zookeeper state update: :connected:none
4177 [main-EventThread] INFO  o.a.s.zookeeper - Zookeeper state update: :connected:none

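A large amount of log output appeared. In fact, it keeps coming.

I couldn't tell what was happening, so I decided to read a bit of the source.

Looking at SlidingWindowTopology.java

This part seems to be the key.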

SlidingWindowTopology.java
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
                .shuffleGrouping("integer");
        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
                .shuffleGrouping("slidingsum");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(40000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

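The main method sets up three bolts:
- SlidingWindowSumBolt
- TumblingWindowAvgBolt
- PrinterBolt

There appears to be a bolt that computes a sum, a bolt that computes an average, and a bolt that prints the result. After reading the source of each bolt to see what it does, going back to the log should make things clearer.

First, checking RandomIntegerSpout

There is also a spout called RandomIntegerSpout, so I'll start with that.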

RandomIntegerSpout.java
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId), msgId);
    }

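Nice and simple. Every 100 ms it emits a random integer from 0 to 999, together with a timestamp set one day in the past and a message ID.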
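To make the three emitted fields concrete, here is a minimal plain-Java sketch that builds the same kind of values without Storm. The class name RandomIntegerSketch and the method nextFields are made up for illustration and are not part of storm-starter.

```java
import java.util.Random;

// Plain-Java sketch, no Storm dependency. Names here are hypothetical.
final class RandomIntegerSketch {
    // Same offset as 24 * 60 * 60 * 1000 in the spout: one day in milliseconds.
    static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

    /** Builds the three fields: random value in [0, 1000), timestamp one day back, message ID. */
    static long[] nextFields(Random rand, long nowMillis, long msgId) {
        return new long[] { rand.nextInt(1000), nowMillis - ONE_DAY_MS, msgId };
    }

    public static void main(String[] args) {
        long[] fields = nextFields(new Random(), System.currentTimeMillis(), 1);
        System.out.println("[" + fields[0] + ", " + fields[1] + ", " + fields[2] + "]");
    }
}
```

The printed line has the same shape as the tuples that show up later in the debug log, for example [778, 1462006953163, 1].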

Reading the source of the configured bolts

Looking at SlidingWindowSumBolt

Now for the bolts. First, SlidingWindowSumBolt.

SlidingWindowSumBolt.java
    @Override
    public void execute(TupleWindow inputWindow) {
            /*
             * The inputWindow gives a view of
             * (a) all the events in the window
             * (b) events that expired since last activation of the window
             * (c) events that newly arrived since last activation of the window
             */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
            /*
             * Instead of iterating over all the tuples in the window to compute
             * the sum, the values for the new events are added and old events are
             * subtracted. Similar optimizations might be possible in other
             * windowing computations.
             */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

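I see. It adds up the values of the tuples it receives and subtracts those that have expired, which is where the sliding window comes in: with withWindow(new Count(30), new Count(10)) in main, the window holds the latest 30 tuples and is evaluated every 10 tuples. In any case, the source itself is simple.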
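The add-new, subtract-expired idea can be tried without Storm. The following is a minimal sketch in plain Java, assuming a count-based window; SlidingSumSketch and slidingSums are hypothetical names. Unlike the real bolt, which receives new and expired tuples in batches at each activation, this sketch subtracts each expired value as soon as it leaves the window, which gives the same sum at each activation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java sketch of a count-based sliding window; it does not use Storm.
final class SlidingSumSketch {

    /**
     * Feeds values one at a time into a window of at most windowLength values
     * and records the running sum after every slidingInterval values.
     */
    static List<Integer> slidingSums(List<Integer> values, int windowLength, int slidingInterval) {
        Deque<Integer> window = new ArrayDeque<>(); // values currently in the window
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        int sinceLastActivation = 0;
        for (int value : values) {
            window.addLast(value);
            sum += value;                        // add the newly arrived value
            if (window.size() > windowLength) {
                sum -= window.removeFirst();     // subtract the value that just expired
            }
            if (++sinceLastActivation == slidingInterval) {
                emitted.add(sum);                // the window activates: record the current sum
                sinceLastActivation = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> ones = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            ones.add(1);
        }
        // Window of 30 values, evaluated every 10 values.
        System.out.println(slidingSums(ones, 30, 10)); // [10, 20, 30, 30, 30]
    }
}
```

With a window of 30 and an interval of 10, the first three sums keep growing because nothing has expired yet. From the fourth activation on, the oldest values drop out and the sum stays bounded.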

Looking at TumblingWindowAvgBolt

Next is TumblingWindowAvgBolt. This one is defined as a nested class inside SlidingWindowTopology.

TumblingWindowAvgBolt.java
        @Override
        public void execute(TupleWindow inputWindow) {
            int sum = 0;
            List<Tuple> tuplesInWindow = inputWindow.get();
            LOG.debug("Events in current window: " + tuplesInWindow.size());
            if (tuplesInWindow.size() > 0) {
                /*
                * Since this is a tumbling window calculation,
                * we use all the tuples in the window to compute the avg.
                */
                for (Tuple tuple : tuplesInWindow) {
                    sum += (int) tuple.getValue(0);
                }
                collector.emit(new Values(sum / tuplesInWindow.size()));
            }
        }

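It computes the average of the sums that arrived within the configured window. The main method passes 3 for this setting (withTumblingWindow(new Count(3))), so it presumably computes an average each time it has received three sums. Note that sum / tuplesInWindow.size() is integer division, so the fractional part is dropped.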
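As a rough illustration of the tumbling behaviour, the sketch below groups a list into consecutive, non-overlapping windows and averages each one with integer division. It is plain Java rather than Storm code; TumblingAvgSketch, tumblingAverages, and the input values are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of a count-based tumbling window; it does not use Storm.
final class TumblingAvgSketch {

    /** Averages each complete, non-overlapping group of windowSize values. */
    static List<Integer> tumblingAverages(List<Integer> values, int windowSize) {
        List<Integer> averages = new ArrayList<>();
        for (int start = 0; start + windowSize <= values.size(); start += windowSize) {
            int sum = 0;
            for (int value : values.subList(start, start + windowSize)) {
                sum += value;
            }
            averages.add(sum / windowSize); // integer division, as in the bolt
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Integer> sums = Arrays.asList(10, 20, 31, 40, 50, 60, 70);
        // The trailing 70 does not fill a window, so it produces no average.
        System.out.println(tumblingAverages(sums, 3)); // [20, 50]
    }
}
```

Each value belongs to exactly one window here, which is the difference from the sliding case, where consecutive windows overlap.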

Looking at PrinterBolt

It is simple enough that it needs no explanation.

PrinterBolt.java
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple);
  }

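Yes, very simple.

Summarizing how this topology works

My expectation is that it works as follows.
1. The spout generates random integers from 0 to 999 and sends them to the sum bolt.
2. Every 10 tuples, the sum bolt updates its running sum over the window (at most the latest 30 tuples).
3. It sends the sum to the average bolt.
4. Once three sums have accumulated, the average bolt computes their average.

Looking at the log output

I ran the sample and checked the log. According to the main method, the local run is killed after 40 seconds, so I waited.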

22857 [Thread-21-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Preparing bolt __system:(-1)
22864 [Thread-21-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Prepared bolt __system:(-1)
22874 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - Opening spout integer:(2)
22878 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - Opened spout integer:(2)
22881 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - Activating spout integer:(2)
22887 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Preparing bolt tumblingavg:(5)
22891 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Prepared bolt tumblingavg:(5)
22893 [Thread-17-printer-executor[3 3]] INFO  o.a.s.d.executor - Preparing bolt printer:(3)
22893 [Thread-17-printer-executor[3 3]] INFO  o.a.s.d.executor - Prepared bolt printer:(3)
22900 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Preparing bolt slidingsum:(4)
22901 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Prepared bolt slidingsum:(4)
22950 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Preparing bolt __acker:(1)
22952 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Prepared bolt __acker:(1)

First, each spout and bolt is initialized. The number assigned to each component matters when reading the rest of the log.

22988 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.task - Emitting: integer default [778, 1462006953163, 1]
22994 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 4 tuple: source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]]
22995 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.task - Emitting: integer __ack_init [7447250857071583863 -3578654789271241288 2]
22996 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 1 tuple: source: integer:2, stream: __ack_init, id: {}, [7447250857071583863 -3578654789271241288 2]]
22997 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]
22998 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1] TASK: 4 DELTA: 
22999 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Processing received message FOR 1 TUPLE: source: integer:2, stream: __ack_init, id: {}, [7447250857071583863 -3578654789271241288 2]
23000 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: integer:2, stream: __ack_init, id: {}, [7447250857071583863 -3578654789271241288 2]

Right away, a random number is generated and emitted. The first tuple sent is [778, 1462006953163, 1]. The next line is:

TRANSFERING tuple [dest: 4 tuple: source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]]

This log line means that data is being sent from component 2, "integer", to component 4, "slidingsum". The IDs assigned during initialization are used here. This exchange continues until the 10th tuple has been sent, at which point the log output changes.
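When the log gets long, it can help to pull the source and destination out of a TRANSFERING line mechanically. The sketch below does this with a regular expression. The pattern is an assumption based only on the line format quoted above, and TransferLogSketch and describe are hypothetical names.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract source component, source ID, destination ID, and stream
// from a TRANSFERING log line. The pattern is inferred from the sample line only.
final class TransferLogSketch {
    private static final Pattern TRANSFER =
            Pattern.compile("dest: (\\d+) tuple: source: ([^:]+):(\\d+), stream: ([^,]+),");

    /** Returns "component:id -> id N (stream S)", or null if the line does not match. */
    static String describe(String logLine) {
        Matcher m = TRANSFER.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return m.group(2) + ":" + m.group(3) + " -> id " + m.group(1)
                + " (stream " + m.group(4) + ")";
    }

    public static void main(String[] args) {
        String line = "TRANSFERING tuple [dest: 4 tuple: source: integer:2, stream: default, "
                + "id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]]";
        System.out.println(describe(line)); // integer:2 -> id 4 (stream default)
    }
}
```

Mapping the destination ID back to a name still relies on the "Preparing bolt" lines from initialization, where 4 was assigned to slidingsum.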

23921 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {4389696495574006045=2681496747949810702}, [645, 1462006954095, 10]
23921 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Processing received message FOR 1 TUPLE: source: integer:2, stream: __ack_init, id: {}, [4389696495574006045 2681496747949810702 2]
23921 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: integer:2, stream: __ack_init, id: {}, [4389696495574006045 2681496747949810702 2]
23921 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: __ack_init, id: {}, [4389696495574006045 2681496747949810702 2] TASK: 1 DELTA: 
23922 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [6050]
23923 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-4601633546221640614=-1172784951702062669, 6973864295393994913=1708061666379587936, 7447250857071583863=5623972221711691701, -6102906463311307032=5518334604031423874, -7638682188827435429=4478428313701290490, 4389696495574006045=5932148635671668893, -6197802534174832134=1497464440461667439, -4266159872746606489=3561774369052512930, 4334659838991266401=-7106335497305084820, 4712299375428287216=3508884140400825778}, [6050]]

The computed sum, 6050, was sent to component 5, "tumblingavg", which now holds it. This presumably repeats three times, so let's see what values are sent the second and third time.

24961 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [11976]
24961 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-4601633546221640614=492010799657169784, 317525622483766670=2638127789448427461, 3002989214851206566=128002525292759865, 7447250857071583863=8767372787937047960, -6102906463311307032=-4546730023383659346, 3562603101316339696=-7385522463102788783, -7547522651842339766=4915704745663260957, -8228450230436970384=-4055518290977434481, 6973864295393994913=6291843886548489067, -7638682188827435429=8441873508646088204, 4389696495574006045=-1674211243428697808, -6197802534174832134=2733983747371624992, 2558430164343979559=285379101380278186, -4266159872746606489=-6678113015319323641, 4334659838991266401=999638572688010822, 4712299375428287216=-2506689729719803634, -3288150160972036797=6163930983830697438, -6632312692089791299=4450995360647585394, 3635266110827390635=325375251356977723, 7564414229761366650=126552843783268400}, [11976]]
:
:
26001 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [15909]
26001 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: integer:2, stream: __ack_init, id: {}, [-3948625081878221959 -8424958756548618666 2]
26001 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: __ack_init, id: {}, [-3948625081878221959 -8424958756548618666 2] TASK: 1 DELTA: 1
26001 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {317525622483766670=7555764236357609298, 3002989214851206566=1694128471165560315, 3562603101316339696=-9141305483820762660, -7547522651842339766=-2863953427390624123, 6875073760184740346=-5132920613823939509, -3948625081878221959=-5437427497122664614, -6197802534174832134=-5798652776913323486, 2558430164343979559=-3974124564791740629, 5827912698427451791=-8755182930855226192, 4334659838991266401=1875094829666965635, -3288150160972036797=2882654408400632661, -6632312692089791299=-7416978337353888496, 5660369989147374586=-146257236873007036, -4601633546221640614=314568010495525006, 7447250857071583863=-7894145466289122325, -6102906463311307032=-734768705874586731, 2416468512832932723=5395971769109383713, -7913753694545619816=-422366601723792798, 2216548725421935911=-1747022309515322275, 5545722913635347065=-4183811763737068653, -8228450230436970384=7072197569993326849, 1069888987022072121=1339914806148366726, 6973864295393994913=881160568155161730, -1038295836277073885=-6098822845524698690, -7638682188827435429=-1166341425454927490, 4389696495574006045=621900626234935483, -4266159872746606489=5786349201308905180, 4712299375428287216=7586810889173452616, 3635266110827390635=-4551921407959807206, 7564414229761366650=-2358375753654415983}, [15909]]
26001 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {-3948625081878221959=-8424958756548618666}, [34, 1462006956174, 30] TASK: 4 DELTA: 
26003 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Processing received message FOR 5 TUPLE: source: slidingsum:4, stream: default, id: {317525622483766670=7555764236357609298, 3002989214851206566=1694128471165560315, 3562603101316339696=-9141305483820762660, -7547522651842339766=-2863953427390624123, 6875073760184740346=-5132920613823939509, -3948625081878221959=-5437427497122664614, -6197802534174832134=-5798652776913323486, 2558430164343979559=-3974124564791740629, 5827912698427451791=-8755182930855226192, 4334659838991266401=1875094829666965635, -3288150160972036797=2882654408400632661, -6632312692089791299=-7416978337353888496, 5660369989147374586=-146257236873007036, -4601633546221640614=314568010495525006, 7447250857071583863=-7894145466289122325, -6102906463311307032=-734768705874586731, 2416468512832932723=5395971769109383713, -7913753694545619816=-422366601723792798, 2216548725421935911=-1747022309515322275, 5545722913635347065=-4183811763737068653, -8228450230436970384=7072197569993326849, 1069888987022072121=1339914806148366726, 6973864295393994913=881160568155161730, -1038295836277073885=-6098822845524698690, -7638682188827435429=-1166341425454927490, 4389696495574006045=621900626234935483, -4266159872746606489=5786349201308905180, 4712299375428287216=7586810889173452616, 3635266110827390635=-4551921407959807206, 7564414229761366650=-2358375753654415983}, [15909]

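The sum at the 20th tuple is 11976, and at the 30th it is 15909. The IDs of the tuples in the window are sent along as well, so the log lines keep getting longer. (^^; Now that three sums have accumulated, the average is calculated next.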

26003 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.task - Emitting: tumblingavg default [11311]
26003 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 3 tuple: source: tumblingavg:5, stream: default, id: {317525622483766670=5280243493335298546, 3002989214851206566=5280243493335298546, 3562603101316339696=5280243493335298546, -7547522651842339766=5280243493335298546, 6875073760184740346=-224228321843401347, -3948625081878221959=-224228321843401347, -6197802534174832134=3666185645087914158, 2558430164343979559=5280243493335298546, 5827912698427451791=-224228321843401347, 4334659838991266401=3666185645087914158, -3288150160972036797=5280243493335298546, -6632312692089791299=5280243493335298546, 5660369989147374586=-224228321843401347, -4601633546221640614=3666185645087914158, 7447250857071583863=3666185645087914158, -6102906463311307032=3666185645087914158, 2416468512832932723=-224228321843401347, -7913753694545619816=-224228321843401347, 2216548725421935911=-224228321843401347, 5545722913635347065=-224228321843401347, -8228450230436970384=5280243493335298546, 1069888987022072121=-224228321843401347, 6973864295393994913=3666185645087914158, -1038295836277073885=-224228321843401347, -7638682188827435429=3666185645087914158, 4389696495574006045=3666185645087914158, -4266159872746606489=3666185645087914158, 4712299375428287216=3666185645087914158, 3635266110827390635=5280243493335298546, 7564414229761366650=5280243493335298546}, [11311]]
26004 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Execute done TUPLE source: slidingsum:4, stream: default, id: {317525622483766670=7555764236357609298, 3002989214851206566=1694128471165560315, 3562603101316339696=-9141305483820762660, -7547522651842339766=-2863953427390624123, 6875073760184740346=-5132920613823939509, -3948625081878221959=-5437427497122664614, -6197802534174832134=-5798652776913323486, 2558430164343979559=-3974124564791740629, 5827912698427451791=-8755182930855226192, 4334659838991266401=1875094829666965635, -3288150160972036797=2882654408400632661, -6632312692089791299=-7416978337353888496, 5660369989147374586=-146257236873007036, -4601633546221640614=314568010495525006, 7447250857071583863=-7894145466289122325, -6102906463311307032=-734768705874586731, 2416468512832932723=5395971769109383713, -7913753694545619816=-422366601723792798, 2216548725421935911=-1747022309515322275, 5545722913635347065=-4183811763737068653, -8228450230436970384=7072197569993326849, 1069888987022072121=1339914806148366726, 6973864295393994913=881160568155161730, -1038295836277073885=-6098822845524698690, -7638682188827435429=-1166341425454927490, 4389696495574006045=621900626234935483, -4266159872746606489=5786349201308905180, 4712299375428287216=7586810889173452616, 3635266110827390635=-4551921407959807206, 7564414229761366650=-2358375753654415983}, [15909] TASK: 5 DELTA: 
26006 [Thread-17-printer-executor[3 3]] INFO  o.a.s.d.executor - Processing received message FOR 3 TUPLE: source: tumblingavg:5, stream: default, id: {317525622483766670=5280243493335298546, 3002989214851206566=5280243493335298546, 3562603101316339696=5280243493335298546, -7547522651842339766=5280243493335298546, 6875073760184740346=-224228321843401347, -3948625081878221959=-224228321843401347, -6197802534174832134=3666185645087914158, 2558430164343979559=5280243493335298546, 5827912698427451791=-224228321843401347, 4334659838991266401=3666185645087914158, -3288150160972036797=5280243493335298546, -6632312692089791299=5280243493335298546, 5660369989147374586=-224228321843401347, -4601633546221640614=3666185645087914158, 7447250857071583863=3666185645087914158, -6102906463311307032=3666185645087914158, 2416468512832932723=-224228321843401347, -7913753694545619816=-224228321843401347, 2216548725421935911=-224228321843401347, 5545722913635347065=-224228321843401347, -8228450230436970384=5280243493335298546, 1069888987022072121=-224228321843401347, 6973864295393994913=3666185645087914158, -1038295836277073885=-224228321843401347, -7638682188827435429=3666185645087914158, 4389696495574006045=3666185645087914158, -4266159872746606489=3666185645087914158, 4712299375428287216=3666185645087914158, 3635266110827390635=5280243493335298546, 7564414229761366650=5280243493335298546}, [11311]
source: tumblingavg:5, stream: default, id: {317525622483766670=5280243493335298546, 3002989214851206566=5280243493335298546, 3562603101316339696=5280243493335298546, -7547522651842339766=5280243493335298546, 6875073760184740346=-224228321843401347, -3948625081878221959=-224228321843401347, -6197802534174832134=3666185645087914158, 2558430164343979559=5280243493335298546, 5827912698427451791=-224228321843401347, 4334659838991266401=3666185645087914158, -3288150160972036797=5280243493335298546, -6632312692089791299=5280243493335298546, 5660369989147374586=-224228321843401347, -4601633546221640614=3666185645087914158, 7447250857071583863=3666185645087914158, -6102906463311307032=3666185645087914158, 2416468512832932723=-224228321843401347, -7913753694545619816=-224228321843401347, 2216548725421935911=-224228321843401347, 5545722913635347065=-224228321843401347, -8228450230436970384=5280243493335298546, 1069888987022072121=-224228321843401347, 6973864295393994913=3666185645087914158, -1038295836277073885=-224228321843401347, -7638682188827435429=3666185645087914158, 4389696495574006045=3666185645087914158, -4266159872746606489=3666185645087914158, 4712299375428287216=3666185645087914158, 3635266110827390635=5280243493335298546, 7564414229761366650=5280243493335298546}, [11311]

(6050 + 11976 + 15909) / 3 = 11311.66666..., so the fractional part appears to be truncated. 11311 was sent to the printer bolt.
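The truncation follows from Java integer division: TumblingWindowAvgBolt keeps the sum in an int and divides it by the int window size. Here is a minimal sketch with the three sums from the log. The class and method names are made up for illustration and are not part of storm-starter.

```java
class AvgTruncation {
    // Same arithmetic as "sum / tuplesInWindow.size()" with int operands.
    static int intAverage(int[] values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum / values.length; // int / int discards the fractional part
    }

    public static void main(String[] args) {
        int[] sums = {6050, 11976, 15909};
        System.out.println(intAverage(sums));             // prints 11311
        System.out.println((6050 + 11976 + 15909) / 3.0); // floating-point result, about 11311.67
    }
}
```

If a fractional average were wanted, dividing by a double (as in the second println) would keep it.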

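That confirms the whole flow.

Looking back at the Native Streaming Window API

I have now actually tried the Native Streaming Window API. Its biggest feature is probably that you can write these windowed spouts and bolts yourself. With other spouts and bolts as a reference, they are fairly easy to implement, so once you get used to it, window handling that previously required something like Siddhi may become possible with Storm alone.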

English translation

A Storm user asked me to translate this article.
The content below has the same meaning as the content above.

Starting the topology

SlidingWindowTopology is provided as a sample, so I will use it.

# bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.SlidingWindowTopology
SlidingWindowTopology
$ bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.SlidingWindowTopology
Running: /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/opt/apache-storm-1.0.0 -Dstorm.log.dir=/opt/apache-storm-1.0.0/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /opt/apache-storm-1.0.0/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.0/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.0/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.0/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.0/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.0/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.0/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.0/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.0/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.0/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.0/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.0/lib/storm-core-1.0.0.jar:/opt/apache-storm-1.0.0/lib/storm-rename-hack-1.0.0.jar:examples/storm-starter/storm-starter-topologies-1.0.0.jar:/opt/storm/conf:/opt/apache-storm-1.0.0/bin -Dstorm.jar=examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.SlidingWindowTopology
3712 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:host.name=192.168.10.100
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.version=1.8.0_66
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.vendor=Oracle Corporation
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.class.path=/opt/apache-storm-1.0.0/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.0/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.0/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.0/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.0/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.0/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.0/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.0/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.0/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.0/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.0/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.0/lib/storm-core-1.0.0.jar:/opt/apache-storm-1.0.0/lib/storm-rename-hack-1.0.0.jar:examples/storm-starter/storm-starter-topologies-1.0.0.jar:/opt/storm/conf:/opt/apache-storm-1.0.0/bin
3713 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.io.tmpdir=/var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:java.compiler=<NA>
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.name=Mac OS X
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.arch=x86_64
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:os.version=10.11.4
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.name=KojiIshida
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.home=/Users/KojiIshida
3714 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Server environment:user.dir=/opt/apache-storm-1.0.0
3736 [main] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/bd026224-c71e-4395-b96f-9ab3551b5c60/version-2 snapdir /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/bd026224-c71e-4395-b96f-9ab3551b5c60/version-2
3760 [main] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - binding to port 0.0.0.0/0.0.0.0:2000
3766 [main] INFO  o.a.s.zookeeper - Starting inprocess zookeeper at port 2000 and dir /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T//bd026224-c71e-4395-b96f-9ab3551b5c60
3944 [main] INFO  o.a.s.d.nimbus - Starting Nimbus with conf {"topology.builtin.metrics.bucket.size.secs" 60, "nimbus.childopts" "-Xmx1024m", "ui.filter.params" nil, "storm.cluster.mode" "local", "storm.messaging.netty.client_worker_threads" 1, "logviewer.max.per.worker.logs.size.mb" 2048, "supervisor.run.worker.as.user" false, "topology.max.task.parallelism" nil, "topology.priority" 29, "zmq.threads" 1, "storm.group.mapping.service" "org.apache.storm.security.auth.ShellBasedGroupsMapping", "transactional.zookeeper.root" "/transactional", "topology.sleep.spout.wait.strategy.time.ms" 1, "scheduler.display.resource" false, "topology.max.replication.wait.time.sec" 60, "drpc.invocations.port" 3773, "supervisor.localizer.cache.target.size.mb" 10240, "topology.multilang.serializer" "org.apache.storm.multilang.JsonSerializer", "storm.messaging.netty.server_worker_threads" 1, "nimbus.blobstore.class" "org.apache.storm.blobstore.LocalFsBlobStore", "resource.aware.scheduler.eviction.strategy" "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy", "topology.max.error.report.per.interval" 5, "storm.thrift.transport" "org.apache.storm.security.auth.SimpleTransportPlugin", "zmq.hwm" 0, "storm.group.mapping.service.params" nil, "worker.profiler.enabled" false, "storm.principal.tolocal" "org.apache.storm.security.auth.DefaultPrincipalToLocal", "supervisor.worker.shutdown.sleep.secs" 1, "pacemaker.host" "localhost", "storm.zookeeper.retry.times" 5, "ui.actions.enabled" true, "zmq.linger.millis" 0, "supervisor.enable" true, "topology.stats.sample.rate" 0.05, "storm.messaging.netty.min_wait_ms" 100, "worker.log.level.reset.poll.secs" 30, "storm.zookeeper.port" 2000, "supervisor.heartbeat.frequency.secs" 5, "topology.enable.message.timeouts" true, "supervisor.cpu.capacity" 400.0, "drpc.worker.threads" 64, "supervisor.blobstore.download.thread.count" 5, "drpc.queue.size" 128, "topology.backpressure.enable" true, "supervisor.blobstore.class" 
"org.apache.storm.blobstore.NimbusBlobStore", "storm.blobstore.inputstream.buffer.size.bytes" 65536, "topology.shellbolt.max.pending" 100, "drpc.https.keystore.password" "", "nimbus.code.sync.freq.secs" 120, "logviewer.port" 8000, "topology.scheduler.strategy" "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy", "topology.executor.send.buffer.size" 1024, "resource.aware.scheduler.priority.strategy" "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy", "pacemaker.auth.method" "NONE", "storm.daemon.metrics.reporter.plugins" ["org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"], "topology.worker.logwriter.childopts" "-Xmx64m", "topology.spout.wait.strategy" "org.apache.storm.spout.SleepSpoutWaitStrategy", "ui.host" "0.0.0.0", "storm.nimbus.retry.interval.millis" 2000, "nimbus.inbox.jar.expiration.secs" 3600, "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.acker.executors" nil, "topology.fall.back.on.java.serialization" true, "topology.eventlogger.executors" 0, "supervisor.localizer.cleanup.interval.ms" 600000, "storm.zookeeper.servers" ["localhost"], "nimbus.thrift.threads" 64, "logviewer.cleanup.age.mins" 10080, "topology.worker.childopts" nil, "topology.classpath" nil, "supervisor.monitor.frequency.secs" 3, "nimbus.credential.renewers.freq.secs" 600, "topology.skip.missing.kryo.registrations" true, "drpc.authorizer.acl.filename" "drpc-auth-acl.yaml", "pacemaker.kerberos.users" [], "storm.group.mapping.service.cache.duration.secs" 120, "topology.testing.always.try.serialize" false, "nimbus.monitor.freq.secs" 10, "storm.health.check.timeout.ms" 5000, "supervisor.supervisors" [], "topology.tasks" nil, "topology.bolts.outgoing.overflow.buffer.enable" false, "storm.messaging.netty.socket.backlog" 500, "topology.workers" 1, "pacemaker.base.threads" 10, "storm.local.dir" "/var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T//db374078-fd57-4aeb-b7df-537f90929bc4", 
"topology.disable.loadaware" false, "worker.childopts" "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump", "storm.auth.simple-white-list.users" [], "topology.disruptor.batch.timeout.millis" 1, "topology.message.timeout.secs" 30, "topology.state.synchronization.timeout.secs" 60, "topology.tuple.serializer" "org.apache.storm.serialization.types.ListDelegateSerializer", "supervisor.supervisors.commands" [], "nimbus.blobstore.expiration.secs" 600, "logviewer.childopts" "-Xmx128m", "topology.environment" nil, "topology.debug" false, "topology.disruptor.batch.size" 100, "storm.messaging.netty.max_retries" 300, "ui.childopts" "-Xmx768m", "storm.network.topography.plugin" "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping", "storm.zookeeper.session.timeout" 20000, "drpc.childopts" "-Xmx768m", "drpc.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.connection.timeout" 15000, "storm.zookeeper.auth.user" nil, "storm.meta.serialization.delegate" "org.apache.storm.serialization.GzipThriftSerializationDelegate", "topology.max.spout.pending" nil, "storm.codedistributor.class" "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor", "nimbus.supervisor.timeout.secs" 60, "nimbus.task.timeout.secs" 30, "drpc.port" 3772, "pacemaker.max.threads" 50, "storm.zookeeper.retry.intervalceiling.millis" 30000, "nimbus.thrift.port" 6627, "storm.auth.simple-acl.admins" [], "topology.component.cpu.pcore.percent" 10.0, "supervisor.memory.capacity.mb" 3072.0, "storm.nimbus.retry.times" 5, "supervisor.worker.start.timeout.secs" 120, "storm.zookeeper.retry.interval" 1000, "logs.users" nil, "worker.profiler.command" "flight.bash", "transactional.zookeeper.port" nil, "drpc.max_buffer_size" 1048576, "pacemaker.thread.timeout" 10, 
"task.credentials.poll.secs" 30, "blobstore.superuser" "KojiIshida", "drpc.https.keystore.type" "JKS", "topology.worker.receiver.thread.count" 1, "topology.state.checkpoint.interval.ms" 1000, "supervisor.slots.ports" [6700 6701 6702 6703], "topology.transfer.buffer.size" 1024, "storm.health.check.dir" "healthchecks", "topology.worker.shared.thread.pool.size" 4, "drpc.authorizer.acl.strict" false, "nimbus.file.copy.expiration.secs" 600, "worker.profiler.childopts" "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder", "topology.executor.receive.buffer.size" 1024, "backpressure.disruptor.low.watermark" 0.4, "nimbus.task.launch.secs" 120, "storm.local.mode.zmq" false, "storm.messaging.netty.buffer_size" 5242880, "storm.cluster.state.store" "org.apache.storm.cluster_state.zookeeper_state_factory", "worker.heartbeat.frequency.secs" 1, "storm.log4j2.conf.dir" "log4j2", "ui.http.creds.plugin" "org.apache.storm.security.auth.DefaultHttpCredentialsPlugin", "storm.zookeeper.root" "/storm", "topology.tick.tuple.freq.secs" nil, "drpc.https.port" -1, "storm.workers.artifacts.dir" "workers-artifacts", "supervisor.blobstore.download.max_retries" 3, "task.refresh.poll.secs" 10, "storm.exhibitor.port" 8080, "task.heartbeat.frequency.secs" 3, "pacemaker.port" 6699, "storm.messaging.netty.max_wait_ms" 1000, "nimbus.impersonation.authorizer" "org.apache.storm.security.auth.authorizer.ImpersonationAuthorizer", "topology.component.resources.offheap.memory.mb" 0.0, "drpc.http.port" 3774, "topology.error.throttle.interval.secs" 10, "storm.messaging.transport" "org.apache.storm.messaging.netty.Context", "storm.messaging.netty.authentication" false, "topology.component.resources.onheap.memory.mb" 128.0, "topology.kryo.factory" "org.apache.storm.serialization.DefaultKryoFactory", "worker.gc.childopts" "", "nimbus.topology.validator" "org.apache.storm.nimbus.DefaultTopologyValidator", "nimbus.seeds" ["0.0.0.0"], "nimbus.queue.size" 100000, "nimbus.cleanup.inbox.freq.secs" 600, 
"storm.blobstore.replication.factor" 3, "worker.heap.memory.mb" 768, "logviewer.max.sum.worker.logs.size.mb" 4096, "pacemaker.childopts" "-Xmx1024m", "ui.users" nil, "transactional.zookeeper.servers" nil, "supervisor.worker.timeout.secs" 30, "storm.zookeeper.auth.password" nil, "client.blobstore.class" "org.apache.storm.blobstore.NimbusBlobStore", "supervisor.childopts" "-Xmx256m", "topology.worker.max.heap.size.mb" 768.0, "backpressure.disruptor.high.watermark" 0.9, "ui.filter" nil, "ui.header.buffer.bytes" 4096, "topology.min.replication.count" 1, "topology.disruptor.wait.timeout.millis" 1000, "storm.nimbus.retry.intervalceiling.millis" 60000, "topology.trident.batch.emit.interval.millis" 50, "storm.auth.simple-acl.users" [], "drpc.invocations.threads" 64, "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "ui.port" 8080, "storm.exhibitor.poll.uripath" "/exhibitor/v1/cluster/list", "storm.messaging.netty.transfer.batch.size" 262144, "logviewer.appender.name" "A1", "nimbus.thrift.max_buffer_size" 1048576, "storm.auth.simple-acl.users.commands" [], "drpc.request.timeout.secs" 600}
3948 [main] INFO  o.a.s.d.nimbus - Using default scheduler
3999 [main] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:host.name=192.168.10.100
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.version=1.8.0_66
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.vendor=Oracle Corporation
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.class.path=/opt/apache-storm-1.0.0/lib/asm-5.0.3.jar:/opt/apache-storm-1.0.0/lib/clojure-1.7.0.jar:/opt/apache-storm-1.0.0/lib/disruptor-3.3.2.jar:/opt/apache-storm-1.0.0/lib/kryo-3.0.3.jar:/opt/apache-storm-1.0.0/lib/log4j-api-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-core-2.1.jar:/opt/apache-storm-1.0.0/lib/log4j-over-slf4j-1.6.6.jar:/opt/apache-storm-1.0.0/lib/log4j-slf4j-impl-2.1.jar:/opt/apache-storm-1.0.0/lib/minlog-1.3.0.jar:/opt/apache-storm-1.0.0/lib/objenesis-2.1.jar:/opt/apache-storm-1.0.0/lib/reflectasm-1.10.1.jar:/opt/apache-storm-1.0.0/lib/servlet-api-2.5.jar:/opt/apache-storm-1.0.0/lib/slf4j-api-1.7.7.jar:/opt/apache-storm-1.0.0/lib/storm-core-1.0.0.jar:/opt/apache-storm-1.0.0/lib/storm-rename-hack-1.0.0.jar:examples/storm-starter/storm-starter-topologies-1.0.0.jar:/opt/storm/conf:/opt/apache-storm-1.0.0/bin
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:java.compiler=<NA>
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:os.name=Mac OS X
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:os.arch=x86_64
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:os.version=10.11.4
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:user.name=KojiIshida
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:user.home=/Users/KojiIshida
4004 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Client environment:user.dir=/opt/apache-storm-1.0.0
4005 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@77c3c037
4031 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
4047 [main] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4048 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000/storm sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@52226e57
4050 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2000. Will not attempt to authenticate using SASL (unknown error)
4055 [main] INFO  o.a.s.b.FileBlobStoreImpl - Creating new blob store based in /var/folders/f2/dn55vt054w5djx70_r7nq1sr0000gn/T/db374078-fd57-4aeb-b7df-537f90929bc4/blobs
4104 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
4106 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60125
4108 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/127.0.0.1:2000, initiating session
4110 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /127.0.0.1:60126
4111 [main] INFO  o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - Starting
4112 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60125
4113 [main] INFO  o.a.s.s.o.a.z.ZooKeeper - Initiating client connection, connectString=localhost:2000 sessionTimeout=20000 watcher=org.apache.storm.shade.org.apache.curator.ConnectionState@68b734a8
4114 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /127.0.0.1:60126
4115 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.p.FileTxnLog - Creating new log file: log.1
4117 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2000. Will not attempt to authenticate using SASL (unknown error)
4118 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Socket connection established to localhost/0:0:0:0:0:0:0:1:2000, initiating session
4118 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Accepted socket connection from /0:0:0:0:0:0:0:1:60127
4119 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2000] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Client attempting to establish new session at /0:0:0:0:0:0:0:1:60127
4153 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154488b21500000 with negotiated timeout 20000 for client /127.0.0.1:60125
4153 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x154488b21500000, negotiated timeout = 20000
4155 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154488b21500001 with negotiated timeout 20000 for client /127.0.0.1:60126
4155 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x154488b21500001, negotiated timeout = 20000
4157 [SyncThread:0] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x154488b21500002 with negotiated timeout 20000 for client /0:0:0:0:0:0:0:1:60127
4157 [main-SendThread(localhost:2000)] INFO  o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2000, sessionid = 0x154488b21500002, negotiated timeout = 20000
4158 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
4158 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
4158 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
4177 [main-EventThread] INFO  o.a.s.zookeeper - Zookeeper state update: :connected:none
4177 [main-EventThread] INFO  o.a.s.zookeeper - Zookeeper state update: :connected:none

A massive amount of log output follows...

I have no idea what is happening, so I will look at the source.

Looking at SlidingWindowTopology.java

It looks like the answer is here.

SlidingWindowTopology.java
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
                .shuffleGrouping("integer");
        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
                .shuffleGrouping("slidingsum");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(40000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

The following three bolts are configured in the main method.

  • SlidingWindowSumBolt
  • TumblingWindowAvgBolt
  • PrinterBolt

There appears to be one bolt that calculates the sum, one that calculates the average, and one that outputs the result.

Checking the source of each bolt to confirm what it does, and then looking at the logs again, should make things clearer.

First, checking RandomIntegerSpout

RandomIntegerSpout.java
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        collector.emit(new Values(rand.nextInt(1000), System.currentTimeMillis() - (24 * 60 * 60 * 1000), ++msgId), msgId);
    }

Pretty simple, actually. Every 100 ms it emits a random integer from 0 to 999, together with a timestamp and a message ID.
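To make the three emitted fields concrete, here is a minimal sketch without Storm that builds the same values as nextTuple(): a random integer, a timestamp shifted back by one day, and an incrementing message ID. The class and method names are made up for illustration, and the long type for the message ID is an assumption. Only the arithmetic comes from the source above.

```java
import java.util.Random;

class SpoutValuesSketch {
    // 24 * 60 * 60 * 1000 = 86,400,000 ms, the offset subtracted in nextTuple()
    static final long ONE_DAY_MS = 24 * 60 * 60 * 1000;

    private final Random rand = new Random();
    private long msgId = 0; // assumed type, for illustration only

    // Returns {value, timestamp, msgId} in the order passed to new Values(...).
    long[] next(long nowMillis) {
        int value = rand.nextInt(1000);          // 0 to 999 inclusive
        long timestamp = nowMillis - ONE_DAY_MS; // one day before "now"
        return new long[] {value, timestamp, ++msgId};
    }

    public static void main(String[] args) {
        SpoutValuesSketch spout = new SpoutValuesSketch();
        long[] t = spout.next(System.currentTimeMillis());
        System.out.println("[" + t[0] + ", " + t[1] + ", " + t[2] + "]");
    }
}
```

In the debug log, a tuple from this spout shows up in the same shape, for example [34, 1462006956174, 30]: the value 34, the shifted timestamp, and message ID 30.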

Examining the source of SlidingWindowSumBolt

SlidingWindowSumBolt.java
    @Override
    public void execute(TupleWindow inputWindow) {
            /*
             * The inputWindow gives a view of
             * (a) all the events in the window
             * (b) events that expired since last activation of the window
             * (c) events that newly arrived since last activation of the window
             */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
            /*
             * Instead of iterating over all the tuples in the window to compute
             * the sum, the values for the new events are added and old events are
             * subtracted. Similar optimizations might be possible in other
             * windowing computations.
             */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

The values of newly received tuples are added to the sum, while the values of expired tuples are subtracted from it. This is the part that implements the sliding window. The source itself is simple enough.
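The add-new and subtract-expired idea can be tried without Storm. The following is a minimal sketch, assuming a count-based window of length 30 that slides every 10 tuples, as configured in the topology. The class is hypothetical, and it expires old values eagerly rather than per window activation as Storm does, which should give the same sums.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class SlidingSumSketch {
    private final int windowLength;
    private final int slideInterval;
    private final Deque<Integer> window = new ArrayDeque<>();
    private int sum = 0;
    private int sinceLastEmit = 0;

    SlidingSumSketch(int windowLength, int slideInterval) {
        this.windowLength = windowLength;
        this.slideInterval = slideInterval;
    }

    // Feeds one value; returns the window sum when the window slides, otherwise null.
    Integer add(int value) {
        window.addLast(value);
        sum += value;                    // new event: add
        if (window.size() > windowLength) {
            sum -= window.removeFirst(); // expired event: subtract
        }
        if (++sinceLastEmit == slideInterval) {
            sinceLastEmit = 0;
            return sum;
        }
        return null;
    }

    // Runs all values through the window and collects every emitted sum.
    static List<Integer> run(int[] values, int windowLength, int slideInterval) {
        SlidingSumSketch s = new SlidingSumSketch(windowLength, slideInterval);
        List<Integer> emitted = new ArrayList<>();
        for (int v : values) {
            Integer out = s.add(v);
            if (out != null) {
                emitted.add(out);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] ones = new int[50];
        Arrays.fill(ones, 1);
        System.out.println(run(ones, 30, 10)); // prints [10, 20, 30, 30, 30]
    }
}
```

With 50 inputs of value 1, the sum grows until the window is full at 30 tuples and then stays flat, because each new value replaces an expired one.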

Examining the source of TumblingWindowAvgBolt

Next is TumblingWindowAvgBolt, which is also defined in SlidingWindowTopology.

TumblingWindowAvgBolt.java
        @Override
        public void execute(TupleWindow inputWindow) {
            int sum = 0;
            List<Tuple> tuplesInWindow = inputWindow.get();
            LOG.debug("Events in current window: " + tuplesInWindow.size());
            if (tuplesInWindow.size() > 0) {
                /*
                * Since this is a tumbling window calculation,
                * we use all the tuples in the window to compute the avg.
                */
                for (Tuple tuple : tuplesInWindow) {
                    sum += (int) tuple.getValue(0);
                }
                collector.emit(new Values(sum / tuplesInWindow.size()));
            }
        }

Each time the window fires, the bolt emits the average of the tuples in inputWindow. The main method of the topology configures this bolt with a count of 3, so an average is emitted after every three received sums.
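A tumbling window differs from a sliding one in that each value belongs to exactly one window. Below is a minimal plain-Java sketch of that behavior, assuming a count-based window. TumblingAvgSketch and its add method are hypothetical names for illustration only and are not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a count-based tumbling window average. Not Storm code.
final class TumblingAvgSketch {
    private final int count; // assumed: number of values per window
    private final List<Integer> buffer = new ArrayList<>();

    TumblingAvgSketch(int count) {
        this.count = count;
    }

    /** Feeds one value; returns the average when the window is full, otherwise null. */
    Integer add(int value) {
        buffer.add(value);
        if (buffer.size() < count) {
            return null;
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        int average = sum / buffer.size(); // integer division, as in the bolt
        buffer.clear();                    // the next window starts empty
        return average;
    }

    public static void main(String[] args) {
        TumblingAvgSketch sketch = new TumblingAvgSketch(3);
        int[] inputs = {10, 20, 31, 1, 2, 3};
        for (int input : inputs) {
            Integer emitted = sketch.add(input);
            if (emitted != null) {
                System.out.println("average: " + emitted);
            }
        }
    }
}
```

Because the buffer is cleared after every emission, the second average here depends only on 1, 2 and 3, not on the earlier values.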

Examining the source of PrinterBolt

This one is very simple and needs no explanation.

PrinterBolt.java
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple);
  }

Yeah, very simple.

Summarizing the flow of this topology

The sample should work in the following steps.
1. The spout generates a random number from 0 to 999.
2. slidingsum keeps a running sum of the values in its window and emits that sum after every 10 received tuples.
3. The sum is sent to the next bolt, which calculates the average.
4. tumblingavg collects three sums, then calculates and emits their average.

Confirming output logs

After running the sample program, check the output logs. According to the main method, the topology runs for 40 seconds before it finishes.

22857 [Thread-21-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Preparing bolt __system:(-1)
22864 [Thread-21-__system-executor[-1 -1]] INFO  o.a.s.d.executor - Prepared bolt __system:(-1)
22874 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - Opening spout integer:(2)
22878 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - Opened spout integer:(2)
22881 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - Activating spout integer:(2)
22887 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Preparing bolt tumblingavg:(5)
22891 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Prepared bolt tumblingavg:(5)
22893 [Thread-17-printer-executor[3 3]] INFO  o.a.s.d.executor - Preparing bolt printer:(3)
22893 [Thread-17-printer-executor[3 3]] INFO  o.a.s.d.executor - Prepared bolt printer:(3)
22900 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Preparing bolt slidingsum:(4)
22901 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Prepared bolt slidingsum:(4)
22950 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Preparing bolt __acker:(1)
22952 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Prepared bolt __acker:(1)

Here the Spout and Bolts used in the topology are initialized. The number assigned to each component, such as integer:(2) or slidingsum:(4), is important later when following the behavior in the logs.

22988 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.task - Emitting: integer default [778, 1462006953163, 1]
22994 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 4 tuple: source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]]
22995 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.task - Emitting: integer __ack_init [7447250857071583863 -3578654789271241288 2]
22996 [Thread-15-integer-executor[2 2]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 1 tuple: source: integer:2, stream: __ack_init, id: {}, [7447250857071583863 -3578654789271241288 2]]
22997 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]
22998 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1] TASK: 4 DELTA: 
22999 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Processing received message FOR 1 TUPLE: source: integer:2, stream: __ack_init, id: {}, [7447250857071583863 -3578654789271241288 2]
23000 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: integer:2, stream: __ack_init, id: {}, [7447250857071583863 -3578654789271241288 2]

Here you can see that a random number was generated and emitted. The first tuple sent was [778, 1462006953163, 1], and the next line of the log was:

TRANSFERING tuple [dest: 4 tuple: source: integer:2, stream: default, id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]]

This means that task 2, the "integer" spout, sent the data to task 4, the "slidingsum" bolt. The IDs assigned during initialization are used here. The same pattern repeats until the 10th tuple, after which the log pattern changes.
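When following many of these lines, it can help to pull out just the source, destination and stream. The sketch below is a hypothetical helper written only against the log lines shown in this article. TransferLogSketch and describe are made-up names, and the regular expression is an assumption about the line format, not something provided by Storm.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading TRANSFERING lines; not part of Storm.
final class TransferLogSketch {
    // Assumed format: "dest: <task> tuple: source: <component>:<task>, stream: <stream>"
    private static final Pattern TRANSFER = Pattern.compile(
            "dest: (\\d+) tuple: source: ([\\w-]+):(\\d+), stream: ([\\w-]+)");

    /** Returns "component:task -> dest (stream)", or null if the line does not match. */
    static String describe(String logLine) {
        Matcher m = TRANSFER.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return m.group(2) + ":" + m.group(3) + " -> " + m.group(1) + " (" + m.group(4) + ")";
    }

    public static void main(String[] args) {
        String line = "TRANSFERING tuple [dest: 4 tuple: source: integer:2, stream: default, "
                + "id: {7447250857071583863=-3578654789271241288}, [778, 1462006953163, 1]]";
        System.out.println(describe(line)); // integer:2 -> 4 (default)
    }
}
```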

23921 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {4389696495574006045=2681496747949810702}, [645, 1462006954095, 10]
23921 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Processing received message FOR 1 TUPLE: source: integer:2, stream: __ack_init, id: {}, [4389696495574006045 2681496747949810702 2]
23921 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: integer:2, stream: __ack_init, id: {}, [4389696495574006045 2681496747949810702 2]
23921 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: __ack_init, id: {}, [4389696495574006045 2681496747949810702 2] TASK: 1 DELTA: 
23922 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [6050]
23923 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-4601633546221640614=-1172784951702062669, 6973864295393994913=1708061666379587936, 7447250857071583863=5623972221711691701, -6102906463311307032=5518334604031423874, -7638682188827435429=4478428313701290490, 4389696495574006045=5932148635671668893, -6197802534174832134=1497464440461667439, -4266159872746606489=3561774369052512930, 4334659838991266401=-7106335497305084820, 4712299375428287216=3508884140400825778}, [6050]]

The sum "6050" was sent to task 5, "tumblingavg", which holds it in its window. This happens three times before an average is emitted, so let's check the data sent the 2nd and 3rd time.

24961 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [11976]
24961 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-4601633546221640614=492010799657169784, 317525622483766670=2638127789448427461, 3002989214851206566=128002525292759865, 7447250857071583863=8767372787937047960, -6102906463311307032=-4546730023383659346, 3562603101316339696=-7385522463102788783, -7547522651842339766=4915704745663260957, -8228450230436970384=-4055518290977434481, 6973864295393994913=6291843886548489067, -7638682188827435429=8441873508646088204, 4389696495574006045=-1674211243428697808, -6197802534174832134=2733983747371624992, 2558430164343979559=285379101380278186, -4266159872746606489=-6678113015319323641, 4334659838991266401=999638572688010822, 4712299375428287216=-2506689729719803634, -3288150160972036797=6163930983830697438, -6632312692089791299=4450995360647585394, 3635266110827390635=325375251356977723, 7564414229761366650=126552843783268400}, [11976]]
:
:
26001 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [15909]
26001 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - BOLT ack TASK: 1 TIME:  TUPLE: source: integer:2, stream: __ack_init, id: {}, [-3948625081878221959 -8424958756548618666 2]
26001 [Thread-19-__acker-executor[1 1]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: __ack_init, id: {}, [-3948625081878221959 -8424958756548618666 2] TASK: 1 DELTA: 1
26001 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {317525622483766670=7555764236357609298, 3002989214851206566=1694128471165560315, 3562603101316339696=-9141305483820762660, -7547522651842339766=-2863953427390624123, 6875073760184740346=-5132920613823939509, -3948625081878221959=-5437427497122664614, -6197802534174832134=-5798652776913323486, 2558430164343979559=-3974124564791740629, 5827912698427451791=-8755182930855226192, 4334659838991266401=1875094829666965635, -3288150160972036797=2882654408400632661, -6632312692089791299=-7416978337353888496, 5660369989147374586=-146257236873007036, -4601633546221640614=314568010495525006, 7447250857071583863=-7894145466289122325, -6102906463311307032=-734768705874586731, 2416468512832932723=5395971769109383713, -7913753694545619816=-422366601723792798, 2216548725421935911=-1747022309515322275, 5545722913635347065=-4183811763737068653, -8228450230436970384=7072197569993326849, 1069888987022072121=1339914806148366726, 6973864295393994913=881160568155161730, -1038295836277073885=-6098822845524698690, -7638682188827435429=-1166341425454927490, 4389696495574006045=621900626234935483, -4266159872746606489=5786349201308905180, 4712299375428287216=7586810889173452616, 3635266110827390635=-4551921407959807206, 7564414229761366650=-2358375753654415983}, [15909]]
26001 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {-3948625081878221959=-8424958756548618666}, [34, 1462006956174, 30] TASK: 4 DELTA: 
26003 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Processing received message FOR 5 TUPLE: source: slidingsum:4, stream: default, id: {317525622483766670=7555764236357609298, 3002989214851206566=1694128471165560315, 3562603101316339696=-9141305483820762660, -7547522651842339766=-2863953427390624123, 6875073760184740346=-5132920613823939509, -3948625081878221959=-5437427497122664614, -6197802534174832134=-5798652776913323486, 2558430164343979559=-3974124564791740629, 5827912698427451791=-8755182930855226192, 4334659838991266401=1875094829666965635, -3288150160972036797=2882654408400632661, -6632312692089791299=-7416978337353888496, 5660369989147374586=-146257236873007036, -4601633546221640614=314568010495525006, 7447250857071583863=-7894145466289122325, -6102906463311307032=-734768705874586731, 2416468512832932723=5395971769109383713, -7913753694545619816=-422366601723792798, 2216548725421935911=-1747022309515322275, 5545722913635347065=-4183811763737068653, -8228450230436970384=7072197569993326849, 1069888987022072121=1339914806148366726, 6973864295393994913=881160568155161730, -1038295836277073885=-6098822845524698690, -7638682188827435429=-1166341425454927490, 4389696495574006045=621900626234935483, -4266159872746606489=5786349201308905180, 4712299375428287216=7586810889173452616, 3635266110827390635=-4551921407959807206, 7564414229761366650=-2358375753654415983}, [15909]

The sum at the 20th tuple was 11976 and the sum at the 30th tuple was 15909. The IDs of all the tuples in the window are sent along with the sum, so the log lines get longer and longer (^^; Now that three sums have arrived, the average is calculated.

26003 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.task - Emitting: tumblingavg default [11311]
26003 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 3 tuple: source: tumblingavg:5, stream: default, id: {317525622483766670=5280243493335298546, 3002989214851206566=5280243493335298546, 3562603101316339696=5280243493335298546, -7547522651842339766=5280243493335298546, 6875073760184740346=-224228321843401347, -3948625081878221959=-224228321843401347, -6197802534174832134=3666185645087914158, 2558430164343979559=5280243493335298546, 5827912698427451791=-224228321843401347, 4334659838991266401=3666185645087914158, -3288150160972036797=5280243493335298546, -6632312692089791299=5280243493335298546, 5660369989147374586=-224228321843401347, -4601633546221640614=3666185645087914158, 7447250857071583863=3666185645087914158, -6102906463311307032=3666185645087914158, 2416468512832932723=-224228321843401347, -7913753694545619816=-224228321843401347, 2216548725421935911=-224228321843401347, 5545722913635347065=-224228321843401347, -8228450230436970384=5280243493335298546, 1069888987022072121=-224228321843401347, 6973864295393994913=3666185645087914158, -1038295836277073885=-224228321843401347, -7638682188827435429=3666185645087914158, 4389696495574006045=3666185645087914158, -4266159872746606489=3666185645087914158, 4712299375428287216=3666185645087914158, 3635266110827390635=5280243493335298546, 7564414229761366650=5280243493335298546}, [11311]]
26004 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.executor - Execute done TUPLE source: slidingsum:4, stream: default, id: {317525622483766670=7555764236357609298, 3002989214851206566=1694128471165560315, 3562603101316339696=-9141305483820762660, -7547522651842339766=-2863953427390624123, 6875073760184740346=-5132920613823939509, -3948625081878221959=-5437427497122664614, -6197802534174832134=-5798652776913323486, 2558430164343979559=-3974124564791740629, 5827912698427451791=-8755182930855226192, 4334659838991266401=1875094829666965635, -3288150160972036797=2882654408400632661, -6632312692089791299=-7416978337353888496, 5660369989147374586=-146257236873007036, -4601633546221640614=314568010495525006, 7447250857071583863=-7894145466289122325, -6102906463311307032=-734768705874586731, 2416468512832932723=5395971769109383713, -7913753694545619816=-422366601723792798, 2216548725421935911=-1747022309515322275, 5545722913635347065=-4183811763737068653, -8228450230436970384=7072197569993326849, 1069888987022072121=1339914806148366726, 6973864295393994913=881160568155161730, -1038295836277073885=-6098822845524698690, -7638682188827435429=-1166341425454927490, 4389696495574006045=621900626234935483, -4266159872746606489=5786349201308905180, 4712299375428287216=7586810889173452616, 3635266110827390635=-4551921407959807206, 7564414229761366650=-2358375753654415983}, [15909] TASK: 5 DELTA: 
26006 [Thread-17-printer-executor[3 3]] INFO  o.a.s.d.executor - Processing received message FOR 3 TUPLE: source: tumblingavg:5, stream: default, id: {317525622483766670=5280243493335298546, 3002989214851206566=5280243493335298546, 3562603101316339696=5280243493335298546, -7547522651842339766=5280243493335298546, 6875073760184740346=-224228321843401347, -3948625081878221959=-224228321843401347, -6197802534174832134=3666185645087914158, 2558430164343979559=5280243493335298546, 5827912698427451791=-224228321843401347, 4334659838991266401=3666185645087914158, -3288150160972036797=5280243493335298546, -6632312692089791299=5280243493335298546, 5660369989147374586=-224228321843401347, -4601633546221640614=3666185645087914158, 7447250857071583863=3666185645087914158, -6102906463311307032=3666185645087914158, 2416468512832932723=-224228321843401347, -7913753694545619816=-224228321843401347, 2216548725421935911=-224228321843401347, 5545722913635347065=-224228321843401347, -8228450230436970384=5280243493335298546, 1069888987022072121=-224228321843401347, 6973864295393994913=3666185645087914158, -1038295836277073885=-224228321843401347, -7638682188827435429=3666185645087914158, 4389696495574006045=3666185645087914158, -4266159872746606489=3666185645087914158, 4712299375428287216=3666185645087914158, 3635266110827390635=5280243493335298546, 7564414229761366650=5280243493335298546}, [11311]
source: tumblingavg:5, stream: default, id: {317525622483766670=5280243493335298546, 3002989214851206566=5280243493335298546, 3562603101316339696=5280243493335298546, -7547522651842339766=5280243493335298546, 6875073760184740346=-224228321843401347, -3948625081878221959=-224228321843401347, -6197802534174832134=3666185645087914158, 2558430164343979559=5280243493335298546, 5827912698427451791=-224228321843401347, 4334659838991266401=3666185645087914158, -3288150160972036797=5280243493335298546, -6632312692089791299=5280243493335298546, 5660369989147374586=-224228321843401347, -4601633546221640614=3666185645087914158, 7447250857071583863=3666185645087914158, -6102906463311307032=3666185645087914158, 2416468512832932723=-224228321843401347, -7913753694545619816=-224228321843401347, 2216548725421935911=-224228321843401347, 5545722913635347065=-224228321843401347, -8228450230436970384=5280243493335298546, 1069888987022072121=-224228321843401347, 6973864295393994913=3666185645087914158, -1038295836277073885=-224228321843401347, -7638682188827435429=3666185645087914158, 4389696495574006045=3666185645087914158, -4266159872746606489=3666185645087914158, 4712299375428287216=3666185645087914158, 3635266110827390635=5280243493335298546, 7564414229761366650=5280243493335298546}, [11311]

(6050 + 11976 + 15909) / 3 = 11311.666..., so the fractional part is dropped by integer division. 11311 was sent to PrinterBolt.

With that, we have confirmed the full behavior of the sample program.
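The dropped fraction comes from Java's int division, which is what `sum / tuplesInWindow.size()` performs in TumblingWindowAvgBolt. The following small check reproduces the arithmetic with the three sums from the log. AverageCheck and its two methods are hypothetical names used only for this illustration.

```java
// Reproduces the average from the log with int and with double arithmetic.
final class AverageCheck {
    /** Average using int division, like the bolt: the fractional part is dropped. */
    static int intAverage(int... values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    /** Average using floating-point division, for comparison. */
    static double exactAverage(int... values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return (double) sum / values.length;
    }

    public static void main(String[] args) {
        System.out.println(intAverage(6050, 11976, 15909));   // 11311
        System.out.println(exactAverage(6050, 11976, 15909)); // roughly 11311.67
    }
}
```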

Review of the Native Streaming Window API

After trying the Native Streaming Window API, I think its biggest feature is that we can develop our own Spouts and Bolts on top of it as we like. The existing Spouts and Bolts can serve as references for development. Once we are familiar with this API, we may not need a CEP library such as "Siddhi"; that kind of window control can be implemented with Storm alone.