この記事は MicroAd Advent Calendar 2022 の6日目の記事です。
こんにちは。
マイクロアドのタカギです。
この記事ではpyarrowを使用してhdfsを操作してみた件について書いていきたいと思います。
PyArrowとは
Apache Arrow は大規模なデータをメモリに読み込んで処理するためのプラットフォームで、高速なデータ転送やファイル入出力機能など、効率的なデータ処理に必要な機能を提供してくれます。PyArrowはAppache ArrowのPythonインターフェースで、NumPyやpandasと連携して、Apache Arrowを利用できるようになっています。
引用
https://aish.dev/python/20200728_pyarrow.html
ドキュメントなど
https://arrow.apache.org/overview/
https://arrow.apache.org/docs/python/index.html
PyArrowを使うには
Pythonのインストールとは別に、下記の準備が必要になります。
- JavaとHadoopのインストール
- 環境変数CLASSPATHにHadoop jarへのパス設定追加
参考
https://arrow.apache.org/docs/python/filesystems.html#hadoop-distributed-file-system-hdfs
HadoopFileSystemには2種類ある
hdfsにアクセスするためのクラスHadoopFileSystemは2種類あります。
古い方と新しい方です。
古い方はlegacyバージョンとしてdeprecatedの旨記載があります。
この記事では、deprecatedではない、新バージョンのほうを使用しています。
legacyバージョン
pyarrow.hdfs.HadoopFileSystem
https://arrow.apache.org/docs/python/filesystems_deprecated.html
新バージョン
pyarrow.fs.HadoopFileSystem
https://arrow.apache.org/docs/python/filesystems.html#hadoop-distributed-file-system-hdfs
https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html
PyArrowを使ってみて
ポジティブ面
- delete_dir_contentsメソッドがとても便利
- 指定したディレクトリの配下にあるものをすべて削除して、指定したディレクトリ自体は削除しません。
- ディレクトリが存在しない場合にエラーにしないオプションもあったりします。
- デフォルトでルートパスの指定を許容しない安心設計です。
ネガティブ面
- メソッド名とhdfsコマンドの対応が直感的ではない
- 全体的に、パスにワイルドカードを使用できない
- ※
hdfs dfs -ls /tmp/test*
みたいなことができない
- ※
個人的には、パスにワイルドカードを使用できないのは結構致命的な気がしていています。
※subprocessでhdfsコマンドを使用すればなんとかなりますが、pyarrowを導入した意味がなくなってしまいます。。
サンプルコード
さて、以降は実装したコードになりますが、hdfsで行う主な操作(ls, get, put, cp, mv, rm, mkdir)について実装してみました。
メソッド名が直感的ではないので、daoクラスを作成して、mkdir()->create_dir()を呼び出すように、ある程度直感的になるようにしてみました。しかしながら、制限が多すぎてメソッド名にもそれを反映することになりました。
コメントにも書いてますが、subprocessコマンドを使用したメソッドも作成しています。
※補足
マイクロアドでは、バッチ処理にDigdagを使用しています。
Digdagからの実行時に、環境変数CLASSPATHが未設定になってしまう状況があり、docker-entrypoint.shで設定することで解決しています。
ではどうぞ。
pyarrow==9.0.0
Dockerfile
FROM python:3.7.10
# Shift timezone to Asia/Tokyo.
ENV TZ Asia/Tokyo
WORKDIR /work
# hdfsにアクセスするユーザ
# add abc user as root group
RUN groupadd -g 98 abc && \
useradd -u 98 abc -g root -d /work && \
chown -R abc:root /work/
# package
RUN apt-get update && apt-get install -y \
libsasl2-dev \
libsasl2-modules \
tzdata \
software-properties-common && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Install Java
ENV JAVA_HOME "/usr/lib/jvm/java-8-openjdk-amd64"
RUN wget -qO - https://adoptopenjdk.jfrog.io/adoptopenjdk/api/gpg/key/public | apt-key add -
RUN add-apt-repository -y https://adoptopenjdk.jfrog.io/adoptopenjdk/deb/ && \
apt-get update && apt-get install -y adoptopenjdk-8-hotspot && \
rm -rf /var/lib/apt/lists/* && \
ln -fs /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64 /usr/lib/jvm/java-8-openjdk-amd64
# Install python module
COPY requirements.txt /tmp/requirements.txt
RUN python -m pip install setuptools wheel && \
python -m pip install -r /tmp/requirements.txt && \
rm -rf /root/.cache
# Setup Hadoop
ENV HADOOP_USER_NAME=abc
ENV HADOOP_HOME "/opt/hadoop"
RUN mkdir -p /opt \
&& curl -SL https://downloads.apache.org/hadoop/common/hadoop-2.10.1/hadoop-2.10.1.tar.gz \
| tar -xzC /opt \
&& ln -sfn /opt/hadoop-2.10.1 /opt/hadoop
ENV PATH="${HADOOP_HOME}/bin:${PATH}"
# 環境変数
# digdagから実行した場合、環境変数がdigdagのもので上書きされてしまうケースがあるため、
# コンテナ起動時にdocker-entrypoint.shで設定する
COPY docker-entrypoint.sh /work/docker-entrypoint.sh
ENTRYPOINT ["/bin/bash", "/work/docker-entrypoint.sh"]
# Embed common build information in Microad
ARG GIT_REVISION=unknown
ARG GIT_ORIGIN=unknown
ARG IMAGE_NAME=unknown
LABEL git-revision=$GIT_REVISION \
git-origin=$GIT_ORIGIN \
image-name=$IMAGE_NAME
#!/bin/bash
# コンテナ起動時に環境変数を設定する
# CLASSPATHはdigdagからの実行時に未設定となってしまうため、ここで設定する
export CLASSPATH="$($HADOOP_HOME/bin/hdfs classpath --glob)"
exec "$@"
from dataclasses import dataclass
@dataclass
class HdfsConf:
"""
hdfsに接続するために必要な値を格納
"""
host: str
port: int
user: str
import logging
import subprocess
from pyarrow import fs
from typing import List
from src.dto import HdfsConf
LOGGER = logging.getLogger('app')
class HdsfDao:
"""
hdfsのデータ操作オブジェクト
"""
def __init__(self, hdfs_conf: HdfsConf) -> None:
"""
イニシャライザ
"""
self.hdfs = fs.HadoopFileSystem(
hdfs_conf.host, hdfs_conf.port, user=hdfs_conf.user)
# subprocess用
self.hdfs_root = f'hdfs://{hdfs_conf.host}:{hdfs_conf.port}'
def get_file_info(self, hdfs_path: str) -> fs.FileInfo:
"""
対象のパスのFileInfoを取得
"""
return self.hdfs.get_file_info(hdfs_path)
def ls_dir(self, hdfs_path: str) -> List[str]:
"""
指定したパスでls
hdfs_pathにディレクトリ以外を指定するとエラー
※FileSelectorはディレクトリパス以外を受け付けない
"""
LOGGER.info(f'ls for {hdfs_path}')
file_info_list = self.hdfs.get_file_info(fs.FileSelector(hdfs_path))
path_list = [f.path for f in file_info_list]
return path_list
def mkdir(self, hdfs_path: str, recursive: bool = True) -> None:
"""
指定したパスでmkdir
デフォルトはpオプション付き
"""
LOGGER.info(f'mkdir for {hdfs_path}')
self.hdfs.create_dir(hdfs_path, recursive=recursive)
def rm_dir_contents(self, hdfs_dir_path: str) -> None:
"""
指定したパス配下のファイルとディレクトリをrm ※指定したパスは削除しない
"""
LOGGER.info(f'rm directory contents for {hdfs_dir_path}')
self.hdfs.delete_dir_contents(hdfs_dir_path)
def rm_dir(self, hdfs_path: str) -> None:
"""
指定したパスでrm -r
指定したディレクトリが存在しない場合エラーになってしまうため、存在チェックを行う
"""
LOGGER.info(f'rm -r for {hdfs_path}')
file_info = self.hdfs.get_file_info(hdfs_path)
if file_info.type == fs.FileType.NotFound:
LOGGER.info(f'file or directory not exists. {hdfs_path}')
return
self.hdfs.delete_dir(hdfs_path)
def rm_single_file(self, hdfs_path: str) -> None:
"""
指定したパスでrm
単一のファイルのみ対応
hdfs_pathに単一のファイルパス以外を指定するとエラー('/tmp/test*' や ディレクトリパス)
指定したディレクトリが存在しない場合エラーになってしまうため、存在チェックを行う
"""
LOGGER.info(f'rm for {hdfs_path}')
file_info = self.hdfs.get_file_info(hdfs_path)
if file_info.type == fs.FileType.NotFound:
LOGGER.info(f'file or directory not exists. {hdfs_path}')
return
self.hdfs.delete_file(hdfs_path)
def get_single_file(self, hdfs_src_path: str, local_dst_path: str) -> None:
"""
指定したパスでget
単一のファイルのみ対応
hdfs_src_pathに単一のファイルパス以外を指定するとエラー('/tmp/test*' や ディレクトリパス)
"""
LOGGER.info(f'get from {hdfs_src_path} to {local_dst_path}')
with self.hdfs.open_input_stream(hdfs_src_path) as rstream:
with open(local_dst_path, 'wb') as wf:
wf.write(rstream.readall())
def put_single_file(self, local_src_path: str, hdfs_dst_path: str) -> None:
"""
指定したパスでput
単一のファイルのみ対応
local_src_pathに単一のファイルパス以外を指定するとエラー('/tmp/test*' や ディレクトリパス)
"""
LOGGER.info(f'put from {local_src_path} to {hdfs_dst_path}')
with self.hdfs.open_output_stream(hdfs_dst_path) as wstream:
with open(local_src_path, 'rb') as rf:
wstream.upload(rf)
def cp_single_file_or_dir(self, hdfs_src_path: str,
hdfs_dst_path: str) -> None:
"""
指定したパスでcp
単一のファイルまたはディレクトリのみ対応
hdfs_pathに'/tmp/test*'のように指定するとエラー
"""
LOGGER.info(f'cp from {hdfs_src_path} to {hdfs_dst_path}')
self.hdfs.copy_file(hdfs_src_path, hdfs_dst_path)
def mv_single_file_or_dir(self, hdfs_src_path: str,
hdfs_dst_path: str) -> None:
"""
指定したパスでmv
単一のファイルまたはディレクトリのみ対応
hdfs_src_pathに'/tmp/test*'のように指定するとエラー
hdfs_src_pathがファイルパスでhdfs_dst_pathがファイルパスの場合OK
hdfs_src_pathがディレクトリパスでhdfs_dst_pathがディレクトリパスの場合OK
hdfs_src_pathがファイルパスでhdfs_dst_pathがディレクトリパスの場合エラーとなる
hdfs_src_pathがファイルパスで、hdfs_dst_pathに同名のファイルが存在する場合上書きされる
hdfs_src_pathがディレクトリで、hdfs_dst_pathに同名の空ではないディレクトリが存在する場合エラー
ってドキュメントには書いてたけど
hdfs_src_pathがディレクトリで、hdfs_dst_pathに同名の空ディレクトリが存在する場合もエラー
"""
LOGGER.info(f'mv from {hdfs_src_path} to {hdfs_dst_path}')
self.hdfs.move(hdfs_src_path, hdfs_dst_path)
def get(self, hdfs_src_path: str, local_dst_path: str) -> None:
"""
指定したパスでget
subprocess経由でhdfsコマンドを使用
"""
move_from = f'{self.hdfs_root}{hdfs_src_path}'
cmd = f'/opt/hadoop/bin/hdfs dfs -get {move_from} {local_dst_path}'
LOGGER.info(cmd)
proc = subprocess.run(cmd, shell=True, check=True)
proc.check_returncode()
def put(self, local_src_path: str, hdfs_dst_path: str) -> None:
"""
指定したパスでput
subprocess経由でhdfsコマンドを使用
"""
move_to = f'{self.hdfs_root}{hdfs_dst_path}'
cmd = f'/opt/hadoop/bin/hdfs dfs -put {local_src_path} {move_to}'
LOGGER.info(cmd)
proc = subprocess.run(cmd, shell=True, check=True)
proc.check_returncode()
def cp(self, hdfs_src_path: str, hdfs_dst_path: str) -> None:
"""
指定したパスでcp
subprocess経由でhdfsコマンドを使用
"""
copy_from = f'{self.hdfs_root}{hdfs_src_path}'
copy_to = f'{self.hdfs_root}{hdfs_dst_path}'
cmd = f'/opt/hadoop/bin/hdfs dfs -cp {copy_from} {copy_to}'
LOGGER.info(cmd)
proc = subprocess.run(cmd, shell=True, check=True)
proc.check_returncode()
def mv(self, hdfs_src_path: str, hdfs_dst_path: str) -> None:
"""
指定したパスでmv
subprocess経由でhdfsコマンドを使用
hdfs_dst_pathに同名のファイルまたはディレクトリが存在する場合エラー
"""
move_from = f'{self.hdfs_root}{hdfs_src_path}'
move_to = f'{self.hdfs_root}{hdfs_dst_path}'
cmd = f'/opt/hadoop/bin/hdfs dfs -mv {move_from} {move_to}'
LOGGER.info(cmd)
proc = subprocess.run(cmd, shell=True, check=True)
proc.check_returncode()
def rm_file(self, hdfs_path: str) -> None:
"""
指定したパスでrm
ディレクトリ削除には対応していない
subprocess経由でhdfsコマンドを使用
"""
if not hdfs_path or hdfs_path == '/':
raise Exception
rm_path = f'{self.hdfs_root}{hdfs_path}'
cmd = f'/opt/hadoop/bin/hdfs dfs -rm -f {rm_path}'
LOGGER.info(cmd)
proc = subprocess.run(cmd, shell=True, check=True)
proc.check_returncode()
daoメソッドをcallするコードサンプル
import os
import textwrap
import logging
import subprocess
from pyarrow import fs
from src.dao.hdfs_dao import HdsfDao
from src.dto import HdfsConf
LOGGER = logging.getLogger('app')
def hdfs_check_1(hdfs_conf: HdfsConf) -> None:
"""
HDFS操作のチェック用メソッド
"""
hdsf_dao = HdsfDao(hdfs_conf)
# 対象ディレクトリ配下のファイルを削除
hdsf_dao.rm_dir_contents(hdfs_path)
# hdfsにディレクトリを作成
hdsf_dao.mkdir(hdfs_path)
# hdfsのディレクトリを削除
hdsf_dao.rm_dir(hdfs_path)
# hdfsにファイルをput
hdsf_dao.put_single_file(local_src_path, hdfs_dst_path)
# hdfsにputしたファイルをgetする
hdsf_dao.get_single_file(hdfs_src_path, local_dst_path)
# hdfsにputしたファイルを削除
hdsf_dao.rm_single_file(hdfs_dst_path)
# hdfsにファイルをcp
hdsf_dao.cp_single_file_or_dir(hdfs_src_path, hdfs_dst_path)
# hdfsにファイルをmv
hdsf_dao.mv_single_file_or_dir(hdfs_src_path, hdfs_dst_path)
# 対象ディレクトリをls
path_list = hdsf_dao.ls_dir(hdfs_path)
LOGGER.info('target dir. %s', path_list)
まとめ
pyarrowを実際に使用して色々試してみました。
hdfsを操作するという観点では、pyarrowを使っていくのはちょっと厳しいかもしれません。
「いや、こうすればワイルドカード指定(もしくはそれに準ずる方法)できるよ」という方がいればご連絡いただけると大変助かります。
ではでは。