こちらのセクションをウォークスルーします。
Pandas UDFを用いることで、pandas.Seriesやpandas.DataFrameに対する処理を記述することができ、さらにこれをSparkで並列処理することができます。この仕組みはTransformerモデルにも適用できます。
以下の例ではTransformersのpipelineを用いて、英語からフランス語への翻訳を行うパイプラインを作成しています。
import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf
device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)
@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
return pd.Series(translations)
texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))
翻訳されました。今回は2レコードだけでしたが、レコード数増えてもSparkの並列処理の恩恵を受けられると言うことですね。バッチで大量のドキュメントを翻訳すると言うユースケースで効果が出そうです。
さらには別のモデルとタスクを指定することで、NER(Named Entity Recognition: 固有表現抽出)を行うこともできます。
from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)
[[{'entity_group': 'ORG',
'score': 0.99933606,
'word': 'Hugging Face',
'start': 0,
'end': 12},
{'entity_group': 'LOC',
'score': 0.99967843,
'word': 'New York City',
'start': 42,
'end': 55}],
[{'entity_group': 'ORG',
'score': 0.9996372,
'word': 'Databricks',
'start': 0,
'end': 10},
{'entity_group': 'LOC',
'score': 0.999588,
'word': 'San Francisco',
'start': 23,
'end': 36}]]