asyncio 的概念性概述

這篇 HOWTO 文章旨在幫助您建立一個關於 asyncio 基本工作原理的紮實心智模型,幫助您理解推薦模式的“何為”和“為何”。

您可能對一些關鍵的 asyncio 概念感到好奇。透過閱讀本文,您將能夠輕鬆回答以下問題:

  • 當一個物件被 awaited 時,幕後發生了什麼?

  • asyncio 如何區分不需要 CPU 時間的任務(例如網路請求或檔案讀取)和需要 CPU 時間的任務(例如計算 n 階乘)?

  • 如何編寫一個操作的非同步變體,例如非同步睡眠或資料庫請求。

參見

概念性概述第一部分:高層

在第一部分中,我們將介紹 asyncio 的主要高層構建塊:事件迴圈、協程函式、協程物件、任務和 await

事件迴圈

asyncio 中的一切都相對於事件迴圈發生。它是演出的明星。它就像一個管絃樂隊的指揮。它在幕後管理資源。一些權力明確地授予給它,但它完成工作的許多能力來自於其工蜂的尊重和合作。

更技術性地說,事件迴圈包含了一組要執行的作業。有些作業是您直接新增的,有些是由 asyncio 間接新增的。事件迴圈從其積壓的工作中取出一個作業並呼叫它(或“將控制權交給它”),類似於呼叫一個函式,然後該作業執行。一旦它暫停或完成,它就會將控制權返回給事件迴圈。事件迴圈然後從其池中選擇另一個作業並呼叫它。您可以**大致**將作業集合視為一個佇列:作業被新增,然後一次處理一個,通常(但不總是)按順序進行。這個過程無限重複,事件迴圈無休止地迴圈。如果沒有更多待執行的作業,事件迴圈會足夠聰明地休息,避免不必要地浪費 CPU 週期,並在有更多工作要做時返回。

有效的執行依賴於作業之間的良好共享和協作;一個貪婪的作業可能會霸佔控制權並讓其他作業捱餓,從而使整體的事件迴圈方法變得相當無用。

import asyncio

# This creates an event loop and indefinitely cycles through
# its collection of jobs.
event_loop = asyncio.new_event_loop()
event_loop.run_forever()

非同步函式和協程

這是一個基本的、無聊的 Python 函式

def hello_printer():
    print(
        "Hi, I am a lowly, simple printer, though I have all I "
        "need in life -- \nfresh paper and my dearly beloved octopus "
        "partner in crime."
    )

呼叫常規函式會執行其邏輯或主體

>>> hello_printer()
Hi, I am a lowly, simple printer, though I have all I need in life --
fresh paper and my dearly beloved octopus partner in crime.

async def,與普通的 def 不同,使其成為一個非同步函式(或“協程函式”)。呼叫它會建立並返回一個 協程 物件。

async def loudmouth_penguin(magic_number: int):
    print(
     "I am a super special talking penguin. Far cooler than that printer. "
     f"By the way, my lucky number is: {magic_number}."
    )

呼叫非同步函式 loudmouth_penguin 並不會執行 print 語句;相反,它會建立一個協程物件

>>> loudmouth_penguin(magic_number=3)
<coroutine object loudmouth_penguin at 0x104ed2740>

術語“協程函式”和“協程物件”經常被混淆為協程。這可能會令人困惑!在本文中,協程特指協程物件,或者更確切地說,是 types.CoroutineType 的例項(原生協程)。請注意,協程也可以作為 collections.abc.Coroutine 的例項存在——這是一個對型別檢查很重要的區別。

協程表示函式的主體或邏輯。協程必須顯式啟動;同樣,僅僅建立協程並不會啟動它。值得注意的是,協程可以在函式主體內的各個點暫停和恢復。這種暫停和恢復的能力正是實現非同步行為的關鍵!

協程和協程函式是透過利用 生成器生成器函式 的功能構建的。回想一下,生成器函式是一個 yield 的函式,就像這個一樣

def get_random_number():
    # This would be a bad random number generator!
    print("Hi")
    yield 1
    print("Hello")
    yield 7
    print("Howdy")
    yield 4
    ...

類似於協程函式,呼叫生成器函式不會執行它。相反,它會建立一個生成器物件

>>> get_random_number()
<generator object get_random_number at 0x1048671c0>

您可以透過使用內建函式 next() 來進入生成器的下一個 yield。換句話說,生成器執行,然後暫停。例如:

>>> generator = get_random_number()
>>> next(generator)
Hi
1
>>> next(generator)
Hello
7

任務

粗略地說,任務 是與事件迴圈繫結的協程(而不是協程函式)。任務還維護一個回撥函式列表,當我們討論 await 時,它們的重要性將變得清晰。推薦建立任務的方式是透過 asyncio.create_task()

