Edited at

Data Integration as Service: Kafka-Connectの紹介と実験

More than 1 year has passed since last update.

Apache Kafka 0.9より同梱されているKafka Connectを紹介します。


Kafka-Connect

Kafka ConnectはKafkaと周辺のシステム間でストリームデータをやりとりするための通信規格とライブラリとツールです。まずは下の図をご覧ください。

コネクタは周辺のシステムからKafkaへデータを取り込むためのソースと周辺システムへデータを送るシンクの二種類があります。データの流れは一方通行です。すでに何十ものコネクタが実装されており、サポートされている周辺システムは多種に渡ります。もちろん自分でコネクタを作ることもできます。

Kafkaの中を通過するデータの形式は基本的にAvroです。コネクタが周辺システム固有のデータ形式をAvroに変換統一しています。これによりどんなソースとシンクの組み合わせでも動くことになります。

AvroにはSchema Registryがあり、スキーマの変更にも対応できます。

コネクタはスタンドアローンモードと分散モードがあり、分散モードでは複数台のサーバーで分散処理することでスケーラビリティと耐障害性を確保しています。

Kafka ConnectのランタイムにはREST APIが装備されていて、複数のコネクタの起動/停止/ステータスの管理が行えるようになっています。

Kafkaパーティション・オフセットの管理も行なっており、障害から回復した時に、データの重複欠損がないことを保証する仕組みもあります。

https://kafka.apache.org/documentation#connect


導入

KafkaはApacheのプロジェクトですが、開発の主導はConfluentです。ここではConfluent PlatformでKafkaを導入します。

環境: Ubuntu14 都合により古いバージョンです...

ここにあるドキュメントに従います。Scalaのバージョンは2.11

http://docs.confluent.io/3.0.0/installation.html#installation-apt


パッケージインストール

wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -

sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11


Oracle Java 1.7をインストール

最新のUbuntuならこのステップは不要でしょう。

sudo add-apt-repository ppa:webupd8team/java

sudo apt-get update
sudo apt-get install oracle-java7-installer
sudo apt-get install oracle-java7-set-default


confluentユーザーを追加

sudo adduser confluent


各種ディレクトリ作成

mkdir /var/log/kafka

mkdir /var/lib/zookeeper
mkdir /var/lib/kafka
chown -R confluent:confluent /var/log/kafka /var/lib/zookeeper /var/lib/kafka

サードパーティーのコネクタのjarはkafka-connect-XXXXという名称で/usr/share/javaに保存します。Kafka ConnectのREST APIで起動の指示をした時、認識して自動的にロードしてくれます。


Zookeeper, Kafka, Schema-Registry 起動

su - confluent

/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
/usr/bin/kafka-server-start /etc/kafka/server.properties &
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &


Kafka-Connect起動

distributedモードで起動します。

connect-distributed /etc/schema-registry/connect-avro-distributed.properties

REST APIを叩いてみると、コネクターのリストは空。まだ何も走らせていないので当たり前。

$ curl http://localhost:8083/connectors | jq .

[]


syslogをElasticsearchに保存してみる実験

syslogソースコネクタとelasticsearchシンクコネクタを使ってsyslogの内容をElasticsearchに保存してみます。Elasticsearchは2.x系を別途インストールしてください。


syslogの設定

Ubuntu14はrsyslogを使っていますので、TCPソケット、ポート5514にログを書き込むようにします

*.* @@localhost:5514という行を/etc/rsyslog.d/50-default.confの先頭に書きこみ、rsyslogを再起動します。

service rsyslog restart


syslogソースコネクタ

syslogソースを次のように起動します。コネクタがポート5514でログを受け取るようになります。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '

{
"name": "tcpsyslog",
"config": {
"tasks.max":1,
"connector.class":"io.confluent.kafka.connect.syslog.source.TCPSyslogSourceConnector",
"kafka.topic":"syslog-tcp",
"syslog.port":5514,
"syslog.reverse.dns.remote.ip": true
}
}
'


Elasticsearchシンクコネクタ

同様にElasticsearchシンクを起動します。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '

{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": 1,
"topics": "syslog-tcp",
"key.ignore": true,
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect"
}
}
'


動作確認

コネクタ一覧を見てみるとtcpsyslogとelasticsearch-sinkコネクタが見えます。

$ curl http://localhost:8083/connectors | jq .

[
"elasticsearch-sink",
"tcpsyslog"
]

しばらくするとsyslogが何か書き込んでいるはずですのでESをチェックして見ましょう。

$ curl http://localhost:9200/_cat/indices

red open syslog-tcp 5 1

Kafkaのトピックと同じsyslog-tcpインデックスが作られていました!

読み出して見ましょう。

$ curl http://localhost:9200/syslog-tcp/_search?pretty

