目次
- ETLプロセスとは?
- ファイル構造
- 必要なライブラリのインストール
- 設定ファイルの作成(config/config.py)
- データ抽出(APIからデータを取得)
- データの変換と整理
- データの保存
- ETLプロセス全体の実行
はじめに
ETLプロセスは、データを扱う基本的な流れです。この流れでは、API等からデータを取り出し、データを整理し、データベースに保存する一連の処理を行います。
この記事では、Pythonを使ってニュースデータを取得し、それを変換して保存する仕組みを作る方法を解説します。
1. ETLプロセスとは?
ETLは3つのステップからなります。
- Extract(取り出し): データを外部から取得することです
- Transform(変換): 取得したデータを、分析や保存に使いやすい形へ変えることです
- Load(保存): 変換したデータをデータベースなどに保存することです
2. ファイル構造
次に、このETLプロセスを作るためのプロジェクトのファイル構成例をご紹介します。
ファイル構造
etl_project/
├── data/ # データ保存用フォルダ
│ └── ai_news.csv # 保存されるCSVファイル
├── config/
│ └── config.py # APIキーやデータベース設定
├── etl/
│ ├── extract.py # データ抽出(APIからデータを取得)
│ ├── transform.py # データ変換(Pandasで整理・クリーニング)
│ └── load.py # データ保存(CSVやDBに保存)
├── main.py # 全体のプロセスを実行するファイル
└── requirements.txt # 必要なライブラリの一覧
3. 必要なライブラリのインストール
次に、必要なPythonライブラリをインストールします。以下のコマンドで簡単にインストールできます。
pip install newsapi-python pandas sqlalchemy psycopg2
4. 設定ファイルの作成(config/config.py)
APIキーやデータベース情報を管理するために、config/config.py
ファイルを作成します。このファイルに、APIキーやデータベースの情報を記載します。
コード: config/config.py
NEWS_API_KEY = 'API_KEY' # APIキー
DB_USER = 'username' # ユーザー名
DB_PASSWORD = 'password' # パスワード
DB_HOST = 'localhost' # ホスト名
DB_PORT = '5432' # ポート番号
DB_NAME = 'db_name' # データベース名
DB_URL = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
APIキーやデータベースのパスワードなどの機密情報は、ソースコードに直接書き込まず、環境変数や設定ファイルで管理することが推奨されます。
5. データ抽出(APIからデータを取得)
ニュースAPIを使って、AIに関するニュースデータを取得します。
コード: etl/extract.py
from newsapi import NewsApiClient
from config.config import NEWS_API_KEY
def extract_news(query='AI', language='en'):
newsapi = NewsApiClient(api_key=NEWS_API_KEY)
news_data = newsapi.get_everything(q=query, language=language, sort_by='publishedAt')
return news_data['articles']
6. データの変換と整理
取得したニュースデータを整えて、必要な情報だけを取り出します。
コード: etl/transform.py
import pandas as pd
def transform_news(articles):
df = pd.DataFrame(articles)
df_cleaned = df[['title', 'description', 'url', 'publishedAt']]
return df_cleaned
7. データの保存
整理したデータをCSVファイルやデータベースに保存します。
コード: etl/load.py
import os
from sqlalchemy import create_engine
from config.config import DB_URL
def load_data_to_csv(df, file_path='data/ai_news.csv'):
df.to_csv(file_path, mode='a', header=not os.path.exists(file_path), index=False)
print(f"データが {file_path} に保存されました。")
def load_data_to_db(df, table_name='ai_news'):
engine = create_engine(DB_URL)
df.to_sql(table_name, con=engine, if_exists='append', index=False)
print(f"データがデータベースの {table_name} に追加されました。")
8. ETLプロセス全体の実行
最後に、全てのプロセスを一括で実行するためのコードです。
コード: main.py
from etl.extract import extract_news
from etl.transform import transform_news
from etl.load import load_data_to_csv, load_data_to_db
def main():
articles = extract_news()
df_cleaned = transform_news(articles)
load_data_to_csv(df_cleaned)
load_data_to_db(df_cleaned)
if __name__ == "__main__":
main()
まとめ
この記事では、ニュースデータをAPIから取得し、整理し、データベースに保存するETLプロセスを作る方法をご紹介いたしました。