Luigiとは
Luigiは音楽ストリーミングサービスを展開するSpotifyがOSSとして開発しているデータパイプライン制御フレームワークです。名称は某有名ゲームキャラクターの弟から取ってきているようです。
Luigiは基本的に以下のメソッドを含むタスクを定義してデータパイプラインの制御を行っています。
requiresメソッド:タスクに必要な(依存している)タスクを返す
runメソッド:タスクで実施する処理(依存タスクの出力ファイルを読み込み、そのデータに対して処理を行う)
outputメソッド:タスクが出力するファイル
今回は、Luigiを自然言語処理でどう使っているのかにフォーカスしてお話しするため、基本的な使い方は省略させていただきます。
自然言語処理でどう使うのか
自然言語処理で何かを行うとなった際には、基本的に元データからモデルの学習を行うまでに一連の前処理をしなくてはいけないです。(特に日本語文書ですと分かち書き処理は必須事項です)
前処理とひとえにいっても色々な処理があります。
・形態素解析(分かち書き)
・品詞による絞り込み
・ストップワード除外
・n-gram生成
・etc...
様々な手法を試そうとした時に、これらの前処理はほとんど共通する部分が多いです。そのため、この前処理を共通化・標準化して処理時間やデータ量を削減しつつ煩雑なデータ管理を簡単化するためにLuigiを用いています。
具体的には、以下の様にタスクを分けて色々な検証を行っています。
・元データを結合するタスク
・利用列に絞り込むタスク
・形態素解析を実施するタスク
(品詞による絞り込みや分かち書き処理なども同時に行なっている)
・各手法に固有に必要なタスク
(パラメータは設定ファイルに切り出しておく)
・モデルを作成するタスク
・モデルによる予測等を実施するタスク
元データの結合から、利用列の絞り込み、分かち書きまでをLuigi上で実施する際の例が以下です。
import luigi
import pandas as pd
import MeCab
import ast # 文字列から配列をパースするのに利用
import configparser # 形態素解析系の設定情報を読み込む
import unicodedata
import nlp #自作ライブラリ
# 大元のデータを読み込んでデータフレームにする。
# ファイルが複数個あって結合が必要な場合、このタスク内で実施しています。
class MakeData(luigi.Task):
def run(self):
df = pd.read_excel("original_data.xlsx")
self.output().makedirs()
df.to_pickle(self.output().fn)
def output(self):
return luigi.LocalTarget("pickle/data.pickle", format=luigi.format.Nop)
# 利用する列のみに絞り込んだデータフレームを作るタスク
class FilterData(luigi.Task):
target_cols = luigi.Parameter()
def requires(self):
return MakeData()
def run(self):
df = pd.read_pickle(self.input().fn)
target_cols = ast.literal_eval(self.target_cols)
df = df[target_cols]
self.output().makedirs()
df.to_pickle(self.output().fn)
def output(self):
target_cols = ast.literal_eval(self.target_cols)
target_name = "_".join(target_cols)
return luigi.LocalTarget(f"pickle/filtered_data_{target_name}.pickle", format=luigi.format.Nop)
# 利用列の形態素解析を行い、分かち書きするタスク
class Wakati(luigi.Task):
target_cols = luigi.Parameter()
config_name = luigi.Parameter()
def requires(self):
return FilterData(target_cols=self.target_cols)
def run(self):
target_cols = ast.literal_eval(self.target_cols)
# 設定ファイルの読み込み
parser = configparser.SafeConfigParser()
parser.read(f"config/{self.config_name}.ini")
# 利用する形態素解析器を設定ファイルから読み取り
morph_analyzer = parser.get("settings", "morph_analyzer")
# 品詞の絞り込みルールを設定ファイルから読み取り
filter_rules = parser.get("settings", "filter_rules")
# 入力ファイルを読み込み
df = pd.read_pickle(self.input().fn)
# 形態素解析器を用意
tagger = None
if morph_analyzer == "mecab":
dictionary = parser.get("settings", "dictionary")
tagger = MeCab.Tagger(f"-d {dictionary}")
tagger.parse("")
# データフレームの各行に対して適用する分かち書き処理
def __wakati(text, tagger):
text = unicodedata.normalize("NFKC", text)
# 形態素のリストを取得
morphs = nlp.morphological_analysis(text, tagger)
# 形態素のリストを品詞情報を元に絞り込み
morphs = nlp.filter_morph(morphs, filter_rules)
# 形態素の表層形をスペースで連結する
return nlp.concat_contents(morphs, lambda x: x.surface)
# 対象の列すべてに対して分かち書き処理を実施
for target_col in target_cols:
df[target_col] = df[target_col].apply(lambda x: __wakati(x, tagger))
df["result"] = df[target_cols].apply(lambda x: " ".join(x), axis=1)
# テキストファイルに分かち書き結果を出力
self.output().makedirs()
with self.output().open("w") as output_file:
for result in df["result"]:
output_file.write(result)
output_file.write("\n")
def output(self):
target_cols = ast.literal_eval(self.target_cols)
target_name = self.config_name + "_".join(target_cols)
return luigi.LocalTarget(f"data/{target_name}.txt")
利用する列はパラメータとして直接与える様にしており、利用する形態素解析器や辞書、形態素の品詞による絞り込みルールなどは以下の様な設定ファイルに切り出しています。
[settings]
morph_analyzer = mecab
dictionary = /opt/local/lib/mecab/dic/mecab-ipadic-neologd
filter_rules = [
{"*": True, "pos": ["名詞"]},
{"*": False, "pos":[]}]
そして、以下のコマンドで処理を実行しています。
すると、利用する列と形態素解析の設定ファイルに基づいた処理がなされ、以下の様にデータが生成されます。
all_task.py data/ original_data.xlsx
config/ nlp.py pickle/
./config:
default.ini mecab_neologd.ini
./data:
mecab_neologd_col1_col2.txt
./pickle:
data.pickle filtered_data_col1_col2.pickle
初めにディレクトリの設計や設定ファイルの切り出しを行ってしまえば、ちょっとした前処理の変更の際に手間取らずに済みます。
例えば、品詞の絞り込みで数字だけ除外したケースで前処理をしたい場合には、以下の様に設定に変更を加えて
PYTHONPATH=. luigi --module all_task Wakati --target-cols "['col1','col2']" --config-name mecab_neologd_except_number --local-scheduler
を実行するだけで、設定により処理に違いが生じるWakatiタスクのみを実行することができます。
以前に実行した処理結果との共通部分を意識せずに違いが生じる部分のみ実行できるのは大変便利です。
[settings]
morph_analyzer = mecab
dictionary = /opt/local/lib/mecab/dic/mecab-ipadic-neologd
filter_rules = [
{"*": False, "pos": ["名詞", "数"]},
{"*": True, "pos": ["名詞"]},
{"*": False, "pos":[]}]
all_task.py data/ original_data.xlsx
config/ nlp.py pickle/
./config:
default.ini mecab_neologd.ini
./data:
mecab_neologd_col1_col2.txt mecab_neologd_except_number_col1_col2.txt <- これが追加される
./pickle:
data.pickle filtered_data_col1_col2.pickle
簡単な例だと少しLuigiの便利さがわかりづらいですが、前処理のバリエーションが増えてくると処理内容と処理結果を紐づけた追跡がしやすくLuigiの良さが見えてきます。また、パラメータや設定ファイルの変更だけで色々なパターンのデータが作れるようになるのも利点として挙げられます。
今回は、記事作成にあまり時間が取れなかったため簡単な使い方の紹介となってしまいましたが、
自然言語処理など複雑な前処理が必要なケースでは結構役に立つかと思いますので利用してみてください。