LoginSignup
22
10

More than 5 years have passed since last update.

(翻訳)PythonからHadoop file system (HDFS)へのネイティブ接続

Last updated at Posted at 2017-03-29

始めに:pandasの作者であるWes McKinneyさんがPythonのデータツール関連でとても興味深いblogを書かれているので、翻訳して日本のPyDataコミュニティに公開してもいいでしょうか、とお聞きしたところ、快諾をいただきましたので少しずつ訳して公開していこうと思っています。

翻訳元: Native Hadoop file system (HDFS) connectivity in Python

2017/1/3

これまで、Hadoop File SystemことHDFSとのやりとりするためのPythonライブラリが数多く開発されてきました。HDFSのWebHDFSゲートウェイ経由のものもあれば、ネイティブのProtocol BufferベースのRPCインターフェースもあります。このポストでは、既存のライブラリの概要をお伝えし、Arrowのエコシステム開発の中で高パフォーマンスのHDFSインターフェースを提供するために私がやってきたことを紹介します。

このblogは、2017年のロードマップに関するポストのフォローアップです。

Hadoop file systemのプロトコル

HDFSはApache Hadoopの一部であり、もともとその設計はオリジナルのMapReduceの論文に述べられているGoogle File Systemに基づいています。HDFSは、リモートプロシージャコール、すなわちRPCでネイティブのワイヤープロトコルとしてGoogleのProtocol Buffers(短く"protobufs"と呼ばれることもあります)を使っています。

通常、HDFSとやりとりをするシステムは、メインのJavaのクライアントと同様に、ProtobufのメッセージングフォーマットとRPCプロトコルを実装しているでしょう。低負荷のアプリケーションがファイルを簡単に読み書きできるようにするために開発されたのがWebHDFSで、これはprotobufsのRPCの代わりにPUTやGETリクエストを使えるHTTPあるいはHTTPSのゲートウエイを提供します。

低負荷のアプリケーションでは、WebHDFSとネイティブのprotobufs RPCのデータスループットは同等になりますが、概してネイティブの接続はスケーラビリティに優れ、実働環境での利用に適していると考えられています。

Pythonには、私が使ったことのあるWebHDFSインターフェースが2つあります:

  • pywebhdfs
  • hdfscli

この後この記事では、ネイティブのRPCクライアントインターフェースに注目していきます。

ネイティブRPCでのPythonからのアクセス

PythonのようにCとの相性のいい言語からネイティブなやり方でHDFSに接続する場合、Apache Hadoopでの「公式」なやり方はlibhdfsを使うことです。libhdfsは、HDFS JavaクライアントのJNIベースのCのラッパーです。libhdfsの主なメリットは、主要なHadoopのベンダーから配布され、サポートされており、Apache Hadoopプロジェクトの一部であることです。欠点としては、JNIを使っており(Pythonのプロセス内からJVMが起動されます)、クライアント側にHadoopのJavaディストリビューションが一式必要になることがあります。クライアントによっては、これは受け入れられない条件であり、他のクライアントと違ってプロダクションレベルのサポートは必要なものもあります。例えば、C++のアプリケーションであるApache Impala(インキュベーションプロジェクト)は、HDFS上のデータへのアクセスにlibhdfsを使っています。

libhdfsがもともと重いことから、代わりとなるHDFSへのネイティブインターフェースが開発されてきました。

  • libhdfs3は現在Apache HAWQ (インキュベーションプロジェクト)の一部となっている、純粋にC++のライブラリです。libhdfs3はPivotal Labsによって、SQL-on-HadoopシステムのHAWQで使用するために開発されました。libhdfs3が便利なのは、CのAPIレベルでlibhdfsと高い互換性を持っていることです。一時はlibhdfs3は公式にApache Hadoopの一部になりそうでしたが、現在はそうなることはなさそうになっています(HDFS-8707で新しいC++のライブラリが開発中なので、参照してください)。

  • snakebite: Hadoopのprotobuf RPCインターフェースの純粋なPython実装で、Spotifyによって開発されました。

snakebiteはクライアントのAPIを包括的に提供しているわけではなく(例えばファイルを書くことはできません)、パフォーマンスも良くない(純粋にPythonで実装されています)ので、ここからはlibhdfsとlibhdfs3に注目していきます。

libhdfs及びlibhdfs3へのPythonのインスターフェース

これまでにも、JNIライブラリのlibhdfsへのCのレベルでのインターフェースを構築しようとする動きはたくさんありました。それらの中には、cyhdfs(Cythonを利用)、libpyhdfs(通常のPythonのCによるエクステンション)、pyhdfs(SWIGを利用)などがあります。libhdfsへのCのエクステンションを構築する際の課題の1つは、共有ライブラリのlibhdfs.soがHdoopのディストリビューションに含まれて配布されているので、この共有ライブラリをロードできるよう、$LD_LIBRARY_PATHを適切に設定しなければならないことです。加えて、インポートの際にはJVMのlibjvm.soもロードできなければなりません。これらの条件が組み合わさると、「設定地獄」にはまっていくことになります。

Apache Arrow(そしてPyArrow経由でPythonでも)で使うC++のHDFSインターフェースを構築しようと考えているときに、私はTuriのSFrameプロジェクトのlibhdfsの実装を見つけました。これは、実行時にJVMとlibhdfsをロードする際に、この両方をどちらも賢明なアプローチで見つけるようになっていました。私はArrowでこのアプローチを採用し、それはうまくいきました。この実装を使うことで、Arrowのデータシリアライゼーションツール群(Apache Parquetのような)でのI/Oのオーバーヘッドは非常に低くなり、便利なPythonのファイルインターフェースも提供してくれました。

