0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonでAvroを使って実現するデータシリアライゼーションのチュートリアル

Posted at

第1章: Avroとは

Avroは、Apache Software Foundationが開発したデータシリアライゼーションフレームワークです。主にHadoopエコシステムで使用されていますが、他の多くのシステムでも利用可能です。

Avroの主な特徴は以下の通りです:

  1. スキーマベース: データ構造を定義するスキーマを使用します。
  2. 言語に依存しない: 様々なプログラミング言語でAvroを使用できます。
  3. 効率的なデータ圧縮: バイナリフォーマットを使用し、データを効率的に圧縮します。
  4. スキーマの進化: 時間とともにスキーマを変更できます。

Pythonでは、avroライブラリを使用してAvroを扱います。以下のコマンドでインストールできます:

pip install avro-python3

第2章: Avroスキーマの作成

Avroスキーマは、JSONフォーマットで定義します。以下は簡単な例です:

import json

schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
    ]
}

# スキーマをJSONファイルとして保存
with open("user.avsc", "w") as schema_file:
    json.dump(schema, schema_file)

print("スキーマを作成し、user.avscファイルに保存しました。")

この例では、Userという名前のレコードを定義し、namefavorite_numberfavorite_colorというフィールドを持つスキーマを作成しています。

第3章: Avroファイルへのデータ書き込み

スキーマを定義したら、そのスキーマに基づいてデータをAvroファイルに書き込むことができます。

from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import avro.schema

# スキーマの読み込み
schema = avro.schema.parse(open("user.avsc", "rb").read())

# データの準備
users = [
    {"name": "佐藤太郎", "favorite_number": 42, "favorite_color": ""},
    {"name": "鈴木花子", "favorite_number": 7, "favorite_color": ""},
    {"name": "田中次郎", "favorite_number": None, "favorite_color": None}
]

# Avroファイルへの書き込み
with DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema) as writer:
    for user in users:
        writer.append(user)

print("データをusers.avroファイルに書き込みました。")

この例では、3人のユーザーデータをusers.avroファイルに書き込んでいます。

第4章: Avroファイルからのデータ読み込み

Avroファイルからデータを読み込むには、以下のようにします:

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Avroファイルからの読み込み
with DataFileReader(open("users.avro", "rb"), DatumReader()) as reader:
    for user in reader:
        print(user)

print("users.avroファイルからデータを読み込みました。")

このコードは、users.avroファイルからデータを読み込み、各ユーザーの情報を表示します。

第5章: Avroスキーマの進化

Avroの大きな特徴の一つは、スキーマの進化をサポートしていることです。例えば、新しいフィールドを追加する場合:

import json

# 新しいスキーマの定義
new_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
        {"name": "age", "type": ["int", "null"], "default": None}  # 新しいフィールド
    ]
}

# 新しいスキーマをJSONファイルとして保存
with open("user_v2.avsc", "w") as schema_file:
    json.dump(new_schema, schema_file)

print("新しいスキーマをuser_v2.avscファイルに保存しました。")

この新しいスキーマは、古いデータを読み込むことができ、新しいフィールド(この場合はage)にはデフォルト値が使用されます。

第6章: Avroでの複雑なデータ型の使用

Avroは、複雑なデータ型もサポートしています。以下は、配列と地図(マップ)を使用する例です:

import json
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import avro.schema

# 複雑なスキーマの定義
complex_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "ComplexUser",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "friends", "type": {"type": "array", "items": "string"}},
        {"name": "attributes", "type": {"type": "map", "values": "string"}}
    ]
}

# スキーマをJSONファイルとして保存
with open("complex_user.avsc", "w") as schema_file:
    json.dump(complex_schema, schema_file)

# スキーマの読み込み
schema = avro.schema.parse(open("complex_user.avsc", "rb").read())

