はじめに
マイクロソフトが開発した MarkItDownは、PDFだけじゃなく、Word、Excel、PowerPointなどの多様なファイル形式をMarkdownに変換し、テキスト分析やインデックス作成を容易できます。
中身を見ていきましょう。
内容
構成
DocumentConverter # 全てのDocumentConverterの抽象基底クラス
├── PlainTextConverter # text/*形式のプレーンテキストファイルを処理
├── HtmlConverter # HTML形式のファイルをMarkdownに変換。スタイル情報とテーブルを保持
│ ├── DocxConverter # Wordドキュメントをスタイル情報を保持しながらMarkdownに変換
│ ├── XlsxConverter # Excelファイルの各シートを個別のMarkdownテーブルとして変換
│ └── PptxConverter # PowerPointファイルを見出し、テーブル、画像付きでMarkdownに変換
├── WikipediaConverter # Wikipediaページのメインコンテンツを抽出してMarkdown形式に整形
├── YouTubeConverter # YouTube動画のタイトル、説明、メタデータ、文字起こしを抽出
├── BingSerpConverter # Bing検索結果ページからオーガニック検索結果を抽出
├── PdfConverter # PDFファイルをプレーンテキストとしてMarkdownに変換
└── MediaConverter # マルチメディアファイルの基底クラス。メタデータ抽出機能を提供
├── WavConverter # WAVファイルのメタデータと音声文字起こしを処理
│ └── Mp3Converter # WavConverterを継承し、MP3ファイルを処理。音声文字起こしも可能
└── ImageConverter # 画像ファイルのメタデータ、OCR、マルチモーダルLLMによる説明を処理
使用されているライブラリ
表形式で使用されているPythonライブラリをまとめました:
ライブラリ名 | 主な用途 |
---|---|
base64 | バイナリデータの文字列エンコーディング/デコーディング |
binascii | バイナリと ASCII 変換 |
html | HTML エスケープ/アンエスケープ |
json | JSON データの処理 |
mimetypes | MIME タイプの判定 |
os | OS 依存の機能へのインターフェース |
re | 正規表現を使用したテキスト処理 |
shutil | 高水準のファイル操作 |
subprocess | サブプロセスの実行 |
tempfile | 一時ファイルやディレクトリの作成 |
urllib.parse | URL の解析と操作 |
mammoth | Word文書(.docx)からHTML/テキスト変換 |
markdownify | HTMLからMarkdownへの変換 |
pandas | データ分析・操作 |
pdfminer | PDFファイルの解析とテキスト抽出 |
pptx | PowerPointファイルの操作 |
puremagic | ファイル形式の検出 |
requests | HTTPリクエストの処理 |
BeautifulSoup | HTMLとXMLの解析 |
pydub | オーディオファイルの操作(オプション) |
speech_recognition | 音声認識(オプション) |
コード(コメント追記)
import base64
import binascii
import copy
import html
import json
import mimetypes
import os
import re
import shutil
import subprocess
import sys
import tempfile
import traceback
from typing import Any, Dict, List, Optional, Union
from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse
import mammoth
import markdownify
import pandas as pd
import pdfminer
import pdfminer.high_level
import pptx
# ファイル形式の検出
import puremagic
import requests
from bs4 import BeautifulSoup
# オプションの文字起こしサポート
try:
import pydub
import speech_recognition as sr
IS_AUDIO_TRANSCRIPTION_CAPABLE = True
except ModuleNotFoundError:
pass
# オプションのYouTube文字起こしサポート
try:
from youtube_transcript_api import YouTubeTranscriptApi
IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
except ModuleNotFoundError:
pass
class _CustomMarkdownify(markdownify.MarkdownConverter):
"""
markdownifyのMarkdownConverterのカスタムバージョン。主な変更点:
- デフォルトの見出しスタイルを '#'、'##' などに変更
- JavaScriptハイパーリンクの削除
- 大きなdata:URI形式の画像ソースの切り詰め
- URIの適切なエスケープ処理とMarkdown構文との競合回避
"""
def __init__(self, **options: Any):
# デフォルトの見出しスタイルを設定
options["heading_style"] = options.get("heading_style", markdownify.ATX)
# 必要に応じてオプションを明示的に型変換
super().__init__(**options)
def convert_hn(self, n: int, el: Any, text: str, convert_as_inline: bool) -> str:
"""通常の変換に加えて、新しい行で始まることを確認"""
if not convert_as_inline:
if not re.search(r"^\n", text):
return "\n" + super().convert_hn(n, el, text, convert_as_inline)
return super().convert_hn(n, el, text, convert_as_inline)
def convert_a(self, el: Any, text: str, convert_as_inline: bool):
"""通常のコンバーターと同様だが、JavaScriptリンクを削除しURIをエスケープ"""
prefix, suffix, text = markdownify.chomp(text)
if not text:
return ""
href = el.get("href")
title = el.get("title")
# URIをエスケープし、http/https/file以外のスキームをスキップ
if href:
try:
parsed_url = urlparse(href)
if parsed_url.scheme and parsed_url.scheme.lower() not in ["http", "https", "file"]:
return "%s%s%s" % (prefix, text, suffix)
href = urlunparse(parsed_url._replace(path=quote(unquote(parsed_url.path))))
except ValueError:
return "%s%s%s" % (prefix, text, suffix)
# #29の置換: テキストノードのアンダースコアをエスケープ
if (
self.options["autolinks"]
and text.replace(r"\_", "_") == href
and not title
and not self.options["default_title"]
):
# ショートカット構文
return "<%s>" % href
if self.options["default_title"] and not title:
title = href
title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
return (
"%s[%s](%s%s)%s" % (prefix, text, href, title_part, suffix)
if href
else text
)
def convert_img(self, el: Any, text: str, convert_as_inline: bool) -> str:
"""通常のコンバーターと同様だが、data URIを削除"""
alt = el.attrs.get("alt", None) or ""
src = el.attrs.get("src", None) or ""
title = el.attrs.get("title", None) or ""
title_part = ' "%s"' % title.replace('"', r"\"") if title else ""
if (
convert_as_inline
and el.parent.name not in self.options["keep_inline_images_in"]
):
return alt
# dataURIを削除
if src.startswith("data:"):
src = src.split(",")[0] + "..."
return "" % (alt, src, title_part)
def convert_soup(self, soup: Any) -> str:
return super().convert_soup(soup)
class DocumentConverterResult:
"""ドキュメントをテキストに変換した結果を格納するクラス。"""
def __init__(self, title: Union[str, None] = None, text_content: str = ""):
self.title: Union[str, None] = title
self.text_content: str = text_content
class DocumentConverter:
"""全てのDocumentConverterの抽象基底クラス。"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
raise NotImplementedError()
class PlainTextConverter(DocumentConverter):
"""content typeがtext/plainのファイルを処理するコンバーター"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# ファイル拡張子からコンテンツタイプを推測
content_type, _ = mimetypes.guess_type(
"__placeholder" + kwargs.get("file_extension", "")
)
# テキストファイルのみを受け入れる
if content_type is None:
return None
elif "text/" not in content_type.lower():
return None
text_content = ""
with open(local_path, "rt", encoding="utf-8") as fh:
text_content = fh.read()
return DocumentConverterResult(
title=None,
text_content=text_content,
)
class HtmlConverter(DocumentConverter):
"""content typeがtext/htmlのファイルを処理するコンバーター"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# HTMLでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
result = None
with open(local_path, "rt", encoding="utf-8") as fh:
result = self._convert(fh.read())
return result
def _convert(self, html_content: str) -> Union[None, DocumentConverterResult]:
"""HTMLテキストを変換するヘルパー関数"""
# 文字列をパース
soup = BeautifulSoup(html_content, "html.parser")
# JavaScriptとstyleブロックを削除
for script in soup(["script", "style"]):
script.extract()
# メインコンテンツのみ出力
body_elm = soup.find("body")
webpage_text = ""
if body_elm:
webpage_text = _CustomMarkdownify().convert_soup(body_elm)
else:
webpage_text = _CustomMarkdownify().convert_soup(soup)
assert isinstance(webpage_text, str)
return DocumentConverterResult(
title=None if soup.title is None else soup.title.string,
text_content=webpage_text,
)
class WikipediaConverter(DocumentConverter):
"""Wikipediaページを個別に処理し、メインコンテンツのみに焦点を当てる"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# Wikipediaでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
url = kwargs.get("url", "")
if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia.org\/", url):
return None
# ファイルをパース
soup = None
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
# JavaScriptとstyleブロックを削除
for script in soup(["script", "style"]):
script.extract()
# メインコンテンツのみ出力
body_elm = soup.find("div", {"id": "mw-content-text"})
title_elm = soup.find("span", {"class": "mw-page-title-main"})
webpage_text = ""
main_title = None if soup.title is None else soup.title.string
if body_elm:
# タイトルの取得
if title_elm and len(title_elm) > 0:
main_title = title_elm.string
assert isinstance(main_title, str)
# ページを変換
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(
body_elm
)
else:
webpage_text = _CustomMarkdownify().convert_soup(soup)
return DocumentConverterResult(
title=main_title,
text_content=webpage_text,
)
class YouTubeConverter(DocumentConverter):
"""YouTubeを特別に処理し、動画のタイトル、説明、トランスクリプトに焦点を当てる"""
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
# YouTubeでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
url = kwargs.get("url", "")
if not url.startswith("https://www.youtube.com/watch?"):
return None
# ファイルをパース
soup = None
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
# メタタグを読み込む
assert soup.title is not None and soup.title.string is not None
metadata: Dict[str, str] = {"title": soup.title.string}
for meta in soup(["meta"]):
for a in meta.attrs:
if a in ["itemprop", "property", "name"]:
metadata[meta[a]] = meta.get("content", "")
break
# 完全な説明の読み込みを試みる(ページ実装に依存するため、より壊れやすい)
try:
for script in soup(["script"]):
content = script.text
if "ytInitialData" in content:
lines = re.split(r"\r?\n", content)
obj_start = lines[0].find("{")
obj_end = lines[0].rfind("}")
if obj_start >= 0 and obj_end >= 0:
data = json.loads(lines[0][obj_start : obj_end + 1])
attrdesc = self._findKey(data, "attributedDescriptionBodyText")
if attrdesc:
metadata["description"] = str(attrdesc["content"])
break
except Exception:
pass
# ページの準備を開始
webpage_text = "# YouTube\n"
title = self._get(metadata, ["title", "og:title", "name"])
assert isinstance(title, str)
if title:
webpage_text += f"\n## {title}\n"
stats = ""
views = self._get(metadata, ["interactionCount"])
if views:
stats += f"- **再生回数:** {views}\n"
keywords = self._get(metadata, ["keywords"])
if keywords:
stats += f"- **キーワード:** {keywords}\n"
runtime = self._get(metadata, ["duration"])
if runtime:
stats += f"- **動画時間:** {runtime}\n"
if len(stats) > 0:
webpage_text += f"\n### 動画メタデータ\n{stats}\n"
description = self._get(metadata, ["description", "og:description"])
if description:
webpage_text += f"\n### 説明\n{description}\n"
if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
transcript_text = ""
parsed_url = urlparse(url)
params = parse_qs(parsed_url.query)
if "v" in params:
assert isinstance(params["v"][0], str)
video_id = str(params["v"][0])
try:
# 単一の文字起こしである必要がある
transcript = YouTubeTranscriptApi.get_transcript(video_id)
transcript_text = " ".join([part["text"] for part in transcript])
except Exception:
pass
if transcript_text:
webpage_text += f"\n### 文字起こし\n{transcript_text}\n"
title = title if title else soup.title.string
assert isinstance(title, str)
return DocumentConverterResult(
title=title,
text_content=webpage_text,
)
def _get(
self,
metadata: Dict[str, str],
keys: List[str],
default: Union[str, None] = None,
) -> Union[str, None]:
for k in keys:
if k in metadata:
return metadata[k]
return default
def _findKey(self, json: Any, key: str) -> Union[str, None]:
if isinstance(json, list):
for elm in json:
ret = self._findKey(elm, key)
if ret is not None:
return ret
elif isinstance(json, dict):
for k in json:
if k == key:
return json[k]
else:
ret = self._findKey(json[k], key)
if ret is not None:
return ret
return None
class BingSerpConverter(DocumentConverter):
"""
Bingの検索結果ページを処理する(オーガニック検索結果のみ)
注: Bing APIを使用する方が望ましい
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# Bing SERPでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".html", ".htm"]:
return None
url = kwargs.get("url", "")
if not re.search(r"^https://www\.bing\.com/search\?q=", url):
return None
# クエリパラメータを解析
parsed_params = parse_qs(urlparse(url).query)
query = parsed_params.get("q", [""])[0]
# ファイルを解析
soup = None
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
# フォーマットを整理
for tptt in soup.find_all(class_="tptt"):
if hasattr(tptt, "string") and tptt.string:
tptt.string += " "
for slug in soup.find_all(class_="algoSlug_icon"):
slug.extract()
# アルゴリズム的な結果を解析
_markdownify = _CustomMarkdownify()
results = list()
for result in soup.find_all(class_="b_algo"):
# リダイレクトURLを書き換え
for a in result.find_all("a", href=True):
parsed_href = urlparse(a["href"])
qs = parse_qs(parsed_href.query)
# 目的のURLはuパラメータに含まれているが、
# base64エンコードされており、プレフィックスが付いている
if "u" in qs:
u = (
qs["u"][0][2:].strip() + "=="
) # Python 3は余分なパディングを気にしない
try:
# RFC 4648 / Base64URLバリアント("-"と"_"を使用)
a["href"] = base64.b64decode(u, altchars="-_").decode("utf-8")
except UnicodeDecodeError:
pass
except binascii.Error:
pass
# Markdownに変換
md_result = _markdownify.convert_soup(result).strip()
lines = [line.strip() for line in re.split(r"\n+", md_result)]
results.append("\n".join([line for line in lines if len(line) > 0]))
webpage_text = (
f"## '{query}'のBing検索結果:\n\n"
+ "\n\n".join(results)
)
return DocumentConverterResult(
title=None if soup.title is None else soup.title.string,
text_content=webpage_text,
)
class PdfConverter(DocumentConverter):
"""
PDFをMarkdownに変換。ほとんどのスタイル情報は無視され、本質的にプレーンテキストとなる。
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# PDFでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() != ".pdf":
return None
return DocumentConverterResult(
title=None,
text_content=pdfminer.high_level.extract_text(local_path),
)
class DocxConverter(HtmlConverter):
"""
DOCXファイルをMarkdownに変換。可能な限りスタイル情報(見出しなど)とテーブルは保持される。
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# DOCXでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() != ".docx":
return None
result = None
with open(local_path, "rb") as docx_file:
result = mammoth.convert_to_html(docx_file)
html_content = result.value
result = self._convert(html_content)
return result
class XlsxConverter(HtmlConverter):
"""
XLSXファイルをMarkdownに変換し、各シートを別々のMarkdownテーブルとして表示。
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# XLSXでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() != ".xlsx":
return None
sheets = pd.read_excel(local_path, sheet_name=None)
md_content = ""
for s in sheets:
md_content += f"## {s}\n"
html_content = sheets[s].to_html(index=False)
md_content += self._convert(html_content).text_content.strip() + "\n\n"
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
class PptxConverter(HtmlConverter):
"""
PPTXファイルをMarkdownに変換。見出し、テーブル、代替テキスト付き画像をサポート。
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# PPTXでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() != ".pptx":
return None
md_content = ""
presentation = pptx.Presentation(local_path)
slide_num = 0
for slide in presentation.slides:
slide_num += 1
md_content += f"\n\n<!-- スライド番号: {slide_num} -->\n"
title = slide.shapes.title
for shape in slide.shapes:
# 画像
if self._is_picture(shape):
# https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
alt_text = ""
try:
alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
except Exception:
pass
# プレースホルダー名
filename = re.sub(r"\W", "", shape.name) + ".jpg"
md_content += (
"\n\n"
)
# テーブル
if self._is_table(shape):
html_table = "<html><body><table>"
first_row = True
for row in shape.table.rows:
html_table += "<tr>"
for cell in row.cells:
if first_row:
html_table += "<th>" + html.escape(cell.text) + "</th>"
else:
html_table += "<td>" + html.escape(cell.text) + "</td>"
html_table += "</tr>"
first_row = False
html_table += "</table></body></html>"
md_content += (
"\n" + self._convert(html_table).text_content.strip() + "\n"
)
# テキストエリア
elif shape.has_text_frame:
if shape == title:
md_content += "# " + shape.text.lstrip() + "\n"
else:
md_content += shape.text + "\n"
md_content = md_content.strip()
if slide.has_notes_slide:
md_content += "\n\n### ノート:\n"
notes_frame = slide.notes_slide.notes_text_frame
if notes_frame is not None:
md_content += notes_frame.text
md_content = md_content.strip()
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
def _is_picture(self, shape):
# 画像かどうかを判定
if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE:
return True
if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PLACEHOLDER:
if hasattr(shape, "image"):
return True
return False
def _is_table(self, shape):
# テーブルかどうかを判定
if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE:
return True
return False
class MediaConverter(DocumentConverter):
"""
マルチモーダルメディア(画像や音声など)のための抽象クラス
"""
def _get_metadata(self, local_path):
# exiftoolが利用可能か確認
exiftool = shutil.which("exiftool")
if not exiftool:
return None
else:
try:
# メタデータをJSON形式で取得
result = subprocess.run(
[exiftool, "-json", local_path], capture_output=True, text=True
).stdout
return json.loads(result)[0]
except Exception:
return None
class WavConverter(MediaConverter):
"""
WAVファイルをMarkdownに変換。exiftoolがインストールされている場合はメタデータを、
speech_recognitionがインストールされている場合は音声の文字起こしを抽出。
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# WAVでない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() != ".wav":
return None
md_content = ""
# メタデータを追加
metadata = self._get_metadata(local_path)
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# 文字起こし
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
try:
transcript = self._transcribe_audio(local_path)
md_content += "\n\n### 音声の文字起こし:\n" + (
"[音声が検出されませんでした]" if transcript == "" else transcript
)
except Exception:
md_content += (
"\n\n### 音声の文字起こし:\nエラー。この音声を文字起こしできませんでした。"
)
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
def _transcribe_audio(self, local_path) -> str:
# Google Speech RecognitionのAPIを使用して音声を文字起こし
recognizer = sr.Recognizer()
with sr.AudioFile(local_path) as source:
audio = recognizer.record(source)
return recognizer.recognize_google(audio).strip()
class Mp3Converter(WavConverter):
"""
MP3ファイルをMarkdownに変換。exiftoolがインストールされている場合はメタデータを、
speech_recognitionとpydubがインストールされている場合は音声の文字起こしを抽出。
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# MP3でない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() != ".mp3":
return None
md_content = ""
# メタデータを追加
metadata = self._get_metadata(local_path)
if metadata:
for f in [
"Title",
"Artist",
"Author",
"Band",
"Album",
"Genre",
"Track",
"DateTimeOriginal",
"CreateDate",
"Duration",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# 文字起こし
if IS_AUDIO_TRANSCRIPTION_CAPABLE:
handle, temp_path = tempfile.mkstemp(suffix=".wav")
os.close(handle)
try:
# MP3をWAVに変換
sound = pydub.AudioSegment.from_mp3(local_path)
sound.export(temp_path, format="wav")
_args = dict()
_args.update(kwargs)
_args["file_extension"] = ".wav"
try:
transcript = super()._transcribe_audio(temp_path).strip()
md_content += "\n\n### 音声の文字起こし:\n" + (
"[音声が検出されませんでした]" if transcript == "" else transcript
)
except Exception:
md_content += "\n\n### 音声の文字起こし:\nエラー。この音声を文字起こしできませんでした。"
finally:
# 一時ファイルを削除
os.unlink(temp_path)
return DocumentConverterResult(
title=None,
text_content=md_content.strip(),
)
class ImageConverter(MediaConverter):
"""
画像をMarkdownに変換。exiftoolがインストールされている場合はメタデータを、
easyocrがインストールされている場合はOCRを、マルチモーダルLLMが設定されている場合は
画像の説明を抽出。
"""
def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
# 画像でない場合は処理しない
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".jpg", ".jpeg", ".png"]:
return None
md_content = ""
# メタデータを追加
metadata = self._get_metadata(local_path)
if metadata:
for f in [
"ImageSize",
"Title",
"Caption",
"Description",
"Keywords",
"Artist",
"Author",
"DateTimeOriginal",
"CreateDate",
"GPSPosition",
]:
if f in metadata:
md_content += f"{f}: {metadata[f]}\n"
# GPTVで画像を説明
mlm_client = kwargs.get("mlm_client")
mlm_model = kwargs.get("mlm_model")
if mlm_client is not None and mlm_model is not None:
md_content += (
"\n# 説明:\n"
+ self._get_mlm_description(
local_path,
extension,
mlm_client,
mlm_model,
prompt=kwargs.get("mlm_prompt"),
).strip()
+ "\n"
)
return DocumentConverterResult(
title=None,
text_content=md_content,
)
def _get_mlm_description(self, local_path, extension, client, model, prompt=None):
# プロンプトが指定されていない場合はデフォルトを使用
if prompt is None or prompt.strip() == "":
prompt = "この画像の詳細な説明を書いてください。"
sys.stderr.write(f"MLM プロンプト:\n{prompt}\n")
# 画像をbase64エンコード
data_uri = ""
with open(local_path, "rb") as image_file:
content_type, encoding = mimetypes.guess_type("_dummy" + extension)
if content_type is None:
content_type = "image/jpeg"
image_base64 = base64.b64encode(image_file.read()).decode("utf-8")
data_uri = f"data:{content_type};base64,{image_base64}"
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": data_uri,
},
},
],
}
]
response = client.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content
class FileConversionException(BaseException):
"""ファイル変換時のエラーを表すクラス"""
pass
class UnsupportedFormatException(BaseException):
"""サポートされていないファイル形式のエラーを表すクラス"""
pass
class MarkItDown:
"""
(プレビュー段階) LLM利用に適した、非常にシンプルなテキストベースのドキュメントリーダー。
一般的なファイル形式やWebページをMarkdownに変換します。
"""
def __init__(
self,
requests_session: Optional[requests.Session] = None,
mlm_client: Optional[Any] = None,
mlm_model: Optional[Any] = None,
):
if requests_session is None:
self._requests_session = requests.Session()
else:
self._requests_session = requests_session
self._mlm_client = mlm_client
self._mlm_model = mlm_model
self._page_converters: List[DocumentConverter] = []
# 成功するブラウジング操作のためのコンバーターを登録
# 後で登録されたものが先に試行される / 早く登録されたものより優先される
# そのため、最も具体的なコンバーターを最も一般的なコンバーターの後に配置する
self.register_page_converter(PlainTextConverter())
self.register_page_converter(HtmlConverter())
self.register_page_converter(WikipediaConverter())
self.register_page_converter(YouTubeConverter())
self.register_page_converter(BingSerpConverter())
self.register_page_converter(DocxConverter())
self.register_page_converter(XlsxConverter())
self.register_page_converter(PptxConverter())
self.register_page_converter(WavConverter())
self.register_page_converter(Mp3Converter())
self.register_page_converter(ImageConverter())
self.register_page_converter(PdfConverter())
def convert(
self, source: Union[str, requests.Response], **kwargs: Any
) -> DocumentConverterResult:
"""
引数:
- source: パスまたはURLを表す文字列、もしくはrequests.responseオブジェクト
- extension: ファイルの解釈に使用する拡張子。Noneの場合はsource(パス、URI、content-typeなど)から推測
"""
# ローカルパスまたはURL
if isinstance(source, str):
if (
source.startswith("http://")
or source.startswith("https://")
or source.startswith("file://")
):
return self.convert_url(source, **kwargs)
else:
return self.convert_local(source, **kwargs)
# リクエストレスポンス
elif isinstance(source, requests.Response):
return self.convert_response(source, **kwargs)
def convert_local(
self, path: str, **kwargs: Any
) -> DocumentConverterResult:
# 試行する拡張子のリストを準備(優先順位順)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# パスとpuremagicから拡張子の代替候補を取得
base, ext = os.path.splitext(path)
self._append_ext(extensions, ext)
for g in self._guess_ext_magic(path):
self._append_ext(extensions, g)
# 変換を実行
return self._convert(path, extensions, **kwargs)
def convert_stream(
self, stream: Any, **kwargs: Any
) -> DocumentConverterResult:
# 試行する拡張子のリストを準備(優先順位順)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# ストリームを一時ファイルに保存。このメソッドの終了前に削除される
handle, temp_path = tempfile.mkstemp()
fh = os.fdopen(handle, "wb")
result = None
try:
# 一時ファイルに書き込み
content = stream.read()
if isinstance(content, str):
fh.write(content.encode("utf-8"))
else:
fh.write(content)
fh.close()
# puremagicを使用して拡張子の候補を追加
for g in self._guess_ext_magic(temp_path):
self._append_ext(extensions, g)
# 変換を実行
result = self._convert(temp_path, extensions, **kwargs)
# クリーンアップ
finally:
try:
fh.close()
except Exception:
pass
os.unlink(temp_path)
return result
def convert_url(
self, url: str, **kwargs: Any
) -> DocumentConverterResult:
# URLにHTTPリクエストを送信
response = self._requests_session.get(url, stream=True)
response.raise_for_status()
return self.convert_response(response, **kwargs)
def convert_response(
self, response: requests.Response, **kwargs: Any
) -> DocumentConverterResult:
# 試行する拡張子のリストを準備(優先順位順)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# MIMEタイプから推測
content_type = response.headers.get("content-type", "").split(";")[0]
self._append_ext(extensions, mimetypes.guess_extension(content_type))
# Content-Dispositionヘッダがある場合は読み取り
content_disposition = response.headers.get("content-disposition", "")
m = re.search(r"filename=([^;]+)", content_disposition)
if m:
base, ext = os.path.splitext(m.group(1).strip("\"'"))
self._append_ext(extensions, ext)
# パスから拡張子を読み取り
base, ext = os.path.splitext(urlparse(response.url).path)
self._append_ext(extensions, ext)
# レスポンスを一時ファイルに保存。このメソッドの終了前に削除される
handle, temp_path = tempfile.mkstemp()
fh = os.fdopen(handle, "wb")
result = None
try:
# ファイルをダウンロード
for chunk in response.iter_content(chunk_size=512):
fh.write(chunk)
fh.close()
# puremagicを使用して拡張子の候補を追加
for g in self._guess_ext_magic(temp_path):
self._append_ext(extensions, g)
# 変換を実行
result = self._convert(temp_path, extensions, url=response.url)
# クリーンアップ
finally:
try:
fh.close()
except Exception:
pass
os.unlink(temp_path)
return result
def _convert(
self, local_path: str, extensions: List[Union[str, None]], **kwargs
) -> DocumentConverterResult:
error_trace = ""
# 拡張子なしの場合も含めて全ての可能性を試す
for ext in extensions + [None]:
for converter in self._page_converters:
_kwargs = copy.deepcopy(kwargs)
# file_extensionを適切に上書き
if ext is None:
if "file_extension" in _kwargs:
del _kwargs["file_extension"]
else:
_kwargs.update({"file_extension": ext})
# グローバルオプションを追加
if "mlm_client" not in _kwargs and self._mlm_client is not None:
_kwargs["mlm_client"] = self._mlm_client
if "mlm_model" not in _kwargs and self._mlm_model is not None:
_kwargs["mlm_model"] = self._mlm_model
# エラーが発生した場合はログを取って続行
try:
res = converter.convert(local_path, **_kwargs)
except Exception:
error_trace = ("\n\n" + traceback.format_exc()).strip()
if res is not None:
# コンテンツを正規化
res.text_content = "\n".join(
[line.rstrip() for line in re.split(r"\r?\n", res.text_content)]
)
res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)
return res
# ここまで来て成功しない場合は例外を報告
if len(error_trace) > 0:
raise FileConversionException(
f"'{local_path}'をMarkdownに変換できませんでした。ファイルタイプは{extensions}として認識されました。変換中に次のエラーが発生しました:\n\n{error_trace}"
)
# 何も処理できない場合
raise UnsupportedFormatException(
f"'{local_path}'をMarkdownに変換できませんでした。形式{extensions}はサポートされていません。"
)
def _append_ext(self, extensions, ext):
"""一意の非None、非空の拡張子をリストに追加"""
if ext is None:
return
ext = ext.strip()
if ext == "":
return
# if ext not in extensions:
if True:
extensions.append(ext)
def _guess_ext_magic(self, path):
"""puremaic(libmagicのPython実装)を使用してファイルの最初の数バイトから拡張子を推測"""
# puremaicを使用して推測
try:
guesses = puremagic.magic_file(path)
extensions = list()
for g in guesses:
ext = g.extension.strip()
if len(ext) > 0:
if not ext.startswith("."):
ext = "." + ext
if ext not in extensions:
extensions.append(ext)
return extensions
except FileNotFoundError:
pass
except IsADirectoryError:
pass
except PermissionError:
pass
return []
def register_page_converter(self, converter: DocumentConverter) -> None:
"""ページテキストコンバーターを登録"""
self._page_converters.insert(0, converter)
おわりに
Mark it down!!