0
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Markdown変換ライブラリ:MarkItDownを使う

Posted at

はじめに

マイクロソフトが開発した MarkItDownは、PDFだけじゃなく、Word、Excel、PowerPointなどの多様なファイル形式をMarkdownに変換し、テキスト分析やインデックス作成を容易できます。

中身を見ていきましょう。

内容

構成

DocumentConverter            # 全てのDocumentConverterの抽象基底クラス
├── PlainTextConverter       # text/*形式のプレーンテキストファイルを処理
├── HtmlConverter            # HTML形式のファイルをMarkdownに変換。スタイル情報とテーブルを保持
│ ├── DocxConverter          # Wordドキュメントをスタイル情報を保持しながらMarkdownに変換
│ ├── XlsxConverter          # Excelファイルの各シートを個別のMarkdownテーブルとして変換
│ └── PptxConverter          # PowerPointファイルを見出し、テーブル、画像付きでMarkdownに変換
├── WikipediaConverter       # Wikipediaページのメインコンテンツを抽出してMarkdown形式に整形
├── YouTubeConverter         # YouTube動画のタイトル、説明、メタデータ、文字起こしを抽出
├── BingSerpConverter        # Bing検索結果ページからオーガニック検索結果を抽出
├── PdfConverter             # PDFファイルをプレーンテキストとしてMarkdownに変換
└── MediaConverter           # マルチメディアファイルの基底クラス。メタデータ抽出機能を提供
    ├── WavConverter         # WAVファイルのメタデータと音声文字起こしを処理
    │ └── Mp3Converter       # WavConverterを継承し、MP3ファイルを処理。音声文字起こしも可能
    └── ImageConverter       # 画像ファイルのメタデータ、OCR、マルチモーダルLLMによる説明を処理

使用されているライブラリ

表形式で使用されているPythonライブラリをまとめました:

ライブラリ名 主な用途
base64 バイナリデータの文字列エンコーディング/デコーディング
binascii バイナリと ASCII 変換
html HTML エスケープ/アンエスケープ
json JSON データの処理
mimetypes MIME タイプの判定
os OS 依存の機能へのインターフェース
re 正規表現を使用したテキスト処理
shutil 高水準のファイル操作
subprocess サブプロセスの実行
tempfile 一時ファイルやディレクトリの作成
urllib.parse URL の解析と操作
mammoth Word文書(.docx)からHTML/テキスト変換
markdownify HTMLからMarkdownへの変換
pandas データ分析・操作
pdfminer PDFファイルの解析とテキスト抽出
pptx PowerPointファイルの操作
puremagic ファイル形式の検出
requests HTTPリクエストの処理
BeautifulSoup HTMLとXMLの解析
pydub オーディオファイルの操作(オプション)
speech_recognition 音声認識(オプション)

コード(コメント追記)


import base64
import binascii
import copy
import html
import json
import mimetypes
import os
import re
import shutil
import subprocess
import sys
import tempfile
import traceback
from typing import Any, Dict, List, Optional, Union
from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse

import mammoth
import markdownify
import pandas as pd
import pdfminer
import pdfminer.high_level
import pptx

# ファイル形式の検出
import puremagic
import requests
from bs4 import BeautifulSoup

# オプションの文字起こしサポート
try:
    import pydub
    import speech_recognition as sr

    IS_AUDIO_TRANSCRIPTION_CAPABLE = True
except ModuleNotFoundError:
    pass

# オプションのYouTube文字起こしサポート
try:
    from youtube_transcript_api import YouTubeTranscriptApi

    IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
except ModuleNotFoundError:
    pass


