kafkaとは?特徴は?
- 分散型イベント駆動型プラットフォーム
- PUB/SUB(Producer/Consumer) でイベント駆動開発ができる
- message順番を守ることができる1
- message保持方法を必要に応じて設定できる2
- 時間が経過すると、自動削除
- logデータが大きくなると、自動削除
- データの上書きで最新のデータのみを保持
- 永遠に保持し、time travelもできる
- Consumer側でどこまでデータをconsumeしたのかを管理できる3
- 自動
- 手動
- データのlossをコントロールできる4
- transactionサポート5
- atomicity, consistencyを実現するため、データを大きなjsonに入れて、PUBする方法もある
kafkaでシステムレベルでのイベント駆動型開発イメージ
kafkaのlogoが表すように、様々なサービスがkafka経由でリアルタイムで情報を連携できる
システムを設定するときの留意点
- messageの順番を守るべきか
- ある程度のデータlossは許すべきか
- 重複データをconsumeするときに、処理を如何するか
- schema lessなので、topicやデータ構造のドキュメントを書くべき
- データのバージョン管理
- データ自体にバージョンを入れるか、topicでバージョン管理するかを決める必要がある
dockerでKafka serverを作る
docker-compose -f docker-compose-single-broker.yml up
rails + karafka でmessageの交換を試す
- rails appを作成
rails new karafka_example
- Gemfileに以下のgemを追加し、bundle installする
gem 'karafka'
- Karafkaの設定を作成
bundle exec karafka install
以下のファイルが作成される
app/consumers/application_consumer.rb
app/responders/application_responder.rb
karafka.rb
- karafka.rbを編集
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!
class KarafkaApp < Karafka::App
setup do |config|
config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
config.client_id = 'example_app'
config.backend = :inline
config.batch_fetching = true
config.logger = Rails.logger
end
Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)
consumer_groups.draw do
consumer_group :bigger_group do
batch_fetching false
topic :users do
consumer UsersConsumer
end
end
end
end
KarafkaApp.boot!
- consumerを作成する
app/consumers/users_consumers.rb
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params}"
end
end
- responderを作成する
app/responders/users_responder.rb
class UsersResponder < ApplicationResponder
topic :users
def respond(event_payload)
respond_to :users, event_payload
end
end
- karafka server起動
bundle exec karafka server
- rails consoleでデータ作成を試す
UsersResponder.call({ event_name: "user_created", payload: { id: 1 } }