# データの準備
users = [
    {
        "name": "山田太郎",
        "friends": ["佐藤", "鈴木", "田中"],
        "attributes": {"height": "180cm", "weight": "70kg"}
    },
    {
        "name": "中村花子",
        "friends": ["高橋", "伊藤"],
        "attributes": {"hobby": "読書", "favorite_food": "寿司"}
    }
]

# Avroファイルへの書き込み
with DataFileWriter(open("complex_users.avro", "wb"), DatumWriter(), schema) as writer:
    for user in users:
        writer.append(user)

print("複雑なデータをcomplex_users.avroファイルに書き込みました。")

# Avroファイルからの読み込み
with DataFileReader(open("complex_users.avro", "rb"), DatumReader()) as reader:
    for user in reader:
        print(user)

print("complex_users.avroファイルから複雑なデータを読み込みました。")

この例では、friendsという配列フィールドとattributesというマップフィールドを持つ複雑なユーザーデータを扱っています。

第7章: Avroでの論理型の使用

Avroは、基本的なデータ型に加えて、論理型もサポートしています。論理型は、基本的なデータ型に特別な意味を持たせるものです。以下は、日付と時刻を扱う例です:

import json
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import avro.schema
from datetime import date, time

# 論理型を使用したスキーマの定義
logical_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "LogicalUser",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "birth_date", "type": {"type": "int", "logicalType": "date"}},
        {"name": "wake_up_time", "type": {"type": "int", "logicalType": "time-millis"}}
    ]
}

# スキーマをJSONファイルとして保存
with open("logical_user.avsc", "w") as schema_file:
    json.dump(logical_schema, schema_file)

# スキーマの読み込み
schema = avro.schema.parse(open("logical_user.avsc", "rb").read())

# データの準備
users = [
    {
        "name": "佐藤一郎",
        "birth_date": (date(1990, 1, 1) - date(1970, 1, 1)).days,
        "wake_up_time": (time(6, 30).hour * 3600 + time(6, 30).minute * 60) * 1000
    },
    {
        "name": "鈴木二郎",
        "birth_date": (date(1985, 5, 15) - date(1970, 1, 1)).days,
        "wake_up_time": (time(7, 0).hour * 3600 + time(7, 0).minute * 60) * 1000
    }
]

# Avroファイルへの書き込み
with DataFileWriter(open("logical_users.avro", "wb"), DatumWriter(), schema) as writer:
    for user in users:
        writer.append(user)

print("論理型を使用したデータをlogical_users.avroファイルに書き込みました。")

# Avroファイルからの読み込み
with DataFileReader(open("logical_users.avro", "rb"), DatumReader()) as reader:
    for user in reader:
        print(user)

print("logical_users.avroファイルから論理型を使用したデータを読み込みました。")

この例では、birth_dateを日付として、wake_up_timeを時刻として扱っています。ただし、Avroの制約により、これらの値は整数として保存されます。

第8章: Avroスキーマの検証

データがスキーマに適合しているかを確認することは重要です。以下は、Avroスキーマを使用してデータを検証する例です:

import json
from avro.schema import parse
from avro.io import validate

# スキーマの定義
schema_json = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]},
        {"name": "email", "type": "string"}
    ]
}

# スキーマの解析
schema = parse(json.dumps(schema_json))

# 有効なデータ
valid_data = {
    "name": "田中三郎",
    "age": 30,
    "email": "tanaka@example.com"
}

# 無効なデータ(ageが文字列)
invalid_data = {
    "name": "山本四郎",
    "age": "40",  # これは整数であるべき
    "email": "yamamoto@example.com"
}

# データの検証
def validate_data(data):
    try:
        validate(schema, data)
        print(f"{data['name']}のデータは有効です。")
    except Exception as e:
        print(f"{data['name']}のデータは無効です: {str(e)}")

# 検証の実行
validate_data(valid_data)
validate_data(invalid_data)

