協程和任務¶
本節概述了用於協程和任務的高階 asyncio API。
協程¶
原始碼: Lib/asyncio/coroutines.py
使用 async/await 語法宣告的協程是編寫 asyncio 應用程式的首選方式。例如,以下程式碼片段會列印“hello”,等待 1 秒,然後列印“world”
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
請注意,簡單地呼叫協程並不會將其排程為執行
>>> main()
<coroutine object main at 0x1053bb7c8>
要實際執行協程,asyncio 提供了以下機制
asyncio.run()
函式用於執行頂層入口點“main()”函式(參見上面的示例)。等待協程。以下程式碼片段將等待 1 秒後列印“hello”,然後等待**再** 2 秒後列印“world”
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
預期輸出
started at 17:13:52 hello world finished at 17:13:55
asyncio.create_task()
函式用於將協程作為 asyncio任務
併發執行。讓我們修改上面的示例,**併發**執行兩個
say_after
協程async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}")
請注意,預期輸出現在顯示該程式碼片段比之前快了 1 秒
started at 17:14:32 hello world finished at 17:14:34
asyncio.TaskGroup
類提供了create_task()
的更現代的替代方案。使用此 API,上一個示例變為async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task( say_after(1, 'hello')) task2 = tg.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # The await is implicit when the context manager exits. print(f"finished at {time.strftime('%X')}")
時間和輸出應與上一個版本相同。
3.11 版本新增:
asyncio.TaskGroup
。
可等待物件¶
如果一個物件可以在 await
表示式中使用,我們稱之為**可等待**物件。許多 asyncio API 都設計為接受可等待物件。
**可等待**物件主要有三種類型:**協程**、**任務**和**Future**。
協程
Python 協程是**可等待物件**,因此可以從其他協程中等待
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested() # will raise a "RuntimeWarning".
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
任務
**任務**用於**併發**排程協程。
當協程被 asyncio.create_task()
等函式封裝成**任務**時,協程會自動排程並儘快執行
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Future
Future
是一種特殊的**低階**可等待物件,表示非同步操作的**最終結果**。
當一個 Future 物件被*等待*時,意味著協程將等待 Future 在其他地方得到解決。
asyncio 中的 Future 物件是為了允許基於回撥的程式碼與 async/await 一起使用。
通常**不需要**在應用程式級別程式碼中建立 Future 物件。
Future 物件,有時由庫和一些 asyncio API 公開,可以被等待
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一個返回 Future 物件的低階函式的好例子是 loop.run_in_executor()
。
建立任務¶
原始碼: Lib/asyncio/tasks.py
- asyncio.create_task(coro, *, name=None, context=None, eager_start=None, **kwargs)¶
將 *coro* 協程封裝到
Task
中並排程其執行。返回 Task 物件。完整的函式簽名與
Task
建構函式(或工廠)的簽名基本相同——此函式的所有關鍵字引數都傳遞給該介面。一個可選的僅限關鍵字引數 *context* 允許為 *coro* 指定一個自定義的
contextvars.Context
來執行。如果未提供 *context*,則建立當前上下文的副本。一個可選的僅限關鍵字引數 *eager_start* 允許指定任務是否應在呼叫 create_task 期間急切執行,或稍後排程。如果未傳遞 *eager_start*,則將使用
loop.set_task_factory()
設定的模式。任務在
get_running_loop()
返回的迴圈中執行,如果當前執行緒中沒有執行的迴圈,則會引發RuntimeError
。備註
asyncio.TaskGroup.create_task()
是一種新的替代方案,利用了結構化併發;它允許以強大的安全保證等待一組相關任務完成。重要
儲存此函式結果的引用,以避免任務在執行中途消失。事件迴圈只對任務保持弱引用。未在其他地方引用的任務可能隨時被垃圾回收,即使在完成之前也是如此。對於可靠的“即發即棄”後臺任務,將它們收集到一個集合中
background_tasks = set() for i in range(10): task = asyncio.create_task(some_coro(param=i)) # Add task to the set. This creates a strong reference. background_tasks.add(task) # To prevent keeping references to finished tasks forever, # make each task remove its own reference from the set after # completion: task.add_done_callback(background_tasks.discard)
在 3.7 版本加入。
3.8 版本變化:新增了 *name* 引數。
3.11 版本變化:新增了 *context* 引數。
3.14 版本變化:透過傳遞所有 *kwargs* 增加了 *eager_start* 引數。
任務取消¶
任務可以輕鬆安全地取消。當任務被取消時,asyncio.CancelledError
將在下次機會在任務中引發。
建議協程使用 try/finally
塊來健壯地執行清理邏輯。如果顯式捕獲到 asyncio.CancelledError
,則在清理完成後通常應將其傳播。asyncio.CancelledError
直接繼承自 BaseException
,因此大多數程式碼無需瞭解它。
支援結構化併發的 asyncio 元件,如 asyncio.TaskGroup
和 asyncio.timeout()
,內部使用取消來實現,如果協程吞噬了 asyncio.CancelledError
,可能會出現異常行為。類似地,使用者程式碼通常不應該呼叫 uncancel
。但是,在真正需要抑制 asyncio.CancelledError
的情況下,還需要呼叫 uncancel()
以完全移除取消狀態。
任務組¶
任務組結合了任務建立 API 和一種方便可靠的方式來等待組中的所有任務完成。
- class asyncio.TaskGroup¶
一個包含任務組的 非同步上下文管理器。任務可以使用
create_task()
新增到組中。當上下文管理器退出時,所有任務都會被等待。在 3.11 版本中新增。
- create_task(coro, *, name=None, context=None, eager_start=None, **kwargs)¶
在此任務組中建立一個任務。簽名與
asyncio.create_task()
的簽名匹配。如果任務組不活躍(例如,尚未進入,已完成,或正在關閉),我們將關閉給定的coro
。3.13 版本變化:如果任務組不活躍,則關閉給定的協程。
3.14 版本變化:將所有 *kwargs* 傳遞給
loop.create_task()
示例
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(some_coro(...))
task2 = tg.create_task(another_coro(...))
print(f"Both tasks have completed now: {task1.result()}, {task2.result()}")
async with
語句將等待組中的所有任務完成。在等待期間,仍可以向組中新增新任務(例如,透過將 tg
傳遞給其中一個協程並在該協程中呼叫 tg.create_task()
)。一旦最後一個任務完成並且 async with
塊退出,就不能再向組中新增新任務。
當組中任何任務因 asyncio.CancelledError
之外的異常失敗時,其餘任務將被取消。此後不能再向組中新增任務。此時,如果 async with
語句的主體仍處於活動狀態(即,__aexit__()
尚未被呼叫),則直接包含 async with
語句的任務也將被取消。由此產生的 asyncio.CancelledError
將中斷 await
,但它不會冒出包含的 async with
語句。
一旦所有任務完成,如果任何任務因 asyncio.CancelledError
之外的異常失敗,這些異常將被組合到 ExceptionGroup
或 BaseExceptionGroup
中(根據需要;參見其文件),然後引發。
兩個基本異常被特殊處理:如果任何任務因 KeyboardInterrupt
或 SystemExit
而失敗,任務組仍然會取消其餘任務並等待它們,但隨後會重新引發最初的 KeyboardInterrupt
或 SystemExit
,而不是 ExceptionGroup
或 BaseExceptionGroup
。
如果 async with
語句的主體因異常而退出(因此 __aexit__()
被呼叫並設定了異常),這與其中一個任務失敗的情況相同:其餘任務被取消並等待,非取消異常被分組到一個異常組中並引發。傳遞給 __aexit__()
的異常,除非它是 asyncio.CancelledError
,也包含在異常組中。對於 KeyboardInterrupt
和 SystemExit
,與上一段相同地進行了特殊處理。
任務組會小心地不將用於“喚醒”其 __aexit__()
的內部取消與由其他方發出的對其正在執行的任務的取消請求混淆。特別是,當一個任務組在語法上巢狀在另一個任務組中,並且兩者都同時在其子任務之一中遇到異常時,內部任務組將處理其異常,然後外部任務組將收到另一個取消並處理其自身的異常。
在任務組被外部取消並且還必須引發 ExceptionGroup
的情況下,它將呼叫父任務的 cancel()
方法。這確保了在下一個 await
處將引發 asyncio.CancelledError
,因此取消不會丟失。
任務組保留了 asyncio.Task.cancelling()
報告的取消計數。
3.13 版本變化:改進了同時進行的內部和外部取消處理,並正確保留了取消計數。
終止任務組¶
雖然標準庫本身不支援終止任務組,但可以透過向任務組新增一個引發異常的任務並忽略所引發的異常來實現終止
import asyncio
from asyncio import TaskGroup
class TerminateTaskGroup(Exception):
"""Exception raised to terminate a task group."""
async def force_terminate_task_group():
"""Used to force termination of a task group."""
raise TerminateTaskGroup()
async def job(task_id, sleep_time):
print(f'Task {task_id}: start')
await asyncio.sleep(sleep_time)
print(f'Task {task_id}: done')
async def main():
try:
async with TaskGroup() as group:
# spawn some tasks
group.create_task(job(1, 0.5))
group.create_task(job(2, 1.5))
# sleep for 1 second
await asyncio.sleep(1)
# add an exception-raising task to force the group to terminate
group.create_task(force_terminate_task_group())
except* TerminateTaskGroup:
pass
asyncio.run(main())
預期輸出
Task 1: start
Task 2: start
Task 1: done
休眠¶
- async asyncio.sleep(delay, result=None)¶
阻塞 *delay* 秒。
如果提供了 *result*,它將在協程完成時返回給呼叫者。
sleep()
總是暫停當前任務,允許其他任務執行。將延遲設定為 0 提供了一種最佳化路徑,以允許其他任務執行。這可以由長時間執行的函式使用,以避免在函式呼叫期間完全阻塞事件迴圈。
協程每秒顯示當前日期 5 秒的示例
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
版本 3.10 中已更改: 移除了 loop 引數。
3.13 版本變化:如果 *delay* 是
nan
,則引發ValueError
。
併發執行任務¶
- awaitable asyncio.gather(*aws, return_exceptions=False)¶
**併發**執行 *aws* 序列中的可等待物件。
如果 *aws* 中的任何可等待物件是協程,它會自動排程為任務。
如果所有可等待物件都成功完成,結果是一個返回值的聚合列表。結果值的順序與 *aws* 中可等待物件的順序相對應。
如果 *return_exceptions* 為
False
(預設),則第一個引發的異常會立即傳播到等待gather()
的任務。*aws* 序列中的其他可等待物件**不會被取消**,並將繼續執行。如果 *return_exceptions* 為
True
,異常將與成功結果一樣處理,並聚合到結果列表中。如果
gather()
被**取消**,則所有已提交的可等待物件(尚未完成的)也會被**取消**。如果 *aws* 序列中的任何 Task 或 Future 被*取消*,則將其視為引發了
CancelledError
—— 在這種情況下,gather()
呼叫**不會**被取消。這是為了防止一個提交的 Task/Future 的取消導致其他 Task/Future 被取消。備註
建立和執行任務並等待其完成的新替代方案是
asyncio.TaskGroup
。*TaskGroup* 為排程子任務巢狀提供了比 *gather* 更強的安全保證:如果一個任務(或子任務,由任務排程的任務)引發異常,*TaskGroup* 將取消剩餘的已排程任務,而 *gather* 不會。示例
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({number}), currently i={i}...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") return f async def main(): # Schedule three calls *concurrently*: L = await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) print(L) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2), currently i=2... # Task B: Compute factorial(3), currently i=2... # Task C: Compute factorial(4), currently i=2... # Task A: factorial(2) = 2 # Task B: Compute factorial(3), currently i=3... # Task C: Compute factorial(4), currently i=3... # Task B: factorial(3) = 6 # Task C: Compute factorial(4), currently i=4... # Task C: factorial(4) = 24 # [2, 6, 24]
備註
如果 *return_exceptions* 為 false,則在 gather() 被標記為完成之後取消它不會取消任何已提交的可等待物件。例如,gather 在將異常傳播給呼叫者後可能會被標記為完成,因此,在從 gather 捕獲異常(由其中一個可等待物件引發)後呼叫
gather.cancel()
不會取消任何其他可等待物件。3.7 版本變化:如果 *gather* 本身被取消,無論 *return_exceptions* 如何,取消都會傳播。
版本 3.10 中已更改: 移除了 loop 引數。
自 3.10 版本棄用:如果未提供位置引數或並非所有位置引數都是類 Future 物件且沒有執行中的事件迴圈,則會發出棄用警告。
急切任務工廠¶
- asyncio.eager_task_factory(loop, coro, *, name=None, context=None)¶
用於急切任務執行的任務工廠。
當使用此工廠(透過
loop.set_task_factory(asyncio.eager_task_factory)
)時,協程在Task
構造期間同步開始執行。任務僅在其阻塞時才在事件迴圈中排程。這可以提高效能,因為對於同步完成的協程,避免了迴圈排程的開銷。一個有益的常見示例是協程,它們採用快取或記憶化來儘可能避免實際的 I/O。
備註
協程的立即執行是一個語義變化。如果協程返回或引發,任務永遠不會被排程到事件迴圈。如果協程執行阻塞,任務將被排程到事件迴圈。此更改可能會給現有應用程式帶來行為變化。例如,應用程式的任務執行順序可能會發生變化。
3.12 新版功能.
- asyncio.create_eager_task_factory(custom_task_constructor)¶
建立一個急切任務工廠,類似於
eager_task_factory()
,在建立新任務時使用提供的 *custom_task_constructor* 而不是預設的Task
。*custom_task_constructor* 必須是**可呼叫物件**,其簽名與
Task.__init__
的簽名匹配。該可呼叫物件必須返回一個與asyncio.Task
相容的物件。此函式返回一個**可呼叫物件**,旨在透過
loop.set_task_factory(factory)
) 用作事件迴圈的任務工廠。3.12 新版功能.
避免取消¶
- awaitable asyncio.shield(aw)¶
-
如果 *aw* 是協程,它會自動排程為任務。
語句
task = asyncio.create_task(something()) res = await shield(task)
等價於
res = await something()
但是,如果包含它的協程被取消,則在
something()
中執行的任務不會被取消。從something()
的角度來看,取消沒有發生。儘管其呼叫者仍然被取消,因此“await”表示式仍然引發CancelledError
。如果
something()
被其他方式取消(即從自身內部取消),那也會取消shield()
。如果希望完全忽略取消(不建議),則應將
shield()
函式與 try/except 子句結合使用,如下所示task = asyncio.create_task(something()) try: res = await shield(task) except CancelledError: res = None
重要
儲存傳遞給此函式的任務的引用,以避免任務在執行中途消失。事件迴圈只對任務保持弱引用。未在其他地方引用的任務可能隨時被垃圾回收,即使在完成之前也是如此。
版本 3.10 中已更改: 移除了 loop 引數。
自 3.10 版本棄用:如果 *aw* 不是類 Future 物件且沒有執行中的事件迴圈,則會發出棄用警告。
超時¶
- asyncio.timeout(delay)¶
返回一個非同步上下文管理器,可用於限制等待時間。
*delay* 可以是
None
,也可以是一個浮點數/整數的秒數。如果 *delay* 是None
,則不應用時間限制;當建立上下文管理器時延遲未知時,這可能很有用。在任何一種情況下,上下文管理器都可以在建立後使用
Timeout.reschedule()
重新排程。示例
async def main(): async with asyncio.timeout(10): await long_running_task()
如果
long_running_task
耗時超過 10 秒才能完成,上下文管理器將取消當前任務並在內部處理由此產生的asyncio.CancelledError
,將其轉換為可被捕獲和處理的TimeoutError
。備註
asyncio.timeout()
上下文管理器將asyncio.CancelledError
轉換為TimeoutError
,這意味著TimeoutError
只能在上下文管理器**外部**捕獲。捕獲
TimeoutError
的示例async def main(): try: async with asyncio.timeout(10): await long_running_task() except TimeoutError: print("The long operation timed out, but we've handled it.") print("This statement will run regardless.")
由
asyncio.timeout()
生成的上下文管理器可以重新排程到不同的截止日期並進行檢查。- class asyncio.Timeout(when)¶
一個用於取消過期協程的 非同步上下文管理器。
when
應該是一個絕對時間,上下文應該在該時間超時,由事件迴圈的時鐘測量如果
when
為None
,則超時永遠不會觸發。如果
when < loop.time()
,則超時將在事件迴圈的下一次迭代中觸發。
示例
async def main(): try: # We do not know the timeout when starting, so we pass ``None``. async with asyncio.timeout(None) as cm: # We know the timeout now, so we reschedule it. new_deadline = get_running_loop().time() + 10 cm.reschedule(new_deadline) await long_running_task() except TimeoutError: pass if cm.expired(): print("Looks like we haven't finished on time.")
超時上下文管理器可以安全地巢狀。
在 3.11 版本中新增。
- asyncio.timeout_at(when)¶
類似於
asyncio.timeout()
,只是 *when* 是停止等待的絕對時間,或None
。示例
async def main(): loop = get_running_loop() deadline = loop.time() + 20 try: async with asyncio.timeout_at(deadline): await long_running_task() except TimeoutError: print("The long operation timed out, but we've handled it.") print("This statement will run regardless.")
在 3.11 版本中新增。
- async asyncio.wait_for(aw, timeout)¶
等待 *aw* 可等待物件在超時內完成。
如果 *aw* 是協程,它會自動排程為任務。
*timeout* 可以是
None
,也可以是等待的浮點數或整數秒數。如果 *timeout* 是None
,則阻塞直到 future 完成。如果發生超時,它將取消任務並引發
TimeoutError
。函式將一直等待直到 future 實際被取消,因此總等待時間可能超過 *timeout*。如果在取消期間發生異常,它將被傳播。
如果等待被取消,future *aw* 也會被取消。
示例
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
3.7 版本變化:當 *aw* 因超時而被取消時,
wait_for
會等待 *aw* 被取消。以前,它會立即引發TimeoutError
。版本 3.10 中已更改: 移除了 loop 引數。
3.11 版本變化:引發
TimeoutError
而不是asyncio.TimeoutError
。
等待原語¶
- async asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)¶
併發執行 *aws* 可迭代物件中的
Future
和Task
例項,並阻塞直到 *return_when* 指定的條件。*aws* 可迭代物件不能為空。
返回兩組任務/期物:
(done, pending)
。用法
done, pending = await asyncio.wait(aws)
如果指定了 *timeout*(浮點數或整數),可用於控制返回前的最大等待秒數。
請注意,此函式不會引發
TimeoutError
。超時發生時未完成的 Futures 或 Tasks 只會返回在第二個集合中。*return_when* 指示此函式何時返回。它必須是以下常量之一
常量
描述
- asyncio.FIRST_COMPLETED¶
當任何 future 完成或被取消時,函式將返回。
- asyncio.FIRST_EXCEPTION¶
當任何 future 因引發異常而完成時,函式將返回。如果沒有 future 引發異常,則它等同於
ALL_COMPLETED
。- asyncio.ALL_COMPLETED¶
當所有 future 完成或被取消時,函式將返回。
與
wait_for()
不同,wait()
在超時發生時不會取消期物。版本 3.10 中已更改: 移除了 loop 引數。
3.11 版本變化:直接向
wait()
傳遞協程物件是被禁止的。3.12 版本變化:增加了對生成器產生任務的支援。
- asyncio.as_completed(aws, *, timeout=None)¶
併發執行 *aws* 可迭代物件中的可等待物件。返回的物件可以迭代以獲取可等待物件完成後的結果。
as_completed()
返回的物件可以作為非同步迭代器或普通迭代器進行迭代。當使用非同步迭代時,如果提供的可等待物件是任務或期物,則會生成它們。這使得將先前排程的任務與其結果關聯起來變得容易。示例ipv4_connect = create_task(open_connection("127.0.0.1", 80)) ipv6_connect = create_task(open_connection("::1", 80)) tasks = [ipv4_connect, ipv6_connect] async for earliest_connect in as_completed(tasks): # earliest_connect is done. The result can be obtained by # awaiting it or calling earliest_connect.result() reader, writer = await earliest_connect if earliest_connect is ipv6_connect: print("IPv6 connection established.") else: print("IPv4 connection established.")
在非同步迭代期間,對於不是任務或期物的已提供的可等待物件,將生成隱式建立的任務。
當用作普通迭代器時,每次迭代都會產生一個新的協程,該協程返回結果或引發下一個完成的可等待物件的異常。此模式與 Python 3.13 之前的版本相容
ipv4_connect = create_task(open_connection("127.0.0.1", 80)) ipv6_connect = create_task(open_connection("::1", 80)) tasks = [ipv4_connect, ipv6_connect] for next_connect in as_completed(tasks): # next_connect is not one of the original task objects. It must be # awaited to obtain the result value or raise the exception of the # awaitable that finishes next. reader, writer = await next_connect
如果在所有可等待物件完成之前發生超時,則會引發
TimeoutError
。這由非同步迭代期間的async for
迴圈或普通迭代期間生成的協程引發。版本 3.10 中已更改: 移除了 loop 引數。
自 3.10 版本棄用:如果 *aws* 可迭代物件中並非所有可等待物件都是類 Future 物件且沒有執行中的事件迴圈,則會發出棄用警告。
3.12 版本變化:增加了對生成器產生任務的支援。
線上程中執行¶
- async asyncio.to_thread(func, /, *args, **kwargs)¶
在單獨的執行緒中非同步執行函式 *func*。
為此函式提供的任何 *args 和 **kwargs 將直接傳遞給 *func*。此外,當前的
contextvars.Context
將被傳播,允許在單獨的執行緒中訪問事件迴圈執行緒的上下文變數。返回一個可等待的協程,以獲取 *func* 的最終結果。
此協程函式主要用於執行 I/O 密集型函式/方法,否則如果它們在主執行緒中執行,將阻塞事件迴圈。例如
def blocking_io(): print(f"start blocking_io at {time.strftime('%X')}") # Note that time.sleep() can be replaced with any blocking # IO-bound operation, such as file operations. time.sleep(1) print(f"blocking_io complete at {time.strftime('%X')}") async def main(): print(f"started main at {time.strftime('%X')}") await asyncio.gather( asyncio.to_thread(blocking_io), asyncio.sleep(1)) print(f"finished main at {time.strftime('%X')}") asyncio.run(main()) # Expected output: # # started main at 19:50:53 # start blocking_io at 19:50:53 # blocking_io complete at 19:50:54 # finished main at 19:50:54
在任何協程中直接呼叫
blocking_io()
會在其持續時間內阻塞事件迴圈,導致額外 1 秒的執行時間。相反,透過使用asyncio.to_thread()
,我們可以在單獨的執行緒中執行它,而不會阻塞事件迴圈。備註
由於 GIL,
asyncio.to_thread()
通常只能用於使 I/O 密集型函式非阻塞。但是,對於釋放 GIL 的擴充套件模組或沒有 GIL 的替代 Python 實現,asyncio.to_thread()
也可以用於 CPU 密集型函式。在 3.9 版本中新增。
從其他執行緒排程¶
- asyncio.run_coroutine_threadsafe(coro, loop)¶
向給定的事件迴圈提交一個協程。執行緒安全。
返回一個
concurrent.futures.Future
,用於等待來自另一個作業系統執行緒的結果。此函式旨在從與事件迴圈執行執行緒不同的作業系統執行緒中呼叫。示例
def in_thread(loop: asyncio.AbstractEventLoop) -> None: # Run some blocking IO pathlib.Path("example.txt").write_text("hello world", encoding="utf8") # Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout=2) == 3 async def amain() -> None: # Get the running loop loop = asyncio.get_running_loop() # Run something in a thread await asyncio.to_thread(in_thread, loop)
也可以反向執行。示例
@contextlib.contextmanager def loop_in_thread() -> Generator[asyncio.AbstractEventLoop]: loop_fut = concurrent.futures.Future[asyncio.AbstractEventLoop]() stop_event = asyncio.Event() async def main() -> None: loop_fut.set_result(asyncio.get_running_loop()) await stop_event.wait() with concurrent.futures.ThreadPoolExecutor(1) as tpe: complete_fut = tpe.submit(asyncio.run, main()) for fut in concurrent.futures.as_completed((loop_fut, complete_fut)): if fut is loop_fut: loop = loop_fut.result() try: yield loop finally: loop.call_soon_threadsafe(stop_event.set) else: fut.result() # Create a loop in another thread with loop_in_thread() as loop: # Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout=2) == 3
如果在協程中引發異常,則返回的 Future 將收到通知。它還可以用於取消事件迴圈中的任務
try: result = future.result(timeout) except TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
請參閱文件的併發和多執行緒部分。
與其他 asyncio 函式不同,此函式需要顯式傳遞 *loop* 引數。
3.5.1 版本新增。
自省¶
- asyncio.current_task(loop=None)¶
返回當前正在執行的
Task
例項,如果當前沒有任務正在執行,則返回None
。如果 *loop* 為
None
,則使用get_running_loop()
獲取當前迴圈。在 3.7 版本加入。
- asyncio.all_tasks(loop=None)¶
返回由迴圈執行的尚未完成的
Task
物件的集合。如果 *loop* 是
None
,則使用get_running_loop()
獲取當前迴圈。在 3.7 版本加入。
- asyncio.iscoroutine(obj)¶
如果 *obj* 是協程物件,則返回
True
。在 3.4 版本加入。
任務物件¶
- class asyncio.Task(coro, *, loop=None, name=None, context=None, eager_start=False)¶
一個
類似Future
的物件,執行Python協程。非執行緒安全。任務用於在事件迴圈中執行協程。如果協程等待 Future,任務會暫停協程的執行並等待 Future 完成。當 Future *完成*後,被封裝的協程的執行會恢復。
事件迴圈使用協作排程:一個事件迴圈一次執行一個任務。當一個任務等待 Future 完成時,事件迴圈會執行其他任務、回撥或執行 I/O 操作。
使用高階
asyncio.create_task()
函式建立任務,或低階loop.create_task()
或ensure_future()
函式。不鼓勵手動例項化任務。要取消正在執行的任務,請使用
cancel()
方法。呼叫它將導致任務向被封裝的協程丟擲CancelledError
異常。如果協程在取消期間正在等待 Future 物件,則 Future 物件將被取消。cancelled()
可用於檢查任務是否被取消。如果封裝的協程沒有抑制CancelledError
異常並實際被取消,則該方法返回True
。asyncio.Task
繼承了Future
的所有 API,除了Future.set_result()
和Future.set_exception()
。一個可選的僅限關鍵字引數 *context* 允許為 *coro* 指定一個自定義的
contextvars.Context
來執行。如果未提供 *context*,則任務複製當前上下文,並在複製的上下文中執行其協程。可選的僅限關鍵字引數 *eager_start* 允許在任務建立時急切地啟動
asyncio.Task
的執行。如果設定為True
且事件迴圈正在執行,任務將立即開始執行協程,直到協程第一次阻塞。如果協程在不阻塞的情況下返回或引發,任務將急切完成並跳過排程到事件迴圈。3.7 版本變化:增加了對
contextvars
模組的支援。3.8 版本變化:新增了 *name* 引數。
自 3.10 版本棄用:如果未指定 *loop* 且沒有執行中的事件迴圈,則會發出棄用警告。
3.11 版本變化:新增了 *context* 引數。
3.12 版本變化:新增了 *eager_start* 引數。
- done()¶
如果任務*完成*,則返回
True
。當封裝的協程返回一個值、引發一個異常或任務被取消時,任務就*完成*了。
- result()¶
返回任務的結果。
如果任務*完成*,則返回封裝協程的結果(如果協程引發了異常,則重新引發該異常)。
如果任務已被*取消*,此方法將引發
CancelledError
異常。如果任務的結果尚未可用,此方法將引發
InvalidStateError
異常。
- exception()¶
返回任務的異常。
如果封裝的協程引發了異常,則返回該異常。如果封裝的協程正常返回,此方法返回
None
。如果任務已被*取消*,此方法將引發
CancelledError
異常。如果任務尚未*完成*,此方法將引發
InvalidStateError
異常。
- add_done_callback(callback, *, context=None)¶
新增一個回撥函式,當任務*完成*時執行。
此方法僅應在低階基於回撥的程式碼中使用。
有關更多詳細資訊,請參閱
Future.add_done_callback()
的文件。
- remove_done_callback(callback)¶
從回撥列表中移除 callback。
此方法僅應在低階基於回撥的程式碼中使用。
有關更多詳細資訊,請參閱
Future.remove_done_callback()
的文件。
- get_stack(*, limit=None)¶
返回此 Task 的堆疊幀列表。
如果包裝的協程尚未完成,則此方法返回其暫停時的堆疊。如果協程已成功完成或已取消,則此方法返回一個空列表。如果協程因異常而終止,則此方法返回回溯幀列表。
幀始終按從最舊到最新的順序排列。
對於暫停的協程,只返回一個堆疊幀。
可選的 limit 引數設定要返回的最大幀數;預設情況下,返回所有可用幀。返回列表的順序取決於返回的是堆疊還是回溯:返回堆疊的最新幀,但返回回溯的最舊幀。(這與 traceback 模組的行為匹配。)
- print_stack(*, limit=None, file=None)¶
列印此 Task 的堆疊或回溯。
此方法生成的輸出類似於 traceback 模組對
get_stack()
檢索到的幀的輸出。limit 引數直接傳遞給
get_stack()
。file 引數是一個 I/O 流,輸出將寫入其中;預設情況下,輸出寫入
sys.stdout
。
- get_coro()¶
返回由
Task
包裝的協程物件。備註
對於已急切完成的 Task,此方法將返回
None
。請參閱 急切任務工廠。在 3.8 版本加入。
版本 3.12 中的變更: 新新增的急切任務執行意味著結果可能為
None
。
- get_context()¶
返回與任務關聯的
contextvars.Context
物件。3.12 新版功能.
- get_name()¶
返回 Task 的名稱。
如果未明確為 Task 分配名稱,則預設的 asyncio Task 實現會在例項化期間生成一個預設名稱。
在 3.8 版本加入。
- set_name(value)¶
設定 Task 的名稱。
value 引數可以是任何物件,然後將其轉換為字串。
在預設的 Task 實現中,名稱將在 Task 物件的
repr()
輸出中可見。在 3.8 版本加入。
- cancel(msg=None)¶
請求取消任務。
如果任務已 完成 或 已取消,則返回
False
,否則返回True
。此方法安排在事件迴圈的下一個週期中向包裝的協程丟擲
CancelledError
異常。然後,協程有機會透過使用
try
……except CancelledError
……finally
塊來清理甚至拒絕請求。因此,與Future.cancel()
不同,Task.cancel()
不能保證任務會被取消,儘管完全抑制取消並不常見,並且強烈不建議這樣做。如果協程仍然決定抑制取消,它除了捕獲異常外,還需要呼叫Task.uncancel()
。版本 3.9 中的變更: 添加了 msg 引數。
版本 3.11 中的變更:
msg
引數從取消的任務傳播到其等待者。以下示例說明了協程如何截獲取消請求
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
- cancelled()¶
如果 Task 已 取消,則返回
True
。當使用
cancel()
請求取消並且包裝的協程傳播了拋入其中的CancelledError
異常時,Task 就會被 取消。
- uncancel()¶
減少對此 Task 的取消請求計數。
返回剩餘的取消請求數量。
請注意,一旦已取消任務的執行完成,對
uncancel()
的後續呼叫將無效。在 3.11 版本中新增。
此方法由 asyncio 的內部機制使用,不應由終端使用者程式碼使用。特別是,如果 Task 成功取消,這將允許結構化併發的元素(如 Task Groups 和
asyncio.timeout()
)繼續執行,將取消隔離到相應的結構化塊。例如async def make_request_with_timeout(): try: async with asyncio.timeout(1): # Structured block affected by the timeout: await make_request() await make_another_request() except TimeoutError: log("There was a timeout") # Outer code not affected by the timeout: await unrelated_code()
雖然包含
make_request()
和make_another_request()
的塊可能會由於超時而被取消,但即使在超時情況下,unrelated_code()
也應該繼續執行。這是透過uncancel()
實現的。TaskGroup
上下文管理器以類似的方式使用uncancel()
。如果終端使用者程式碼由於某種原因透過捕獲
CancelledError
來抑制取消,則需要呼叫此方法來移除取消狀態。當此方法將取消計數遞減到零時,該方法會檢查之前的
cancel()
呼叫是否已安排將CancelledError
拋入任務中。如果尚未丟擲,則該安排將被撤銷(透過重置內部_must_cancel
標誌)。
版本 3.13 中的變更: 更改為在達到零時撤銷待處理的取消請求。
- cancelling()¶
返回對此 Task 的待處理取消請求數量,即呼叫
cancel()
的次數減去呼叫uncancel()
的次數。請注意,如果此數字大於零但 Task 仍在執行,
cancelled()
仍將返回False
。這是因為此數字可以透過呼叫uncancel()
來降低,如果取消請求降至零,這可能導致任務最終不會被取消。此方法由 asyncio 的內部機制使用,不應由終端使用者程式碼使用。有關更多詳細資訊,請參閱
uncancel()
。在 3.11 版本中新增。