queue — 一個同步佇列類

原始碼: Lib/queue.py


queue 模組實現了多生產者、多消費者佇列。當需要在多個執行緒之間安全地交換資訊時,它線上程程式設計中尤其有用。此模組中的 Queue 類實現了所有必需的鎖定語義。

該模組實現了三種類型的佇列,它們之間的區別僅在於檢索條目的順序。在 FIFO 佇列中,先新增的任務先被檢索。在 LIFO 佇列中,最近新增的條目先被檢索(像堆疊一樣操作)。使用優先順序佇列,條目會保持排序(使用 heapq 模組),並且首先檢索值最低的條目。

在內部,這三種類型的佇列使用鎖來臨時阻止競爭執行緒;但是,它們並非旨在處理執行緒內的重入。

此外,該模組還實現了一個“簡單”的 FIFO 佇列型別 SimpleQueue,其特定實現提供了額外的保證,以換取較小的功能。

queue 模組定義了以下類和異常

class queue.Queue(maxsize=0)

用於 FIFO 佇列的建構函式。maxsize 是一個整數,用於設定可以放入佇列中的專案數的上限。一旦達到此大小,插入操作將阻塞,直到佇列中的專案被消耗完。如果 maxsize 小於或等於零,則佇列大小是無限的。

class queue.LifoQueue(maxsize=0)

用於 LIFO 佇列的建構函式。maxsize 是一個整數,用於設定可以放入佇列中的專案數的上限。一旦達到此大小,插入操作將阻塞,直到佇列中的專案被消耗完。如果 maxsize 小於或等於零,則佇列大小是無限的。

class queue.PriorityQueue(maxsize=0)

優先順序佇列的建構函式。maxsize 是一個整數,用於設定可以放入佇列中的專案數的上限。一旦達到此大小,插入操作將阻塞,直到佇列中的專案被消耗完。如果 maxsize 小於或等於零,則佇列大小是無限的。

首先檢索值最低的條目(值最低的條目是由 min(entries) 返回的條目)。條目的典型模式是元組形式:(priority_number, data)

如果 data 元素不可比較,則可以將資料包裝在一個忽略資料項並僅比較優先順序編號的類中

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

無界 FIFO 佇列的建構函式。簡單佇列缺少任務跟蹤等高階功能。

在 3.7 版本中新增。

exception queue.Empty

當在空的 Queue 物件上呼叫非阻塞的 get() (或 get_nowait()) 時引發的異常。

exception queue.Full

當在已滿的 Queue 物件上呼叫非阻塞的 put() (或 put_nowait()) 時引發的異常。

exception queue.ShutDown

當在已關閉的 Queue 物件上呼叫 put()get() 時引發的異常。

在 3.13 版本中新增。

佇列物件

佇列物件 (QueueLifoQueuePriorityQueue) 提供如下所述的公共方法。

Queue.qsize()

返回佇列的大概大小。請注意,qsize() > 0 不能保證後續的 get() 不會阻塞,qsize() < maxsize 也不能保證 put() 不會阻塞。

Queue.empty()

如果佇列為空,則返回 True,否則返回 False。如果 empty() 返回 True,則不能保證後續呼叫 put() 不會阻塞。同樣,如果 empty() 返回 False,則不能保證後續呼叫 get() 不會阻塞。

Queue.full()

如果佇列已滿,則返回 True,否則返回 False。如果 full() 返回 True,則不能保證後續呼叫 get() 不會阻塞。同樣,如果 full() 返回 False,則不能保證後續呼叫 put() 不會阻塞。

Queue.put(item, block=True, timeout=None)

item 放入佇列。如果可選引數 block 為 true 且 timeoutNone(預設值),則在有空閒槽位可用之前阻塞(如有必要)。如果 timeout 為正數,則最多阻塞 timeout 秒,如果在該時間內沒有可用的空閒槽位,則引發 Full 異常。否則(block 為 false),如果有空閒槽位立即可用,則將項放入佇列,否則引發 Full 異常(在這種情況下會忽略 timeout)。

如果佇列已關閉,則引發 ShutDown

Queue.put_nowait(item)

等效於 put(item, block=False)

Queue.get(block=True, timeout=None)

