Help us understand the problem. What is going on with this article?

KafkaとElasticSearchの連携を試しました。

前回の記事では、Alibaba Cloud E-MapReduceを利用したApache KafkaとApache Spark Streamingの統合方法を紹介しました。今回の記事は、引き続きTwitterメッセージの例を用いて、多くのエンタープライズ企業で導入されたKafkaとElasticsearchの構成について、Alibaba Cloud上での統合方法を説明してみたいと思います。

検証環境について

Kafka
  • EMR-3.20.0
  • Zookeeper 3.4.13
  • Kakfa 1.1.1
  • クラスタータイプは Kafka
  • ハードウェア構成(Header)はecs.sn2.largeを1台
  • ハードウェア構成(Worker)はecs.sn2.largeを2台

※ クラスターの作成手順は公式ドキュメントにご参照いただけますと幸いです。

# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
Hosebird Client

Hosebird Clientというのは、Kafka Producerと連携する時に、TwitterのStreaming APIをコールするJava Http Clientです。
詳しく知りたい方は下記のgithubにご参考頂ければと思います。
[https://github.com/twitter/hbc:embed:cite]

全体構成図

まずはじめに、Alibaba Cloudの構成図は以下となります。Elasticsearch環境を素早く構築する為、Alibaba Cloudのホスト型Elasticsearchを使いました。現時点はAlibaba Cloud Elasticsearchが Elasticsearch 5.5.3 with Commercial Feature、Elasticsearch 6.3.2 with Commercial Feature、Elasticsearch 6.7.0 with Commercial Feature3つのバージョンをサポートしており、エンタープライズレベルのアクセス制御、セキュリティモニタリング、アラーム、可視化レポート、機械学習などのX-Packプラグインも含まれております。

f:id:sbc_kou:20190807105656p:plain

Kafka Producer

準備が整ったので、ローカルのJava開発環境で早速コードを書いていきましょう! まずは、以下のサンプルコードでKafka Producerを作成し、jarファイルを生成します。

Kakfa BootstrapServers

Kafkaクラスターの任意の一台のIPアドレスで構いません。

Twitter Streaming API認証情報

Twitter Streaming APIを利用する為に、consumerKey、consumerSecret、token、secretをそれぞれ事前に取得して入力します。

public class ProducerTest {

    Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());

 /** ---------------------- Twitter Streaming API情報 ---------------------- */
    String consumerKey = "xxxxxxxxxxxxxxxx";
    String consumerSecret = "xxxxxxxxxxxxxxxx";
    String token = "xxxxxxxxxxxxxxxx";
    String secret = "xxxxxxxxxxxxxxxx";
    String mytopic = "tweets_poc";

 /** ---------------------- Tweetsキーワードを指定 ---------------------- */
    List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");


    public ProducerTest(){}

    public static void main(String[] args) {
        new ProducerTest().run();
    }

    public void run(){

        logger.info("Setup");

        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        Client client = createTwitterClient(msgQueue);
        client.connect();

        KafkaProducer<String, String> producer = createKafkaProducer();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            logger.info("shutting down client from twitter...");
            client.stop();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));

        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }

            if (msg != null){
                logger.info(msg);
                if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){
                    producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) {
                    producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"IoT")) {
                    producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
                else if (StringUtils.containsIgnoreCase(msg,"5G")) {
                    producer.send(new ProducerRecord<>(mytopic,5,"5G", msg));
                }
                else{
                    producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                            if (e != null) {
                                logger.error("Something bad happened", e);
                            }
                        }
                    });
                }
            }
        }
        logger.info("End of application");
    }

 /** ---------------------- Hosebird Clientを作成 ---------------------- */
    public Client createTwitterClient(BlockingQueue<String> msgQueue){

        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")                              
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }

  /** ---------------------- kakfa producerを作成 ---------------------- */
    public KafkaProducer<String, String> createKafkaProducer(){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); 

        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        return producer;
    }

}

Kafka Consumer

ローカルのJava開発環境で、Consumerを作成しましょう。

Elasticsearch認証情報

