Python ネイティブコルーチンと send() 質問する

Python ネイティブコルーチンと send() 質問する

ジェネレータ ベースのコルーチンには、send()呼び出し元と呼び出し先の間の双方向通信を可能にし、呼び出し元から生成されたジェネレータ コルーチンを再開するメソッドがあります。これは、ジェネレータをコルーチンに変換する機能です。

新しいネイティブ コルーチンは非同期 I/O の優れたサポートを提供しますが、それらを使用してとasync/await同等のものを取得する方法がわかりません。 関数でのsend()の使用は明示的に禁止されているため、ネイティブ コルーチンはステートメントを使用して 1 回だけ戻ることができます。式はコルーチンに新しい値をもたらしますが、それらの値は呼び出し元ではなく呼び出し先から取得され、待機中の呼び出しは中断したところからではなく、毎回最初から評価されます。yieldasyncreturnawait

返されたコルーチンを中断したところから再開し、新しい値を送信する方法はありますか?David Beazleyのテクニックをエミュレートするにはどうすればよいですか?コルーチンと並行性に関する興味深いコースネイティブコルーチンを使用していますか?

私が考えている一般的なコードパターンは次のようなものです

def myCoroutine():
  ...
  while True:
    ...
    ping = yield(pong)
    ...

そして発信者

while True:
  ...
  buzz = myCoroutineGen.send(bizz)
  ...

編集

私はケビンの答えを受け入れましたが、PEP言う

コルーチンは内部的にジェネレータに基づいているため、実装を共有します。ジェネレータオブジェクトと同様に、コルーチンthrow()send()およびメソッドがありますclose()

...

throw()send()方法コルーチン値をプッシュしてエラーを発生させるために使用されます未来的なオブジェクト。

どうやらネイティブ コルーチンには があるようですね。コルーチン内で値を受け取るために式なしsend()でどのように動作するのでしょうか?yield

ベストアンサー1

Beazley によるコルーチンに関する同じ (素晴らしいと言わざるを得ない) コースを受講した後、私は自分自身にまったく同じ質問をしました。Python 3.5 で導入されたネイティブ コルーチンで動作するようにコードを調整するにはどうすればよいでしょうか?

それはできるコードに比較的小さな変更を加えるだけで済みます。読者はコースの教材に精通していると想定し、pyos4.pyベースとなるバージョン - Scheduler「システム コール」をサポートする最初のバージョン。

ヒント:完全な実行可能な例は以下にあります。付録A最後に。

客観的

目標は、次のコルーチン コードを作成することです。

def foo():
    mytid = yield GetTid()  # a "system call"
    for i in xrange(3):
        print "I'm foo", mytid
        yield  # a "trap"

... ネイティブ コルーチンに組み込んで、以前と同じように使用できます。

async def foo():
    mytid = await GetTid()  # a "system call"
    for i in range(3):
        print("I'm foo", mytid)
        await ???  # a "trap" (will explain the missing bit later)

asyncioプロセス全体を駆動する独自のイベント ループ (クラス) がすでにあるため、なしで実行しますScheduler

待機可能なオブジェクト

ネイティブ コルーチンはすぐには機能しないため、次のコードはエラーになります。

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

sched = Scheduler()
sched.new(foo())
sched.mainloop()
トレースバック(最新の呼び出しが最後):
    ...
    mytid = GetTid() を待つ
TypeError: オブジェクト GetTid は 'await' 式では使用できません

ペップ492どのような種類のオブジェクトを待機できるかを説明します。オプションの1つは__await__「イテレータを返すメソッドを持つオブジェクト」

と同様yield from、ご存知のとおり、awaitは、待機対象のオブジェクトとコルーチンを駆動する最も外側のコード (通常はイベント ループ) の間のトンネルとして機能します。これは、次の例で示すのが最もわかりやすいでしょう。

class Awaitable:
    def __await__(self):
        value = yield 1
        print("Awaitable received:", value)
        value = yield 2
        print("Awaitable received:", value)
        value = yield 3
        print("Awaitable received:", value)
        return 42


async def foo():
    print("foo start")
    result = await Awaitable()
    print("foo received result:", result)
    print("foo end")

コルーチンを対話的に実行するとfoo()、次のものが生成されます。

>>> f_coro = foo()  # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

送信されたものはすべてf_coroインスタンスに送られますAwaitable。同様に、Awaitable.__await__()生成されたものはすべて、値を送信する最上位のコードにバブルアップされます。

全体のプロセスはf_coroコルーチンに対して透過的であり、コルーチンは直接関与せず、値の受け渡しを見ることはない。しかし、Awaitableのイテレータが使い果たされると、その戻る値は式の結果await(この場合は 42) になり、最終的にここでf_coro再開されます。

コルーチン内の式も連鎖できることに注意してくださいawait。コルーチンは、別のコルーチンを待機し、そのコルーチンがさらに別のコルーチンを待機するなど、連鎖全体がどこyieldかで終了するまで続きます。

コルーチン自体に値を送信する

この知識はどのように役立つのでしょうか? コースの教材では、コルーチンはインスタンスを生成することができますSystemCall。スケジューラはこれを理解し、システム コールが要求された操作を処理できるようにします。

SystemCallコルーチンがスケジューラに通知するには、SystemCallインスタンスは単に降伏自体、前のセクションで説明したように、スケジューラに送信されます。

