asyncio
的概念性概述¶
這篇 HOWTO 文章旨在幫助您建立一個關於 asyncio
基本工作原理的紮實心智模型,幫助您理解推薦模式的“何為”和“為何”。
您可能對一些關鍵的 asyncio
概念感到好奇。透過閱讀本文,您將能夠輕鬆回答以下問題:
當一個物件被 awaited 時,幕後發生了什麼?
asyncio
如何區分不需要 CPU 時間的任務(例如網路請求或檔案讀取)和需要 CPU 時間的任務(例如計算 n 階乘)?如何編寫一個操作的非同步變體,例如非同步睡眠或資料庫請求。
參見
這篇 HOWTO 文章的靈感來源:Alexander Nordin 的 指南。
Python 核心團隊成員 Łukasz Langa 建立的關於
asyncio
的深入 YouTube 影片教程系列。500 行或更少程式碼:使用 asyncio 協程實現的網路爬蟲,作者 A. Jesse Jiryu Davis 和 Guido van Rossum。
概念性概述第一部分:高層¶
在第一部分中,我們將介紹 asyncio
的主要高層構建塊:事件迴圈、協程函式、協程物件、任務和 await
。
事件迴圈¶
asyncio
中的一切都相對於事件迴圈發生。它是演出的明星。它就像一個管絃樂隊的指揮。它在幕後管理資源。一些權力明確地授予給它,但它完成工作的許多能力來自於其工蜂的尊重和合作。
更技術性地說,事件迴圈包含了一組要執行的作業。有些作業是您直接新增的,有些是由 asyncio
間接新增的。事件迴圈從其積壓的工作中取出一個作業並呼叫它(或“將控制權交給它”),類似於呼叫一個函式,然後該作業執行。一旦它暫停或完成,它就會將控制權返回給事件迴圈。事件迴圈然後從其池中選擇另一個作業並呼叫它。您可以**大致**將作業集合視為一個佇列:作業被新增,然後一次處理一個,通常(但不總是)按順序進行。這個過程無限重複,事件迴圈無休止地迴圈。如果沒有更多待執行的作業,事件迴圈會足夠聰明地休息,避免不必要地浪費 CPU 週期,並在有更多工作要做時返回。
有效的執行依賴於作業之間的良好共享和協作;一個貪婪的作業可能會霸佔控制權並讓其他作業捱餓,從而使整體的事件迴圈方法變得相當無用。
import asyncio
# This creates an event loop and indefinitely cycles through
# its collection of jobs.
event_loop = asyncio.new_event_loop()
event_loop.run_forever()
非同步函式和協程¶
這是一個基本的、無聊的 Python 函式
def hello_printer():
print(
"Hi, I am a lowly, simple printer, though I have all I "
"need in life -- \nfresh paper and my dearly beloved octopus "
"partner in crime."
)
呼叫常規函式會執行其邏輯或主體
>>> hello_printer()
Hi, I am a lowly, simple printer, though I have all I need in life --
fresh paper and my dearly beloved octopus partner in crime.
async def,與普通的 def
不同,使其成為一個非同步函式(或“協程函式”)。呼叫它會建立並返回一個 協程 物件。
async def loudmouth_penguin(magic_number: int):
print(
"I am a super special talking penguin. Far cooler than that printer. "
f"By the way, my lucky number is: {magic_number}."
)
呼叫非同步函式 loudmouth_penguin
並不會執行 print 語句;相反,它會建立一個協程物件
>>> loudmouth_penguin(magic_number=3)
<coroutine object loudmouth_penguin at 0x104ed2740>
術語“協程函式”和“協程物件”經常被混淆為協程。這可能會令人困惑!在本文中,協程特指協程物件,或者更確切地說,是 types.CoroutineType
的例項(原生協程)。請注意,協程也可以作為 collections.abc.Coroutine
的例項存在——這是一個對型別檢查很重要的區別。
協程表示函式的主體或邏輯。協程必須顯式啟動;同樣,僅僅建立協程並不會啟動它。值得注意的是,協程可以在函式主體內的各個點暫停和恢復。這種暫停和恢復的能力正是實現非同步行為的關鍵!
協程和協程函式是透過利用 生成器 和 生成器函式 的功能構建的。回想一下,生成器函式是一個 yield
的函式,就像這個一樣
def get_random_number():
# This would be a bad random number generator!
print("Hi")
yield 1
print("Hello")
yield 7
print("Howdy")
yield 4
...
類似於協程函式,呼叫生成器函式不會執行它。相反,它會建立一個生成器物件
>>> get_random_number()
<generator object get_random_number at 0x1048671c0>
您可以透過使用內建函式 next()
來進入生成器的下一個 yield
。換句話說,生成器執行,然後暫停。例如:
>>> generator = get_random_number()
>>> next(generator)
Hi
1
>>> next(generator)
Hello
7
任務¶
粗略地說,任務 是與事件迴圈繫結的協程(而不是協程函式)。任務還維護一個回撥函式列表,當我們討論 await
時,它們的重要性將變得清晰。推薦建立任務的方式是透過 asyncio.create_task()
。
建立任務會自動安排其執行(透過向事件迴圈的待辦事項列表,即作業集合,新增一個回撥來執行它)。
由於(在每個執行緒中)只有一個事件迴圈,asyncio
會為您處理將任務與事件迴圈關聯起來。因此,無需指定事件迴圈。
coroutine = loudmouth_penguin(magic_number=5)
# This creates a Task object and schedules its execution via the event loop.
task = asyncio.create_task(coroutine)
之前,我們手動建立了事件迴圈並將其設定為永遠執行。實際上,建議使用(也常見)asyncio.run()
,它負責管理事件迴圈並確保提供的協程在繼續之前完成。例如,許多非同步程式遵循這種設定
import asyncio
async def main():
# Perform all sorts of wacky, wild asynchronous things...
...
if __name__ == "__main__":
asyncio.run(main())
# The program will not reach the following print statement until the
# coroutine main() finishes.
print("coroutine main() is done!")
重要的是要意識到任務本身並沒有新增到事件迴圈中,只有對任務的回撥被新增到事件迴圈中。如果事件迴圈呼叫任務物件之前它就被垃圾回收了,這將變得很重要。例如,考慮這個程式
1async def hello():
2 print("hello!")
3
4async def main():
5 asyncio.create_task(hello())
6 # Other asynchronous instructions which run for a while
7 # and cede control to the event loop...
8 ...
9
10asyncio.run(main())
因為第 5 行建立的任務物件沒有引用,所以它_可能_在事件迴圈呼叫它之前被垃圾回收。協程 main()
中的後續指令將控制權交還給事件迴圈,以便它可以呼叫其他作業。當事件迴圈最終嘗試執行任務時,它可能會失敗並發現任務物件不存在!即使協程保留了對任務的引用但在該任務完成之前協程完成,也可能發生這種情況。當協程退出時,區域性變數超出作用域並可能被垃圾回收。asyncio
和 Python 的垃圾回收器在實踐中會非常努力地確保這種情況不會發生。但這並不是粗心大意的理由!
await¶
await
是一個 Python 關鍵字,通常以兩種不同的方式使用
await task
await coroutine
在一個關鍵方面,await
的行為取決於被等待物件的型別。
等待一個任務將把控制權從當前任務或協程移交給事件迴圈。在放棄控制權的過程中,會發生一些重要的事情。我們將使用以下程式碼示例進行說明
async def plant_a_tree():
dig_the_hole_task = asyncio.create_task(dig_the_hole())
await dig_the_hole_task
# Other instructions associated with planting a tree.
...
在這個例子中,想象事件迴圈已將控制權傳遞給協程 plant_a_tree()
的開始。如上所示,協程建立了一個任務,然後等待它。await dig_the_hole_task
指令將一個回撥(它將恢復 plant_a_tree()
)新增到 dig_the_hole_task
物件的Callbacks列表中。然後,該指令將控制權交給事件迴圈。一段時間後,事件迴圈將控制權傳遞給 dig_the_hole_task
,任務將完成它需要做的任何事情。一旦任務完成,它將將其各種回撥新增到事件迴圈中,在這種情況下,是呼叫以恢復 plant_a_tree()
。
一般來說,當被等待的任務 (dig_the_hole_task
) 完成時,原始任務或協程 (plant_a_tree()
) 被添加回事件迴圈的待辦事項列表以恢復。
這是一個基本但可靠的心智模型。在實踐中,控制權的移交稍微複雜一些,但不多。在第二部分中,我們將詳細介紹實現這一點所需的細節。
與任務不同,等待協程不會將控制權交還給事件迴圈! 如果先將協程包裝在一個任務中,然後等待該任務,則會交出控制權。await coroutine
的行為實際上與呼叫一個普通的同步 Python 函式相同。考慮以下程式
import asyncio
async def coro_a():
print("I am coro_a(). Hi!")
async def coro_b():
print("I am coro_b(). I sure hope no one hogs the event loop...")
async def main():
task_b = asyncio.create_task(coro_b())
num_repeats = 3
for _ in range(num_repeats):
await coro_a()
await task_b
asyncio.run(main())
協程 main()
中的第一條語句建立了 task_b
並透過事件迴圈將其排程執行。然後,coro_a()
被重複等待。控制權從未交還給事件迴圈,這就是為什麼我們在 coro_b()
的輸出之前看到所有三個 coro_a()
呼叫的輸出
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_b(). I sure hope no one hogs the event loop...
如果我們將 await coro_a()
更改為 await asyncio.create_task(coro_a())
,行為就會改變。協程 main()
在該語句處將控制權交回給事件迴圈。事件迴圈然後繼續處理其積壓的工作,呼叫 task_b
,然後呼叫包裝 coro_a()
的任務,最後恢復協程 main()
。
I am coro_b(). I sure hope no one hogs the event loop...
I am coro_a(). Hi!
I am coro_a(). Hi!
I am coro_a(). Hi!
這種 await coroutine
的行為可能會讓很多人感到困惑!這個例子強調了僅僅使用 await coroutine
如何可能無意中霸佔其他任務的控制權,從而有效地使事件迴圈停滯。asyncio.run()
可以透過 debug=True
標誌幫助您檢測此類情況,該標誌啟用 除錯模式。除其他外,它會記錄任何獨佔執行 100 毫秒或更長時間的協程。
這種設計有意犧牲了一些關於 await
用法的概念清晰性,以換取效能提升。每次等待一個任務時,控制權都需要沿著呼叫棧一直傳遞到事件迴圈。這聽起來可能微不足道,但在一個包含許多 await
語句和深層呼叫棧的大型程式中,這種開銷可能會導致顯著的效能拖累。
概念性概述第二部分:核心細節¶
第二部分詳細介紹了 asyncio
用於管理控制流的機制。這就是奇蹟發生的地方。透過本節,您將瞭解 await
在幕後做了什麼,以及如何建立自己的非同步運算子。
協程的內部工作原理¶
asyncio
利用四個元件來傳遞控制權。
coroutine.send(arg)
是用於啟動或恢復協程的方法。如果協程已暫停並正在恢復,則引數 arg
將作為最初暫停它的 yield
語句的返回值傳送進去。如果協程是第一次使用(而不是恢復),arg
必須是 None
。
1class Rock:
2 def __await__(self):
3 value_sent_in = yield 7
4 print(f"Rock.__await__ resuming with value: {value_sent_in}.")
5 return value_sent_in
6
7async def main():
8 print("Beginning coroutine main().")
9 rock = Rock()
10 print("Awaiting rock...")
11 value_from_rock = await rock
12 print(f"Coroutine received value: {value_from_rock} from rock.")
13 return 23
14
15coroutine = main()
16intermediate_result = coroutine.send(None)
17print(f"Coroutine paused and returned intermediate value: {intermediate_result}.")
18
19print(f"Resuming coroutine and sending in value: 42.")
20try:
21 coroutine.send(42)
22except StopIteration as e:
23 returned_value = e.value
24print(f"Coroutine main() finished and provided value: {returned_value}.")
yield,像往常一樣,暫停執行並將控制權返回給呼叫者。在上面的示例中,第 3 行的 yield
由第 11 行的 ... = await rock
呼叫。更廣泛地說,await
呼叫給定物件的 __await__()
方法。await
還做了一件非常特殊的事情:它將接收到的任何 yield
傳播(或“傳遞”)到呼叫鏈的上方。在本例中,即回到第 16 行的 ... = coroutine.send(None)
。
協程透過第 21 行的 coroutine.send(42)
呼叫恢復。協程從第 3 行 yield
(或暫停)的地方繼續執行,並執行其主體中剩餘的語句。當協程完成時,它會引發一個 StopIteration
異常,返回值為 value
屬性。
該程式碼片段產生以下輸出
Beginning coroutine main().
Awaiting rock...
Coroutine paused and returned intermediate value: 7.
Resuming coroutine and sending in value: 42.
Rock.__await__ resuming with value: 42.
Coroutine received value: 42 from rock.
Coroutine main() finished and provided value: 23.
值得在此處暫停片刻,確保您理解了控制流和值傳遞的各種方式。此處涵蓋了許多重要概念,確保您的理解牢固是值得的。
從協程中 yield
(或有效放棄控制權)的唯一方法是 await
一個在其 __await__
方法中 yield
的物件。這可能聽起來很奇怪。您可能會想
1. 那直接在協程函式內部的
yield
呢?協程函式會變成一個 非同步生成器函式,這是一個完全不同的概念。2. 那在協程函式內部使用 yield from 到一個(普通)生成器呢?那會導致錯誤:
SyntaxError: yield from not allowed in a coroutine.
這是為了簡潔而故意設計的——只強制使用協程的一種方式。最初yield
也被禁止,但為了允許非同步生成器而被重新接受。儘管如此,yield from
和await
實際上做的是同一件事。
期貨¶
期貨 是一種旨在表示計算狀態和結果的物件。這個術語暗示著未來會發生或尚未發生的事情,而該物件就是一種關注該事物的方式。
一個 Future 有幾個重要的屬性。一個是它的狀態,可以是“pending”(待定)、“cancelled”(已取消)或“done”(已完成)。另一個是它的結果,在狀態轉換為“done”時設定。與協程不同,Future 不代表實際要完成的計算;相反,它代表該計算的狀態和結果,有點像一個狀態燈(紅、黃或綠)或指示器。
asyncio.Task
繼承自 asyncio.Future
,以獲得這些各種功能。上一節說任務儲存了一個回撥列表,這並不完全準確。實際上是 Future
類實現了這種邏輯,而 Task
繼承了它。
Future 也可以直接使用(而不是透過任務)。任務在其協程完成時將自己標記為已完成。Future 更具通用性,當您說完成時就會被標記為完成。透過這種方式,它們是靈活的介面,讓您可以為等待和恢復設定自己的條件。
一個自制的 asyncio.sleep¶
我們將透過一個示例,說明您如何利用 Future 建立自己的非同步睡眠 (async_sleep
) 變體,它模仿 asyncio.sleep()
。
此程式碼片段向事件迴圈註冊了一些任務,然後等待由 asyncio.create_task
建立的任務,該任務包裝了 async_sleep(3)
協程。我們希望該任務僅在三秒鐘過去後完成,但不能阻止其他任務執行。
async def other_work():
print("I like work. Work work.")
async def main():
# Add a few other tasks to the event loop, so there's something
# to do while asynchronously sleeping.
work_tasks = [
asyncio.create_task(other_work()),
asyncio.create_task(other_work()),
asyncio.create_task(other_work())
]
print(
"Beginning asynchronous sleep at time: "
f"{datetime.datetime.now().strftime("%H:%M:%S")}."
)
await asyncio.create_task(async_sleep(3))
print(
"Done asynchronous sleep at time: "
f"{datetime.datetime.now().strftime("%H:%M:%S")}."
)
# asyncio.gather effectively awaits each task in the collection.
await asyncio.gather(*work_tasks)
下面,我們使用一個 Future 來實現對該任務何時被標記為完成的自定義控制。如果從未呼叫 future.set_result()
(負責將該 Future 標記為完成的方法),那麼該任務將永遠不會完成。我們還藉助了另一個任務,我們稍後會看到,它將監控已過去的時間,並相應地呼叫 future.set_result()
。
async def async_sleep(seconds: float):
future = asyncio.Future()
time_to_wake = time.time() + seconds
# Add the watcher-task to the event loop.
watcher_task = asyncio.create_task(_sleep_watcher(future, time_to_wake))
# Block until the future is marked as done.
await future
下面,我們使用一個相當簡單的 YieldToEventLoop()
物件,從其 __await__
方法中 yield
,將控制權交還給事件迴圈。這實際上與呼叫 asyncio.sleep(0)
相同,但這種方法提供了更高的清晰度,更不用說在展示如何實現它時使用 asyncio.sleep
有點作弊!
像往常一樣,事件迴圈遍歷其任務,將控制權交給它們,並在它們暫停或完成時收回控制權。watcher_task
(執行協程 _sleep_watcher(...)
)將在事件迴圈的每個完整週期中被呼叫一次。每次恢復時,它都會檢查時間,如果時間不足,它將再次暫停並將控制權交還給事件迴圈。一旦時間充足,_sleep_watcher(...)
將 Future 標記為完成,並透過退出其無限 while
迴圈而完成。鑑於這個輔助任務在事件迴圈的每個週期中只被呼叫一次,您會正確地注意到這個非同步睡眠將睡眠_至少_三秒,而不是正好三秒。請注意,asyncio.sleep
也是如此。
class YieldToEventLoop:
def __await__(self):
yield
async def _sleep_watcher(future, time_to_wake):
while True:
if time.time() >= time_to_wake:
# This marks the future as done.
future.set_result(None)
break
else:
await YieldToEventLoop()
這是完整程式的輸出
$ python custom-async-sleep.py
Beginning asynchronous sleep at time: 14:52:22.
I like work. Work work.
I like work. Work work.
I like work. Work work.
Done asynchronous sleep at time: 14:52:25.
您可能會覺得這種非同步睡眠的實現過於複雜。嗯,確實如此。這個例子旨在透過一個簡單的例子展示 Future 的多功能性,以便可以模仿它來滿足更復雜的需求。作為參考,您可以在沒有 Future 的情況下實現它,如下所示
async def simpler_async_sleep(seconds):
time_to_wake = time.time() + seconds
while True:
if time.time() >= time_to_wake:
return
else:
await YieldToEventLoop()
但目前就這些了。希望您已準備好更自信地投入非同步程式設計,或查閱 文件的其餘部分
中的高階主題。