The Basics of Kafka Streams

What is Kafka Streams?

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.

What are the input and output of Kafka Streams?

Input: Kafka topics

Output: Kafka topics

(Figure: a Kafka Streams application consumes records from input Kafka topics and produces results to output Kafka topics)
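
To make this concrete, here is a minimal sketch of a pass-through Kafka Streams application. The topic names input-topic and output-topic are placeholders, not topics used later in this article:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PassThroughApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Input is a Kafka topic; output is another Kafka topic.
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // Close the application cleanly on Ctrl+C.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}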

Concepts of Kafka Streams

You can refer to https://kafka.apache.org/27/documentation/streams/core-concepts#streams_topology

processor topologies

A processor topology is a graph of stream processors (nodes) that are connected by streams (edges).

  • Processor topologies help us understand how data is processed

stream

A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.

  • Sequence of immutable data records, each defined as a key-value pair

  • Ordered

  • Fault-tolerant

stream processor

A stream processor is a node in the processor topology; it represents a processing step that transforms data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and possibly producing one or more output records to its downstream processors.

  • Receive one input record at a time from its upstream processors

  • Transform data

  • Produce one or more output records to its downstream processors

There are two special processors in the topology:

Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its downstream processors.

Sink Processor: A sink processor is a special type of stream processor that does not have downstream processors. It sends any received records from its upstream processors to a specified Kafka topic.

(Figure: a processor topology with source processors at the top, intermediate stream processors in the middle, and sink processors at the bottom)
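
In the lower-level Processor API (introduced in the next section), source and sink processors correspond to Topology#addSource and Topology#addSink. The following is a minimal sketch, assuming Kafka 2.7's processor API (KIP-478); the topic names and the UppercaseProcessor are hypothetical:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class UppercaseTopology {
    // A custom stream processor: receives one input record at a time and forwards one record downstream.
    static class UppercaseProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            if (record.value() != null) {
                context.forward(record.withValue(record.value().toUpperCase()));
            }
        }
    }

    public static Topology build() {
        Topology topology = new Topology();
        // Source processor: no upstream processors; consumes records from a Kafka topic.
        topology.addSource("Source", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("Uppercase", UppercaseProcessor::new, "Source");
        // Sink processor: no downstream processors; writes records to a Kafka topic.
        topology.addSink("Sink", "output-topic", new StringSerializer(), new StringSerializer(), "Uppercase");
        return topology;
    }
}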

How does Kafka Streams process data records?

You can refer to https://kafka.apache.org/27/documentation/streams/core-concepts#streams_topology.

Kafka Streams offers two ways to define the stream processing topology: the Kafka Streams DSL provides the most common data transformation operations such as map, filter, join and aggregations out of the box; the lower-level Processor API allows developers to define and connect custom processors as well as to interact with state stores.

Kafka Streams DSL

You can refer to https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html.

The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.
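
For example, a hypothetical pipeline that drops empty lines and normalizes the rest takes only a few lines of DSL code (the topic names are placeholders, and String default serdes are assumed):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class NormalizeLines {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("text-input");
        textLines
                .filter((key, value) -> value != null && !value.isEmpty()) // keep non-empty lines only
                .mapValues(value -> value.trim().toLowerCase())            // normalize each line
                .to("text-output");
        return builder.build();
    }
}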

In the Kafka Streams DSL, there are two abstractions (KStream and KTable) for reading data from a Kafka topic.

KStream

  • A record stream is always interpreted as an "INSERT" 

  • No record replaces an existing row with the same key

(Figure: a KStream where each incoming record is appended as a new, independent entry)

KTable

  • The value in a data record is interpreted as an "UPDATE" of the last value for the same record key

  • A record with a null value represents a "DELETE" or tombstone for the record's key

(Figure: a KTable where each incoming record updates the current value for its key)
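
The difference shows up directly in how a topic is read. A minimal sketch, assuming hypothetical topics click-events and user-profiles:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamVsTable {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // KStream: every record is an INSERT; records with the same key do not replace each other.
        KStream<String, String> clicks =
                builder.stream("click-events", Consumed.with(Serdes.String(), Serdes.String()));

        // KTable: each record is an UPDATE of the last value for its key; a null value is a DELETE (tombstone).
        KTable<String, String> profiles =
                builder.table("user-profiles", Consumed.with(Serdes.String(), Serdes.String()));
    }
}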

The duality of Streams and Tables

Can we use only a KStream or only a KTable for stream processing?

→ The answer is NO. We can convert a KStream into a KTable and vice versa.

The stream-table duality describes the close relationship between streams and tables.

Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively).

Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table.

Summary

  • A stream can be considered a changelog of a table
  • A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream

How can we convert a KTable into a KStream?

(Table: converting a KTable into a KStream with toStream(), which emits the table's changelog as a stream of updates)

How can we convert a KStream into a KTable?

(Table: converting a KStream into a KTable by aggregating it, e.g. groupByKey() followed by count() or reduce())

As shown above, we can convert a KStream into a KTable and vice versa!
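
A minimal sketch of both directions, assuming a hypothetical pageview-events topic: aggregating a KStream yields a KTable, and toStream() emits a KTable's changelog as a KStream (newer releases also offer KStream#toTable() for the direct conversion):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class DualityExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // KStream -> KTable: aggregating pageviews per user yields a continuously updated table.
        KStream<String, String> pageviews =
                builder.stream("pageview-events", Consumed.with(Serdes.String(), Serdes.String()));
        KTable<String, Long> pageviewCounts =
                pageviews.groupByKey(Grouped.with(Serdes.String(), Serdes.String())).count();

        // KTable -> KStream: toStream() replays the table's changelog as a stream of updates.
        pageviewCounts.toStream()
                .to("pageview-counts-output", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}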

Try it locally

Set up Kafka cluster

You can use https://lenses.io/ to set up a Kafka cluster in an easy way.

> docker run -d -e ADV_HOST=127.0.0.1 \
  -e EULA="https://licenses.lenses.io/download/lensesdl?id=<your id>" \
  --rm -p 3030:3030 -p 9092:9092 -p 2181:2181 lensesio/box
c268c46791347d908db6ee34d51ea2c240e9c4fcd80964c963ac4eb795f9b59b

> kafka-topics --zookeeper localhost:2181 --list
__consumer_offsets
__topology
__topology__metrics
_schemas
backblaze_smart
cc_data
cc_payments
connect-configs
connect-offsets
connect-statuses
fast_vessel_processor
financial_tweets
nyc_yellow_taxi_trip_data
sea_vessel_position_reports
telecom_italia_data
telecom_italia_grid

> kafka-topics --zookeeper localhost:2181 --topic word-count-input --replication-factor 1 --partitions 2 --create
Created topic word-count-input.

> kafka-topics --zookeeper localhost:2181 --topic word-count-output --replication-factor 1 --partitions 2 --create
Created topic word-count-output.

> kafka-topics --zookeeper localhost:2181 --list
__consumer_offsets
__topology
__topology__metrics
_schemas
backblaze_smart
cc_data
cc_payments
connect-configs
connect-offsets
connect-statuses
fast_vessel_processor
financial_tweets
logs_broker
nyc_yellow_taxi_trip_data
sea_vessel_position_reports
telecom_italia_data
telecom_italia_grid
word-count-input
word-count-output

Set up Kafka Streams application

This is a sample application that counts words in a Kafka topic.

> git clone git@github.com:k3forx/word-count-streams.git
Cloning into 'word-count-streams'...
Enter passphrase for key '/Users/Kanata-Miyahana/.ssh/github_kanata':
remote: Enumerating objects: 19, done.
remote: Counting objects: 100% (19/19), done.
remote: Compressing objects: 100% (8/8), done.
remote: Total 19 (delta 0), reused 16 (delta 0), pack-reused 0
Receiving objects: 100% (19/19), done.

> cd word-count-streams

> mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ------< com.github.k3forx.kafka.streams:stereams-starter-project >------
[INFO] Building stereams-starter-project 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ stereams-starter-project ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ stereams-starter-project ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.6.1:compile (default-compile) @ stereams-starter-project ---
[INFO] Changes detected - recompiling the module!
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 1 source file to /Users/kanata-miyahana/program/kafka/word-count-streams/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ stereams-starter-project ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/kanata-miyahana/program/kafka/word-count-streams/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.6.1:testCompile (default-testCompile) @ stereams-starter-project ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ stereams-starter-project ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ stereams-starter-project ---
[INFO] Building jar: /Users/kanata-miyahana/program/kafka/word-count-streams/target/stereams-starter-project-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.6:single (assemble-all) @ stereams-starter-project ---
[INFO] Building jar: /Users/kanata-miyahana/program/kafka/word-count-streams/target/stereams-starter-project-1.0-SNAPSHOT-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  7.227 s
[INFO] Finished at: 2021-02-18T22:19:35+09:00
[INFO] ------------------------------------------------------------------------

> java -jar target/stereams-starter-project-1.0-SNAPSHOT-jar-with-dependencies.jar
INFO StreamsConfig values:
        application.id = wordcount-application
        application.server =
        bootstrap.servers = [localhost:9092]
        buffered.records.per.partition = 1000
        cache.max.bytes.buffering = 10485760
        client.id =
        commit.interval.ms = 30000
        connections.max.idle.ms = 540000
        default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
        default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
        default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
        key.serde = null
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        num.standby.replicas = 0
        num.stream.threads = 1
        partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
        poll.ms = 100
        processing.guarantee = at_least_once
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        replication.factor = 1
        request.timeout.ms = 40000
        retry.backoff.ms = 100
        rocksdb.config.setter = null
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        state.cleanup.delay.ms = 600000
        state.dir = /tmp/kafka-streams
        timestamp.extractor = null
        value.serde = null
        windowstore.changelog.additional.retention.ms = 86400000
        zookeeper.connect =
 (org.apache.kafka.streams.StreamsConfig:223)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Creating consumer client (org.apache.kafka.streams.processor.internals.StreamThread:473)
INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1-consumer
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = wordcount-application
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 2147483647
        max.poll.records = 1000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Creating restore consumer client (org.apache.kafka.streams.processor.internals.StreamThread:483)
INFO ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1-restore-consumer
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id =
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 2147483647
        max.poll.records = 1000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] State transition from CREATED to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa] State transition from CREATED to RUNNING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-client [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa] Started Kafka Stream process (org.apache.kafka.streams.KafkaStreams:453)
KafkaStreams processID: c3360652-4348-4225-83bd-3623ddf818fa
        StreamsThread appId: wordcount-application
                StreamsThread clientId: wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa
                StreamsThread threadId: wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1
                Active tasks:
                Standby tasks:



INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread:523)
INFO Discovered coordinator 127.0.0.1:9092 (id: 2147483546 rack: null) for group wordcount-application. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
INFO Revoking previously assigned partitions [] for group wordcount-application (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
        current assigned active tasks: []
        current assigned standby tasks: []
 (org.apache.kafka.streams.processor.internals.StreamThread:205)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa] State transition from RUNNING to REBALANCING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Updating suspended tasks to contain active tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1400)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Removing all active tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1407)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Removing all standby tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1421)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] partition revocation took 1 ms.
        suspended active tasks: []
        suspended standby tasks: []
        previous active tasks: []
 (org.apache.kafka.streams.processor.internals.StreamThread:227)
INFO (Re-)joining group wordcount-application (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Constructed client metadata {c3360652-4348-4225-83bd-3623ddf818fa=ClientMetadata{hostInfo=null, consumers=[wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1-consumer-5f77c9d4-0415-4d77-800c-b459311c10bb], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member subscriptions. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:316)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:672)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Completed validating internal topics in partition assignor (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:672)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Assigned tasks to clients as {c3360652-4348-4225-83bd-3623ddf818fa=[activeTasks: ([0_0, 0_1, 1_0, 1_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1, 1_0, 1_1]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:493)
INFO The following not-subscribed topics are assigned to group wordcount-application, and their metadata will be fetched from the brokers : [wordcount-application-Counts-repartition] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:390)
INFO Successfully joined group wordcount-application with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
INFO Setting newly assigned partitions [wordcount-application-Counts-repartition-0, wordcount-application-Counts-repartition-1, word-count-input-0, word-count-input-1] for group wordcount-application (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] at state PARTITIONS_REVOKED: new partitions [wordcount-application-Counts-repartition-0, wordcount-application-Counts-repartition-1, word-count-input-0, word-count-input-1] assigned at the end of consumer rebalance.
        assigned active tasks: [0_0, 0_1, 1_0, 1_1]
        assigned standby tasks: []
        current suspended active tasks: []
        current suspended standby tasks: []
        previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:160)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa] State transition from REBALANCING to REBALANCING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Adding assigned tasks as active {0_0=[word-count-input-0], 0_1=[word-count-input-1], 1_0=[wordcount-application-Counts-repartition-0], 1_1=[wordcount-application-Counts-repartition-1]} (org.apache.kafka.streams.processor.internals.StreamThread:1280)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Creating active task 0_0 with assigned partitions [[word-count-input-0]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Creating shared producer client (org.apache.kafka.streams.processor.internals.StreamThread:1263)
INFO ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.id = wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1-producer
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 100
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 10
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:223)
INFO Kafka version : 0.11.0.0 (org.apache.kafka.common.utils.AppInfoParser:83)
INFO Kafka commitId : cb8625948210849f (org.apache.kafka.common.utils.AppInfoParser:84)
INFO task [0_0] Created state store manager for task 0_0 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [0_0] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Created active task 0_0 with assigned partitions [word-count-input-0] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Creating active task 0_1 with assigned partitions [[word-count-input-1]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [0_1] Created state store manager for task 0_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [0_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Created active task 0_1 with assigned partitions [word-count-input-1] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Creating active task 1_0 with assigned partitions [[wordcount-application-Counts-repartition-0]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [1_0] Created state store manager for task 1_0 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [1_0] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Created active task 1_0 with assigned partitions [wordcount-application-Counts-repartition-0] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Creating active task 1_1 with assigned partitions [[wordcount-application-Counts-repartition-1]] (org.apache.kafka.streams.processor.internals.StreamThread:1229)
INFO task [1_1] Created state store manager for task 1_1 with the acquired state dir lock (org.apache.kafka.streams.processor.internals.ProcessorStateManager:122)
INFO task [1_1] Register global stores [] (org.apache.kafka.streams.processor.internals.ProcessorStateManager:352)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Created active task 1_1 with assigned partitions [wordcount-application-Counts-repartition-1] (org.apache.kafka.streams.processor.internals.StreamThread:1248)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Starting restoring state stores from changelog topics [wordcount-application-Counts-changelog-1, wordcount-application-Counts-changelog-0] (org.apache.kafka.streams.processor.internals.StoreChangelogReader:121)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] Adding assigned standby tasks {} (org.apache.kafka.streams.processor.internals.StreamThread:1345)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] State transition from ASSIGNING_PARTITIONS to RUNNING. (org.apache.kafka.streams.processor.internals.StreamThread:980)
INFO stream-client [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa] State transition from REBALANCING to RUNNING. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [wordcount-application-c3360652-4348-4225-83bd-3623ddf818fa-StreamThread-1] partition assignment took 683 ms.
        current active tasks: [0_0, 0_1, 1_0, 1_1]
        current standby tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread:193)

Play with Kafka Streams

> kafka-console-producer --broker-list localhost:9092 --topic word-count-input
>aaa bbb ccc
>aaa
>bbb
>aaa
>ddd
>eee
>fff
>bbb
>ccc fff BBB AAA AAA
>aaa bbb ddd
>ddd bbb ccc
>

Check on console

Access http://localhost:3030/data/topics/word-count-output/data in your browser.

(Screenshot: the records in the word-count-output topic, shown in the Lenses UI; for the input above, the final counts should be aaa=6, bbb=6, ccc=3, ddd=3, eee=1, fff=2)

What processing is done?

The following code corresponds to the logic that counts the words.

        ...
        KTable<String, Long> wordCounts = wordCountInput
                // 2 - map values to lowercase
                .mapValues(textLine -> textLine.toLowerCase(Locale.ROOT))
                // 3 - flatmap values split by space
                .flatMapValues(lowercasedTextLine -> Arrays.asList(lowercasedTextLine.split(" ")))
                // 4 - select the word as the new key (the old key is discarded)
                .selectKey((ignoredkey, word) -> word)
                // 5 - group by key before aggregation
                .groupByKey()
                // 6 - count occurrences
                .count("Counts");
        ...

(Figure: how the records are transformed at each step of the word-count topology)

You can refer to https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#transform-a-stream for the detailed operation.
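
For reference, this is roughly how the snippet above can be wired into a runnable application with the current StreamsBuilder API. The build log above shows a Kafka 0.11 client, whose count("Counts") overload differs from the Materialized variant used here, so treat this as a sketch rather than the repository's exact code:

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // 1 - stream from the input topic
        KStream<String, String> wordCountInput = builder.stream("word-count-input");
        KTable<String, Long> wordCounts = wordCountInput
                // 2 - map values to lowercase
                .mapValues(textLine -> textLine.toLowerCase(Locale.ROOT))
                // 3 - flatmap values split by space
                .flatMapValues(line -> Arrays.asList(line.split(" ")))
                // 4 - select the word as the new key (the old key is discarded)
                .selectKey((ignoredKey, word) -> word)
                // 5 - group by key before aggregation
                .groupByKey()
                // 6 - count occurrences into the "Counts" state store
                .count(Materialized.as("Counts"));
        // 7 - write the changelog of the KTable back to Kafka
        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}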