class _CustomMarkdownify(markdownify.MarkdownConverter):
    """
    markdownifyのMarkdownConverterのカスタムバージョン。主な変更点:

    - デフォルトの見出しスタイルを '#''##' などに変更
    - JavaScriptハイパーリンクの削除
    - 大きなdata:URI形式の画像ソースの切り詰め
    - URIの適切なエスケープ処理とMarkdown構文との競合回避
    """

    def __init__(self, **options: Any):
        # デフォルトの見出しスタイルを設定
        options["heading_style"] = options.get("heading_style", markdownify.ATX)
        # 必要に応じてオプションを明示的に型変換
        super().__init__(**options)

    def convert_hn(self, n: int, el: Any, text: str, convert_as_inline: bool) -> str:
        """通常の変換に加えて、新しい行で始まることを確認"""
        if not convert_as_inline:
            if not re.search(r"^\n", text):
                return "\n" + super().convert_hn(n, el, text, convert_as_inline)  

        return super().convert_hn(n, el, text, convert_as_inline)  

    def convert_a(self, el: Any, text: str, convert_as_inline: bool):
        """通常のコンバーターと同様だが、JavaScriptリンクを削除しURIをエスケープ"""
        prefix, suffix, text = markdownify.chomp(text)  
        if not text:
            return ""
        href = el.get("href")
        title = el.get("title")

        # URIをエスケープし、http/https/file以外のスキームをスキップ
        if href:
            try:
                parsed_url = urlparse(href)  
                if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]:  
                    return "%s%s%s" % (prefix, text, suffix)
                href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path))))  
            except ValueError:  
                return "%s%s%s" % (prefix, text, suffix)

        # #29の置換: テキストノードのアンダースコアをエスケープ
        if (
            self.options["autolinks"]
            and text.replace(r"\_", "_") == href
            and not title
            and not self.options["default_title"]
        ):
            # ショートカット構文
            return "<%s>" % href
        if self.options["default_title"] and not title:
            title = href
        title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
        return (
            "%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix)
            if href
            else text
        )

    def convert_img(self, el: Any, text: str, convert_as_inline: bool) -> str:
        """通常のコンバーターと同様だが、data URIを削除"""

        alt = el.attrs.get("alt", None) or ""
        src = el.attrs.get("src", None) or ""
        title = el.attrs.get("title", None) or ""
        title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
        if (
            convert_as_inline
            and el.parent.name not in self.options["keep_inline_images_in"]
        ):
            return alt

        # dataURIを削除
        if src.startswith("data:"):
            src = src.split(",")[0] + "..."

        return "![%s](%s%s)" % (alt, src, title_part)

    def convert_soup(self, soup: Any) -> str:
        return super().convert_soup(soup)  


class DocumentConverterResult:
    """ドキュメントをテキストに変換した結果を格納するクラス。"""

    def __init__(self, title: Union[str, None] = None, text_content: str = ""):
        self.title: Union[str, None] = title
        self.text_content: str = text_content


class DocumentConverter:
    """全てのDocumentConverterの抽象基底クラス。"""

    def convert(
        self, local_path: str, **kwargs: Any
    ) -> Union[None, DocumentConverterResult]:
        raise NotImplementedError()


class PlainTextConverter(DocumentConverter):
    """content typeがtext/plainのファイルを処理するコンバーター"""

    def convert(
        self, local_path: str, **kwargs: Any
    ) -> Union[None, DocumentConverterResult]:
        # ファイル拡張子からコンテンツタイプを推測
        content_type, _ = mimetypes.guess_type(
            "__placeholder" + kwargs.get("file_extension", "")
        )

        # テキストファイルのみを受け入れる
        if content_type is None:
            return None
        elif "text/" not in content_type.lower():
            return None

        text_content = ""
        with open(local_path, "rt", encoding="utf-8") as fh:
            text_content = fh.read()
        return DocumentConverterResult(
            title=None,
            text_content=text_content,
        )


class HtmlConverter(DocumentConverter):
    """content typeがtext/htmlのファイルを処理するコンバーター"""

    def convert(
        self, local_path: str, **kwargs: Any
    ) -> Union[None, DocumentConverterResult]:
        # HTMLでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".html", ".htm"]:
            return None

        result = None
        with open(local_path, "rt", encoding="utf-8") as fh:
            result = self._convert(fh.read())

        return result

    def _convert(self, html_content: str) -> Union[None, DocumentConverterResult]:
        """HTMLテキストを変換するヘルパー関数"""

        # 文字列をパース
        soup = BeautifulSoup(html_content, "html.parser")

        # JavaScriptとstyleブロックを削除
        for script in soup(["script", "style"]):
            script.extract()

        # メインコンテンツのみ出力
        body_elm = soup.find("body")
        webpage_text = ""
        if body_elm:
            webpage_text = _CustomMarkdownify().convert_soup(body_elm)
        else:
            webpage_text = _CustomMarkdownify().convert_soup(soup)

        assert isinstance(webpage_text, str)

        return DocumentConverterResult(
            title=None if soup.title is None else soup.title.string,
            text_content=webpage_text,
        )