建立任務會自動安排其執行(透過向事件迴圈的待辦事項列表,即作業集合,新增一個回撥來執行它)。

由於(在每個執行緒中)只有一個事件迴圈,asyncio 會為您處理將任務與事件迴圈關聯起來。因此,無需指定事件迴圈。

coroutine = loudmouth_penguin(magic_number=5)
# This creates a Task object and schedules its execution via the event loop.
task = asyncio.create_task(coroutine)

之前,我們手動建立了事件迴圈並將其設定為永遠執行。實際上,建議使用(也常見)asyncio.run(),它負責管理事件迴圈並確保提供的協程在繼續之前完成。例如,許多非同步程式遵循這種設定

import asyncio

async def main():
    # Perform all sorts of wacky, wild asynchronous things...
    ...

if __name__ == "__main__":
    asyncio.run(main())
    # The program will not reach the following print statement until the
    # coroutine main() finishes.
    print("coroutine main() is done!")

重要的是要意識到任務本身並沒有新增到事件迴圈中,只有對任務的回撥被新增到事件迴圈中。如果事件迴圈呼叫任務物件之前它就被垃圾回收了,這將變得很重要。例如,考慮這個程式

 1async def hello():
 2    print("hello!")
 3
 4async def main():
 5    asyncio.create_task(hello())
 6    # Other asynchronous instructions which run for a while
 7    # and cede control to the event loop...
 8    ...
 9
10asyncio.run(main())

因為第 5 行建立的任務物件沒有引用,所以它_可能_在事件迴圈呼叫它之前被垃圾回收。協程 main() 中的後續指令將控制權交還給事件迴圈,以便它可以呼叫其他作業。當事件迴圈最終嘗試執行任務時,它可能會失敗並發現任務物件不存在!即使協程保留了對任務的引用但在該任務完成之前協程完成,也可能發生這種情況。當協程退出時,區域性變數超出作用域並可能被垃圾回收。asyncio 和 Python 的垃圾回收器在實踐中會非常努力地確保這種情況不會發生。但這並不是粗心大意的理由!

await

await 是一個 Python 關鍵字,通常以兩種不同的方式使用

await task
await coroutine

在一個關鍵方面,await 的行為取決於被等待物件的型別。

等待一個任務將把控制權從當前任務或協程移交給事件迴圈。在放棄控制權的過程中,會發生一些重要的事情。我們將使用以下程式碼示例進行說明

async def plant_a_tree():
    dig_the_hole_task = asyncio.create_task(dig_the_hole())
    await dig_the_hole_task

    # Other instructions associated with planting a tree.
    ...

在這個例子中,想象事件迴圈已將控制權傳遞給協程 plant_a_tree() 的開始。如上所示,協程建立了一個任務,然後等待它。await dig_the_hole_task 指令將一個回撥(它將恢復 plant_a_tree())新增到 dig_the_hole_task 物件的Callbacks列表中。然後,該指令將控制權交給事件迴圈。一段時間後,事件迴圈將控制權傳遞給 dig_the_hole_task,任務將完成它需要做的任何事情。一旦任務完成,它將將其各種回撥新增到事件迴圈中,在這種情況下,是呼叫以恢復 plant_a_tree()

一般來說,當被等待的任務 (dig_the_hole_task) 完成時,原始任務或協程 (plant_a_tree()) 被添加回事件迴圈的待辦事項列表以恢復。

這是一個基本但可靠的心智模型。在實踐中,控制權的移交稍微複雜一些,但不多。在第二部分中,我們將詳細介紹實現這一點所需的細節。

與任務不同,等待協程不會將控制權交還給事件迴圈! 如果先將協程包裝在一個任務中,然後等待該任務,則會交出控制權。await coroutine 的行為實際上與呼叫一個普通的同步 Python 函式相同。考慮以下程式

import asyncio

async def coro_a():
   print("I am coro_a(). Hi!")

async def coro_b():
   print("I am coro_b(). I sure hope no one hogs the event loop...")

async def main():
   task_b = asyncio.create_task(coro_b())
   num_repeats = 3
   for _ in range(num_repeats):
      await coro_a()
   await task_b

asyncio.run(main())

協程 main() 中的第一條語句建立了 task_b 並透過事件迴圈將其排程執行。然後,coro_a() 被重複等待。控制權從未交還給事件迴圈,這就是為什麼我們在 coro_b() 的輸出之前看到所有三個 coro_a() 呼叫的輸出

I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_b(). I sure hope no one hogs the event loop...

如果我們將 await coro_a() 更改為 await asyncio.create_task(coro_a()),行為就會改變。協程 main() 在該語句處將控制權交回給事件迴圈。事件迴圈然後繼續處理其積壓的工作,呼叫 task_b,然後呼叫包裝 coro_a() 的任務,最後恢復協程 main()

I am coro_b(). I sure hope no one hogs the event loop...
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!

這種 await coroutine 的行為可能會讓很多人感到困惑!這個例子強調了僅僅使用 await coroutine 如何可能無意中霸佔其他任務的控制權,從而有效地使事件迴圈停滯。asyncio.run() 可以透過 debug=True 標誌幫助您檢測此類情況,該標誌啟用 除錯模式。除其他外,它會記錄任何獨佔執行 100 毫秒或更長時間的協程。

這種設計有意犧牲了一些關於 await 用法的概念清晰性,以換取效能提升。每次等待一個任務時,控制權都需要沿著呼叫棧一直傳遞到事件迴圈。這聽起來可能微不足道,但在一個包含許多 await 語句和深層呼叫棧的大型程式中,這種開銷可能會導致顯著的效能拖累。

概念性概述第二部分:核心細節

第二部分詳細介紹了 asyncio 用於管理控制流的機制。這就是奇蹟發生的地方。透過本節,您將瞭解 await 在幕後做了什麼,以及如何建立自己的非同步運算子。

協程的內部工作原理

asyncio 利用四個元件來傳遞控制權。

coroutine.send(arg) 是用於啟動或恢復協程的方法。如果協程已暫停並正在恢復,則引數 arg 將作為最初暫停它的 yield 語句的返回值傳送進去。如果協程是第一次使用(而不是恢復),arg 必須是 None

 1class Rock:
 2    def __await__(self):
 3        value_sent_in = yield 7
 4        print(f"Rock.__await__ resuming with value: {value_sent_in}.")
 5        return value_sent_in
 6
 7async def main():
 8    print("Beginning coroutine main().")
 9    rock = Rock()
10    print("Awaiting rock...")
11    value_from_rock = await rock
12    print(f"Coroutine received value: {value_from_rock} from rock.")
13    return 23
14
15coroutine = main()
16intermediate_result = coroutine.send(None)
17print(f"Coroutine paused and returned intermediate value: {intermediate_result}.")
18
19print(f"Resuming coroutine and sending in value: 42.")
20try:
21    coroutine.send(42)
22except StopIteration as e:
23    returned_value = e.value
24print(f"Coroutine main() finished and provided value: {returned_value}.")

yield,像往常一樣,暫停執行並將控制權返回給呼叫者。在上面的示例中,第 3 行的 yield 由第 11 行的 ... = await rock 呼叫。更廣泛地說,await 呼叫給定物件的 __await__() 方法。await 還做了一件非常特殊的事情:它將接收到的任何 yield 傳播(或“傳遞”)到呼叫鏈的上方。在本例中,即回到第 16 行的 ... = coroutine.send(None)

協程透過第 21 行的 coroutine.send(42) 呼叫恢復。協程從第 3 行 yield(或暫停)的地方繼續執行,並執行其主體中剩餘的語句。當協程完成時,它會引發一個 StopIteration 異常,返回值為 value 屬性。

該程式碼片段產生以下輸出

Beginning coroutine main().
Awaiting rock...
Coroutine paused and returned intermediate value: 7.
Resuming coroutine and sending in value: 42.
Rock.__await__ resuming with value: 42.
Coroutine received value: 42 from rock.
Coroutine main() finished and provided value: 23.

值得在此處暫停片刻,確保您理解了控制流和值傳遞的各種方式。此處涵蓋了許多重要概念,確保您的理解牢固是值得的。

從協程中 yield(或有效放棄控制權)的唯一方法是 await 一個在其 __await__ 方法中 yield 的物件。這可能聽起來很奇怪。您可能會想

1. 那直接在協程函式內部的 yield 呢?協程函式會變成一個 非同步生成器函式,這是一個完全不同的概念。

2. 那在協程函式內部使用 yield from 到一個(普通)生成器呢?那會導致錯誤:SyntaxError: yield from not allowed in a coroutine. 這是為了簡潔而故意設計的——只強制使用協程的一種方式。最初 yield 也被禁止,但為了允許非同步生成器而被重新接受。儘管如此,yield fromawait 實際上做的是同一件事。

期貨

期貨 是一種旨在表示計算狀態和結果的物件。這個術語暗示著未來會發生或尚未發生的事情,而該物件就是一種關注該事物的方式。

