佇列¶
asyncio 佇列旨在與 queue
模組的類相似。儘管 asyncio 佇列不是執行緒安全的,但它們專門設計用於 async/await 程式碼。
請注意,asyncio 佇列的方法沒有 timeout 引數;請使用 asyncio.wait_for()
函式來執行帶超時的佇列操作。
另請參閱下面的示例部分。
佇列¶
- class asyncio.Queue(maxsize=0)¶
一個先進先出 (FIFO) 佇列。
如果 maxsize 小於或等於零,則佇列大小是無限的。如果它是一個大於
0
的整數,則當佇列達到 maxsize 時,await put()
會阻塞,直到透過get()
移除一個專案。與標準庫的執行緒
queue
不同,佇列的大小始終可知,並可透過呼叫qsize()
方法返回。版本 3.10 中已更改: 移除了 loop 引數。
此類不是執行緒安全的。
- maxsize¶
佇列中允許的專案數量。
- empty()¶
如果佇列為空,返回
True
,否則返回False
。
- async get()¶
從佇列中移除並返回一個專案。如果佇列為空,則等待直到專案可用。
如果佇列已關閉且為空,或者佇列已立即關閉,則引發
QueueShutDown
。
- get_nowait()¶
如果專案立即可用,則返回一個專案,否則引發
QueueEmpty
。
- async join()¶
阻塞直到佇列中的所有專案都已接收和處理完畢。
每當有專案新增到佇列時,未完成任務的計數就會增加。每當消費者協程呼叫
task_done()
表示專案已被檢索並且其上的所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join()
會解除阻塞。
- async put(item)¶
將一個專案放入佇列。如果佇列已滿,則等待直到有可用空位,然後新增專案。
如果佇列已關閉,則引發
QueueShutDown
。
- qsize()¶
返回佇列中的專案數量。
- shutdown(immediate=False)¶
將
Queue
例項置於關閉模式。佇列不再能增長。將來呼叫
put()
將引發QueueShutDown
。當前阻塞的put()
呼叫者將被解除阻塞,並在之前阻塞的執行緒中引發QueueShutDown
。如果 immediate 為 false(預設值),佇列可以透過
get()
呼叫正常關閉,以提取已載入的任務。如果為每個剩餘任務呼叫
task_done()
,則掛起的join()
將正常解除阻塞。一旦佇列為空,將來呼叫
get()
將引發QueueShutDown
。如果 immediate 為 true,佇列將立即終止。佇列將被清空。所有
join()
的呼叫者都會解除阻塞,無論未完成任務的數量如何。阻塞的get()
呼叫者會解除阻塞,並由於佇列為空而引發QueueShutDown
。在使用 immediate 設定為 true 的
join()
時請謹慎。這會解除連線的阻塞,即使任務上沒有完成任何工作,從而違反了連線佇列的通常不變式。在 3.13 版本加入。
- task_done()¶
表示先前入隊的工單已完成。
由佇列消費者使用。對於每個用於獲取工單的
get()
呼叫,後續呼叫task_done()
會告訴佇列工單上的處理已完成。如果
join()
當前正在阻塞,當所有專案都已處理(意味著對於每個已put()
到佇列中的專案,都收到了task_done()
呼叫)時,它將恢復。如果呼叫次數多於佇列中放置的專案數量,則引發
ValueError
。
優先佇列¶
後進先出佇列¶
異常¶
- exception asyncio.QueueEmpty¶
當在空佇列上呼叫
get_nowait()
方法時,會引發此異常。
- exception asyncio.QueueFull¶
當在已達到 maxsize 的佇列上呼叫
put_nowait()
方法時,引發的異常。
示例¶
佇列可用於在多個併發任務之間分配工作負載。
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())