class WikipediaConverter(DocumentConverter):
    """Wikipediaページを個別に処理し、メインコンテンツのみに焦点を当てる"""

    def convert(
        self, local_path: str, **kwargs: Any
    ) -> Union[None, DocumentConverterResult]:
        # Wikipediaでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".html", ".htm"]:
            return None
        url = kwargs.get("url", "")
        if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url):
            return None

        # ファイルをパース
        soup = None
        with open(local_path, "rt", encoding="utf-8") as fh:
            soup = BeautifulSoup(fh.read(), "html.parser")

        # JavaScriptとstyleブロックを削除
        for script in soup(["script", "style"]):
            script.extract()

        # メインコンテンツのみ出力
        body_elm = soup.find("div", {"id": "mw-content-text"})
        title_elm = soup.find("span", {"class": "mw-page-title-main"})

        webpage_text = ""
        main_title = None if soup.title is None else soup.title.string

        if body_elm:
            # タイトルの取得
            if title_elm and len(title_elm) > 0:
                main_title = title_elm.string  
                assert isinstance(main_title, str)

            # ページを変換
            webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(
                body_elm
            )
        else:
            webpage_text = _CustomMarkdownify().convert_soup(soup)

        return DocumentConverterResult(
            title=main_title,
            text_content=webpage_text,
        )


class YouTubeConverter(DocumentConverter):
    """YouTubeを特別に処理し、動画のタイトル、説明、トランスクリプトに焦点を当てる"""

    def convert(
        self, local_path: str, **kwargs: Any
    ) -> Union[None, DocumentConverterResult]:
        # YouTubeでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".html", ".htm"]:
            return None
        url = kwargs.get("url", "")
        if not url.startswith("https://www.youtube.com/watch?"):
            return None

        # ファイルをパース
        soup = None
        with open(local_path, "rt", encoding="utf-8") as fh:
            soup = BeautifulSoup(fh.read(), "html.parser")

        # メタタグを読み込む
        assert soup.title is not None and soup.title.string is not None
        metadata: Dict[str, str] = {"title": soup.title.string}
        for meta in soup(["meta"]):
            for a in meta.attrs:
                if a in ["itemprop", "property", "name"]:
                    metadata[meta[a]] = meta.get("content", "")
                    break

        # 完全な説明の読み込みを試みる(ページ実装に依存するため、より壊れやすい)
        try:
            for script in soup(["script"]):
                content = script.text
                if "ytInitialData" in content:
                    lines = re.split(r"\r?\n", content)
                    obj_start = lines[0].find("{")
                    obj_end = lines[0].rfind("}")
                    if obj_start >= 0 and obj_end >= 0:
                        data = json.loads(lines[0][obj_start : obj_end + 1])
                        attrdesc = self._findKey(data, "attributedDescriptionBodyText")  
                        if attrdesc:
                            metadata["description"] = str(attrdesc["content"])
                    break
        except Exception:
            pass

        # ページの準備を開始
        webpage_text = "# YouTube\n"

        title = self._get(metadata, ["title", "og:title", "name"])  
        assert isinstance(title, str)

        if title:
            webpage_text += f"\n## {title}\n"

        stats = ""
        views = self._get(metadata, ["interactionCount"])  
        if views:
            stats += f"- **再生回数:** {views}\n"

        keywords = self._get(metadata, ["keywords"])  
        if keywords:
            stats += f"- **キーワード:** {keywords}\n"

        runtime = self._get(metadata, ["duration"])  
        if runtime:
            stats += f"- **動画時間:** {runtime}\n"

        if len(stats) > 0:
            webpage_text += f"\n### 動画メタデータ\n{stats}\n"

        description = self._get(metadata, ["description", "og:description"])  
        if description:
            webpage_text += f"\n### 説明\n{description}\n"

        if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
            transcript_text = ""
            parsed_url = urlparse(url)  
            params = parse_qs(parsed_url.query)  
            if "v" in params:
                assert isinstance(params["v"][0], str)
                video_id = str(params["v"][0])
                try:
                    # 単一の文字起こしである必要がある
                    transcript = YouTubeTranscriptApi.get_transcript(video_id)  
                    transcript_text = " ".join([part["text"] for part in transcript])  
                except Exception:
                    pass
            if transcript_text:
                webpage_text += f"\n### 文字起こし\n{transcript_text}\n"

        title = title if title else soup.title.string
        assert isinstance(title, str)

        return DocumentConverterResult(
            title=title,
            text_content=webpage_text,
        )

    def _get(
        self,
        metadata: Dict[str, str],
        keys: List[str],
        default: Union[str, None] = None,
    ) -> Union[str, None]:
        for k in keys:
            if k in metadata:
                return metadata[k]
        return default

    def _findKey(self, json: Any, key: str) -> Union[str, None]:  
        if isinstance(json, list):
            for elm in json:
                ret = self._findKey(elm, key)
                if ret is not None:
                    return ret
        elif isinstance(json, dict):
            for k in json:
                if k == key:
                    return json[k]
                else:
                    ret = self._findKey(json[k], key)
                    if ret is not None:
                        return ret
        return None