この例では、validate関数を使用してデータがスキーマに適合しているかをチェックしています。有効なデータは検証を通過し、無効なデータはエラーを発生させます。

第9章: Avroでのスキーマ互換性

Avroの重要な特徴の一つは、スキーマの互換性です。新しいバージョンのスキーマで古いデータを読み取ったり、その逆を行ったりすることができます。以下は、スキーマの互換性を示す例です:

import json
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import avro.schema

# 古いスキーマ
old_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]}
    ]
}

# 新しいスキーマ(フィールドを追加)
new_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"], "default": None}
    ]
}

# 古いスキーマでデータを書き込む
old_schema_parsed = avro.schema.parse(json.dumps(old_schema))
with DataFileWriter(open("old_users.avro", "wb"), DatumWriter(), old_schema_parsed) as writer:
    writer.append({"name": "高橋五郎", "favorite_number": 5})
    writer.append({"name": "佐藤六子", "favorite_number": None})

print("古いスキーマでデータを書き込みました。")

# 新しいスキーマで古いデータを読み込む
new_schema_parsed = avro.schema.parse(json.dumps(new_schema))
with DataFileReader(open("old_users.avro", "rb"), DatumReader()) as reader:
    for user in reader:
        print(f"古いデータ: {user}")

print("新しいスキーマで古いデータを読み込みました。")

# 新しいスキーマでデータを書き込む
with DataFileWriter(open("new_users.avro", "wb"), DatumWriter(), new_schema_parsed) as writer:
    writer.append({"name": "伊藤七郎", "favorite_number": 7, "favorite_color": ""})
    writer.append({"name": "渡辺八子", "favorite_number": None, "favorite_color": None})

print("新しいスキーマでデータを書き込みました。")

# 古いスキーマで新しいデータを読み込む
with DataFileReader(open("new_users.avro", "rb"), DatumReader()) as reader:
    for user in reader:
        print(f"新しいデータ: {user}")

print("古いスキーマで新しいデータを読み込みました。")

この例では、まず古いスキーマでデータを書き込み、それを新しいスキーマで読み込んでいます。次に、新しいスキーマでデータを書き込み、それを古いスキーマで読み込んでいます。Avroのスキーマ互換性により、これらの操作が可能になっています。

第10章: Avroでのデータ圧縮

Avroは効率的なデータ圧縮をサポートしています。以下は、圧縮を使用してデータを書き込む例です:

import json
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import avro.schema

# スキーマの定義
schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]},
        {"name": "email", "type": "string"}
    ]
}

# スキーマの解析
parsed_schema = avro.schema.parse(json.dumps(schema))

# データの準備
users = [
    {"name": "中村九郎", "age": 35, "email": "nakamura@example.com"},
    {"name": "小林十子", "age": 28, "email": "kobayashi@example.com"},
    {"name": "加藤十一郎", "age": 42, "email": "kato@example.com"}
]

# 圧縮なしでデータを書き込む
with DataFileWriter(open("users_uncompressed.avro", "wb"), DatumWriter(), parsed_schema) as writer:
    for user in users:
        writer.append(user)

print("圧縮なしでデータを書き込みました。")

# Snappy圧縮を使用してデータを書き込む
with DataFileWriter(open("users_snappy.avro", "wb"), DatumWriter(), parsed_schema, codec="snappy") as writer:
    for user in users:
        writer.append(user)

print("Snappy圧縮でデータを書き込みました。")

# Deflate圧縮を使用してデータを書き込む
with DataFileWriter(open("users_deflate.avro", "wb"), DatumWriter(), parsed_schema, codec="deflate") as writer:
    for user in users:
        writer.append(user)

print("Deflate圧縮でデータを書き込みました。")

# ファイルサイズの比較
import os

print(f"圧縮なしファイルサイズ: {os.path.getsize('users_uncompressed.avro')} バイト")
print(f"Snappy圧縮ファイルサイズ: {os.path.getsize('users_snappy.avro')} バイト")
print(f"Deflate圧縮ファイルサイズ: {os.path.getsize('users_deflate.avro')} バイト")

