LoginSignup
1
1

python勉強時のメモ

Posted at

sort

基本

sortはデフォルトで昇順に並べ替えられます。


test = [1,3,2]
print(test.sort())

#結果
#1 
#2 
#3

アルファベットは大文字小文字、アルファベット順にソートされます。

test = ['h','w','N','P']
print(test.sort())

#結果
#N 
#P 
#h 
#w

lambdaを使用したsort

オブジェクトの属性でsort

test = [
    ('drill', 4),
    ('circular', 5),
    ('jackhammer', 40),
    ('sander', 4),
]

# 各toolの名前(タプルの最初の要素)でソート
test.sort(key=lambda x: x[0])

print(test)
#結果
#('circular', 5)
#('drill', 4)
#('jackhammer', 40)
#('sander', 4)

順序づけしたsortはタプルで行います。
タプルの特性としてタプル同士を比較することができます。

test = [
    ('drill', 4),
    ('circular', 5),
    ('jackhammer', 40),
    ('sander', 4),
]
test[0][0] > test[1][0] #circularよりdrillの方が大きい
test[0][1] == test[3][1] #4と4は同じ

アルファベットの次に数値でsort
test = [
    ('drill', 4),
    ('circular', 5),
    ('jackhammer', 40),
    ('sander', 4),
]

test.sort(key=lambda x: (x[0], x[1]))

print(test)

#結果
#('circular', 5)
#('drill', 4)
#('jackhammer', 40)
#('sander', 4)

ウォルラス演算子 :=

:= を使って式の中で変数に値を代入することができます。

基本

:=を使うことで処理を簡潔にすることができます。
この場合、len(a) の結果が n に代入され、その後 if ステートメントでチェックされます。

if (n := len(a)) > 10:
    print(f"List is too long ({n} elements, expected <= 10)")

使わない場合、nに代入するという工程が増えてしまう

n = len(a)
if n > 10:
    print(f"List is too long ({n} elements, expected <= 10)")

while ループでの使用

例えば、ファイルから行を読み込みながらループを続ける場合

with open('file.txt') as file:
    while (line := file.readline().rstrip()):
        print(line)

switch ケースの模倣

switch ステートメントがないため、通常は辞書を使ってこれを模倣します。代入式を使うと、いくつかの場合分けを効率的に行うことができます。

action = get_action()  
if (command := action.get("type")) == "start":
    start_service(action.get("service"))
elif command == "stop":
    stop_service(action.get("service"))
elif command == "restart":
    restart_service(action.get("service"))
else:
    print("Unknown command")

タプルのアクセス方法

インデックスを使用する方法とアンパックする方法があります。

インデックス
my_tuple = (1, 2, 3)
a = my_tuple[0]
b = my_tuple[1]
c = my_tuple[2]
アンパック
my_tuple = (1, 2, 3)
a, b, c = my_tuple

アンパックを使うことで、my_tupleの各要素が一行で直接 a, b, c に割り当てられます。これにより、コードが短くなり、各変数がタプルのどの要素に対応しているかが明確になります。

アンパックの利点

  1. 簡潔性: アンパックを使用すると、複数の変数への割り当てを一行で行うことができ、コードが簡潔になります。
  2. 可読性: コードが簡潔になることで、他の人が読んだり理解したりしやすくなります。
  3. エラーの低減: インデックスを間違えるリスクが減ります。アンパックでは、コレクションの構造に基づいて自動的に割り当てられるため、インデックスの誤りによるエラーが発生しにくくなります。

辞書の欠損値の確認方法

missing

  • 用途: カスタム辞書クラスで使用し、キーが見つからない場合の特定の動作を定義する場合に適しています。
  • 特徴: 辞書サブクラスでのみ定義でき、デフォルトの動作をオーバーライドします。このメソッドは標準の辞書には存在しません。
  • 使い方: キーが存在しない場合に、特定の計算を行ったり、特定の値を返したりするような高度なカスタマイズが必要な場合に適しています。
class MyDict(dict):
    def __missing__(self, key):
        return 'デフォルト値'

my_dict = MyDict({'a': 1, 'b': 2})
print(my_dict['a'])  # 出力: 1
print(my_dict['c'])  # 出力: デフォルト値

collections.defaultdict

  • 用途: キーが存在しない場合に自動的にデフォルト値を生成する辞書が必要な場合に使用します。
  • 特徴: defaultdictは、キーが存在しない場合にファクトリ関数を使用して自動的にデフォルト値を生成します。これにより、__missing__メソッドを直接オーバーライドする必要がなくなります。
  • 使い方: キーが存在しない場合に常に同じタイプのデフォルト値が必要な場合(例えば、すべての新しいキーに対して空のリストやゼロを割り当てる場合)に便利です。
from collections import defaultdict

my_dict = defaultdict(lambda: 'デフォルト値')
my_dict['a'] = 1
my_dict['b'] = 2
print(my_dict['a'])  # 出力: 1
print(my_dict['c'])  # 出力: デフォルト値

setdefault

  • 用途: キーが存在する場合はその値を取得し、存在しない場合は新しいキーとデフォルト値を辞書に追加する場合に使用します。
  • 特徴: キーが存在しない場合、そのキーにデフォルト値を割り当ててから値を返します。キーが存在する場合は、既存の値を返します。
  • 使い方: キーの存在をチェックし、必要に応じて初期値を設定する場合に便利です。例えば、リストや他の複合データタイプを辞書の値として初期化する際によく使われます。
my_dict = {'a': 1, 'b': 2}
print(my_dict.setdefault('a', 3))  # 出力: 1 (既に'a'は存在するので変更なし)
print(my_dict.setdefault('c', 3))  # 出力: 3 ('c'は存在しないので3が追加される)
print(my_dict)  # 出力: {'a': 1, 'b': 2, 'c': 3}

get

  • 用途: キーが存在するかどうかを確認せずに辞書から値を安全に取得する場合に使用します。
  • 特徴: キーが辞書に存在しない場合、指定されたデフォルト値(デフォルトはNone)を返しますが、辞書自体を変更しません。
  • 使い方: キーの存在を確認する必要があるが、辞書に新しいエントリを追加したくない場合に適しています。単純な値の取得に最もよく使われます。
my_dict = {'apple': 5, 'banana': 3, 'orange': 2}

# キー 'apple' の値を取得します。キーが存在するため、対応する値が返されます。
apple_count = my_dict.get('apple')
print('Apple count:', apple_count)  # 出力: Apple count: 5

# キー 'grape' の値を取得します。キーが存在しないため、デフォルト値(ここでは 0)が返されます。
grape_count = my_dict.get('grape', 0)
print('Grape count:', grape_count)  # 出力: Grape count: 0

# キー 'pear' の値を取得しますが、デフォルト値を指定していません。
# このキーは存在しないため、None が返されます。
pear_count = my_dict.get('pear')
print('Pear count:', pear_count)  # 出力: Pear count: None

関数の引数の指定

*を使った指定方法

位置引数とキーワード引数の境界を示します。* の後にある引数はキーワード引数としてのみ使用できます。これにより、引数の名前を指定して関数を呼び出す必要があることを明示できます。

def example(a, b, *, c, d):
    pass

この例では、a と b は位置引数として使用できますが、c と d はキーワード引数としてのみ使用できます。

/を使った指定方法

位置専用引数の終わりを示します。/ の前にある引数は位置引数としてのみ使用でき、キーワード引数としては使用できません。

def example(a, b, /, c, d):
    pass

この例では、a と b は位置専用引数ですが、c と d は位置引数またはキーワード引数として使用できます。

記号* と /を使った指定方法

  • と / を組み合わせることで、位置専用引数、位置またはキーワード引数、キーワード専用引数を明確に区別することができます。
def example(a, b, /, c, d, *, e, f):
    pass

この例では、a と b は位置専用引数、c と d は位置またはキーワード引数、e と f はキーワード専用引数です。

yieldを使った遅延評価

遅延評価を使うことで、データが実際に必要になるまでその計算を遅らせることができます。
メモリ使用量を削減してパフォーマンスを向上させることができ、特に大量のデータや計算量の多い操作を扱う際に有効です。

通常の関数との違い

  • 通常の関数: 関数が呼び出されると、そのコードが最初から最後まで実行され、戻り値が返された後、その関数のローカル状態は失われます。

  • ジェネレータ関数: yieldを使用する関数。この関数が呼び出されると、最初のyield式まで実行され、そこで処理が停止します。関数の状態は保持され、次にそのジェネレータが呼び出されると、停止した箇所から実行が再開されます。

基本

def simple_generator():
    yield 1
    yield 2
    yield 3

for value in simple_generator():
    print(value)

