15
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

kafkaとは?特徴は?

  • 分散型イベント駆動型プラットフォーム
  • PUB/SUB(Producer/Consumer) でイベント駆動開発ができる
  • message順番を守ることができる1
  • message保持方法を必要に応じて設定できる2
    • 時間が経過すると、自動削除
    • logデータが大きくなると、自動削除
    • データの上書きで最新のデータのみを保持
    • 永遠に保持し、time travelもできる
  • Consumer側でどこまでデータをconsumeしたのかを管理できる3
    • 自動
    • 手動
  • データのlossをコントロールできる4
  • transactionサポート5
    • atomicity, consistencyを実現するため、データを大きなjsonに入れて、PUBする方法もある

kafkaでシステムレベルでのイベント駆動型開発イメージ

kafkaのlogoが表すように、様々なサービスがkafka経由でリアルタイムで情報を連携できる

kafka

システムを設定するときの留意点

  • messageの順番を守るべきか
  • ある程度のデータlossは許すべきか
  • 重複データをconsumeするときに、処理を如何するか
  • schema lessなので、topicやデータ構造のドキュメントを書くべき
  • データのバージョン管理
  • データ自体にバージョンを入れるか、topicでバージョン管理するかを決める必要がある

dockerでKafka serverを作る

wurstmeister/kafka-dockerを使用

docker-compose -f docker-compose-single-broker.yml up

rails + karafka でmessageの交換を試す

  • rails appを作成
rails new karafka_example
  • Gemfileに以下のgemを追加し、bundle installする
gem 'karafka'
  • Karafkaの設定を作成
bundle exec karafka install

以下のファイルが作成される

app/consumers/application_consumer.rb
app/responders/application_responder.rb
karafka.rb
  • karafka.rbを編集
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
    config.client_id = 'example_app'
    config.backend = :inline
    config.batch_fetching = true
    config.logger = Rails.logger
  end
 
  Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)

  consumer_groups.draw do
    consumer_group :bigger_group do
      batch_fetching false

      topic :users do
        consumer UsersConsumer
      end
    end
  end
end

KarafkaApp.boot!
  • consumerを作成する

app/consumers/users_consumers.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end
  • responderを作成する

app/responders/users_responder.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end
  • karafka server起動
bundle exec karafka server
  • rails consoleでデータ作成を試す
UsersResponder.call({ event_name: "user_created", payload: { id: 1 } }

関連資料

  1. message順番

  2. log削除

  3. commit-offset

  4. message guarantee

  5. transaction

15
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?