8
2

More than 1 year has passed since last update.

pyarrowでhdfsを操作する

Last updated at Posted at 2022-12-05

この記事は MicroAd Advent Calendar 2022 の6日目の記事です。

こんにちは。
マイクロアドのタカギです。
この記事ではpyarrowを使用してhdfsを操作してみた件について書いていきたいと思います。

PyArrowとは

Apache Arrow は大規模なデータをメモリに読み込んで処理するためのプラットフォームで、高速なデータ転送やファイル入出力機能など、効率的なデータ処理に必要な機能を提供してくれます。PyArrowはAppache ArrowのPythonインターフェースで、NumPyやpandasと連携して、Apache Arrowを利用できるようになっています。

引用
https://aish.dev/python/20200728_pyarrow.html

ドキュメントなど
https://arrow.apache.org/overview/
https://arrow.apache.org/docs/python/index.html

PyArrowを使うには

Pythonのインストールとは別に、下記の準備が必要になります。

  • JavaとHadoopのインストール
  • 環境変数CLASSPATHにHadoop jarへのパス設定追加

参考
https://arrow.apache.org/docs/python/filesystems.html#hadoop-distributed-file-system-hdfs

HadoopFileSystemには2種類ある

hdfsにアクセスするためのクラスHadoopFileSystemは2種類あります。
古い方と新しい方です。
古い方はlegacyバージョンとしてdeprecatedの旨記載があります。
この記事では、deprecatedではない、新バージョンのほうを使用しています。

legacyバージョン

pyarrow.hdfs.HadoopFileSystem
https://arrow.apache.org/docs/python/filesystems_deprecated.html

新バージョン

pyarrow.fs.HadoopFileSystem
https://arrow.apache.org/docs/python/filesystems.html#hadoop-distributed-file-system-hdfs
https://arrow.apache.org/docs/python/generated/pyarrow.fs.HadoopFileSystem.html

PyArrowを使ってみて

ポジティブ面

  • delete_dir_contentsメソッドがとても便利
    • 指定したディレクトリの配下にあるものをすべて削除して、指定したディレクトリ自体は削除しません。
    • ディレクトリが存在しない場合にエラーにしないオプションもあったりします。
    • デフォルトでルートパスの指定を許容しない安心設計です。

ネガティブ面

  • メソッド名とhdfsコマンドの対応が直感的ではない
  • 全体的に、パスにワイルドカードを使用できない
    • hdfs dfs -ls /tmp/test* みたいなことができない

個人的には、パスにワイルドカードを使用できないのは結構致命的な気がしていています。
※subprocessでhdfsコマンドを使用すればなんとかなりますが、pyarrowを導入した意味がなくなってしまいます。。

サンプルコード

さて、以降は実装したコードになりますが、hdfsで行う主な操作(ls, get, put, cp, mv, rm, mkdir)について実装してみました。

メソッド名が直感的ではないので、daoクラスを作成して、mkdir()->create_dir()を呼び出すように、ある程度直感的になるようにしてみました。しかしながら、制限が多すぎてメソッド名にもそれを反映することになりました。
コメントにも書いてますが、subprocessコマンドを使用したメソッドも作成しています。

※補足
マイクロアドでは、バッチ処理にDigdagを使用しています。
Digdagからの実行時に、環境変数CLASSPATHが未設定になってしまう状況があり、docker-entrypoint.shで設定することで解決しています。

ではどうぞ。

requirements.txt
pyarrow==9.0.0

Dockerfile

FROM python:3.7.10

# Shift timezone to Asia/Tokyo.
ENV TZ Asia/Tokyo

WORKDIR /work

# hdfsにアクセスするユーザ
# add abc user as root group
RUN groupadd -g 98 abc && \
    useradd -u 98 abc -g root -d /work && \
    chown -R abc:root /work/

# package
RUN apt-get update && apt-get install -y \
    libsasl2-dev \
    libsasl2-modules \
    tzdata \
    software-properties-common && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Install Java
