同步原語

原始碼: Lib/asyncio/locks.py


asyncio 同步原語的設計與 threading 模組中的原語類似,但有兩個重要的注意事項:

  • asyncio 原語不是執行緒安全的,因此不應用於作業系統執行緒同步(為此請使用 threading);

  • 這些同步原語的方法不接受 timeout 引數; 使用 asyncio.wait_for() 函式來執行帶超時的操作。

asyncio 具有以下基本同步原語:


class asyncio.Lock

為 asyncio 任務實現互斥鎖。 不是執行緒安全的。

asyncio 鎖可用於保證對共享資源的獨佔訪問。

使用 Lock 的首選方式是 async with 語句

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

它等效於

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

在 3.10 版本中更改: 刪除了 loop 引數。

coroutine acquire()

獲取鎖。

此方法會等待直到鎖被解鎖,將其設定為鎖定並返回 True

當多個協程在 acquire() 中被阻塞,等待鎖被解鎖時,最終只會有一個協程繼續執行。

獲取鎖是公平的:繼續執行的協程將是第一個開始等待鎖的協程。

release()

釋放鎖。

當鎖為鎖定時,將其重置為解鎖並返回。

如果鎖為解鎖,則會引發 RuntimeError

locked()

如果鎖為鎖定,則返回 True

事件

class asyncio.Event

一個事件物件。不是執行緒安全的。

asyncio 事件可用於通知多個 asyncio 任務已發生某些事件。

Event 物件管理一個內部標誌,可以使用 set() 方法設定為 true,並使用 clear() 方法重置為 falsewait() 方法會阻塞,直到該標誌設定為 true。該標誌最初設定為 false

在 3.10 版本中更改: 刪除了 loop 引數。

示例

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

等待直到事件被設定。

如果事件已設定,則立即返回 True。否則,將阻塞,直到另一個任務呼叫 set()

set()

設定事件。

所有等待設定事件的任務都將被立即喚醒。

clear()

清除(取消設定)事件。

等待 wait() 的任務現在將阻塞,直到再次呼叫 set() 方法。

is_set()

如果事件已設定,則返回 True

條件

class asyncio.Condition(lock=None)

一個條件物件。不是執行緒安全的。

asyncio 條件原語可供任務使用,以等待某些事件發生,然後獲得對共享資源的獨佔訪問許可權。

本質上,Condition 物件結合了 EventLock 的功能。可以讓多個 Condition 物件共享一個 Lock,從而允許在對共享資源的特定狀態感興趣的不同任務之間協調對共享資源的獨佔訪問。

可選的 lock 引數必須是 Lock 物件或 None。在後一種情況下,會自動建立一個新的 Lock 物件。

在 3.10 版本中更改: 刪除了 loop 引數。

使用 Condition 的首選方法是 async with 語句

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

它等效於

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

獲取底層鎖。

此方法會等待直到底層鎖被解鎖,將其設定為鎖定並返回 True

notify(n=1)

喚醒 n 個(預設為 1 個)等待此條件的任務。如果等待的任務少於 n 個,則會喚醒所有任務。

必須在呼叫此方法之前獲取鎖,並在之後立即釋放。如果在解鎖鎖的情況下呼叫,則會引發 RuntimeError 錯誤。

locked()

如果獲取了底層鎖,則返回 True

notify_all()

喚醒所有等待此條件的任務。

此方法的行為類似於 notify(),但會喚醒所有等待的任務。

必須在呼叫此方法之前獲取鎖,並在之後立即釋放。如果在解鎖鎖的情況下呼叫,則會引發 RuntimeError 錯誤。

release()

釋放底層鎖。

當在解鎖的鎖上呼叫時,會引發 RuntimeError

coroutine wait()

等待直到收到通知。

如果呼叫此方法時呼叫任務尚未獲取鎖,則會引發 RuntimeError