この例では、圧縮なし、Snappy圧縮、Deflate圧縮の3つの方法でデータを書き込み、それぞれのファイルサイズを比較しています。

第11章: Avroでのスキーマ解決

大規模なシステムでは、複数のスキーマを管理する必要があります。以下は、スキーマ解決の例です:

import json
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import avro.schema

# スキーマレジストリの模擬
schema_registry = {}

def register_schema(name, schema):
    schema_registry[name] = avro.schema.parse(json.dumps(schema))

def get_schema(name):
    return schema_registry.get(name)

# スキーマの登録
user_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]}
    ]
}

address_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "Address",
    "fields": [
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"}
    ]
}

register_schema("User", user_schema)
register_schema("Address", address_schema)

# データの準備
users = [
    {"name": "山田十二子", "age": 30},
    {"name": "佐々木十三郎", "age": None}
]

addresses = [
    {"street": "桜通り1-2-3", "city": "東京"},
    {"street": "梅町4-5-6", "city": "大阪"}
]

# ユーザーデータの書き込み
with DataFileWriter(open("users.avro", "wb"), DatumWriter(), get_schema("User")) as writer:
    for user in users:
        writer.append(user)

# アドレスデータの書き込み
with DataFileWriter(open("addresses.avro", "wb"), DatumWriter(), get_schema("Address")) as writer:
    for address in addresses:
        writer.append(address)

print("ユーザーデータとアドレスデータを別々のファイルに書き込みました。")

# データの読み込みと表示
def read_and_print(filename, schema_name):
    with DataFileReader(open(filename, "rb"), DatumReader()) as reader:
        print(f"{schema_name}データ:")
        for record in reader:
            print(record)

read_and_print("users.avro", "User")
read_and_print("addresses.avro", "Address")

この例では、簡単なスキーマレジストリを実装し、複数のスキーマを管理しています。各データ型に対して適切なスキーマを使用してデータを書き込み、読み込んでいます。

第12章: Avroとパンダスの統合

データ分析でよく使用されるパンダスライブラリとAvroを統合する例を示します:

import json
import pandas as pd
from io import BytesIO
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import avro.schema

# スキーマの定義
schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "SalesRecord",
    "fields": [
        {"name": "date", "type": "string"},
        {"name": "product", "type": "string"},
        {"name": "quantity", "type": "int"},
        {"name": "price", "type": "float"}
    ]
}

# スキーマの解析
parsed_schema = avro.schema.parse(json.dumps(schema))

# パンダスデータフレームの作成
df = pd.DataFrame({
    "date": ["2024-09-01", "2024-09-02", "2024-09-03"],
    "product": ["A", "B", "C"],
    "quantity": [10, 20, 15],
    "price": [100.0, 200.0, 150.0]
})

# データフレームをAvroファイルに書き込む
with DataFileWriter(open("sales.avro", "wb"), DatumWriter(), parsed_schema) as writer:
    for _, row in df.iterrows():
        writer.append(row.to_dict())

print("データフレームをAvroファイルに書き込みました。")

# Avroファイルからデータフレームを読み込む
records = []
with DataFileReader(open("sales.avro", "rb"), DatumReader()) as reader:
    for record in reader:
        records.append(record)

df_from_avro = pd.DataFrame(records)
print("Avroファイルから読み込んだデータフレーム:")
print(df_from_avro)

# データフレームの分析
print("\n合計売上:")
print(df_from_avro["quantity"] * df_from_avro["price"].sum())

print("\n製品ごとの平均価格:")
print(df_from_avro.groupby("product")["price"].mean())

この例では、パンダスのデータフレームをAvroファイルに書き込み、そのファイルを読み込んで新しいデータフレームを作成しています。その後、簡単なデータ分析を行っています。

