threading — 基於執行緒的並行

原始碼: Lib/threading.py


此模組在低階 _thread 模組之上構建了更高階的執行緒介面。

可用性:非 WASI。

此模組在 WebAssembly 上不起作用或不可用。有關更多資訊,請參閱 WebAssembly 平臺

引言

threading 模組提供了一種在單個程序中併發執行多個執行緒(程序的更小單元)的方法。它允許建立和管理執行緒,從而可以並行執行任務並共享記憶體空間。當任務是 I/O 密集型時(例如檔案操作或網路請求,大部分時間都在等待外部資源),執行緒特別有用。

threading 的典型用例包括管理一個工作執行緒池,這些工作執行緒可以併發處理多個任務。以下是使用 Thread 建立和啟動執行緒的基本示例:

import threading
import time

def crawl(link, delay=3):
    print(f"crawl started for {link}")
    time.sleep(delay)  # Blocking I/O (simulating a network request)
    print(f"crawl ended for {link}")

links = [
    "https://python.club.tw",
    "https://docs.python.club.tw",
    "https://peps.python.org",
]

# Start threads for each link
threads = []
for link in links:
    # Using `args` to pass positional arguments and `kwargs` for keyword arguments
    t = threading.Thread(target=crawl, args=(link,), kwargs={"delay": 2})
    threads.append(t)

# Start each thread
for t in threads:
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()

版本 3.7 中的變化: 該模組以前是可選的,現在始終可用。

參見

concurrent.futures.ThreadPoolExecutor 提供了一個更高階的介面,可以將任務推送到後臺執行緒而不會阻塞呼叫執行緒的執行,同時仍然可以在需要時檢索其結果。

queue 提供了一個執行緒安全介面,用於在執行中的執行緒之間交換資料。

asyncio 提供了一種替代方法來實現任務級別的併發,而無需使用多個作業系統執行緒。

備註

在 Python 2.x 系列中,此模組包含一些方法和函式的 camelCase 名稱。從 Python 3.10 開始,這些名稱已被棄用,但為了相容 Python 2.5 及更低版本,它們仍然受支援。

CPython 實現細節: 在 CPython 中,由於 全域性直譯器鎖,一次只能有一個執行緒執行 Python 程式碼(儘管某些注重效能的庫可能會克服此限制)。如果希望應用程式更好地利用多核機器的計算資源,建議使用 multiprocessingconcurrent.futures.ProcessPoolExecutor。但是,如果要同時執行多個 I/O 密集型任務,執行緒仍然是一個合適的模型。

GIL 和效能考量

與使用單獨程序來繞過 全域性直譯器鎖 (GIL) 的 multiprocessing 模組不同,`threading` 模組在一個程序內執行,這意味著所有執行緒共享相同的記憶體空間。然而,當涉及到 CPU 密集型任務時,GIL 限制了執行緒的效能增益,因為一次只能有一個執行緒執行 Python 位元組碼。儘管如此,執行緒在許多場景中仍然是實現併發的有用工具。

自 Python 3.13 起,自由執行緒 構建可以停用 GIL,從而實現真正的執行緒並行執行,但此功能預設不可用(請參閱 PEP 703)。

參考

該模組定義了以下函式:

threading.active_count()

返回當前存活的 Thread 物件的數量。返回的數量等於 enumerate() 返回的列表的長度。

函式 activeCount 是此函式的已棄用別名。

threading.current_thread()

返回當前 Thread 物件,對應於呼叫者的控制執行緒。如果呼叫者的控制執行緒不是透過 threading 模組建立的,則返回一個功能有限的虛擬執行緒物件。

函式 currentThread 是此函式的已棄用別名。

threading.excepthook(args, /)

處理由 Thread.run() 引發的未捕獲異常。

args 引數具有以下屬性

  • exc_type: 異常型別。

  • exc_value: 異常值,可以為 None

  • exc_traceback: 異常回溯,可以為 None

  • thread: 引發異常的執行緒,可以為 None

如果 exc_typeSystemExit,則異常會被靜默忽略。否則,異常會列印到 sys.stderr

如果此函式引發異常,則會呼叫 sys.excepthook() 來處理它。

threading.excepthook() 可以被重寫以控制如何處理由 Thread.run() 引發的未捕獲異常。

使用自定義鉤子儲存 exc_value 可能會建立引用迴圈。當不再需要異常時,應明確清除它以打破引用迴圈。

使用自定義鉤子儲存 thread 可能會在物件正在終結時使其復活。為避免物件復活,請在自定義鉤子完成後避免儲存 thread

參見

sys.excepthook() 處理未捕獲的異常。

在 3.8 版本加入。

threading.__excepthook__

儲存 threading.excepthook() 的原始值。它被儲存下來,以便在原始值可能被損壞或替代物件替換時可以恢復。

在 3.10 版本加入。

threading.get_ident()

返回當前執行緒的“執行緒識別符號”。這是一個非零整數。其值沒有直接意義;它旨在用作一個神奇的“cookie”,例如用於索引執行緒特定資料的字典。當一個執行緒退出並建立另一個執行緒時,執行緒識別符號可能會被回收。

