協程和任務¶
本節概述了用於處理協程和任務的高階 asyncio API。
協程¶
原始碼: Lib/asyncio/coroutines.py
使用 async/await 語法宣告的協程是編寫 asyncio 應用程式的首選方法。例如,以下程式碼片段會列印 “hello”,等待 1 秒,然後列印 “world”
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
請注意,簡單地呼叫協程不會將其安排為執行
>>> main()
<coroutine object main at 0x1053bb7c8>
為了實際執行協程,asyncio 提供了以下機制
asyncio.run()
函式用於執行頂層入口點 “main()” 函式(請參見上面的示例。)等待協程。以下程式碼片段將在等待 1 秒後列印 “hello”,然後在等待另外 2 秒後列印 “world”
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
預期輸出
started at 17:13:52 hello world finished at 17:13:55
asyncio.create_task()
函式用於將協程作為 asyncio任務
併發執行。讓我們修改上面的示例並併發執行兩個
say_after
協程async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}")
請注意,預期輸出現在表明該程式碼片段比以前快 1 秒執行
started at 17:14:32 hello world finished at 17:14:34
asyncio.TaskGroup
類提供了create_task()
更現代的替代方案。使用此 API,上一個示例變為async def main(): async with asyncio.TaskGroup() as tg: task1 = tg.create_task( say_after(1, 'hello')) task2 = tg.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # The await is implicit when the context manager exits. print(f"finished at {time.strftime('%X')}")
時間和輸出應與以前的版本相同。
3.11 版本新增:
asyncio.TaskGroup
。
可等待物件¶
如果一個物件可以在 await
表示式中使用,我們稱其為可等待物件。許多 asyncio API 都被設計為接受可等待物件。
主要有三種類型的可等待物件:協程、任務和 Future。
協程
Python 協程是可等待的,因此可以從其他協程中等待
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested() # will raise a "RuntimeWarning".
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
任務
任務用於併發排程協程。
當使用諸如 asyncio.create_task()
之類的函式將協程包裝到 Task 中時,協程會自動安排為立即執行
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Future
Future
是一個特殊的低階可等待物件,它表示非同步操作的最終結果。
當一個 Future 物件被等待時,這意味著協程將等待,直到 Future 在其他地方被解析。
asyncio 中的 Future 物件是必需的,以便允許基於回撥的程式碼與 async/await 一起使用。
通常無需在應用程式級別的程式碼中建立 Future 物件。
Future 物件,有時由庫和某些 asyncio API 公開,可以被等待
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
返回 Future 物件的低階函式的一個很好的例子是 loop.run_in_executor()
。
建立任務¶
原始碼: Lib/asyncio/tasks.py
- asyncio.create_task(coro, *, name=None, context=None)¶
將 coro 協程包裝到
Task
中並安排其執行。返回 Task 物件。如果 name 不是
None
,則使用Task.set_name()
將其設定為任務的名稱。可選的僅關鍵字 context 引數允許為 coro 指定自定義的
contextvars.Context
來執行。如果沒有提供 context,則建立當前上下文的副本。該任務在
get_running_loop()
返回的迴圈中執行,如果當前執行緒中沒有正在執行的迴圈,則引發RuntimeError
。注意
asyncio.TaskGroup.create_task()
是一種利用結構化併發的新替代方案;它允許等待一組具有強大安全保證的相關任務。重要
儲存對此函式結果的引用,以避免任務在執行過程中消失。事件迴圈僅保留對任務的弱引用。沒有在其他地方引用的任務可能會在任何時候被垃圾回收,甚至在完成之前。對於可靠的“fire-and-forget”後臺任務,請將它們收集到集合中
background_tasks = set() for i in range(10): task = asyncio.create_task(some_coro(param=i)) # Add task to the set. This creates a strong reference. background_tasks.add(task) # To prevent keeping references to finished tasks forever, # make each task remove its own reference from the set after # completion: task.add_done_callback(background_tasks.discard)
3.7 版本新增。
在 3.8 版本中更改: 添加了 name 引數。
在 3.11 版本中更改: 添加了 context 引數。
任務取消¶
可以輕鬆安全地取消任務。取消任務後,將在下次機會在任務中引發 asyncio.CancelledError
。
建議協程使用 try/finally
塊來可靠地執行清理邏輯。如果顯式捕獲了 asyncio.CancelledError
,則通常應在清理完成後傳播它。asyncio.CancelledError
直接繼承 BaseException
,因此大多數程式碼不需要知道它。
像 asyncio.TaskGroup
和 asyncio.timeout()
這樣啟用結構化併發的 asyncio 元件,其內部實現使用了取消機制,如果協程吞噬了 asyncio.CancelledError
,可能會出現異常行為。同樣,使用者程式碼通常不應呼叫 uncancel
。但是,在確實需要抑制 asyncio.CancelledError
的情況下,也必須呼叫 uncancel()
以完全移除取消狀態。
任務組¶
任務組結合了任務建立 API 以及一種方便可靠的方式來等待組中的所有任務完成。
- class asyncio.TaskGroup¶
一個 非同步上下文管理器,持有一組任務。可以使用
create_task()
將任務新增到組中。當上下文管理器退出時,將等待所有任務完成。3.11 版本新增。
- create_task(coro, *, name=None, context=None)¶
在此任務組中建立一個任務。簽名與
asyncio.create_task()
的簽名匹配。如果任務組處於非活動狀態(例如,尚未進入、已經完成或正在關閉過程中),我們將關閉給定的coro
。在 3.13 版本中更改: 如果任務組未處於活動狀態,則關閉給定的協程。
示例
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(some_coro(...))
task2 = tg.create_task(another_coro(...))
print(f"Both tasks have completed now: {task1.result()}, {task2.result()}")
async with
語句將等待組中的所有任務完成。在等待時,仍然可以將新任務新增到組中(例如,透過將 tg
傳遞到其中一個協程並在該協程中呼叫 tg.create_task()
)。一旦最後一個任務完成並且 async with
塊退出,則無法將新任務新增到組中。
當組中的任何一個任務首次因 asyncio.CancelledError
之外的異常而失敗時,組中其餘的任務將被取消。此時無法再向組中新增任何新任務。此時,如果 async with
語句的主體仍處於活動狀態(即,尚未呼叫 __aexit__()
),則直接包含 async with
語句的任務也將被取消。產生的 asyncio.CancelledError
將中斷 await
,但它不會從包含的 async with
語句中冒泡出來。
一旦所有任務完成,如果有任何任務因 asyncio.CancelledError
之外的異常而失敗,這些異常將合併到 ExceptionGroup
或 BaseExceptionGroup
中(視情況而定;請參閱其文件),然後引發該異常。
有兩個基本異常會被特殊處理:如果任何任務因 KeyboardInterrupt
或 SystemExit
而失敗,則任務組仍會取消其餘的任務並等待它們,但隨後將重新引發初始的 KeyboardInterrupt
或 SystemExit
,而不是 ExceptionGroup
或 BaseExceptionGroup
。
如果 async with
語句的主體因異常而退出(因此呼叫了帶有異常集的 __aexit__()
),則這被視為與其中一個任務失敗相同:其餘任務將被取消並等待,並且非取消異常將被分組到異常組中並引發。傳遞到 __aexit__()
的異常(除非它是 asyncio.CancelledError
)也會包含在異常組中。對於 KeyboardInterrupt
和 SystemExit
,也會像上一段中那樣進行特殊處理。
任務組會小心地不將用於“喚醒”其 __aexit__()
的內部取消與由其他方對其正在執行的任務發出的取消請求混淆。特別是,當一個任務組在語法上巢狀在另一個任務組中,並且兩者同時在其子任務之一中遇到異常時,內部任務組將處理其異常,然後外部任務組將接收另一個取消並處理其自己的異常。
在任務組在外部被取消並且還必須引發 ExceptionGroup
的情況下,它將呼叫父任務的 cancel()
方法。這確保將在下一個 await
處引發 asyncio.CancelledError
,這樣取消就不會丟失。
任務組會保留 asyncio.Task.cancelling()
報告的取消計數。
在 3.13 版本中更改: 改進了對同時發生的內部和外部取消的處理,並正確保留了取消計數。
終止任務組¶
雖然標準庫本身不支援終止任務組,但可以透過將引發異常的任務新增到任務組並忽略引發的異常來實現終止。
import asyncio
from asyncio import TaskGroup
class TerminateTaskGroup(Exception):
"""Exception raised to terminate a task group."""
async def force_terminate_task_group():
"""Used to force termination of a task group."""
raise TerminateTaskGroup()
async def job(task_id, sleep_time):
print(f'Task {task_id}: start')
await asyncio.sleep(sleep_time)
print(f'Task {task_id}: done')
async def main():
try:
async with TaskGroup() as group:
# spawn some tasks
group.create_task(job(1, 0.5))
group.create_task(job(2, 1.5))
# sleep for 1 second
await asyncio.sleep(1)
# add an exception-raising task to force the group to terminate
group.create_task(force_terminate_task_group())
except* TerminateTaskGroup:
pass
asyncio.run(main())
預期輸出
Task 1: start
Task 2: start
Task 1: done
休眠¶
- coroutine asyncio.sleep(delay, result=None)¶
阻塞 *delay* 秒。
如果提供了 *result*,則當協程完成時,它將返回給呼叫者。
sleep()
總是會掛起當前任務,允許其他任務執行。將延遲設定為 0 提供了一種最佳化路徑,允許其他任務執行。長時間執行的函式可以使用此方法來避免阻塞事件迴圈的整個函式呼叫持續時間。
每秒顯示當前日期 5 秒的協程示例
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
在 3.10 版本中更改: 刪除了 *loop* 引數。
在 3.13 版本中更改: 如果 *delay* 為
nan
,則引發ValueError
。
併發執行任務¶
- awaitable asyncio.gather(*aws, return_exceptions=False)¶
併發執行 aws 序列中的 可等待物件。
如果 aws 中的任何可等待物件是協程,它將自動計劃為任務。
如果所有可等待物件都成功完成,結果將是返回值的聚合列表。結果值的順序與 aws 中可等待物件的順序相對應。
如果 return_exceptions 為
False
(預設值),則第一個引發的異常會立即傳播到等待gather()
的任務。aws 序列中的其他可等待物件不會被取消,並將繼續執行。如果 return_exceptions 為
True
,則異常的處理方式與成功結果相同,並聚合在結果列表中。如果
gather()
被取消,則所有已提交的(尚未完成的)可等待物件也會被取消。如果來自 aws 序列的任何 Task 或 Future 被取消,則將其視為引發
CancelledError
– 在這種情況下,gather()
呼叫不會被取消。這是為了防止取消一個已提交的 Task/Future 導致其他 Task/Future 被取消。注意
建立和併發執行任務並等待其完成的新替代方案是
asyncio.TaskGroup
。對於排程子任務的巢狀,TaskGroup 提供了比 gather 更強的安全保證:如果一個任務(或子任務,即由任務排程的任務)引發異常,TaskGroup 將取消剩餘的計劃任務,而 gather 則不會。示例
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({number}), currently i={i}...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") return f async def main(): # Schedule three calls *concurrently*: L = await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) print(L) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2), currently i=2... # Task B: Compute factorial(3), currently i=2... # Task C: Compute factorial(4), currently i=2... # Task A: factorial(2) = 2 # Task B: Compute factorial(3), currently i=3... # Task C: Compute factorial(4), currently i=3... # Task B: factorial(3) = 6 # Task C: Compute factorial(4), currently i=4... # Task C: factorial(4) = 24 # [2, 6, 24]
注意
如果 return_exceptions 為 false,在 gather() 被標記為完成後取消 gather() 不會取消任何已提交的可等待物件。例如,gather 可以在將異常傳播給呼叫者後被標記為完成,因此,在從 gather 捕獲異常(由其中一個可等待物件引發)後呼叫
gather.cancel()
不會取消任何其他可等待物件。在 3.7 版本中更改:如果 gather 本身被取消,則無論 return_exceptions 如何,都會傳播取消。
在 3.10 版本中更改: 刪除了 *loop* 引數。
自 3.10 版本起棄用:如果未提供位置引數,或者並非所有位置引數都是類似 Future 的物件,並且沒有正在執行的事件迴圈,則會發出棄用警告。
急切任務工廠¶
- asyncio.eager_task_factory(loop, coro, *, name=None, context=None)¶
用於急切任務執行的任務工廠。
當使用此工廠時(透過
loop.set_task_factory(asyncio.eager_task_factory)
),協程在Task
構造期間開始同步執行。只有當任務阻塞時,才會將其排程到事件迴圈上。這可以提高效能,因為避免了同步完成的協程的迴圈排程開銷。一個常見示例是使用快取或記憶化來避免儘可能實際的 I/O 的協程。
注意
立即執行協程是一種語義更改。如果協程返回或引發異常,則永遠不會將任務排程到事件迴圈。如果協程執行阻塞,則將任務排程到事件迴圈。此更改可能會對現有應用程式引入行為更改。例如,應用程式的任務執行順序可能會更改。
在 3.12 版本中新增。
- asyncio.create_eager_task_factory(custom_task_constructor)¶
建立一個急切任務工廠,類似於
eager_task_factory()
,在建立新任務時使用提供的 custom_task_constructor 而不是預設的Task
。custom_task_constructor 必須是具有與
Task.__init__
簽名匹配的簽名的可呼叫物件。該可呼叫物件必須返回一個與asyncio.Task
相容的物件。此函式返回一個可呼叫物件,旨在透過
loop.set_task_factory(factory)
) 用作事件迴圈的任務工廠。在 3.12 版本中新增。
防止取消¶
- awaitableasyncio.shield(aw)¶
-
如果 aw 是協程,它會自動作為 Task 進行排程。
語句
task = asyncio.create_task(something()) res = await shield(task)
等價於
res = await something()
不同之處在於,如果包含它的協程被取消,則在
something()
中執行的 Task 不會被取消。從something()
的角度來看,取消並未發生。儘管其呼叫者仍然被取消,因此 “await” 表示式仍然會引發CancelledError
。如果
something()
透過其他方式(即從其內部)取消,也會取消shield()
。如果希望完全忽略取消(不建議),則
shield()
函式應與 try/except 子句結合使用,如下所示task = asyncio.create_task(something()) try: res = await shield(task) except CancelledError: res = None
重要
儲存傳遞給此函式的任務的引用,以避免任務在執行過程中消失。事件迴圈只保留對任務的弱引用。未在其他地方引用的任務可能隨時被垃圾回收,甚至在完成之前。
在 3.10 版本中更改: 刪除了 *loop* 引數。
自 3.10 版本起棄用:如果 aw 不是類似 Future 的物件,並且沒有正在執行的事件迴圈,則會發出棄用警告。
超時¶
- asyncio.timeout(delay)¶
返回一個 非同步上下文管理器,可用於限制等待某些事物所花費的時間。
delay 可以是
None
,也可以是浮點數/整數的等待秒數。如果 delay 為None
,則不會應用時間限制;如果上下文管理器建立時延遲未知,這可能會很有用。在任何一種情況下,都可以使用
Timeout.reschedule()
在建立後重新排程上下文管理器。示例
async def main(): async with asyncio.timeout(10): await long_running_task()
如果
long_running_task
的完成時間超過 10 秒,則上下文管理器將取消當前任務並內部處理產生的asyncio.CancelledError
,將其轉換為可以捕獲和處理的TimeoutError
。注意
asyncio.timeout()
上下文管理器是將asyncio.CancelledError
轉換為TimeoutError
的原因,這意味著TimeoutError
只能在上下文管理器外部捕獲。捕獲
TimeoutError
的示例async def main(): try: async with asyncio.timeout(10): await long_running_task() except TimeoutError: print("The long operation timed out, but we've handled it.") print("This statement will run regardless.")
asyncio.timeout()
生成的上下文管理器可以重新排程到不同的截止日期並進行檢查。- class asyncio.Timeout(when)¶
一個用於取消過期協程的 非同步上下文管理器。
when
應該是一個絕對時間,表示上下文應該超時的時刻,以事件迴圈的時鐘為準。如果
when
為None
,則永遠不會觸發超時。如果
when < loop.time()
,則超時將在事件迴圈的下一次迭代時觸發。
示例
async def main(): try: # We do not know the timeout when starting, so we pass ``None``. async with asyncio.timeout(None) as cm: # We know the timeout now, so we reschedule it. new_deadline = get_running_loop().time() + 10 cm.reschedule(new_deadline) await long_running_task() except TimeoutError: pass if cm.expired(): print("Looks like we haven't finished on time.")
超時上下文管理器可以安全地巢狀使用。
3.11 版本新增。
- asyncio.timeout_at(when)¶
與
asyncio.timeout()
類似,不同之處在於 when 是停止等待的絕對時間,或None
。示例
async def main(): loop = get_running_loop() deadline = loop.time() + 20 try: async with asyncio.timeout_at(deadline): await long_running_task() except TimeoutError: print("The long operation timed out, but we've handled it.") print("This statement will run regardless.")
3.11 版本新增。
- coroutine asyncio.wait_for(aw, timeout)¶
等待 aw 可等待物件 在超時時間內完成。
如果 aw 是協程,它會自動作為 Task 進行排程。
timeout 可以是
None
,也可以是浮點數或整數,表示等待的秒數。如果 timeout 為None
,則會阻塞直到 future 完成。如果發生超時,它會取消任務並引發
TimeoutError
。該函式將等待直到 future 真正被取消,因此總等待時間可能超過 timeout。如果在取消期間發生異常,則會傳播該異常。
如果等待被取消,則 future aw 也會被取消。
示例
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
在 3.7 版本中更改: 當 aw 因超時而被取消時,
wait_for
會等待 aw 被取消。以前,它會立即引發TimeoutError
。在 3.10 版本中更改: 刪除了 *loop* 引數。
在 3.11 版本中更改: 引發
TimeoutError
而不是asyncio.TimeoutError
。
等待原語¶
- coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)¶
併發執行 aws 可迭代物件中的
Future
和Task
例項,並阻塞直到 return_when 指定的條件成立。aws 可迭代物件不能為空。
返回兩組 Task/Future:
(done, pending)
。用法
done, pending = await asyncio.wait(aws)
timeout (浮點數或整數),如果指定,可用於控制返回前等待的最大秒數。
請注意,此函式不會引發
TimeoutError
。當發生超時時未完成的 Future 或 Task 只會返回到第二個集合中。return_when 指示此函式應何時返回。它必須是以下常量之一
常量
描述
- asyncio.FIRST_COMPLETED¶
當任何 future 完成或被取消時,該函式將返回。
- asyncio.FIRST_EXCEPTION¶
當任何 future 透過引發異常完成時,該函式將返回。如果沒有 future 引發異常,則它等效於
ALL_COMPLETED
。- asyncio.ALL_COMPLETED¶
當所有 future 完成或被取消時,該函式將返回。
與
wait_for()
不同,當發生超時時,wait()
不會取消 future。在 3.10 版本中更改: 刪除了 *loop* 引數。
在 3.11 版本中更改: 禁止直接將協程物件傳遞給
wait()
。在 3.12 版本中更改: 添加了對生成器產生任務的支援。
- asyncio.as_completed(aws, *, timeout=None)¶
併發執行 aws 可迭代物件中的 可等待物件。返回的物件可以迭代以獲取可等待物件完成時的結果。
由
as_completed()
返回的物件可以作為 非同步迭代器 或普通 迭代器 進行迭代。當使用非同步迭代時,如果原始提供的可等待物件是任務或 future,則會產生這些物件。這使得將先前計劃的任務與其結果關聯起來變得容易。例如ipv4_connect = create_task(open_connection("127.0.0.1", 80)) ipv6_connect = create_task(open_connection("::1", 80)) tasks = [ipv4_connect, ipv6_connect] async for earliest_connect in as_completed(tasks): # earliest_connect is done. The result can be obtained by # awaiting it or calling earliest_connect.result() reader, writer = await earliest_connect if earliest_connect is ipv6_connect: print("IPv6 connection established.") else: print("IPv4 connection established.")
在非同步迭代期間,將為提供的不是任務或 future 的可等待物件生成隱式建立的任務。
當用作普通迭代器時,每次迭代都會產生一個新的協程,該協程返回下一個已完成的可等待物件的結果或引發異常。此模式與低於 3.13 的 Python 版本相容
ipv4_connect = create_task(open_connection("127.0.0.1", 80)) ipv6_connect = create_task(open_connection("::1", 80)) tasks = [ipv4_connect, ipv6_connect] for next_connect in as_completed(tasks): # next_connect is not one of the original task objects. It must be # awaited to obtain the result value or raise the exception of the # awaitable that finishes next. reader, writer = await next_connect
如果在所有可等待物件完成之前發生超時,則會引發
TimeoutError
。這由非同步迭代期間的async for
迴圈或普通迭代期間產生的協程引發。在 3.10 版本中更改: 刪除了 *loop* 引數。
自 3.10 版本起棄用: 如果 aws 可迭代物件中並非所有可等待物件都是 Future 類物件,並且沒有正在執行的事件迴圈,則會發出棄用警告。
在 3.12 版本中更改: 添加了對生成器產生任務的支援。
線上程中執行¶
- coroutine asyncio.to_thread(func, /, *args, **kwargs)¶
在單獨的執行緒中非同步執行函式 func。
為此函式提供的任何 *args 和 **kwargs 都會直接傳遞給 func。此外,當前的
contextvars.Context
也會傳播,允許從事件迴圈執行緒訪問單獨執行緒中的上下文變數。返回一個協程,可以等待以獲取 func 的最終結果。
此協程函式主要用於執行 IO 密集型函式/方法,如果它們在主執行緒中執行,則會阻塞事件迴圈。例如
def blocking_io(): print(f"start blocking_io at {time.strftime('%X')}") # Note that time.sleep() can be replaced with any blocking # IO-bound operation, such as file operations. time.sleep(1) print(f"blocking_io complete at {time.strftime('%X')}") async def main(): print(f"started main at {time.strftime('%X')}") await asyncio.gather( asyncio.to_thread(blocking_io), asyncio.sleep(1)) print(f"finished main at {time.strftime('%X')}") asyncio.run(main()) # Expected output: # # started main at 19:50:53 # start blocking_io at 19:50:53 # blocking_io complete at 19:50:54 # finished main at 19:50:54
在任何協程中直接呼叫
blocking_io()
都會在其持續時間內阻塞事件迴圈,從而導致額外的 1 秒執行時間。相反,透過使用asyncio.to_thread()
,我們可以在單獨的執行緒中執行它,而不會阻塞事件迴圈。注意
由於 GIL,
asyncio.to_thread()
通常只能用於使 IO 密集型函式變為非阻塞。但是,對於釋放 GIL 的擴充套件模組或沒有 GIL 的其他 Python 實現,asyncio.to_thread()
也可以用於 CPU 密集型函式。在 3.9 版本中新增。
從其他執行緒排程¶
- asyncio.run_coroutine_threadsafe(coro, loop)¶
將協程提交到給定的事件迴圈中。執行緒安全。
返回一個
concurrent.futures.Future
,用於等待來自另一個作業系統執行緒的結果。此函式旨在從事件迴圈正在執行的作業系統執行緒之外的不同執行緒呼叫。例如:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果在協程中引發了異常,則會通知返回的 Future。它也可以用於取消事件迴圈中的任務。
try: result = future.result(timeout) except TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
請參閱文件的 併發和多執行緒 部分。
與其他的 asyncio 函式不同,此函式需要顯式傳遞 loop 引數。
在版本 3.5.1 中新增。
自省¶
- asyncio.current_task(loop=None)¶
返回當前正在執行的
Task
例項,如果沒有任務正在執行,則返回None
。如果 loop 為
None
,則會使用get_running_loop()
獲取當前迴圈。3.7 版本新增。
- asyncio.all_tasks(loop=None)¶
返回迴圈執行的尚未完成的
Task
物件的集合。如果 loop 為
None
,則會使用get_running_loop()
獲取當前迴圈。3.7 版本新增。
- asyncio.iscoroutine(obj)¶
如果 obj 是一個協程物件,則返回
True
。在版本 3.4 中新增。
任務物件¶
- class asyncio.Task(coro, *, loop=None, name=None, context=None, eager_start=False)¶
一個
類似於 Future
的物件,它執行一個 Python 協程。不是執行緒安全的。任務用於在事件迴圈中執行協程。如果一個協程等待 Future,則任務會暫停協程的執行,並等待 Future 完成。當 Future 完成 時,包裝的協程的執行將恢復。
事件迴圈使用協作排程:一個事件迴圈一次執行一個任務。當一個任務等待 Future 完成時,事件迴圈會執行其他任務、回撥或執行 IO 操作。
使用高階的
asyncio.create_task()
函式建立任務,或使用低階的loop.create_task()
或ensure_future()
函式。不建議手動例項化任務。要取消正在執行的任務,請使用
cancel()
方法。呼叫它將導致任務向包裝的協程丟擲CancelledError
異常。如果協程在取消期間正在等待 Future 物件,則該 Future 物件將被取消。cancelled()
可用於檢查任務是否已取消。如果包裝的協程沒有抑制CancelledError
異常並且實際上被取消了,則該方法返回True
。asyncio.Task
從Future
繼承了除Future.set_result()
和Future.set_exception()
之外的所有 API。可選的僅限關鍵字的 context 引數允許為要執行的 coro 指定自定義
contextvars.Context
。 如果未提供 context,則 Task 會複製當前上下文,然後在複製的上下文中執行其協程。可選的僅限關鍵字的 eager_start 引數允許在任務建立時立即開始執行
asyncio.Task
。 如果設定為True
並且事件迴圈正在執行,則任務將立即開始執行協程,直到協程首次阻塞為止。 如果協程返回或引發而沒有阻塞,則任務將盡早完成,並將跳過排程到事件迴圈。在 3.7 版本中更改: 添加了對
contextvars
模組的支援。在 3.8 版本中更改: 添加了 name 引數。
在 3.10 版本中已棄用: 如果未指定 loop 並且沒有正在執行的事件迴圈,則會發出棄用警告。
在 3.11 版本中更改: 添加了 context 引數。
在 3.12 版本中更改: 添加了 eager_start 引數。
- done()¶
如果任務已完成,則返回
True
。當包裝的協程返回值、引發異常或任務被取消時,任務即被視為完成。
- result()¶
返回任務的結果。
如果任務已完成,則返回包裝的協程的結果(或者,如果協程引發了異常,則會重新引發該異常。)
如果任務已取消,則此方法會引發
CancelledError
異常。如果任務的結果尚不可用,則此方法會引發
InvalidStateError
異常。
- exception()¶
返回任務的異常。
如果包裝的協程引發了異常,則返回該異常。 如果包裝的協程正常返回,則此方法返回
None
。如果任務已取消,則此方法會引發
CancelledError
異常。如果任務尚未完成,則此方法會引發
InvalidStateError
異常。
- add_done_callback(callback, *, context=None)¶
新增一個回撥函式,以便在任務完成時執行。
此方法僅應在低階基於回撥的程式碼中使用。
有關更多詳細資訊,請參閱
Future.add_done_callback()
的文件。
- remove_done_callback(callback)¶
從回撥列表刪除 callback。
此方法僅應在低階基於回撥的程式碼中使用。
有關更多詳細資訊,請參閱
Future.remove_done_callback()
的文件。
- get_stack(*, limit=None)¶
返回此任務的堆疊幀列表。
如果包裝的協程尚未完成,則返回其掛起的堆疊。 如果協程已成功完成或已取消,則返回一個空列表。 如果協程因異常而終止,則返回回溯幀列表。
幀始終按從最舊到最新的順序排列。
對於掛起的協程,僅返回一個堆疊幀。
可選的 limit 引數設定返回的最大幀數;預設情況下,返回所有可用的幀。返回的列表的順序取決於返回的是堆疊還是回溯:返回堆疊的最新幀,但返回回溯的最舊幀。(這與 traceback 模組的行為一致。)
- print_stack(*, limit=None, file=None)¶
列印此任務的堆疊或回溯。
這會產生類似於 traceback 模組的輸出,用於透過
get_stack()
檢索的幀。limit 引數直接傳遞給
get_stack()
。file 引數是寫入輸出的 I/O 流;預設情況下,輸出寫入
sys.stdout
。
- get_coro()¶
返回
Task
封裝的協程物件。注意
對於已經提前完成的任務,這將返回
None
。請參閱 Eager Task Factory。在 3.8 版本中新增。
在 3.12 版本中更改: 新新增的提前任務執行意味著結果可能為
None
。
- get_context()¶
返回與任務關聯的
contextvars.Context
物件。在 3.12 版本中新增。
- get_name()¶
返回任務的名稱。
如果未顯式為任務分配名稱,則預設的 asyncio 任務實現會在例項化期間生成預設名稱。
在 3.8 版本中新增。
- set_name(value)¶
設定任務的名稱。
value 引數可以是任何物件,然後將其轉換為字串。
在預設的任務實現中,該名稱將在任務物件的
repr()
輸出中可見。在 3.8 版本中新增。
- cancel(msg=None)¶
請求取消任務。
這會安排在事件迴圈的下一個週期中將
CancelledError
異常拋入封裝的協程中。然後,協程有機會清理甚至透過使用
try
… …except CancelledError
…finally
塊來抑制異常來拒絕該請求。因此,與Future.cancel()
不同,Task.cancel()
不保證任務將被取消,儘管完全抑制取消並不常見,並且不鼓勵這樣做。如果協程仍然決定抑制取消,則除了捕獲異常外,還需要呼叫Task.uncancel()
。在 3.9 版本中更改: 添加了 msg 引數。
在 3.11 版本中更改:
msg
引數從已取消的任務傳播到其等待者。以下示例說明了協程如何攔擷取消請求
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
- cancelled()¶
如果任務已取消,則返回
True
。當使用
cancel()
請求取消,並且封裝的協程傳播了拋入其中的CancelledError
異常時,該任務才被視為取消。
- uncancel()¶
減少對此任務的取消請求計數。
返回剩餘的取消請求數。
請注意,一旦取消的任務執行完成,進一步呼叫
uncancel()
將無效。3.11 版本新增。
此方法由 asyncio 的內部使用,不希望終端使用者程式碼使用。特別是,如果任務成功地被取消,這將允許結構化併發的元素(如 任務組 和
asyncio.timeout()
)繼續執行,從而將取消隔離到相應的結構化塊。例如async def make_request_with_timeout(): try: async with asyncio.timeout(1): # Structured block affected by the timeout: await make_request() await make_another_request() except TimeoutError: log("There was a timeout") # Outer code not affected by the timeout: await unrelated_code()
雖然由於超時,
make_request()
和make_another_request()
的塊可能會被取消,但即使在超時的情況下,unrelated_code()
也應繼續執行。這是透過uncancel()
實現的。TaskGroup
上下文管理器以類似的方式使用uncancel()
。如果終端使用者程式碼由於某種原因透過捕獲
CancelledError
來抑制取消,則需要呼叫此方法來刪除取消狀態。當此方法將取消計數減少到零時,該方法會檢查之前的
cancel()
呼叫是否已安排將CancelledError
拋入任務中。如果尚未丟擲,則會取消該安排(透過重置內部_must_cancel
標誌)。
在 3.13 版本中更改: 更改為在達到零時撤銷掛起的取消請求。
- cancelling()¶
返回此任務的掛起取消請求數,即
cancel()
的呼叫次數減去uncancel()
的呼叫次數。請注意,如果此數字大於零,但任務仍在執行,則
cancelled()
仍將返回False
。這是因為可以透過呼叫uncancel()
來降低此數字,這可能會導致任務在取消請求降至零後最終不會被取消。此方法由 asyncio 的內部使用,不希望終端使用者程式碼使用。有關詳細資訊,請參閱
uncancel()
。3.11 版本新增。