Help us understand the problem. What is going on with this article?

Spark Streamingの動作確認 ローカル及びクラスタ環境

More than 3 years have passed since last update.

Spark入門の7章及び以下のサイトを参考にSparkStreamingの動作確認を行なった際のメモ。
http://spark.apache.org/docs/latest/streaming-programming-guide.html

ローカル環境でSpark Streamingの動作確認

ポート9999番で待ち受けるデータサーバ(データを書き込む)がある。このネットワークソケットに接続して、1秒毎にテキストデータをストリームデータとして読み込み、そのストリームデータに含まれる単語数をカウントするプログラムの動作確認を行う。
ソースは上記で参照しているチュートリアルから参照可能。

作成したプログラムは以下のとおり。

chap7-1.py
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999,StorageLevel.MEMORY_AND_DISK_SER) 

words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()             
ssc.awaitTermination() 

StreamingContextを生成

StreamingContextはストリーミング機能のメインエントリーポイントであり、StreamingContextを利用して入力ストリームデータの定義などを行う。

class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)

以下の例では1秒毎のバッチ処理を繰り返すStreamingContextを生成。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

DStreamの作成

このコンテキスト(StreamingContext)を使用して、DStreamを定義する。DStreamは時々刻々と生成されるRDDみたいなもので、データサーバから受信するストリームデータである。DStreamに含まれるそれぞれのレコードは1行のテキストである。
以下の例ではsocketTextStreamメソッドを使用し、localhostの9999番ポートに接続するように定義。

lines = ssc.socketTextStream("localhost", 9999)

SocketTextStreamは、TCPソース(hostname:port)からinputを生成する。データはTCPソケットを使用して受け取とられ、UTF-8でエンコードし、¥nをデリミタとした行単位でバイトで受け取る。

socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))

StorageLevelは次のURLを参照。 http://spark.apache.org/docs/latest/api/python/pyspark.html

word count処理

ストリームデータとして得られたテキストラインをスペース区切りで文字に分割する。

words = lines.flatMap(lambda line: line.split(" "))

この例では、それぞれのテキストラインが複数の語(word)に分けられ、そしてwordのストリームがDStreamとなる。

次にwordをカウントする。
words DStreamを(word,1)ペアのDStreamにマップし、それからwordの頻度を得るためにreduce処理を行う。

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

これで単語毎の出現回数を保持するDStreamが定義できた。

次に毎秒生成されるカウントをpprintメソッドで出力する。

wordCounts.pprint()

ここまででセットアップは完了したので、以下のコードを追加してストリーム処理を開始する。

ssc.start()             # 計算処理を開始
ssc.awaitTermination()  # stopメソッドによるcontextの終了や例外が発生するまで待つ

動作確認

作成したSparkStreamingのアプリを実行

$ spark-submit --master local chap7-1.py

別ターミナルにて、データ書き込み用に、NetCatを使って9999番ポートで待ち受けるようにし、テキストデータを書き込む。

$ nc -lk 9999
hello world

プログラムを実行したターミナルにて以下の出力があることを確認。

・・・
------------------------------------------
Time: 2015-12-22 08:28:02
-------------------------------------------
('world', 1)
('hello', 1)
・・・

HDFS環境でSpark Streamingの動作確認

HDFS上のディレクトリを監視し、新しいファイルが生成されたら、入力データとして取り込み、ワードをカウントするアプリを実行する。
※既存のファイルを更新しても入力データとして認識されないので注意。

変更点

先ほどのコードとの違いは以下の点。

  • localの代わりにyarn-clientをmasterとして動作するように指定 sc = SparkContext("yarn-client", "Chap7-2")
  • socketTextStreamメソッドの代わりにtextFileStreamメソッドを使用し、監視対象のhdfsディレクトリを引数で指定。textFileStreamはディレクトリに含まれるファイルを監視し、新しいファイルが生成されたらストリームデータとして取得する。
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/") 

動作確認

アプリの実行

$ spark-submit --master yarn-client ./chap7-2.py

HDFS上にファイルを生成

$ hdfs dfs -put /opt/spark/README.md /user/y_tadayasu/data/

アプリを実行しているターミナルに以下の出力があることを確認。

-------------------------------------------
Time: 2015-12-23 04:15:50
-------------------------------------------
('', 67)
('guide,', 1)
('APIs', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...

この結果だと空白もカウントされているので、filterを使用して空白は取り除くようにする。

words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
-------------------------------------------
Time: 2015-12-23 04:25:30
-------------------------------------------
('guide,', 1)
('APIs', 1)
('optimized', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...

コード全体は以下のとおり。

chap7-2.py
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel
sc = SparkContext("yarn-client", "Chap7-2")
ssc = StreamingContext(sc, 10)
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/") 
words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()             
ssc.awaitTermination() 
t-yotsu
ここに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away