在 3.3 版本加入。

threading.get_native_id()

返回核心分配的當前執行緒的本地整數執行緒 ID。這是一個非負整數。其值可用於在系統範圍內唯一標識此特定執行緒(直到執行緒終止,之後該值可能被作業系統回收)。

可用性: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD, GNU/kFreeBSD。

在 3.8 版本加入。

版本 3.13 中的變化: 添加了對 GNU/kFreeBSD 的支援。

threading.enumerate()

返回當前所有活躍的 Thread 物件的列表。該列表包括守護執行緒和由 current_thread() 建立的虛擬執行緒物件。它不包括已終止的執行緒和尚未啟動的執行緒。然而,主執行緒總是結果的一部分,即使它已終止。

threading.main_thread()

返回主 Thread 物件。在正常情況下,主執行緒是 Python 直譯器啟動的執行緒。

在 3.4 版本加入。

threading.settrace(func)

為從 threading 模組啟動的所有執行緒設定跟蹤函式。在每個執行緒的 run() 方法被呼叫之前,func 將被傳遞給每個執行緒的 sys.settrace()

threading.settrace_all_threads(func)

為從 threading 模組啟動的所有執行緒以及所有當前正在執行的 Python 執行緒設定跟蹤函式。

在每個執行緒的 run() 方法被呼叫之前,func 將被傳遞給每個執行緒的 sys.settrace()

3.12 新版功能.

threading.gettrace()

獲取由 settrace() 設定的跟蹤函式。

在 3.10 版本加入。

threading.setprofile(func)

為從 threading 模組啟動的所有執行緒設定配置檔案函式。在每個執行緒的 run() 方法被呼叫之前,func 將被傳遞給每個執行緒的 sys.setprofile()

threading.setprofile_all_threads(func)

為從 threading 模組啟動的所有執行緒以及所有當前正在執行的 Python 執行緒設定配置檔案函式。

在每個執行緒的 run() 方法被呼叫之前,func 將被傳遞給每個執行緒的 sys.setprofile()

3.12 新版功能.

threading.getprofile()

獲取由 setprofile() 設定的分析器函式。

在 3.10 版本加入。

threading.stack_size([size])

返回建立新執行緒時使用的執行緒棧大小。可選的 size 引數指定後續建立的執行緒將使用的棧大小,必須為 0 (使用平臺或配置的預設值) 或至少 32,768 (32 KiB) 的正整數值。如果未指定 size,則使用 0。如果不支援更改執行緒棧大小,則會引發 RuntimeError。如果指定的棧大小無效,則會引發 ValueError 且棧大小不會修改。32 KiB 是當前支援的最小棧大小值,以保證直譯器本身有足夠的棧空間。請注意,某些平臺可能對棧大小值有特定限制,例如要求最小棧大小 > 32 KiB 或要求以系統記憶體頁大小的倍數進行分配 - 有關更多資訊應查閱平臺文件(4 KiB 頁很常見;在沒有更具體資訊的情況下,建議使用 4096 的倍數作為棧大小)。

可用性: Windows, pthreads。

支援 POSIX 執行緒的 Unix 平臺。

此模組還定義了以下常量

threading.TIMEOUT_MAX

阻塞函式(Lock.acquire()RLock.acquire()Condition.wait() 等)的 timeout 引數允許的最大值。指定大於此值的超時將引發 OverflowError

在 3.2 版本加入。

此模組定義了許多類,這些類在以下部分中詳細介紹。

此模組的設計大致基於 Java 的執行緒模型。然而,Java 將鎖和條件變數作為每個物件的基本行為,而在 Python 中它們是單獨的物件。Python 的 Thread 類支援 Java 的 Thread 類行為的一個子集;目前,沒有優先順序,沒有執行緒組,並且執行緒不能被銷燬、停止、暫停、恢復或中斷。Java 的 Thread 類的靜態方法在實現時被對映到模組級函式。

下面描述的所有方法都是原子地執行的。

執行緒區域性資料

執行緒區域性資料是其值特定於執行緒的資料。如果您有希望對執行緒區域性的資料,請建立一個 local 物件並使用其屬性

>>> mydata = local()
>>> mydata.number = 42
>>> mydata.number
42

您還可以訪問 local 物件的字典

>>> mydata.__dict__
{'number': 42}
>>> mydata.__dict__.setdefault('widgets', [])
[]
>>> mydata.widgets
[]

如果我們在不同的執行緒中訪問資料

>>> log = []
>>> def f():
...     items = sorted(mydata.__dict__.items())
...     log.append(items)
...     mydata.number = 11
...     log.append(mydata.number)

>>> import threading
>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> thread.join()
>>> log
[[], 11]

我們得到不同的資料。此外,在其他執行緒中進行的更改不會影響此執行緒中看到的資料

>>> mydata.number
42

當然,您從 local 物件獲得的值,包括它們的 __dict__ 屬性,都適用於讀取該屬性時當前所在的執行緒。因此,您通常不希望線上程之間儲存這些值,因為它們僅適用於它們所來自的執行緒。

您可以透過子類化 local 類來建立自定義 local 物件

>>> class MyLocal(local):
...     number = 2
...     def __init__(self, /, **kw):
...         self.__dict__.update(kw)
...     def squared(self):
...         return self.number ** 2

這對於支援預設值、方法和初始化很有用。請注意,如果您定義了一個 __init__() 方法,則每次在單獨的執行緒中使用 local 物件時都會呼叫它。這是初始化每個執行緒字典所必需的。

現在如果我們建立一個 local 物件

>>> mydata = MyLocal(color='red')

我們有一個預設數字

>>> mydata.number
2

一個初始顏色

>>> mydata.color
'red'
>>> del mydata.color

以及一個對資料進行操作的方法

>>> mydata.squared()
4

像以前一樣,我們可以在單獨的執行緒中訪問資料

>>> log = []
>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> thread.join()
>>> log
[[('color', 'red')], 11]

而不會影響此執行緒的資料

>>> mydata.number
2
>>> mydata.color
Traceback (most recent call last):
...
AttributeError: 'MyLocal' object has no attribute 'color'

請注意,子類可以定義 __slots__,但它們不是執行緒區域性的。它們線上程之間共享

>>> class MyLocal(local):
...     __slots__ = 'number'

>>> mydata = MyLocal()
>>> mydata.number = 42
>>> mydata.color = 'red'

所以,單獨的執行緒

>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> thread.join()

影響我們所看到的

>>> mydata.number
11
class threading.local

一個表示執行緒區域性資料的類。

執行緒物件

Thread 類表示在單獨控制執行緒中執行的活動。有兩種方法可以指定活動:透過將可呼叫物件傳遞給建構函式,或透過在子類中重寫 run() 方法。在子類中不應重寫其他方法(建構函式除外)。換句話說,*僅*重寫此類的 __init__()run() 方法。

一旦建立了執行緒物件,就必須透過呼叫執行緒的 start() 方法來啟動其活動。這會在單獨的控制執行緒中呼叫 run() 方法。

一旦執行緒的活動開始,執行緒就被認為是“活躍的”。當其 run() 方法終止時(無論是正常終止還是透過引發未處理的異常),它就不再活躍。is_alive() 方法測試執行緒是否活躍。

其他執行緒可以呼叫執行緒的 join() 方法。這會阻塞呼叫執行緒,直到呼叫了其 join() 方法的執行緒終止。

執行緒有一個名稱。該名稱可以傳遞給建構函式,並透過 name 屬性讀取或更改。

如果 run() 方法引發異常,則會呼叫 threading.excepthook() 來處理它。預設情況下,threading.excepthook() 會靜默忽略 SystemExit

執行緒可以標記為“守護執行緒”。此標誌的意義在於,當只剩下守護執行緒時,整個 Python 程式都會退出。初始值從建立執行緒繼承。該標誌可以透過 daemon 屬性或 daemon 建構函式引數設定。

備註

守護執行緒在關閉時會突然停止。它們的資源(如開啟的檔案、資料庫事務等)可能無法正確釋放。如果您希望執行緒優雅地停止,請將它們設定為非守護執行緒,並使用合適的訊號機制,例如 Event

存在一個“主執行緒”物件;它對應於 Python 程式中初始的控制執行緒。它不是守護執行緒。

有可能建立“虛擬執行緒物件”。這些是與“外部執行緒”對應的執行緒物件,外部執行緒是在 `threading` 模組之外啟動的控制執行緒,例如直接從 C 程式碼啟動。虛擬執行緒物件功能有限;它們總是被認為是活躍的和守護的,並且不能被 連線。它們永遠不會被刪除,因為無法檢測外部執行緒的終止。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None, context=None)

此建構函式應始終使用關鍵字引數呼叫。引數為

group 應為 None;保留用於將來實現 ThreadGroup 類時的擴充套件。

target 是由 run() 方法呼叫的可呼叫物件。預設為 None,表示不呼叫任何內容。

name 是執行緒名稱。預設情況下,會構造一個唯一的名稱,形式為“Thread-N”,其中 N 是一個小的十進位制數字,或者如果指定了 target 引數,則為“Thread-N (target)”,其中“target”是 target.__name__

args 是用於目標呼叫的引數列表或元組。預設為 ()

kwargs 是用於目標呼叫的關鍵字引數字典。預設為 {}

如果不是 Nonedaemon 明確設定執行緒是否為守護執行緒。如果為 None(預設值),則守護屬性從當前執行緒繼承。

context 是啟動執行緒時使用的 Context 值。預設值為 None,表示 sys.flags.thread_inherit_context 標誌控制行為。如果該標誌為真,執行緒將以 start() 呼叫者的上下文副本啟動。如果為假,它們將以空上下文啟動。要明確地以空上下文啟動,請傳遞 Context() 的新例項。要明確地以當前上下文的副本啟動,請傳遞 copy_context() 的值。在自由執行緒構建中,該標誌預設為真,否則為假。