從佇列中移除並返回一個項。如果可選引數 block 為 true 且 timeoutNone (預設值),則在必要時阻塞,直到有項可用。如果 timeout 為正數,則最多阻塞 timeout 秒,如果在該時間內沒有可用的項,則引發 Empty 異常。否則(block 為 false),如果立即有項可用,則返回該項,否則引發 Empty 異常(在這種情況下會忽略 timeout)。

在 POSIX 系統上的 3.0 之前的版本中,以及在 Windows 上的所有版本中,如果 block 為 true 且 timeoutNone,則此操作會在底層鎖上進入不可中斷的等待。這意味著不會發生任何異常,特別是 SIGINT 不會觸發 KeyboardInterrupt

如果佇列已關閉且為空,或者佇列已立即關閉,則會引發 ShutDown

Queue.get_nowait()

等同於 get(False)

提供了兩種方法來支援跟蹤排隊的任務是否已由守護程序消費者執行緒完全處理。

Queue.task_done()

指示先前排隊的任務已完成。由佇列消費者執行緒使用。對於每個用於獲取任務的 get(),隨後對 task_done() 的呼叫會告訴佇列,該任務的處理已完成。

如果 join() 當前正在阻塞,則當所有項都已處理完畢時(意味著對放入佇列的每個項都已收到 task_done() 呼叫時),它將恢復。

shutdown(immediate=True) 對佇列中剩餘的每個項呼叫 task_done()

如果呼叫次數多於放入佇列中的項數,則會引發 ValueError

Queue.join()

阻塞,直到佇列中的所有項都已被獲取和處理。

每當向佇列新增項時,未完成任務的計數就會增加。每當消費者執行緒呼叫 task_done() 以指示該項已被檢索且其上的所有工作均已完成時,該計數就會減少。當未完成任務的計數降至零時,join() 將解除阻塞。

如何等待排隊任務完成的示例

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

終止佇列

可以使 Queue 物件透過關閉它們來防止進一步的互動。

Queue.shutdown(immediate=False)

關閉佇列,使 get()put() 引發 ShutDown

預設情況下,關閉佇列上的 get() 僅在佇列為空時才會引發。將 immediate 設定為 true 以使 get() 立即引發。

所有阻塞的 put()get() 呼叫者將被解除阻塞。如果 immediate 為 true,則佇列中剩餘的每個項都將被標記為已完成,這可能會解除 join() 的呼叫者的阻塞。

在 3.13 版本中新增。

SimpleQueue 物件

SimpleQueue 物件提供以下描述的公共方法。

SimpleQueue.qsize()

返回佇列的大致大小。請注意,qsize() > 0 並不能保證後續的 get() 不會阻塞。

SimpleQueue.empty()

如果佇列為空,則返回 True,否則返回 False。如果 empty() 返回 False,則不能保證後續對 get() 的呼叫不會阻塞。

SimpleQueue.put(item, block=True, timeout=None)

item 放入佇列。該方法永遠不會阻塞,並且總是成功(除了潛在的低階錯誤,例如無法分配記憶體)。可選引數 blocktimeout 將被忽略,僅為了與 Queue.put() 相容而提供。

CPython 實現細節:此方法具有可重入的 C 實現。也就是說,put()get() 呼叫可以被同一執行緒中的另一個 put() 呼叫中斷,而不會發生死鎖或損壞佇列內部狀態。這使其適用於解構函式(例如 __del__ 方法)或 weakref 回撥。

SimpleQueue.put_nowait(item)

等同於 put(item, block=False),為了與 Queue.put_nowait() 相容而提供。

SimpleQueue.get(block=True, timeout=None)

從佇列中移除並返回一個項。如果可選引數 block 為 true 且 timeoutNone (預設值),則在必要時阻塞,直到有項可用。如果 timeout 為正數,則最多阻塞 timeout 秒,如果在該時間內沒有可用的項,則引發 Empty 異常。否則(block 為 false),如果立即有項可用,則返回該項,否則引發 Empty 異常(在這種情況下會忽略 timeout)。

SimpleQueue.get_nowait()

等同於 get(False)

另請參閱

multiprocessing.Queue

用於多程序(而不是多執行緒)環境中的佇列類。

collections.deque 是無界佇列的另一種實現,具有快速的原子 append()popleft() 操作,這些操作不需要鎖定,並且還支援索引。