したがって、最初に必要な変更は、このロジックを基本SystemCallクラスに追加することです。

class SystemCall:
    ...
    def __await__(self):
        yield self

インスタンスを待機可能にするとSystemCall、次のものが実際に実行されます。

async def foo():
    mytid = await GetTid()
    print("I'm foo", mytid)

>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()

出力:

私はfoo Noneです
タスク 1 終了

素晴らしい、もうクラッシュしません!

ただし、コルーチンはタスク ID を受信せず、代わりに を取得しました。これは、システム コールのメソッドによって設定され、メソッドによって送信されたNone値が原因です。handle()Task.run()

# in Task.run()
self.target.send(self.sendval)

...SystemCall.__await__()メソッド内で終了しました。値をコルーチンに持ち込む場合は、システムコールで戻るawaitそれがコルーチン内の式の値となるようにします。

class SystemCall:
    ...
    def __await__(self):
        return (yield self)

同じコードを変更して実行すると、SystemCall目的の出力が生成されます。

私はフー1です
タスク 1 終了

コルーチンを並行して実行する

コルーチンを一時停止する方法、つまりシステム「トラップ」コードが必要です。コースの教材では、これはyieldコルーチン内のプレーンを使用して行われますが、プレーンを使用しようとすると、await実際には構文エラーになります。

async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await  # SyntaxError here

幸いなことに、回避策は簡単です。すでに機能するシステム コールがあるため、コルーチンを一時停止してすぐに再スケジュールするだけのダミーの no-op システム コールを追加できます。

class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None   # setting sendval is optional
        self.sched.schedule(self.task)

sendvalこのシステム コールは意味のある値を生成するとは想定されていないため、タスクに を設定することはオプションですが、これを明示的に行うことを選択します。

これで、マルチタスク オペレーティング システムを実行するための準備がすべて整いました。

async def foo():
    mytid = await GetTid()
    for i in range(3):
        print("I'm foo", mytid)
        await YieldControl()


async def bar():
    mytid = await GetTid()
    for i in range(5):
        print("I'm bar", mytid)
        await YieldControl()


sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()

出力:

私はフー1です
私はバー2です
私はフー1です
私はバー2です
私はフー1です
私はバー2です
タスク 1 終了
私はバー2です
私はバー2です
タスク2は終了しました

脚注

コードSchedulerはまったく変更されていません。

それは、まさに、機能します。

これは、スケジューラとその中で実行されるタスクが互いに結合されていない元の設計の美しさを示しており、知らないうちにコルーチンの実装を変更できましたSchedulerTaskコルーチンをラップするクラスさえ変更する必要はありませんでした。

トランポリンは必要ありません。

の中にpyos8.pyシステムのバージョン、概念トランポリンが実装されています。これにより、コルーチンはスケジューラの助けを借りて、作業の一部を別のコルーチンに委任できます (スケジューラは親コルーチンに代わってサブコルーチンを呼び出し、その結果を親に送信します)。

このメカニズムは必要ありません。冒頭で説明したようにawait、 (およびその古いバージョン) によって、このような連鎖がすでに可能になっているためです。yield from

付録 A - 完全な実行可能な例 (Python 3.5 以降が必要)

例_full.py
from queue import Queue


# ------------------------------------------------------------
#                       === Tasks ===
# ------------------------------------------------------------
class Task:
    taskid = 0
    def __init__(self,target):
        Task.taskid += 1
        self.tid = Task.taskid   # Task ID
        self.target = target        # Target coroutine
        self.sendval = None          # Value to send

    # Run a task until it hits the next yield statement
    def run(self):
        return self.target.send(self.sendval)


# ------------------------------------------------------------
#                      === Scheduler ===
# ------------------------------------------------------------
class Scheduler:
    def __init__(self):
        self.ready = Queue()   
        self.taskmap = {}        

    def new(self,target):
        newtask = Task(target)
        self.taskmap[newtask.tid] = newtask
        self.schedule(newtask)
        return newtask.tid

    def exit(self,task):
        print("Task %d terminated" % task.tid)
        del self.taskmap[task.tid]

    def schedule(self,task):
        self.ready.put(task)

    def mainloop(self):
         while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()
                if isinstance(result,SystemCall):
                    result.task  = task
                    result.sched = self
                    result.handle()
                    continue
            except StopIteration:
                self.exit(task)
                continue
            self.schedule(task)


# ------------------------------------------------------------
#                   === System Calls ===
# ------------------------------------------------------------
class SystemCall:
    def handle(self):
        pass

    def __await__(self):
        return (yield self)


# Return a task's ID number
class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.tid
        self.sched.schedule(self.task)


class YieldControl(SystemCall):
    def handle(self):
        self.task.sendval = None   # setting sendval is optional
        self.sched.schedule(self.task)


# ------------------------------------------------------------
#                      === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
    async def foo():
        mytid = await GetTid()
        for i in range(3):
            print("I'm foo", mytid)
            await YieldControl()


    async def bar():
        mytid = await GetTid()
        for i in range(5):
            print("I'm bar", mytid)
            await YieldControl()

    sched = Scheduler()
    sched.new(foo())
    sched.new(bar())
    sched.mainloop()

おすすめ記事