...
{
"_source": {
"hostname": "localhost",
"remote_address": "localhost/127.0.0.1:51631",
"charset": "UTF-8",
"message": "vagrant-ubuntu-trusty-64 kernel: [16583.344413] Hardware name: innotek GmbH VirtualBox/VirtualBox, BIOS VirtualBox 12/01/2006",
"level": 4,
"host": "vagrant-ubuntu-trusty-64",
"facility": 0,
"date": 1478654589000
},
"_score": 1,
"_id": "syslog-tcp+0+75",
"_type": "kafka-connect",
"_index": "syslog-tcp"
}
...

...おぉカーネルのログらしきものが入ってます!成功です。あっという間にパイプラインが出来上がりました。


まとめ 〜Data Integreation as Service〜

Kafka-Connectの紹介と簡単な実験をして見ました。コネクタにより"関心の分離"が可能となり、システムが疎結合になりました。Data Integreation as Serviceへの重要な布石で「データはとりあえずKafkaに送っておけ」ば後はいかようにもなります。

次回はKafka-Streamsの紹介をしたいと思います。Kafka-StreamsはKafka-Connectと違い、双方向の通信が可能です。組み合わせると非常に強力かつ柔軟なパイプラインが構築できます。


おまけ

Confluent公認コネクタ

コネクタ
ソース・シンク
タグ
開発元
ダウンロード

HDFS
Sink
HDFS, Hadoop, Hive
Confluent
Confluent

JDBC
Source
JDBC, MySQL
Confluent
Confluent

JDBC
Sink
JDBC, MySQL
Confluent
Confluent

Attunity
Source
CDC
Attunity
Attunity

Couchbase
Source
Couchbase, NoSQL
Couchbase
Couchbase

DataStax
Sink
Cassandra, DataStax
Data Mountaineer
Data Mountaineer

Elasticsearch
Sink
search, Elastic, log, analytics
Confluent
Confluet

GoldenGate
Source
CDC, Oracle
Oracle
Community

JustOne
Sink
Postgres
JustOne
JustOne

Striim
Source
CDC, MS SQLServer, Oracle, MySQL
Striim
Striim

Syncsort DMX
Source
DB2, IMS, VSAM, CICS
Syncsort
Syncsort

Syncsort DMX
Sink
DB2, IMS, VSAM, CICS
Syncsort
Syncsort

Vertica
Source
Vertica
HP Enterprise
HP Enterprise

Vertica
Sink
Vertica
HP Enterprise
HP Enterprise

VoltDB
Sink
VoltDB, NewSQL
VoltDB
VoltDB

コミュティにより開発されたコネクタ

コネクタ
ソース・シンク
タグ
開発元
ダウンロード

Apache Ignite
Source
File System
Community
Community

Apache Ignite
Sink
File System
Community
Community

Bloomberg Ticker
Source
Application feed
Community
Community

Cassandra
Source
Cassandra
Community
Community 1

Cassandra
Sink
Cassandra
Community
Community

DynamoDB
?
Dynamo, NoSQL
Community
Community

Elasticsearch
Sink
Elastic, search, log, analytics
Community

Community 1 Community 2 Community 3

FTP
Source
File System
Community
Community

Google PubSub
Source
Messaging
Community
Community

Google PubSub
Sink
Messaging
Community
Community

Hazelcast
Sink
Datastore, In-memory
Community
Community

Hbase
Sink
Hbase, NoSQL
Community

Community 1 Community 2

InfluxDB
Sink
Datastore, Time-series
Community
Community

Internet Relay Chat
Source
Application feed
Community
Community

Jenkins
Source
Application feed
Community
Community

JMS
Sink
Messaging
Community
Community

Kudu
Sink
Kudu
Community
Community

Mixpanel
Source
analytics
Community
Community

MongoDB
Source
Mongo, MongoDB, NoSQL
Community
Community

MongoDB CDC - Debezium
Source
MongoDB, CDC
Community
Community

MQTT
Source
MQTT, messaging
Community
Community

MySQL CDC - Debezium
Source
MySQL, CDC, Oracle
Community
Community

Redis
Sink
Datastore, Messaging
Community
Community

RethinkDB
Source
NoSQL, Streaming
Community
Community

RethinkDB
Sink
NoSQL, Streaming
Community
Community

Salesforce
Source
Datastore
Community
Community

Solr
Sink
search, solr
Community

Community 1 Community 2

Solr
Source
search, solr
Community
Community

Splunk
Sink
Datastore, log
Community
Community

Splunk
Source
Datastore, log
Community
Community

spooldir
Source
File System
Community
Community

SQS
Source
AWS, SQS, Messaging
Community
Community

Syslog
Source
log
Community
Community

S3
Sink
AWS, S3, log
Community

Community 1 Community 2

Twitter
Source
Application feed
Community

Community 1 Community 2

Twitter
Sink
Application feed
Community
Community