Study Flink with Elasticsearch Exercise Code

Posted at 2017-07-09

Environment

OS: Ubuntu 16.04
Kafka: kafka_2.10-0.10.2.0
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"
Scala: 2.10
Build Tool: Apache Maven 3.5.0
IDE: IntelliJ

Add the following dependency to pom.xml to use Elasticsearch

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

Write to Elasticsearch

Use "ElasticsearchSink" class which is provided by Flink’s Elasticsearch Connector to write a DataStream to an Elasticsearch index

Where,
"jConfig" is the userConfig: a map of user settings.
"jTransports" is the transportAddresses: the addresses of the Elasticsearch nodes to connect to with a TransportClient.
"PopularPlaceInserter" is the elasticsearchSinkFunction: it generates multiple ActionRequests from each incoming element.
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.html)

 popularPlaces.addSink(
      new ElasticsearchSink(jConfig, jTransports, new PopularPlaceInserter))
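
The snippet above assumes that "jConfig" and "jTransports" are already defined. A minimal sketch of how they could be built, assuming a local single-node Elasticsearch 2.x instance with the default cluster name "elasticsearch" and the default transport port 9300 (adjust these to your setup):

import java.net.{InetAddress, InetSocketAddress}
import java.util

// Assumed values for a local setup; change cluster name, host, and port as needed.
val jConfig = new util.HashMap[String, String]()
jConfig.put("cluster.name", "elasticsearch")    // must match the cluster.name of your Elasticsearch node
jConfig.put("bulk.flush.max.actions", "10")     // flush the bulk request after 10 buffered actions

val jTransports = new util.ArrayList[InetSocketAddress]()
jTransports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300))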

"PopularPlaceInserter" is defined as following.

  import org.apache.flink.api.common.functions.RuntimeContext
  import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSinkFunction, RequestIndexer}
  import org.elasticsearch.action.index.IndexRequest
  import org.elasticsearch.client.Requests
  import scala.collection.JavaConverters._

  class PopularPlaceInserter extends ElasticsearchSinkFunction[(Float, Float, Long, Boolean, Int)] {
    // Called for every record; builds an IndexRequest and hands it to the RequestIndexer.
    def process(record: (Float, Float, Long, Boolean, Int), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      val json = Map(
        "time" -> record._3.toString,
        "location" -> (record._2 + "," + record._1),  // "lat,lon" string
        "isStart" -> record._4.toString,
        "cnt" -> record._5.toString
      )
      val rqst: IndexRequest = Requests.indexRequest
        .index("nyc-places")
        .`type`("popular-locations")
        .source(json.asJava)
      indexer.add(rqst)
    }
  }

Memo: What is Sink?

https://en.wikipedia.org/wiki/Sink_(computing)
In computing, a sink, event sink or data sink is a class or function designed to receive incoming events from another object or function. This is commonly implemented in C++ as callbacks. Other Object-oriented languages, such as Java and C#, have built-in support for sinks by allowing events to be fired to delegate functions.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html
So what?? In Flink, "Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams".
"addSink" is just one of the built-in output operations; it "invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions".
(I hope someday I will fully understand what they say here.)
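
To make this a bit more concrete for myself, here is a minimal sketch of both styles (the stream contents and the custom sink body are made up for illustration): a built-in sink such as print(), and a custom sink function passed to addSink.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[String] = env.fromElements("a", "b", "c")

// Built-in sink: print every element to stdout.
stream.print()

// Custom sink via addSink: the SinkFunction decides how each element is consumed.
stream.addSink(new SinkFunction[String] {
  override def invoke(value: String): Unit = println(s"sinking: $value")
})

env.execute("sink-demo")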

Memo: What is "class xxx extends"?

Reading through Scala code, I often find statements like "class ArrayElement extends Element". What is this?
By using "extends", class ArrayElement inherits all non-private members from class Element. If the subclass already implements a member with the same name and parameters, that member of the superclass is not inherited. So in the example above, "PopularPlaceInserter" inherits the non-private members of "ElasticsearchSinkFunction".
According to the API document linked above, the process method creates multiple ActionRequests from an element in the stream; ElasticsearchSink uses it to prepare elements for sending to Elasticsearch.
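
A toy sketch of "extends" using the names from that example (Element and ArrayElement here are only for illustration):

// Element declares an abstract member and a concrete one.
abstract class Element {
  def contents: Array[String]          // abstract: subclasses must implement it
  def height: Int = contents.length    // concrete: inherited by subclasses
}

// ArrayElement inherits height from Element and implements contents.
class ArrayElement(conts: Array[String]) extends Element {
  def contents: Array[String] = conts
}

val e = new ArrayElement(Array("hello", "world"))
println(e.height)  // prints 2, using the inherited height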
