ジェネレータ ベースのコルーチンには、send()
呼び出し元と呼び出し先の間の双方向通信を可能にし、呼び出し元から生成されたジェネレータ コルーチンを再開するメソッドがあります。これは、ジェネレータをコルーチンに変換する機能です。
新しいネイティブ コルーチンは非同期 I/O の優れたサポートを提供しますが、それらを使用してとasync/await
同等のものを取得する方法がわかりません。 関数でのsend()
の使用は明示的に禁止されているため、ネイティブ コルーチンはステートメントを使用して 1 回だけ戻ることができます。式はコルーチンに新しい値をもたらしますが、それらの値は呼び出し元ではなく呼び出し先から取得され、待機中の呼び出しは中断したところからではなく、毎回最初から評価されます。yield
async
return
await
返されたコルーチンを中断したところから再開し、新しい値を送信する方法はありますか?David Beazleyのテクニックをエミュレートするにはどうすればよいですか?コルーチンと並行性に関する興味深いコースネイティブコルーチンを使用していますか?
私が考えている一般的なコードパターンは次のようなものです
def myCoroutine():
...
while True:
...
ping = yield(pong)
...
そして発信者
while True:
...
buzz = myCoroutineGen.send(bizz)
...
編集
私はケビンの答えを受け入れましたが、PEP言う
コルーチンは内部的にジェネレータに基づいているため、実装を共有します。ジェネレータオブジェクトと同様に、コルーチン
throw()
、send()
およびメソッドがありますclose()
。
...
throw()
、send()
方法コルーチン値をプッシュしてエラーを発生させるために使用されます未来的なオブジェクト。
どうやらネイティブ コルーチンには があるようですね。コルーチン内で値を受け取るために式なしsend()
でどのように動作するのでしょうか?yield
ベストアンサー1
Beazley によるコルーチンに関する同じ (素晴らしいと言わざるを得ない) コースを受講した後、私は自分自身にまったく同じ質問をしました。Python 3.5 で導入されたネイティブ コルーチンで動作するようにコードを調整するにはどうすればよいでしょうか?
それはできるコードに比較的小さな変更を加えるだけで済みます。読者はコースの教材に精通していると想定し、pyos4.pyベースとなるバージョン - Scheduler
「システム コール」をサポートする最初のバージョン。
ヒント:完全な実行可能な例は以下にあります。付録A最後に。
客観的
目標は、次のコルーチン コードを作成することです。
def foo():
mytid = yield GetTid() # a "system call"
for i in xrange(3):
print "I'm foo", mytid
yield # a "trap"
... ネイティブ コルーチンに組み込んで、以前と同じように使用できます。
async def foo():
mytid = await GetTid() # a "system call"
for i in range(3):
print("I'm foo", mytid)
await ??? # a "trap" (will explain the missing bit later)
asyncio
プロセス全体を駆動する独自のイベント ループ (クラス) がすでにあるため、なしで実行しますScheduler
。
待機可能なオブジェクト
ネイティブ コルーチンはすぐには機能しないため、次のコードはエラーになります。
async def foo():
mytid = await GetTid()
print("I'm foo", mytid)
sched = Scheduler()
sched.new(foo())
sched.mainloop()
トレースバック(最新の呼び出しが最後): ... mytid = GetTid() を待つ TypeError: オブジェクト GetTid は 'await' 式では使用できません
ペップ492どのような種類のオブジェクトを待機できるかを説明します。オプションの1つは__await__
「イテレータを返すメソッドを持つオブジェクト」。
と同様yield from
、ご存知のとおり、await
は、待機対象のオブジェクトとコルーチンを駆動する最も外側のコード (通常はイベント ループ) の間のトンネルとして機能します。これは、次の例で示すのが最もわかりやすいでしょう。
class Awaitable:
def __await__(self):
value = yield 1
print("Awaitable received:", value)
value = yield 2
print("Awaitable received:", value)
value = yield 3
print("Awaitable received:", value)
return 42
async def foo():
print("foo start")
result = await Awaitable()
print("foo received result:", result)
print("foo end")
コルーチンを対話的に実行するとfoo()
、次のものが生成されます。
>>> f_coro = foo() # calling foo() returns a coroutine object
>>> f_coro
<coroutine object foo at 0x7fa7f74046d0>
>>> f_coro.send(None)
foo start
1
>>> f_coro.send("one")
Awaitable received: one
2
>>> f_coro.send("two")
Awaitable received: two
3
>>> f_coro.send("three")
Awaitable received: three
foo received result: 42
foo end
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
送信されたものはすべてf_coro
インスタンスに送られますAwaitable
。同様に、Awaitable.__await__()
生成されたものはすべて、値を送信する最上位のコードにバブルアップされます。
全体のプロセスはf_coro
コルーチンに対して透過的であり、コルーチンは直接関与せず、値の受け渡しを見ることはない。しかし、Awaitable
のイテレータが使い果たされると、その戻る値は式の結果await
(この場合は 42) になり、最終的にここでf_coro
再開されます。
コルーチン内の式も連鎖できることに注意してくださいawait
。コルーチンは、別のコルーチンを待機し、そのコルーチンがさらに別のコルーチンを待機するなど、連鎖全体がどこyield
かで終了するまで続きます。
コルーチン自体に値を送信する
この知識はどのように役立つのでしょうか? コースの教材では、コルーチンはインスタンスを生成することができますSystemCall
。スケジューラはこれを理解し、システム コールが要求された操作を処理できるようにします。
SystemCall
コルーチンがスケジューラに通知するには、SystemCall
インスタンスは単に降伏自体、前のセクションで説明したように、スケジューラに送信されます。
したがって、最初に必要な変更は、このロジックを基本SystemCall
クラスに追加することです。
class SystemCall:
...
def __await__(self):
yield self
インスタンスを待機可能にするとSystemCall
、次のものが実際に実行されます。
async def foo():
mytid = await GetTid()
print("I'm foo", mytid)
>>> sched = Scheduler()
>>> sched.new(foo())
>>> sched.mainloop()
出力:
私はfoo Noneです タスク 1 終了
素晴らしい、もうクラッシュしません!
ただし、コルーチンはタスク ID を受信せず、代わりに を取得しました。これは、システム コールのメソッドによって設定され、メソッドによって送信されたNone
値が原因です。handle()
Task.run()
# in Task.run()
self.target.send(self.sendval)
...SystemCall.__await__()
メソッド内で終了しました。値をコルーチンに持ち込む場合は、システムコールで戻るawait
それがコルーチン内の式の値となるようにします。
class SystemCall:
...
def __await__(self):
return (yield self)
同じコードを変更して実行すると、SystemCall
目的の出力が生成されます。
私はフー1です タスク 1 終了
コルーチンを並行して実行する
コルーチンを一時停止する方法、つまりシステム「トラップ」コードが必要です。コースの教材では、これはyield
コルーチン内のプレーンを使用して行われますが、プレーンを使用しようとすると、await
実際には構文エラーになります。
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await # SyntaxError here
幸いなことに、回避策は簡単です。すでに機能するシステム コールがあるため、コルーチンを一時停止してすぐに再スケジュールするだけのダミーの no-op システム コールを追加できます。
class YieldControl(SystemCall):
def handle(self):
self.task.sendval = None # setting sendval is optional
self.sched.schedule(self.task)
sendval
このシステム コールは意味のある値を生成するとは想定されていないため、タスクに を設定することはオプションですが、これを明示的に行うことを選択します。
これで、マルチタスク オペレーティング システムを実行するための準備がすべて整いました。
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await YieldControl()
async def bar():
mytid = await GetTid()
for i in range(5):
print("I'm bar", mytid)
await YieldControl()
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()
出力:
私はフー1です 私はバー2です 私はフー1です 私はバー2です 私はフー1です 私はバー2です タスク 1 終了 私はバー2です 私はバー2です タスク2は終了しました
脚注
コードScheduler
はまったく変更されていません。
それは、まさに、機能します。
これは、スケジューラとその中で実行されるタスクが互いに結合されていない元の設計の美しさを示しており、知らないうちにコルーチンの実装を変更できましたScheduler
。Task
コルーチンをラップするクラスさえ変更する必要はありませんでした。
トランポリンは必要ありません。
の中にpyos8.pyシステムのバージョン、概念トランポリンが実装されています。これにより、コルーチンはスケジューラの助けを借りて、作業の一部を別のコルーチンに委任できます (スケジューラは親コルーチンに代わってサブコルーチンを呼び出し、その結果を親に送信します)。
このメカニズムは必要ありません。冒頭で説明したようにawait
、 (およびその古いバージョン) によって、このような連鎖がすでに可能になっているためです。yield from
付録 A - 完全な実行可能な例 (Python 3.5 以降が必要)
例_full.pyfrom queue import Queue
# ------------------------------------------------------------
# === Tasks ===
# ------------------------------------------------------------
class Task:
taskid = 0
def __init__(self,target):
Task.taskid += 1
self.tid = Task.taskid # Task ID
self.target = target # Target coroutine
self.sendval = None # Value to send
# Run a task until it hits the next yield statement
def run(self):
return self.target.send(self.sendval)
# ------------------------------------------------------------
# === Scheduler ===
# ------------------------------------------------------------
class Scheduler:
def __init__(self):
self.ready = Queue()
self.taskmap = {}
def new(self,target):
newtask = Task(target)
self.taskmap[newtask.tid] = newtask
self.schedule(newtask)
return newtask.tid
def exit(self,task):
print("Task %d terminated" % task.tid)
del self.taskmap[task.tid]
def schedule(self,task):
self.ready.put(task)
def mainloop(self):
while self.taskmap:
task = self.ready.get()
try:
result = task.run()
if isinstance(result,SystemCall):
result.task = task
result.sched = self
result.handle()
continue
except StopIteration:
self.exit(task)
continue
self.schedule(task)
# ------------------------------------------------------------
# === System Calls ===
# ------------------------------------------------------------
class SystemCall:
def handle(self):
pass
def __await__(self):
return (yield self)
# Return a task's ID number
class GetTid(SystemCall):
def handle(self):
self.task.sendval = self.task.tid
self.sched.schedule(self.task)
class YieldControl(SystemCall):
def handle(self):
self.task.sendval = None # setting sendval is optional
self.sched.schedule(self.task)
# ------------------------------------------------------------
# === Example ===
# ------------------------------------------------------------
if __name__ == '__main__':
async def foo():
mytid = await GetTid()
for i in range(3):
print("I'm foo", mytid)
await YieldControl()
async def bar():
mytid = await GetTid()
for i in range(5):
print("I'm bar", mytid)
await YieldControl()
sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()