如果子類重寫了建構函式,它必須確保在對執行緒執行任何其他操作之前呼叫基類建構函式(Thread.__init__())。

版本 3.3 中的變化: 添加了 daemon 引數。

版本 3.10 中的變化: 如果省略 name 引數,則使用 target 名稱。

版本 3.14 中的變化: 添加了 context 引數。

start()

啟動執行緒的活動。

它必須每個執行緒物件最多呼叫一次。它安排在單獨的控制執行緒中呼叫物件的 run() 方法。

如果對同一個執行緒物件多次呼叫此方法,將引發 RuntimeError

如果支援,將作業系統執行緒名稱設定為 threading.Thread.name。該名稱可能會根據作業系統執行緒名稱限制而被截斷。

版本 3.14 中的變化: 設定作業系統執行緒名稱。

run()

表示執行緒活動的方法。

您可以在子類中重寫此方法。標準的 run() 方法呼叫傳遞給物件建構函式作為 target 引數的可呼叫物件(如果有),並分別使用 argskwargs 引數中的位置引數和關鍵字引數。

使用列表或元組作為傳遞給 Threadargs 引數可以達到相同的效果。

示例

>>> from threading import Thread
>>> t = Thread(target=print, args=[1])
>>> t.run()
1
>>> t = Thread(target=print, args=(1,))
>>> t.run()
1
join(timeout=None)

等待直到執行緒終止。這將阻塞呼叫執行緒,直到呼叫了其 join() 方法的執行緒終止(無論是正常終止還是透過未處理的異常),或者直到可選的超時發生。

timeout 引數存在且不為 None 時,它應該是一個浮點數,指定操作的超時時間(以秒或其分數計)。由於 join() 總是返回 None,因此您必須在 join() 之後呼叫 is_alive() 來判斷是否發生了超時——如果執行緒仍然活躍,則 join() 呼叫超時。

timeout 引數不存在或為 None 時,操作將阻塞直到執行緒終止。

一個執行緒可以被多次 присоединить。

join() 如果嘗試連線當前執行緒,則會引發 RuntimeError,因為那會導致死鎖。線上程啟動之前連線執行緒也是一個錯誤,嘗試這樣做會引發相同的異常。

如果在 Python 終結化的後期階段嘗試連線正在執行的守護執行緒,join() 將引發 PythonFinalizationError

版本 3.14 中的變化: 可能會引發 PythonFinalizationError

name

一個僅用於標識目的的字串。它沒有語義。多個執行緒可以具有相同的名稱。初始名稱由建構函式設定。

在某些平臺上,執行緒啟動時會在作業系統級別設定執行緒名稱,使其在任務管理器中可見。此名稱可能會被截斷以適應系統特定的限制(例如,Linux 上為 15 位元組,macOS 上為 63 位元組)。

name 的更改僅在當前執行的執行緒被重新命名時才反映在作業系統級別。(設定不同執行緒的 name 屬性僅更新 Python Thread 物件。)

getName()
setName()

name 的已棄用 getter/setter API;請直接將其用作屬性。

3.10 版本後已棄用。

ident

此執行緒的“執行緒識別符號”,如果執行緒尚未啟動,則為 None。這是一個非零整數。請參閱 get_ident() 函式。當一個執行緒退出並建立另一個執行緒時,執行緒識別符號可能會被回收。即使線上程退出後,識別符號仍然可用。

native_id

此執行緒的執行緒 ID (TID),由作業系統(核心)分配。這是一個非負整數,如果執行緒尚未啟動,則為 None。請參閱 get_native_id() 函式。此值可用於在系統範圍內唯一標識此特定執行緒(直到執行緒終止,之後該值可能被作業系統回收)。

備註

與程序 ID 類似,執行緒 ID 僅線上程建立到執行緒終止期間有效(保證系統範圍唯一)。

可用性: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD。

在 3.8 版本加入。

is_alive()

返回執行緒是否活躍。

此方法在 run() 方法開始之前到 run() 方法終止之後返回 True。模組函式 enumerate() 返回所有活躍執行緒的列表。

daemon

一個布林值,指示此執行緒是否為守護執行緒 (True) 或不是 (False)。這必須在呼叫 start() 之前設定,否則會引發 RuntimeError。其初始值從建立執行緒繼承;主執行緒不是守護執行緒,因此在主執行緒中建立的所有執行緒預設 daemon = False

當沒有存活的非守護執行緒時,整個 Python 程式退出。

isDaemon()
setDaemon()

daemon 的已棄用 getter/setter API;請直接將其用作屬性。

3.10 版本後已棄用。

鎖物件

一個原始鎖是一種同步原語,當它被鎖定時,它不被特定的執行緒擁有。在 Python 中,它目前是可用的最低級別的同步原語,由 _thread 擴充套件模組直接實現。