class BingSerpConverter(DocumentConverter):
    """
    Bingの検索結果ページを処理する(オーガニック検索結果のみ)
    注: Bing APIを使用する方が望ましい
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # Bing SERPでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".html", ".htm"]:
            return None
        url = kwargs.get("url", "")
        if not re.search(r"^https://www\.bing\.com/search\?q=", url):
            return None

        # クエリパラメータを解析
        parsed_params = parse_qs(urlparse(url).query)
        query = parsed_params.get("q", [""])[0]

        # ファイルを解析
        soup = None
        with open(local_path, "rt", encoding="utf-8") as fh:
            soup = BeautifulSoup(fh.read(), "html.parser")

        # フォーマットを整理
        for tptt in soup.find_all(class_="tptt"):
            if hasattr(tptt, "string") and tptt.string:
                tptt.string += " "
        for slug in soup.find_all(class_="algoSlug_icon"):
            slug.extract()

        # アルゴリズム的な結果を解析
        _markdownify = _CustomMarkdownify()
        results = list()
        for result in soup.find_all(class_="b_algo"):
            # リダイレクトURLを書き換え
            for a in result.find_all("a", href=True):
                parsed_href = urlparse(a["href"])
                qs = parse_qs(parsed_href.query)

                # 目的のURLはuパラメータに含まれているが、
                # base64エンコードされており、プレフィックスが付いている
                if "u" in qs:
                    u = (
                        qs["u"][0][2:].strip() + "=="
                    )  # Python 3は余分なパディングを気にしない

                    try:
                        # RFC 4648 / Base64URLバリアント("-"と"_"を使用)
                        a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8")
                    except UnicodeDecodeError:
                        pass
                    except binascii.Error:
                        pass

            # Markdownに変換
            md_result = _markdownify.convert_soup(result).strip()
            lines = [line.strip() for line in re.split(r"\n+", md_result)]
            results.append("\n".join([line for line in lines if len(line) > 0]))

        webpage_text = (
            f"## '{query}'のBing検索結果:\n\n"
            + "\n\n".join(results)
        )

        return DocumentConverterResult(
            title=None if soup.title is None else soup.title.string,
            text_content=webpage_text,
        )


class PdfConverter(DocumentConverter):
    """
    PDFをMarkdownに変換。ほとんどのスタイル情報は無視され、本質的にプレーンテキストとなる。
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # PDFでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() != ".pdf":
            return None

        return DocumentConverterResult(
            title=None,
            text_content=pdfminer.high_level.extract_text(local_path),
        )


class DocxConverter(HtmlConverter):
    """
    DOCXファイルをMarkdownに変換。可能な限りスタイル情報(見出しなど)とテーブルは保持される。
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # DOCXでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() != ".docx":
            return None

        result = None
        with open(local_path, "rb") as docx_file:
            result = mammoth.convert_to_html(docx_file)
            html_content = result.value
            result = self._convert(html_content)

        return result


class XlsxConverter(HtmlConverter):
    """
    XLSXファイルをMarkdownに変換し、各シートを別々のMarkdownテーブルとして表示。
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # XLSXでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() != ".xlsx":
            return None

        sheets = pd.read_excel(local_path, sheet_name=None)
        md_content = ""
        for s in sheets:
            md_content += f"## {s}\n"
            html_content = sheets[s].to_html(index=False)
            md_content += self._convert(html_content).text_content.strip() + "\n\n"

        return DocumentConverterResult(
            title=None,
            text_content=md_content.strip(),
        )


