1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonで実装するVisitorパターンによるデータ変換処理

Posted at

はじめに

JSONデータを異なる形式(CSV、YAML、XML)に変換する問題をテーマとします。この記事では、Visitorパターンを使用して柔軟で拡張性の高いデータ変換処理を実装する方法を解説します。

image.png

image.png

なぜVisitorパターンなのか?

image.png

従来の実装の問題点

データ変換処理でよく見られる実装方法は以下のようなものです:

def convert_data(data, format_type):
    if format_type == "csv":
        return to_csv(data)
    elif format_type == "yaml":
        return to_yaml(data)
    elif format_type == "xml":
        return to_xml(data)
    # 新しい形式を追加するたびにif文が増える...

この実装には以下の問題があります:

  1. 拡張性の問題: 新しい形式を追加するたびに既存コードを修正
  2. 保守性の問題: 各変換処理のロジックが散らばる
  3. 再利用性の問題: 共通処理の再利用が難しい

Visitorパターンのメリット

Visitorパターンを採用することで、以下のメリットが得られます:

  1. データ構造と処理の分離: データ構造自体に変更を加えずに新しい処理を追加可能
  2. 拡張性の向上: 新しい変換処理は新しいVisitorクラスを追加するだけ
  3. 責務の明確化: 各変換処理が専用のクラスに集約される
  4. 再利用性の向上: 共通のロジックを基底クラスで実装可能

実装例

1. データ構造の定義

まず、JSONデータを表現するクラス階層を定義します:

from dataclasses import dataclass
from typing import List, Dict, Any, Union, Optional

# 型定義
JSONPrimitive = Union[str, int, float, bool, None]
JSONValue = Union['JSONNode', JSONPrimitive]

@dataclass
class JSONObject(JSONNode):
    values: Dict[str, JSONValue]
    
    def accept(self, visitor: 'JSONVisitor') -> None:
        visitor.visit_object(self)
        
@dataclass
class JSONArray(JSONNode):
    items: List[JSONValue]
    
    def accept(self, visitor: 'JSONVisitor') -> None:
        visitor.visit_array(self)

2. 変換処理の実装

各形式への変換処理を実装します。以下は主要な実装の例です:

CSVコンバーター

class CSVConverter(JSONVisitor):
    def __init__(self):
        self.rows: List[Dict[str, str]] = []
        self.current_row: Dict[str, str] = {}
        self.headers: Set[str] = set()
    
    def _flatten_object(self, obj: JSONObject, prefix: str = '') -> None:
        for key, value in obj.values.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, JSONObject):
                self._flatten_object(value, full_key)
            elif isinstance(value, JSONArray):
                self._process_array(value, full_key)
            else:
                self.headers.add(full_key)
                self.current_row[full_key] = str(value)

YAMLコンバーター

class YAMLConverter(JSONVisitor):
    def __init__(self):
        self.result: Any = None
        
    def _convert_value(self, value: JSONValue) -> Any:
        if isinstance(value, (str, int, float, bool)) or value is None:
            return value
        elif isinstance(value, JSONNode):
            original_result = self.result
            self.result = None
            value.accept(self)
            converted = self.result
            self.result = original_result
            return converted
        return None

XMLコンバーター

class XMLConverter(JSONVisitor):
    def __init__(self):
        self.result = ""
        self.indent_level = 0
        self.indent_str = "  "
    
    def visit_object(self, obj: JSONObject) -> None:
        self.result += f"{self._indent()}<object>\n"
        self.indent_level += 1
        for key, value in obj.values.items():
            safe_key = xml_escape(key)
            self.result += f"{self._indent()}<{safe_key}>\n"
            # ... 残りの実装

変換結果の例

以下のようなサンプルデータを使用して、各形式への変換を試してみます:

sample_data = {
    "users": [
        {
            "id": 1,
            "name": "山田太郎",
            "email": "yamada@example.com",
            "age": 30,
            "active": True,
            "skills": ["Python", "JavaScript", "SQL"]
        },
        {
            "id": 2,
            "name": "鈴木花子",
            "email": "suzuki@example.com",
            "age": 25,
            "active": False,
            "skills": ["Java", "C#"],
            "department": "開発部"
        }
    ]
}

CSV出力結果

active,age,department,email,id,name,skills[0],skills[1],skills[2]
True,30,,yamada@example.com,1,山田太郎,Python,JavaScript,SQL
False,25,開発部,suzuki@example.com,2,鈴木花子,Java,C#,

CSVの特徴:

  • ヘッダーがアルファベット順にソート
  • 配列要素が個別のカラムに展開
  • 欠損値が適切に処理

YAML出力結果

users:
  - id: 1
    name: 山田太郎
    email: yamada@example.com
    age: 30
    active: true
    skills:
      - Python
      - JavaScript
      - SQL
  - id: 2
    name: 鈴木花子
    email: suzuki@example.com
    age: 25
    active: false
    skills:
      - Java
      - C#
    department: 開発部

YAMLの特徴:

  • 階層構造を保持
  • データ型を適切に保持(boolean, number, string)
  • 日本語を正しく処理

