9
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Logstash input plugins(Kafka Producer/consumer)

Last updated at Posted at 2019-12-06

##1.目的

 Kafkaは、ProducerとConsumerのAPIを利用することにより、Kafka Brokderに対して、Topicの生成と受信などの操作を行うことが可能です。ElatticStackのコンポーネントの1つであるLogstashには、これらAPIを実装したプラグインが存在します。
 本投稿では、それらのプラグインをlogstash kafka producerlogstash kafka consumerと呼ぶこととします。本投稿では、あるユースケースを実現するための基本設定と基本パラメータについて解説します。

2.基本設定

2-1.前提構成

 基本設定では、下記構成(ユースケース)に対するLogstashのconfファイルの設定方法について説明します。
このユースケースでは、syslog clientから受信したsyslogデータを下記経路を通じて、最終的にKibanaにて可視化を行います。

22222222222.PNG

 本投稿では、Logstashの設定に主眼を置いているため、太字の箇所(logstash kafka producer/logstash kafka consumer)の基本設定について解説し、その他コンポーネントについては解説しません。

2-2.logstash kafka producerの基本設定

logstash kafka producerの基本設定は以下の通りです。input pluginにはTCP/UDPプラグインを使用して、output pluginにはKafkaを利用しています。

/etc/logstash/conf.d
### in Kafka producer
input {
       #TCP input moduleにてport5005listen
        tcp {
          host => "0.0.0.0"
          port => "50005"
          type => "syslog"
        }
       #udp input moduleにてport5005listen
        udp {
          host => "0.0.0.0"
          port => "50005"
          type => "syslog"
        }
}
output {
  kafka {
    #Kafka broker serverIPアドレスを記載する。
    bootstrap_servers => "broker1:9092,broker2:9092,broker3:9092"
    topic_id => "syslog"
  }
}

TCP/UDP portがListenしているがどうかは下記コマンドで確認することが可能です。

### in Kafka producer
ss -nltu | grep 50005
udp  UNCONN 0      0                               0.0.0.0:50005        0.0.0.0:*                                                                               
tcp  LISTEN 0      128                                   *:50005              *:*  

TCP/UDP input pluginを使用しなくても、syslog pluginでも設定できます。その場合の設定はこちらです。

/etc/logstash/conf.d
### in Kafka producer
input {
  syslog {
    #syslogの受信ポートを設定する。
      port => 50006
  }
}

output {
  kafka {
    #Kafka broker serverIPアドレスを記載する。
    bootstrap_servers => "broker1:9092,broker2:9092,broker3:9092"
    topic_id => "syslog"
  }
}

2-3.logstash kafka consumerの基本設定

Input plugin(logstash kafka consumer)の基本設定は以下の通りです。

/etc/logstash/conf.d
input {
    kafka {
        #Kafka broker serverIPアドレスを記載する。
        bootstrap_servers => "broker1:9092,broker2:9092,broker3:9092"
        #Consumerが取得するtopic名称を記載する。
        topics => ["topic"]
    }
}

output {
  #elasticsearchIPアドレスを記載する。
    elasticsearch {
        hosts => ["localhost:9200"]
    }
}

2-4.動作確認

syslog clientにてloggerコマンドを入力すると、

#syslog client
logger default-log_777

kafka brokerにてtopicの一覧を確認すると、logstashで指定したtopicが生成されていることが確認できます。

#kafka broker
# kafka-topics --zookeeper broker1:2181,broker2:2181,broker3 --list
syslog

また、kibanaにてdashboard画面を確認すると、syslog clientにて送出したsyslog message(logger default-log_777)を受信していることも確認できます。

image.png

4.logstash kafka producerのパラメータ

