前回の記事では、Alibaba Cloud E-MapReduceを利用したApache KafkaとApache Spark Streamingの統合方法を紹介しました。今回の記事は、引き続きTwitterメッセージの例を用いて、多くのエンタープライズ企業で導入されたKafkaとElasticsearchの構成について、Alibaba Cloud上での統合方法を説明してみたいと思います。
検証環境について
Kafka
- EMR-3.20.0
- Zookeeper 3.4.13
- Kakfa 1.1.1
- クラスタータイプは Kafka
- ハードウェア構成(Header)はecs.sn2.largeを1台
- ハードウェア構成(Worker)はecs.sn2.largeを2台
※ クラスターの作成手順は公式ドキュメントにご参照いただけますと幸いです。
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
# uname -r
3.10.0-693.2.2.el7.x86_64
# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
Hosebird Client
Hosebird Clientというのは、Kafka Producerと連携する時に、TwitterのStreaming APIをコールするJava Http Clientです。
詳しく知りたい方は下記のgithubにご参考頂ければと思います。
[https://github.com/twitter/hbc:embed:cite]
全体構成図
まずはじめに、Alibaba Cloudの構成図は以下となります。Elasticsearch環境を素早く構築する為、Alibaba Cloudのホスト型Elasticsearchを使いました。現時点はAlibaba Cloud Elasticsearchが Elasticsearch 5.5.3 with Commercial Feature、Elasticsearch 6.3.2 with Commercial Feature、Elasticsearch 6.7.0 with Commercial Feature3つのバージョンをサポートしており、エンタープライズレベルのアクセス制御、セキュリティモニタリング、アラーム、可視化レポート、機械学習などのX-Packプラグインも含まれております。
Kafka Producer
準備が整ったので、ローカルのJava開発環境で早速コードを書いていきましょう! まずは、以下のサンプルコードでKafka Producerを作成し、jarファイルを生成します。
Kakfa BootstrapServers
Kafkaクラスターの任意の一台のIPアドレスで構いません。
Twitter Streaming API認証情報
Twitter Streaming APIを利用する為に、consumerKey、consumerSecret、token、secretをそれぞれ事前に取得して入力します。
public class ProducerTest {
Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());
/** ---------------------- Twitter Streaming API情報 ---------------------- */
String consumerKey = "xxxxxxxxxxxxxxxx";
String consumerSecret = "xxxxxxxxxxxxxxxx";
String token = "xxxxxxxxxxxxxxxx";
String secret = "xxxxxxxxxxxxxxxx";
String mytopic = "tweets_poc";
/** ---------------------- Tweetsキーワードを指定 ---------------------- */
List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");
public ProducerTest(){}
public static void main(String[] args) {
new ProducerTest().run();
}
public void run(){
logger.info("Setup");
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
Client client = createTwitterClient(msgQueue);
client.connect();
KafkaProducer<String, String> producer = createKafkaProducer();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("stopping application...");
logger.info("shutting down client from twitter...");
client.stop();
logger.info("closing producer...");
producer.close();
logger.info("done!");
}));
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if (msg != null){
logger.info(msg);
if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){
producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) {
producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"IoT")) {
producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"5G")) {
producer.send(new ProducerRecord<>(mytopic,5,"5G", msg));
}
else{
producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
}
}
logger.info("End of application");
}
/** ---------------------- Hosebird Clientを作成 ---------------------- */
public Client createTwitterClient(BlockingQueue<String> msgQueue){
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
hosebirdEndpoint.trackTerms(terms);
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01")
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
return hosebirdClient;
}
/** ---------------------- kakfa producerを作成 ---------------------- */
public KafkaProducer<String, String> createKafkaProducer(){
String bootstrapServers = "xxxxxxxxxxxxxxxx";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
}
Kafka Consumer
ローカルのJava開発環境で、Consumerを作成しましょう。
Elasticsearch認証情報
ElasticSearchサービスを利用する為に、 elasticsearch側のusername、password、アクセスエンドポイントをそれぞれ事前に取得しておきます。
Elasticsearchの事前準備
kibanaコンソールから、ElaticsearchのIndex(twitter)とType(tweets)を事前に作成しておきます。
public class ElasticSearchConsumer {
public static RestHighLevelClient createClient(){
/** ---------------------- ElasticSearch認証情報 ---------------------- */
String hostname = "xxxxxxxxxxxxxxxx";
String username = "xxxxxxxxxxxxxxxx";
String password = "xxxxxxxxxxxxxxxx";
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(
new HttpHost(hostname, 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
RestHighLevelClient client = new RestHighLevelClient(builder);
return client;
}
public static KafkaConsumer<String, String> createConsumer(String topic){
String bootstrapServers = "xxxxxxxxxxxxxxxx";
String groupId = "kafka-demo-elasticsearch";
/** ---------------------- consumer パラメータ設定 ---------------------- */
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
/** ---------------------- consumer 作成 ---------------------- */
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
return consumer;
}
private static JsonParser jsonParser = new JsonParser();
private static String extractIdFromTweet(String tweetJson){
return jsonParser.parse(tweetJson)
.getAsJsonObject()
.get("id_str")
.getAsString();
}
public static void main(String[] args) throws IOException {
Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
RestHighLevelClient client = createClient();
KafkaConsumer<String, String> consumer = createConsumer("tweets_poc");
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
Integer recordCount = records.count();
logger.info("Received " + recordCount + " records");
for (ConsumerRecord<String, String> record : records){
try {
String id = extractIdFromTweet(record.value());
/** ---------------------- データをElasticSearchに挿入 ---------------------- */
IndexRequest indexRequest = new IndexRequest(
"twitter",
"tweets",
id
).source(record.value(), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest,RequestOptions.DEFAULT);
logger.info(indexResponse.getId());
} catch (NullPointerException e){
logger.warn("skipping bad data: " + record.value());
}
}
if(recordCount > 0){
logger.info("Committing offsets...");
consumer.commitSync();
logger.info("Offsets have been committed");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
実行
まず、作成したKafka ProducerとKafka ConsumerのJarファイルを一旦Alibaba Cloud OSSにアップロードしておきます。そして、ECS(Kafka Producer)にsshログインして、ossutilなどのツールでKafka Producerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をECSにダウンロードしていきます。ダウンロードできたら、下記のコマンドでKafka Producerを起動して、Twitterに投稿したメッセージを収集させます。
java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar
同様に、ECS(Kafka Consumer)にsshログインして、Kafka Consumerのjarファイル(TweetsProducerTest-1.0-jar-with-dependencies.jar)をダウンロードしていきます。ダウンロードできたら、また下記のコマンドでKafka Consumerを起動して、KafkaのPartitionからメッセージを読み出します。
java -jar kafka-elasticsearch-poc-1.0-jar-with-dependencies.jar
上記のコマンドを実行したら、下図のように、kafkaから読み出したTwitterメッセージのIDをターミナル画面上で確認することができます。
そこで、上記の赤枠のTwitterIDを例として、ElasticSearchのkibanaで検索して確認してみます。まず、Alibaba Cloudコンソールからkibanaへアクセスします。
赤枠のIDをkibanaコンソールで検索すると、該当のTwitterメッセージが出てきました!
最後
Apache KafkaとElasticsearchは、これまで見てきた数多くの企業で実際に利用されており、KafkaはElasticsearchを利用する前に、データのストリーム処理に重要な役割を担っている一方、Elasticsearch は元のデータをそのまま保存するのではなく、高速検索に適用されます。また、今回ご紹介したKafka以外、クラウドマネージドサービスを望むようでしたら、Alibaba CloudのLogServiceを使っても同じ役割を果たせるので、ご興味をお持ちの方、ぜひご参考してみてください!