第13章: Avroとスパークの統合

Apache Sparkはビッグデータ処理に広く使用されており、Avroとの統合も可能です。以下は、PySpark(SparkのPython API)を使用してAvroファイルを読み書きする例です:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Sparkセッションの作成
spark = SparkSession.builder \
    .appName("AvroExample") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.2") \
    .getOrCreate()

# スキーマの定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("height", FloatType(), True)
])

# データの作成
data = [
    ("田中十四郎", 45, 175.5),
    ("佐藤十五子", 30, 160.0),
    ("鈴木十六郎", 55, 180.0)
]

# データフレームの作成
df = spark.createDataFrame(data, schema)

# Avroファイルとして保存
df.write.format("avro").save("spark_users.avro")

print("データフレームをAvroファイルとして保存しました。")

# Avroファイルの読み込み
df_read = spark.read.format("avro").load("spark_users.avro")

print("Avroファイルから読み込んだデータ:")
df_read.show()

# データの分析
print("年齢の平均値:")
df_read.select("age").groupBy().mean().show()

print("身長の最大値:")
df_read.select("height").groupBy().max().show()

# Sparkセッションの終了
spark.stop()

この例では、SparkのデータフレームをAvroファイルとして保存し、そのファイルを読み込んで新しいデータフレームを作成しています。その後、簡単なデータ分析を行っています。

注意: この例を実行するには、Spark環境が必要です。また、Spark Avroパッケージをダウンロードするために、インターネット接続が必要です。

第14章: Avroのスキーマ進化とバージョニング

Avroの重要な特徴の一つは、スキーマの進化をサポートしていることです。以下は、スキーマの進化とバージョニングの例です:

import json
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import avro.schema

# バージョン1のスキーマ
schema_v1 = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]}
    ]
}

# バージョン2のスキーマ(フィールドの追加)
schema_v2 = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": ["int", "null"]},
        {"name": "email", "type": ["string", "null"], "default": None}
    ]
}

# バージョン1のデータ
data_v1 = [
    {"name": "山本十七子", "age": 28},
    {"name": "中村十八郎", "age": 35}
]

# バージョン1のスキーマでデータを書き込む
with DataFileWriter(open("users_v1.avro", "wb"), DatumWriter(), avro.schema.parse(json.dumps(schema_v1))) as writer:
    for user in data_v1:
        writer.append(user)

print("バージョン1のデータを書き込みました。")

# バージョン2のスキーマでバージョン1のデータを読み込む
with DataFileReader(open("users_v1.avro", "rb"), DatumReader(readers_schema=avro.schema.parse(json.dumps(schema_v2)))) as reader:
    print("バージョン2のスキーマで読み込んだバージョン1のデータ:")
    for user in reader:
        print(user)

# バージョン2のデータ
data_v2 = [
    {"name": "木村十九子", "age": 42, "email": "kimura@example.com"},
    {"name": "林二十郎", "age": 31, "email": "hayashi@example.com"}
]

# バージョン2のスキーマでデータを書き込む
with DataFileWriter(open("users_v2.avro", "wb"), DatumWriter(), avro.schema.parse(json.dumps(schema_v2))) as writer:
    for user in data_v2:
        writer.append(user)

print("バージョン2のデータを書き込みました。")

# バージョン1のスキーマでバージョン2のデータを読み込む
with DataFileReader(open("users_v2.avro", "rb"), DatumReader(writers_schema=avro.schema.parse(json.dumps(schema_v2)), readers_schema=avro.schema.parse(json.dumps(schema_v1)))) as reader:
    print("バージョン1のスキーマで読み込んだバージョン2のデータ:")
    for user in reader:
        print(user)

print("\nスキーマの進化とバージョニングのデモが完了しました。")

