同步原語

原始碼: Lib/asyncio/locks.py


asyncio 同步原語旨在與 threading 模組的同步原語相似,但有兩個重要注意事項:

  • asyncio 原語不是執行緒安全的,因此不應將它們用於作業系統執行緒同步(為此請使用 threading);

  • 這些同步原語的方法不接受 timeout 引數;請使用 asyncio.wait_for() 函式執行帶超時的操作。

asyncio 具有以下基本同步原語:


Lock

class asyncio.Lock

為 asyncio 任務實現互斥鎖。非執行緒安全。

asyncio 鎖可用於保證對共享資源的獨佔訪問。

使用 Lock 的首選方式是 async with 語句

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

這等同於

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

版本 3.10 中已更改: 移除了 loop 引數。

async acquire()

獲取鎖。

此方法會等待直到鎖處於 解鎖 狀態,將其設定為 鎖定 狀態並返回 True

當多個協程在 acquire() 中阻塞等待鎖被解鎖時,最終只有一個協程會繼續執行。

獲取鎖是 公平的:繼續執行的協程將是第一個開始等待鎖的協程。

release()

釋放鎖。

當鎖處於 鎖定 狀態時,將其重置為 解鎖 狀態並返回。

如果鎖處於 解鎖 狀態,則會引發 RuntimeError

locked()

如果鎖處於 鎖定 狀態,則返回 True

Event

class asyncio.Event

一個事件物件。非執行緒安全。

asyncio 事件可用於通知多個 asyncio 任務某個事件已發生。

Event 物件管理一個內部標誌,該標誌可以使用 set() 方法設定為 true,並使用 clear() 方法重置為 falsewait() 方法會阻塞直到標誌設定為 true。標誌最初設定為 false

版本 3.10 中已更改: 移除了 loop 引數。

示例

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
async wait()

等待直到事件被設定。

如果事件已設定,則立即返回 True。否則,阻塞直到另一個任務呼叫 set()

set()

設定事件。

所有等待事件被設定的任務將立即被喚醒。

clear()

清除(取消設定)事件。

後續等待 wait() 的任務現在將阻塞,直到再次呼叫 set() 方法。

is_set()

如果事件已設定,則返回 True

Condition

class asyncio.Condition(lock=None)

一個條件物件。非執行緒安全。

asyncio 條件原語可用於任務等待某個事件發生,然後獲得對共享資源的獨佔訪問。

本質上,Condition 物件結合了 EventLock 的功能。可以有多個 Condition 物件共享一個 Lock,這允許在對共享資源特定狀態感興趣的不同任務之間協調對共享資源的獨佔訪問。

可選的 lock 引數必須是 Lock 物件或 None。在後一種情況下,會自動建立一個新的 Lock 物件。

版本 3.10 中已更改: 移除了 loop 引數。

使用 Condition 的首選方式是 async with 語句

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

這等同於

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
async acquire()

獲取底層鎖。

此方法會等待直到底層鎖處於 解鎖 狀態,將其設定為 鎖定 狀態並返回 True

notify(n=1)

喚醒 n 個(預設為 1 個)在此條件上等待的任務。如果等待的任務少於 n 個,則所有任務都會被喚醒。

此方法呼叫之前必須獲取鎖,並在之後儘快釋放。如果使用 未鎖定 的鎖呼叫,則會引發 RuntimeError 錯誤。

locked()

如果底層鎖已獲取,則返回 True

notify_all()

喚醒所有在此條件上等待的任務。

此方法的作用類似於 notify(),但會喚醒所有等待中的任務。

此方法呼叫之前必須獲取鎖,並在之後儘快釋放。如果使用 未鎖定 的鎖呼叫,則會引發 RuntimeError 錯誤。

release()

釋放底層鎖。

如果在未鎖定的鎖上呼叫,則會引發 RuntimeError

async wait()

等待直到被通知。

如果呼叫此方法時呼叫任務尚未獲取鎖,則會引發 RuntimeError

