佇列

原始碼: Lib/asyncio/queues.py


asyncio 佇列旨在與 queue 模組的類相似。儘管 asyncio 佇列不是執行緒安全的,但它們專門設計用於 async/await 程式碼。

請注意,asyncio 佇列的方法沒有 timeout 引數;請使用 asyncio.wait_for() 函式來執行帶超時的佇列操作。

另請參閱下面的示例部分。

佇列

class asyncio.Queue(maxsize=0)

一個先進先出 (FIFO) 佇列。

如果 maxsize 小於或等於零,則佇列大小是無限的。如果它是一個大於 0 的整數,則當佇列達到 maxsize 時,await put() 會阻塞,直到透過 get() 移除一個專案。

與標準庫的執行緒 queue 不同,佇列的大小始終可知,並可透過呼叫 qsize() 方法返回。

版本 3.10 中已更改: 移除了 loop 引數。

此類不是執行緒安全的

maxsize

佇列中允許的專案數量。

empty()

如果佇列為空,返回 True,否則返回 False

full()

如果佇列中有 maxsize 個專案,返回 True

如果佇列以 maxsize=0(預設值)初始化,則 full() 永遠不會返回 True

async get()

從佇列中移除並返回一個專案。如果佇列為空,則等待直到專案可用。

如果佇列已關閉且為空,或者佇列已立即關閉,則引發 QueueShutDown

get_nowait()

如果專案立即可用,則返回一個專案,否則引發 QueueEmpty

async join()

阻塞直到佇列中的所有專案都已接收和處理完畢。

每當有專案新增到佇列時,未完成任務的計數就會增加。每當消費者協程呼叫 task_done() 表示專案已被檢索並且其上的所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join() 會解除阻塞。

async put(item)

將一個專案放入佇列。如果佇列已滿,則等待直到有可用空位,然後新增專案。

如果佇列已關閉,則引發 QueueShutDown

put_nowait(item)

將一個專案放入佇列而不阻塞。

如果沒有立即可用的空位,則引發 QueueFull

qsize()

返回佇列中的專案數量。

shutdown(immediate=False)

Queue 例項置於關閉模式。

佇列不再能增長。將來呼叫 put() 將引發 QueueShutDown。當前阻塞的 put() 呼叫者將被解除阻塞,並在之前阻塞的執行緒中引發 QueueShutDown

如果 immediate 為 false(預設值),佇列可以透過 get() 呼叫正常關閉,以提取已載入的任務。

如果為每個剩餘任務呼叫 task_done(),則掛起的 join() 將正常解除阻塞。

一旦佇列為空,將來呼叫 get() 將引發 QueueShutDown

如果 immediate 為 true,佇列將立即終止。佇列將被清空。所有 join() 的呼叫者都會解除阻塞,無論未完成任務的數量如何。阻塞的 get() 呼叫者會解除阻塞,並由於佇列為空而引發 QueueShutDown

在使用 immediate 設定為 true 的 join() 時請謹慎。這會解除連線的阻塞,即使任務上沒有完成任何工作,從而違反了連線佇列的通常不變式。

在 3.13 版本加入。

task_done()

表示先前入隊的工單已完成。

由佇列消費者使用。對於每個用於獲取工單的 get() 呼叫,後續呼叫 task_done() 會告訴佇列工單上的處理已完成。

如果 join() 當前正在阻塞,當所有專案都已處理(意味著對於每個已 put() 到佇列中的專案,都收到了 task_done() 呼叫)時,它將恢復。

如果呼叫次數多於佇列中放置的專案數量,則引發 ValueError

優先佇列

class asyncio.PriorityQueue

Queue 的變體;以優先順序順序(最低優先)檢索條目。

條目通常是 (priority_number, data) 形式的元組。

後進先出佇列

class asyncio.LifoQueue

Queue 的變體,它首先檢索最近新增的條目(後進先出)。

異常

exception asyncio.QueueEmpty

當在空佇列上呼叫 get_nowait() 方法時,會引發此異常。

exception asyncio.QueueFull

當在已達到 maxsize 的佇列上呼叫 put_nowait() 方法時,引發的異常。

exception asyncio.QueueShutDown

當在已關閉的佇列上呼叫 put()get() 時,引發的異常。

在 3.13 版本加入。

示例

佇列可用於在多個併發任務之間分配工作負載。

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())