queue — 同步佇列類

原始碼: Lib/queue.py


queue 模組實現了多生產者、多消費者佇列。在需要多個執行緒之間安全交換資訊的執行緒程式設計中,它特別有用。Queue 類在這個模組中實現了所有必需的鎖定語義。

該模組實現了三種類型的佇列,它們僅在檢索條目的順序上有所不同。在 FIFO 佇列中,首先新增的任務首先被檢索。在 LIFO 佇列中,最近新增的條目首先被檢索(像堆疊一樣操作)。對於優先順序佇列,條目保持排序(使用 heapq 模組),並且值最低的條目首先被檢索。

在內部,這三種類型的佇列使用鎖來暫時阻塞競爭執行緒;然而,它們並非設計用於處理同一執行緒內的重入。

此外,該模組實現了一種“簡單”的 FIFO 佇列型別,即 SimpleQueue,其特定實現以較小的功能換取額外的保證。

queue 模組定義了以下類和異常

class queue.Queue(maxsize=0)

構造一個 FIFO 佇列。 maxsize 是一個整數,設定了可以放入佇列的項數的上限。一旦達到此大小,插入將阻塞,直到佇列項被消費。如果 maxsize 小於或等於零,則佇列大小是無限的。

class queue.LifoQueue(maxsize=0)

構造一個 LIFO 佇列。 maxsize 是一個整數,設定了可以放入佇列的項數的上限。一旦達到此大小,插入將阻塞,直到佇列項被消費。如果 maxsize 小於或等於零,則佇列大小是無限的。

class queue.PriorityQueue(maxsize=0)

構造一個優先順序佇列。 maxsize 是一個整數,設定了可以放入佇列的項數的上限。一旦達到此大小,插入將阻塞,直到佇列項被消費。如果 maxsize 小於或等於零,則佇列大小是無限的。

值最低的條目首先被檢索(值最低的條目是 min(entries) 將返回的那個)。條目的典型模式是形如 (priority_number, data) 的元組。

如果 data 元素不可比較,則可以將資料封裝在一個忽略資料項而只比較優先順序數字的類中

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

構造一個無界 FIFO 佇列。簡單佇列缺乏高階功能,例如任務跟蹤。

在 3.7 版本加入。

exception queue.Empty

當在空的 Queue 物件上呼叫非阻塞的 get() (或 get_nowait())時引發的異常。

exception queue.Full

當在已滿的 Queue 物件上呼叫非阻塞的 put() (或 put_nowait())時引發的異常。

exception queue.ShutDown

當在已關閉的 Queue 物件上呼叫 put()get() 時引發的異常。

在 3.13 版本加入。

佇列物件

佇列物件(QueueLifoQueuePriorityQueue)提供下面描述的公共方法。

Queue.qsize()

返回佇列的大致大小。請注意,qsize() > 0 不保證隨後的 get() 不會阻塞,qsize() < maxsize 也不保證 put() 不會阻塞。

Queue.empty()

如果佇列為空,則返回 True,否則返回 False。如果 empty() 返回 True,並不保證隨後呼叫 put() 不會阻塞。同樣,如果 empty() 返回 False,也不保證隨後呼叫 get() 不會阻塞。

Queue.full()

如果佇列已滿,則返回 True,否則返回 False。如果 full() 返回 True,並不保證隨後呼叫 get() 不會阻塞。同樣,如果 full() 返回 False,也不保證隨後呼叫 put() 不會阻塞。

Queue.put(item, block=True, timeout=None)

item 放入佇列。如果可選引數 block 為真且 timeoutNone (預設值),則必要時阻塞,直到有空閒槽可用。如果 timeout 是一個正數,它將最多阻塞 timeout 秒,如果在此時間內沒有空閒槽可用,則引發 Full 異常。否則(block 為假),如果有空閒槽立即可用,則將項放入佇列,否則引發 Full 異常(在這種情況下 timeout 被忽略)。

如果佇列已關閉,則引發 ShutDown

Queue.put_nowait(item)

等同於 put(item, block=False)

Queue.get(block=True, timeout=None)

從佇列中移除並返回一個項。如果可選引數 block 為真且 timeoutNone (預設值),則必要時阻塞,直到有項可用。如果 timeout 是一個正數,它將最多阻塞 timeout 秒,如果在此時間內沒有項可用,則引發 Empty 異常。否則(block 為假),如果有項立即可用,則返回一個項,否則引發 Empty 異常(在這種情況下 timeout 被忽略)。