この例では、simple_generator関数はジェネレータオブジェクトを生成します。このジェネレータは最初に1を生成し、次に2、そして3を生成します。
各yieldの後、関数はその状態を保持して一時停止し、次に呼び出されると再びそこから開始します。

ファイルの遅延読み込み

ファイルの遅延読み込みは、ファイルの内容を必要に応じて少しずつ読み込む方法です。
これは、特に大きなファイルを扱う場合に効果的で、ファイルの全内容を一度にメモリに読み込むのではなく、必要な部分だけを順に読み込むことでメモリの使用を効率化します。

def read_file_line_by_line(filename):
    with open(filename, 'r') as file:
        for line in file:
            yield line.strip()

# ジェネレータを使用してファイルを読み込む
for line in read_file_line_by_line('example.txt'):
    print(line)

この方法では、read_file_line_by_line関数がファイルの各行をジェネレートするジェネレータとして機能します。このジェネレータをイテレートすると、必要に応じてファイルの次の行が読み込まれます。

コンテナの動作をカスタマイズする

以下は、特定の条件(この場合は数値が特定の閾値より大きいかどうか)に基づいて要素をフィルタリングするカスタムコンテナクラスの実装例です。

class FilteredContainer:
    def __init__(self, data, threshold):
        self.data = data
        self.threshold = threshold

    def __iter__(self):
        for item in self.data:
            if item > self.threshold:
                yield item

# コンテナのインスタンスを作成(閾値を5とする)
container = FilteredContainer([1, 6, 4, 7, 3, 8], 5)

# コンテナを反復処理
for item in container:
    print(item)  # 6, 7, 8 が出力される

この例では、FilteredContainerクラスはリストと閾値を受け取ります。__iter__メソッドでは、リストの各要素を反復処理し、それが閾値より大きい場合のみyieldを使ってその要素を返します。

カスタムコンテナは、標準のリストや他の組み込みコンテナにはない特定の機能を実装するのに便利です。
たとえば、データの前処理やフィルタリング等に使用できます。

yieldのsendメソッド

sendメソッドは、ジェネレータに対して値を「送信」し、ジェネレータの実行を再開することができます。
通常のnext関数(またはジェネレータの__next__メソッド)はジェネレータから次の値を取得するだけですが、sendメソッドを使用すると、ジェネレータに値を渡すことができます。
これにより、外部からの入力に基づいて異なる挙動を実装できます。

def my_generator():
    while True:
        received = yield
        print(f'Received: {received}')

# ジェネレータの生成と最初のyieldまで進める
gen = my_generator()
next(gen)  # ジェネレータを最初のyieldまで進める

gen.send(1)  # 出力: Received: 1
gen.send(2)  # 出力: Received: 2
gen.send(3)  # 出力: Received: 3
  • 最初にnextを呼び出す(またはgen.send(None)を使う)ことで、ジェネレータを最初のyield式まで進める必要があります。これは、ジェネレータを初期化し、最初のyieldの場所まで実行を進めるためです。
  • sendメソッドを使ってジェネレータに値を送信すると、その値はジェネレータ内の現在のyield式によって受け取られます。
  • ジェネレータが終了すると(つまり、もうyield式がないとき)、sendメソッドを使用するとStopIteration例外が発生します

throw メソッド

ジェネレータ内の最も最近に実行された yield 式の位置で指定された例外を発生させるために使われます。
この機能は、ジェネレータが反復処理の途中で外部から特定の例外を受け取り、その例外に応じて特定のアクションを取る必要がある場合に使用します。

def my_generator():
    try:
        yield "Doing some processing"
        yield "Doing more processing"
    except RuntimeError as e:
        yield f"Caught an exception: {e}"

gen = my_generator()
print(next(gen))  # 最初のyieldまで実行

# RuntimeErrorをジェネレータに投げる
print(gen.throw(RuntimeError, "Something went wrong"))

この例では、my_generatorジェネレータがRuntimeError例外を捕捉し、その内容を yield しています。
外部からthrowメソッドを使ってRuntimeErrorをジェネレータ内に投げると、try...except ブロックがその例外を捕捉し、処理を行います。

ただ、throwメソッドを使うとコードが複雑になり、読みにくくなる場合があります。特に、多くの異なる種類の例外を扱ったり、深いネスト構造を持つコードで throw を使うと、コードの可読性が低下する可能性があります。
より良い例外処理を実現するためには、iter メソッドと例外の状態遷移を処理するメソッドを実装したクラスを使用する方が適切な場合があります。このアプローチでは、ジェネレータの代わりに通常のクラスを使用し、そのクラスのインスタンスが反復可能(iterable)になるようにします。例外処理は、クラスのメソッド内でより明示的に管理されます。

class MyIterable:
    def __init__(self):
        self.state = "start"

    def process(self):
        if self.state == "start":
            self.state = "processing"
            # 何らかの処理
        elif self.state == "processing":
            self.state = "done"
            # さらに別の処理

    def __iter__(self):
        return self

    def __next__(self):
        if self.state == "done":
            raise StopIteration
        self.process()
        return self.state

my_iterable = MyIterable()
for state in my_iterable:
    print(state)

クラス構成

  1. __init__メソッド: クラスの初期化メソッドです。ここで、初期状態(例えば "start")を設定します。
  2. processメソッド: クラスの状態に基づいて、何らかの処理を行います。この例では、状態が "start" から "processing"、そして "done" へと遷移します。
  3. __iter__メソッド: 反復可能オブジェクトを返すための特殊メソッドです。この場合、オブジェクト自身 (self) を返します。
  4. __next__メソッド: 反復処理の各ステップで呼び出される特殊メソッドです。ここでは、状態に基づいて次の要素(または状態)を返します。全ての処理が完了したら (state == "done")、StopIteration例外を発生させて反復処理を終了します。

具体的な動作

  1. MyIterableのインスタンス my_iterable が作成されます。
  2. forループが始まると、__iter__メソッドが呼ばれ、反復可能オブジェクトが返されます(この場合はオブジェクト自身)。
  3. ループの各イテレーションで__next__メソッドが呼ばれ、processメソッドが実行され、現在の状態が返されます。
    状態が"done"になると、StopIteration例外が発生し、ループが終了します。

クラスと継承

namedtuple

タプルと同様に不変なので、データが変更されないケースで安全に使用できます。
また、インデックス番号や名前でアクセスできるため、コードの明確さが向上します。

from collections import namedtuple

# Point クラスの定義
Point = namedtuple('Point', ['x', 'y'])

# Point インスタンスの作成
p = Point(11, y=22)

# フィールドへのアクセス
print(p[0] + p[1])  # インデックスによるアクセス
print(p.x + p.y)    # 名前によるアクセス

call

インスタンス自体を関数のように呼び出すことができる特殊なメソッドです。
つまり、__call__メソッドを定義すると、そのクラスのインスタンスを関数呼び出しの構文で使用できます。

__call__メソッドを使用する場合

  1. 関数のような振る舞いが必要なとき: クラスが単一の主要な操作を実行する場合、そのインスタンスを関数のように扱うことで、コードがシンプルで直感的になります。
  2. 状態を持つ関数が必要なとき: インスタンスが内部状態を持ち、その状態に基づいて何らかの操作を行う必要がある場合、__call__を使用することで状態を保持しつつ関数のように振る舞わせることができます。
  3. デコレータやコールバックとして使用するとき: オブジェクトをデコレータやコールバック関数として使用する場合、__call__メソッドを実装することで、オブジェクトを関数のように呼び出すことができます。
import time

class Timer:
    def __init__(self):
        self.start_time = time.time()

    def __call__(self):
        return time.time() - self.start_time

timer = Timer()
time.sleep(1)  # 1秒待機
print(timer())  # 経過時間(約1秒)を出力

__call__メソッドを使用しない方が良い場合

  1. 複数の異なる操作をサポートするとき: クラスが複数の操作を提供する場合、それぞれの操作に独自のメソッドを持つ方が、コードの意図が明確になります。
  2. 操作が明確に定義されているとき: クラスの振る舞いが一つの関数的操作に限定されない場合、それぞれの操作に対して適切なメソッド名を持つことで、コードの可読性と保守性が向上します。
  3. インスタンスが資源やサービスを表すとき: クラスがファイル、データベース接続、ネットワークリソースなどを表す場合、__call__メソッドよりも明確なメソッド名を持つ方が適切です。

以下、同じ機能を持つタイマークラスですが、__call__メソッドを使用せずに、明示的なメソッド名で経過時間を取得する方法を示します。

import time

class Timer:
    def __init__(self):
        self.start_time = time.time()

    def get_elapsed_time(self):
        return time.time() - self.start_time

# タイマーの使用
timer = Timer()
time.sleep(1)  # 1秒待機
print(timer.get_elapsed_time()) 

@classmethod