XML出力結果

<?xml version="1.0" encoding="UTF-8"?>
<object>
  <users>
    <array>
      <item>
        <object>
          <id>1</id>
          <name>山田太郎</name>
          <email>yamada@example.com</email>
          <age>30</age>
          <active>True</active>
          <skills>
            <array>
              <item>Python</item>
              <item>JavaScript</item>
              <item>SQL</item>
            </array>
          </skills>
        </object>
      </item>
      <!-- ... 2番目のユーザー情報 ... -->
    </array>
  </users>
</object>

XMLの特徴:

  • 適切なインデントで整形
  • 階層構造を明確に表現
  • XML宣言を含む

実装のポイント

  1. 型安全性

    • 型ヒントを活用
    • dataclassでの構造定義
  2. データ構造の処理

    • 再帰的なデータ構造のサポート
    • 配列要素の適切な処理
  3. 出力形式の最適化

    • CSVのヘッダー管理
    • YAMLのデータ型保持
    • XMLの整形とエスケープ

まとめ

Visitorパターンを使用することで:

  • 新しい変換形式の追加が容易
  • コードの保守性が向上
  • 変換処理の再利用が可能

実装時は以下の点に注意が必要です:

  • データ構造の適切な設計
  • 各Visitorの責務の明確化
  • エラー処理の考慮

実装(動作確認用)

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Any, Union, Optional, TypeVar, Set
import yaml
from xml.sax.saxutils import escape as xml_escape
import csv
import io

# 型定義
JSONPrimitive = Union[str, int, float, bool, None]
JSONValue = Union['JSONNode', JSONPrimitive]
T = TypeVar('T', bound='JSONNode')

class JSONNode(ABC):
    """JSONデータの基底クラス"""
    @abstractmethod
    def accept(self, visitor: 'JSONVisitor') -> None:
        """Visitorを受け入れるメソッド"""
        pass

    def get(self, key: str, default: Any = None) -> Optional[JSONValue]:
        """キーに対応する値を取得(JSONObjectの場合)"""
        return default

@dataclass
class JSONObject(JSONNode):
    """JSONオブジェクトを表現するクラス"""
    values: Dict[str, JSONValue]
    
    def accept(self, visitor: 'JSONVisitor') -> None:
        visitor.visit_object(self)

    def get(self, key: str, default: Any = None) -> Optional[JSONValue]:
        """キーに対応する値を取得"""
        return self.values.get(key, default)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'JSONObject':
        """通常の辞書からJSONObjectを生成"""
        values = {}
        for key, value in data.items():
            if isinstance(value, dict):
                values[key] = cls.from_dict(value)
            elif isinstance(value, list):
                values[key] = JSONArray.from_list(value)
            else:
                values[key] = value
        return cls(values)
        
@dataclass
class JSONArray(JSONNode):
    """JSON配列を表現するクラス"""
    items: List[JSONValue]
    
    def accept(self, visitor: 'JSONVisitor') -> None:
        visitor.visit_array(self)

    @classmethod
    def from_list(cls, data: List[Any]) -> 'JSONArray':
        """通常のリストからJSONArrayを生成"""
        items = []
        for value in data:
            if isinstance(value, dict):
                items.append(JSONObject.from_dict(value))
            elif isinstance(value, list):
                items.append(cls.from_list(value))
            else:
                items.append(value)
        return cls(items)

class JSONVisitor(ABC):
    """JSON変換の基底Visitorクラス"""
    @abstractmethod
    def visit_object(self, obj: JSONObject) -> None:
        """オブジェクトの処理"""
        pass
        
    @abstractmethod
    def visit_array(self, arr: JSONArray) -> None:
        """配列の処理"""
        pass

class CSVConverter(JSONVisitor):
    """CSV形式への変換を行うVisitor"""
    def __init__(self):
        self.rows: List[Dict[str, str]] = []
        self.current_row: Dict[str, str] = {}
        self.headers: Set[str] = set()
    
    def _flatten_object(self, obj: JSONObject, prefix: str = '') -> None:
        """オブジェクトをフラットな構造に変換"""
        for key, value in obj.values.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, JSONObject):
                self._flatten_object(value, full_key)
            elif isinstance(value, JSONArray):
                self._process_array(value, full_key)
            else:
                self.headers.add(full_key)
                self.current_row[full_key] = str(value)

    def _process_array(self, arr: JSONArray, prefix: str) -> None:
        """配列を処理し、各要素を個別の列として追加"""
        for i, item in enumerate(arr.items):
            if isinstance(item, (str, int, float, bool)) or item is None:
                self.headers.add(f"{prefix}[{i}]")
                self.current_row[f"{prefix}[{i}]"] = str(item)
            elif isinstance(item, JSONObject):
                self._flatten_object(item, f"{prefix}[{i}]")
            elif isinstance(item, JSONArray):
                self._process_array(item, f"{prefix}[{i}]")

    def visit_object(self, obj: JSONObject) -> None:
        """JSONObjectを処理"""
        self.current_row = {}
        self._flatten_object(obj)
        if self.current_row:
            self.rows.append(self.current_row.copy())

    def visit_array(self, arr: JSONArray) -> None:
        """JSONArrayを処理"""
        for item in arr.items:
            if isinstance(item, JSONObject):
                self.current_row = {}
                self._flatten_object(item)
                self.rows.append(self.current_row.copy())

    def get_result(self) -> str:
        """CSV形式の文字列を返す"""
        if not self.rows:
            return ""
        
        # ヘッダーをソート
        headers = sorted(self.headers)
        
        # CSV出力の準備
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=headers)
        
        # ヘッダー行とデータ行を書き込み
        writer.writeheader()
        for row in self.rows:
            writer.writerow(row)
        
        return output.getvalue()

