はじめに
本記事では、Pythonを使って、データの加工と分析の流れを簡易的にハンズオンします。
お急ぎの方は次のGoogle Colaboratoryのリンクからコードを試すことができます。
やりたいこと
技術評論社のRSSからSQLクエリを使って、最も使用頻度の高いワードを使った記事のタイトルとリンクを取得する。
ステップ
- 技術評論社のRSS(XML形式)をJSON形式へ変換
- JSONからRSSのtitleタグを取得し、タイトル一覧をtxtファイルに出力
- タイトル一覧.txtから使用頻度の高いワードを抽出
- JSONファイルとtitle情報からデータ分析・取得を行う
PandasとPySpark
今回使うPythonのパッケージにはPandasとPySparkがあります。コードの紹介の前に、まずは二つのパッケージの概要について説明しましょう。
Pandas
Pandasは、データ分析および機械学習タスクに最も広く使用されているオープンソースPythonパッケージです。Pandas を使用すると、「データクレンジング」「データ分析」「統計分析」など、時間のかかる反復的なタスクを簡単に行うことができます。
ちなみに”Pandas”という名前は「Panel Data」から来ているそうです。
PySpark
PySparkは「Apache Spark」のPython APIです。
Apache Sparkは、大規模なデータ処理のための統合分析エンジンです。
Apache Sparkの機能の一つに構造化データ処理用の「Spark SQL」があります。
Spark SQLを使うことで、SQLの書き方でデータ分析を行えるようになります。
1. 技術評論社のRSS(XML形式)をJSON形式へ変換
まずは、RSSがXML形式のため、JSON形式へ変換します。JSONにする理由は、ファイルが視覚的にわかりやすく、またkey:valueという非常に汎用的に使えるデータ形式だからです。
Pandasは元データがXMLでもCSVでもJSONでも、データを読み込むことができるため、開発者はデータ処理に注力できるのが魅力の一つです。
技術評論者のRSSのリンクは以下です。
https://gihyo.jp/feed/rss2
Colaboratory(以下Colab)のルートにリンクから取得したXMLファイル「feed.xml」を配置しています。次のコードを実行すると、data/feed.json
が出力されます。
※data/feed.json
は一時ファイルのため、時間経過で削除されます。
import pandas as pd
import os
# 入力XMLファイルのパス
input_xml_path = "feed.xml"
# 出力JSONファイルのパス
output_json_path = "data/feed.json"
# ファイルの存在確認
if not os.path.exists(input_xml_path):
print(f"ファイルが見つかりません: {input_xml_path}")
else:
try:
# XMLファイルの 'item' タグをDataFrameに変換
df_items = pd.read_xml(input_xml_path, xpath=".//item", encoding="utf-8")
# DataFrameをJSON形式の文字列に変換
json_str = df_items.to_json(orient='records', force_ascii=False, date_format='iso')
# JSON形式の文字列をファイルに書き出す
with open(output_json_path, "w", encoding='utf-8') as f:
f.write(json_str)
except Exception as e:
print(f"ファイルの読み込み中にエラーが発生しました: {e}")
コード解説
# XMLファイルの 'item' タグをDataFrameに変換
df_items = pd.read_xml(input_xml_path, xpath=".//item", encoding="utf-8")
pd.read_xml(ファイル名)
でXMLファイルを読み込んでいます。
オプションのxpath=".//item"
は<item>
タグの内側のみ読み込むという意味です。XMLファイルには余分なタグがいくつかあるため、このオプションを使用しています。
# DataFrameをJSON形式の文字列に変換
json_str = df_items.to_json(orient='records', force_ascii=False, date_format='iso'
この行では、DataFrame(XML)の内容をJSON形式の文字列に変換しています。
orient='records'
は、出力されるJSONの形式を指定しています。records は、DataFrame(XML)の各行を個別のJSONオブジェクトとして出力する形式を意味しています。
force_ascii=False
は、JSON文字列に非ASCII文字(日本語など)が含まれる場合に、それらをエスケープせずにそのまま出力するよう指定しています。このオプションを指定しないと、非ASCII文字はエスケープされた形(例えば \uXXXX など)で出力されます。
date_format='iso'
は、DataFrame内の日付データのフォーマットをISO 8601標準にするよう指定しています。これにより、日付と時刻は YYYY-MM-DDTHH:MM:SS の形式で出力されます。
2. JSONからRSSのtitleタグを取得し、txtファイルに出力
タイトルから、最も使用頻度の高いワードを取得したいため、先ほど作成したfeed.json
からtitleキーのバリューを抜き出しタイトル一覧ファイルを作成します。
# JSONファイルを読み込み、DataFrameに変換
df = pd.read_json("data/feed.json")
# 'title' 列のデータを取得
titles = df['title'].tolist()
# 'title' のデータをtxtファイルに保存
with open('data/title.txt', 'w', encoding='utf-8') as f:
for title in titles:
f.write(title + '\n')
3. タイトル一覧.txtから使用頻度の高いワードを抽出
PySparkを使ってタイトル一覧から使用頻度の高いワードを抽出します。
ここで一つ謝罪があります。頻出ワードを取得する単語は、英語にする必要があります。しかし翻訳するコードを書く手間よりGoogle翻訳に突っ込んだ方が早かったので、タイトル一覧ファイルを翻訳した、英語版タイトル一覧"title_en.txt"
をColabのルートディレクトリに配置しました。
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import explode,split,col,lower
# ストップワードのリストを定義
stopwords = ['of', 'and', 'the', 'is', 'in', 'to', 'a', 'for', 'on', 'with', 'by', 'at', 'from', 'as', 'it', 'an', 'are', 'this', 'that', 'be', 'not', 'or', 'but', 'if', 'than', 'so', 'such', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'now', '-', 'no.', '/','using','new','about','how','2023']
spark = SparkSession.builder\
.master("local")\
.appName('word_count')\
.getOrCreate()
df=spark.read.text("title_en.txt")
df_count = (
df.withColumn('word', explode(split(lower(col('value')), ' '))) # 単語を小文字に変換
.filter(~col('word').isin(stopwords)) # ストップワードを除外
.groupBy('word')
.count()
.sort('count', ascending=False)
)
df_count.show()
次のような結果になります。最頻単語は「ubuntu」でしたね。
コード解説
spark = SparkSession.builder\
.master("local")\
.appName('word_count')\
.getOrCreate()
Apache Sparkの初期化をしています。
df_count = (
df.withColumn('word', explode(split(lower(col('value')), ' '))) # 単語を小文字に変換
.filter(~col('word').isin(stopwords)) # ストップワードを除外
.groupBy('word')
.count()
.sort('count', ascending=False)
)
読み込んだ英語版タイトル一覧から、頻出単語をカウントしています。大文字と小文字で別カウントになってしまうため、pyspark.sqlの関数lower()
を使って、全て小文字にしています。
また、ストップワードを除外しています。ストップワードとは除外ワードとも呼ばれ、テキスト検索処理に関連する内容をもたないワードのことです("and"、"or"、"in" など)。
4. JSONファイルとtitle情報からデータ分析・取得を行う
最頻単語は「Ubuntu」であることを発見したため、「Ubuntu」が使われているタイトルの記事一覧を取得します。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# JSONファイルを読み込む
df = spark.read.json("data/feed.json")
# タイトルとリンクを取得
df.createOrReplaceTempView("temp_view")
ubuntu = spark.sql("SELECT title, link FROM temp_view WHERE title LIKE '%Ubuntu%'")
# 結果を表示する
ubuntu.show(truncate=False)
コード解説
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Apach Sparkの新しいセッションを作成しています。
# タイトルとリンクを取得
df.createOrReplaceTempView("temp_view")
ubuntu = spark.sql("SELECT title, link FROM temp_view WHERE title LIKE '%Ubuntu%'")
createOrReplaceTempView
メソッドを呼び出すと、一時的にビューテーブルを作成することができ、Spark SQLでSQLライクな書き方でデータ取得を行うことができます。
まとめ
Pandasを使ってデータ加工、PySparkを使って、データ分析を行いました。
備考
本記事は「WEB+DB PRESS Vol.128」の特集1演習で慣れる!データベース入門SQL操作,テーブル設計,分析用データ処理を参考にしています。
https://gihyo.jp/magazine/wdpress/archive/2022/vol128
参考