asyncioは実際どのように動作するのでしょうか? 質問する

asyncioは実際どのように動作するのでしょうか? 質問する

この質問は、私の別の質問から生まれました:cdef で await するにはどうすればいいですか?

ウェブ上に に関する記事やブログ投稿が山ほどありますasyncioが、それらはすべて非常に表面的なものです。asyncioが実際にどのように実装されているか、そして I/O が非同期になる理由に関する情報は見つかりませんでした。 ソース コードを読もうとしましたが、それは最高レベルの C コードではなく、その多くは補助オブジェクトを扱っていますが、最も重要なのは、Python 構文とそれが変換される C コードとのつながりが難しいことです。

Asycnio 自身のドキュメントはさらに役に立ちません。そこには Asycnio がどのように動作するかに関する情報はなく、使用方法に関するガイドラインがあるだけですが、それも誤解を招く場合があり、非常に書き方が悪いです。

私は Go のコルーチンの実装に精通しており、Python でも同じことができるのではないかと期待していました。もしそうなら、上記のリンク先の投稿で私が作成したコードは機能していたはずです。機能しなかったため、その理由を解明しようとしています。これまでのところ、私の推測は次のようになります。間違っているところがあれば訂正してください。

  1. フォームのプロシージャ定義はasync def foo(): ...、実際には を継承するクラスのメソッドとして解釈されますcoroutine
  2. おそらく、async def実際にはステートメントによって複数のメソッドに分割されておりawait、これらのメソッドが呼び出されるオブジェクトは、これまでの実行を通じて行われた進行状況を追跡することができます。
  3. 上記が真実であれば、本質的には、コルーチンの実行は、何らかのグローバル マネージャー (ループ?) によるコルーチン オブジェクトのメソッドの呼び出しに帰着します。
  4. グローバル マネージャーは、何らかの方法で (どのように?) I/O 操作が Python (のみ?) コードによって実行されることを認識し、現在実行中のメソッドが制御を放棄した後 (ステートメントにヒットawait)、保留中のコルーチン メソッドの 1 つを選択して実行することができます。

言い換えれば、いくつかのasyncio構文を「脱糖化」して、より理解しやすいものにする私の試みは次のとおりです。

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

私の推測が正しいと証明された場合、問題が発生します。このシナリオでは、I/O は実際にどのように発生しますか? 別のスレッドで発生しますか? インタープリター全体が一時停止され、I/O はインタープリターの外部で発生しますか? I/O とは正確には何を意味しますか? Python プロシージャが Copen()プロシージャを呼び出し、次にカーネルに割り込みを送信して制御を放棄した場合、Python インタープリターはこれをどのように認識し、カーネルコードが実際の I/O を実行し、割り込みを最初に送信した Python プロシージャを起動するまで、他のコードの実行を継続できるのでしょうか? Python インタープリターは、原理的に、これが起こっていることをどのように認識できるのでしょうか?

ベストアンサー1

asyncio はどのように機能しますか?

この質問に答える前に、いくつかの基本用語を理解する必要があります。すでに知っている用語がある場合は、これをスキップしてください。

発電機

ジェネレータはPython関数の実行を一時停止できるオブジェクトです。ユーザーキュレーションのジェネレータはキーワードを使用して実装されます。yieldキーワードを含む通常の関数を作成することでyield、その関数をジェネレーターに変換します。

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

ご覧のとおり、next()ジェネレーターで を呼び出すと、インタープリターはテストのフレームをロードし、yielded 値を返します。next()再度呼び出すと、フレームがインタープリター スタックに再度ロードされ、yield別の値の ing が続行されます。

3回目next()が呼ばれる頃には発電機は完成しており、StopIteration投げられました。

発電機との通信

ジェネレータのあまり知られていない機能は、次の 2 つの方法でジェネレータと通信できることです。send()そしてthrow()

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

を呼び出すとgen.send()、キーワードからの戻り値として値が渡されますyield

gen.throw()一方、ジェネレーター内で例外をスローすることを許可し、例外はyield呼び出された同じ場所で発生します。

ジェネレータから値を返す

ジェネレータから値を返すと、その値はStopIteration例外内に置かれます。後で例外から値を回復し、必要に応じて使用することができます。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

新しいキーワードをご覧ください:yield from

Python 3.4 では、新しいキーワードが追加されました。yield fromこのキーワードを使用すると、任意のnext()send()、 をthrow()最も内側のネストされたジェネレータに渡すことができます。内側のジェネレータが値を返す場合、それは の戻り値でもありますyield from

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

私は書いた記事このトピックについてさらに詳しく説明します。

すべてを一緒に入れて

Python 3.4 で新しいキーワードが導入されたことで、ジェネレーターの中にジェネレーターを作成できるようになりました。ジェネレーターはトンネルのように、最も内側のジェネレーターから最も外側のジェネレーターにデータをやり取りします。これにより、ジェネレーターにコルーチンyield fromという新しい意味が生まれました。

