21
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Data Integration as Service: Kafka-Connectの紹介と実験

Last updated at Posted at 2016-12-04

Apache Kafka 0.9より同梱されているKafka Connectを紹介します。

Kafka-Connect

Kafka ConnectはKafkaと周辺のシステム間でストリームデータをやりとりするための通信規格とライブラリとツールです。まずは下の図をご覧ください。

kafka-connect.png

コネクタは周辺のシステムからKafkaへデータを取り込むためのソースと周辺システムへデータを送るシンクの二種類があります。データの流れは一方通行です。すでに何十ものコネクタが実装されており、サポートされている周辺システムは多種に渡ります。もちろん自分でコネクタを作ることもできます。

Kafkaの中を通過するデータの形式は基本的にAvroです。コネクタが周辺システム固有のデータ形式をAvroに変換統一しています。これによりどんなソースとシンクの組み合わせでも動くことになります。

AvroにはSchema Registryがあり、スキーマの変更にも対応できます。

コネクタはスタンドアローンモードと分散モードがあり、分散モードでは複数台のサーバーで分散処理することでスケーラビリティと耐障害性を確保しています。

Kafka ConnectのランタイムにはREST APIが装備されていて、複数のコネクタの起動/停止/ステータスの管理が行えるようになっています。

Kafkaパーティション・オフセットの管理も行なっており、障害から回復した時に、データの重複欠損がないことを保証する仕組みもあります。

導入

KafkaはApacheのプロジェクトですが、開発の主導はConfluentです。ここではConfluent PlatformでKafkaを導入します。

環境: Ubuntu14 都合により古いバージョンです...

ここにあるドキュメントに従います。Scalaのバージョンは2.11
http://docs.confluent.io/3.0.0/installation.html#installation-apt

パッケージインストール

wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11

Oracle Java 1.7をインストール

最新のUbuntuならこのステップは不要でしょう。

sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java7-installer
sudo apt-get install oracle-java7-set-default

confluentユーザーを追加

sudo adduser confluent

各種ディレクトリ作成

mkdir /var/log/kafka
mkdir /var/lib/zookeeper
mkdir /var/lib/kafka
chown -R confluent:confluent /var/log/kafka /var/lib/zookeeper /var/lib/kafka

サードパーティーのコネクタのjarはkafka-connect-XXXXという名称で/usr/share/javaに保存します。Kafka ConnectのREST APIで起動の指示をした時、認識して自動的にロードしてくれます。

Zookeeper, Kafka, Schema-Registry 起動

su - confluent
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
/usr/bin/kafka-server-start /etc/kafka/server.properties &
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &

Kafka-Connect起動

distributedモードで起動します。

connect-distributed /etc/schema-registry/connect-avro-distributed.properties

REST APIを叩いてみると、コネクターのリストは空。まだ何も走らせていないので当たり前。

$ curl http://localhost:8083/connectors | jq .
[]

syslogをElasticsearchに保存してみる実験

syslogソースコネクタとelasticsearchシンクコネクタを使ってsyslogの内容をElasticsearchに保存してみます。Elasticsearchは2.x系を別途インストールしてください。

syslogの設定

Ubuntu14はrsyslogを使っていますので、TCPソケット、ポート5514にログを書き込むようにします

*.* @@localhost:5514という行を/etc/rsyslog.d/50-default.confの先頭に書きこみ、rsyslogを再起動します。

service rsyslog restart

syslogソースコネクタ

syslogソースを次のように起動します。コネクタがポート5514でログを受け取るようになります。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
    "name": "tcpsyslog",
    "config": {
        "tasks.max":1,
        "connector.class":"io.confluent.kafka.connect.syslog.source.TCPSyslogSourceConnector",
        "kafka.topic":"syslog-tcp",
        "syslog.port":5514,
        "syslog.reverse.dns.remote.ip": true
    }
}
'

Elasticsearchシンクコネクタ

同様にElasticsearchシンクを起動します。

curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": 1,
        "topics": "syslog-tcp",
        "key.ignore": true,
        "connection.url": "http://localhost:9200",
        "type.name": "kafka-connect"
    }
}
'

動作確認

コネクタ一覧を見てみるとtcpsyslogとelasticsearch-sinkコネクタが見えます。

$ curl http://localhost:8083/connectors | jq .
[
  "elasticsearch-sink",
  "tcpsyslog"
]

しばらくするとsyslogが何か書き込んでいるはずですのでESをチェックして見ましょう。

$ curl http://localhost:9200/_cat/indices
red open syslog-tcp    5 1

Kafkaのトピックと同じsyslog-tcpインデックスが作られていました!
読み出して見ましょう。

$ curl http://localhost:9200/syslog-tcp/_search?pretty
  ...
  {
    "_source": {
      "hostname": "localhost",
      "remote_address": "localhost/127.0.0.1:51631",
      "charset": "UTF-8",
      "message": "vagrant-ubuntu-trusty-64 kernel: [16583.344413] Hardware name: innotek GmbH VirtualBox/VirtualBox, BIOS VirtualBox 12/01/2006",
      "level": 4,
      "host": "vagrant-ubuntu-trusty-64",
      "facility": 0,
      "date": 1478654589000
    },
    "_score": 1,
    "_id": "syslog-tcp+0+75",
    "_type": "kafka-connect",
    "_index": "syslog-tcp"
  }
  ...

