使用 asyncio 開發

非同步程式設計與經典的“順序”程式設計不同。

本頁列出了常見的錯誤和陷阱,並解釋瞭如何避免它們。

除錯模式

預設情況下,asyncio 在生產模式下執行。 為了方便開發,asyncio 具有除錯模式

有幾種方法可以啟用 asyncio 除錯模式

除了啟用除錯模式,還應考慮

  • asyncio 記錄器的日誌級別設定為 logging.DEBUG,例如,以下程式碼片段可以在應用程式啟動時執行

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 模組以顯示 ResourceWarning 警告。 一種方法是使用 -W default 命令列選項。

啟用除錯模式後

  • asyncio 檢查 未等待的協程 並記錄它們; 這減輕了“忘記等待”的陷阱。

  • 許多非執行緒安全的 asyncio API(例如 loop.call_soon()loop.call_at() 方法)如果從錯誤的執行緒呼叫,則會引發異常。

  • 如果 I/O 選擇器的執行時間過長而無法執行 I/O 操作,則會記錄該執行時間。

  • 記錄執行時間超過 100 毫秒的回撥。loop.slow_callback_duration 屬性可用於設定被認為是“慢”的最小執行持續時間(以秒為單位)。

併發與多執行緒

事件迴圈在一個執行緒(通常是主執行緒)中執行,並在其執行緒中執行所有回撥和任務。當任務在事件迴圈中執行時,同一執行緒中不能執行其他任務。當任務執行 await 表示式時,正在執行的任務將掛起,並且事件迴圈將執行下一個任務。

要從另一個作業系統執行緒排程回撥,應使用 loop.call_soon_threadsafe() 方法。 示例

loop.call_soon_threadsafe(callback, *args)

幾乎所有的 asyncio 物件都不是執行緒安全的,這通常不是問題,除非有程式碼從任務或回撥之外使用它們。如果需要此類程式碼來呼叫底層 asyncio API,則應使用 loop.call_soon_threadsafe() 方法,例如

loop.call_soon_threadsafe(fut.cancel)

要從不同的作業系統執行緒排程協程物件,應使用 run_coroutine_threadsafe() 函式。 它返回一個 concurrent.futures.Future 以訪問結果

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

要處理訊號,事件迴圈必須在主執行緒中執行。

loop.run_in_executor() 方法可以與 concurrent.futures.ThreadPoolExecutor 一起使用,以在不同的作業系統執行緒中執行阻塞程式碼,而不會阻塞事件迴圈執行所在的作業系統執行緒。

目前沒有辦法直接從不同的程序(例如使用 multiprocessing 啟動的程序)排程協程或回撥。事件迴圈方法 部分列出了可以從管道讀取和監視檔案描述符而不會阻塞事件迴圈的 API。 此外,asyncio 的 子程序 API 提供了一種從事件迴圈啟動程序並與其通訊的方法。 最後,上述 loop.run_in_executor() 方法也可以與 concurrent.futures.ProcessPoolExecutor 一起使用,以在不同的程序中執行程式碼。

執行阻塞程式碼

不應直接呼叫阻塞(CPU 密集型)程式碼。例如,如果一個函式執行 1 秒的 CPU 密集型計算,所有併發的 asyncio 任務和 IO 操作將延遲 1 秒。

可以使用執行器在不同的執行緒甚至不同的程序中執行任務,以避免阻塞具有事件迴圈的作業系統執行緒。 有關更多詳細資訊,請參閱 loop.run_in_executor() 方法。

日誌記錄

asyncio 使用 logging 模組,所有日誌記錄都透過 "asyncio" 記錄器執行。

預設日誌級別為 logging.INFO,可以輕鬆調整

logging.getLogger("asyncio").setLevel(logging.WARNING)

網路日誌記錄會阻塞事件迴圈。建議使用單獨的執行緒來處理日誌或使用非阻塞 IO。例如,請參閱處理阻塞的處理程式

檢測從不等待的協程

當呼叫協程函式但未等待時(例如,coro() 而不是 await coro()),或者未使用 asyncio.create_task() 排程協程時,asyncio 將發出 RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

輸出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

除錯模式下的輸出

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的修復方法是等待協程或呼叫 asyncio.create_task() 函式

async def main():
    await test()

檢測從未檢索的異常

如果呼叫了 Future.set_exception(),但從未等待 Future 物件,則異常將永遠不會傳播到使用者程式碼。 在這種情況下,當垃圾回收 Future 物件時,asyncio 將發出一條日誌訊息。

未處理的異常示例

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

輸出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

啟用除錯模式 以獲取建立任務的回溯

asyncio.run(main(), debug=True)

除錯模式下的輸出

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed