Twitter
Spark
Elasticsearch
Kafka

Data Integration as Service: Kafka-Connectの紹介と実験

More than 1 year has passed since last update.

Apache Kafka 0.9より同梱されているKafka Connectを紹介します。

Kafka-Connect

Kafka ConnectはKafkaと周辺のシステム間でストリームデータをやりとりするための通信規格とライブラリとツールです。まずは下の図をご覧ください。

kafka-connect.png

コネクタは周辺のシステムからKafkaへデータを取り込むためのソースと周辺システムへデータを送るシンクの二種類があります。データの流れは一方通行です。すでに何十ものコネクタが実装されており、サポートされている周辺システムは多種に渡ります。もちろん自分でコネクタを作ることもできます。

Kafkaの中を通過するデータの形式は基本的にAvroです。コネクタが周辺システム固有のデータ形式をAvroに変換統一しています。これによりどんなソースとシンクの組み合わせでも動くことになります。

AvroにはSchema Registryがあり、スキーマの変更にも対応できます。

コネクタはスタンドアローンモードと分散モードがあり、分散モードでは複数台のサーバーで分散処理することでスケーラビリティと耐障害性を確保しています。

Kafka ConnectのランタイムにはREST APIが装備されていて、複数のコネクタの起動/停止/ステータスの管理が行えるようになっています。

Kafkaパーティション・オフセットの管理も行なっており、障害から回復した時に、データの重複欠損がないことを保証する仕組みもあります。

https://kafka.apache.org/documentation#connect

導入

KafkaはApacheのプロジェクトですが、開発の主導はConfluentです。ここではConfluent PlatformでKafkaを導入します。

環境: Ubuntu14 都合により古いバージョンです...

ここにあるドキュメントに従います。Scalaのバージョンは2.11
http://docs.confluent.io/3.0.0/installation.html#installation-apt

パッケージインストール

wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11

Oracle Java 1.7をインストール

最新のUbuntuならこのステップは不要でしょう。

sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer
sudo apt-get install oracle-java7-set-default

confluentユーザーを追加

sudo adduser confluent

各種ディレクトリ作成

mkdir /var/log/kafka
mkdir /var/lib/zookeeper
mkdir /var/lib/kafka
chown -R confluent:confluent /var/log/kafka /var/lib/zookeeper /var/lib/kafka

サードパーティーのコネクタのjarはkafka-connect-XXXXという名称で/usr/share/javaに保存します。Kafka ConnectのREST APIで起動の指示をした時、認識して自動的にロードしてくれます。

Zookeeper, Kafka, Schema-Registry 起動

su - confluent
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
/usr/bin/kafka-server-start /etc/kafka/server.properties &
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &

Kafka-Connect起動

distributedモードで起動します。

connect-distributed /etc/schema-registry/connect-avro-distributed.properties

REST APIを叩いてみると、コネクターのリストは空。まだ何も走らせていないので当たり前。

$ curl http://localhost:8083/connectors | jq .
[]

syslogをElasticsearchに保存してみる実験

syslogソースコネクタとelasticsearchシンクコネクタを使ってsyslogの内容をElasticsearchに保存してみます。Elasticsearchは2.x系を別途インストールしてください。

syslogの設定

Ubuntu14はrsyslogを使っていますので、TCPソケット、ポート5514にログを書き込むようにします

*.* @@localhost:5514という行を/etc/rsyslog.d/50-default.confの先頭に書きこみ、rsyslogを再起動します。

service rsyslog restart

syslogソースコネクタ

syslogソースを次のように起動します。コネクタがポート5514でログを受け取るようになります。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
    "name": "tcpsyslog",
    "config": {
        "tasks.max":1,
        "connector.class":"io.confluent.kafka.connect.syslog.source.TCPSyslogSourceConnector",
        "kafka.topic":"syslog-tcp",
        "syslog.port":5514,
        "syslog.reverse.dns.remote.ip": true
    }
}
'

Elasticsearchシンクコネクタ

同様にElasticsearchシンクを起動します。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": 1,
        "topics": "syslog-tcp",
        "key.ignore": true,
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect"
    }
}
'

動作確認

コネクタ一覧を見てみるとtcpsyslogとelasticsearch-sinkコネクタが見えます。

$ curl http://localhost:8083/connectors | jq .
[
  "elasticsearch-sink",
  "tcpsyslog"
]

しばらくするとsyslogが何か書き込んでいるはずですのでESをチェックして見ましょう。

$ curl http://localhost:9200/_cat/indices
red open syslog-tcp    5 1

Kafkaのトピックと同じsyslog-tcpインデックスが作られていました!
読み出して見ましょう。

$ curl http://localhost:9200/syslog-tcp/_search?pretty
  ...
  {
    "_source": {
      "hostname": "localhost",
      "remote_address": "localhost/127.0.0.1:51631",
      "charset": "UTF-8",
      "message": "vagrant-ubuntu-trusty-64 kernel: [16583.344413] Hardware name: innotek GmbH VirtualBox/VirtualBox, BIOS VirtualBox 12/01/2006",
      "level": 4,
      "host": "vagrant-ubuntu-trusty-64",
      "facility": 0,
      "date": 1478654589000
    },
    "_score": 1,
    "_id": "syslog-tcp+0+75",
    "_type": "kafka-connect",
    "_index": "syslog-tcp"
  }
  ...

...おぉカーネルのログらしきものが入ってます!成功です。あっという間にパイプラインが出来上がりました。

まとめ 〜Data Integreation as Service〜

Kafka-Connectの紹介と簡単な実験をして見ました。コネクタにより"関心の分離"が可能となり、システムが疎結合になりました。Data Integreation as Serviceへの重要な布石で「データはとりあえずKafkaに送っておけ」ば後はいかようにもなります。

次回はKafka-Streamsの紹介をしたいと思います。Kafka-StreamsはKafka-Connectと違い、双方向の通信が可能です。組み合わせると非常に強力かつ柔軟なパイプラインが構築できます。

おまけ

Confluent公認コネクタ

コネクタ ソース・シンク タグ 開発元 ダウンロード
HDFS Sink HDFS, Hadoop, Hive Confluent Confluent
JDBC Source JDBC, MySQL Confluent Confluent
JDBC Sink JDBC, MySQL Confluent Confluent
Attunity Source CDC Attunity Attunity
Couchbase Source Couchbase, NoSQL Couchbase Couchbase
DataStax Sink Cassandra, DataStax Data Mountaineer Data Mountaineer
Elasticsearch Sink search, Elastic, log, analytics Confluent Confluet
GoldenGate Source CDC, Oracle Oracle Community
JustOne Sink Postgres JustOne JustOne
Striim Source CDC, MS SQLServer, Oracle, MySQL Striim Striim
Syncsort DMX Source DB2, IMS, VSAM, CICS Syncsort Syncsort
Syncsort DMX Sink DB2, IMS, VSAM, CICS Syncsort Syncsort
Vertica Source Vertica HP Enterprise HP Enterprise
Vertica Sink Vertica HP Enterprise HP Enterprise
VoltDB Sink VoltDB, NewSQL VoltDB VoltDB

コミュティにより開発されたコネクタ

コネクタ ソース・シンク タグ 開発元 ダウンロード
Apache Ignite Source File System Community Community
Apache Ignite Sink File System Community Community
Bloomberg Ticker Source Application feed Community Community
Cassandra Source Cassandra Community Community 1
Cassandra Sink Cassandra Community Community
DynamoDB ? Dynamo, NoSQL Community Community
Elasticsearch Sink Elastic, search, log, analytics Community Community 1 Community 2 Community 3
FTP Source File System Community Community
Google PubSub Source Messaging Community Community
Google PubSub Sink Messaging Community Community
Hazelcast Sink Datastore, In-memory Community Community
Hbase Sink Hbase, NoSQL Community Community 1 Community 2
InfluxDB Sink Datastore, Time-series Community Community
Internet Relay Chat Source Application feed Community Community
Jenkins Source Application feed Community Community
JMS Sink Messaging Community Community
Kudu Sink Kudu Community Community
Mixpanel Source analytics Community Community
MongoDB Source Mongo, MongoDB, NoSQL Community Community
MongoDB CDC - Debezium Source MongoDB, CDC Community Community
MQTT Source MQTT, messaging Community Community
MySQL CDC - Debezium Source MySQL, CDC, Oracle Community Community
Redis Sink Datastore, Messaging Community Community
RethinkDB Source NoSQL, Streaming Community Community
RethinkDB Sink NoSQL, Streaming Community Community
Salesforce Source Datastore Community Community
Solr Sink search, solr Community Community 1 Community 2
Solr Source search, solr Community Community
Splunk Sink Datastore, log Community Community
Splunk Source Datastore, log Community Community
spooldir Source File System Community Community
SQS Source AWS, SQS, Messaging Community Community
Syslog Source log Community Community
S3 Sink AWS, S3, log Community Community 1 Community 2
Twitter Source Application feed Community Community 1 Community 2
Twitter Sink Application feed Community Community