ENV JAVA_HOME "/usr/lib/jvm/java-8-openjdk-amd64"
RUN wget -qO - https://adoptopenjdk.jfrog.io/adoptopenjdk/api/gpg/key/public | apt-key add -
RUN add-apt-repository -y https://adoptopenjdk.jfrog.io/adoptopenjdk/deb/ && \
    apt-get update && apt-get install -y adoptopenjdk-8-hotspot && \
    rm -rf /var/lib/apt/lists/* && \
    ln -fs /usr/lib/jvm/adoptopenjdk-8-hotspot-amd64 /usr/lib/jvm/java-8-openjdk-amd64

# Install python module
COPY requirements.txt /tmp/requirements.txt
RUN python -m pip install setuptools wheel && \
    python -m pip install -r /tmp/requirements.txt && \
    rm -rf /root/.cache

# Setup Hadoop
ENV HADOOP_USER_NAME=abc
ENV HADOOP_HOME "/opt/hadoop"

RUN mkdir -p /opt \
    && curl -SL https://downloads.apache.org/hadoop/common/hadoop-2.10.1/hadoop-2.10.1.tar.gz \
    | tar -xzC /opt \
    && ln -sfn /opt/hadoop-2.10.1 /opt/hadoop

ENV PATH="${HADOOP_HOME}/bin:${PATH}"

# 環境変数
# digdagから実行した場合、環境変数がdigdagのもので上書きされてしまうケースがあるため、
# コンテナ起動時にdocker-entrypoint.shで設定する
COPY docker-entrypoint.sh /work/docker-entrypoint.sh
ENTRYPOINT ["/bin/bash", "/work/docker-entrypoint.sh"]

# Embed common build information in Microad
ARG GIT_REVISION=unknown
ARG GIT_ORIGIN=unknown
ARG IMAGE_NAME=unknown
LABEL git-revision=$GIT_REVISION \
    git-origin=$GIT_ORIGIN \
    image-name=$IMAGE_NAME

docker-entrypoint.sh
#!/bin/bash

# コンテナ起動時に環境変数を設定する
# CLASSPATHはdigdagからの実行時に未設定となってしまうため、ここで設定する
export CLASSPATH="$($HADOOP_HOME/bin/hdfs classpath --glob)"

exec "$@"
src/dto/dto.py
from dataclasses import dataclass


@dataclass
class HdfsConf:
    """
    hdfsに接続するために必要な値を格納
    """
    host: str
    port: int
    user: str
src/dao/hdfs_dao.py
import logging
import subprocess
from pyarrow import fs
from typing import List

from src.dto import HdfsConf

LOGGER = logging.getLogger('app')


class HdsfDao:
    """
    hdfsのデータ操作オブジェクト
    """

    def __init__(self, hdfs_conf: HdfsConf) -> None:
        """
        イニシャライザ
        """
        self.hdfs = fs.HadoopFileSystem(
            hdfs_conf.host, hdfs_conf.port, user=hdfs_conf.user)

        # subprocess用
        self.hdfs_root = f'hdfs://{hdfs_conf.host}:{hdfs_conf.port}'

    def get_file_info(self, hdfs_path: str) -> fs.FileInfo:
        """
        対象のパスのFileInfoを取得
        """
        return self.hdfs.get_file_info(hdfs_path)

    def ls_dir(self, hdfs_path: str) -> List[str]:
        """
        指定したパスでls
        hdfs_pathにディレクトリ以外を指定するとエラー
        ※FileSelectorはディレクトリパス以外を受け付けない
        """
        LOGGER.info(f'ls for {hdfs_path}')
        file_info_list = self.hdfs.get_file_info(fs.FileSelector(hdfs_path))
        path_list = [f.path for f in file_info_list]
        return path_list

    def mkdir(self, hdfs_path: str, recursive: bool = True) -> None:
        """
        指定したパスでmkdir
        デフォルトはpオプション付き
        """
        LOGGER.info(f'mkdir for {hdfs_path}')
        self.hdfs.create_dir(hdfs_path, recursive=recursive)

    def rm_dir_contents(self, hdfs_dir_path: str) -> None:
        """
        指定したパス配下のファイルとディレクトリをrm ※指定したパスは削除しない
        """
        LOGGER.info(f'rm directory contents for {hdfs_dir_path}')
        self.hdfs.delete_dir_contents(hdfs_dir_path)

    def rm_dir(self, hdfs_path: str) -> None:
        """
        指定したパスでrm -r
        指定したディレクトリが存在しない場合エラーになってしまうため、存在チェックを行う
        """
        LOGGER.info(f'rm -r for {hdfs_path}')

        file_info = self.hdfs.get_file_info(hdfs_path)
        if file_info.type == fs.FileType.NotFound:
            LOGGER.info(f'file or directory not exists. {hdfs_path}')
            return

        self.hdfs.delete_dir(hdfs_path)

    def rm_single_file(self, hdfs_path: str) -> None:
        """
        指定したパスでrm
        単一のファイルのみ対応
        hdfs_pathに単一のファイルパス以外を指定するとエラー('/tmp/test*' や ディレクトリパス)
        指定したディレクトリが存在しない場合エラーになってしまうため、存在チェックを行う
        """
        LOGGER.info(f'rm for {hdfs_path}')

        file_info = self.hdfs.get_file_info(hdfs_path)
        if file_info.type == fs.FileType.NotFound:
            LOGGER.info(f'file or directory not exists. {hdfs_path}')
            return

        self.hdfs.delete_file(hdfs_path)

    def get_single_file(self, hdfs_src_path: str, local_dst_path: str) -> None:
        """
        指定したパスでget
        単一のファイルのみ対応
        hdfs_src_pathに単一のファイルパス以外を指定するとエラー('/tmp/test*' や ディレクトリパス)
        """
        LOGGER.info(f'get from {hdfs_src_path} to {local_dst_path}')
        with self.hdfs.open_input_stream(hdfs_src_path) as rstream:
            with open(local_dst_path, 'wb') as wf:
                wf.write(rstream.readall())

    def put_single_file(self, local_src_path: str, hdfs_dst_path: str) -> None:
        """
        指定したパスでput
        単一のファイルのみ対応
        local_src_pathに単一のファイルパス以外を指定するとエラー('/tmp/test*' や ディレクトリパス)
        """
        LOGGER.info(f'put from {local_src_path} to {hdfs_dst_path}')
        with self.hdfs.open_output_stream(hdfs_dst_path) as wstream:
            with open(local_src_path, 'rb') as rf:
                wstream.upload(rf)

    def cp_single_file_or_dir(self, hdfs_src_path: str,
                              hdfs_dst_path: str) -> None:
        """
        指定したパスでcp
        単一のファイルまたはディレクトリのみ対応
        hdfs_pathに'/tmp/test*'のように指定するとエラー
        """
        LOGGER.info(f'cp from {hdfs_src_path} to {hdfs_dst_path}')
        self.hdfs.copy_file(hdfs_src_path, hdfs_dst_path)

    def mv_single_file_or_dir(self, hdfs_src_path: str,
                              hdfs_dst_path: str) -> None:
        """
        指定したパスでmv
        単一のファイルまたはディレクトリのみ対応
        hdfs_src_pathに'/tmp/test*'のように指定するとエラー
        hdfs_src_pathがファイルパスでhdfs_dst_pathがファイルパスの場合OK
        hdfs_src_pathがディレクトリパスでhdfs_dst_pathがディレクトリパスの場合OK
        hdfs_src_pathがファイルパスでhdfs_dst_pathがディレクトリパスの場合エラーとなる

        hdfs_src_pathがファイルパスで、hdfs_dst_pathに同名のファイルが存在する場合上書きされる

        hdfs_src_pathがディレクトリで、hdfs_dst_pathに同名の空ではないディレクトリが存在する場合エラー
        ってドキュメントには書いてたけど
        hdfs_src_pathがディレクトリで、hdfs_dst_pathに同名の空ディレクトリが存在する場合もエラー
        """
        LOGGER.info(f'mv from {hdfs_src_path} to {hdfs_dst_path}')
        self.hdfs.move(hdfs_src_path, hdfs_dst_path)

    def get(self, hdfs_src_path: str, local_dst_path: str) -> None:
        """
        指定したパスでget
        subprocess経由でhdfsコマンドを使用
        """
        move_from = f'{self.hdfs_root}{hdfs_src_path}'
        cmd = f'/opt/hadoop/bin/hdfs dfs -get {move_from} {local_dst_path}'
        LOGGER.info(cmd)

        proc = subprocess.run(cmd, shell=True, check=True)
        proc.check_returncode()

    def put(self, local_src_path: str, hdfs_dst_path: str) -> None:
        """
        指定したパスでput
        subprocess経由でhdfsコマンドを使用
        """
        move_to = f'{self.hdfs_root}{hdfs_dst_path}'
        cmd = f'/opt/hadoop/bin/hdfs dfs -put {local_src_path} {move_to}'
        LOGGER.info(cmd)

        proc = subprocess.run(cmd, shell=True, check=True)
        proc.check_returncode()

    def cp(self, hdfs_src_path: str, hdfs_dst_path: str) -> None:
        """
        指定したパスでcp
        subprocess経由でhdfsコマンドを使用
        """
        copy_from = f'{self.hdfs_root}{hdfs_src_path}'
        copy_to = f'{self.hdfs_root}{hdfs_dst_path}'
        cmd = f'/opt/hadoop/bin/hdfs dfs -cp {copy_from} {copy_to}'
        LOGGER.info(cmd)

        proc = subprocess.run(cmd, shell=True, check=True)
        proc.check_returncode()

    def mv(self, hdfs_src_path: str, hdfs_dst_path: str) -> None:
        """
        指定したパスでmv
        subprocess経由でhdfsコマンドを使用
        hdfs_dst_pathに同名のファイルまたはディレクトリが存在する場合エラー
        """
        move_from = f'{self.hdfs_root}{hdfs_src_path}'
        move_to = f'{self.hdfs_root}{hdfs_dst_path}'
        cmd = f'/opt/hadoop/bin/hdfs dfs -mv {move_from} {move_to}'
        LOGGER.info(cmd)

        proc = subprocess.run(cmd, shell=True, check=True)
        proc.check_returncode()

    def rm_file(self, hdfs_path: str) -> None:
        """
        指定したパスでrm
        ディレクトリ削除には対応していない
        subprocess経由でhdfsコマンドを使用
        """
        if not hdfs_path or hdfs_path == '/':
            raise Exception

        rm_path = f'{self.hdfs_root}{hdfs_path}'
        cmd = f'/opt/hadoop/bin/hdfs dfs -rm -f {rm_path}'
        LOGGER.info(cmd)

        proc = subprocess.run(cmd, shell=True, check=True)

        proc.check_returncode()

daoメソッドをcallするコードサンプル

src/hdfs_dao_check_impl.py
import os
import textwrap
import logging
import subprocess
from pyarrow import fs

from src.dao.hdfs_dao import HdsfDao
from src.dto import HdfsConf

LOGGER = logging.getLogger('app')


def hdfs_check_1(hdfs_conf: HdfsConf) -> None:
    """
    HDFS操作のチェック用メソッド
    """
    hdsf_dao = HdsfDao(hdfs_conf)

    # 対象ディレクトリ配下のファイルを削除
    hdsf_dao.rm_dir_contents(hdfs_path)

    # hdfsにディレクトリを作成
    hdsf_dao.mkdir(hdfs_path)

    # hdfsのディレクトリを削除
    hdsf_dao.rm_dir(hdfs_path)

    # hdfsにファイルをput
    hdsf_dao.put_single_file(local_src_path, hdfs_dst_path)

    # hdfsにputしたファイルをgetする
    hdsf_dao.get_single_file(hdfs_src_path, local_dst_path)

    # hdfsにputしたファイルを削除
    hdsf_dao.rm_single_file(hdfs_dst_path)

    # hdfsにファイルをcp
    hdsf_dao.cp_single_file_or_dir(hdfs_src_path, hdfs_dst_path)

    # hdfsにファイルをmv
    hdsf_dao.mv_single_file_or_dir(hdfs_src_path, hdfs_dst_path)

    # 対象ディレクトリをls
    path_list = hdsf_dao.ls_dir(hdfs_path)
    LOGGER.info('target dir. %s', path_list)

まとめ

pyarrowを実際に使用して色々試してみました。
hdfsを操作するという観点では、pyarrowを使っていくのはちょっと厳しいかもしれません。
「いや、こうすればワイルドカード指定(もしくはそれに準ずる方法)できるよ」という方がいればご連絡いただけると大変助かります。
ではでは。

8
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
2