10
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS Kinesis Streams + Spark Streamingでストリーム処理を試してみた

Posted at

はじめに

今後はストリーム処理の時代がくるはずです。たぶん。そんな気がします。
というわけで、適当なデータ発生を発生させて、Kinesis Streams+Spark streamingでストリーム処理を体験してみました。

Spark 2.0.0 (EMRを利用)

Kinesis Streamsとは

昔は単にKinesisと呼ばれていましたが、後からKinesis FirehoseとKinesis Analyticsが追加されたため、Kinesis3兄弟のうちの1人と呼ばれています。
大規模でスケール可能で、メッセージが一定時間保存されるPubSub型キューのことを指します。

つまり、データを発生させるProducer相当と、後段の処理であるConsumer相当を作成する必要があります。

Spark Streamingとは

大規模データ分散処理フレームワークのApache sparkのライブラリの一つで、少しのコード変更でストリーミング処理が記述できます。

今回Spark環境はAmazon EMR(マネージドHadoop、Hadoopエコシステムも使える)で用意します。
https://aws.amazon.com/jp/emr/

準備

Kinesis streamsの準備

AWSコンソールからKinesis streamsのページに行きます。

  • ストリーム名
  • シャード数

を決めるだけです。スループット要件等に応じて決定してください。ここでは適当に2にします。
スクリーンショット 2016-11-03 8.50.13.png

EMRの準備

AWSコンソールからEMRのページに行きます。
サブネットを指定する場合はクラスター作成から詳細オプションに移動します。
Sparkに必ずチェックをつけてください。

あとはデフォルトで問題ありませんが、マスターのセキュリティグループはデフォルトではSSHポートが空いていないので他のセキュリティグループをつける必要があります。

スクリーンショット 2016-11-03 8.56.30.png

EC2

データの作成用にEC2も1台準備しておきます。botoが使えればなんでもよいです。

データ作成(producer)

データを発生させるスクリプトです。色々な言語で記述できるのですが、ここではPythonを使います。

put.py
#!/usr/bin/python
# -*- coding: utf-8 -*-

import boto.kinesis,datetime,time,random

connection = boto.kinesis.connect_to_region('ap-northeast-1')
stream_name = 'stream-test'

partition_key = str(time.time()) + str(random.random())
source_str = 'abcdefghijklmnopqrstuvwxyz'

for loop in range(10):
    for i in range(10):
        sampledata=str(i)+","+random.choice(source_str)
        put_data = connection.put_record(stream_name,sampledata,partition_key)
    time.sleep(10)

やっていることとしては、

  1. Kinesis接続の準備
  2. 数値およびアルファベットのランダム文字列を10回発生させる
  3. Kinesis streamsにPutする
  4. 10秒間隔で2-4を10回やる

です。

partition_keyで設定した値をハッシュ化したものをもちいてシャードの振り分けが行われるみたいです。
なのである程度ランダムになるようにしています。

データ処理(Consumer)

Kinesis streamsからデータを取得し、何らかの処理をするスクリプトです。

get.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

if __name__ == "__main__":

    sc = SparkContext()
    ssc = StreamingContext(sc, 5)
    appName="stream-app"
    streamName="stream-test"
    endpointUrl="https://kinesis.ap-northeast-1.amazonaws.com"
    regionName="ap-northeast-1"

    lines = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 1)

    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

Sparkの中にKinesis stramsを利用するためのモジュールが用意されているのでそれを使います。
まずSparkを利用するための入り口ともいえるSparkContextとStreamingでつかうためのStreamingContextを用意します。
StreamingContextはWindows単位で処理をするので、ここでは5秒おきとしています。

    sc = SparkContext()
    ssc = StreamingContext(sc, 5)

Streamsを作ります。appnameはなんでもよいのでstream-appとしています。

    appName="stream-app"
    streamName="stream-test"
    endpointUrl="https://kinesis.ap-northeast-1.amazonaws.com"
    regionName="ap-northeast-1"
    lines = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 1)

表示します。

    lines.pprint()

Stream処理を起動します。

    ssc.start()
    ssc.awaitTermination()

実行

EMRでSparkのジョブを実行します。この時JARファイルを指定します。

spark-submit --jars /usr/lib/spark/external/lib/spark-streaming-kinesis-asl-assembly_*.jar ./get.py

EC2でデータを発生させます。

python put.py

少し待つとConsumer側で表示されます。

-------------------------------------------
Time: 2016-11-03 01:38:00
-------------------------------------------
0,c
1,e
2,x
3,w
4,g
5,h
6,t
7,q
8,c
9,o

行数をカウントしたい場合は例えば以下のようにします。

    counts=lines.count()
    counts.pprint()

これでストリーム処理を実装する際のイメージがつかめたかな、と思います。
性能等については何も考えていないので、要検討です。

10
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?