一個原始鎖有兩種狀態:“鎖定”或“未鎖定”。它在未鎖定狀態下建立。它有兩個基本方法:acquire()release()。當狀態為未鎖定時,acquire() 會將狀態更改為鎖定並立即返回。當狀態為鎖定時,acquire() 會阻塞,直到另一個執行緒呼叫 release() 將其更改為未鎖定,然後 acquire() 呼叫將其重置為鎖定並返回。release() 方法應僅在鎖定狀態下呼叫;它將狀態更改為未鎖定並立即返回。如果嘗試釋放未鎖定的鎖,將引發 RuntimeError

鎖還支援上下文管理協議

當多個執行緒在 acquire() 中阻塞,等待狀態變為未鎖定時,當 release() 呼叫將狀態重置為未鎖定時,只有一個執行緒會繼續;具體是哪個等待執行緒會繼續,這是未定義的,並且可能因實現而異。

所有方法都原子執行。

class threading.Lock

實現原始鎖物件的類。一旦一個執行緒獲取了鎖,後續嘗試獲取它都會阻塞,直到它被釋放;任何執行緒都可以釋放它。

版本 3.13 中的變化: Lock 現在是一個類。在早期的 Python 中,Lock 是一個工廠函式,返回底層私有鎖型別的一個例項。

acquire(blocking=True, timeout=-1)

獲取一個鎖,阻塞或非阻塞。

當以 blocking 引數設定為 True(預設值)呼叫時,阻塞直到鎖被解鎖,然後將其設定為鎖定並返回 True

當以 blocking 引數設定為 False 呼叫時,不阻塞。如果以 blocking 設定為 True 的呼叫會阻塞,則立即返回 False;否則,將鎖設定為鎖定並返回 True

當以浮點數 timeout 引數設定為正值呼叫時,最多阻塞 timeout 指定的秒數(或其分數),只要鎖無法獲取。timeout 引數為 -1 表示無限等待。當 blockingFalse 時,禁止指定 timeout

如果鎖成功獲取,則返回值為 True,否則返回 False(例如,如果 timeout 過期)。

版本 3.2 中的變化: timeout 引數是新的。

版本 3.2 中的變化: 如果底層執行緒實現支援,鎖獲取現在可以被 POSIX 上的訊號中斷。

版本 3.14 中的變化: 鎖獲取現在可以被 Windows 上的訊號中斷。

release()

釋放一個鎖。這可以從任何執行緒呼叫,而不僅僅是獲取鎖的執行緒。

當鎖被鎖定時,將其重置為未鎖定,並返回。如果有其他執行緒因等待鎖變為未鎖定而被阻塞,則允許其中恰好一個執行緒繼續執行。

當在未鎖定的鎖上呼叫時,會引發 RuntimeError

沒有返回值。

locked()

如果鎖已獲取,則返回 True

RLock 物件

可重入鎖是一種同步原語,可以被同一執行緒多次獲取。在內部,它除了使用原始鎖的鎖定/未鎖定狀態外,還使用“擁有執行緒”和“遞迴級別”的概念。在鎖定狀態下,某個執行緒擁有該鎖;在未鎖定狀態下,沒有執行緒擁有它。

執行緒呼叫鎖的 acquire() 方法來鎖定它,呼叫其 release() 方法來解鎖它。

備註

可重入鎖支援上下文管理協議,因此建議使用 with 而不是手動呼叫 acquire()release() 來處理程式碼塊的鎖獲取和釋放。

RLock 的 acquire()/release() 呼叫對可以巢狀,這與 Lock 的 acquire()/release() 不同。只有最後一個 release()(最外層對的 release())才會將鎖重置為未鎖定狀態,並允許另一個在 acquire() 中阻塞的執行緒繼續執行。

acquire()/release() 必須成對使用:每個 acquire 必須在獲取鎖的執行緒中有一個 release。如果未能按照獲取鎖的次數呼叫 release,可能會導致死鎖。

class threading.RLock

此類實現可重入鎖物件。可重入鎖必須由獲取它的執行緒釋放。一旦一個執行緒獲取了可重入鎖,同一個執行緒可以再次獲取它而不會阻塞;執行緒必須每次獲取後釋放一次。

請注意,RLock 實際上是一個工廠函式,它返回平臺支援的最有效版本的具體 RLock 類例項。

acquire(blocking=True, timeout=-1)

獲取一個鎖,阻塞或非阻塞。

參見

將 RLock 用作上下文管理器

在實際情況下,建議使用此方法,而不是手動呼叫 acquire()release()

當以 blocking 引數設定為 True(預設值)呼叫時

  • 如果沒有執行緒擁有鎖,則獲取鎖並立即返回。

  • 如果另一個執行緒擁有鎖,則阻塞直到我們能夠獲取鎖,或者如果 timeout 設定為正浮點值,則阻塞直到超時。

  • 如果同一個執行緒擁有鎖,則再次獲取鎖,並立即返回。這是 LockRLock 之間的區別;Lock 處理此情況與前一種情況相同,阻塞直到可以獲取鎖。

