import pathlib
import pandas as pd
import pdfplumber
import requests
def fetch_file(url, dir="."):
    """Download *url* into directory *dir*, skipping if already present.

    Parameters
    ----------
    url : str
        URL to fetch; its basename becomes the local file name.
    dir : str or pathlib.Path, optional
        Destination directory, created (with parents) if missing.

    Returns
    -------
    pathlib.Path
        Path to the local copy of the file.

    Raises
    ------
    requests.HTTPError
        If the server responds with an error status.
    """
    # NOTE: "dir" shadows the builtin, but the parameter name is kept
    # for backward compatibility with existing callers.
    p = pathlib.Path(dir, pathlib.PurePath(url).name)
    p.parent.mkdir(parents=True, exist_ok=True)
    if not p.exists():
        # timeout prevents the script from hanging forever on a dead server
        r = requests.get(url, timeout=30)
        r.raise_for_status()
        with p.open(mode="wb") as fw:
            fw.write(r.content)
    return p
def snap_adjustment(s, limit=5):
    """Snap nearby values in *s* onto the locally dominant value.

    Walks the distinct values of *s* in ascending order. Whenever the
    current value lies closer than *limit* to the running anchor, the
    less frequent of the two is rewritten to the other; otherwise the
    current value becomes the new anchor. Used to collapse almost-equal
    PDF word coordinates onto shared row/column positions.

    Parameters
    ----------
    s : pandas.Series
        Numeric series of coordinates.
    limit : int or float, optional
        Two distinct values closer than this are merged.

    Returns
    -------
    pandas.Series
        Series with clustered values unified.
    """
    freq = s.value_counts().sort_index()
    anchor = 0
    anchor_count = 0
    for val, cnt in freq.items():
        if val - anchor >= limit:
            # Far enough from the anchor: start a new cluster here.
            anchor, anchor_count = val, cnt
            continue
        if cnt > anchor_count:
            # Current value dominates: fold the old anchor into it.
            s = s.mask(s == anchor, val)
            anchor, anchor_count = val, cnt
        else:
            # Anchor dominates: fold the current value into the anchor.
            s = s.mask(s == val, anchor)
    return s
# List of monthly-report PDFs to process (Okinawa prefecture wholesale market)
links = [
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/201901.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/201902.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou3103.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou3104.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0105.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0106.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0107.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0108.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0109.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0110.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0111.pdf",
    "https://www.pref.okinawa.jp/site/norin/oroshiuri/documents/documents/geppou0112.pdf",
]
# Cheat-sheet: category directory name and the matching 0-based page slice
# (yasai = vegetables, kudamono = fruit, kiribana = cut flowers,
#  hachimono = potted plants).
"""
# ディレクトリ名と範囲を指定
+ yasai
[6:10]
+ kudamono
[10:13]
+ kiribana
[29:32]
+ hachimono
[32:35]
"""
# Output directory name -- when changing the category, the page slice used
# in the main loop must be changed to the matching range from the table above.
DATA_DIR = "kiribana"
def _drop_thick_rects(obj):
    """pdfplumber object filter: keep everything except rects taller than 2pt.

    Thin rects are table rule lines (useful for locating the table area);
    thicker rects are fills/shading and are discarded.
    """
    if obj["object_type"] == "rect" and obj["height"] > 2:
        return False
    return True


for link in links:
    print(link)
    path_pdf = fetch_file(link, DATA_DIR)
    with pdfplumber.open(path_pdf) as pdf:
        dfs = []
        # Page range (0-based) -- must match DATA_DIR, see the table above
        for page in pdf.pages[29:32]:
            print(page.page_number, end=" ")
            # Remove filled/shaded rects so only thin rule lines remain
            filtered = page.filter(_drop_thick_rects)
            if page.lines:
                # Line-based layout: crop between the first and last rule line
                print("line")
                crop = page.within_bbox(
                    (0, page.lines[0]["top"] - 2, page.width, page.lines[-1]["top"] + 2)
                )
            elif filtered.rects:
                # Rect-based layout: thin rects act as the rule lines
                print("rect")
                crop = page.within_bbox(
                    (
                        0,
                        filtered.rects[0]["top"] - 2,
                        page.width,
                        filtered.rects[-1]["top"] + 2,
                    )
                )
            else:
                # Fallback: fixed coordinates when no rule lines were detected
                crop = page.within_bbox((0, 105, page.width, 680))
                print("error")
            df_tmp = (
                pd.DataFrame(crop.extract_words(keep_blank_chars=True))
                .astype({"x0": float, "x1": float, "top": float, "bottom": float})
                .sort_values(["top", "x0"])
            )
            # Snap words on (almost) the same baseline onto a single row
            df_tmp["top"] = snap_adjustment(df_tmp["top"])
            df_tmp["page"] = page.page_number
            dfs.append(df_tmp)
        if dfs:
            # Combine all pages of this PDF
            df = pd.concat(dfs)
            # Normalize text: strip blanks and thousands separators
            df["text"] = df["text"].str.replace(" ", "").str.replace(",", "")
            # Horizontal center of each word; snap nearby centers to a column
            df["center"] = df.loc[:, ["x0", "x1"]].median(axis=1)
            df["center"] = snap_adjustment(df["center"], 25)
            # Rebuild the table grid from (row, column) coordinates
            table = (
                df.pivot_table(
                    index=["page", "top"],
                    columns="center",  # column anchor: "x0", "x1" or "center"
                    values="text",
                    aggfunc=lambda x: "".join(str(v) for v in x),
                )
            ).values
            df1 = pd.DataFrame(table).dropna(thresh=2).dropna(how="all", axis=1)
            # Drop the repeated header rows ("品目" = item name)
            df2 = df1[df1[0] != "品目"].copy()
            # Each item spans two rows; carry the item name down to the
            # second row of each pair (fillna(method=...) is deprecated).
            df2[0] = df2[0].ffill()
            # First row of each pair: quantities and unit prices
            df_even = (
                df2[::2]
                .set_axis(
                    [
                        "品目",
                        "県内_数量",
                        "県内_単価",
                        "県外_数量",
                        "県外_単価",
                        "外国_数量",
                        "外国_単価",
                        "総計_数量",
                        "総計_単価",
                    ],
                    axis=1,
                )
                .dropna(how="all", axis=1)
            )
            # Second row of each pair: amounts and unit prices
            df_odd = (
                df2[1::2]
                .set_axis(
                    [
                        "品目",
                        "県内_金額",
                        "県内_単価",
                        "県外_金額",
                        "県外_単価",
                        "外国_金額",
                        "外国_単価",
                        "総計_金額",
                        "総計_単価",
                    ],
                    axis=1,
                )
                .dropna(how="all", axis=1)
            )
            # Merge the row pairs back into one record per item and
            # arrange the columns in reporting order
            df3 = (
                pd.merge(df_even, df_odd, on="品目")
                .set_index("品目")
                .reindex(
                    columns=[
                        "県内_数量",
                        "県内_単価",
                        "県内_金額",
                        "県外_数量",
                        "県外_単価",
                        "県外_金額",
                        "外国_数量",
                        "外国_単価",
                        "外国_金額",
                        "総計_数量",
                        "総計_単価",
                        "総計_金額",
                    ]
                )
            )
            # Save next to the PDF (BOM so Excel opens UTF-8 correctly)
            path_csv = path_pdf.with_suffix(".csv")
            df3.to_csv(path_csv, encoding="utf_8_sig")
        else:
            print("error!")