Study Flink Data Streaming Exercise Code

More than 1 year has passed since last update.

This post is based on Flink exercise materials here.


OS: Ubuntu 16.04
Kafka: kafka_2.10-
Elasticsearch: elasticsearch-2.4.3
Kibana: kibana-4.6.4-linux-x86_64
Flink: flink-1.3.1-bin-hadoop27-scala_2.10
Java: openjdk version "1.8.0_131"

ubuntu@ubuntu:~$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)

IDE: IntelliJ
Build Tool: Apache Maven 3.5.0

buntu@ubuntu:~$ mvn --version
Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 2017-04-03T12:39:06-07:00)

Generate a new Flink project

To create a new Flink Scala project, run the following command.

ubuntu@ubuntu:~/workspace$ mvn archetype:generate                             \
    -DarchetypeGroupId=org.apache.flink            \
    -DarchetypeArtifactId=flink-quickstart-scala   \
    -DarchetypeVersion=1.3.0                       \
    -DgroupId=org.apache.flink.quickstart          \
    -DartifactId=flink-scala-project               \
    -Dversion=0.1                                  \
    -Dpackage=org.apache.flink.quickstart          \

After the installation, you can find a new project directory.

ubuntu@ubuntu:~/workspace$ ls

Next, clone "flink-training-exercises" project and install it

ubuntu@ubuntu:~/workspace$ git clone
ubuntu@ubuntu:~/workspace$ cd flink-training-exercises
ubuntu@ubuntu:~/workspace/flink-training-exercises$ mvn clean install

After the installation, add the following dependencies to "~/workspace/flink-scala-project/pom.xml" file.

ubuntu@ubuntu:~/workspace/flink-scala-project$ diff -u ~/workspace/backups/flink-scala-project_backup/pom.xml  ./pom.xml 
--- /home/ubuntu/workspace/backups/flink-scala-project_backup/pom.xml 2017-07-01 04:41:21.979384836 -0700
+++ ./pom.xml 2017-07-01 18:03:26.341580985 -0700
@@ -92,6 +92,11 @@
+  <dependency>
+   <groupId></groupId>
+   <artifactId>flink-training-exercises</artifactId>
+   <version>0.10.0</version>
+  </dependency>

Finally, run "mvn clean package" command under /flink-scala-project

ubuntu@ubuntu:~/workspace/flink-scala-project$ mvn clean package

Memo: What is the differene between "mvn clean install" and "mvn clean package"

When you use "mvn clean install", it put necessary packages in your local repository so that other projects can refer the local repository.
In the above steps, "mvn clean install" is used for "training-exercising-project". It means, other projects can refer its package later. On the other hand, "mvn clean package" is used for "quickstart-project". It means "quickstart-project" refers to the package of "training-exercising-project". In order to refer the local repository, maven dependency is added in pom.xml before "mvn clean package".


Grab the overview of Scala programming

Taxi Ride Cleansing

Start from this exercise Taxi Ride Cleansing.
(You can find the source code here)

The task of the “Taxi Ride Cleansing” exercise is to cleanse a stream of TaxiRide events by removing events that do not start or end in New York City.

1. Pass input data to "TaxiRideSource" which generates a stream of TaxiRide events
val rides = env.addSource(new TaxiRideSource(input, maxDelay, speed))
2. Filter out ride events that do not start and end in NYC by using "GeoUtils.isInNYC" function can be called within a FilterFunction to check if a location is in the New York City area.
    val filteredRides = rides.filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))

Popular Places

Next exercise is Popular Places

The task of the “Popular Places” exercise is to identify popular places from the taxi ride data stream.

1. Pass input data as like in "Taxi Ride Cleansing"
val rides = env.addSource(new TaxiRideSource(input, maxDelay, speed))
2. After filtering out the events outside of NYC, map "Start events -> their departure location" and "end events -> their destination location
    val popularPlaces = rides
      // remove all rides which are not within NYC
      .filter { r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat) }
      // match ride to grid cell and event type (start or end)
      .map(new GridCellMatcher)

Here, "GridCellMatcher" is defined as below.

  class GridCellMatcher extends MapFunction[TaxiRide, (Int, Boolean)] {
    def map(taxiRide: TaxiRide): (Int, Boolean) = {
      if (taxiRide.isStart) {
        // get grid cell id for start location
        val gridId: Int = GeoUtils.mapToGridCell(taxiRide.startLon, taxiRide.startLat)
        (gridId, true)
      } else {
        // get grid cell id for end location
        val gridId: Int = GeoUtils.mapToGridCell(taxiRide.endLon, taxiRide.endLat)
        (gridId, false)
3. Then, identify popular places which exceeds the threshod defined by time window of 15 minutes triggers every five minutes.

This is done by counting every five minutes the number of taxi rides that started and ended in the same area within the last 15 minutes.

    val popularPlaces = rides
      .timeWindow(Time.minutes(15), Time.minutes(5))
      // count events in window
      .apply{ (key: (Int, Boolean), window, vals, out: Collector[(Int, Long, Boolean, Int)]) =>
      out.collect( (key._1, window.getEnd, key._2, vals.size) )
      // filter by popularity threshold
      .filter( c => { c._4 >= popThreshold } )