當以 blocking 引數設定為 False 呼叫時

  • 如果沒有執行緒擁有鎖,則獲取鎖並立即返回。

  • 如果另一個執行緒擁有鎖,則立即返回。

  • 如果同一個執行緒擁有鎖,則再次獲取鎖並立即返回。

在所有情況下,如果執行緒能夠獲取鎖,則返回 True。如果執行緒無法獲取鎖(即如果未阻塞或達到超時),則返回 False

如果多次呼叫,未能多次呼叫 release() 可能會導致死鎖。考慮將 RLock 用作上下文管理器,而不是直接呼叫 acquire/release。

版本 3.2 中的變化: timeout 引數是新的。

release()

釋放鎖,遞減遞迴級別。如果遞減後遞迴級別為零,則將鎖重置為未鎖定(不被任何執行緒擁有),並且如果有其他執行緒在等待鎖變為未鎖定而被阻塞,則允許其中恰好一個執行緒繼續執行。如果遞減後遞迴級別仍不為零,則鎖保持鎖定狀態並由呼叫執行緒擁有。

僅當呼叫執行緒擁有鎖時才呼叫此方法。如果在未獲取鎖時呼叫此方法,則會引發 RuntimeError

沒有返回值。

locked()

返回一個布林值,指示此物件當前是否被鎖定。

在 3.14 版本加入。

條件物件

條件變數總是與某種鎖相關聯;這可以傳入,也可以預設建立一個。當多個條件變數必須共享同一個鎖時,傳入一個鎖很有用。鎖是條件物件的一部分:您不必單獨跟蹤它。

條件變數遵循上下文管理協議:使用 with 語句會在封閉塊的持續時間內獲取關聯的鎖。acquire()release() 方法也呼叫關聯鎖的相應方法。

必須在持有相關聯的鎖時呼叫其他方法。wait() 方法釋放鎖,然後阻塞,直到另一個執行緒透過呼叫 notify()notify_all() 喚醒它。一旦被喚醒,wait() 重新獲取鎖並返回。也可以指定超時。

notify() 方法喚醒一個正在等待條件變數的執行緒(如果存在)。notify_all() 方法喚醒所有正在等待條件變數的執行緒。

注意:notify()notify_all() 方法不釋放鎖;這意味著被喚醒的執行緒不會立即從它們的 wait() 呼叫返回,而是在呼叫 notify()notify_all() 的執行緒最終放棄鎖的所有權時才會返回。

使用條件變數的典型程式設計風格使用鎖來同步對某些共享狀態的訪問;對特定狀態變化感興趣的執行緒反覆呼叫 wait() 直到它們看到所需狀態,而修改狀態的執行緒在以可能成為某個等待者的所需狀態的方式更改狀態時呼叫 notify()notify_all()。例如,以下程式碼是具有無限緩衝區容量的通用生產者-消費者情況

# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

檢查應用程式條件的 while 迴圈是必要的,因為 wait() 可能會在任意長時間後返回,並且促使 notify() 呼叫的條件可能不再成立。這是多執行緒程式設計固有的。可以使用 wait_for() 方法來自動化條件檢查,並簡化超時計算

# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

要選擇 notify()notify_all(),請考慮一個狀態變化是否只對一個或幾個等待執行緒感興趣。例如,在典型的生產者-消費者場景中,向緩衝區新增一個專案只需要喚醒一個消費者執行緒。

class threading.Condition(lock=None)

此類實現條件變數物件。條件變數允許一個或多個執行緒等待,直到它們被另一個執行緒通知。

如果給定了 lock 引數且不為 None,則它必須是一個 LockRLock 物件,並將其用作底層鎖。否則,將建立一個新的 RLock 物件並將其用作底層鎖。

版本 3.3 中的變化: 從工廠函式更改為類。

acquire(*args)

獲取底層鎖。此方法呼叫底層鎖上的相應方法;返回值是該方法返回的任何內容。

release()

釋放底層鎖。此方法呼叫底層鎖上的相應方法;沒有返回值。

locked()

返回一個布林值,指示此物件當前是否被鎖定。

在 3.14 版本加入。

wait(timeout=None)

等待直到被通知或直到發生超時。如果呼叫此方法時呼叫執行緒尚未獲取鎖,則會引發 RuntimeError

此方法釋放底層鎖,然後阻塞,直到它被另一個執行緒中對同一條件變數的 notify()notify_all() 呼叫喚醒,或者直到可選的超時發生。一旦被喚醒或超時,它會重新獲取鎖並返回。

timeout 引數存在且不為 None 時,它應該是一個浮點數,指定操作的超時時間(以秒或其分數計)。

當底層鎖是 RLock 時,它不會透過其 release() 方法釋放,因為當它被遞迴多次獲取時,這可能實際上不會解鎖。相反,使用的是 RLock 類的一個內部介面,該介面即使在它被遞迴獲取多次後也能真正解鎖。然後使用另一個內部介面在重新獲取鎖時恢復遞迴級別。

返回值是 True,除非給定的 timeout 超時,在這種情況下,返回值是 False

3.2 版本中已更改: 以前,此方法總是返回 None

wait_for(predicate, timeout=None)