4-1.kafka producerとは

  • ProducerはメッセージをTopicに書き込むためのクライアントライブラリです。ユーザアプリケーションはProducerを使用して、Brokerクラスタ上に構成されたTopicの各Partitionにメッセージを書き込みます。
  • Producerは定期的にいずれかのBrokerからメタデータを取得することで、各Brokerのホスト名、接続先ポート、各Topic PartitionのLeader Replicaの場所など把握します。Producerはこの情報を元に、どのメッセージをどのBrokerに送信するかを決定します。

4-2.logstash kafka producerとは

 logstash kafka producerは、Kafka Client 2.1.0の仕様に準じたプラグインとなっています。本プラグインでは、Kafkaがサポートしているなかで、以下セキュリティ通信をサポートしています。

  • SSL (requires plugin version 3.0.0 or later)
  • Kerberos SASL (requires plugin version 5.1.0 or later)

 logstash kafka producerのパラメータ設定について、さらに活用するために本節では主要パラメータについて解説します。セキュリティに関するパラメータについては、本記事の対象外とします。

 デフォルトの設定では、セキュリティ通信は無効となっているが、必要に応じて設定をONにすることが可能です。本記事ではセキュリティ設定について触れません。詳細は、kafka output pluginを参照してください。

 logstash kafka producerにて、デフォルトで必須で設定しなければならないパラメータは、topic_idのみです。
 デフォルトのコーデックはplainとなっておるが、ユーザー自身で変更することも可能であります。
 例えば、イベント内のすべてのコンテンツをjson形式で送信したい場合は、outputのコンフィグレーションを
下記の通り設定する必要することで実現が可能となります。

    output {
      kafka {
        codec => json
        topic_id => "mytopic"
      }
    }

4-3.主要パラメータ

 logstash kafka producerの主要パラメータについて下記で解説します。
 また、構成図とのマッピングを下記に載せます(図の関係上、すべてのパラメータを載せることはできなかったので主観で重要そうなパラメータをpick upした)。

4444444444444.png

bootstrap_servers
topicを送信するkafka brokderの接続情報(IPアドレス等)を記載する。producerは、メタデータ(topic,partition,replica)の情報を得るために使用する。producerは受信したメタデータをもとに、kafka brokderに対して実データを送信する。フォーマットの記載の仕方は、host1:port1,host2:port2である。
client_id
client idは、kafka brokerへとrequestを作成する際に通知される。request内に含めることでrequest sourceをトラッキングできることがメリットである(IP/Port以外のトラッキング情報として)。デフォルトでは設定する必要はない。
security_protocol
各kafka producer~kafka broker間で使用するセキュリティプロトコルである。PLAINTEXT, SSL, SASL_PLAINTEXTの選択肢から選択することができる。デフォルト値は、PLAINTEXTである。
topic_id
producerが送信するtopic名称
value_serializer
Messageに使用するSerializerクラス。メッセージをエンコードするクラスを指定する。
buffer_memory
producer自身が使用可能な自身の最大メモリサイズ。recordをバッファする為に使用することができ、kafka brokerへの送信前に蓄積するための領域となる。デフォルト値は33,354,432 byte(= 33 Mbyte)となる。
acks
ProducerがRequest完了前にleaderに求めるackの数。ackが「0」の場合は、すべてのbroker serverからのackを受け取らずにrequestが完了したと判断する。 ackが「1」の場合は、leaderが自身のlogに書き込みが完了したが、すべてのfollowerからのackを返信されていない状態を指す。ackが「all」の場合は、すべてのin-sysnc replicaからレコードを完了した旨のackを受信した状態でrequestが完了したと判断する。
batch_size
producerが送出するrecord batchの最大サイズ。このパラメータは、Kafka producerとKafka Serverの両方の性能値に影響を大きく与える。デフォルトのbatch sizeは、16384 byteとなる。この値がkafka brokerが許容する受信サイズを超えることができない。
compression_type
producerがrecord batchを送信する際に利用する圧縮方式。選定する圧縮方式に応じて、record batchのoverheadの値が変化する。圧縮方式としては、none、gzip、snappy、lz4を選択することができる。
key_serializer
デフォルト値は"org.apache.kafka.common.serialization.StringSerializer"となる。producerがmessageのkey値をシリアライズするためのSerializer Classを指定する。
linger_ms
送信待機時間。送信シングルスレッドのループ待機時間。prodecer側で指定した最大のRequestサイズ(max_request_size)に達しなくても、この時間が経過したら送信される。
max_request_size
request最大サイズ。producerが送信するrecord batchの最大サイズ。この上限値に達したとき、producerはrequestを送出する。デフォルト値は、1.048,576 byte(=約1MB)である。
metadata_fetch_timeout_ms
initial meta data requestのさいに、topicのメタデータをfetchするためのタイムアウト値。デフォルトは60,000 msec(=60 sec)である。
metadata_max_age_ms
メタデータを強制リフレッシュするための最大待ち時間。デフォルト値は300,000 msec(= 5 min)である。
receive_buffer_bytes
データ読み込み時に指定するTCP receive bufferの最大サイズ。デフォルト値は、32,768 byte(=約32 kbyte)である。
reconnect_backoff_ms
Connection failが発生した際に、Kafka brokerにreconnectを実施するための待ち時間。デフォルト値は、10msec。
request_timeout_ms
このパラメータは、kafka producerがrequestのレスポンスを待つために使用する最大待ち時間をコントロールする。もしレスポンスがこのタイムアウト内に到着しない場合は、kafka producerは、必要に応じてrequestを再送信する。
retries
デフォルトのリトライの挙動は、成功するまでリトライをし続けるというものである。リトライ回数はユーザー自身で設定することが可能であるが、送信データの損失を防ぐために、このパラメータを調整することは推奨されない。もし、このリトライ回数を設定した場合(0以上の整数値を設定する)、kafka producerはその設定値しかリトライを実施しないという挙動となる。 0より小さい値を設定した場合はコンフィグレーションエラーとなる。
retry_backoff_ms
topic partitionへのrequestが失敗した場合にretryし続ける最大待ち時間である。デフォルト値は100であり、最大100回リトライを実施することとなる。
send_buffer_bytes
TCPデータ送信する際に使用する最大バッファサイズ。