在 POSIX 系統上 3.0 版本之前,以及在所有版本的 Windows 上,如果 block 為真且 timeoutNone,此操作將進入底層鎖上的不可中斷等待。這意味著不會發生任何異常,特別是 SIGINT 不會觸發 KeyboardInterrupt

如果佇列已關閉且為空,或者佇列已立即關閉,則引發 ShutDown

Queue.get_nowait()

等同於 get(False)

提供了兩種方法來支援跟蹤已入隊任務是否已被守護消費者執行緒完全處理。

Queue.task_done()

指示一個先前入隊的任務已完成。由佇列消費者執行緒使用。對於每個用於獲取任務的 get() 呼叫,隨後的 task_done() 呼叫會告知佇列該任務的處理已完成。

如果 join() 當前正在阻塞,它將在所有專案都被處理後恢復(這意味著對於佇列中每個 put() 進去的專案,都收到了一個 task_done() 呼叫)。

如果呼叫次數多於佇列中放入的項數,則引發 ValueError

Queue.join()

阻塞直到佇列中的所有專案都被獲取和處理。

每當有專案新增到佇列時,未完成任務的計數就會增加。每當消費者執行緒呼叫 task_done() 來表示該專案已被檢索並且其所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join() 會解除阻塞。

等待任務完成

等待入隊任務完成的示例

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

終止佇列

當不再需要時,Queue 物件可以正常結束直到為空,或透過硬關閉立即終止。

Queue.shutdown(immediate=False)

Queue 例項置於關閉模式。

佇列無法再增長。將來對 put() 的呼叫將引發 ShutDown。目前阻塞的 put() 呼叫者將被解除阻塞,並在先前阻塞的執行緒中引發 ShutDown

如果 immediate 為 false(預設值),佇列可以透過 get() 呼叫正常結束,以提取已載入的任務。

如果對每個剩餘任務都呼叫了 task_done(),則掛起的 join() 將正常解除阻塞。

一旦佇列為空,將來對 get() 的呼叫將引發 ShutDown

如果 immediate 為真,佇列將立即終止。佇列將被清空。所有 join() 的呼叫者都會解除阻塞,無論未完成任務的數量如何。阻塞的 get() 呼叫者也會解除阻塞,並將引發 ShutDown,因為佇列已為空。

當使用 immediate 設定為 true 的 join() 時請謹慎。這會解除 join 的阻塞,即使任務尚未完成任何工作,違反了連線佇列的通常不變數。

在 3.13 版本加入。

SimpleQueue 物件

SimpleQueue 物件提供以下公共方法。

SimpleQueue.qsize()

返回佇列的大致大小。請注意,qsize() > 0 不保證隨後的 get() 不會阻塞。

SimpleQueue.empty()

如果佇列為空,則返回 True,否則返回 False。如果 empty() 返回 False,並不保證隨後呼叫 get() 不會阻塞。

SimpleQueue.put(item, block=True, timeout=None)

item 放入佇列。此方法永不阻塞且始終成功(除了潛在的低階錯誤,例如記憶體分配失敗)。可選引數 blocktimeout 被忽略,僅為了與 Queue.put() 相容而提供。

CPython 實現細節: 此方法有一個 C 實現,它是可重入的。也就是說,在同一個執行緒中,一個 put()get() 呼叫可以被另一個 put() 呼叫中斷,而不會死鎖或破壞佇列內部狀態。這使得它適用於解構函式,例如 __del__ 方法或 weakref 回撥。

SimpleQueue.put_nowait(item)

等同於 put(item, block=False),為了與 Queue.put_nowait() 相容而提供。

SimpleQueue.get(block=True, timeout=None)

從佇列中移除並返回一個項。如果可選引數 block 為真且 timeoutNone (預設值),則必要時阻塞,直到有項可用。如果 timeout 是一個正數,它將最多阻塞 timeout 秒,如果在此時間內沒有項可用,則引發 Empty 異常。否則(block 為假),如果有項立即可用,則返回一個項,否則引發 Empty 異常(在這種情況下 timeout 被忽略)。

SimpleQueue.get_nowait()

等同於 get(False)

參見

multiprocessing.Queue

一個用於多程序(而非多執行緒)上下文的佇列類。

collections.deque 是無界佇列的另一種實現,具有快速原子 append()popleft() 操作,無需鎖定且支援索引。