一個 Future 有幾個重要的屬性。一個是它的狀態,可以是“pending”(待定)、“cancelled”(已取消)或“done”(已完成)。另一個是它的結果,在狀態轉換為“done”時設定。與協程不同,Future 不代表實際要完成的計算;相反,它代表該計算的狀態和結果,有點像一個狀態燈(紅、黃或綠)或指示器。

asyncio.Task 繼承自 asyncio.Future,以獲得這些各種功能。上一節說任務儲存了一個回撥列表,這並不完全準確。實際上是 Future 類實現了這種邏輯,而 Task 繼承了它。

Future 也可以直接使用(而不是透過任務)。任務在其協程完成時將自己標記為已完成。Future 更具通用性,當您說完成時就會被標記為完成。透過這種方式,它們是靈活的介面,讓您可以為等待和恢復設定自己的條件。

一個自制的 asyncio.sleep

我們將透過一個示例,說明您如何利用 Future 建立自己的非同步睡眠 (async_sleep) 變體,它模仿 asyncio.sleep()

此程式碼片段向事件迴圈註冊了一些任務,然後等待由 asyncio.create_task 建立的任務,該任務包裝了 async_sleep(3) 協程。我們希望該任務僅在三秒鐘過去後完成,但不能阻止其他任務執行。

async def other_work():
    print("I like work. Work work.")

async def main():
    # Add a few other tasks to the event loop, so there's something
    # to do while asynchronously sleeping.
    work_tasks = [
        asyncio.create_task(other_work()),
        asyncio.create_task(other_work()),
        asyncio.create_task(other_work())
    ]
    print(
        "Beginning asynchronous sleep at time: "
        f"{datetime.datetime.now().strftime("%H:%M:%S")}."
    )
    await asyncio.create_task(async_sleep(3))
    print(
        "Done asynchronous sleep at time: "
        f"{datetime.datetime.now().strftime("%H:%M:%S")}."
    )
    # asyncio.gather effectively awaits each task in the collection.
    await asyncio.gather(*work_tasks)

下面,我們使用一個 Future 來實現對該任務何時被標記為完成的自定義控制。如果從未呼叫 future.set_result()(負責將該 Future 標記為完成的方法),那麼該任務將永遠不會完成。我們還藉助了另一個任務,我們稍後會看到,它將監控已過去的時間,並相應地呼叫 future.set_result()

async def async_sleep(seconds: float):
    future = asyncio.Future()
    time_to_wake = time.time() + seconds
    # Add the watcher-task to the event loop.
    watcher_task = asyncio.create_task(_sleep_watcher(future, time_to_wake))
    # Block until the future is marked as done.
    await future

下面,我們使用一個相當簡單的 YieldToEventLoop() 物件,從其 __await__ 方法中 yield,將控制權交還給事件迴圈。這實際上與呼叫 asyncio.sleep(0) 相同,但這種方法提供了更高的清晰度,更不用說在展示如何實現它時使用 asyncio.sleep 有點作弊!

像往常一樣,事件迴圈遍歷其任務,將控制權交給它們,並在它們暫停或完成時收回控制權。watcher_task(執行協程 _sleep_watcher(...))將在事件迴圈的每個完整週期中被呼叫一次。每次恢復時,它都會檢查時間,如果時間不足,它將再次暫停並將控制權交還給事件迴圈。一旦時間充足,_sleep_watcher(...) 將 Future 標記為完成,並透過退出其無限 while 迴圈而完成。鑑於這個輔助任務在事件迴圈的每個週期中只被呼叫一次,您會正確地注意到這個非同步睡眠將睡眠_至少_三秒,而不是正好三秒。請注意,asyncio.sleep 也是如此。

class YieldToEventLoop:
    def __await__(self):
        yield

async def _sleep_watcher(future, time_to_wake):
    while True:
        if time.time() >= time_to_wake:
            # This marks the future as done.
            future.set_result(None)
            break
        else:
            await YieldToEventLoop()

這是完整程式的輸出

$ python custom-async-sleep.py
Beginning asynchronous sleep at time: 14:52:22.
I like work. Work work.
I like work. Work work.
I like work. Work work.
Done asynchronous sleep at time: 14:52:25.

您可能會覺得這種非同步睡眠的實現過於複雜。嗯,確實如此。這個例子旨在透過一個簡單的例子展示 Future 的多功能性,以便可以模仿它來滿足更復雜的需求。作為參考,您可以在沒有 Future 的情況下實現它,如下所示

async def simpler_async_sleep(seconds):
    time_to_wake = time.time() + seconds
    while True:
        if time.time() >= time_to_wake:
            return
        else:
            await YieldToEventLoop()

但目前就這些了。希望您已準備好更自信地投入非同步程式設計,或查閱 文件的其餘部分 中的高階主題。