Apache Kafka 0.9より同梱されているKafka Connectを紹介します。
Kafka-Connect
Kafka ConnectはKafkaと周辺のシステム間でストリームデータをやりとりするための通信規格とライブラリとツールです。まずは下の図をご覧ください。
コネクタは周辺のシステムからKafkaへデータを取り込むためのソース
と周辺システムへデータを送るシンク
の二種類があります。データの流れは一方通行です。すでに何十ものコネクタが実装されており、サポートされている周辺システムは多種に渡ります。もちろん自分でコネクタを作ることもできます。
Kafkaの中を通過するデータの形式は基本的にAvroです。コネクタが周辺システム固有のデータ形式をAvroに変換統一しています。これによりどんなソースとシンクの組み合わせでも動くことになります。
AvroにはSchema Registryがあり、スキーマの変更にも対応できます。
コネクタはスタンドアローンモードと分散モードがあり、分散モードでは複数台のサーバーで分散処理することでスケーラビリティと耐障害性を確保しています。
Kafka ConnectのランタイムにはREST APIが装備されていて、複数のコネクタの起動/停止/ステータスの管理が行えるようになっています。
Kafkaパーティション・オフセットの管理も行なっており、障害から回復した時に、データの重複欠損がないことを保証する仕組みもあります。
導入
KafkaはApacheのプロジェクトですが、開発の主導はConfluentです。ここではConfluent PlatformでKafkaを導入します。
環境: Ubuntu14 都合により古いバージョンです...
ここにあるドキュメントに従います。Scalaのバージョンは2.11
http://docs.confluent.io/3.0.0/installation.html#installation-apt
パッケージインストール
wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11
Oracle Java 1.7をインストール
最新のUbuntuならこのステップは不要でしょう。
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer
sudo apt-get install oracle-java7-set-default
confluentユーザーを追加
sudo adduser confluent
各種ディレクトリ作成
mkdir /var/log/kafka
mkdir /var/lib/zookeeper
mkdir /var/lib/kafka
chown -R confluent:confluent /var/log/kafka /var/lib/zookeeper /var/lib/kafka
サードパーティーのコネクタのjarはkafka-connect-XXXXという名称で/usr/share/javaに保存します。Kafka ConnectのREST APIで起動の指示をした時、認識して自動的にロードしてくれます。
Zookeeper, Kafka, Schema-Registry 起動
su - confluent
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
/usr/bin/kafka-server-start /etc/kafka/server.properties &
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &
Kafka-Connect起動
distributedモードで起動します。
connect-distributed /etc/schema-registry/connect-avro-distributed.properties
REST APIを叩いてみると、コネクターのリストは空。まだ何も走らせていないので当たり前。
$ curl http://localhost:8083/connectors | jq .
[]
syslogをElasticsearchに保存してみる実験
syslogソースコネクタとelasticsearchシンクコネクタを使ってsyslogの内容をElasticsearchに保存してみます。Elasticsearchは2.x系を別途インストールしてください。
syslogの設定
Ubuntu14はrsyslogを使っていますので、TCPソケット、ポート5514にログを書き込むようにします
*.* @@localhost:5514
という行を/etc/rsyslog.d/50-default.confの先頭に書きこみ、rsyslogを再起動します。
service rsyslog restart
syslogソースコネクタ
syslogソースを次のように起動します。コネクタがポート5514でログを受け取るようになります。
curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
"name": "tcpsyslog",
"config": {
"tasks.max":1,
"connector.class":"io.confluent.kafka.connect.syslog.source.TCPSyslogSourceConnector",
"kafka.topic":"syslog-tcp",
"syslog.port":5514,
"syslog.reverse.dns.remote.ip": true
}
}
'
Elasticsearchシンクコネクタ
同様にElasticsearchシンクを起動します。
curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": 1,
"topics": "syslog-tcp",
"key.ignore": true,
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect"
}
}
'
動作確認
コネクタ一覧を見てみるとtcpsyslogとelasticsearch-sinkコネクタが見えます。
$ curl http://localhost:8083/connectors | jq .
[
"elasticsearch-sink",
"tcpsyslog"
]
しばらくするとsyslogが何か書き込んでいるはずですのでESをチェックして見ましょう。
$ curl http://localhost:9200/_cat/indices
red open syslog-tcp 5 1
Kafkaのトピックと同じsyslog-tcpインデックスが作られていました!
読み出して見ましょう。
$ curl http://localhost:9200/syslog-tcp/_search?pretty
...
{
"_source": {
"hostname": "localhost",
"remote_address": "localhost/127.0.0.1:51631",
"charset": "UTF-8",
"message": "vagrant-ubuntu-trusty-64 kernel: [16583.344413] Hardware name: innotek GmbH VirtualBox/VirtualBox, BIOS VirtualBox 12/01/2006",
"level": 4,
"host": "vagrant-ubuntu-trusty-64",
"facility": 0,
"date": 1478654589000
},
"_score": 1,
"_id": "syslog-tcp+0+75",
"_type": "kafka-connect",
"_index": "syslog-tcp"
}
...
...おぉカーネルのログらしきものが入ってます!成功です。あっという間にパイプラインが出来上がりました。
まとめ 〜Data Integreation as Service〜
Kafka-Connectの紹介と簡単な実験をして見ました。コネクタにより"関心の分離"が可能となり、システムが疎結合になりました。Data Integreation as Serviceへの重要な布石で「データはとりあえずKafkaに送っておけ」ば後はいかようにもなります。
次回はKafka-Streamsの紹介をしたいと思います。Kafka-StreamsはKafka-Connectと違い、双方向の通信が可能です。組み合わせると非常に強力かつ柔軟なパイプラインが構築できます。
おまけ
Confluent公認コネクタ
| コネクタ | ソース・シンク | タグ | 開発元 | ダウンロード |
|-----------+-------------+------+-------------------+----------+
|HDFS | Sink | HDFS, Hadoop, Hive | Confluent |Confluent|
|JDBC | Source | JDBC, MySQL | Confluent |Confluent|
|JDBC | Sink | JDBC, MySQL | Confluent |Confluent|
|Attunity | Source | CDC | Attunity |Attunity|
|Couchbase | Source | Couchbase, NoSQL | Couchbase |Couchbase|
|DataStax | Sink | Cassandra, DataStax | Data Mountaineer |Data Mountaineer|
|Elasticsearch | Sink | search, Elastic, log, analytics | Confluent |Confluet|
|GoldenGate | Source | CDC, Oracle | Oracle |Community|
|JustOne | Sink | Postgres | JustOne |JustOne|
|Striim | Source | CDC, MS SQLServer, Oracle, MySQL | Striim |Striim|
|Syncsort DMX | Source | DB2, IMS, VSAM, CICS | Syncsort |Syncsort|
|Syncsort DMX | Sink | DB2, IMS, VSAM, CICS | Syncsort |Syncsort|
|Vertica | Source | Vertica | HP Enterprise |HP Enterprise|
|Vertica | Sink | Vertica | HP Enterprise |HP Enterprise|
|VoltDB | Sink | VoltDB, NewSQL | VoltDB |VoltDB|
コミュティにより開発されたコネクタ
| コネクタ | ソース・シンク | タグ | 開発元 | ダウンロード |
|-----------+-------------+------+-------------------+----------+
|Apache Ignite | Source | File System | Community |Community|
|Apache Ignite | Sink | File System | Community |Community|
|Bloomberg Ticker | Source | Application feed | Community |Community|
|Cassandra | Source | Cassandra | Community |Community 1|
|Cassandra | Sink | Cassandra | Community |Community|
|DynamoDB | ? | Dynamo, NoSQL | Community |Community|
|Elasticsearch | Sink | Elastic, search, log, analytics | Community |Community 1 Community 2 Community 3|
|FTP | Source | File System | Community |Community|
|Google PubSub | Source | Messaging | Community |Community|
|Google PubSub | Sink | Messaging | Community |Community|
|Hazelcast | Sink | Datastore, In-memory | Community |Community|
|Hbase | Sink | Hbase, NoSQL | Community |Community 1 Community 2|
|InfluxDB | Sink | Datastore, Time-series | Community |Community|
|Internet Relay Chat | Source | Application feed | Community |Community|
|Jenkins | Source | Application feed | Community |Community|
|JMS | Sink | Messaging | Community |Community|
|Kudu | Sink | Kudu | Community |Community|
|Mixpanel | Source | analytics | Community |Community|
|MongoDB | Source | Mongo, MongoDB, NoSQL | Community |Community|
|MongoDB CDC - Debezium | Source | MongoDB, CDC | Community |Community|
|MQTT | Source | MQTT, messaging | Community |Community|
|MySQL CDC - Debezium | Source | MySQL, CDC, Oracle | Community |Community|
|Redis | Sink | Datastore, Messaging | Community |Community|
|RethinkDB | Source | NoSQL, Streaming | Community |Community|
|RethinkDB | Sink | NoSQL, Streaming | Community |Community|
|Salesforce | Source | Datastore | Community |Community|
|Solr | Sink | search, solr | Community |Community 1 Community 2|
|Solr | Source | search, solr | Community |Community|
|Splunk | Sink | Datastore, log | Community |Community|
|Splunk | Source | Datastore, log | Community |Community|
|spooldir | Source | File System | Community |Community|
|SQS | Source | AWS, SQS, Messaging | Community | Community|
|Syslog | Source | log | Community |Community|
|S3 | Sink | AWS, S3, log | Community | Community 1 Community 2|
|Twitter | Source | Application feed | Community |Community 1 Community 2|
|Twitter | Sink | Application feed | Community | Community|