5.logstash kafka producerのパラメータ

5-1.kafka consumerとは

  • ConsumerはメッセージをTopicから読み出すためのクライアントライブラリです。ユーザアプリケーションはConsumerを使用して、Brokerクラスタ上に構成されたTopicの各Partitionからメッセージを読み出します。
  • ConsumerもProducerと同様に、定期的にいずれかのBrokerからメタデータを取得することで、各Brokerのホスト名、接続先ポート、各Topic PartitionのLeader Replicaの場所など把握します。Consumerはこの情報を元に、どのBrokerからRecordを取得するのか決定します。
  • ConsumerがRecordをどこまで読みだしたのかを示すOffsetはConsumer側で管理し、Broker側では排他制御を行いません。そのためConsumer数が増加しても、Broker側の負担は少なく済みます。Consumerはアプリケーションの再起動時に、自身のOffsetから再開できるようOffsetを永続化する必要があります。そのため、Consumerは自身のOffsetをKafka上のOffset用Topicや、任意のデータストアに保存します。
  • ユーザアプリケーションは、1つ以上のConsumerでConsumer Groupを構成することで、1TopicのデータをGroup内のConsumerで分散読み出しできます。このようにデータを分散読み出しできるため、KafkaはHadoop/Spark/Stormなどの並列分散処理フレームワークと相性が良いです。また、Topicのデータは読み出されてもすぐには削除されないため、同じTopicのデータを複数のConsumer Groupから読み出すことができます。

5-2.logstash kafka consumerとは

 logstash kafka consumerは、group management(group idを用いた論理的なグループ管理)を取り扱うことができる。異なる物理マシン上のLogstashインスタンスにて、同じgroup_idを付与することでデータ受信速度を向上させることができます。デフォルト設定では、Logstashインスタンスは1グループにしか所属していません。各コンフィグレーション上のgroup_idを同一の値に設定する必要があります。
