確定申告の時期になると外貨で支払いがあった収入を日本円になおす作業をしています。為替レート(TTM)は、みずほ銀行が、以下の Web サイトで CSV ファイルとして、公開していますが、かなり古いデータも入っているので、今年のデータまでスクロールするのが、なんだかなぁ(加藤恵)っと思ってました。
そこで、Python で必要な年と通貨だけの CSV ファイルにしようというお話です。
環境
Python: 3.9.6
処理の流れ
- CSV ファイルのダウンロード
- 文字コードを変換
- ヘッダーを編集
- 必要な通貨データを抽出
- 年ごとに分けてファイルを保存
1. CSV ファイルのダウンロード
みずほ銀行の Webサイト から CSV ファイルをダウンロードします。
import sys
import requests
def download_csv(url, output_file):
response = requests.get(url)
# ステータスコードが200(成功)であることを確認
if response.status_code == 200:
with open(output_file, 'wb') as file:
file.write(response.content)
print(f'Successfully downloaded {output_file}')
else:
print(f'Failed to download file. Status code: {response.status_code}')
sys.exit(1)
# URLと出力ファイル名を指定
csv_url = 'https://www.mizuhobank.co.jp/market/quote.csv'
output_file_path = 'quote_raw.csv'
# ダウンロード関数を呼び出し
download_csv(csv_url, output_file_path)
2. 文字コードを変換
ダウンロードできる CSV ファイルの文字コードは、Shift-JIS なので、Mac 環境では、UTF-8へ変換しておきます。変換は、pandas を使用します。pandas は、pip install pandas
でインストールできます。
import platform
import shutil
import pandas as pd
def convert_shiftjis_to_utf8(input_file_path, output_file_path):
print("Running on Mac")
# Shift-JISでエンコードされたCSVファイルを読み込み
df = pd.read_csv(input_file_path, encoding='shift_jis')
# UTF-8でエンコードしてCSVファイルとして保存
df.to_csv(output_file_path, encoding='utf-8', index=False)
# Macの場合は文字コードを変更する
input_file_path = output_file_path
output_file_path = 'quote_os.csv'
if platform.system() == 'Darwin':
# 関数を呼び出してファイルを変換
convert_shiftjis_to_utf8(input_file_path, output_file_path)
else:
# 単純にコピー
shutil.copy(input_file_path, output_file_path)
3. ヘッダーを編集
理由はわかりませんが、ダウンロードできる CSV ファイルのヘッダー部分に DATE(日付)がないので、DATE を追加して、ついでに日本語の通貨名の行も消しておきます。
,米ドル,英ポンド,ユーロ,カナダドル,...
,USD,GBP,EUR,CAD,...
2002/4/1,133.15,189.79,116.12,83.48,...
2002/4/2,133.2,191.78,117.18,83.38,...
...
DATE,USD,GBP,EUR,CAD,...
2002/4/1,133.15,189.79,116.12,83.48,...
2002/4/2,133.2,191.78,117.18,83.38,...
...
def add_date_to_header(input_file, output_file):
with open(input_file, 'r', newline='') as f_in:
reader = csv.reader(f_in)
rows = list(reader)
if rows:
# 既存のヘッダーを取得し、'DATE' を先頭に追加
original_header = rows[2]
original_header[0] = 'DATE'
new_header = original_header
# 残りのデータ行
data_rows = rows[3:]
with open(output_file, 'w', newline='') as f_out:
writer = csv.writer(f_out)
# 新しいヘッダーを書き込み
writer.writerow(new_header)
# 残りのデータ行を書き込み
writer.writerows(data_rows)
# ヘッダーに DATE を追加
input_file_path = output_file_path
output_file_path = 'quote_with_date_header.csv'
# 関数を呼び出してヘッダーに'DATE'を追加
add_date_to_header(input_file_path, output_file_path)
4. 必要な通貨データを抽出
ヘッダーを整えた CSV ファイルに対して必要な通貨(ここでは、米ドル、英ポンド、ユーロ、カナダドル)を指定して抽出します。
import csv
def extract_columns(input_file, output_file, fieldnames):
with open(input_file, 'r') as f_in:
reader = csv.DictReader(f_in)
with open(output_file, 'w', newline='') as f_out:
writer = csv.DictWriter(f_out, fieldnames=fieldnames)
writer.writeheader()
for row in reader:
new_row = {key: row[key] for key in fieldnames}
writer.writerow(new_row)
# 指定した通貨データを抽出
input_file_path = output_file_path
output_file_path = 'extracted_data.csv'
fieldnames = ['DATE', 'USD', 'GBP', 'EUR', 'CAD']
extract_columns(input_file_path, output_file_path, fieldnames)
5. 年ごとに分けてファイルを保存
あとは、必要な年の分だけ、CSV ファイルを出力して終わりです。2024_data.csv というファイル名で保存されます。
import csv
def extract_year_data(input_file, output_file, year):
with open(input_file, 'r') as f_in:
reader = csv.reader(f_in)
header = next(reader) # ヘッダー行を読み飛ばす
with open(output_file, 'w', newline='') as f_out:
writer = csv.writer(f_out)
writer.writerow(header) # ヘッダー行を書き込む
for row in reader:
date = row[0]
if date.startswith(str(year)):
writer.writerow(row)
# 2020年から2024年までの通貨データを抽出
input_file_path = output_file_path
for target_year in range(2020, 2025):
output_file_path = f'{target_year}_data.csv'
extract_year_data(input_file_path, output_file_path, target_year)
ソースコード (全部)
import sys
import csv
import platform
import shutil
import pandas as pd
import requests
def download_csv(url, output_file):
response = requests.get(url)
if response.status_code == 200:
with open(output_file, 'wb') as file:
file.write(response.content)
print(f'Successfully downloaded {output_file}')
else:
print(f'Failed to download file. Status code: {response.status_code}')
sys.exit(1)
def convert_shiftjis_to_utf8(input_file_path, output_file_path):
df = pd.read_csv(input_file_path, encoding='shift_jis')
df.to_csv(output_file_path, encoding='utf-8', index=False)
def add_date_to_header(input_file, output_file):
with open(input_file, 'r', newline='') as f_in:
reader = csv.reader(f_in)
rows = list(reader)
if rows:
original_header = rows[2]
original_header[0] = 'DATE'
new_header = original_header
data_rows = rows[3:]
with open(output_file, 'w', newline='') as f_out:
writer = csv.writer(f_out)
writer.writerow(new_header)
writer.writerows(data_rows)
def extract_columns(input_file, output_file, fieldnames):
with open(input_file, 'r') as f_in:
reader = csv.DictReader(f_in)
with open(output_file, 'w', newline='') as f_out:
writer = csv.DictWriter(f_out, fieldnames=fieldnames)
writer.writeheader()
for row in reader:
new_row = {key: row[key] for key in fieldnames}
writer.writerow(new_row)
def extract_year_data(input_file, output_file, year):
with open(input_file, 'r') as f_in:
reader = csv.reader(f_in)
header = next(reader)
with open(output_file, 'w', newline='') as f_out:
writer = csv.writer(f_out)
writer.writerow(header)
for row in reader:
date = row[0]
if date.startswith(str(year)):
writer.writerow(row)
if __name__ == '__main__':
csv_url = 'https://www.mizuhobank.co.jp/market/quote.csv'
output_file_path = 'quote_raw.csv'
download_csv(csv_url, output_file_path)
input_file_path = output_file_path
output_file_path = 'quote_os.csv'
if platform.system() == 'Darwin':
convert_shiftjis_to_utf8(input_file_path, output_file_path)
else:
shutil.copy(input_file_path, output_file_path)
input_file_path = output_file_path
output_file_path = 'quote_with_date_header.csv'
add_date_to_header(input_file_path, output_file_path)
input_file_path = output_file_path
output_file_path = 'extracted_data.csv'
fields = ['DATE', 'USD', 'GBP', 'EUR', 'CAD']
extract_columns(input_file_path, output_file_path, fields)
input_file_path = output_file_path
for target_year in range(2020, 2025):
output_file_path = f'{target_year}_data.csv'
extract_year_data(input_file_path, output_file_path, target_year)
さいごに
正直に言うと、ちょっと冗長なコードだなと思っていますが、勉強をかねているのでよしとしています。一年に一回しか使わないのは、ナイショの話。