コルーチンは実行中に停止したり再開したりできる関数です。Pythonでは、コルーチンはasync defキーワード。ジェネレーターと同様に、独自の形式を使用しますyield fromawaitasyncPython 3.5 でとが導入される前はawait、ジェネレーターが作成されるのとまったく同じ方法 ( のyield from代わりにを使用await) でコルーチンを作成していました。

async def inner():
    return 1
    
async def outer():
    await inner()

すべてのイテレータとジェネレータが メソッドを実装するのと同様に__iter__()、すべてのコルーチンは、呼び出される__await__()たびに継続できるようにするメソッドを実装します。await coro

素敵なシーケンス図内部のPython ドキュメントぜひチェックしてみてください。

asyncio には、コルーチン関数の他に、タスクフューチャーという 2 つの重要なオブジェクトがあります。

先物

Future はメソッドが__await__()実装されたオブジェクトであり、特定の状態と結果を保持する役割を担います。状態は次のいずれかになります。

  1. PENDING - 将来には結果または例外が設定されていません。
  2. キャンセル済み - 将来はキャンセルされましたfut.cancel()
  3. FINISHED - フューチャーは、結果セットを使用して終了しました。fut.set_result()または、例外セットによってfut.set_exception()

結果は、ご想像のとおり、返される Python オブジェクトか、発生する可能性のある例外のいずれかになります。

オブジェクトのもう一つの重要な特徴は、futureadd_done_callback()このメソッドを使用すると、例外が発生したか終了したかに関係なく、タスクが完了するとすぐに関数を呼び出すことができます。

タスク

タスク オブジェクトは、コルーチンをラップし、最も内側のコルーチンと最も外側のコルーチンと通信する特別なフューチャーです。コルーチンがフューチャーをawait実行するたびに、フューチャーはタスクに渡され ( の場合と同様にyield from)、タスクはそれを受信します。

次に、タスクは自身を未来にバインドします。これは、未来を呼び出すことによって行われますadd_done_callback()。今後、キャンセルされるか、例外が渡されるか、結果として Python オブジェクトが渡されるかのいずれかによって未来が終了すると、タスクのコールバックが呼び出され、タスクは再び存在するようになります。

アシンシオ

私たちが答えなければならない最後の重要な質問は、IO はどのように実装されるかということです。

asyncio の奥深くに、イベント ループがあります。タスクのイベント ループです。イベント ループの役割は、タスクの準備ができるたびにタスクを呼び出し、そのすべての作業を 1 つの作業マシンに調整することです。

イベントループのIO部分は、1つの重要な関数に基づいて構築されています。selectSelect は、オペレーティング システムによって実装されるブロッキング関数で、ソケットの受信データまたは送信データを待機できるようにします。データを受信すると起動し、データを受信したソケット、または書き込み準備が整ったソケットを返します。

asyncio を介してソケット経由でデータを受信または送信しようとすると、実際には、ソケットにすぐに読み取りまたは送信できるデータがあるかどうかが最初にチェックされます。.send()バッファーがいっぱいの場合、または.recv()バッファーが空の場合、ソケットは関数に登録され(およびselectのリストの 1 つにソケットを追加するだけです) 、適切な関数が新しく作成されたオブジェクトとしてそのソケットに関連付けられます。rlistrecvwlistsendawaitfuture

利用可能なすべてのタスクが futures を待機している場合、イベント ループが呼び出されselectて待機します。ソケットの 1 つに着信データがあるか、そのsendバッファーが空になると、asyncio はそのソケットに関連付けられている future オブジェクトをチェックし、それを done に設定します。

ここで、すべての魔法が起こります。future が done に設定され、以前に自身を追加したタスクがadd_done_callback()再び起動し、.send()コルーチンを呼び出します。コルーチンは (チェーンのためawait) 最も内側のコルーチンを再開し、新しく受信したデータを、それがあふれた近くのバッファーから読み取ります。

メソッドチェーンを再度実行すると、次のようになりますrecv()

  1. select.select待ちます。
  2. データを含む準備完了ソケットが返されます。
  3. ソケットからのデータはバッファに移動されます。
  4. future.set_result()と呼ばれます。
  5. 自身を追加したタスクadd_done_callback()が起動されました。
  6. タスクは.send()コルーチンを呼び出し、最も内側のコルーチンまで移動してそれを起動します。
  7. データはバッファから読み取られ、ユーザーに返されます。

要約すると、asyncio は関数の一時停止と再開を可能にするジェネレーター機能を使用します。yield from最も内側のジェネレーターから最も外側のジェネレーターにデータをやり取りできる機能を使用します。これらすべてを使用して、IO が完了するのを待機している間 (OSselect関数を使用)、関数の実行を停止します。

そして、何よりも素晴らしいのは、1 つの関数が一時停止している間に、別の関数が実行され、繊細なファブリックとインターリーブできることです。これが asyncio です。

おすすめ記事