147
119

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Python で学ぶ、コルーチンと非同期処理

Posted at

TL;DR

  • ただの個人メモです
  • 他人に説明する目的は弱く、自分の理解を記録するためのものです
  • どうやって非同期処理を書くのか、といった実践的な事はほぼ書いてません

はじめに

これは、JavaScript のイベント駆動スタイルに心が折れ、Promise やら Future やら Generator やら async/await やらを見て見ぬふりをし続けてきた人間が、Fluent Python を読んで非同期処理を学んだ記録です。

なお、Python 自体の経験は、数年前(2.x 時代)に 2, 3 ヶ月利用し、最近 AWS Lambda 関数を書くのに再入門した程度です。

非同期処理

今回学ぶ『非同期処理』は、シングルスレッドのイベントループによって実現されるものです。マルチプロセスやマルチスレッドによる並行処理については主題としません。

言い換えると、threading ではなく asyncio を学ぶという事です。

コルーチン

asyncio による非同期処理を学ぶ前に、『コルーチン』という概念を理解する必要がありました。

コルーチン(英: co-routine)とはプログラミングの構造の一種。サブルーチンがエントリーからリターンまでを一つの処理単位とするのに対し、コルーチンはいったん処理を中断した後、続きから処理を再開できる。接頭辞 co は協調を意味するが、複数のコルーチンが中断・継続により協調動作を行うことによる。
コルーチン - Wikipedia

定義は上記の様ですが、具体的にはどういう事でしょうか。Python では、『ジェネレータ』を利用してコルーチンを実装する事ができます。コルーチンを一度棚に上げ、ジェネレータについて学びます。

※ なお、厳密な『コルーチン』の定義(必要条件、十分条件)はよく分かっていません。ここでは Wikipedia に従います。

ジェネレータ

自分にとって Python のジェネレータとは、「値を遅延して生成するイテレータ」という程度のもので、それがコルーチンや非同期処理に繋がるという所まで掘り下げたことがありませんでした。ところが、実際には、ジェネレータこそが Python における非同期処理(asyncio)の要でした。

例えば、以下は延々と yes を返し続けるジェネレータ関数(yield キーワードを持つ Python の関数)です。

def yes():
    """
    >>> yes_man = yes()
    >>> yes_man.__class__.__name__
    generator
    >>> print(next(yes_man))
    yes
    >>> print(next(yes_man))
    yes
    >>> print(next(yes_man))
    yes
    """
    while True:
        yield 'yes'

ジェネレータ関数によって生成されたジェネレータオブジェクト(ジェネレータ関数の戻り値)を next() 関数に与えることで、 yield までステップを進める事ができます。これは、 yield 式を評価した時点で処理が止まるということです。

def yes():
    """
    >>> yes_man = yes()
    >>> print(next(yes_man))
    A
    B
    yes
    >>> print(next(yes_man))
    C
    B
    yes
    >>> print(next(yes_man))
    C
    B
    yes
    """
    print 'A'
    while True:
        print 'B'
        yield 'yes'
        print 'C'

なお、ジェネレータオブジェクトはイテラブルなので、for 文やリスト内包表記などで利用する事ができます。

def yes():
    while True:
        yield 'yes'

for answer in yes():
    print(answer)

ここで重要なことは、関数の中で処理を停止させる事ができ、また再開させることができる、という点です。改めてコルーチンの定義を思い返してみましょう。

コルーチン(英: co-routine)とはプログラミングの構造の一種。サブルーチンがエントリーからリターンまでを一つの処理単位とするのに対し、コルーチンはいったん処理を中断した後、続きから処理を再開できる。接頭辞 co は協調を意味するが、複数のコルーチンが中断・継続により協調動作を行うことによる。
コルーチン - Wikipedia

『コルーチンは処理を中断した後、続きから処理を再開できる』となっていますね。ジェネレータは処理を中断・再開できるので、コルーチンであると言えることが分かります。

コルーチン(再び)

それでは、棚に上げていたコルーチンを棚から下ろしましょう。Python のジェネレータによって実現されるコルーチンを学びます。

Python のジェネレータは値を返すだけでなく、値を受け取る(呼び出し元から送信する)事もできます。ジェネレータに値を送信する場合は、 next() 関数ではなくジェネレータオブジェクトの .send() メソッドを利用します。

