asyncioで並列に走っているタスクを中断する
前回は、asyncio
で複数のTask
を並列に実行するコードを書いた。今回は、実行を開始したTask
を中断したり再開したりする方法をメモする。
Task
のAPIには、cancel
はあるのだが中断したり再開したりといった機能は用意されていないようである。なので、自前で実装しないといけない。考え方としては、実行中/停止中を表すようなフラグを用意して、Task
の中で、そのフラグを適宜参照するようにする。
複数のTask
で、このような状態を共有するには、Event
という仕組みが利用できる(link)。Event
は、内部にboolean
の値を持つオブジェクトである。ポイントは、その状態がFalse
→True
に変わるのを他のコルーチンがawait
することができるような仕組みを提供してくれていることである。
以下が実装例である。前回と同じように🐇と🐢が、異なる頻度で一定回数だけメッセージを表示する。ただ、今回のプログラムでは、実行中にSIGINTシグナルを送ることで(ターミナルでCtrl-C
を押すことで)、それぞれのタスクの動作を中断/再開することができる。
コード
sample2.pyとして保存する。
import asyncio
from datetime import datetime
import signal
def message(m):
print(f"{m} [{datetime.now().strftime('%H:%M:%S')}]")
async def runner(name, speed, event):
distance = 0
while True:
await event.wait()
await asyncio.sleep(1.0 / speed)
distance += 1
message(f"{name}: distance={distance}")
if distance >= 10:
message(f"{name}: finish !")
return
def signal_handler(event, loop):
def switch():
if event.is_set():
event.clear()
else:
event.set()
def handler(*_):
message(" receive signal. race will be " +
("stopped." if event.is_set() else "restarted."))
loop.run_in_executor(None, switch)
return handler
async def main():
event = asyncio.Event()
loop = asyncio.get_running_loop()
signal.signal(signal.SIGINT, signal_handler(event, loop))
rabbit = asyncio.create_task(runner("🐰", 3, event))
turtle = asyncio.create_task(runner("🐢", 1, event))
event.set()
await asyncio.gather(rabbit, turtle)
if __name__ == "__main__":
asyncio.run(main())
実行結果
$ python sample2.py
🐰: distance=1 [13:27:24]
🐰: distance=2 [13:27:24]
🐢: distance=1 [13:27:25]
🐰: distance=3 [13:27:25]
🐰: distance=4 [13:27:25]
^C receive signal. race will be stopped. [13:27:25]
🐰: distance=5 [13:27:25]
🐢: distance=2 [13:27:26]
^C receive signal. race will be restarted. [13:27:32]
🐰: distance=6 [13:27:32]
🐰: distance=7 [13:27:33]
🐢: distance=3 [13:27:33]
🐰: distance=8 [13:27:33]
🐰: distance=9 [13:27:33]
🐰: distance=10 [13:27:34]
🐰: finish ! [13:27:34]
🐢: distance=4 [13:27:34]
^C receive signal. race will be stopped. [13:27:35]
🐢: distance=5 [13:27:35]
^C receive signal. race will be restarted. [13:27:43]
🐢: distance=6 [13:27:44]
🐢: distance=7 [13:27:45]
🐢: distance=8 [13:27:46]
🐢: distance=9 [13:27:47]
🐢: distance=10 [13:27:48]
🐢: finish ! [13:27:48]
観察
^C
を送ったタイミングで、シグナルハンドラが起動している- そのタイミングで、メッセージの表示が中断している
- ただし、シグナルハンドラの起動前に実行されるようにスケジュールされていた
Task
内のループは実行されている(stoppedとrestartedの間にメッセージが表示されてしまっている)
ポイント
Event
オブジェクトを作成し、Task
間で共有している:runner
の引数にevent
が追加されている部分である。SIGINT
で起動するハンドラを登録し、このハンドラの中でEvent
オブジェクトの中身を反転させる: この反転処理は、イベントループの中で実行されるようにコードが調整されている。ここは若干、複雑だが、本稿の主題ではないので説明は省略する。Task
の中で、Event.wait()
をawait
している: ここの部分に来たときにEvent
の中身がFalse
の場合、True
になるまでTask
の実行が中断されたままになる。