concurrent.futures
— 啟動並行任務¶
3.2 版本新增。
原始碼: Lib/concurrent/futures/thread.py 和 Lib/concurrent/futures/process.py
concurrent.futures
模組為非同步執行可呼叫物件提供了高階介面。
非同步執行可以使用執行緒(使用 ThreadPoolExecutor
)或單獨的程序(使用 ProcessPoolExecutor
)來執行。兩者都實現了相同的介面,該介面由抽象的 Executor
類定義。
可用性:不適用於 WASI。
此模組在 WebAssembly 上不起作用或不可用。有關更多資訊,請參閱 WebAssembly 平臺。
執行器物件¶
- class concurrent.futures.Executor¶
一個抽象類,提供非同步執行呼叫的方法。它不應直接使用,而是透過其具體的子類使用。
- submit(fn, /, *args, **kwargs)¶
安排可呼叫物件 fn 作為
fn(*args, **kwargs)
執行,並返回一個Future
物件,表示可呼叫物件的執行。with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
類似於
map(fn, *iterables)
,區別在於iterables 會立即收集,而不是惰性地收集;
fn 非同步執行,並且可以同時進行多次對 fn 的呼叫。
如果
__next__()
被呼叫且在從原始呼叫Executor.map()
後 timeout 秒後結果不可用,則返回的迭代器會引發TimeoutError
。timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間沒有限制。如果 fn 呼叫引發異常,則當從迭代器檢索其值時,將引發該異常。
當使用
ProcessPoolExecutor
時,此方法會將 iterables 切分成多個塊,並將其作為單獨的任務提交到池中。可以透過將 chunksize 設定為正整數來指定這些塊的(近似)大小。對於非常長的可迭代物件,與預設大小 1 相比,使用較大的 chunksize 值可以顯著提高效能。對於ThreadPoolExecutor
,chunksize 沒有影響。在 3.5 版本中做了更改: 添加了 chunksize 引數。
- shutdown(wait=True, *, cancel_futures=False)¶
向執行器發出訊號,表示噹噹前掛起的 future 執行完成後,它應釋放其正在使用的任何資源。在關閉後呼叫
Executor.submit()
和Executor.map()
將引發RuntimeError
。如果 wait 為
True
,則此方法將不會返回,直到所有掛起的 future 執行完成並且與執行器關聯的資源已被釋放。如果 wait 為False
,則此方法將立即返回,並且當所有掛起的 future 執行完成時,將釋放與執行器關聯的資源。無論 wait 的值如何,整個 Python 程式都不會退出,直到所有掛起的 future 執行完成。如果 cancel_futures 為
True
,則此方法將取消執行器尚未開始執行的所有掛起的 future。無論 cancel_futures 的值如何,任何已完成或正在執行的 future 都不會被取消。如果 cancel_futures 和 wait 均為
True
,則執行器已開始執行的所有 future 將在此方法返回之前完成。剩餘的 future 將被取消。如果您使用
with
語句,則可以避免必須顯式呼叫此方法,該語句將關閉Executor
(等待,就好像Executor.shutdown()
的 wait 設定為True
一樣)import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
在 3.9 版本中做了更改: 添加了 cancel_futures。
ThreadPoolExecutor¶
ThreadPoolExecutor
是一個 Executor
子類,它使用執行緒池來非同步執行呼叫。
當與 Future
關聯的可呼叫物件等待另一個 Future
的結果時,可能會發生死鎖。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
並且
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
一個
Executor
子類,它使用最多 max_workers 個執行緒的池來非同步執行呼叫。在直譯器退出之前,將加入所有排隊到
ThreadPoolExecutor
的執行緒。請注意,執行此操作的退出處理程式在任何使用atexit
新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的異常,以便通知執行緒正常退出。因此,建議不要將ThreadPoolExecutor
用於長時間執行的任務。initializer 是一個可選的可呼叫物件,在每個工作執行緒啟動時呼叫;initargs 是傳遞給初始化器的引數元組。如果 initializer 引發異常,則所有當前掛起的作業以及任何向池提交更多作業的嘗試都將引發
BrokenThreadPool
。在 3.5 版本中更改: 如果 max_workers 為
None
或未給出,則它將預設為機器上的處理器數量乘以5
,假設ThreadPoolExecutor
通常用於重疊 I/O 而不是 CPU 工作,並且工作執行緒的數量應該高於ProcessPoolExecutor
的工作執行緒數量。在 3.6 版本中更改: 添加了 thread_name_prefix 引數,以允許使用者控制池建立的工作執行緒的
threading.Thread
名稱,以便更輕鬆地進行除錯。在 3.7 版本中更改: 添加了 initializer 和 initargs 引數。
在 3.8 版本中更改: max_workers 的預設值更改為
min(32, os.cpu_count() + 4)
。此預設值保留至少 5 個用於 I/O 繫結任務的工作執行緒。它最多利用 32 個 CPU 核心來執行釋放 GIL 的 CPU 繫結任務。並且它避免在多核機器上隱式使用非常大的資源。ThreadPoolExecutor 現在也會重用空閒工作執行緒,然後再啟動 max_workers 個工作執行緒。
在 3.13 版本中更改: max_workers 的預設值更改為
min(32, (os.process_cpu_count() or 1) + 4)
。
ThreadPoolExecutor 示例¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistent-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
ProcessPoolExecutor
類是一個 Executor
子類,它使用程序池來非同步執行呼叫。ProcessPoolExecutor
使用 multiprocessing
模組,這允許它繞過 全域性直譯器鎖,但也意味著只有可pickle的物件才能被執行和返回。
必須使工作子程序可匯入 __main__
模組。這意味著 ProcessPoolExecutor
在互動式直譯器中不起作用。
從提交給 ProcessPoolExecutor
的可呼叫物件呼叫 Executor
或 Future
方法將導致死鎖。
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
一個
Executor
子類,它使用最多 max_workers 個程序的池來非同步執行呼叫。如果 max_workers 為None
或未給出,則它將預設為os.process_cpu_count()
。如果 max_workers 小於或等於0
,則會引發ValueError
。在 Windows 上,max_workers 必須小於或等於61
。如果不是,則會引發ValueError
。如果 max_workers 為None
,則選擇的預設值將最多為61
,即使有更多處理器可用。mp_context 可以是一個multiprocessing
上下文或None
。它將用於啟動工作程序。如果 mp_context 為None
或未給出,則使用預設的multiprocessing
上下文。請參閱 上下文和啟動方法。initializer 是一個可選的可呼叫物件,在每個工作程序啟動時呼叫;initargs 是傳遞給初始化器的引數元組。如果 initializer 引發異常,則所有當前掛起的作業以及任何向池提交更多作業的嘗試都將引發
BrokenProcessPool
。max_tasks_per_child 是一個可選引數,用於指定單個程序在退出並被新的工作程序替換之前可以執行的最大任務數。預設情況下,max_tasks_per_child 為
None
,這意味著工作程序將與池的生命週期一樣長。當指定最大值時,在缺少 mp_context 引數的情況下,預設將使用“spawn”多程序啟動方法。此功能與“fork”啟動方法不相容。在 3.3 版本中更改: 當其中一個工作程序突然終止時,現在會引發
BrokenProcessPool
錯誤。以前,行為未定義,但對執行器或其 future 的操作通常會凍結或死鎖。在 3.7 版本中更改: 添加了 mp_context 引數,以允許使用者控制池建立的工作程序的 start_method。
添加了 initializer 和 initargs 引數。
注意
預設的
multiprocessing
啟動方法(請參閱 上下文和啟動方法)將在 Python 3.14 中更改為不使用 fork。 需要使用 fork 來執行其ProcessPoolExecutor
的程式碼應透過傳遞mp_context=multiprocessing.get_context("fork")
引數來顯式指定。在 3.11 版本中更改: 添加了 max_tasks_per_child 引數,以允許使用者控制池中工作程序的生命週期。
3.12 版本更改: 在 POSIX 系統上,如果您的應用程式有多個執行緒,並且
multiprocessing
上下文使用"fork"
啟動方法:內部呼叫來生成工作程序的os.fork()
函式可能會引發DeprecationWarning
。傳遞配置為使用不同啟動方法的 *mp_context*。有關更多說明,請參閱os.fork()
文件。3.13 版本更改: max_workers 預設使用
os.process_cpu_count()
,而不是os.cpu_count()
。
ProcessPoolExecutor 示例¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Future 物件¶
Future
類封裝了可呼叫物件的非同步執行。Future
例項由 Executor.submit()
建立。
- class concurrent.futures.Future¶
封裝了可呼叫物件的非同步執行。
Future
例項由Executor.submit()
建立,除非用於測試,否則不應直接建立。- cancel()¶
嘗試取消呼叫。如果當前正在執行呼叫或已完成執行且無法取消,則該方法將返回
False
,否則將取消呼叫,並且該方法將返回True
。
- cancelled()¶
如果成功取消呼叫,則返回
True
。
- running()¶
如果當前正在執行呼叫且無法取消,則返回
True
。
- done()¶
如果成功取消呼叫或已完成執行,則返回
True
。
- result(timeout=None)¶
返回呼叫返回的值。 如果呼叫尚未完成,則此方法將等待最多 *timeout* 秒。 如果呼叫在 *timeout* 秒內未完成,則會引發
TimeoutError
。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或為None
,則等待時間沒有限制。如果 future 在完成之前被取消,則會引發
CancelledError
。如果呼叫引發異常,則此方法將引發相同的異常。
- exception(timeout=None)¶
返回呼叫引發的異常。 如果呼叫尚未完成,則此方法將等待最多 *timeout* 秒。 如果呼叫在 *timeout* 秒內未完成,則會引發
TimeoutError
。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或為None
,則等待時間沒有限制。如果 future 在完成之前被取消,則會引發
CancelledError
。如果呼叫完成時沒有引發異常,則返回
None
。
- add_done_callback(fn)¶
將可呼叫物件 *fn* 附加到 future。當 future 被取消或完成執行時,將呼叫 *fn*,並將 future 作為其唯一引數。
新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的程序的執行緒中呼叫。 如果可呼叫物件引發
Exception
子類,則會記錄並忽略它。 如果可呼叫物件引發BaseException
子類,則行為未定義。如果 future 已完成或已取消,則會立即呼叫 *fn*。
以下
Future
方法旨在用於單元測試和Executor
實現。- set_running_or_notify_cancel()¶
僅應由
Executor
實現,在執行與Future
關聯的工作之前以及單元測試呼叫此方法。如果該方法返回
False
,則Future
已被取消,即呼叫了Future.cancel()
並返回了True
。任何等待Future
完成(即透過as_completed()
或wait()
)的執行緒都將被喚醒。如果該方法返回
True
,則Future
未被取消,並且已設定為執行狀態,即呼叫Future.running()
將返回True
。此方法只能呼叫一次,並且不能在呼叫
Future.set_result()
或Future.set_exception()
之後呼叫。
- set_result(result)¶
將與
Future
關聯的工作的結果設定為 *result*。此方法僅應由
Executor
實現和單元測試使用。3.8 版本更改: 如果
Future
已完成,則此方法引發concurrent.futures.InvalidStateError
。
模組函式¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
等待由 fs 給出的
Future
例項(可能是由不同的Executor
例項建立)完成。傳遞給 fs 的重複 futures 將被刪除,並且只會返回一次。返回一個包含兩個集合的命名元組。第一個集合名為done
,包含在等待完成之前完成(已完成或已取消的 futures)的 futures。第二個集合名為not_done
,包含未完成的 futures(掛起或正在執行的 futures)。timeout 可用於控制返回前等待的最大秒數。timeout 可以是 int 或 float。如果未指定 timeout 或為
None
,則等待時間沒有限制。return_when 指示此函式應何時返回。它必須是以下常量之一
常量
描述
- concurrent.futures.FIRST_COMPLETED¶
當任何 future 完成或被取消時,該函式將返回。
- concurrent.futures.FIRST_EXCEPTION¶
當任何 future 透過引發異常完成時,該函式將返回。如果沒有 future 引發異常,則它等效於
ALL_COMPLETED
。- concurrent.futures.ALL_COMPLETED¶
當所有 futures 完成或被取消時,該函式將返回。
- concurrent.futures.as_completed(fs, timeout=None)¶
返回一個迭代器,該迭代器遍歷由 fs 給出的
Future
例項(可能是由不同的Executor
例項建立),當這些 futures 完成時(已完成或已取消的 futures)產生 futures。fs 給出的任何重複的 futures 將返回一次。在呼叫as_completed()
之前完成的任何 futures 將首先產生。如果呼叫__next__()
並且從最初呼叫as_completed()
後經過 timeout 秒結果仍然不可用,則返回的迭代器會引發TimeoutError
。timeout 可以是 int 或 float。如果未指定 timeout 或為None
,則等待時間沒有限制。
參見
- PEP 3148 – futures - 非同步執行計算
該提案描述了將此功能包含在 Python 標準庫中的情況。
異常類¶
- exception concurrent.futures.CancelledError¶
當 future 被取消時引發。
- exception concurrent.futures.TimeoutError¶
TimeoutError
的已棄用別名,當 future 操作超出給定的超時時間時引發。在 3.11 版本中更改:此類被設為
TimeoutError
的別名。
- exception concurrent.futures.BrokenExecutor¶
從此類派生自
RuntimeError
,當執行器因某種原因損壞,無法用於提交或執行新任務時,會引發此異常類。3.7 版本中新增。
- exception concurrent.futures.InvalidStateError¶
當在 future 上執行在當前狀態下不允許的操作時引發。
3.8 版本中新增。
- exception concurrent.futures.thread.BrokenThreadPool¶
從此類派生自
BrokenExecutor
,當ThreadPoolExecutor
的一個工作執行緒初始化失敗時,會引發此異常類。3.7 版本中新增。
- exception concurrent.futures.process.BrokenProcessPool¶
從此類派生自
BrokenExecutor
(以前是RuntimeError
),當ProcessPoolExecutor
的一個工作執行緒以非乾淨的方式終止時(例如,如果它是從外部殺死的),會引發此異常類。3.3 版本中新增。