Topicに含まれるmessageは、同一のgroup_idをもつ全てのLogstashインスタンスに送信されます。

以下に含まれるKafka brokerからのメタデータは、@metadata fieldに含まれる。メタデータは、decorate_eventオプションが「true」に設定された場合のみ送信されます。デフォルトでは、decorate_eventオプションは、falseとなっています。

  • [@metadata][kafka][topic]: Kafkatトピック名称
  • [@metadata][kafka][consumer_group]: Consumerグループ
  • [@metadata][kafka][partition]: messageが含まれていたpartition情報
  • [@metadata][kafka][offset]: messageのrecord offset
  • [@metadata][kafka][key]: record keyの情報
  • [@metadata][kafka][timestamp]: Recordのタイムスタンプ情報。ただし、この情報は、Kafka brokerの設定に依存するので注意が必要です。デフォルトでは、recordが生成されたタイミングのタイムスタンプを意味します。kafka brokerのより詳細な仕様を理解する為には、log.message.timestamp.typeのパラメータ仕様を確認する必要があります。

5-3.主要パラメータ(編集中)

 logstash kafka consumerの主要パラメータについて下記で解説します。
 また、構成図とのマッピングを下記に載せます(図の関係上、すべてのパラメータを載せることはできなかったので主観で重要そうなパラメータをpick upしました)。

  • auto_commit_interval_ms
    ConsumerがもつoffsetがKafkaにコミットする周期間隔。デフォルト値は、5,000 msec。

  • auto_offset_reset
    ConsumerがFetchリクエストを出した場合に、initial offsetがなかった場合、あるいはoffsetが範囲外だった場合に、Consumerがもつoffset情報をどのように設定し直すかを、このパラメータで決定することができる。

    • earliest: earliest offsetに自動的にリセットする。
    • latest: latest offsetに自動的にリセットする。
    • none: consumerグループ内に以前のOffsetが無い場合、exception(例外通知)をconsumerに送付する。
    • anything else: グループ内に問い合わせを問い合わせず、exception(例外通知)consumerに送付する。
  • bootstrap_servers
    Kafka brokerクラスタに対しての初回接続の際に使用されるKafkaインスタンスのリスト。複数のKafka borkerを指定する場合は、「host1:port1,host2:port2」の形式で記載します。すべてのクラスタメンバーシップを検知するためのInitial Connectionのみで使用されるため、この値は必ずしもすべてのKafka brokerの値を入力する必要はないです。Initial Connectionが完了したのち、すべてのKafka brokderと通信することが可能です。しかし、念のため(単一のKafka brokderがDownしているケースを考慮して)複数のKafka brokderを記載したほうが耐障害性を考慮すると無難な選択肢となります。

  • check_crcs
    record内のCRC32を自動的にチェックする機能である。このチェック機能はオーバーヘッドが存在するため、パフォーマンスを最優先する場合は機能をOFFすることを推奨する。

  • client_id
    このパラメータを設定した場合、requestを作成するタイミングでKafka serverに対して、client_idをKafka brokerに通知されます。client_idを設定することで、ip/port情報以外の情報でトラッキングすることができる。

  • connections_max_idle_ms
    このパラメータで指定した時間後、idleコネクションを閉じます。

  • consumer_threads
    デフォルト値は1である。完全なバランスを保つためには、この値はKafka partitionの数と理想的な設定である。

  • decorate_events
    デフォルトは、falseである。この設定をtrueにすることで、以下のKafkaのメタデータをlogstashのイベント内に含めることができる。

  • enable_auto_commit
    デフォルト値はtrueである。このパラメータをtrueに設定すると、kafka brokerに対して定期的にコミットを行う。このコミットのは、プロセス失敗が発生した場合に発生する。

  • exclude_internal_topics
    internal topic(offsetなど)を取り除くかどうかを設定する。

  • fetch_max_bytes
    fetchリクエストで取得可能なデータ最大量。デフォルト値は、52,428,800(50 MB)

  • fetch_max_wait_ms
    fetch_min_byteによって設定した条件を満足しないデータが存在しない場合、fetchリクエストを応答する前にkafka brokerがブロックする。そのブロックを行うまでの最大時間。デフォルトでは、500msec(=5 sec)である。

  • fetch_min_bytes
    consumerからのfetch requestに対してkafka brokerが応答するための最小データ容量。

  • group_id
    デフォルト値は、「logstash」である。consumerが所属するグループ識別子である。トピック内のMessageは同一group_idのすべてのlogstashインスタンスに通知される。

  • heartbeat_interval_ms
    同一グループ内のconsumerに適用されるハートビート。ハートビートはconsumerのセッションの正常動作性を確認することに使用される。また、新しいconsumerがjoinしたりleaveした場合のリバランシングに使用される。この値は、session_timeout_msより必ず小さく設定する必要がある。ただし、session_time_outの値の3分の1より大きくするのを推奨するので注意が必要となる。デフォルト値は、3,000
    msec (= 3 sec)である。

  • key_deserializer_class
    デフォルト値は、"org.apache.kafka.common.serialization.StringDeserializer"である。record keyのシリアル化をデコードするさいに使用されるJavaクラスである。

  • max_partition_fetch_bytes
    pertitionごとの最大フェッチバイト数。

  • max_poll_interval_ms
    consumerグループ管理をおこなうときにpoll起動間の最大待ち時間。

  • max_poll_records
    1処理で取得可能な最大record数。

  • metadata_max_age_ms
    kafka brokerのメタデータを強制リフレッシュする最大待ち時間。

  • partition_assignment_strategy
    デフォルト値は、「org.apache.kafka.clients.consumer.RangeAssignor」である。kafka consumerとpartitonの配布のために使用される。

  • poll_timeout_ms
    topicから新しいメッセージを受信するための最大待ち時間。この時間間隔で指定topicからデータを受信する。

  • receive_buffer_bytes
    データ受信時に使用するTCP受信バッファ。

  • reconnect_backoff_ms
    kafka brokerに李コネクトするための最大待ち時間。

  • request_timeout_ms
    requestのタイムアウト値。kafka clientが投げたrequestのレスポンスの待ち時間。

  • retry_backoff_ms
    fetchリクエストが失敗したときにtopic partitionに他しいてトライするまでの待ち時間。

  • security_protocol
    利用するセキュリティプロトコル。PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSLのいづれかを使用する。

  • send_buffer_bytes
    データ送信時のTCPバッファサイズ。

  • session_timeout_ms
    consumerがdead状態と認識してリバランシングの操作を実施するためのトリガー値である。

  • topics
    定期購読するtopicsのリストである。デフォルト値は、["logstash"]である。

  • topics_pattern
    定期購読するtopicのパターンを記載します。この値を設定すると、topicsの値が無視されるので注意が必要である。

  • value_deserializer_class
    デフォルト値は、"org.apache.kafka.common.serialization.StringDeserializer"である。recordのvalueをデコードする際に資料されるJavaクラス。

4.Kafkaの基礎概念

上記パラメータを理解するための、Kafkaの基礎概念を下記に示す。(気が向いたら編集予定)

4-1.Record batch

from this URL

  • Record: messageの最小単位。keyとvalueのペアで構成される。
  • Record batch: recordの集合。producerがtopic partitionに送信する際は、record batchの単位で送信される。それぞれのreord batchは複数のrecordで構成される。record batchはrecordに加えて、後述されるoverheadも含まれる。
  • Record batch overhead:messageのバージョンによってoverheadの大きさが変化する。producer側の設定では、overheadのサイズは3つの方法により、増減させることとが可能である。linger time、compression、スキーマ定義(valueとkey)である。

1 4UOYy2WLNt3cQCqMDqCYLA.jpeg

9
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?