def recv():
    """
    >>> receiver = recv()
    >>> next(receiver)
    Started.
    >>> receiver.send(1)
    Receive: 1
    >>> receiver.send(2)
    Receive: 2
    >>> receiver.send(3)
    Receive: 3
    """
    print('Started.')
    while True:
        v = yield
        print(f'Receive: {v}')

最初に next() 関数を実行しているのは、ジェネレータを実行状態に遷移させるためです。参考までに、以下にジェネレータの状態を紹介します。

  1. 実行の開始を待機中(GEN_CREATED)
  2. インタプリタが現在実行中(GEN_RUNNING)
  3. yield の箇所で現在休止中(GEN_SUSPENDED)
  4. 実行完了(GEN_CLOSED)

最初の next() でジェネレータを活性化させる事を予備処理などと呼ぶ様ですが、これを毎回行うのは冗長に思えます。一般的には、この処理はデコレータ関数によって済ませる様です。

from functools import wraps

def coroutine(f):
    @wraps(f)
    def primer(*args, **kwargs):
        g = f(*args, **kwargs)
        next(g)
        return g
    return primer

@coroutine
def recv():
    """
    >>> receiver = recv()
    Started.
    >>> receiver.send(1)
    Receive: 1
    >>> receiver.send(2)
    Receive: 2
    >>> receiver.send(3)
    Receive: 3
    """
    print('Started.')
    while True:
        v = yield
        print(f'Receive: {v}')

ジェネレータ関数の内部で return を書く事もできます。ジェネレータオブジェクトは、最後の yield 式を評価して関数のスコープを終了する際に StopIteration 例外を発生させます。この例外オブジェクトの value 属性に return 式の値が代入されています。

@coroutine
def counter():
    """
    >>> c = counter()
    >>> c.send(1)
    >>> c.send(2)
    >>> c.send(3)
    >>> try:
    ...     c.send(None)
    ... except StopIteration as e:
    ...     res = e.value
    >>> res
    6
    """
    count = 0
    while True:
        by = yield
        if by is None:
            break
        count += by
    return count

自然に扱えるという感じではありませんが、Python のジェネレータによって処理を中断・再開させられるコルーチンを実現することができる、という事を学びました。

ジェネレータのネスト

ここで、再びコルーチンから離れ、Python のジェネレータのネストについて学びます。

例えば、以下の様にイテレータを使ってループ内で yield する場合、これを yield from を使って書きかえる事ができます。

def g():
    """
    >>> list(g())
    ['A', 'B', 1, 2]
    """
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i
def g():
    """
    >>> list(g())
    ['A', 'B', 1, 2]
    """
    yield from 'AB'
    yield from range(1, 3)

これだけでは yield fromyield ループのシンタックスシュガーのように思えてしまいますが、そうではありません。本来の yield from の目的は、ネストされたジェネレータについて、最も外側の呼び出し側と最も内側のジェネレータの間で直接通信(値の送受信や例外処理)できる様にすることにあります。 yield from を導入する際の PEP 380 のタイトルは Syntax for Delegating to a Subgenerator です。

外側からサブジェネレータと通信する例を示します。外側のデリゲーションジェネレータとサブジェネレータの反復数は同調している必要があるため、両方で while True としています。

def delegator():
    """
    >>> g = delegator()
    >>> next(g)
    >>> g.send('A')
    A
    >>> g.send('B')
    B
    >>> g.send(None)
    fin.
    """
    while True:
        res = yield from subgenerator()
        print(res)

def subgenerator():
    while True:
        recv = yield
        if recv is None:
            break
        print(recv)
    return 'fin.'

通常のジェネレータ関数では return 式の値を受け取るために StopIteration 例外の value 属性を参照するハックが必要でしたが、 yield from ではサブジェネレータの return の値を返せる様になっています。

非同期処理(再び)

コルーチン(ジェネレータ関数)を学び、コルーチン(ジェネレータ)のネストを学んだところで、asyncio を利用した非同期処理について学びます。

以下は、長時間かかる heavy() コルーチンの完了を待つ間、画面にスピナーを表示させるプログラム(Fluent Python のコードを参考)です。

import asyncio
import sys
import itertools

@asyncio.coroutine
def heavy():
    yield from asyncio.sleep(10)
    return 'done.'

@asyncio.coroutine
def spin():
    write, flush = sys.stdout.write, sys.stdout.flush
    for c in itertools.cycle('|/-\\'):
        write(c)
        flush()
        write('\x08')
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break
    write(' \x08')

@asyncio.coroutine
def task():
    spinner = asyncio.async(spin())
    result = yield from heavy()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task())
    loop.close()
    print(f'Result: {result}')

if __name__ == '__main__':
    main()

いくつか見慣れない箇所がありますが、基本的にはコルーチンを組み合わせたプログラムである事が分かると思います。重要なことは、複数のコルーチンを非同期に並行して実行させることができる、という点です。非同期処理を実現している箇所について、簡単に説明します。

  • @asyncio.coroutine デコレータで asyncio API 互換のコルーチンである事を宣言できる(必須ではないとか)
  • asyncio.async(spin()) によってコルーチンを起動する様にスケジュールされる
    • asyncio.async() 関数は引数のコルーチンをラップした asyncio.Task オブジェクトを返す
  • asyncio.get_event_loop() 関数によってコルーチンを非同期実行するためのイベントループを取得できる
    • イベントループの run_until_complete() 関数にコルーチンを渡す事で、コルーチンを開始することができる

ところで、Python の asyncio について簡単に知ろうと思い検索した際によく見たのは async defawait という構文です。これは使わないのでしょうか?これについては、Fluent Python で以下の様に説明されていました。

awaitとasyncというキーワードの追加を提案する「PEP 492-Coroutines with async and await syntax」がありますが、本書執筆時点ではまだ議論中です。
監訳注:PEP 492はPython 3.5から標準ライブラリに組み込まれました。

それでは、先のスピナーの例は async await を使って書きかえる事ができるでしょうか。

基本的にはコルーチンを async def で宣言し、内部の yield fromawait に置き換える事で書きかえる事ができました。ただし、コルーチン内から複数のコルーチンを並行実行させる方法は、よく分かっていません。今回は async.wait() 関数を利用しています。

import asyncio
import sys
import itertools

async def heavy():
    await asyncio.sleep(10)
    return 'done.'

async def spin():
    write, flush = sys.stdout.write, sys.stdout.flush
    for c in itertools.cycle('|/-\\'):
        write(c)
        flush()
        write('\x08')
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break
    write(' \x08')

async def task():
    done, pending = await asyncio.wait([spin(), heavy()], return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    return done.pop().result()

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task())
    loop.close()
    print(f'Result: {result}')

if __name__ == '__main__':
    main()

改めて、asyncio による非同期処理(コルーチン)を書く場合の作法を整理します。

  • async def でコルーチンを宣言する
  • コルーチンからコルーチンを呼び出す際は await を使用する
  • コルーチンを起動させるためにはイベントループを使用する

『非同期処理』を語るのであれば、並行処理のサンプルや非同期化のためのタスクスケジューリングの話にまで踏み込むべきですが、今回はここまで。

まとめ

今回は、コルーチンを使って非同期処理を実現する事を学びました。一方、残念ながら asyncio の詳しい使い方や実践的な非同期プログラミングにまでは踏み込むことができませんでした。非同期処理による高速化で分かりやすい例として、aiohttp HTTP ダウンロードの並行化がありますが、今回は割愛します。

  • シングルスレッドのイベントループで実現する非同期処理の話
  • 非同期処理はコルーチンで実現する
  • コルーチンはジェネレータで実現する
  • ジェネレータは処理を中断・再開する

この文章が他人の役に立つとは思いませんが、個人的な学習記録として残しておく事にしました。個人的に、非同期処理の概念になぜ『ジェネレータ』という名前のモノが登場するのか、まるで理解できていなかったのですが、コルーチンという概念と絡める事で、「なるほどわかった(気になれた)」という所まで腹落ちできてスッキリしています。

  • Python のジェネレータをイテレータと解釈していたため、JavaScript で登場した時に完全にフリーズした
  • 普段は Ruby をよく使っているため yield キーワードの整理が追いついていなかった
  • 基本的に非同期処理/並行処理から距離を取って生きてきた

余談:Python と GIL とブロッキング I/O と

CPython は GIL (Global Interpreter Lock) が用意されているため、通常は Python のバイトコードを一度にひとつのスレッドしか実行しません。ところが、I/O をブロックする様な標準ライブラリ関数は、すべて OS から結果が返ってくるまでの間 GIL を解除するとのこと。

つまり、複数の I/O でブロックされる処理を並行処理して高速化するために、マルチスレッド(threadhing ライブラリ)の利用はとても有効だということが言える様です。この辺の話も Fluent Python には詳しく書かれています。

147
119
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
147
119

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?