class YAMLConverter(JSONVisitor):
    """YAML形式への変換を行うVisitor"""
    def __init__(self):
        self.result: Any = None
        
    def _convert_value(self, value: JSONValue) -> Any:
        """値をYAML互換の形式に変換"""
        if isinstance(value, (str, int, float, bool)) or value is None:
            return value
        elif isinstance(value, JSONNode):
            original_result = self.result
            self.result = None
            value.accept(self)
            converted = self.result
            self.result = original_result
            return converted
        return None

    def visit_object(self, obj: JSONObject) -> None:
        """JSONObjectを処理"""
        result = {}
        for key, value in obj.values.items():
            result[key] = self._convert_value(value)
        self.result = result
    
    def visit_array(self, arr: JSONArray) -> None:
        """JSONArrayを処理"""
        result = []
        for item in arr.items:
            converted = self._convert_value(item)
            result.append(converted)
        self.result = result
    
    def get_result(self) -> str:
        """YAML形式の文字列を返す"""
        if self.result is None:
            return ""
        return yaml.dump(self.result, allow_unicode=True, sort_keys=False)

class XMLConverter(JSONVisitor):
    """XML形式への変換を行うVisitor"""
    def __init__(self):
        self.result = ""
        self.indent_level = 0
        self.indent_str = "  "
    
    def _indent(self) -> str:
        """現在のインデントレベルに応じたインデント文字列を返す"""
        return self.indent_str * self.indent_level
    
    def visit_object(self, obj: JSONObject) -> None:
        """JSONObjectを処理"""
        self.result += f"{self._indent()}<object>\n"
        self.indent_level += 1
        for key, value in obj.values.items():
            safe_key = xml_escape(key)
            self.result += f"{self._indent()}<{safe_key}>\n"
            self.indent_level += 1
            if isinstance(value, JSONNode):
                value.accept(self)
            else:
                self.result += f"{self._indent()}{xml_escape(str(value))}\n"
            self.indent_level -= 1
            self.result += f"{self._indent()}</{safe_key}>\n"
        self.indent_level -= 1
        self.result += f"{self._indent()}</object>\n"
    
    def visit_array(self, arr: JSONArray) -> None:
        """JSONArrayを処理"""
        self.result += f"{self._indent()}<array>\n"
        self.indent_level += 1
        for item in arr.items:
            self.result += f"{self._indent()}<item>\n"
            self.indent_level += 1
            if isinstance(item, JSONNode):
                item.accept(self)
            else:
                self.result += f"{self._indent()}{xml_escape(str(item))}\n"
            self.indent_level -= 1
            self.result += f"{self._indent()}</item>\n"
        self.indent_level -= 1
        self.result += f"{self._indent()}</array>\n"
    
    def get_result(self) -> str:
        """整形されたXML文字列を返す"""
        return f'<?xml version="1.0" encoding="UTF-8"?>\n{self.result}'

def create_sample_data() -> JSONObject:
    """サンプルデータを生成する関数"""
    raw_data = {
        "users": [
            {
                "id": 1,
                "name": "山田太郎",
                "email": "yamada@example.com",
                "age": 30,
                "active": True,
                "skills": ["Python", "JavaScript", "SQL"]
            },
            {
                "id": 2,
                "name": "鈴木花子",
                "email": "suzuki@example.com",
                "age": 25,
                "active": False,
                "skills": ["Java", "C#"],
                "department": "開発部"
            }
        ]
    }
    return JSONObject.from_dict(raw_data)

def main():
    """使用例を示すメイン関数"""
    # サンプルデータの作成
    data = create_sample_data()
    
    # users配列の取得
    users = data.get("users")
    if not isinstance(users, JSONNode):
        print("Error: Invalid data structure")
        return
    
    # CSV形式に変換
    csv_converter = CSVConverter()
    users.accept(csv_converter)
    print("\nCSV Output:")
    print(csv_converter.get_result())

    # YAML形式に変換
    yaml_converter = YAMLConverter()
    data.accept(yaml_converter)
    print("\nYAML Output:")
    print(yaml_converter.get_result())
    
    # XML形式に変換
    xml_converter = XMLConverter()
    data.accept(xml_converter)
    print("\nXML Output:")
    print(xml_converter.get_result())

if __name__ == "__main__":
    main()

実行例: google colab
image.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?