等待直到條件評估為真。 predicate 應該是一個可呼叫物件,其結果將被解釋為布林值。可以提供一個 timeout 值來指定最大等待時間。

此工具方法可以重複呼叫 wait() 直到謂詞滿足,或者直到超時發生。返回值是謂詞的最後一個返回值,如果方法超時,則將評估為 False

忽略超時特性,呼叫此方法大致等同於編寫

while not predicate():
    cv.wait()

因此,與 wait() 相同規則適用:呼叫時必須持有鎖,並在返回時重新獲取鎖。在持有鎖的情況下評估謂詞。

在 3.2 版本加入。

notify(n=1)

預設情況下,喚醒等待此條件的一個執行緒(如果有)。如果呼叫執行緒在此方法被呼叫時未獲取鎖,則會引發 RuntimeError

此方法最多喚醒 n 個等待條件變數的執行緒;如果沒有執行緒等待,則此方法不做任何操作。

當前實現會精確喚醒 n 個執行緒,如果至少有 n 個執行緒在等待。但是,不應依賴此行為。未來經過最佳化的實現可能會偶爾喚醒超過 n 個執行緒。

注意:被喚醒的執行緒實際上不會從其 wait() 呼叫返回,直到它能夠重新獲取鎖。由於 notify() 不會釋放鎖,因此其呼叫者應該釋放鎖。

notify_all()

喚醒所有等待此條件的執行緒。此方法的行為類似於 notify(),但會喚醒所有等待執行緒而不是一個。如果呼叫執行緒在此方法被呼叫時未獲取鎖,則會引發 RuntimeError

方法 notifyAll 是此方法的已棄用別名。

訊號量物件

這是計算機科學史上最古老的同步原語之一,由早期的荷蘭計算機科學家 Edsger W. Dijkstra 發明(他使用名稱 P()V() 而不是 acquire()release())。

訊號量管理一個內部計數器,每次呼叫 acquire() 都會使計數器減一,每次呼叫 release() 都會使計數器增一。計數器永遠不會低於零;當 acquire() 發現計數器為零時,它會阻塞,等待直到其他執行緒呼叫 release()

訊號量也支援 上下文管理協議

class threading.Semaphore(value=1)

此類的實現是一個訊號量物件。訊號量管理一個原子計數器,該計數器表示 release() 呼叫次數減去 acquire() 呼叫次數,再加上一個初始值。如有必要,acquire() 方法會阻塞,直到它能夠在不使計數器變為負數的情況下返回。如果未給出,則 value 預設為 1。

可選引數為內部計數器提供初始 value;它預設為 1。如果給定的 value 小於 0,則會引發 ValueError

版本 3.3 中的變化: 從工廠函式更改為類。

acquire(blocking=True, timeout=None)

獲取一個訊號量。

在不帶引數呼叫時

  • 如果進入時內部計數器大於零,則將其減一併立即返回 True

  • 如果進入時內部計數器為零,則阻塞直到被 release() 呼叫喚醒。一旦被喚醒(並且計數器大於0),則將計數器減1並返回 True。每次呼叫 release() 都將精確喚醒一個執行緒。喚醒執行緒的順序不應被依賴。

當呼叫時 blocking 設定為 False,不阻塞。如果無引數呼叫會阻塞,則立即返回 False;否則,執行與無引數呼叫相同的操作,並返回 True

當呼叫時 timeout 不為 None 時,它將阻塞最多 timeout 秒。如果在此間隔內 acquire 未成功完成,則返回 False。否則返回 True

版本 3.2 中的變化: timeout 引數是新的。

release(n=1)

釋放一個訊號量,將內部計數器增加 n。如果進入時計數器為零,並且有其他執行緒正在等待它再次大於零,則喚醒其中 n 個執行緒。

3.9 版本中已更改: 添加了 n 引數以同時釋放多個等待執行緒。

class threading.BoundedSemaphore(value=1)

實現有界訊號量物件的類。有界訊號量會檢查其當前值是否超過其初始值。如果超過,則會引發 ValueError。在大多數情況下,訊號量用於保護容量有限的資源。如果訊號量被釋放的次數過多,則表明存在錯誤。如果未給出,則 value 預設為 1。

版本 3.3 中的變化: 從工廠函式更改為類。

Semaphore 示例

訊號量通常用於保護容量有限的資源,例如資料庫伺服器。在資源大小固定H的任何情況下,都應使用有界訊號量。在生成任何工作執行緒之前,您的主執行緒將初始化訊號量

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

一旦生成,工作執行緒在需要連線到伺服器時會呼叫訊號量的 acquire 和 release 方法

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

有界訊號量的使用減少了因程式錯誤導致訊號量釋放次數多於獲取次數而未被檢測到的可能性。

事件物件

這是執行緒間通訊最簡單的機制之一:一個執行緒發出事件訊號,其他執行緒等待該事件。

事件物件管理一個內部標誌,可以使用 set() 方法將其設定為 True,並使用 clear() 方法將其重置為 False。 wait() 方法會阻塞直到該標誌為 True。