クラスに対して変わりのコンストラクタを定義することができます。
クラスの主コンストラクタ(__init__メソッド)とは異なる方法でインスタンスを生成するための手段です。

以下の例では、Dateクラスに対して、標準のコンストラクタ(init)とは別に、文字列から日付を生成する代替コンストラクタ(from_string)を定義しています。

class Date:
    def __init__(self, day=0, month=0, year=0):
        self.day = day
        self.month = month
        self.year = year

    @classmethod
    def from_string(cls, date_as_string):
        day, month, year = map(int, date_as_string.split('-'))
        return cls(day, month, year)

# 標準コンストラクタの使用
date1 = Date(12, 11, 2022)

# 代替コンストラクタの使用
date2 = Date.from_string('12-11-2022')

print(date1.day, date1.month, date1.year)  # 出力: 12 11 2022
print(date2.day, date2.month, date2.year)  # 出力: 12 11 2022

この例では、Date.from_stringメソッドは文字列を受け取り、それを解析して新しいDateインスタンスを生成しています。これにより、日付を異なる形式で提供できるようになり、クラスの柔軟性が向上します。

clsはそのメソッドが属するクラス自体を指します。これは、インスタンスメソッドでselfがインスタンス自体を指すのと似ていますが、clsはクラスレベルで操作を行うためのものです。

super()

結論としては、superクラスはダイヤモンド階層の共通スーパークラスが一度しか実行されないことを保証します。

そもそもダイヤモンド階層は、複数の継承が絡み合って、あるスーパークラスが階層内で複数回現れる状況を指し、この状況は、特に多重継承を使用するプログラミング言語において問題を引き起こす可能性があります。
そのような状況を起こさないためにPythonのメソッド解決順序(MRO)によって、ダイヤモンド階層の共通スーパークラスが一度しか実行されにようになっています。

MROは、クラスがどの順序でメソッドを検索するかを定義します。

class A:
    def method(self):
        print("A method")

class B(A):
    def method(self):
        print("B method")
        super().method()

class C(A):
    def method(self):
        print("C method")
        super().method()

class D(B, C):
    def method(self):
        print("D method")
        super().method()

d = D()
d.method()

この例では、D クラスは B と C を継承しており、これらは共に A を継承します。D インスタンスの method を呼び出すと、MROに従って D, B, C, A のメソッドが順に呼び出され、A のメソッドは一度だけ実行されます。
これにより、ダイヤモンド階層における重複実行の問題が解決されます。

mix-in

mix-inは、再利用可能なメソッドを提供するための小さなクラスです。
通常、特定の動作や機能を「ミックスイン(混ぜ合わせる)」するために他のクラスによって継承されます。
mix-inは、多重継承の複雑さを避けつつ、コードの再利用を可能にする一つの方法です。

mix-inクラスの定義
class JsonMixin:
    def to_json(self):
        import json
        return json.dumps(self.__dict__)

class XmlMixin:
    def to_xml(self):
        # XMLへの変換ロジック(省略)
        pass
mix-inを使用して新しいクラスを作成
class Contact(JsonMixin, XmlMixin):
    def __init__(self, name, email):
        self.name = name
        self.email = email
インスタンスの使用
c = Contact("John Doe", "jdoe@example.com")
print(c.to_json())  

この結果は、Contact インスタンスの name と email 属性が JSON 形式の文字列に変換されたものです。
json.dumps() 関数は辞書を受け取り、それを JSON 形式の文字列にシリアライズします。
ここでは、self.__dict__が{"name": "John Doe", "email": "jdoe@example.com"} という辞書を表し、これが JSON 文字列に変換されています。

self.dict は、Python のオブジェクトが持つインスタンス変数(属性)を辞書形式で格納している特別な属性です。この辞書には、コンストラクタ内で定義された属性だけでなく、オブジェクトのライフサイクル中にそのインスタンスに追加されたすべてのインスタンス属性が含まれます。

__dict__の使用例
class MyClass:
    def __init__(self):
        self.a = 1
        self.b = 2

    def add_attribute(self, name, value):
        setattr(self, name, value)

obj = MyClass()
obj.add_attribute('c', 3)
print(obj.__dict__) #{'a': 1, 'b': 2, 'c': 3}

この例では、コンストラクタ内で a と b が定義され、add_attribute メソッドを通じて後から c が追加されています。

  • インスタンス属性の継承を避ける: mix-inは通常、状態(インスタンス属性)を持たず、振る舞い(メソッド)のみを提供する。
  • カスタマイズ可能な振る舞いの提供: インスタンスレベルでの振る舞いのカスタマイズを可能にするため、mix-inはプラグイン可能な機能を提供するべきです。
  • シンプルな機能から複雑な機能へ: 単一の機能を提供する小さなmix-inを作成し、これらを組み合わせて複雑な機能を構築します。
  • メソッドの組み込み: mix-inはインスタンスメソッドまたはクラスメソッドを提供できますが、その目的に応じて適切なタイプを選択する必要があります。

メタクラスと属性

Pythonでは、属性はデフォルトでパブリックです。このため、単純なゲッターやセッターメソッドを定義する代わりに、直接属性にアクセスすることが一般的です。

class MyClass:
    def __init__(self, value):
        self.value = value

この例では、valueはパブリック属性であり、直接アクセスや更新が可能です。

@propertyの使用
@propertyデコレータは、属性にアクセスされたときに特別な振る舞いが必要な場合に使用されます。これにより、属性の取得や設定時に追加のロジックを実行できますが、外部から見ると普通の属性のように振る舞います。

class MyClass:
    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, value):
        if value < 0:
            raise ValueError("Value cannot be negative")
        self._value = value
        
    @value.deleter
    def value(self):
        del self._value
  • ゲッターメソッドとしての value
    @propertyのvalueは読み取り専用のプロパティとして定義されています。

  • セッターメソッドとしての value
    @value.setterのvalueは値を設定するためのロジックが含まれています。value プロパティに新しい値を設定する際には、その値が負でないことを検証します。負の値を設定しようとすると、ValueError が発生します。

  • 削除メソッドとしての value
    @value.deleterでプロパティを削除するためのロジックも含めることができます。この場合、del obj.value のように記述すると、_value 属性が削除されます。

これらの例では、value は単なるメソッドではなく、属性のように振る舞う特別な関数として定義されています。これにより、オブジェクトの状態をより柔軟に管理することができます。

@staticmethod

@staticmethod は、Pythonでクラスの中に定義される静的メソッドを表すためのデコレータです。静的メソッドは、そのクラスのインスタンス(オブジェクト)やクラス自体とは独立しており、クラスのインスタンス属性やクラス属性にアクセスする必要がないメソッドです。

class MyClass:
    @staticmethod
    def my_static_method(arg1, arg2):
        # arg1とarg2を使った処理
        return arg1 + arg2

# クラスをインスタンス化せずにメソッドを呼び出す
result = MyClass.my_static_method(5, 3)

この例では、my_static_method は MyClass の静的メソッドとして定義されており、MyClass のインスタンス化なしに直接呼び出すことができます。

静的メソッドは、そのメソッドがクラスのインスタンスやクラス自体の状態に依存しないが、そのクラスの機能や目的に密接に関連している場合に便利です。例えば、あるクラスが扱うデータを検証する関数や、特定の形式のデータを生成するユーティリティ関数などがそれに該当します。

再利用可能な@propertyメソッドにディスクリプタを使う

ディスクリプタは、getsetdelete メソッドを持つクラスです。これらのメソッドを通じて、属性へのアクセス(取得、設定、削除)を制御できます。

@property等を繰り返し使う場合は、コードの重複を避け、より効率的な構造を提供するためにディスクリプタクラスを作成することが有効です。

@propertyを繰り返し使う例
class Person:
    def __init__(self, name, age):
        self._name = name
        self._age = age

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def age(self):
        return self._age

    @age.setter
    def age(self, value):
        if value < 0:
            raise ValueError("Age cannot be negative")
        self._age = value

person = Person("Alice", 30)
print(person.name)  # Alice
person.age = 31
print(person.age)  # 31
ディスクリプタクラスに置き換えた例
class NonNegative:
    def __init__(self, name):
        self.name = name

    def __get__(self, instance, owner):
        return instance.__dict__[self.name]

    def __set__(self, instance, value):
        if value < 0:
            raise ValueError(f"{self.name} cannot be negative")
        instance.__dict__[self.name] = value

class Person:
    age = NonNegative("age")

    def __init__(self, name, age):
        self.name = name
        self.age = age  #ここで NonNegative.__set__ が呼ばれる

person = Person("Bob", 25)
print(person.age)  # 25
person.age = 26
print(person.age)  # 26

try:
    person.age = -5  # ValueError が発生する
except ValueError as e:
    print(e)

上記ディスクリプタクラスの例では、Person クラスの age 属性が NonNegative ディスクリプタによって管理されています。

