PdfPlumber 大量pPDFファイル一括処理:行が揺れてるPDF:コード解説_1
の続きです
サンプルpdfファイルと前回の内容はこちら
初心者でもわかりやすく解説します
処理フローや構文が読めてる方は、適宜スキップして読んで下さい
📄 PDF処理の詳細フロー図解
PDFファイル
↓
【1】convert_pdfs()
↓
【2】extract_table() を呼び出し
↓
PDFからテーブル抽出
↓
【3】merge_rows() を呼び出し
↓
分割された行を結合
↓
【2】extract_table() に戻る
↓
【1】convert_pdfs() に戻る
↓
結果: アンパックして、header, rows, marksという変数に渡される
🔵 1. convert_pdfs() - メイン処理
役割
1つのPDFファイルから表のデータを取り出す
def convert_pdfs(config: Config):
"""PDFファイルをCSVに一括変換"""
pdf_files = prepare_files(config)
for idx, pdf_file in enumerate(pdf_files, start=1):
logger.info(f"処理中 {idx}/{len(pdf_files)}: {pdf_file.name}")
try:
header, rows, marks = extract_table(pdf_file, config.page_slice,
config.row_slice)
if not header or not rows:
continue
df = create_df(header, rows, pdf_file, marks)
append_csv(df, config.output_csv)
except Exception as e:
logger.error(f"エラー {pdf_file.name}: {e}")
# 30ファイルごとに休憩
if idx % 30 == 0:
logger.info("30ファイル処理完了。3秒休憩...")
time.sleep(3)
ステップバイステップ解説
ステップ1: ファイル準備
pdf_files = prepare_files(config)
何をしてる?
- prepare_files(config)で処理された
- 自然数順に並べ替えたPDFファイル群を
- 作業フォルダに移動する
- ファイルのリストを返す
PdfPlumber 大量pPDFファイル一括処理:行が揺れてるPDF:コード解説_1
で解説
ステップ2: 各PDFを順番に処理
for idx, pdf_file in enumerate(pdf_files, start=1):
logger.info(f"処理中 {idx}/{len(pdf_files)}: {pdf_file.name}")
何をしてる?
- enumerate(pdf_files, start=1) でインデックス番号付きループ
- idx = 1, 2, 3, ...(ファイル番号)
- pdf_file = 各PDFファイルのパス
- 返される型をタプル型
例
# 1回目のループ
idx = 1
pdf_file = Path("file001.pdf")
# ログ: "処理中 1/3: file001.pdf"
# 2回目のループ
idx = 2
pdf_file = Path("file002.pdf")
# ログ: "処理中 2/3: file002.pdf"
図解: enumerate の動き
pdf_files = [file001.pdf, file002.pdf, file003.pdf]
↓
enumerate(pdf_files, start=1)
↓
(1, file001.pdf) ← idx=1, pdf_file=file001.pdf
(2, file002.pdf) ← idx=2, pdf_file=file002.pdf
(3, file003.pdf) ← idx=3, pdf_file=file003.pdf
ステップ3: 表データを抽出
def convert_pdfs(config: Config):
.......................
......................
try:
header, rows, marks = extract_table(pdf_file, config.page_slice,
config.row_slice) ← ココでextract_tableに移動
if not header or not rows:
continue
🔵 2. extract_table() - 表データ抽出
役割
1つのPDFファイルから表のデータを取り出す
# ===== テーブルデータ抽出 =====
def extract_table(pdf_path: Path, page_slice: slice, row_slice) -> tuple[
list[str], list[list[str]], list[str]]:
"""PDFからテーブルデータを抽出"""
with pdfplumber.open(pdf_path) as pdf:
all_rows_marks = []
header = None #ヘッダー行を自動認識させずテーブル行として扱う
found = False
# 指定ページから抽出
for i, page in enumerate(pdf.pages[page_slice],
start=page_slice.start or 0):
table = page.extract_table()
if not table or len(table) < 2:
continue
if header is None:
header = table[0]
data_rows = table[1:2]
if data_rows:
page_mark = f"{i + 1}p"
all_rows_marks.extend(
(row, page_mark) for row in data_rows)
found = True
print(all_rows_marks)
# フォールバック: 全ページスキャン
if not found:
for page_index,page in enumerate(pdf.pages):
table = page.extract_table()
if table is None or len(table) < 2:
continue
header = table[0]
data_rows = table[1:2]
if not data_rows:
continue
page_mark = f"{page_index + 1}p"
all_rows_marks.extend((row, page_mark) for row in data_rows)
found = True
print(f"⚠️ フォールバック発動: {pdf_path.name}")
break
if header and all_rows_marks:
merged_rows, marks = merge_rows(all_rows_marks)
return header, merged_rows, marks
print(f"extract_table結果:{all_rows_marks}")
logger.error(f"テーブルが見つかりません: {pdf_path.name}")
return [], [], []
ステップバイステップ解説
ステップ1: PDFを開く
with pdfplumber.open(pdf_path) as pdf:
何をしている?
- PDFファイルを読み込み専用で開く
- with 文なので、処理後は自動的に閉じる
ステップ2: 変数の初期化
all_rows_marks = [] # 全ての行とページ番号を保存
header = None # 表の見出し(まだ不明)
found = False # 表が見つかったか
各変数の最終形態
# 処理後のイメージ
all_rows_marks = [
(["High", "Lib1", "100"], "1p"),
(["Medium", "Lib2", "200"], "1p"),
(["Low", "Lib3", "300"], "2p")
]
header = ["Quality", "Library", "Length"]
found = True # 表が見つかった
ステップ3: 指定ページから表を抽出
for i, page in enumerate(pdf.pages[page_slice],
start=page_slice.start or 0):
table = page.extract_table()
if not table or len(table) < 2:
continue
何をしている?
- enumerateは要素のペアが、tuple(タプル)型(インデックス,page) を順次生成
- page_sliceで指定されたページだけを処理
- 各ページからextract_table()で表を取得
- 表がない、または表の行数が2未満(1行はヘッダー)の場合スキップ(continue)
図解:ページ のスキャン
PDFファイル(5ページ)
page_slice = slice(0, 3) # 0, 1, 2ページ目を処理
┌────┬────┬────┬────┬────┐
│ 1p │ 2p │ 3p │ 4p │ 5p │
└────┴────┴────┴────┴────┘
↑ ↑ ↑
処理 処理 処理 スキップ
ステップ4:ヘッダーとデータを分離
if header is None:
header = table[0] # 1行目をヘッダーとして保存
data_rows = table[1:][row_slice] # 2行目以降からデータを取得
- 最初に見つかった表の1行
- 2ページ以降if header is NoneはFalseとなり
- if文ブロックから外れてdata_rows = table[1:][row_slice]を通常実行
- 2行目以降からrow_sliceで指定された行を取得
- table[1:][row_slice]はまず[1:]を処理その処理範囲で[row_slice]を実行
例:コード解析
# table(ページから取得した表)
table = [
["Quality", "Library", "Length"], # 0行目(ヘッダー)
["High", "Lib1", "100"], # 1行目
["Medium", "Lib2", "200"], # 2行目
["Low", "Lib3", "300"] # 3行目
]
# row_slice = slice(None) # 全行取得の場合
header = table[0] # ["Quality", "Library", "Length"]
data_rows = table[1:] # 1行目以降全て
# row_slice = slice(0, 2) # 最初の2行だけの場合
data_rows = table[1:][0:2] # 1行目と2行目のみ
# [["High", "Lib1", "100"], ["Medium", "Lib2", "200"]]
ステップ5:ページマークを付けて保存
if data_rows:
page_mark = f"{i + 1}p" # ページ番号(1p, 2p, ...)
all_rows_marks.extend(
(row, page_mark) for row in data_rows)
found = True
何をしてる?
- all_rows_marks.extend(
(row, page_mark)for row in data_rows):
- ジェネレータ式です
- タプルでrow, とpage_markのペアを生成
- ページ番号のラベル(ヘッダーのpage_number列)を作る(例:"1p", "2p")
- all_rows_marksに追加
- append()を使うと、とんでもないことになります
図解:データの蓄積
1ページ目の処理
data_rows = [["High", "Lib1", "100"], ["Medium", "Lib2", "200"]]
page_mark = "1p"
↓
all_rows_marks = [
(["High", "Lib1", "100"], "1p"), ← extend()でdata_rowsのリスト内
(["Medium", "Lib2", "200"], "1p") のリストを一つの要素(row)、もう
一つの要素page_markのstr
をペアでall_rows_marksリストに追加していく
append()は塊で追加、このprojectには向かない
(row, page_mark)の括弧がタプルで返すあらわれ
]
2ページ目の処理
data_rows = [["Low", "Lib3", "300"]]
page_mark = "2p"
↓
all_rows_marks = [
(["High", "Lib1", "100"], "1p"),
(["Medium", "Lib2", "200"], "1p"),
(["Low", "Lib3", "300"], "2p") ← 追加
]
ステップ6:フォールバック関数
if not found:
for page_index, page in enumerate(pdf.pages):
table = page.extract_table()
if table and len(table) < 2:
continue
何をしてる?
- 指定ページで表が見つからなっかった場合の
- 全ページ最初から探します
- 最初に見つかった表を使う
図解:フォールバックの流れ
指定ページ(0-2p)で表が見つからない
↓
全ページ(1-5p)をスキャン
↓
3ページ目で表を発見
↓
3ページ目のデータを使う
↓
ログ: "フォールバックスキャン: file001.pdf"
コード試行
Configクラスの設定を以下にする
file_slice=slice(None), # 全ファイル
page_slice=slice(0, 1), # 1ページ目のみ
row_slice=slice(1,2),
output_csv='Smart_extracted.csv'
上記のようにして
前回に載せたようにサンプルPDFファイルを配置し実行
VS Code, PyCharmなどのコンソール(ターミナル)に以下
のよう出力されます
1ページ目にテーブルがないためフォールバックの発動
2ページ目のテーブルのデータ1行目が取得できます
ステップ7:行のマージと返却
if header and all_rows_marks:
merged_rows, marks = merge_rows(all_rows_marks) ← ここでmerge_rows()に移動
return header, merged_rows, marks
logger.error(f"テーブルが見つかりません: {pdf_path.name}")
return [], [], []
何をしてる?
➀ データがあればmerge_rows()に移行、分割行があれば結合
➁ ヘッダー、行データ、ページ番号を返す
➂ データがなければ空のリストを返す
🔵 3. merge_rows() - 行のマージ処理
役割
ここは、行の揺れを解決するところです
行の揺れ:
- ヘッダーラベルに沿って入力されてるはずの1行のデータが
- がある列の値が改行されたことによって2行に分割されている
- 空の行がある
等の原因で行が安定してないことを指します
コード全体
def merge_rows(rows_with_marks: list[tuple[list[str], str]]) -> tuple[
list[list[str]], list[str]]:
"""分割された行をマージし、ページマークを保持"""
merged_rows = []
merged_marks = []
pending_mark = None
i = 0
while i < len(rows_with_marks):
row, mark = rows_with_marks[i]
# 空行の処理
if not any(row):
if mark:
pending_mark = mark
i += 1
continue
# 次の行が継続行か確認
if i + 1 < len(rows_with_marks):
next_row, _ = rows_with_marks[i + 1]
if next_row[0] == "" and any(next_row[1:]):
# 継続行をマージ
row = [(a or "") + (b or "") for a, b in zip(row, next_row)]
merged_rows.append(row)
merged_marks.append(mark or pending_mark or "")
pending_mark = None
i += 2
continue
# 前の行への継続の場合
if row[0] == "" and any(row[1:]) and merged_rows:
prev_row = merged_rows.pop()
prev_mark = merged_marks.pop()
row = [(a or "") + (b or "") for a, b in zip(prev_row, row)]
merged_rows.append(row)
merged_marks.append(prev_mark)
i += 1
continue
# 通常の行
merged_rows.append(row)
merged_marks.append(pending_mark or mark or "")
pending_mark = None
i += 1
return merged_rows, merged_marks
ステップ1: whileループで全行を処理
while i < len(rows_with_marks):
row, mark = rows_with_marks[i]
何をしている?
- forループではなくwhileを使う理由:行を飛ばしたり、
2行まとめて処理したりするためfor i in range(...):- for は 勝手に i を 1 ずつ増やす仕組みです
- i を 1 ずつ進める
- スキップしたい行があっても止められない
- 「次の行も一緒に処理したい」などの柔軟な動きができない
🧠つまり i の動きを自分でコントロールできないのです
- 一方、while ループは…
i = 0
while i < len(rows_with_marks):
...
i += 1 # ← 自分で増やす
while は i を自分で増やす
例:
i=1 の行
i=2 の行(結合対象)
この2行をまとめて処理したら、
次に見るべき行は i=3 です。
つまり:
i += 2
❌ for ではできない
for は i を勝手に 1 ずつ増やすので:
- i=1 の次は i=2 になってしまう
- i=2 をスキップできない
- 結合処理が壊れる
row, mark = rows_with_marks[i]
何をさしているか?
例:
rows_with_marks:
(["Medium", "Lib2", "200"], "1p")
row:
["Medium", "Lib2", "200"]
mark:
"1p"
row_with_marksの内容をタプルを無視
してrowとmarkにアンパックできる
ステップ3: 空行の処理
if not any(row):
if mark:
pending_mark = mark
i += 1
continue
何をしている?
- any(row) = 行に何かデータがあるか確認
- 全て空なら、ページマークだけ保存
- 行としては処理される(i が進む)
- しかし merged_rows には追加されない
- not any(row)の場合:
any(["", "", ""]) → False
not any(["", "", ""]) → True
- だから最終表には出てこない
- 次へi += 1
ステップ4: 次の行が継続行か確認
if i + 1 < len(rows_with_marks):
next_row, _ = rows_with_marks[i + 1]
if next_row[0] == "" and any(next_row[1:]):
# 継続行をマージ
row = [(a or "") + (b or "") for a, b in zip(row, next_row)]
merged_rows.append(row)
merged_marks.append(mark or pending_mark or "")
pending_mark = None
i += 2 # 2行分進める
continue
何をしているか?
- 次の行が存在するか確認
- 次の行の1列目が空で、他の列にデータがあるか確認
- そうなら、現在の行と次の行を結合
row = [(a or "") + (b or "") for a, b in zip(row, next_row)]
構文はlist内包表記
① zip(row, next_row)
例:
row = ["Very-", "Lib2", "200"]
next_row = ["High", "", ""]
zipにすると:
("Very-", "High")
("Lib2", "")
("200", "")
② (a or "") + (b or "")
- a が None や "" でも "" に置き換える
- b が None や "" でも "" に置き換える
- 最後に a と b を文字として足す(連結する)
例 :
("Very-", "High") → "Very-" + "High" → "Very-High"
("Lib2", "") → "Lib2" + "" → "Lib2"
("200", "") → "200" + "" → "200"
➂ [... for a, b in zip(...)]
これは 上の処理を列ごとに繰り返して、新しい行を作る。
結果:
["Very-High", "Lib2", "200"]
これが 結合後の 1 行 になります。
図解で例:
rows_with_marks = [
["High", "Lib1", "100"], # i=0
["Very-", "Lib2", "200"], # i=1
["High", "", ""], # i=2 ← これが結合される
["", "", ""], # i=3 ← 空行
["Medium", "Lib3", "150"] # i=4
]
┌──────────┬──────────┬──────────┬
│ Quality │ Library │ Length │ ← ヘッダー
├──────────┼──────────┼──────────│
│ High │ Lib1 │ 100 │ ← i = 0: 通常行
├──────────┼──────────┼──────────│
│ Very- │ Lib2 │ 200 │ ← i = 1: 分割行1; Quality列が改行されている
│ │ │ │ 次の行(i=2)と結合 → i=3へ(i+=2)
├──────────┼──────────┼──────────│
│ High │ │ │
│ │ │ │ ← 分割行2:i=1 の行に吸収されて、
├──────────┼──────────┼──────────│ 単独行としては出てこない
│ │ │ │ ← i = 3: 空行(["", "", ""]) merged_rowsには入らない
├──────────┼──────────┼──────────│
│ Medium │ Lib3 │ 150 │ ← i = 4: 通常行(そのまま出力
└──────────┴──────────┴──────────┴
ステップ5: 前の行への継続の場合
if row[0] == "" and any(row[1:]) and merged_rows:
prev_row = merged_rows.pop()
prev_mark = merged_marks.pop()
row = [(a or "") + (b or "") for a, b in zip(prev_row, row)]
merged_rows.append(row)
merged_marks.append(prev_mark)
i += 1
continue
このPDFファイルではレアなケースですがチェックのため
重要名なのは:
prev_row = merged_rows.pop()
- merged_rowsリストからpop() で前の行を取り出す
- その行と今の行(row)を 合体させる
- 合体した行を もう一度 merged_rows に入れ直す
- pop() は リストの最後の要素を取り出して削除する 関数です。
例:
# 既に処理済み
merged_rows = [["High", "Lib1", "100"]]
# 現在の行
row = ["", "Extra", ""]
↑
1列目が空 → 前の行の続き
# 結合
prev_row = ["High", "Lib1", "100"]
row = ["", "Extra", ""]
↓
["High", "Lib1Extra", "100"]
ステップ6: 通常の行
merged_rows.append(row)
merged_marks.append(pending_mark or mark or "")
pending_mark = None
i += 1
何をしている?
- 分割行でもない、継続行でもない普通の行
- そのまま追加
✨merge_rowsの成果
merge_rows() 結合
merged_rows = [
["High", "Lib1", "100"],
["Very-High", "Lib2", "200"]
]
marks = ["1p", "1p"]
✨extract_tableの成果
呼び出し元のextract_tableに戻り
all_mark_rows(rows_with_marks))をアンパック
したmerged_rows,marksを返す
extract_table() 抽出
header = ["Quality", "Library", "Length"]
all_rows_marks = [
(["High", "Lib1", "100"], "1p"),
(["Very-", "Lib2", "200"], "1p"),
(["High", "", ""], "1p")
]
呼び出し元のconvert_pdfsに戻る
header, rows, marks = extract_table(pdf_file,
config.page_slice, config.row_slice)
- pdf_fileはextract_tableにpdfファイルの手渡し役
- config.page_slice, config.row_sliceはConfigクラスの定義に基づく
ファイルの内容チェック
if not header or not rows:
continue
何をしてるのか?
PDF① → 表あり → header あり、rows あり → 処理続行
PDF② → 表なし → header = [] → スキップ
PDF③ → 表はあるが行が無い → rows = [] → スキップ
PDF④ → 正常 → 処理続行
そしてcreate_df()を呼び出す
df = create_df(header, rows, pdf_file, marks)
一番大切な山場でした
次はシリーズの最期として
に続きます