class threading.Event

實現事件物件的類。事件管理一個標誌,該標誌可以使用 set() 方法設定為真,並使用 clear() 方法重置為假。 wait() 方法會阻塞直到該標誌為真。該標誌最初為假。

版本 3.3 中的變化: 從工廠函式更改為類。

is_set()

當且僅當內部標誌為真時返回 True

方法 isSet 是此方法的已棄用別名。

set()

將內部標誌設定為 true。所有等待它變為 true 的執行緒都被喚醒。一旦標誌為 true,呼叫 wait() 的執行緒將根本不會阻塞。

clear()

將內部標誌重置為 false。之後,呼叫 wait() 的執行緒將阻塞,直到呼叫 set() 將內部標誌再次設定為 true。

wait(timeout=None)

只要內部標誌為假,且超時(如果給定)未過期,就阻塞。返回值表示此阻塞方法返回的原因;如果因內部標誌設定為真而返回,則為 True,或者如果給定超時且內部標誌在給定等待時間內未變為真,則為 False

當存在 timeout 引數且不為 None 時,它應該是一個浮點數,指定操作的超時時間(以秒為單位或其分數)。

3.1 版本中已更改: 以前,此方法總是返回 None

計時器物件

此類表示一個計時器,即只有在經過一定時間後才應執行的操作。 TimerThread 的子類,因此也可以作為建立自定義執行緒的示例。

計時器像執行緒一樣,透過呼叫它們的 Timer.start 方法啟動。可以透過呼叫 cancel() 方法來停止計時器(在其操作開始之前)。計時器在執行其操作之前等待的間隔可能與使用者指定的間隔不完全相同。

例如:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed
class threading.Timer(interval, function, args=None, kwargs=None)

建立一個計時器,它將在 interval 秒過後,使用引數 args 和關鍵字引數 kwargs 來執行 function。如果 argsNone(預設值),則將使用空列表。如果 kwargsNone(預設值),則將使用空字典。

版本 3.3 中的變化: 從工廠函式更改為類。

cancel()

停止計時器,並取消計時器操作的執行。這僅在計時器仍在等待階段時有效。

屏障物件

在 3.2 版本加入。

此類提供了一個簡單的同步原語,供固定數量的執行緒使用,這些執行緒需要彼此等待。每個執行緒都透過呼叫 wait() 方法來嘗試透過屏障,並將阻塞直到所有執行緒都呼叫了 wait()。此時,這些執行緒同時釋放。

該屏障可以為相同數量的執行緒重複使用任意次數。

例如,這是一種同步客戶端和伺服器執行緒的簡單方法

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
class threading.Barrier(parties, action=None, timeout=None)

parties 個執行緒建立一個屏障物件。如果提供了 action,則它是一個可呼叫物件,由其中一個執行緒在它們被釋放時呼叫。 timeoutwait() 方法未指定時的預設超時值。

wait(timeout=None)

透過屏障。當所有參與屏障的執行緒都呼叫此函式時,它們將同時釋放。如果提供了 timeout,則優先使用它而不是提供給類建構函式的任何超時。

返回值是一個 0 到 parties – 1 範圍內的整數,每個執行緒不同。這可用於選擇一個執行緒來做一些特殊的內務處理,例如

i = barrier.wait()
if i == 0:
    # Only one thread needs to print this
    print("passed the barrier")

如果建構函式中提供了 action,其中一個執行緒會在被釋放之前呼叫它。如果此呼叫引發錯誤,則屏障將進入 broken 狀態。

如果呼叫超時,屏障將進入 broken 狀態。

如果屏障線上程等待時被重置或損壞,此方法可能會引發 BrokenBarrierError 異常。

reset()

將屏障恢復到預設的空狀態。任何等待它的執行緒都將收到 BrokenBarrierError 異常。

請注意,如果存在狀態未知的其他執行緒,使用此函式可能需要一些外部同步。如果屏障損壞,最好直接放棄並建立一個新的。

abort()

將屏障置於 broken 狀態。這將導致任何正在進行的或未來的 wait() 呼叫因 BrokenBarrierError 而失敗。例如,如果其中一個執行緒需要中止,則使用此方法以避免應用程式死鎖。

最好直接為屏障建立一個合理的 timeout 值,以自動防止其中一個執行緒出錯。

parties

透過屏障所需的執行緒數。

n_waiting

當前在屏障中等待的執行緒數。

broken

一個布林值,如果屏障處於 broken 狀態,則為 True

exception threading.BrokenBarrierError

Barrier 物件被重置或損壞時,會引發此異常,它是 RuntimeError 的子類。

with 語句中使用鎖、條件和訊號量

此模組提供的所有具有 acquirerelease 方法的物件都可以用作 with 語句的上下文管理器。 acquire 方法將在進入塊時呼叫, release 將在退出塊時呼叫。因此,以下程式碼片段

with some_lock:
    # do something...

等價於

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

目前,LockRLockConditionSemaphoreBoundedSemaphore 物件可以用作 with 語句上下文管理器。