person = Person("Bob", 25) もしくは person.age = -5 で__set__メソッドが呼び出さたとき、instance 引数は person(属性を設定しているインスタンス)を指し、value 引数は設定される値 25 もしくは -5 を指します。

getattr

クラスが__getattr__を定義している場合、オブジェクトのインスタンス辞書に属性が見つからないときは、いつも__getattr__が呼び出されます

以下、getattr を使ってロギング機能を実装し、クラスの属性へのアクセスが行われた際に、その情報を記録することができる例です

class MyLoggingClass:
    def __init__(self):
        self.existing_attribute = "This attribute exists"

    def __getattr__(self, name):
        # 属性が見つからない場合に呼び出される
        message = f"Attribute '{name}' was accessed but does not exist"
        self.log_message(message)
        raise AttributeError(message)

    def log_message(self, message):
        # ロギングメッセージを処理する(ここでは単に出力する)
        print(f"Logging: {message}")

# クラスのインスタンス化
obj = MyLoggingClass()

# 存在する属性にアクセス
print(obj.existing_attribute)

# 存在しない属性にアクセス(__getattr__ が呼び出される)
try:
    obj.non_existing_attribute
except AttributeError as e:
    print(e)

getattribute

getattribute を使用すると、クラスのインスタンスに対するあらゆる属性アクセスがこのメソッドを通過します。

getattribute は、属性アクセスのたびに呼び出されるため、ログ取得に適しています。ただし、getattribute で属性が見つからない場合、自動的に getattr が呼び出されることにも注意が必要です。

以下の例では、LoggingClass において、任意の属性アクセスに対するログ記録を行います。存在しない属性へのアクセスがあった場合、Pythonの標準的な振る舞いに従って AttributeError を発生させます。

class LoggingClass:
    def __init__(self, *args, **kwargs):
        self._allowed_attributes = set(['existing_attribute'])
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __getattribute__(self, name):
        if name.startswith('_'):
            return super().__getattribute__(name)

        # ログ記録
        print(f"Accessing attribute: {name}")

        # 存在しない属性へのアクセスの処理
        if name not in super().__getattribute__('_allowed_attributes'):
            raise AttributeError(f"{self.__class__.__name__} object has no attribute '{name}'")

        return super().__getattribute__(name)

    def __getattr__(self, name):
        # __getattribute__ で処理されなかった属性へのアクセス
        print(f"__getattr__ called for: {name}")
        raise AttributeError(f"{self.__class__.__name__} object has no attribute '{name}'")

# 使用例
obj = LoggingClass(existing_attribute="This exists")
print(obj.existing_attribute)  # 存在する属性へのアクセス

try:
    obj.non_existing_attribute
except AttributeError as e:
    print(e)  # 存在しない属性へのアクセス

この例では、getattribute で属性のアクセスを捕捉し、ログを記録します。_allowed_attributes に含まれていない属性にアクセスしようとすると、AttributeError が発生します。getattribute で処理されなかった属性へのアクセスは getattr によって処理されます。

注意

__getattribute__内で属性を参照すると、その参照が再び getattribute を呼び出し、結果的に無限ループに陥る可能性があります。
この場合、object クラスの getattribute メソッドを直接呼び出します。これにより、カスタム getattribute メソッドをバイパスして、Pythonのデフォルトの属性アクセス処理が行われます

  • getattr は、getattribute によって属性が見つからなかった場合(つまり AttributeError が発生した場合)にのみ呼び出されます。

  • 同じ属性へのアクセスに対して AttributeError が2回発生することはないです。getattribute がエラーを発生させると、そのエラーは getattr に引き継がれ、getattr で処理されます。

  • startswithで内部的またはプライベートな属性な属性かチェックし、その場合は特殊な処理は不要という考えから、return super().__getattribute__(name)で追加の処理なしに値を取得しています

メタクラスと__new__メソッド

メタクラスはtypeを継承して定義され、クラスの作成とカスタマイズに関与します。
__new__メソッドは、クラスのインスタンス(つまり、新しいクラス)が実際に作成される前に実行されます。このメソッドはクラスの定義が完了した後に呼び出され、そのクラスの作成をカスタマイズ(確認)できます。

class Meta(type):
    def __new__(cls, name, bases, dct):
        # クラス定義のカスタマイズ
        return super().__new__(cls, name, bases, dct)

class MyClass(metaclass=Meta):
    pass

ここではMetaというメタクラスがあり、MyClassの作成時にMeta.__new__が呼ばれ、クラスの定義をカスタマイズできます。

new メソッドの引数

  1. cls:メタクラスのインスタンス。__new__が呼び出される際には、これはメタクラス自身を指します。
  2. name:作成されるクラスの名前。
  3. bases:作成されるクラスのベースクラスのタプル。
  4. dct:クラスの属性を含む辞書。クラス本体で定義されたメソッドや変数が含まれます。

return super().__new__(cls, name, bases, dct)によって実際に新しいクラスを作成するために用いられます。
super()は、継承階層の次のクラス(この場合はtype)の同名メソッドを呼び出します。ここで、typeの__new__メソッドが新しいクラスを作成し、それを返します。

metaclass=Metaはこのクラス定義によって作成されるクラスは、Metaによって定義されたルールに従って作成されます。

init_subclass

クラスが別のクラスのサブクラスとして定義される際に自動的に呼び出されます。これを使用してサブクラスの定義時に検証やカスタマイズを行うことができます。

class Grandparent:
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        print(f"Grandparent __init_subclass__ called for {cls.__name__}")

class Parent(Grandparent):
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        print(f"Parent __init_subclass__ called for {cls.__name__}")

class Child(Parent):
    pass

この例では、Childクラスが定義されると、以下の順序で__init_subclass__メソッドが呼び出されます:

処理の流れ(最も親の階層まで遡り、そこから出力される)

  1. Childクラスの定義:
    Childクラスが定義されると、Pythonの内部処理により__init_subclass__が自動的に呼び出されます。
  2. Parent.__init_subclass__の実行:
    最初に、Childクラスの直接の親クラスであるParentの__init_subclass__が呼び出されます。
    このメソッド内でsuper().init_subclass(**kwargs)が呼び出され、制御がさらに上の階層に移ります。
  3. Grandparent.__init_subclass__の実行:
    Parent.__init_subclass__からのsuper()呼び出しにより、次にGrandparentの__init_subclass__が実行されます。
    Grandparentクラスはobjectクラスから継承されているため、ここで初期化プロセスは完了します。
  4. 出力:
    Grandparent.__init_subclass__が最初に実行され、「Grandparent init_subclass called for Child」と出力されます。
    次に、制御がParent.__init_subclass__に戻り、「Parent init_subclass called for Child」と出力されます。
重要な点
  • super().init_subclass(**kwargs)の呼び出しは、サブクラスが定義される際に、そのサブクラスのすべての親クラスで定義された__init_subclass__メソッドが適切に実行されることを保証します。
  • これにより、複数の階層を持つクラス構造でも、各階層の初期化プロセスが適切に行われます。
  • **kwargsを通じて、サブクラスから親クラスの__init_subclass__に追加の情報を渡すことができます。

クラス登録とメタクラスの使用

クラス登録は、特定の基底クラスのすべてのサブクラスを自動的に記録する方法です。これは、特定のタイプのオブジェクトを動的に識別し、使うために役立ちます。メタクラスを使うと、このプロセスを自動化できます。

class ProcessorBase:
    registry = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.registry.append(cls)

    def process(self, data):
        raise NotImplementedError("You must implement the process method.")

class ImageProcessor(ProcessorBase):
    def process(self, data):
        print(f"Processing image data: {data}")

class TextProcessor(ProcessorBase):
    def process(self, data):
        print(f"Processing text data: {data}")

# 使用例
for processor_cls in ProcessorBase.registry:
    processor = processor_cls()
    processor.process("my_data")

この例では、ProcessorBase は基底クラスで、ImageProcessor と TextProcessor はサブクラスです。ProcessorBase.registry には、これらのサブクラスが自動的に登録されます。プログラムの後半では、登録されたすべてのプロセッサーを反復処理し、それぞれにデータを処理させます。

以下別の例として、ファクトリーパターンを考えます。このパターンでは、オブジェクトの作成をサブクラスに委ね、基底クラスはどのサブクラスを使用するかを動的に決定します。

class Animal:
    registry = []

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.registry.append(cls)

    def make_sound(self):
        raise NotImplementedError

class Dog(Animal):
    def make_sound(self):
        return "Bark!"

class Cat(Animal):
    def make_sound(self):
        return "Meow!"

# ファクトリー関数
def create_animal(animal_type):
    for animal_cls in Animal.registry:
        if animal_cls.__name__.lower() == animal_type.lower():
            return animal_cls()
    raise ValueError(f"Unknown animal type: {animal_type}")