libhdfsとlibhdfs3のドライバライブラリのCのAPIはほとんど同じなので、Pythonのキーワード引数に従ってドライバを切り替えることができました。

from pyarrow import HdfsClient

# libhdfsを使用
hdfs = HdfsClient(host, port, username,     driver='libhdfs')

# libhdfs3を使用
hdfs_alt = HdfsClient(host, port, username,     driver='libhdfs3')

with hdfs.open('/path/to/file') as f:
    ...

これを並行して、Daskプロジェクトの開発者たちはlibhdfs3に対する純粋なPythonのインターフェースであるhdfs3を作成しました。これは、Cのエクステンションを回避するためにctypesを使っていました。hdfs3は、Pythonのファイルインターフェースとともに、libhdfs3の他の機能へのアクセスも提供しています。

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host, port, user)
with hdfs.open('/path/to/file', 'rb') as f:
    ...

pyarrow.HdfsClientとhdfs3のデータアクセスパフォーマンス

ローカルのCDH 5.6.0 HDFSクラスタに対し、3種類の設定で4KBから100MBのサイズのファイル群の読み取りパフォーマンスの集合平均を計算してみました。

  • hdfs3(常にlibhdfs3を使用)
  • driver='libhdfs'でのpyarrow.HdfsClient
  • driver='libhdfs3'でのpyarrow.HdfsClient

これらのパッケージはすべて以下のようにすれば取得できます。

conda install pyarrow hdfs3 libhdfs3 -c conda-forge

注意:pyarrowのconda-forgeパッケージは、現在Linuxでのみ提供されています。理論上は、この問題は2017年1月20日には解決されているはずです。Windowsサポートを手助けしてくださる方がおられましたら、お知らせください。

パフォーマンスの数値は、メガバイト/秒です("throughput")。ベンチマークのコードはこのポストの最後にあります。もっと多彩なプロダクション環境やHadoopの設定でこの結果がどうなるか、私は興味を持っています。

HDFS RPC data perflibhdfs_perf_linear.png

少なくとも私が行ったテストでは、以下のような興味深い結果が得られました。

  • libhdfsは、Java及びJNIベースであるにもかかわらず、このテストでは最高のスループットを示しました。
  • libhdfs3は、小さいサイズの読み取りのパフォーマンスが良くありませんでした。これは、RPCのレイテンシーか、私が認識できていない問題が設定にあるためかもしれません。
  • 厳密にlibhdfs3と比較すれば、pyarrowはhdfs310-15%程度上回っています。これは主に、ctypes(hdfs3)とC++(pyarrow)の違いによるメモリのハンドリング/コピーによるものだと思われます。

以下は、時間を対数軸にしたものです。

HDFS RPC data perflibhdfs_perf_log.png

Apache ArrowにおけるネイティブのC++ I/O

pyarrowライブラリ内でHDFSのようなI/Oインターフェースを構築する理由の1つは、そういったインターフェースはすべて共通のメモリ管理のレイヤーを使用して、非常に低い(場合によってはゼロ)コピーのオーバーヘッドだけで、データを渡せるためです。これに対して、Pythonのfileインターフェースだけを公開しているライブラリは、メモリの処理をPythonインタープリタ内のバイト列のオブジェクトで行うため、多少のオーバーヘッドが生じます。

ArrowのC++のI/Oシステムの詳細はこの記事の範囲を超えますが、将来このブログにそれに関するポストを書くことにしましょう。

ベンチマークのコード

import gc
import random
import time
import pyarrow as pa
import hdfs3
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

DATA_SIZE = 200 * (1 << 20)
data = 'a' * DATA_SIZE

hdfs = pa.HdfsClient('localhost', 20500, 'wesm')
hdfscpp = pa.HdfsClient('localhost', 20500, 'wesm', driver='libhdfs3')
hdfs3_fs = hdfs3.HDFileSystem('localhost', port=20500, user='wesm')

hdfs.delete(path)
path = '/tmp/test-data-file-1'
with hdfs.open(path, 'wb') as f:
    f.write(data)

def read_chunk(f, size):
    # do a random seek
    f.seek(random.randint(0, size))
    return f.read(size)

def ensemble_average(runner, niter=10):
    start = time.clock()
    gc.disable()
    data_chunks = []
    for i in range(niter):
        data_chunks.append(runner())
    elapsed = (time.clock() - start) / niter
    gc.enable()
    return elapsed

def make_test_func(fh, chunksize):
    def runner():
        return read_chunk(fh, chunksize)
    return runner

KB = 1024
MB = 1024 * KB
chunksizes = [4 * KB, MB, 10 * MB, 100 * MB]
iterations = [100, 100, 100, 10]

handles = {
    ('pyarrow', 'libhdfs'): hdfs.open(path),
    ('pyarrow', 'libhdfs3'): hdfscpp.open(path),
    ('hdfs3', 'libhdfs3'): hdfs3_fs.open(path, 'rb')
}

timings = []
for (library, driver), handle in handles.items():
    for chunksize, niter in zip(chunksizes, iterations):
        tester = make_test_func(handle, chunksize)
        timing = ensemble_average(tester, niter=niter)
        throughput = chunksize / timing

        result = (library, driver, chunksize, timing, throughput)
        print(result)
        timings.append(result)

results = pd.DataFrame.from_records(timings, columns=['library', 'driver', 'read_size', 'timing', 'throughput'])
results['MB/s'] = results['throughput'] / MB
results
results['type'] = results['library'] + '+' + results['driver']

plt.figure(figsize=(12, 6))
g = sns.factorplot(y='read_size', x='MB/s', hue='type', data=results, kind='bar', orient='h', size=(10))
g.despine(left=True)
#g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 4)

plt.savefig('results2.png')
22
10
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
22
10