0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PdfPlumber 大量pPDFファイル一括処理:行が揺れてるPDF:コード解説_2

0
Last updated at Posted at 2026-02-11

PdfPlumber 大量pPDFファイル一括処理:行が揺れてるPDF:コード解説_1
の続きです

サンプルpdfファイルと前回の内容はこちら

初心者でもわかりやすく解説します
処理フローや構文が読めてる方は、適宜スキップして読んで下さい

📄 PDF処理の詳細フロー図解

PDFファイル
    ↓
【1】convert_pdfs()
    ↓
【2】extract_table() を呼び出し
    ↓
    PDFからテーブル抽出
    ↓
【3】merge_rows() を呼び出し
    ↓
    分割された行を結合
    ↓
【2】extract_table() に戻る
    ↓
【1】convert_pdfs() に戻る
    ↓
結果: アンパックして、header, rows, marksという変数に渡される
   

🔵 1. convert_pdfs() - メイン処理

役割

1つのPDFファイルから表のデータを取り出す

def convert_pdfs(config: Config):
    """PDFファイルをCSVに一括変換"""
    pdf_files = prepare_files(config)
    
    for idx, pdf_file in enumerate(pdf_files, start=1):
        logger.info(f"処理中 {idx}/{len(pdf_files)}: {pdf_file.name}")
        
        try:
            header, rows, marks = extract_table(pdf_file, config.page_slice,
                                                config.row_slice)
            if not header or not rows:
                continue
            
            df = create_df(header, rows, pdf_file, marks)
            append_csv(df, config.output_csv)
        
        except Exception as e:
            logger.error(f"エラー {pdf_file.name}: {e}")
        
        # 30ファイルごとに休憩
        if idx % 30 == 0:
            logger.info("30ファイル処理完了。3秒休憩...")
            time.sleep(3)
ステップバイステップ解説
ステップ1: ファイル準備
pdf_files = prepare_files(config)

何をしてる?

  • prepare_files(config)で処理された
  • 自然数順に並べ替えたPDFファイル群を
  • 作業フォルダに移動する
  • ファイルのリストを返す
    PdfPlumber 大量pPDFファイル一括処理:行が揺れてるPDF:コード解説_1
    で解説
ステップ2: 各PDFを順番に処理
for idx, pdf_file in enumerate(pdf_files, start=1):
    logger.info(f"処理中 {idx}/{len(pdf_files)}: {pdf_file.name}")

何をしてる?

  • enumerate(pdf_files, start=1) でインデックス番号付きループ
  • idx = 1, 2, 3, ...(ファイル番号)
  • pdf_file = 各PDFファイルのパス
  • 返される型をタプル型
# 1回目のループ
idx = 1
pdf_file = Path("file001.pdf")
# ログ: "処理中 1/3: file001.pdf"

# 2回目のループ
idx = 2
pdf_file = Path("file002.pdf")
# ログ: "処理中 2/3: file002.pdf"

図解: enumerate の動き

pdf_files = [file001.pdf, file002.pdf, file003.pdf]
               ↓
enumerate(pdf_files, start=1)
               ↓
(1, file001.pdf)  ← idx=1, pdf_file=file001.pdf
(2, file002.pdf)  ← idx=2, pdf_file=file002.pdf
(3, file003.pdf)  ← idx=3, pdf_file=file003.pdf
ステップ3: 表データを抽出
def convert_pdfs(config: Config):
.......................
......................

try:
    header, rows, marks = extract_table(pdf_file, config.page_slice,
                                        config.row_slice) ← ココでextract_tableに移動
    if not header or not rows:
        continue

🔵 2. extract_table() - 表データ抽出

役割

1つのPDFファイルから表のデータを取り出す

# ===== テーブルデータ抽出 =====
def extract_table(pdf_path: Path, page_slice: slice, row_slice) -> tuple[
    list[str], list[list[str]], list[str]]:
    """PDFからテーブルデータを抽出"""

    with pdfplumber.open(pdf_path) as pdf:

        all_rows_marks = []
        header = None #ヘッダー行を自動認識させずテーブル行として扱う
        found = False

        # 指定ページから抽出
        for i, page in enumerate(pdf.pages[page_slice],
                                 start=page_slice.start or 0):
            table = page.extract_table()
            if not table or len(table) < 2:
                continue

            if header is None:
                header = table[0]

            data_rows = table[1:2]
            if data_rows:
                page_mark = f"{i + 1}p"
                all_rows_marks.extend(
                    (row, page_mark) for row in data_rows)
                found = True
        print(all_rows_marks)

        # フォールバック: 全ページスキャン
        if not found:
            for page_index,page in enumerate(pdf.pages):
                table = page.extract_table()
                if table is None or len(table) < 2:
                    continue
                header = table[0]
                data_rows = table[1:2]
                if not data_rows:
                    continue
                page_mark = f"{page_index + 1}p"
                all_rows_marks.extend((row, page_mark) for row in data_rows)

                found = True


                print(f"⚠️ フォールバック発動: {pdf_path.name}")
                break


    if header and all_rows_marks:
        merged_rows, marks = merge_rows(all_rows_marks)
        return header, merged_rows, marks
    print(f"extract_table結果:{all_rows_marks}")

    logger.error(f"テーブルが見つかりません: {pdf_path.name}")
    return [], [], []

ステップバイステップ解説

ステップ1: PDFを開く

with pdfplumber.open(pdf_path) as pdf:

何をしている?

  • PDFファイルを読み込み専用で開く
  • with 文なので、処理後は自動的に閉じる

ステップ2: 変数の初期化

all_rows_marks = []  # 全ての行とページ番号を保存
header = None        # 表の見出し(まだ不明)
found = False        # 表が見つかったか

各変数の最終形態

# 処理後のイメージ
all_rows_marks = [
    (["High", "Lib1", "100"], "1p"),
    (["Medium", "Lib2", "200"], "1p"),
    (["Low", "Lib3", "300"], "2p")
]

header = ["Quality", "Library", "Length"]

found = True  # 表が見つかった

ステップ3: 指定ページから表を抽出

for i, page in enumerate(pdf.pages[page_slice],
                         start=page_slice.start or 0):
    table = page.extract_table()
    if not table or len(table) < 2:
        continue

何をしている?

  1. enumerateは要素のペアが、tuple(タプル)型(インデックス,page) を順次生成
  2. page_sliceで指定されたページだけを処理
  3. 各ページからextract_table()で表を取得
  4. 表がない、または表の行数が2未満(1行はヘッダー)の場合スキップ(continue)
    図解:ページ のスキャン
PDFファイル(5ページ)
page_slice = slice(0, 3)  # 0, 1, 2ページ目を処理

┌────┬────┬────┬────┬────┐
│ 1p │ 2p │ 3p │ 4p │ 5p │
└────┴────┴────┴────┴────┘
  ↑    ↑    ↑
  処理  処理  処理  スキップ

ステップ4:ヘッダーとデータを分離

if header is None:
    header = table[0]  # 1行目をヘッダーとして保存

data_rows = table[1:][row_slice]  # 2行目以降からデータを取得
  1. 最初に見つかった表の1行
  2. 2ページ以降if header is NoneはFalseとなり
  3. if文ブロックから外れてdata_rows = table[1:][row_slice]を通常実行
  4. 2行目以降からrow_sliceで指定された行を取得
  5. table[1:][row_slice]はまず[1:]を処理その処理範囲で[row_slice]を実行
    例:コード解析
# table(ページから取得した表)
table = [
    ["Quality", "Library", "Length"],  # 0行目(ヘッダー)
    ["High", "Lib1", "100"],           # 1行目
    ["Medium", "Lib2", "200"],         # 2行目
    ["Low", "Lib3", "300"]             # 3行目
]

# row_slice = slice(None)  # 全行取得の場合
header = table[0]  # ["Quality", "Library", "Length"]
data_rows = table[1:]  # 1行目以降全て

# row_slice = slice(0, 2)  # 最初の2行だけの場合
data_rows = table[1:][0:2]  # 1行目と2行目のみ
# [["High", "Lib1", "100"], ["Medium", "Lib2", "200"]]

ステップ5:ページマークを付けて保存

if data_rows:
    page_mark = f"{i + 1}p"  # ページ番号(1p, 2p, ...)
    all_rows_marks.extend(
        (row, page_mark) for row in data_rows)
    found = True

何をしてる?

  1. all_rows_marks.extend(
    (row, page_mark)for row in data_rows):
  • ジェネレータ式です
  • タプルでrow, とpage_markのペアを生成
  1. ページ番号のラベル(ヘッダーのpage_number列)を作る(例:"1p", "2p")
  2. all_rows_marksに追加
  3. append()を使うと、とんでもないことになります
    図解:データの蓄積
1ページ目の処理
data_rows = [["High", "Lib1", "100"], ["Medium", "Lib2", "200"]]
page_mark = "1p"
  ↓
all_rows_marks = [
    (["High", "Lib1", "100"], "1p"), ← extend()でdata_rowsのリスト内
    (["Medium", "Lib2", "200"], "1p")  のリストを一つの要素(row)、もう
                         一つの要素page_markのstr
                                         をペアでall_rows_marksリストに追加していく
                                         append()は塊で追加、このprojectには向かない 
                                         (row, page_mark)の括弧がタプルで返すあらわれ   
]
2ページ目の処理
data_rows = [["Low", "Lib3", "300"]]
page_mark = "2p"
  ↓
all_rows_marks = [
    (["High", "Lib1", "100"], "1p"),
    (["Medium", "Lib2", "200"], "1p"),
    (["Low", "Lib3", "300"], "2p")  ← 追加
]

ステップ6:フォールバック関数

if not found:
    for page_index, page in enumerate(pdf.pages):
        table = page.extract_table()
        if table and len(table) < 2:
            continue

何をしてる?

  • 指定ページで表が見つからなっかった場合の
  • 全ページ最初から探します
  • 最初に見つかった表を使う
    図解:フォールバックの流れ
指定ページ(0-2p)で表が見つからない
  ↓
全ページ(1-5p)をスキャン
  ↓
3ページ目で表を発見
  ↓
3ページ目のデータを使う
  ↓
ログ: "フォールバックスキャン: file001.pdf"

コード試行

Configクラスの設定を以下にする

file_slice=slice(None),  # 全ファイル
        page_slice=slice(0, 1),  # 1ページ目のみ
        row_slice=slice(1,2),  
        output_csv='Smart_extracted.csv'

上記のようにして
前回に載せたようにサンプルPDFファイルを配置し実行
VS Code, PyCharmなどのコンソール(ターミナル)に以下
のよう出力されます

バッファ_hatudou.PNG

1ページ目にテーブルがないためフォールバックの発動
2ページ目のテーブルのデータ1行目が取得できます

ステップ7:行のマージと返却

if header and all_rows_marks:
    merged_rows, marks = merge_rows(all_rows_marks)  ここでmerge_rows()に移動
    return header, merged_rows, marks

logger.error(f"テーブルが見つかりません: {pdf_path.name}")
return [], [], []

何をしてる?
➀ データがあればmerge_rows()に移行、分割行があれば結合
➁ ヘッダー、行データ、ページ番号を返す
➂ データがなければ空のリストを返す

🔵 3. merge_rows() - 行のマージ処理

役割

ここは、行の揺れを解決するところです
行の揺れ:

  • ヘッダーラベルに沿って入力されてるはずの1行のデータが
  • がある列の値が改行されたことによって2行に分割されている
  • 空の行がある
    等の原因で行が安定してないことを指します

コード全体

def merge_rows(rows_with_marks: list[tuple[list[str], str]]) -> tuple[
    list[list[str]], list[str]]:
    """分割された行をマージし、ページマークを保持"""
    merged_rows = []
    merged_marks = []
    pending_mark = None
    i = 0

    while i < len(rows_with_marks):
        row, mark = rows_with_marks[i]

        # 空行の処理
        if not any(row):
            if mark:
                pending_mark = mark
            i += 1
            continue

        # 次の行が継続行か確認
        if i + 1 < len(rows_with_marks):
            next_row, _ = rows_with_marks[i + 1]
            if next_row[0] == "" and any(next_row[1:]):
                # 継続行をマージ
                row = [(a or "") + (b or "") for a, b in zip(row, next_row)]
                merged_rows.append(row)
                merged_marks.append(mark or pending_mark or "")
                pending_mark = None
                i += 2
                continue

        # 前の行への継続の場合
        if row[0] == "" and any(row[1:]) and merged_rows:
            prev_row = merged_rows.pop()
            prev_mark = merged_marks.pop()
            row = [(a or "") + (b or "") for a, b in zip(prev_row, row)]
            merged_rows.append(row)
            merged_marks.append(prev_mark)
            i += 1
            continue

        # 通常の行
        merged_rows.append(row)
        merged_marks.append(pending_mark or mark or "")
        pending_mark = None
        i += 1

    return merged_rows, merged_marks

ステップ1: whileループで全行を処理

while i < len(rows_with_marks):
    row, mark = rows_with_marks[i]

何をしている?

  • forループではなくwhileを使う理由:行を飛ばしたり、
    2行まとめて処理したりするため
      for i in range(...):
    
    • for は 勝手に i を 1 ずつ増やす仕組みです
    • i を 1 ずつ進める
    • スキップしたい行があっても止められない
    • 「次の行も一緒に処理したい」などの柔軟な動きができない
🧠つまり i の動きを自分でコントロールできないのです
  • 一方、while ループは…
i = 0
while i < len(rows_with_marks):
    ...
    i += 1  # ← 自分で増やす

while は i を自分で増やす
例:

i=1 の行
i=2 の行(結合対象)

この2行をまとめて処理したら、
次に見るべき行は i=3 です。
つまり:

i += 2
❌ for ではできない

for は i を勝手に 1 ずつ増やすので:

  • i=1 の次は i=2 になってしまう
  • i=2 をスキップできない
  • 結合処理が壊れる
row, mark = rows_with_marks[i]

何をさしているか?

例:
rows_with_marks:
 (["Medium", "Lib2", "200"], "1p") 
 row:
 ["Medium", "Lib2", "200"]
 mark:
"1p"

row_with_marksの内容をタプルを無視
してrowとmarkにアンパックできる

ステップ3: 空行の処理

if not any(row):
    if mark:
        pending_mark = mark
    i += 1
    continue

何をしている?

  1. any(row) = 行に何かデータがあるか確認
  2. 全て空なら、ページマークだけ保存
  • 行としては処理される(i が進む)
  • しかし merged_rows には追加されない
  • not any(row)の場合:
  any(["", "", ""]) → False
  not any(["", "", ""]) → True
  • だから最終表には出てこない
  • 次へi += 1
    ステップ4: 次の行が継続行か確認
if i + 1 < len(rows_with_marks):
    next_row, _ = rows_with_marks[i + 1]
    if next_row[0] == "" and any(next_row[1:]):
        # 継続行をマージ
        row = [(a or "") + (b or "") for a, b in zip(row, next_row)]
        merged_rows.append(row)
        merged_marks.append(mark or pending_mark or "")
        pending_mark = None
        i += 2  # 2行分進める
        continue

何をしているか?

  1. 次の行が存在するか確認
  2. 次の行の1列目が空で、他の列にデータがあるか確認
  3. そうなら、現在の行と次の行を結合
 row = [(a or "") + (b or "") for a, b in zip(row, next_row)]

構文はlist内包表記

① zip(row, next_row)

例:

row       = ["Very-", "Lib2", "200"]
next_row  = ["High",  "",     ""]

zipにすると:

("Very-", "High")
("Lib2",  "")
("200",   "")
② (a or "") + (b or "")
  • a が None や "" でも "" に置き換える
  • b が None や "" でも "" に置き換える
  • 最後に a と b を文字として足す(連結する)
    例 :
("Very-", "High") → "Very-" + "High" → "Very-High"
("Lib2",  "")     → "Lib2" + ""      → "Lib2"
("200",   "")     → "200" + ""       → "200"
➂ [... for a, b in zip(...)]

これは 上の処理を列ごとに繰り返して、新しい行を作る。
結果:

["Very-High", "Lib2", "200"]

これが 結合後の 1 行 になります。
図解で例:

rows_with_marks = [
    ["High", "Lib1", "100"],      # i=0
    ["Very-", "Lib2", "200"],     # i=1
    ["High", "", ""],               # i=2 ← これが結合される
    ["", "", ""],                 # i=3 ← 空行
    ["Medium", "Lib3", "150"]     # i=4
]
┌──────────┬──────────┬──────────┬
│ Quality  │ Library  │ Length   │ ← ヘッダー
├──────────┼──────────┼──────────│
│ High     │ Lib1     │ 100      │ ← i = 0: 通常行
├──────────┼──────────┼──────────│
│ Very-    │ Lib2     │ 200      │ ← i = 1: 分割行1; Quality列が改行されている
│        │          │          │        次の行(i=2)と結合 → i=3へ(i+=2)
├──────────┼──────────┼──────────│         
│ High     │          │          │         
│          │          │          │ ← 分割行2:i=1 の行に吸収されて、
├──────────┼──────────┼──────────│    単独行としては出てこない
│          │          │          │ ← i = 3: 空行(["", "", ""]) merged_rowsには入らない
├──────────┼──────────┼──────────│         
│ Medium   │ Lib3     │ 150      │ ← i = 4: 通常行(そのまま出力
└──────────┴──────────┴──────────┴


ステップ5: 前の行への継続の場合

if row[0] == "" and any(row[1:]) and merged_rows:
    prev_row = merged_rows.pop()
    prev_mark = merged_marks.pop()
    row = [(a or "") + (b or "") for a, b in zip(prev_row, row)]
    merged_rows.append(row)
    merged_marks.append(prev_mark)
    i += 1
    continue

このPDFファイルではレアなケースですがチェックのため

重要名なのは:
prev_row = merged_rows.pop()

  • merged_rowsリストからpop() で前の行を取り出す
  • その行と今の行(row)を 合体させる
  • 合体した行を もう一度 merged_rows に入れ直す
  • pop() は リストの最後の要素を取り出して削除する 関数です。
    例:
# 既に処理済み
merged_rows = [["High", "Lib1", "100"]]

# 現在の行
row = ["", "Extra", ""]
      ↑
   1列目が空 → 前の行の続き

# 結合
prev_row = ["High", "Lib1", "100"]
row = ["", "Extra", ""]
  ↓
["High", "Lib1Extra", "100"]

ステップ6: 通常の行

merged_rows.append(row)
merged_marks.append(pending_mark or mark or "")
pending_mark = None
i += 1

何をしている?

  • 分割行でもない、継続行でもない普通の行
  • そのまま追加
✨merge_rowsの成果
merge_rows() 結合
merged_rows = [
    ["High", "Lib1", "100"],
    ["Very-High", "Lib2", "200"]
]
marks = ["1p", "1p"]
✨extract_tableの成果

呼び出し元のextract_tableに戻り
all_mark_rows(rows_with_marks))をアンパック
したmerged_rows,marksを返す

extract_table() 抽出
header = ["Quality", "Library", "Length"]
all_rows_marks = [
    (["High", "Lib1", "100"], "1p"),
    (["Very-", "Lib2", "200"], "1p"),
    (["High", "", ""], "1p")
]
呼び出し元のconvert_pdfsに戻る
header, rows, marks = extract_table(pdf_file, 
             config.page_slice, config.row_slice)
                                               
  • pdf_fileはextract_tableにpdfファイルの手渡し役
  • config.page_slice, config.row_sliceはConfigクラスの定義に基づく

ファイルの内容チェック

if not header or not rows:
    continue

何をしてるのか?

PDF① → 表あり → header あり、rows あり → 処理続行
PDF② → 表なし → header = [] → スキップ
PDF③ → 表はあるが行が無い → rows = [] → スキップ
PDF④ → 正常 → 処理続行

そしてcreate_df()を呼び出す

df = create_df(header, rows, pdf_file, marks)
一番大切な山場でした

次はシリーズの最期として

に続きます

     

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?