...おぉカーネルのログらしきものが入ってます!成功です。あっという間にパイプラインが出来上がりました。

まとめ 〜Data Integreation as Service〜

Kafka-Connectの紹介と簡単な実験をして見ました。コネクタにより"関心の分離"が可能となり、システムが疎結合になりました。Data Integreation as Serviceへの重要な布石で「データはとりあえずKafkaに送っておけ」ば後はいかようにもなります。

次回はKafka-Streamsの紹介をしたいと思います。Kafka-StreamsはKafka-Connectと違い、双方向の通信が可能です。組み合わせると非常に強力かつ柔軟なパイプラインが構築できます。

おまけ

Confluent公認コネクタ

| コネクタ | ソース・シンク | タグ | 開発元 | ダウンロード |
|-----------+-------------+------+-------------------+----------+
|HDFS | Sink | HDFS, Hadoop, Hive | Confluent |Confluent|
|JDBC | Source | JDBC, MySQL | Confluent |Confluent|
|JDBC | Sink | JDBC, MySQL | Confluent |Confluent|
|Attunity | Source | CDC | Attunity |Attunity|
|Couchbase | Source | Couchbase, NoSQL | Couchbase |Couchbase|
|DataStax | Sink | Cassandra, DataStax | Data Mountaineer |Data Mountaineer|
|Elasticsearch | Sink | search, Elastic, log, analytics | Confluent |Confluet|
|GoldenGate | Source | CDC, Oracle | Oracle |Community|
|JustOne | Sink | Postgres | JustOne |JustOne|
|Striim | Source | CDC, MS SQLServer, Oracle, MySQL | Striim |Striim|
|Syncsort DMX | Source | DB2, IMS, VSAM, CICS | Syncsort |Syncsort|
|Syncsort DMX | Sink | DB2, IMS, VSAM, CICS | Syncsort |Syncsort|
|Vertica | Source | Vertica | HP Enterprise |HP Enterprise|
|Vertica | Sink | Vertica | HP Enterprise |HP Enterprise|
|VoltDB | Sink | VoltDB, NewSQL | VoltDB |VoltDB|

コミュティにより開発されたコネクタ

| コネクタ | ソース・シンク | タグ | 開発元 | ダウンロード |
|-----------+-------------+------+-------------------+----------+
|Apache Ignite | Source | File System | Community |Community|
|Apache Ignite | Sink | File System | Community |Community|
|Bloomberg Ticker | Source | Application feed | Community |Community|
|Cassandra | Source | Cassandra | Community |Community 1|
|Cassandra | Sink | Cassandra | Community |Community|
|DynamoDB | ? | Dynamo, NoSQL | Community |Community|
|Elasticsearch | Sink | Elastic, search, log, analytics | Community |Community 1 Community 2 Community 3|
|FTP | Source | File System | Community |Community|
|Google PubSub | Source | Messaging | Community |Community|
|Google PubSub | Sink | Messaging | Community |Community|
|Hazelcast | Sink | Datastore, In-memory | Community |Community|
|Hbase | Sink | Hbase, NoSQL | Community |Community 1 Community 2|
|InfluxDB | Sink | Datastore, Time-series | Community |Community|
|Internet Relay Chat | Source | Application feed | Community |Community|
|Jenkins | Source | Application feed | Community |Community|
|JMS | Sink | Messaging | Community |Community|
|Kudu | Sink | Kudu | Community |Community|
|Mixpanel | Source | analytics | Community |Community|
|MongoDB | Source | Mongo, MongoDB, NoSQL | Community |Community|
|MongoDB CDC - Debezium | Source | MongoDB, CDC | Community |Community|
|MQTT | Source | MQTT, messaging | Community |Community|
|MySQL CDC - Debezium | Source | MySQL, CDC, Oracle | Community |Community|
|Redis | Sink | Datastore, Messaging | Community |Community|
|RethinkDB | Source | NoSQL, Streaming | Community |Community|
|RethinkDB | Sink | NoSQL, Streaming | Community |Community|
|Salesforce | Source | Datastore | Community |Community|
|Solr | Sink | search, solr | Community |Community 1 Community 2|
|Solr | Source | search, solr | Community |Community|
|Splunk | Sink | Datastore, log | Community |Community|
|Splunk | Source | Datastore, log | Community |Community|
|spooldir | Source | File System | Community |Community|
|SQS | Source | AWS, SQS, Messaging | Community | Community|
|Syslog | Source | log | Community |Community|
|S3 | Sink | AWS, S3, log | Community | Community 1 Community 2|
|Twitter | Source | Application feed | Community |Community 1 Community 2|
|Twitter | Sink | Application feed | Community | Community|

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?