佇列

原始碼: Lib/asyncio/queues.py


asyncio 佇列的設計與 queue 模組的類相似。儘管 asyncio 佇列不是執行緒安全的,但它們是專門設計用於 async/await 程式碼的。

請注意,asyncio 佇列的方法沒有 timeout 引數;使用 asyncio.wait_for() 函式來執行帶有超時的佇列操作。

另請參見下面的 示例 部分。

佇列

class asyncio.Queue(maxsize=0)

一個先進先出 (FIFO) 佇列。

如果 maxsize 小於或等於零,則佇列大小是無限的。如果它是一個大於 0 的整數,則當佇列達到 maxsize 時,await put() 會阻塞,直到透過 get() 刪除一個專案。

與標準庫執行緒 queue 不同,佇列的大小始終是已知的,並且可以透過呼叫 qsize() 方法返回。

在 3.10 版本中更改: 刪除了 loop 引數。

此類是非執行緒安全的。

maxsize

佇列中允許的專案數。

empty()

如果佇列為空,則返回 True,否則返回 False

full()

如果佇列中有 maxsize 個專案,則返回 True

如果佇列使用 maxsize=0 (預設值) 初始化,則 full() 永遠不會返回 True

協程 get()

從佇列中刪除並返回一個專案。如果佇列為空,則等待直到有專案可用。

如果佇列已關閉且為空,或者如果佇列已立即關閉,則引發 QueueShutDown

get_nowait()

如果專案立即可用,則返回一個專案,否則引發 QueueEmpty

協程 join()

阻塞,直到佇列中的所有專案都被接收和處理。

每當向佇列中新增一個專案時,未完成任務的計數就會增加。每當消費者協程呼叫 task_done() 來指示該專案已被檢索並且對其的所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join() 將取消阻塞。

協程 put(item)

將一個專案放入佇列。如果佇列已滿,則等待,直到有空閒槽位可用,然後再新增專案。

如果佇列已關閉,則引發 QueueShutDown

put_nowait(item)

將一個專案放入佇列,而不阻塞。

如果沒有立即可用的空閒槽位,則引發 QueueFull

qsize()

返回佇列中的專案數。

shutdown(immediate=False)

關閉佇列,使 get()put() 引發 QueueShutDown

預設情況下,關閉佇列上的 get() 僅在佇列為空時引發。將 immediate 設定為 true 以使 get() 立即引發。

所有被阻塞的 put()get() 呼叫者將被取消阻塞。如果 immediate 為 true,則佇列中每個剩餘的專案都將被標記為已完成,這可能會取消阻塞 join() 的呼叫者。

在 3.13 版本中新增。

task_done()

指示先前排隊的任務已完成。

由佇列消費者使用。對於用於獲取任務的每個 get(),後續呼叫 task_done() 會告訴佇列該任務的處理已完成。

如果 join() 當前正在阻塞,則當所有專案都已處理完畢時(意味著已收到 task_done() 呼叫,該呼叫對應於放入佇列中的每個專案),它將恢復。

shutdown(immediate=True) 為佇列中的每個剩餘專案呼叫 task_done()

如果呼叫的次數多於放入佇列中的專案數,則引發 ValueError

優先順序佇列

class asyncio.PriorityQueue

Queue 的變體;按優先順序順序 (最低的優先) 檢索條目。

條目通常是 (priority_number, data) 形式的元組。

後進先出佇列

class asyncio.LifoQueue

一個 Queue 的變體,它首先檢索最近新增的條目(後進先出)。

異常

exception asyncio.QueueEmpty

當在空佇列上呼叫 get_nowait() 方法時,會引發此異常。

exception asyncio.QueueFull

當在已達到其 maxsize 的佇列上呼叫 put_nowait() 方法時,會引發此異常。

exception asyncio.QueueShutDown

當在已關閉的佇列上呼叫 put()get() 時,會引發此異常。

在 3.13 版本中新增。

示例

佇列可以用於在多個併發任務之間分配工作負載。

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())