此方法會釋放底層鎖,然後阻塞直到被 notify()notify_all() 呼叫喚醒。一旦被喚醒,Condition 會重新獲取其鎖,此方法返回 True

請注意,任務 可能會 意外地從此次呼叫返回,這就是為什麼呼叫者應始終重新檢查狀態並準備好再次 wait()。因此,您可能更喜歡使用 wait_for()

async wait_for(predicate)

等待直到謂詞變為 true

謂詞必須是一個可呼叫物件,其結果將被解釋為布林值。此方法將重複呼叫 wait() 直到謂詞評估為 true。最終值是返回值。

Semaphore

class asyncio.Semaphore(value=1)

一個訊號量物件。非執行緒安全。

訊號量管理一個內部計數器,該計數器在每次 acquire() 呼叫時遞減,並在每次 release() 呼叫時遞增。計數器永遠不能低於零;當 acquire() 發現它為零時,它會阻塞,等待直到某個任務呼叫 release()

可選的 value 引數給出內部計數器的初始值(預設為 1)。如果給定值小於 0,則會引發 ValueError

版本 3.10 中已更改: 移除了 loop 引數。

使用 Semaphore 的首選方式是 async with 語句

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

這等同於

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
async acquire()

獲取訊號量。

如果內部計數器大於零,則將其遞減一併立即返回 True。如果為零,則等待直到呼叫 release() 並返回 True

locked()

如果訊號量不能立即獲取,則返回 True

release()

釋放訊號量,將內部計數器遞增一。可以喚醒等待獲取訊號量的任務。

BoundedSemaphore 不同,Semaphore 允許進行比 acquire() 呼叫更多的 release() 呼叫。

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

一個有界訊號量物件。非執行緒安全。

有界訊號量是 Semaphore 的一個版本,它在 release() 中如果將內部計數器增加到超過初始 value,則會引發 ValueError

版本 3.10 中已更改: 移除了 loop 引數。

Barrier

class asyncio.Barrier(parties)

一個屏障物件。非執行緒安全。

屏障是一種簡單的同步原語,允許阻塞直到 parties 數量的任務都在其上等待。任務可以在 wait() 方法上等待,並將被阻塞直到指定數量的任務最終在 wait() 上等待。此時,所有等待的任務將同時解除阻塞。

async with 可以用作等待 wait() 的替代方法。

屏障可以重複使用任意次數。

示例

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

此示例的結果是

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

在 3.11 版本中新增。

async wait()

透過屏障。當所有參與屏障的任務都已呼叫此函式時,它們將同時解除阻塞。

當屏障中等待或阻塞的任務被取消時,該任務會退出屏障,屏障保持相同狀態。如果屏障的狀態是“填充”,則等待任務的數量減少 1。

返回值是一個整數,範圍從 0 到 parties-1,每個任務都不同。這可以用於選擇一個任務來執行一些特殊的內務管理,例如

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

如果屏障在任務等待時被破壞或重置,此方法可能會引發 BrokenBarrierError 異常。如果任務被取消,它可能會引發 CancelledError

async reset()

將屏障恢復到預設的空狀態。任何等待它的任務都將收到 BrokenBarrierError 異常。

如果屏障損壞,最好直接放棄並建立一個新的。

async abort()

將屏障置於損壞狀態。這會導致任何當前或未來對 wait() 的呼叫都以 BrokenBarrierError 失敗。例如,如果其中一個任務需要中止,請使用此方法以避免任務無限期等待。

parties

透過屏障所需的任務數量。

n_waiting

在填充過程中當前在屏障中等待的任務數量。

broken

一個布林值,如果屏障處於損壞狀態,則為 True

exception asyncio.BrokenBarrierError

Barrier 物件被重置或損壞時,會引發此異常,它是 RuntimeError 的子類。


版本 3.9 中已更改: 移除了使用 await lockyield from lock 和/或 with 語句(with await lock, with (yield from lock))獲取鎖的方式。請改用 async with lock