佇列¶
asyncio 佇列的設計與 queue
模組的類相似。儘管 asyncio 佇列不是執行緒安全的,但它們是專門設計用於 async/await 程式碼的。
請注意,asyncio 佇列的方法沒有 timeout 引數;使用 asyncio.wait_for()
函式來執行帶有超時的佇列操作。
另請參見下面的 示例 部分。
佇列¶
- class asyncio.Queue(maxsize=0)¶
一個先進先出 (FIFO) 佇列。
如果 maxsize 小於或等於零,則佇列大小是無限的。如果它是一個大於
0
的整數,則當佇列達到 maxsize 時,await put()
會阻塞,直到透過get()
刪除一個專案。與標準庫執行緒
queue
不同,佇列的大小始終是已知的,並且可以透過呼叫qsize()
方法返回。在 3.10 版本中更改: 刪除了 loop 引數。
此類是非執行緒安全的。
- maxsize¶
佇列中允許的專案數。
- empty()¶
如果佇列為空,則返回
True
,否則返回False
。
- 協程 get()¶
從佇列中刪除並返回一個專案。如果佇列為空,則等待直到有專案可用。
如果佇列已關閉且為空,或者如果佇列已立即關閉,則引發
QueueShutDown
。
- get_nowait()¶
如果專案立即可用,則返回一個專案,否則引發
QueueEmpty
。
- 協程 join()¶
阻塞,直到佇列中的所有專案都被接收和處理。
每當向佇列中新增一個專案時,未完成任務的計數就會增加。每當消費者協程呼叫
task_done()
來指示該專案已被檢索並且對其的所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join()
將取消阻塞。
- 協程 put(item)¶
將一個專案放入佇列。如果佇列已滿,則等待,直到有空閒槽位可用,然後再新增專案。
如果佇列已關閉,則引發
QueueShutDown
。
- qsize()¶
返回佇列中的專案數。
- shutdown(immediate=False)¶
關閉佇列,使
get()
和put()
引發QueueShutDown
。預設情況下,關閉佇列上的
get()
僅在佇列為空時引發。將 immediate 設定為 true 以使get()
立即引發。所有被阻塞的
put()
和get()
呼叫者將被取消阻塞。如果 immediate 為 true,則佇列中每個剩餘的專案都將被標記為已完成,這可能會取消阻塞join()
的呼叫者。在 3.13 版本中新增。
- task_done()¶
指示先前排隊的任務已完成。
由佇列消費者使用。對於用於獲取任務的每個
get()
,後續呼叫task_done()
會告訴佇列該任務的處理已完成。如果
join()
當前正在阻塞,則當所有專案都已處理完畢時(意味著已收到task_done()
呼叫,該呼叫對應於放入佇列中的每個專案),它將恢復。shutdown(immediate=True)
為佇列中的每個剩餘專案呼叫task_done()
。如果呼叫的次數多於放入佇列中的專案數,則引發
ValueError
。
優先順序佇列¶
後進先出佇列¶
異常¶
- exception asyncio.QueueEmpty¶
當在空佇列上呼叫
get_nowait()
方法時,會引發此異常。
- exception asyncio.QueueFull¶
當在已達到其 maxsize 的佇列上呼叫
put_nowait()
方法時,會引發此異常。
示例¶
佇列可以用於在多個併發任務之間分配工作負載。
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())