概要
コードを書いていて、思うように動かない。どう書けばいいかわからない。困った。
そういうことはないだろうか?
試行錯誤を繰り返して動作確認する必要があることは時々ある。そんなときに、大きなproductionコードを動かしながらtry and errorしていたりしないだろうか?
「時間がないから」と言い訳し別の方法を検討する事を拒否して、production上で何度も何度も実行しながら「遅い!」と文句を言っていないだろうか?
大きなコードで動作確認を繰り返すと無駄に時間を消費するし、それだけでなく確認したいそもそもの対象以外から影響を受けて、次第に自分が何を確認しているか分からなくなってくる。
【動作確認は可能な限り絞ったサンプルを使ってテストするべきだ。】
【言語にシェル環境があれば、シェル環境を利用することを勧める。】
そんな話をしたい。
あるケースの場合
あるバッチ処理で、大きなcsvファイルをpythonで処理してs3に格納する必要があった。
そのため、最終的にpandasが持つ大きなデータをboto3を用いてアップロードする方法が問題になった。
オリジナルコード
s3 = boto3.client('s3');
// df : pandas.DataFrame
s3.put_object(df.to_csv().encode(), bucket, key)
この場合、to_csv()が全データを文字列化して出力し、さらにそれをencode()が
bytes配列化して出力する。つまり、dfが持つデータが巨大になるとその数倍の速度でメモリを食いつぶしていた。
これをstream処理にしたい。
stream処理?
巨大なデータを全てオンメモリに展開するのではなく、一部ずつ処理していく事によりメモリ消費量を減らすことができる。一般的にこのような処理をstream処理と呼ぶ。
pythonのstream処理
stream処理を行う方法には、generatorを使う方法とioパッケージを使用する方法がある。
データストリーム
ioパッケージには、io.BytesIOやio.StringIOというオンメモリストリーム処理をするためのクラスが用意されている。
class | 意味 |
---|---|
io.BytesIO | バイトストリーム |
io.StringIO | 文字列ストリーム |
io.TextIOWrapper | 文字列IOラッパー(バイトストリームを文字列ストリーム化する) |
このあたりのクラスを使用する。
論理処理(クラスなど)のストリーム
generatorは、多くの論理ブロックを順次処理するときに用いる。
例えば大量の行を持つcsvを、一行ずつ処理したい場合、全ての行を一度に読み込むのではなくブロックごとに読みながら処理できる。
generatorをここでは説明しないが、pythonで初心者を脱するためにはgeneratorの知識は必須だ。
実際にstream化してみよう!
今回やりたいのはデータ処理のストリーム化なので、ioパッケージの話。
$ python
Python 3.6.8 (default, Dec 5 2019, 17:44:50)
[GCC 7.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
コマンドラインでpythonと打ち込むと、pythonシェルが起動する。
ここで調査を行う。
>>> import pandas
>>> import boto3
>>> import io
今回動作確認に利用するパッケージはこの3つ。
まず検索したところ、以下で動きそうな雰囲気だった。
>>> df = pandas.DataFrame([1,2,3])
>>> sio = io.StringIO()
>>> df.to_csv(sio)
>>> s3 = boto3.client('s3')
>>> s3.upload_fileobj(sio, backet, key)
s3.upload_fileobj
はs3でstreamingにアップロードするための関数らしい。
実行したところ、s3上にファイルはできていたが、ファイルのサイズは0だった。
調べたところ、upload_fileobjがbyte streamでなければならないようだ。
そこで、StringIOではなくBytesIOを使ってみる。
>>> bio = io.BytesIO()
>>> df.to_csv(bio)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/kishibashi/pandas/pandas/core/generic.py", line 3167, in to_csv
formatter.save()
File "/home/kishibashi/pandas/pandas/io/formats/csvs.py", line 206, in save
self._save()
File "/home/kishibashi/pandas/pandas/io/formats/csvs.py", line 314, in _save
self._save_header()
File "/home/kishibashi/pandas/pandas/io/formats/csvs.py", line 283, in _save_header
writer.writerow(encoded_labels)
TypeError: a bytes-like object is required, not 'str'
to_csvがBytesIOにstrを渡したために、BytesIOが怒った。
つまり、to_csvからテキストストリームで受け取りつつ、upload_fileobjにバイトストリームで渡す必要がある。
このようなストリームの変換に使うのがio.TextIOWrapper
というクラスだ。使ってみよう。
>>> bio = io.BytesIO()
>>> w = TextIOWrapper(bio)
bioやwにwriteしたりreadしながらどう動くのか確認して以下の事がわかった。
TextIOWrapperはBytesIOをラップしてくれて、StringIOと同じ機能をもつ。そして、bioとwは入力と出力のような方向性をもっていない。bioにbytesを書き込むとbioからもwからも読み出せるし、wにテキストを書き込むとwからもbioからも読み出せる。
つまりこうなる。
>>> bio = io.BytesIO()
>>> w = TextIOWrapper(bio)
>>> df.to_csv(w)
>>> w.seek(0)
>>> s3.upload_fileobj(bio, bucket, key)
これでs3上のファイルにデータがアップロードされることが確認できた。
ちなみにseekは読み始める位置を最初に変更するおまじないだ。
伝えたい事
この文章で伝えたい事は、ここに書いてあるstreaming処理の詳細ではない。
【分からないことは動作確認するのが理解の早道であり、動作確認はできるだけ小さくしよう】
ということ。
上記のpython shell上の動作確認では、pandas、io、boto3以外なにもない。
どこからpandasに与えるcsvデータを持ってくるのか、サーバーがdjangoで動いているかどうかなど、productionにありがちな外部環境は、この動作確認には全く関係のない事だ。
- shell上での確認
- 単体テストを書いて確認
- jupyter notebookのような環境に慣れているならそちらでも
プロダクションコード上での動作確認は時間の無駄なので絶対にしないように。
(もちろん、少し修正すれば動く事が分かっているならその限りではない)
他の言語のシェル環境は?
JavaScript
$ node
Welcome to Node.js v12.13.1.
Type ".help" for more information.
>
Golang
標準では用意されていない模様。
Java
JDK9からJShellというモノが追加されている模様。使ったことはない。