# 使用例
animal = create_animal("dog")
print(animal.make_sound())  # Bark!

この例では、Animal 基底クラスとそのサブクラス Dog と Cat があります。create_animal 関数は、指定された動物タイプに基づいて適切な動物オブジェクトを作成します。このように、基底クラスを通じてサブクラスを動的に管理・利用できます。

クラス属性に__set_nbame__で注釈を加える

このメソッドは、ディスクリプタがクラスに割り当てられたときに自動的に呼び出され、ディスクリプタに取り囲むクラスとその属性名に関する情報を提供します。

class Descriptor:
    def __set_name__(self, owner, name):
        self.public_name = name
        self.private_name = '_' + name

    def __get__(self, instance, owner):
        return getattr(instance, self.private_name)

    def __set__(self, instance, value):
        setattr(instance, self.private_name, value)

class MyClass:
    name = Descriptor()

この例では、ディスクリプタは属性名に基づいて内部的にプライベートな名前を生成し、それを使用して値を格納します。

  1. obj.name = 'Test Name' で Descriptor.set が呼ばれ、obj の _name 属性に 'Test Name' が設定されます。
  2. print(obj.name) で Descriptor.get が呼ばれ、obj の _name 属性の値 'Test Name' が取得されて出力されます。

クラスデコレータ

クラスの全メソッドや属性に対して一括で変更を加えたい場合や、クラスの拡張や修正を行いたい場合に非常に便利です。

以下は対象クラスの各メソッドの実行時間を計測して出力します

import time
import functools

def time_tracker(cls):
    class Wrapper:
        def __init__(self, *args, **kwargs):
            self.wrapped = cls(*args, **kwargs)

        def __getattribute__(self, name):
            try:
                attr = super().__getattribute__(name)
            except AttributeError:
                pass
            else:
                if callable(attr):
                    return functools.partial(self._time_method, attr)
                return attr

        def _time_method(self, method, *args, **kwargs):
            start_time = time.time()
            result = method(*args, **kwargs)
            end_time = time.time()
            print(f'{cls.__name__}.{method.__name__} - 実行時間: {end_time - start_time:.4f}')
            return result

    return Wrapper

@time_tracker
class MyClass:
    def method1(self):
        time.sleep(1)

    def method2(self, n):
        time.sleep(n)

obj = MyClass()
obj.method1()
obj.method2(2)
  1. time_tracker は引数としてクラス cls (この場合は MyClass)を受け取ります。
  2. デコレータ内部で Wrapper という内部クラスを定義します。このクラスは cls のインスタンスをラップ(包み込む)するために使われます。
  3. Wrapper クラスの init メソッドは cls のインスタンスを作成し、wrapped 属性に格納します。
  4. getattribute メソッドは、Wrapper インスタンスの任意の属性にアクセスされたときに呼び出されます。このメソッドは属性が呼び出し可能(関数やメソッド)である場合、その属性を _time_method メソッドでラップしてから返します。
  5. _time_method メソッドは、実際のメソッドの実行前後で時間を記録し、その実行時間を出力します。

使用例

  • obj = MyClass() で MyClass のインスタンスを作成します。ここで、実際には Wrapper クラスのインスタンスが作成されます。
  • obj.method1() と obj.method2(2) を呼び出すと、これらのメソッド呼び出しは Wrapper クラスの _time_method を通じて行われ、各メソッドの実行時間が計測されて出力されます。

並行性と並列性

subprocessを使って子プロセスを管理する

subprocessモジュールは、外部のプログラムやコマンドをPythonプログラムから実行するために使用されます。
これにより、Pythonプログラム内で別のプログラムを起動し、そのプログラムの出力を受け取ったり、入力を送ったりすることができます。

「子プロセス」とは、Pythonプログラムから生成され、独立して実行されるプロセス(またはタスク)のことです。
Pythonプログラムが実行されている間、subprocessを使って起動された外部プログラム(子プロセス)は、Pythonインタプリタとは別に、自分自身のプロセスとして動作します。つまり、Pythonプログラムと同時かつ、別々に動くということです。

[1] run 関数を使った基本的な使用例

import subprocess

result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)
  • subprocess.run(...): ここでlsコマンドを実行します。
  • capture_output=True: コマンドの出力(結果)をキャプチャ(取得)するための設定です。
  • text=True: 出力をテキスト形式で取得するための設定です。
  • print(result.stdout): コマンドの出力を表示します。

[2]Popen クラスを使った使用例
この例では、grepコマンドを使ってファイル内の特定の文字列を検索しています。

import subprocess

with subprocess.Popen(['grep', '特定の文字列', 'ファイル名'], stdout=subprocess.PIPE) as proc:
    result = proc.stdout.read()

print(result.decode('utf-8'))
  • subprocess.Popen(...): ここでgrepコマンドを実行します。
  • コマンド['grep', '特定の文字列', 'ファイル名']: grepコマンドに渡す引数です。ここで特定の文字列をファイル名内で検索します。
  • stdout=subprocess.PIPE: grepコマンドの出力をプログラム内で利用できるように設定します。
  • proc.stdout.read(): grepコマンドの出力を読み取ります。
  • result.decode('utf-8'): 出力を文字列として表示するためのデコード処理です。

stdout=subprocess.PIPEの補足
stdoutは、プログラムやコマンドが実行された結果として出力されるデータのストリームです。通常、コマンドラインでコマンドを実行すると、この出力はコマンドライン上に直接表示されます。
subprocess.PIPEは、この標準出力を現在のPythonプログラム内の特定のパイプ(通信チャネル)にリダイレクトするための指示です。これにより、プログラムが生成する出力をプログラム内で直接読み取り、操作することが可能になります。

[3]UNIX的なパイプライン
以下の例では、2つのコマンド(lsとgrep)を組み合わせて、特定のファイルを検索しています。

import subprocess

with subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE) as ls_proc:
    with subprocess.Popen(['grep', '特定のファイル'], stdin=ls_proc.stdout, stdout=subprocess.PIPE) as grep_proc:
        ls_proc.stdout.close()
        output = grep_proc.communicate()[0]

print(output.decode('utf-8'))

[4]タイムアウトを設定する
以下の例では、長時間実行される可能性があるコマンドに対してタイムアウトを設定しています。

import subprocess

try:
    proc = subprocess.Popen(['some_command'], stdout=subprocess.PIPE)
    stdout, stderr = proc.communicate(timeout=10)
except subprocess.TimeoutExpired:
    proc.kill()
    outs, errs = proc.communicate()
    print("タイムアウト:プロセスが長時間実行されました。")

except内で再度communicateを実行する理由は、プロセスがタイムアウトで強制終了された後も、プロセスが生成した出力やエラーがバッファに残っている可能性があるため、proc.communicate()を再度実行することで、これらの情報を取得し、処理やデバッグに利用できるようにしています。

スレッド

Pythonのスレッド処理であるGILは、一度に一つのスレッドのみがPythonオブジェクトにアクセスできるように制限するロックです。つまり、PythonのスレッドはマルチコアCPUを使って真の並列処理を実現できません。

しかし、スレッドは見かけ上同時に複数のタスクを処理することができます。
これは特に、ブロックするI/O操作(ファイル読み書き、ネットワークリクエストなど)を多用するプログラムにおいて有効です。
GILはCPUバウンド処理(計算が主なタスク)に影響を与えますが、I/Oバウンド処理(入出力が主なタスク)ではその影響が少ないためです。

ブロックするI/O(入出力)操作とは、プログラムがデータの読み書きを行う際にその操作が完了するまでプログラムの実行を一時停止(ブロック)することを指します。この間、プログラムは他の作業を進めることができません。

  • ファイル読み書き
  • ネットワークリクエスト
  • データベース操作

以下Pythonのスレッドを使用した簡単な例

import threading
import time

def thread_function(name):
    print(f"Thread {name}: starting")
    time.sleep(2)
    print(f"Thread {name}: finishing")

if __name__ == "__main__":
    threads = []
    for index in range(3):
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for thread in threads:
        thread.join()

この例では、3つのスレッドが同時に開始され、それぞれ2秒間スリープします。スリープ中も他のスレッドは独立して実行されるため、全体の実行時間は2秒程度になります。

if name == "main":はスクリプトが直接実行された場合にのみ以下のコードブロックを実行するようにしています。
また、threading.Threadで新しいスレッドが作成され、argsでtargetに渡す引数をタプルとして指定しています。x.start()でスレッドをバッククラウンドで開始します

  1. スレッドの開始 (start() メソッド):
  • for index in range(3): ループ内で、新しいスレッドが作成され、その都度 start() メソッドが呼び出されます。
  • start() メソッドは、対応するスレッドに thread_function の実行を開始させます。ただし、この時点では thread_function の実行が完了しているわけではありません。スレッドはバックグラウンドで実行を続けます。
  1. for thread in threads: ループの実行:
  • for index in range(3): ループが完了し、全てのスレッドが start() メソッドによって開始された後、次のステップに移ります。
  • この時点で、for thread in threads: ループが実行されます。このループは、リスト threads 内の各スレッドに対して thread.join() メソッドを呼び出します。
  • join() メソッドは、それぞれのスレッドの実行が完了するまで、メインスレッド(スクリプトを実行しているメインの実行スレッド)の実行を一時停止します。
  1. スレッドの終了待機:
  • 各スレッドが thread_function の実行を完了すると、join() メソッドはそのスレッドに対する待機を終了します。
  • 全てのスレッドが終了すると、for thread in threads: ループは完了し、メインスレッドは次の処理に進むことができます。

要するに、start() メソッドによってスレッドが開始された後、すぐに for thread in threads: ループが実行されますが、このループは全てのスレッドが終了するまでメインスレッドの実行を一時停止します。これにより、スクリプトの残りの部分は、全てのバックグラウンドスレッドのタスクが完了するまで実行されません。

スレッド間のデータ競合を避ける

スレッド間のデータ競合は、複数のスレッドが同じデータに同時にアクセスし、それを変更しようとする場合に発生します。これによりデータの整合性が損なわれ、予期しないバグやエラーが発生する可能性があります。

実戦に近い例として、ウェブスクレイピングやデータ処理などのタスクを複数のスレッドで並行して実行するケースを考えてみましょう。この場合、複数のスレッドが同時に共有リソース(例えば、データベースへの書き込み)にアクセスする可能性があり、データ競合が発生するリスクがあります。

import threading
import requests
from bs4 import BeautifulSoup

scraped_data = []
data_lock = threading.Lock()

def scrape_website(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    title = soup.find('h1').text
    with data_lock:  # データの安全な追加
        scraped_data.append(title)

urls = ['http://example.com/page1', 'http://example.com/page2', ...]
threads = [threading.Thread(target=scrape_website, args=(url,)) for url in urls]

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(scraped_data)

with data_lock:により、ブロック内のコードは一度に一つのスレッドのみが実行されるので、共有で追加していくリストへのアクセスが安全になります。

スレッド間の協調作業にはqueueを使う

パイプラインは、異なるステージ間でデータを連続的に処理することを可能にします。
各ステージは別のスレッドで実行され、Queueクラスを使用してステージ間でデータをやり取りします。

例として、ウェブからデータをスクレイピングし、それを処理して最終的にデータベースに保存するパイプラインを挙げます。
このプロセスを3つのステージ(スクレイピング、処理、保存)に分け、それぞれを別のスレッドで実行します。

  1. スクレイピング: ウェブページからデータを取得。
  2. データ処理: 取得したデータを解析・整形。
  3. データベースへの保存: 処理したデータをデータベースに保存。

各ステージはQueueを介して通信します。スクレイピングステージはデータをQueueに入れ、データ処理ステージはそのQueueからデータを取り出し、処理後のデータを次のQueueに入れます。最終ステージである保存ステージがこのデータを取り出してデータベースに保存します。

import threading
import queue
import requests
from bs4 import BeautifulSoup

# ステージ間のQueue
scrape_queue = queue.Queue()
process_queue = queue.Queue()

def scraper():
    urls = ['http://example.com/page1', 'http://example.com/page2', ...]
    for url in urls:
        data = requests.get(url).content
        scrape_queue.put(data)
    scrape_queue.put(None)  # スクレイピング終了シグナル

def processor():
    while True:
        data = scrape_queue.get()
        if data is None:
            process_queue.put(None)  # 処理終了シグナル
            scrape_queue.task_done()
            break
        # データの処理
        processed_data = BeautifulSoup(data, 'html.parser').find_all('p')
        process_queue.put(processed_data)
        scrape_queue.task_done()

def saver():
    while True:
        data = process_queue.get()
        if data is None:
            process_queue.task_done()
            break
        # データベースへの保存処理
        # save_to_database(data)
        process_queue.task_done()

# スレッドの開始
threading.Thread(target=scraper).start()
threading.Thread(target=processor).start()
threading.Thread(target=saver).start()

# Queueの完了を待機
scrape_queue.join()
process_queue.join()

これらのスレッドは非同期に動作します。つまり、一つのスレッドが別のスレッドの完了を待たずに、それぞれ独立して作業を進めます。
また、Queueはスレッド間のデータのやり取りを同期します。つまり、一つのスレッドがQueueにデータを追加すると、別のスレッドがそのデータを取り出して処理を行うことができます。

今回の例では、各queueは終了シグナル(none)を実行終了後に追加することで、noneを受け取った別スレッドがそのqueueを終了させています(そうしないとwhile True:で回り続ける)

パイプラインの問題点
ビジーウェイト: 一つのステージが遅い場合、後続のステージがデータを待つ必要があり、全体のスループットが低下します。
作業停止: ステージ間のデータの流れが不均一な場合、一部のステージがアイドル状態になる可能性があります。
メモリ爆発: Queueのサイズを適切に管理しないと、メモリ消費が増大し、システムに負荷をかける原因となります。

平行処理が必要な場合を認知する

プログラムが大規模かつ複雑になると、複数の作業を同時に実行することが効率的、または必要になることがあります。このような場合、プログラムの並列実行を考慮する必要があります。
その中でも、ファンアウトとファンインは、並列処理の典型的なパターンです。

  • ファンアウト
    ファンアウトは、一つのタスクが複数のサブタスクに分割され、それらが並列に実行されるパターンです。これは特に、データ処理やネットワークリクエストのようなI/O重視のタスクで効果的です。
    具体例
    例として、ウェブスクレイピングを考えます。複数のURLからデータを取得する必要がある場合、各URLに対して別々のスレッドを作成し、それぞれのスレッドで異なるページを同時にスクレイピングします。

  • ファンイン
    ファンインは、複数のサブタスクの結果を一つの結果に統合するパターンです。サブタスクの完了後、その結果を集めて最終的な出力を生成します。
    具体例
    先のウェブスクレイピングの例で、各スレッドが取得したデータ(例えば、ウェブページからのテキストデータ)を一つのデータ構造(例えば、リストやデータベース)に統合します。

ファンアウトとファンインを行うための様々な方法
  1. threading モジュール
    メリット
    直感的な使用: Threadクラスは理解しやすく、使いやすい。
    リアルタイム処理: レスポンスタイムが重要なタスクに適しています。
    デメリット
    GILの制限: PythonのGIL(グローバルインタープリタロック)により、CPUバウンドタスクでは並列処理の効果が限られる。
    リソースの管理: スレッドの数が多くなると、リソースの管理が難しくなる。
  2. multiprocessing モジュール
    メリット
    GILの回避: 各プロセスは独自のPythonインタプリタとメモリ空間を持つため、CPUバウンドタスクで真の並列処理が可能。
    大規模なデータ処理: 大量のデータや複雑な処理に適しています。
    デメリット
    オーバーヘッド: プロセス間通信(IPC)にはオーバーヘッドが伴う。
    複雑性: threadingに比べて使用が複雑で、デバッグが難しいことがある。
  3. concurrent.futures モジュール
    メリット
    高レベルの抽象化: ThreadPoolExecutorやProcessPoolExecutorを使用して簡単に並列処理が実装できる。
    柔軟性: スレッドベースとプロセスベースの並列処理を選択できる。
    デメリット
    エラーハンドリングの複雑さ: 並列処理中のエラーを追跡し処理するのが難しい場合がある。
    スケーラビリティの限界: 一定数以上のタスクでは効率が落ちる可能性がある。
  4. アクターモデル(pykkaなど)
    メリット
    メッセージ駆動: タスク間でのメッセージパッシングによる並列処理。
    モジュール性と分離: コンポーネントの独立性が高く、モジュール性に優れている。
    デメリット
    学習曲線: アクターモデルの概念は習得が必要。
    依存性: 外部ライブラリに依存することが多い。
  5. asyncio(非同期プログラミング)
    メリット
    I/Oバウンドタスク向け: I/O待ち時間を効率的に処理。
    スケーラブル: 大量の非同期タスクを効率的に管理できる。
    デメリット
    CPUバウンドタスクには不向き: CPU処理にはあまり適していない。
    複雑なフロー制御: 非同期コードのフロー制御は複雑になることがある。
スレッドの欠点
  1. メモリとパフォーマンスのコスト:
    多数のスレッドを同時に実行する場合、それぞれが独自のメモリを必要とします。
    スレッドの開始と管理にはオーバーヘッドが伴います。
  2. 同期の複雑さ:
    複数のスレッドが共有リソースにアクセスする場合、データ競合を防ぐためにロックなどの同期メカニズムが必要です。
    不適切な同期はデッドロックやライブロックなどの問題を引き起こす可能性があります。
  3. デバッグの難しさ:
    スレッド内で発生した例外を適切に処理し、メインスレッドに伝達するための組み込みの方法がありません。
    複数のスレッドが同時に実行されるため、どのスレッドがいつ、どのように実行されるかを追跡するのが難しいです。

これらの欠点を考慮すると、スレッドの使用は注意深い設計とエラーハンドリングが必要であり、場合によっては他の並列・非同期処理手法(例:multiprocessing、asyncio)の検討が望ましいです。

Queueを用いたファンアウトとファンインの改善

メリット

  1. 作業スレッドの管理: 決まった数の作業スレッドを使用することで、スレッドの生成と破棄に関連するコストを削減できます。
  2. データの整合性: スレッド間でデータを安全にやり取りするための同期メカニズムとして、Queueは非常に役立ちます。
  3. スレッドの負荷分散: Queueを使用することで、スレッド間でタスクを効率的に分配し、負荷を均一にすることが可能です。

デメリット

  1. リファクタリングのコスト: 既存のコードをQueueベースのパイプライン処理にリファクタリングするには、多大な作業とコードの再構築が必要になることがあります。
  2. I/O並列性の制限: Queueを使用したプログラムでは、基本的にスレッドの数によって処理できるI/Oの並列度が制限されます。これは、各スレッドが一度に一つのタスクしか処理できないためです。
import threading
import queue
import requests

def worker(url_queue, result_queue):
    while True:
        # QueueからURLを取得
        url = url_queue.get()
        if url is None:
            # 終了シグナルの場合
            break

        # スクレイピング
        result = requests.get(url).content
        result_queue.put(result)
        url_queue.task_done()

urls = ["http://example.com/page1", "http://example.com/page2", ...]

url_queue = queue.Queue()
result_queue = queue.Queue()

# ワーカースレッドの作成と開始
for _ in range(5):  # 例えば、5つのワーカースレッド
    threading.Thread(target=worker, args=(url_queue, result_queue)).start()

# URLをQueueに追加
for url in urls:
    url_queue.put(url)

# 終了シグナルの追加
for _ in range(5):
    url_queue.put(None)

# 全てのタスクが完了するのを待つ
url_queue.join()

# 結果を処理
while not result_queue.empty():
    print(result_queue.get())

url_queue.task_done() を呼び出すと、それは url_queue に対して一つのタスクが完了したことを示します。ただし、url_queue.join() は、url_queue に追加された全てのアイテムについて task_done() が呼び出されるまで待機するため、全てのURLが処理されるまでプログラムは次のステップに進みません。

並行性のためにスレッドが必要なときはThreadPoolExecutorを考える

簡単に並列処理を実装することができます。これにより、特にI/Oバウンドなタスクの並列ファンアウトにおいて、スレッドの起動コストを削減し、リファクタリングの労力を軽減することが可能です。

ThreadPoolExecutorのメリット

  1. スレッド管理の簡略化: スレッドの作成、実行、終了処理が内部で自動的に管理されるため、手動でのスレッド管理の必要がなくなります。

  2. リソースの効率的な使用: スレッドプールを使用することで、使用可能なリソース内で最適な数のスレッドが動作します。これにより、システムの過負荷を防ぎつつ、タスクの並列実行が可能になります。

  3. 簡単なリファクタリング: 既存の非並列コードをThreadPoolExecutorを使った並列コードにリファクタリングするのは比較的簡単です。特に、関数呼び出しをexecutor.submit()やexecutor.map()に置き換えるだけで並列化できる場合が多いです。

  4. I/O並列化の最適化: I/Oバウンドなタスク(例: ファイル読み書き、ネットワークリクエストなど)については、ThreadPoolExecutorを使用することで、効率的に並列処理を行うことができます。

以下の例では、複数のユーザーIDに基づいてユーザー情報をデータベースから取得しますが、各クエリは独立しているため、並列に実行することが可能です。

import psycopg2
from concurrent.futures import ThreadPoolExecutor

def fetch_user_data(user_id):
    with psycopg2.connect("dbname=test user=postgres password=secret") as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM users WHERE id = %s;", (user_id,))
            return cur.fetchone()

user_ids = [1, 2, 3, 4, 5]  # 例えば、取得したいユーザーIDのリスト

# スレッドプールを使用してデータベースクエリを並列に実行
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_user_data, user_ids))

# 結果の処理
for user_data in results:
    print(user_data)  # 各ユーザーのデータを表示

max_workers=5 により、同時に最大5つのスレッドを実行するスレッドプールを作成します。

  1. executor.map は user_ids の各要素(ユーザーID)に対して fetch_user_data 関数を並列に実行します。

  2. 各スレッドは独立してデータベースクエリを実行し、特定のユーザーのデータを取得します。

  3. 取得したデータはイテレータに格納され、list() 関数によってリストに変換されます。

  4. 最終的に、このリスト(results)には、すべてのユーザーIDに対するクエリの結果が順番に格納されます。

このアプローチにより、複数のデータベースクエリを効率的に並列実行し、その結果を簡単に収集することができます。

コルーチンで高度なI/O平行処理を行う

コルーチンは、async キーワードを使用して定義される特殊な関数です。これらの関数は、非同期処理をサポートし、await キーワードを使用して他の非同期処理を待機することができます。

async

  • 関数の定義: async def は、関数をコルーチンとして定義します。これにより、その関数内で await キーワードが使用可能になります。

await

  • 結果の待機: 非同期関数(コルーチン)の実行結果を待機するためにawait が使用されます。awaitに記述した非同期処理の完了を待つ間、プログラムの他の部分(後続)が実行されることができます。

コルーチンのメリット

  • 効率的な並列実行: コルーチンはI/Oバウンドな操作(例: ネットワークリクエスト、ディスクI/O)において効率的です。これらの操作がブロッキングせずに実行されるため、何千、何万ものタスクを効率的に「見かけ上」並行して実行することができます。
  • リソースの節約: コルーチンはスレッドよりも少ないメモリを消費し、コンテキストスイッチのコストが低いため、リソースを節約できます。
  • ファンアウトとファンイン: 非同期プログラミングにおけるコルーチンを使用することで、複数のタスク(ファンアウト)を同時に実行し、その結果を一か所に集約(ファンイン)することが容易になります。
  • スレッド関連の問題の克服: スレッドを使った並列処理に伴うデッドロック、競合状態などの問題を避けることができます。
import asyncio

#実行したい関数
async def fetch_data(task_id):
    await asyncio.sleep(1)  # 仮の待機処理
    return f"Data from task {task_id}"

#コルーチンの作成と実行
async def main():
    tasks = [fetch_data(i) for i in range(1000)]
    return await asyncio.gather(*tasks)

# 非同期イベントループを開始
results = asyncio.run(main())

実行の流れ
asyncio.run(main()) により、メインコルーチン main がイベントループ上で実行されます。
main 内で、fetch_data 関数のインスタンス(コルーチンオブジェクト)が1000個生成されます。
これらのコルーチンは asyncio.gather を使って並列に実行されます。asyncio.gather は各タスクの完了を待ち、すべてのタスクの結果をリストとして返します。
各 fetch_data タスクは独立して asyncio.sleep(1) を実行し、1秒間の非同期待機を行います。この待機は非ブロッキングであり、イベントループは他のタスクを進行させることができます。
すべての fetch_data タスクが完了すると、main から返された結果のリストが results に格納されます。
このプロセスにより、1000個の非同期タスクが効率的に並列に実行され、その結果が集約されます。これにより、I/Oバウンドなタスクの高速化と効率的なリソース使用が実現されます。

様々な非同期バージョン

[1]非同期forループ (async for)

非同期forループは、非同期イテラブル(例: 非同期APIからデータを取得する場合)を反復処理するために使用されます。

import asyncio

async def fetch_data(i):
    await asyncio.sleep(1)  # データ取得の模擬
    return f"data {i}"

async def async_generator():
    for i in range(5):
        yield await fetch_data(i)

async def main():
    async for data in async_generator():
        print(data)

asyncio.run(main())

この例では、async_generator が非同期ジェネレータとして定義され、async for ループでその要素を反復処理しています。

[2]非同期with文 (async with)

非同期with文は、非同期コンテキストマネージャ(例: 非同期データベース接続)を使用する場合に役立ちます。

import asyncio
import asyncpg

async def fetch_data():
    async with asyncpg.connect(user='user', password='password', database='db', host='127.0.0.1') as conn:
        # 非同期データベース操作
        return await conn.fetch("SELECT * FROM table")

asyncio.run(fetch_data())

この例では、asyncpg 非同期データベースクライアントを使用して、非同期with文内でデータベース操作を行っています。

[3]非同期ジェネレータ

非同期ジェネレータは、非同期処理を含むジェネレータを作成するのに使用されます。

import asyncio

async def async_generator():
    for i in range(5):
        await asyncio.sleep(1)
        yield i

async def main():
    async for item in async_generator():
        print(item)

asyncio.run(main())

[4]非同期内包表記

非同期処理を含む内包表記を直接的に書くことはできませんが、async for ループ内でコレクションを構築することで類似の結果を得ることができます。

import asyncio

async def fetch_data(i):
    await asyncio.sleep(1)
    return i

async def main():
    results = [await fetch_data(i) for i in range(5)]
    print(results)

asyncio.run(main())

この例では、リスト内包表記の中で await を使用して、非同期処理の結果をリストに格納しています。

[5]非同期ヘルパー関数

asyncio.gather は、複数の非同期タスクを同時に実行し、その結果を集約するために使用されます。

import asyncio
import aiohttp  # 非同期HTTPクライアント

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.text(), response.status

async def main():
    urls = [
        "https://api.example.com/data1",
        "https://api.example.com/data2",
        "https://api.example.com/data3",
        "https://api.example.com/data4",
        "https://api.example.com/data5"
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)  # 結果を表示

asyncio.run(main())

複数のAPIエンドポイントからデータを取得する例です。
各API呼び出しはネットワークI/Oを伴うため、これを並列に実行することで効率を大幅に向上させることができます。

コルーチンの実際の実行は await asyncio.gather(*tasks) によって行われます。この行が実行されると、asyncio.gather はリスト内のすべてのコルーチンを並列に実行します。
tasks = [fetch_data(session, url) for url in urls]はあくまでfetch_dataの各タスクを格納しているだけです。
asyncio.gather は渡されたすべてのコルーチンが完了するまで待機し、それぞれの結果を集めたリストを返します。

スレッドとこルーチンを組み合わせてasynioへの移行を容易にする

asyncio モジュールには、既存の同期コードを非同期化するための機能がいくつか用意されています。
これらの機能を使うことで、コードを asyncio に移行することが可能です。
以下三つのメソッドで、そのようなマイグレーションを行います。

1.run_in_executor メソッド

変更前:同期関数の例

def blocking_io_operation(n):
    for i in range(n):
        time.sleep(1)
    return "I/O操作完了"

変更後:run_in_executor を使用

import asyncio
import time

def blocking_io_operation(n):
    for i in range(n):
        time.sleep(1)
    return "I/O操作完了"

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_io_operation, 5)
    print(result)

asyncio.run(main())

run_in_executor は、非同期コルーチン内で時間のかかるブロッキング操作(この場合は blocking_io_operation)を実行するために使用されます。この方法により、ブロッキング操作が非同期的に実行され、イベントループが他のタスクを実行できるようになります。

2.run_until_complete メソッド

変更前:コルーチンの定義

async def async_task():
    await asyncio.sleep(1)
    return "非同期タスク完了"

変更後:run_until_complete を使用

loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_task())
print(result)

run_until_complete は、非同期コルーチンを同期的な環境で実行するために使用されます。このメソッドは、指定されたコルーチンが完了するまでプログラムの実行をブロックし、その結果を返します。

3.run_coroutine_threadsafe 関数

例えば、メインスレッドがUI更新や他の処理に専念している間に、バックグラウンドで非同期I/O操作を行う場合などが考えられます。

例:
メインスレッドでは他の処理(例えば、GUIの更新や他のタスク)を行いながら、バックグラウンドスレッドから非同期的にWeb APIへのリクエストを実行するような、

非同期HTTPリクエストを行うコルーチン
import asyncio
import aiohttp
import threading

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
バックグラウンドスレッドからこのコルーチンを起動するための関数
def start_background_task(loop, url):
    asyncio.run_coroutine_threadsafe(fetch_url(url), loop)
メインスレッドでイベントループを設定し、バックグラウンドスレッドから非同期タスクを起動
def main():
    # メインイベントループを取得
    loop = asyncio.get_event_loop()

    # バックグラウンドスレッドで非同期タスクを実行
    url = "http://example.com"
    thread = threading.Thread(target=start_background_task, args=(loop, url))
    thread.start()

    # メインスレッドの他の処理...
    # 例: time.sleep(5)

    # スレッドの終了を待機
    thread.join()

    # イベントループを閉じる(プログラムの最後に)
    loop.close()

if __name__ == "__main__":
    main()

この例では、以下の手順で処理が行われます:

  1. fetch_url コルーチンは、指定されたURLに対する非同期HTTPリクエストを行います。
  2. start_background_task 関数は、与えられたイベントループ loop で fetch_url コルーチンをスケジュールします。asyncio.run_coroutine_threadsafe は、異なるスレッドからでもイベントループに安全にアクセスできるようにします。
  3. メインスレッドでイベントループが開始され、バックグラウンドスレッドで非同期タスクが実行されます。
  4. メインスレッドは他の処理を続けながら、バックグラウンドスレッドでのタスクの完了を待機します。

使用上の注意
asyncio.run_coroutine_threadsafe は、スレッド間での非同期タスクの実行を可能にしますが、スレッド間でのデータ共有や同期には注意が必要です。
イベントループは通常、プログラムのメインスレッドでのみ操作するべきですが、asyncio.run_coroutine_threadsafe を使うと、別のスレッドから安全にタスクを追加できます。

頑健性と性能

with

with文はリソースの取得と解放を自動化するために使われます。主にファイル操作、ネットワーク接続、データベースセッションなどで有用です。
通常、リソースを手動で開き、最後には必ず閉じなければなりませんが、with文を使用すると、この「開く」と「閉じる」のプロセスが自動化され、エラー処理も簡単になります。

contextlibモジュールとcontextmanagerデコレータ

contextlibモジュールとcontextmanagerデコレータはリソースの管理を簡素化し、コードの可読性を向上させるために使用されます。

contextlib

contextlibはPythonの標準ライブラリの一部で、コンテキスト管理のためのユーティリティを提供します。コンテキスト管理とは、特定のリソース(ファイル、ネットワーク接続、データベースセッションなど)の取得と解放、または特定の状態(設定変更、ロックの取得/解放など)の設定とリセットを自動化するプロセスのことです。

contextmanagerデコレータ

contextmanagerデコレータはcontextlibモジュール内にあり、通常の関数をコンテキストマネージャーに変換するために使用されます。コンテキストマネージャーは、withステートメントと共に使用され、リソースの取得と解放、状態の設定とリセットを管理します。

使用方法

  • 関数の定義: contextmanagerデコレータを使って、リソース管理のための関数を定義します。この関数では、リソースの取得、リソースを使った操作、そしてリソースの解放に関するコードが含まれます。
  • yieldの使用: 関数内でyieldを使い、リソースまたは必要なオブジェクトをyieldします。yieldは関数の実行を一時的に停止し、リソースをwith文に渡します。
  • リソースの自動解放: withブロックを抜けると(正常終了するか例外が発生したかに関わらず)、yieldの後のコードが実行され、リソースが適切に解放されます。
from contextlib import contextmanager

@contextmanager
def managed_resource(*args, **kwargs):
    resource = acquire_resource(*args, **kwargs)
    try:
        yield resource
    finally:
        release_resource(resource)

# 使用例
with managed_resource() as resource:
    # ここでresourceを使う
    pass

ローカルクロックにはtimeではなくdatetimeを使う

異なるタイムゾーン間の変換を行う際にtimeモジュールを使用しない理由として、timeモジュールがタイムゾーンの変換や操作に関しては基本的な機能しか提供していない点が挙げられます。
代わりに、datetimeモジュールとpytzというサードパーティモジュールを組み合わせて使用することが一般的です。

日付の変換手順

  • UTCで時刻を取得または設定する:
    UTC(協定世界時)は国際的な標準時で、タイムゾーンの影響を受けません。プログラム内で時刻を扱う際は、UTCで取得または設定することがベストプラクティスです。

  • 必要に応じてタイムゾーンを変換する:
    ユーザーのローカルタイムゾーンに合わせて表示する前に、UTCの時刻を適切なタイムゾーンに変換します。

from datetime import datetime
import pytz

# UTCで現在時刻を取得
utc_now = datetime.now(pytz.utc)

# ローカルタイムゾーン(例:東京)に変換
tokyo_timezone = pytz.timezone('Asia/Tokyo')
local_time = utc_now.astimezone(tokyo_timezone)

print(f"UTC Time: {utc_now}")
print(f"Tokyo Time: {local_time}")

このコードでは、まずUTCの現在時刻を取得し、その後pytzを使用して東京のタイムゾーンに変換しています。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1