5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spark Streamingの動作確認 ローカル及びクラスタ環境

Posted at

Spark入門の7章及び以下のサイトを参考にSparkStreamingの動作確認を行なった際のメモ。
http://spark.apache.org/docs/latest/streaming-programming-guide.html

ローカル環境でSpark Streamingの動作確認

ポート9999番で待ち受けるデータサーバ(データを書き込む)がある。このネットワークソケットに接続して、1秒毎にテキストデータをストリームデータとして読み込み、そのストリームデータに含まれる単語数をカウントするプログラムの動作確認を行う。
ソースは上記で参照しているチュートリアルから参照可能。

作成したプログラムは以下のとおり。

chap7-1.py
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999,StorageLevel.MEMORY_AND_DISK_SER) 

words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()             
ssc.awaitTermination() 

StreamingContextを生成

StreamingContextはストリーミング機能のメインエントリーポイントであり、StreamingContextを利用して入力ストリームデータの定義などを行う。

class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)

以下の例では1秒毎のバッチ処理を繰り返すStreamingContextを生成。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

DStreamの作成

このコンテキスト(StreamingContext)を使用して、DStreamを定義する。DStreamは時々刻々と生成されるRDDみたいなもので、データサーバから受信するストリームデータである。DStreamに含まれるそれぞれのレコードは1行のテキストである。
以下の例ではsocketTextStreamメソッドを使用し、localhostの9999番ポートに接続するように定義。

lines = ssc.socketTextStream("localhost", 9999)

SocketTextStreamは、TCPソース(hostname:port)からinputを生成する。データはTCPソケットを使用して受け取とられ、UTF-8でエンコードし、¥nをデリミタとした行単位でバイトで受け取る。

socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))

StorageLevelは次のURLを参照。 http://spark.apache.org/docs/latest/api/python/pyspark.html

word count処理

ストリームデータとして得られたテキストラインをスペース区切りで文字に分割する。

words = lines.flatMap(lambda line: line.split(" "))

この例では、それぞれのテキストラインが複数の語(word)に分けられ、そしてwordのストリームがDStreamとなる。

次にwordをカウントする。
words DStreamを(word,1)ペアのDStreamにマップし、それからwordの頻度を得るためにreduce処理を行う。

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

これで単語毎の出現回数を保持するDStreamが定義できた。

次に毎秒生成されるカウントをpprintメソッドで出力する。

wordCounts.pprint()

ここまででセットアップは完了したので、以下のコードを追加してストリーム処理を開始する。

ssc.start()             # 計算処理を開始
ssc.awaitTermination()  # stopメソッドによるcontextの終了や例外が発生するまで待つ

動作確認

作成したSparkStreamingのアプリを実行

$ spark-submit --master local chap7-1.py

別ターミナルにて、データ書き込み用に、NetCatを使って9999番ポートで待ち受けるようにし、テキストデータを書き込む。

$ nc -lk 9999
hello world

プログラムを実行したターミナルにて以下の出力があることを確認。

・・・
------------------------------------------
Time: 2015-12-22 08:28:02
-------------------------------------------
('world', 1)
('hello', 1)
・・・

HDFS環境でSpark Streamingの動作確認

HDFS上のディレクトリを監視し、新しいファイルが生成されたら、入力データとして取り込み、ワードをカウントするアプリを実行する。
※既存のファイルを更新しても入力データとして認識されないので注意。

変更点

先ほどのコードとの違いは以下の点。

  • localの代わりにyarn-clientをmasterとして動作するように指定
sc = SparkContext("yarn-client", "Chap7-2")
  • socketTextStreamメソッドの代わりにtextFileStreamメソッドを使用し、監視対象のhdfsディレクトリを引数で指定。textFileStreamはディレクトリに含まれるファイルを監視し、新しいファイルが生成されたらストリームデータとして取得する。
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/") 

動作確認

アプリの実行

$ spark-submit --master yarn-client ./chap7-2.py

HDFS上にファイルを生成

$ hdfs dfs -put /opt/spark/README.md /user/y_tadayasu/data/

アプリを実行しているターミナルに以下の出力があることを確認。

-------------------------------------------
Time: 2015-12-23 04:15:50
-------------------------------------------
('', 67)
('guide,', 1)
('APIs', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...

この結果だと空白もカウントされているので、filterを使用して空白は取り除くようにする。

words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
-------------------------------------------
Time: 2015-12-23 04:25:30
-------------------------------------------
('guide,', 1)
('APIs', 1)
('optimized', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...

コード全体は以下のとおり。

chap7-2.py
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel
sc = SparkContext("yarn-client", "Chap7-2")
ssc = StreamingContext(sc, 10)
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/") 
words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()             
ssc.awaitTermination() 
5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?