同步原語¶
原始碼: Lib/asyncio/locks.py
asyncio 同步原語旨在與 threading
模組的同步原語相似,但有兩個重要注意事項:
asyncio 原語不是執行緒安全的,因此不應將它們用於作業系統執行緒同步(為此請使用
threading
);這些同步原語的方法不接受 timeout 引數;請使用
asyncio.wait_for()
函式執行帶超時的操作。
asyncio 具有以下基本同步原語:
Lock¶
- class asyncio.Lock¶
為 asyncio 任務實現互斥鎖。非執行緒安全。
asyncio 鎖可用於保證對共享資源的獨佔訪問。
使用 Lock 的首選方式是
async with
語句lock = asyncio.Lock() # ... later async with lock: # access shared state
這等同於
lock = asyncio.Lock() # ... later await lock.acquire() try: # access shared state finally: lock.release()
版本 3.10 中已更改: 移除了 loop 引數。
- async acquire()¶
獲取鎖。
此方法會等待直到鎖處於 解鎖 狀態,將其設定為 鎖定 狀態並返回
True
。當多個協程在
acquire()
中阻塞等待鎖被解鎖時,最終只有一個協程會繼續執行。獲取鎖是 公平的:繼續執行的協程將是第一個開始等待鎖的協程。
- release()¶
釋放鎖。
當鎖處於 鎖定 狀態時,將其重置為 解鎖 狀態並返回。
如果鎖處於 解鎖 狀態,則會引發
RuntimeError
。
- locked()¶
如果鎖處於 鎖定 狀態,則返回
True
。
Event¶
- class asyncio.Event¶
一個事件物件。非執行緒安全。
asyncio 事件可用於通知多個 asyncio 任務某個事件已發生。
Event 物件管理一個內部標誌,該標誌可以使用
set()
方法設定為 true,並使用clear()
方法重置為 false。wait()
方法會阻塞直到標誌設定為 true。標誌最初設定為 false。版本 3.10 中已更改: 移除了 loop 引數。
示例
async def waiter(event): print('waiting for it ...') await event.wait() print('... got it!') async def main(): # Create an Event object. event = asyncio.Event() # Spawn a Task to wait until 'event' is set. waiter_task = asyncio.create_task(waiter(event)) # Sleep for 1 second and set the event. await asyncio.sleep(1) event.set() # Wait until the waiter task is finished. await waiter_task asyncio.run(main())
- set()¶
設定事件。
所有等待事件被設定的任務將立即被喚醒。
- is_set()¶
如果事件已設定,則返回
True
。
Condition¶
- class asyncio.Condition(lock=None)¶
一個條件物件。非執行緒安全。
asyncio 條件原語可用於任務等待某個事件發生,然後獲得對共享資源的獨佔訪問。
本質上,Condition 物件結合了
Event
和Lock
的功能。可以有多個 Condition 物件共享一個 Lock,這允許在對共享資源特定狀態感興趣的不同任務之間協調對共享資源的獨佔訪問。可選的 lock 引數必須是
Lock
物件或None
。在後一種情況下,會自動建立一個新的 Lock 物件。版本 3.10 中已更改: 移除了 loop 引數。
使用 Condition 的首選方式是
async with
語句cond = asyncio.Condition() # ... later async with cond: await cond.wait()
這等同於
cond = asyncio.Condition() # ... later await cond.acquire() try: await cond.wait() finally: cond.release()
- async acquire()¶
獲取底層鎖。
此方法會等待直到底層鎖處於 解鎖 狀態,將其設定為 鎖定 狀態並返回
True
。
- notify(n=1)¶
喚醒 n 個(預設為 1 個)在此條件上等待的任務。如果等待的任務少於 n 個,則所有任務都會被喚醒。
此方法呼叫之前必須獲取鎖,並在之後儘快釋放。如果使用 未鎖定 的鎖呼叫,則會引發
RuntimeError
錯誤。
- locked()¶
如果底層鎖已獲取,則返回
True
。
- notify_all()¶
喚醒所有在此條件上等待的任務。
此方法的作用類似於
notify()
,但會喚醒所有等待中的任務。此方法呼叫之前必須獲取鎖,並在之後儘快釋放。如果使用 未鎖定 的鎖呼叫,則會引發
RuntimeError
錯誤。
- release()¶
釋放底層鎖。
如果在未鎖定的鎖上呼叫,則會引發
RuntimeError
。
- async wait()¶
等待直到被通知。
如果呼叫此方法時呼叫任務尚未獲取鎖,則會引發
RuntimeError
。此方法會釋放底層鎖,然後阻塞直到被
notify()
或notify_all()
呼叫喚醒。一旦被喚醒,Condition 會重新獲取其鎖,此方法返回True
。請注意,任務 可能會 意外地從此次呼叫返回,這就是為什麼呼叫者應始終重新檢查狀態並準備好再次
wait()
。因此,您可能更喜歡使用wait_for()
。
Semaphore¶
- class asyncio.Semaphore(value=1)¶
一個訊號量物件。非執行緒安全。
訊號量管理一個內部計數器,該計數器在每次
acquire()
呼叫時遞減,並在每次release()
呼叫時遞增。計數器永遠不能低於零;當acquire()
發現它為零時,它會阻塞,等待直到某個任務呼叫release()
。可選的 value 引數給出內部計數器的初始值(預設為
1
)。如果給定值小於0
,則會引發ValueError
。版本 3.10 中已更改: 移除了 loop 引數。
使用 Semaphore 的首選方式是
async with
語句sem = asyncio.Semaphore(10) # ... later async with sem: # work with shared resource
這等同於
sem = asyncio.Semaphore(10) # ... later await sem.acquire() try: # work with shared resource finally: sem.release()
- locked()¶
如果訊號量不能立即獲取,則返回
True
。
- release()¶
釋放訊號量,將內部計數器遞增一。可以喚醒等待獲取訊號量的任務。
與
BoundedSemaphore
不同,Semaphore
允許進行比acquire()
呼叫更多的release()
呼叫。
BoundedSemaphore¶
- class asyncio.BoundedSemaphore(value=1)¶
一個有界訊號量物件。非執行緒安全。
有界訊號量是
Semaphore
的一個版本,它在release()
中如果將內部計數器增加到超過初始 value,則會引發ValueError
。版本 3.10 中已更改: 移除了 loop 引數。
Barrier¶
- class asyncio.Barrier(parties)¶
一個屏障物件。非執行緒安全。
屏障是一種簡單的同步原語,允許阻塞直到 parties 數量的任務都在其上等待。任務可以在
wait()
方法上等待,並將被阻塞直到指定數量的任務最終在wait()
上等待。此時,所有等待的任務將同時解除阻塞。async with
可以用作等待wait()
的替代方法。屏障可以重複使用任意次數。
示例
async def example_barrier(): # barrier with 3 parties b = asyncio.Barrier(3) # create 2 new waiting tasks asyncio.create_task(b.wait()) asyncio.create_task(b.wait()) await asyncio.sleep(0) print(b) # The third .wait() call passes the barrier await b.wait() print(b) print("barrier passed") await asyncio.sleep(0) print(b) asyncio.run(example_barrier())
此示例的結果是
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]> <asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]> barrier passed <asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
在 3.11 版本中新增。
- async wait()¶
透過屏障。當所有參與屏障的任務都已呼叫此函式時,它們將同時解除阻塞。
當屏障中等待或阻塞的任務被取消時,該任務會退出屏障,屏障保持相同狀態。如果屏障的狀態是“填充”,則等待任務的數量減少 1。
返回值是一個整數,範圍從 0 到
parties-1
,每個任務都不同。這可以用於選擇一個任務來執行一些特殊的內務管理,例如... async with barrier as position: if position == 0: # Only one task prints this print('End of *draining phase*')
如果屏障在任務等待時被破壞或重置,此方法可能會引發
BrokenBarrierError
異常。如果任務被取消,它可能會引發CancelledError
。
- async reset()¶
將屏障恢復到預設的空狀態。任何等待它的任務都將收到
BrokenBarrierError
異常。如果屏障損壞,最好直接放棄並建立一個新的。
- async abort()¶
將屏障置於損壞狀態。這會導致任何當前或未來對
wait()
的呼叫都以BrokenBarrierError
失敗。例如,如果其中一個任務需要中止,請使用此方法以避免任務無限期等待。
- parties¶
透過屏障所需的任務數量。
- n_waiting¶
在填充過程中當前在屏障中等待的任務數量。
- broken¶
一個布林值,如果屏障處於損壞狀態,則為
True
。
- exception asyncio.BrokenBarrierError¶
當
Barrier
物件被重置或損壞時,會引發此異常,它是RuntimeError
的子類。
版本 3.9 中已更改: 移除了使用 await lock
或 yield from lock
和/或 with
語句(with await lock
, with (yield from lock)
)獲取鎖的方式。請改用 async with lock
。