Environment
OS: Ubuntu 16.04
Kafka: kafka_2.10-0.10.2.0
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"
Scala: 2.10
Build Tool: Apache Maven 3.5.0
IDE: IntelliJ
Add the following dependency to pom.xml to use the Elasticsearch connector:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
Write to Elasticsearch
Use the "ElasticsearchSink" class, provided by Flink’s Elasticsearch connector, to write a DataStream to an Elasticsearch index.
popularPlaces.addSink(
  new ElasticsearchSink(jConfig, jTransports, new PopularPlaceInserter))
Here,
"popularPlaces" is the DataStream[(Float, Float, Long, Boolean, Int)] to be written.
"jConfig" is the userConfig argument: a java.util.Map of user settings.
"jTransports" is the transportAddresses argument: a java.util.List of the Elasticsearch nodes (as InetSocketAddress) that the TransportClient connects to.
"PopularPlaceInserter" is the elasticsearchSinkFunction argument, which generates one or more ActionRequests from each incoming element.
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.html)
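The call above does not show how jConfig and jTransports are built. Below is a minimal sketch, assuming a single local Elasticsearch 2.x node that keeps the default cluster name "elasticsearch" and the default transport port 9300. The object name EsSinkSettings and both values are hypothetical, so adjust them to your cluster. The connector's constructor takes plain Java collections here, not Scala ones.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.util.{ArrayList => JArrayList, HashMap => JHashMap}

// Hypothetical settings object; the values assume a local single-node cluster.
object EsSinkSettings {
  // userConfig: a java.util.Map[String, String] handed to ElasticsearchSink.
  // "cluster.name" must match the target cluster ("elasticsearch" is the ES default).
  val jConfig = new JHashMap[String, String]()
  jConfig.put("cluster.name", "elasticsearch")
  // Send each request right away instead of buffering it (handy for local testing).
  jConfig.put("bulk.flush.max.actions", "1")

  // transportAddresses: the nodes the TransportClient connects to.
  // 9300 is the default transport port (9200 is HTTP and is not used by this client).
  val jTransports = new JArrayList[InetSocketAddress]()
  jTransports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
}
```

Setting "bulk.flush.max.actions" to "1" makes the results show up in Kibana immediately, but it sends one bulk request per element, so a larger value is usually better for real workloads.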
"PopularPlaceInserter" is defined as follows:
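import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import scala.collection.JavaConverters._ // provides .asJava

// Turns each record into one IndexRequest for the "nyc-places" index.
class PopularPlaceInserter extends ElasticsearchSinkFunction[(Float, Float, Long, Boolean, Int)] {
  override def process(record: (Float, Float, Long, Boolean, Int), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    // Document fields, all sent as strings
    val json = Map(
      "time" -> record._3.toString,
      "location" -> (record._2 + "," + record._1), // "lat,lon", assuming _2 is latitude and _1 longitude
      "isStart" -> record._4.toString,
      "cnt" -> record._5.toString
    )
    val rqst: IndexRequest = Requests.indexRequest
      .index("nyc-places")
      .`type`("popular-locations")
      .source(json.asJava) // Elasticsearch expects a java.util.Map
    indexer.add(rqst)
  }
}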
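To check what a single record turns into, the document-building part of process can be pulled out into a plain function that runs without Flink or Elasticsearch. This is a sketch: PopularPlaceDoc and toSource are hypothetical names, the function builds the same map as the inserter (using string interpolation instead of +), and it assumes record._1 is the longitude and record._2 the latitude, so "location" comes out in the "lat,lon" string form Elasticsearch accepts for a geo_point.

```scala
// Hypothetical helper mirroring the field mapping in PopularPlaceInserter.process.
object PopularPlaceDoc {
  // Assumes record = (lon, lat, time, isStart, cnt).
  def toSource(record: (Float, Float, Long, Boolean, Int)): Map[String, String] = Map(
    "time"     -> record._3.toString,
    "location" -> s"${record._2},${record._1}", // "lat,lon"
    "isStart"  -> record._4.toString,
    "cnt"      -> record._5.toString
  )

  def main(args: Array[String]): Unit =
    // Made-up sample record, only for illustration
    println(toSource((-73.75f, 40.75f, 1357000000000L, true, 42)))
}
```

Elasticsearch's dynamic mapping indexes a "lat,lon" value like this as an ordinary string. For Kibana to treat "location" as a geo_point, the nyc-places index (or an index template) needs an explicit geo_point mapping for that field before any data arrives.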
Memo: What is a sink?
https://en.wikipedia.org/wiki/Sink_(computing)
In computing, a sink, event sink or data sink is a class or function designed to receive incoming events from another object or function. This is commonly implemented in C++ as callbacks. Other Object-oriented languages, such as Java and C#, have built-in support for sinks by allowing events to be fired to delegate functions.
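The callback idea fits in a few lines of plain Scala. This is a toy model, not Flink's API: CallbackSinkDemo, Sink, and emit are made-up names, and a "sink" here is just a function that the source calls once per event.

```scala
import scala.collection.mutable.ListBuffer

object CallbackSinkDemo {
  // A sink modeled as a callback: it receives one event at a time.
  type Sink[T] = T => Unit

  // A toy event source that fires each event into whatever sink it is given.
  def emit[T](events: Seq[T])(sink: Sink[T]): Unit =
    events.foreach(e => sink(e))

  def main(args: Array[String]): Unit = {
    val received = ListBuffer.empty[String]
    emit(Seq("a", "b", "c"))(e => received += e) // this sink collects events
    emit(Seq("x"))(e => println(s"got $e"))      // this sink prints events
    println(received.mkString(","))
  }
}
```

The source does not care what the sink does with each event; swapping the function changes the destination, which is the same separation Flink makes between a DataStream and the sink attached to it.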
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html
So what about Flink? In Flink, "Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams".
"addSink" is one of these built-in sink operations; it "invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions". ElasticsearchSink is one of these connectors, which is why it is passed to addSink above.
(I hope someday I will fully understand what they say here.)
Memo: What does "class xxx extends" mean?
Reading through Scala code, I often find statements like "class ArrayElement extends Element". What does this mean?
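By using "extends", class ArrayElement inherits all non-private members of class Element. The exception: if the subclass already implements a member with the same name and parameters, the superclass's member is not inherited (the subclass's version overrides it). In the same way, "PopularPlaceInserter" inherits from "ElasticsearchSinkFunction". ElasticsearchSinkFunction is a Java interface, which Scala treats like a trait, and its process method is abstract, so PopularPlaceInserter has to provide its own process.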
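A small sketch of the idea, modeled on the Element/ArrayElement example from Programming in Scala; the ExtendsDemo wrapper, LineElement's exact bodies, and the sample strings are illustrative. ArrayElement inherits height and width from Element and implements the abstract contents, while LineElement defines its own width and height, so Element's versions are not inherited there.

```scala
object ExtendsDemo {
  abstract class Element {
    def contents: Array[String]       // abstract: subclasses must implement it
    def height: Int = contents.length // concrete: inherited as-is
    def width: Int = if (height == 0) 0 else contents(0).length
  }

  // Inherits height and width; implements the abstract contents.
  class ArrayElement(conts: Array[String]) extends Element {
    def contents: Array[String] = conts
  }

  // Defines its own width and height, overriding Element's versions.
  class LineElement(s: String) extends ArrayElement(Array(s)) {
    override def width: Int = s.length
    override def height: Int = 1
  }

  def main(args: Array[String]): Unit = {
    val a = new ArrayElement(Array("hello", "world"))
    println(s"${a.height} x ${a.width}") // 2 x 5
    val l = new LineElement("hi")
    println(s"${l.height} x ${l.width}") // 1 x 2
  }
}
```

Implementing an abstract member (contents in ArrayElement, process in PopularPlaceInserter) does not require the override keyword, but replacing a concrete one (width and height in LineElement) does.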
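According to the ElasticsearchSinkFunction API documentation, its process method creates multiple ActionRequests from an element in a stream, and ElasticsearchSink uses it to prepare elements for sending to Elasticsearch. PopularPlaceInserter's process adds exactly one IndexRequest (a kind of ActionRequest) per record.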