class PptxConverter(HtmlConverter):
    """
    PPTXファイルをMarkdownに変換。見出し、テーブル、代替テキスト付き画像をサポート。
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # PPTXでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() != ".pptx":
            return None

        md_content = ""

        presentation = pptx.Presentation(local_path)
        slide_num = 0
        for slide in presentation.slides:
            slide_num += 1

            md_content += f"\n\n<!-- スライド番号: {slide_num} -->\n"

            title = slide.shapes.title
            for shape in slide.shapes:
                # 画像
                if self._is_picture(shape):
                    # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
                    alt_text = ""
                    try:
                        alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
                    except Exception:
                        pass

                    # プレースホルダー名
                    filename = re.sub(r"\W", "", shape.name) + ".jpg"
                    md_content += (
                        "\n!["
                        + (alt_text if alt_text else shape.name)
                        + "]("
                        + filename
                        + ")\n"
                    )

                # テーブル
                if self._is_table(shape):
                    html_table = "<html><body><table>"
                    first_row = True
                    for row in shape.table.rows:
                        html_table += "<tr>"
                        for cell in row.cells:
                            if first_row:
                                html_table += "<th>" + html.escape(cell.text) + "</th>"
                            else:
                                html_table += "<td>" + html.escape(cell.text) + "</td>"
                        html_table += "</tr>"
                        first_row = False
                    html_table += "</table></body></html>"
                    md_content += (
                        "\n" + self._convert(html_table).text_content.strip() + "\n"
                    )

                # テキストエリア
                elif shape.has_text_frame:
                    if shape == title:
                        md_content += "# " + shape.text.lstrip() + "\n"
                    else:
                        md_content += shape.text + "\n"

            md_content = md_content.strip()

            if slide.has_notes_slide:
                md_content += "\n\n### ノート:\n"
                notes_frame = slide.notes_slide.notes_text_frame
                if notes_frame is not None:
                    md_content += notes_frame.text
                md_content = md_content.strip()

        return DocumentConverterResult(
            title=None,
            text_content=md_content.strip(),
        )

    def _is_picture(self, shape):
        # 画像かどうかを判定
        if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE:
            return True
        if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER:
            if hasattr(shape, "image"):
                return True
        return False

    def _is_table(self, shape):
        # テーブルかどうかを判定
        if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE:
            return True
        return False


class MediaConverter(DocumentConverter):
    """
    マルチモーダルメディア(画像や音声など)のための抽象クラス
    """

    def _get_metadata(self, local_path):
        # exiftoolが利用可能か確認
        exiftool = shutil.which("exiftool")
        if not exiftool:
            return None
        else:
            try:
                # メタデータをJSON形式で取得
                result = subprocess.run(
                    [exiftool, "-json", local_path], capture_output=True, text=True
                ).stdout
                return json.loads(result)[0]
            except Exception:
                return None


class WavConverter(MediaConverter):
    """
    WAVファイルをMarkdownに変換。exiftoolがインストールされている場合はメタデータを、
    speech_recognitionがインストールされている場合は音声の文字起こしを抽出。
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # WAVでない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() != ".wav":
            return None

        md_content = ""

        # メタデータを追加
        metadata = self._get_metadata(local_path)
        if metadata:
            for f in [
                "Title",
                "Artist",
                "Author",
                "Band",
                "Album",
                "Genre",
                "Track",
                "DateTimeOriginal",
                "CreateDate",
                "Duration",
            ]:
                if f in metadata:
                    md_content += f"{f}: {metadata[f]}\n"

        # 文字起こし
        if IS_AUDIO_TRANSCRIPTION_CAPABLE:
            try:
                transcript = self._transcribe_audio(local_path)
                md_content += "\n\n### 音声の文字起こし:\n" + (
                    "[音声が検出されませんでした]" if transcript == "" else transcript
                )
            except Exception:
                md_content += (
                    "\n\n### 音声の文字起こし:\nエラー。この音声を文字起こしできませんでした。"
                )

        return DocumentConverterResult(
            title=None,
            text_content=md_content.strip(),
        )

    def _transcribe_audio(self, local_path) -> str:
        # Google Speech RecognitionのAPIを使用して音声を文字起こし
        recognizer = sr.Recognizer()
        with sr.AudioFile(local_path) as source:
            audio = recognizer.record(source)
            return recognizer.recognize_google(audio).strip()