ElasticSearchサービスを利用する為に、 elasticsearch側のusername、password、アクセスエンドポイントをそれぞれ事前に取得しておきます。

Elasticsearchの事前準備

kibanaコンソールから、ElaticsearchのIndex(twitter)とType(tweets)を事前に作成しておきます。

public class ElasticSearchConsumer {

    public static RestHighLevelClient createClient(){

        /** ---------------------- ElasticSearch認証情報 ---------------------- */
        String hostname = "xxxxxxxxxxxxxxxx"; 
        String username = "xxxxxxxxxxxxxxxx"; 
        String password = "xxxxxxxxxxxxxxxx"; 

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, 9200, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }

    public static KafkaConsumer<String, String> createConsumer(String topic){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";
        String groupId = "kafka-demo-elasticsearch";

         /** ---------------------- consumer パラメータ設定 ---------------------- */
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

        /** ---------------------- consumer 作成 ---------------------- */
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));

        return consumer;

    }

    private static JsonParser jsonParser = new JsonParser();

    private static String extractIdFromTweet(String tweetJson){

        return jsonParser.parse(tweetJson)
                .getAsJsonObject()
                .get("id_str")
                .getAsString();
    }

    public static void main(String[] args) throws IOException {

        Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
        RestHighLevelClient client = createClient();

        KafkaConsumer<String, String> consumer = createConsumer("tweets_poc");

        while(true){
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100)); 

            Integer recordCount = records.count();
            logger.info("Received " + recordCount + " records");    

            for (ConsumerRecord<String, String> record : records){

                try {
                    String id = extractIdFromTweet(record.value());

                    /** ---------------------- データをElasticSearchに挿入 ---------------------- */
                    IndexRequest indexRequest = new IndexRequest(
                            "twitter",
                            "tweets",
                            id 
                    ).source(record.value(), XContentType.JSON);


                    IndexResponse indexResponse = client.index(indexRequest,RequestOptions.DEFAULT);
                    logger.info(indexResponse.getId());

                } catch (NullPointerException e){
                    logger.warn("skipping bad data: " + record.value());
                }

            }

            if(recordCount > 0){

                logger.info("Committing offsets...");
                consumer.commitSync();
                logger.info("Offsets have been committed");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

    }
}

実行

まず、作成したKafka ProducerとKafka ConsumerのJarファイルを一旦Alibaba Cloud OSSにアップロードしておきます。そして、ECS(Kafka Producer)にsshログインして、ossutilなどのツールでKafka Producerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をECSにダウンロードしていきます。ダウンロードできたら、下記のコマンドでKafka Producerを起動して、Twitterに投稿したメッセージを収集させます。

java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar

同様に、ECS(Kafka Consumer)にsshログインして、Kafka Consumerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をダウンロードしていきます。ダウンロードできたら、また下記のコマンドでKafka Consumerを起動して、KafkaのPartitionからメッセージを読み出します。

java -jar kafka-elasticsearch-poc-1.0-jar-with-dependencies.jar

上記のコマンドを実行したら、下図のように、kafkaから読み出したTwitterメッセージのIDをターミナル画面上で確認することができます。
f:id:sbc_kou:20190807110631p:plain

そこで、上記の赤枠のTwitterIDを例として、ElasticSearchのkibanaで検索して確認してみます。まず、Alibaba Cloudコンソールからkibanaへアクセスします。
f:id:sbc_kou:20190820143751p:plain
赤枠のIDをkibanaコンソールで検索すると、該当のTwitterメッセージが出てきました!
f:id:sbc_kou:20190807110421p:plain

最後

いかがでしたでしょうか

Apache KafkaとElasticsearchは、これまで見てきた数多くの企業で実際に利用されており、KafkaはElasticsearchを利用する前に、データのストリーム処理に重要な役割を担っている一方、Elasticsearch は元のデータをそのまま保存するのではなく、高速検索に適用されます。また、今回ご紹介したKafka以外、クラウドマネージドサービスを望むようでしたら、Alibaba CloudのLogServiceを使っても同じ役割を果たせるので、ご興味をお持ちの方、ぜひご参考してみてください!

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした