LoginSignup
7
7

Effective Python 第2版 を自分なりにまとめてみる part3

Last updated at Posted at 2022-06-17

はじめに

こちらの書籍のまとめになります

Effective Python 第2版 ――Pythonプログラムを改良する90項目 (Brett Slatkin 著、黒川 利明 訳、石本 敦夫 技術監修)

  • 全てのパートをまとめているわけではありません

  • 個人的に難しくて理解できていなかったり腑に落ちていない箇所は省いています

  • もしくは新たな気づきは特にないなと感じたところも省略しています

  • コードに関しては書籍のものを丸々掲載するでなく、改変しています(その過程も個人的に有意義な時間でした)

  • そのような理由からこのブログでは多くの部分を削ってしまっています。オリジナルの書籍はかなり勉強になるなと思いました。興味ある人は是非読んでください。

  • このページでは本書の6〜7章序盤をまとめています。他の章はこちらを参照ください

第六章 メタクラスと属性

getメソッドやsetメソッドは使わず属性をそのまま使う

  • part2でも触れたプライベート属性の話ともリンクしてくるが、JavaやC++と違い、Pythonではなるべくgetter/setterは使わない

再利用可能な@propertyメソッドにディスクリプタを使う

  • 例えば以下のようなPokemonのステータス管理をするクラスがあったとする
class Pokemon:
    def __init__(self, name, hp=1, attack=1):
        self.name = name
        self._hp = hp
        self._attack = attack

    @property
    def hp(self):
        return self._hp

    @hp.setter
    def hp(self, value):
        if not (0 < value < 256):
            raise ValueError('Value must be between 1 and 255')
        self._hp = value

    @property
    def attack(self):
        return self._attack

    @attack.setter
    def attack(self, value):
        if not (0 < value < 256):
            raise ValueError('Value must be between 1 and 256')
        self._attack = value


lizardon = Pokemon('リザードン')
lizardon.hp = 78
lizardon.attack = 84

print(lizardon.hp)        # -> 78
print(lizardon.attack)    # -> 84

pikachu = Pokemon('ピカチュウ', hp=35, attack=55)

print(pikachu.hp)         # -> 35
print(pikachu.attack)     # -> 55
  • この場合、defense, speed, ...と増えていった場合にいちいち追加するのが面倒
  • なのでこう書く
    • 本書ではこの結論に至るまでに色々と途中経過を掲載しているのですが少し難しく、自分は咀嚼できてないので載せてません。是非みてみてください
from weakref import WeakKeyDictionary


class Status:
    def __init__(self):
        self._values = WeakKeyDictionary()

    def __get__(self, instance, instance_type):
        if instance is None:
            return self
        return self._values.get(instance, 0)

    def __set__(self, instance, value):
        if not (0 < value < 256):
            raise ValueError('Value must be between 1 and 256')
        self._values[instance] = value


class Pokemon:
    def __init__(self, name):
        self.name = name
        hp = Status()
        attack = Status()
        defense = Status()
        special_attack = Status()
        special_defense = Status()
        speed = Status()


pikachu = Pokemon('ピカチュウ')
pikachu.hp = 35
pikachu.attack = 55
pikachu.defense = 40
pikachu.special_attack = 50
pikachu.special_defense = 50
pikachu.speed = 90

print(f'{pikachu.name}の攻撃力は{pikachu.attack}')
[out]
ピカチュウの攻撃力は55

サブクラスを__init_subclass__で事前検証する

  • MetaClass -> valid用のSuperClass -> SubClass で検証していくと色々大変
    • どういう流れで大変かは本書を参照
  • SuperClassに__init_subclass__を定義することでサブクラスの妥当性をチェックする方が妥当
class Typ:
    typ = None  # サブクラスで指定する必要がある

    def __init_subclass__(cls):
        super().__init_subclass__()
        if cls.typ not in {'Normal', 'Fire', 'Water', 'Grass'}:
            raise ValueError('Typ need a valid typ')


class PokemonFire(Typ):
    typ = 'Fire'


class Pokemon(PokemonFire):
    def __init__(self, name):
        self.name = name


charmander = Pokemon('ヒトカゲ')
  • 以下の場合、サブクラスを定義しようとした時点でValueErrorが発生する
class PokemonEsper(Typ):
    typ = 'Esper'
  • ダイヤモンド継承のような複雑なケースでも__init_subclass__は使える
  • 下記例の場合、BottomLeftRightの両方が継承されているので初期化の際に2回Top.__init_subclass__が呼び出されそうだが、実際は1回しか呼び出されない。上手くできてる
class Top:
    def __init_subclass__(cls):
        super().__init_subclass__()
        print(f'Top -> {cls}')

print()

class Left(Top):
    def __init_subclass__(cls):
        super().__init_subclass__()
        print(f'Left -> {cls}')

print()

class Right(Top):
    def __init_subclass__(cls):
        super().__init_subclass__()
        print(f'Right -> {cls}')

print()

class Bottom(Left, Right):
    def __init_subclass__(cls):
        super().__init_subclass__()
        print(f'Bottom -> {cls}')

print()

class Bottom2(Bottom):
    def __init_subclass__(cls):
        super().__init_subclass__()
        print(f'Bottom2 -> {cls}')
[out]
Top -> <class '__main__.Left'>

Top -> <class '__main__.Right'>

Top -> <class '__main__.Bottom'>
Right -> <class '__main__.Bottom'>
Left -> <class '__main__.Bottom'>

Top -> <class '__main__.Bottom2'>
Right -> <class '__main__.Bottom2'>
Left -> <class '__main__.Bottom2'>
Bottom -> <class '__main__.Bottom2'>

第七章 並行性と並列性

  • 並行性
    • 異なるタスクを見かけ上同じ時間に行う。例えばOSは単一プロセッサ上で実行するプログラムを忙しく切り替えている
    • Pythonでは比較的容易に並行性プログラムを書くことができる
  • 並列性
    • 異なるタスクを実際に同じ時間で行う。各CPUが別々のプログラムを同時に実行する
    • 並行Pythonコードを本当に並列に実行することは非常に難しい場合もある。このような困難な状況において、Pythonをどのように使うのが最良かを理解することが重要

subprocessで外部プログラムを実行する子プロセスを管理する

  • 色んなことを同時実行しようとした挙句、シェルスクリプトがとんでもないことになり始めたらPythonのsubprocessの導入を考える
  • Python3.5以降はsubprocess.runでコマンドを実行することが公式的に推奨されている
import subprocess


result = subprocess.run(['ls'], stdout=subprocess.PIPE)
print(result.stdout.decode())   # -> Untitled.ipynb
  • runでなくPopenを使うと、より高度な操作が可能
  • 例えば子プロセスの状態を定期的にポーリングすることが可能
import time


proc = subprocess.Popen(['sleep', '10'])


while proc.poll() is None:
    print('実行中')
    time.sleep(1)
print(proc.poll())
[out]
実行中
実行中
実行中
実行中
実行中
実行中
実行中
実行中
実行中
実行中
0
  • 多数の子プロセスを並列に実行可能
before = time.time()

p03 = subprocess.Popen(['sleep', '3'])
p10 = subprocess.Popen(['sleep', '10'])
proc_list = [p03, p10]

for proc in proc_list:
    proc.communicate()

after = time.time()
print(after - before)   # -> 10.019080877304077(13秒かかってないs)
  • 二つの子プロセスをパイプで繋ぎ、順番に実行することも可能
p1 = subprocess.Popen(['ls'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['wc', '-l'], stdin=p1.stdout, stdout=subprocess.PIPE)
out, err = p2.communicate()
print(out.decode())   # -> 1
  • 前プロセスの正常終了が確かでない場合など、後プロセスにTimeOutを加えることも可能
proc = subprocess.Popen(['sleep', '120'])
proc.communicate(timeout=1)   # -> TimeoutExpired

マルチスレッドでは高速化できない

  • Pythonには複数の実装が存在するC言語で実装されたCPythonやJava実装のJythonなど

  • その中でもCPythonが標準実装

  • CPythonの一貫性を保つためにPython(やRuby)ではGlobal Interpreter Lock(GIL)といった排他ロックの仕組みだ導入されている

  • C++やJavaなどでいうマルチスレッド(並列性)の仕組みがGILのせいで機能しない

  • 他言語では複数CPUを同時に動かせるのにPythonではそれができない

  • シングルスレッドで計算

import time
import random
from math import gcd


def lcm(m, n):
    """最小公倍数アルゴリズム
    """
    return (m * n) // gcd(m, n)


def lcm_list(m_list, n_list):
    return [lcm(m, n) for m, n in zip(m_list, n_list)]


numbers1 = [random.randint(1, 10**2) for _ in range(10**6)]
numbers2 = [random.randint(1, 10**2) for _ in range(10**6)]

start = time.time()
ans = lcm_list(numbers1, numbers2)
end = time.time()

print(f'{end - start:.3f} sec.')   # -> 0.142 sec.
  • マルチスレッドで計算、シングルスレッドと同じような時間で終わると思いきや倍以上かかってる
from threading import Thread

start = time.time()

# 処理開始
t1 = Thread(target=lcm_list, args=(numbers1, numbers2))
t2 = Thread(target=lcm_list, args=(numbers1, numbers2))
t1.start()
t2.start()
t1.join()
t2.join()

end = time.time()

print(f'{end - start:.3f} sec.')   # -> 0.305 sec.
  • なお、下記のような場合は特にsleepさせている間にロックが解除され、別スレッドが動き始めるので確かに同時に実行しているといえる
def sleep(n):
    time.sleep(n)


start = time.time()
t1 = Thread(target=sleep, args=(1,))
t2 = Thread(target=sleep, args=(2,))
t3 = Thread(target=sleep, args=(3,))

for t in [t1, t2, t3]:
    t.start()

for t in [t1, t2, t3]:
    t.join()

end = time.time()
print(f'{end - start:.3f} sec.')   # -> 3.002 sec.

スレッド間の線形パイプラインにはQueueを使う

  • 例えば下記のような実装をした場合いくつかの問題点がある
    1. 前処理が終わってないのにポーリングしていて作業の無駄が発生してしまってる
    2. スレッド3つが常時run状態
    3. これらの影響でどこかのqueueに渋滞が発生した場合、メモリクラッシュもありうる
import time
from collections import deque
from threading import Thread, Lock


def plus(item):
    return item + 2

def times(item):
    return item * 2

def minus(item):
    return item - 2


class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            self.items.append(item)

    def get(self):
        with self.lock:
            return self.items.popleft()


class MyThread(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0  # ポーリングの総回数
        self.work_done = 0     # エンキュー/デキューが行われた回数

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                time.sleep(0.01)   # 待ち行列がない時に発生
            except AttributeError:
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1


queue1 = MyQueue()
queue2 = MyQueue()
queue3 = MyQueue()
queue4 = MyQueue()
threads = [
    MyThread(plus, queue1, queue2),
    MyThread(times, queue2, queue3),
    MyThread(minus, queue3, queue4),
]

# スレッドを起動
# まだ値は追加されてないのでこの時点では実際の処理は始まらない
for thread in threads:
    thread.start()

# 1番目のqueueにエンキュー
for i in range(10**5):
    queue1.put(i)

while len(queue4.items) < 10**5:
    print('処理中...')
    time.sleep(0.1)

print('処理終了')
for thread in threads:
    thread.in_queue = None
    thread.join()
[out]
処理中...
処理中...
処理終了
print(f'スレッド1の無駄: {threads[0].polled_count - threads[0].work_done}')
print(f'スレッド2の無駄: {threads[1].polled_count - threads[1].work_done}')
print(f'スレッド3の無駄: {threads[2].polled_count - threads[2].work_done}')
[out]
スレッド1の無駄: 10
スレッド2の無駄: 12
スレッド3の無駄: 11
  • なので組み込みモジュールのQueueクラスを使う
    • コードは本書の丸コピになってしまいそうなので割愛
7
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
7