class Mp3Converter(WavConverter):
    """
    MP3ファイルをMarkdownに変換。exiftoolがインストールされている場合はメタデータを、
    speech_recognitionとpydubがインストールされている場合は音声の文字起こしを抽出。
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # MP3でない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() != ".mp3":
            return None

        md_content = ""

        # メタデータを追加
        metadata = self._get_metadata(local_path)
        if metadata:
            for f in [
                "Title",
                "Artist",
                "Author",
                "Band",
                "Album",
                "Genre",
                "Track",
                "DateTimeOriginal",
                "CreateDate",
                "Duration",
            ]:
                if f in metadata:
                    md_content += f"{f}: {metadata[f]}\n"

        # 文字起こし
        if IS_AUDIO_TRANSCRIPTION_CAPABLE:
            handle, temp_path = tempfile.mkstemp(suffix=".wav")
            os.close(handle)
            try:
                # MP3をWAVに変換
                sound = pydub.AudioSegment.from_mp3(local_path)
                sound.export(temp_path, format="wav")

                _args = dict()
                _args.update(kwargs)
                _args["file_extension"] = ".wav"

                try:
                    transcript = super()._transcribe_audio(temp_path).strip()
                    md_content += "\n\n### 音声の文字起こし:\n" + (
                        "[音声が検出されませんでした]" if transcript == "" else transcript
                    )
                except Exception:
                    md_content += "\n\n### 音声の文字起こし:\nエラー。この音声を文字起こしできませんでした。"

            finally:
                # 一時ファイルを削除
                os.unlink(temp_path)

        return DocumentConverterResult(
            title=None,
            text_content=md_content.strip(),
        )


class ImageConverter(MediaConverter):
    """
    画像をMarkdownに変換。exiftoolがインストールされている場合はメタデータを、
    easyocrがインストールされている場合はOCRを、マルチモーダルLLMが設定されている場合は
    画像の説明を抽出。
    """

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # 画像でない場合は処理しない
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".jpg", ".jpeg", ".png"]:
            return None

        md_content = ""

        # メタデータを追加
        metadata = self._get_metadata(local_path)
        if metadata:
            for f in [
                "ImageSize",
                "Title",
                "Caption",
                "Description",
                "Keywords",
                "Artist",
                "Author",
                "DateTimeOriginal",
                "CreateDate",
                "GPSPosition",
            ]:
                if f in metadata:
                    md_content += f"{f}: {metadata[f]}\n"

        # GPTVで画像を説明
        mlm_client = kwargs.get("mlm_client")
        mlm_model = kwargs.get("mlm_model")
        if mlm_client is not None and mlm_model is not None:
            md_content += (
                "\n# 説明:\n"
                + self._get_mlm_description(
                    local_path,
                    extension,
                    mlm_client,
                    mlm_model,
                    prompt=kwargs.get("mlm_prompt"),
                ).strip()
                + "\n"
            )

        return DocumentConverterResult(
            title=None,
            text_content=md_content,
        )

    def _get_mlm_description(self, local_path, extension, client, model, prompt=None):
        # プロンプトが指定されていない場合はデフォルトを使用
        if prompt is None or prompt.strip() == "":
            prompt = "この画像の詳細な説明を書いてください。"

        sys.stderr.write(f"MLM プロンプト:\n{prompt}\n")

        # 画像をbase64エンコード
        data_uri = ""
        with open(local_path, "rb") as image_file:
            content_type, encoding = mimetypes.guess_type("_dummy" + extension)
            if content_type is None:
                content_type = "image/jpeg"
            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
            data_uri = f"data:{content_type};base64,{image_base64}"

        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": data_uri,
                        },
                    },
                ],
            }
        ]

        response = client.chat.completions.create(model=model, messages=messages)
        return response.choices[0].message.content


class FileConversionException(BaseException):
    """ファイル変換時のエラーを表すクラス"""
    pass


class UnsupportedFormatException(BaseException):
    """サポートされていないファイル形式のエラーを表すクラス"""
    pass


class MarkItDown:
    """
    (プレビュー段階) LLM利用に適した、非常にシンプルなテキストベースのドキュメントリーダー。
    一般的なファイル形式やWebページをMarkdownに変換します。
    """

    def __init__(
        self,
        requests_session: Optional[requests.Session] = None,
        mlm_client: Optional[Any] = None,
        mlm_model: Optional[Any] = None,
    ):
        if requests_session is None:
            self._requests_session = requests.Session()
        else:
            self._requests_session = requests_session

        self._mlm_client = mlm_client
        self._mlm_model = mlm_model

        self._page_converters: List[DocumentConverter] = []

        # 成功するブラウジング操作のためのコンバーターを登録
        # 後で登録されたものが先に試行される / 早く登録されたものより優先される
        # そのため、最も具体的なコンバーターを最も一般的なコンバーターの後に配置する
        self.register_page_converter(PlainTextConverter())
        self.register_page_converter(HtmlConverter())
        self.register_page_converter(WikipediaConverter())
        self.register_page_converter(YouTubeConverter())
        self.register_page_converter(BingSerpConverter())
        self.register_page_converter(DocxConverter())
        self.register_page_converter(XlsxConverter())
        self.register_page_converter(PptxConverter())
        self.register_page_converter(WavConverter())
        self.register_page_converter(Mp3Converter())
        self.register_page_converter(ImageConverter())
        self.register_page_converter(PdfConverter())

    def convert(
        self, source: Union[str, requests.Response], **kwargs: Any
    ) -> DocumentConverterResult: 
        """
        引数:
            - source: パスまたはURLを表す文字列、もしくはrequests.responseオブジェクト
            - extension: ファイルの解釈に使用する拡張子。Noneの場合はsource(パス、URI、content-typeなど)から推測
        """

        # ローカルパスまたはURL
        if isinstance(source, str):
            if (
                source.startswith("http://")
                or source.startswith("https://")
                or source.startswith("file://")
            ):
                return self.convert_url(source, **kwargs)
            else:
                return self.convert_local(source, **kwargs)
        # リクエストレスポンス
        elif isinstance(source, requests.Response):
            return self.convert_response(source, **kwargs)

    def convert_local(
        self, path: str, **kwargs: Any
    ) -> DocumentConverterResult:
        # 試行する拡張子のリストを準備(優先順位順)
        ext = kwargs.get("file_extension")
        extensions = [ext] if ext is not None else []

        # パスとpuremagicから拡張子の代替候補を取得
        base, ext = os.path.splitext(path)
        self._append_ext(extensions, ext)

        for g in self._guess_ext_magic(path):
            self._append_ext(extensions, g)

        # 変換を実行
        return self._convert(path, extensions, **kwargs)

    def convert_stream(
        self, stream: Any, **kwargs: Any
    ) -> DocumentConverterResult:
        # 試行する拡張子のリストを準備(優先順位順)
        ext = kwargs.get("file_extension")
        extensions = [ext] if ext is not None else []

        # ストリームを一時ファイルに保存。このメソッドの終了前に削除される
        handle, temp_path = tempfile.mkstemp()
        fh = os.fdopen(handle, "wb")
        result = None
        try:
            # 一時ファイルに書き込み
            content = stream.read()
            if isinstance(content, str):
                fh.write(content.encode("utf-8"))
            else:
                fh.write(content)
            fh.close()

            # puremagicを使用して拡張子の候補を追加
            for g in self._guess_ext_magic(temp_path):
                self._append_ext(extensions, g)

            # 変換を実行
            result = self._convert(temp_path, extensions, **kwargs)
        # クリーンアップ
        finally:
            try:
                fh.close()
            except Exception:
                pass
            os.unlink(temp_path)

        return result

    def convert_url(
        self, url: str, **kwargs: Any
    ) -> DocumentConverterResult:
        # URLにHTTPリクエストを送信
        response = self._requests_session.get(url, stream=True)
        response.raise_for_status()
        return self.convert_response(response, **kwargs)

    def convert_response(
        self, response: requests.Response, **kwargs: Any
    ) -> DocumentConverterResult:
        # 試行する拡張子のリストを準備(優先順位順)
        ext = kwargs.get("file_extension")
        extensions = [ext] if ext is not None else []

        # MIMEタイプから推測
        content_type = response.headers.get("content-type", "").split(";")[0]
        self._append_ext(extensions, mimetypes.guess_extension(content_type))

        # Content-Dispositionヘッダがある場合は読み取り
        content_disposition = response.headers.get("content-disposition", "")
        m = re.search(r"filename=([^;]+)", content_disposition)
        if m:
            base, ext = os.path.splitext(m.group(1).strip("\"'"))
            self._append_ext(extensions, ext)

        # パスから拡張子を読み取り
        base, ext = os.path.splitext(urlparse(response.url).path)
        self._append_ext(extensions, ext)

        # レスポンスを一時ファイルに保存。このメソッドの終了前に削除される
        handle, temp_path = tempfile.mkstemp()
        fh = os.fdopen(handle, "wb")
        result = None
        try:
            # ファイルをダウンロード
            for chunk in response.iter_content(chunk_size=512):
                fh.write(chunk)
            fh.close()

            # puremagicを使用して拡張子の候補を追加
            for g in self._guess_ext_magic(temp_path):
                self._append_ext(extensions, g)

            # 変換を実行
            result = self._convert(temp_path, extensions, url=response.url)
        # クリーンアップ
        finally:
            try:
                fh.close()
            except Exception:
                pass
            os.unlink(temp_path)

        return result

    def _convert(
        self, local_path: str, extensions: List[Union[str, None]], **kwargs
    ) -> DocumentConverterResult:
        error_trace = ""
        # 拡張子なしの場合も含めて全ての可能性を試す
        for ext in extensions + [None]:
            for converter in self._page_converters:
                _kwargs = copy.deepcopy(kwargs)

                # file_extensionを適切に上書き
                if ext is None:
                    if "file_extension" in _kwargs:
                        del _kwargs["file_extension"]
                else:
                    _kwargs.update({"file_extension": ext})

                # グローバルオプションを追加
                if "mlm_client" not in _kwargs and self._mlm_client is not None:
                    _kwargs["mlm_client"] = self._mlm_client

                if "mlm_model" not in _kwargs and self._mlm_model is not None:
                    _kwargs["mlm_model"] = self._mlm_model

                # エラーが発生した場合はログを取って続行
                try:
                    res = converter.convert(local_path, **_kwargs)
                except Exception:
                    error_trace = ("\n\n" + traceback.format_exc()).strip()

                if res is not None:
                    # コンテンツを正規化
                    res.text_content = "\n".join(
                        [line.rstrip() for line in re.split(r"\r?\n", res.text_content)]
                    )
                    res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)

                    return res

        # ここまで来て成功しない場合は例外を報告
        if len(error_trace) > 0:
            raise FileConversionException(
                f"'{local_path}'をMarkdownに変換できませんでした。ファイルタイプは{extensions}として認識されました。変換中に次のエラーが発生しました:\n\n{error_trace}"
            )

        # 何も処理できない場合
        raise UnsupportedFormatException(
            f"'{local_path}'をMarkdownに変換できませんでした。形式{extensions}はサポートされていません。"
        )

    def _append_ext(self, extensions, ext):
        """一意の非None、非空の拡張子をリストに追加"""
        if ext is None:
            return
        ext = ext.strip()
        if ext == "":
            return
        # if ext not in extensions:
        if True:
            extensions.append(ext)

    def _guess_ext_magic(self, path):
        """puremaic(libmagicのPython実装)を使用してファイルの最初の数バイトから拡張子を推測"""
        # puremaicを使用して推測
        try:
            guesses = puremagic.magic_file(path)
            extensions = list()
            for g in guesses:
                ext = g.extension.strip()
                if len(ext) > 0:
                    if not ext.startswith("."):
                        ext = "." + ext
                    if ext not in extensions:
                        extensions.append(ext)
            return extensions
        except FileNotFoundError:
            pass
        except IsADirectoryError:
            pass
        except PermissionError:
            pass
        return []

    def register_page_converter(self, converter: DocumentConverter) -> None:
        """ページテキストコンバーターを登録"""
        self._page_converters.insert(0, converter)

おわりに

Mark it down!!

0
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?