この例では、以下のことを示しています:

  1. バージョン1のスキーマでデータを書き込む
  2. バージョン2のスキーマ(新しいフィールドを追加)でバージョン1のデータを読み込む
  3. バージョン2のスキーマで新しいデータを書き込む
  4. バージョン1のスキーマでバージョン2のデータを読み込む

これにより、Avroがどのようにスキーマの進化を処理し、異なるバージョン間の互換性を維持するかを示しています。

第15章: Avroの実践的な使用例:センサーデータの処理

最後に、Avroの実践的な使用例として、IoTセンサーからのデータを処理するシナリオを考えてみましょう。

import json
import random
from datetime import datetime, timedelta
from avro.datafile import DataFileWriter, DataFileReader
from avro.io import DatumWriter, DatumReader
import avro.schema

# センサーデータのスキーマ
sensor_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "SensorData",
    "fields": [
        {"name": "sensor_id", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "temperature", "type": "float"},
        {"name": "humidity", "type": "float"}
    ]
}

# スキーマの解析
parsed_schema = avro.schema.parse(json.dumps(sensor_schema))

# センサーデータの生成
def generate_sensor_data(num_records):
    base_time = datetime.now()
    for i in range(num_records):
        yield {
            "sensor_id": f"sensor_{i % 5 + 1}",
            "timestamp": int((base_time + timedelta(minutes=i)).timestamp() * 1000),
            "temperature": round(random.uniform(20.0, 30.0), 2),
            "humidity": round(random.uniform(40.0, 60.0), 2)
        }

# データの書き込み
with DataFileWriter(open("sensor_data.avro", "wb"), DatumWriter(), parsed_schema) as writer:
    for data in generate_sensor_data(100):
        writer.append(data)

print("センサーデータをAvroファイルに書き込みました。")

# データの読み込みと分析
sensor_stats = {}

with DataFileReader(open("sensor_data.avro", "rb"), DatumReader()) as reader:
    for data in reader:
        sensor_id = data["sensor_id"]
        if sensor_id not in sensor_stats:
            sensor_stats[sensor_id] = {"temp_sum": 0, "humid_sum": 0, "count": 0}
        
        sensor_stats[sensor_id]["temp_sum"] += data["temperature"]
        sensor_stats[sensor_id]["humid_sum"] += data["humidity"]
        sensor_stats[sensor_id]["count"] += 1

# 結果の表示
print("\nセンサーごとの平均値:")
for sensor_id, stats in sensor_stats.items():
    avg_temp = stats["temp_sum"] / stats["count"]
    avg_humid = stats["humid_sum"] / stats["count"]
    print(f"{sensor_id}: 平均温度 = {avg_temp:.2f}°C, 平均湿度 = {avg_humid:.2f}%")

# 異常値の検出(例:温度が28°Cを超える場合)
print("\n温度が28°Cを超えるデータ:")
with DataFileReader(open("sensor_data.avro", "rb"), DatumReader()) as reader:
    for data in reader:
        if data["temperature"] > 28.0:
            print(f"センサーID: {data['sensor_id']}, 時刻: {datetime.fromtimestamp(data['timestamp']/1000)}, 温度: {data['temperature']}°C")

この例では、以下のことを行っています:

  1. IoTセンサーからのデータを模擬したスキーマを定義
  2. ランダムなセンサーデータを生成
  3. 生成したデータをAvroファイルに書き込む
  4. Avroファイルからデータを読み込み、各センサーの平均温度と湿度を計算
  5. 温度が特定の閾値を超えるデータを検出(異常値検出の簡単な例)

この例は、Avroがどのように実際のデータ処理シナリオで使用できるかを示しています。大量のセンサーデータを効率的に保存し、後で分析するのに適しています。

以上で、PythonでのAvroの使用に関する15章からなる詳細な解説が完了しました。この記事を通じて、Avroの基本から高度な使用方法まで、幅広くカバーしました。Avroは大規模データ処理において非常に有用なツールであり、この知識がお役に立てば幸いです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?