此方法會釋放底層鎖,然後阻塞,直到被 notify()notify_all() 呼叫喚醒。一旦被喚醒,Condition 會重新獲取其鎖,並且此方法返回 True

請注意,任務可能會從此次呼叫中意外返回,這就是為什麼呼叫者應該始終重新檢查狀態,並準備好再次wait()的原因。因此,您可能更傾向於使用wait_for()

協程 wait_for(predicate)

等待直到謂詞變為true

謂詞必須是一個可呼叫物件,其結果將被解釋為布林值。該方法將重複wait(),直到謂詞求值為true。最終值是返回值。

訊號量

class asyncio.Semaphore(value=1)

一個訊號量物件。非執行緒安全。

訊號量管理一個內部計數器,每次呼叫acquire()時計數器會遞減,每次呼叫release()時計數器會遞增。計數器永遠不會低於零;當acquire()發現計數器為零時,它會阻塞,等待直到某個任務呼叫release()

可選的value引數給出內部計數器的初始值(預設為1)。如果給定的值小於0,則會引發ValueError

在 3.10 版本中更改: 刪除了 loop 引數。

使用訊號量的首選方式是async with語句

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

它等效於

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
協程 acquire()

獲取訊號量。

如果內部計數器大於零,則將其減一併立即返回True。如果計數器為零,則等待直到呼叫release()並返回True

locked()

如果訊號量無法立即獲取,則返回True

release()

釋放訊號量,將內部計數器加一。可以喚醒正在等待獲取訊號量的任務。

BoundedSemaphore不同,Semaphore允許執行比acquire()呼叫更多的release()呼叫。

有界訊號量

class asyncio.BoundedSemaphore(value=1)

一個有界訊號量物件。非執行緒安全。

有界訊號量是Semaphore的一個版本,如果release()將內部計數器增加到初始value之上,則會引發ValueError

在 3.10 版本中更改: 刪除了 loop 引數。

屏障

class asyncio.Barrier(parties)

一個屏障物件。非執行緒安全。

屏障是一種簡單的同步原語,允許阻塞,直到有parties個任務在它上面等待。任務可以在wait()方法上等待,並且會被阻塞,直到指定數量的任務在wait()上等待。屆時,所有等待的任務將同時解除阻塞。

async with可以用作等待wait()的替代方法。

屏障可以重複使用任意次數。

示例

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

此示例的結果是

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

3.11 版本中新增。

協程 wait()

透過屏障。當參與屏障的所有任務都呼叫此函式時,它們將同時解除阻塞。

當屏障中等待或阻塞的任務被取消時,該任務將退出屏障,屏障保持相同的狀態。如果屏障的狀態為“填充”,則等待任務的數量會減少 1。

返回值是一個介於 0 到 parties-1 之間的整數,每個任務都不同。這可以用來選擇一個任務來執行一些特殊的管理任務,例如。

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

如果屏障在任務等待時被破壞或重置,此方法可能會引發BrokenBarrierError異常。如果任務被取消,則可能會引發CancelledError

協程 reset()

將屏障返回到預設的空狀態。任何在屏障上等待的任務都將收到BrokenBarrierError異常。

如果屏障被破壞,最好是直接保留它並建立一個新的屏障。

協程 abort()

將屏障置於破壞狀態。這會導致任何活動的或未來的wait()呼叫失敗,並出現BrokenBarrierError。例如,如果其中一個任務需要中止,請使用此選項以避免無限等待任務。

parties

透過屏障所需的任務數。

n_waiting

當前正在屏障中等待填充的任務數。

broken

一個布林值,如果屏障處於破壞狀態,則為True

exception asyncio.BrokenBarrierError

Barrier物件被重置或破壞時,將引發此異常,它是RuntimeError的子類。


在 3.9 版本中更改:使用 await lockyield from lock 和/或 with 語句(with await lockwith (yield from lock))獲取鎖的操作已被刪除。請改用 async with lock