concurrent.futures — 啟動並行任務

3.2 版本新增。

原始碼: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模組為非同步執行可呼叫物件提供了高階介面。

非同步執行可以使用執行緒(使用 ThreadPoolExecutor)或單獨的程序(使用 ProcessPoolExecutor)來執行。兩者都實現了相同的介面,該介面由抽象的 Executor 類定義。

可用性:不適用於 WASI。

此模組在 WebAssembly 上不起作用或不可用。有關更多資訊,請參閱 WebAssembly 平臺

執行器物件

class concurrent.futures.Executor

一個抽象類,提供非同步執行呼叫的方法。它不應直接使用,而是透過其具體的子類使用。

submit(fn, /, *args, **kwargs)

安排可呼叫物件 fn 作為 fn(*args, **kwargs) 執行,並返回一個 Future 物件,表示可呼叫物件的執行。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

類似於 map(fn, *iterables),區別在於

  • iterables 會立即收集,而不是惰性地收集;

  • fn 非同步執行,並且可以同時進行多次對 fn 的呼叫。

如果 __next__() 被呼叫且在從原始呼叫 Executor.map()timeout 秒後結果不可用,則返回的迭代器會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間沒有限制。

如果 fn 呼叫引發異常,則當從迭代器檢索其值時,將引發該異常。

當使用 ProcessPoolExecutor 時,此方法會將 iterables 切分成多個塊,並將其作為單獨的任務提交到池中。可以透過將 chunksize 設定為正整數來指定這些塊的(近似)大小。對於非常長的可迭代物件,與預設大小 1 相比,使用較大的 chunksize 值可以顯著提高效能。對於 ThreadPoolExecutorchunksize 沒有影響。

在 3.5 版本中做了更改: 添加了 chunksize 引數。

shutdown(wait=True, *, cancel_futures=False)

向執行器發出訊號,表示噹噹前掛起的 future 執行完成後,它應釋放其正在使用的任何資源。在關閉後呼叫 Executor.submit()Executor.map() 將引發 RuntimeError

如果 waitTrue,則此方法將不會返回,直到所有掛起的 future 執行完成並且與執行器關聯的資源已被釋放。如果 waitFalse,則此方法將立即返回,並且當所有掛起的 future 執行完成時,將釋放與執行器關聯的資源。無論 wait 的值如何,整個 Python 程式都不會退出,直到所有掛起的 future 執行完成。

如果 cancel_futuresTrue,則此方法將取消執行器尚未開始執行的所有掛起的 future。無論 cancel_futures 的值如何,任何已完成或正在執行的 future 都不會被取消。

如果 cancel_futureswait 均為 True,則執行器已開始執行的所有 future 將在此方法返回之前完成。剩餘的 future 將被取消。

如果您使用 with 語句,則可以避免必須顯式呼叫此方法,該語句將關閉 Executor(等待,就好像 Executor.shutdown()wait 設定為 True 一樣)

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版本中做了更改: 添加了 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一個 Executor 子類,它使用執行緒池來非同步執行呼叫。

當與 Future 關聯的可呼叫物件等待另一個 Future 的結果時,可能會發生死鎖。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

並且

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一個 Executor 子類,它使用最多 max_workers 個執行緒的池來非同步執行呼叫。

在直譯器退出之前,將加入所有排隊到 ThreadPoolExecutor 的執行緒。請注意,執行此操作的退出處理程式在任何使用 atexit 新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的異常,以便通知執行緒正常退出。因此,建議不要將 ThreadPoolExecutor 用於長時間執行的任務。

initializer 是一個可選的可呼叫物件,在每個工作執行緒啟動時呼叫;initargs 是傳遞給初始化器的引數元組。如果 initializer 引發異常,則所有當前掛起的作業以及任何向池提交更多作業的嘗試都將引發 BrokenThreadPool

在 3.5 版本中更改: 如果 max_workersNone 或未給出,則它將預設為機器上的處理器數量乘以 5,假設 ThreadPoolExecutor 通常用於重疊 I/O 而不是 CPU 工作,並且工作執行緒的數量應該高於 ProcessPoolExecutor 的工作執行緒數量。

在 3.6 版本中更改: 添加了 thread_name_prefix 引數,以允許使用者控制池建立的工作執行緒的 threading.Thread 名稱,以便更輕鬆地進行除錯。

在 3.7 版本中更改: 添加了 initializerinitargs 引數。

在 3.8 版本中更改: max_workers 的預設值更改為 min(32, os.cpu_count() + 4)。此預設值保留至少 5 個用於 I/O 繫結任務的工作執行緒。它最多利用 32 個 CPU 核心來執行釋放 GIL 的 CPU 繫結任務。並且它避免在多核機器上隱式使用非常大的資源。

ThreadPoolExecutor 現在也會重用空閒工作執行緒,然後再啟動 max_workers 個工作執行緒。

在 3.13 版本中更改: max_workers 的預設值更改為 min(32, (os.process_cpu_count() or 1) + 4)

ThreadPoolExecutor 示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 類是一個 Executor 子類,它使用程序池來非同步執行呼叫。ProcessPoolExecutor 使用 multiprocessing 模組,這允許它繞過 全域性直譯器鎖,但也意味著只有可pickle的物件才能被執行和返回。

必須使工作子程序可匯入 __main__ 模組。這意味著 ProcessPoolExecutor 在互動式直譯器中不起作用。

從提交給 ProcessPoolExecutor 的可呼叫物件呼叫 ExecutorFuture 方法將導致死鎖。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

一個 Executor 子類,它使用最多 max_workers 個程序的池來非同步執行呼叫。如果 max_workersNone 或未給出,則它將預設為 os.process_cpu_count()。如果 max_workers 小於或等於 0,則會引發 ValueError。在 Windows 上,max_workers 必須小於或等於 61。如果不是,則會引發 ValueError。如果 max_workersNone,則選擇的預設值將最多為 61,即使有更多處理器可用。mp_context 可以是一個 multiprocessing 上下文或 None。它將用於啟動工作程序。如果 mp_contextNone 或未給出,則使用預設的 multiprocessing 上下文。請參閱 上下文和啟動方法

initializer 是一個可選的可呼叫物件,在每個工作程序啟動時呼叫;initargs 是傳遞給初始化器的引數元組。如果 initializer 引發異常,則所有當前掛起的作業以及任何向池提交更多作業的嘗試都將引發 BrokenProcessPool

max_tasks_per_child 是一個可選引數,用於指定單個程序在退出並被新的工作程序替換之前可以執行的最大任務數。預設情況下,max_tasks_per_childNone,這意味著工作程序將與池的生命週期一樣長。當指定最大值時,在缺少 mp_context 引數的情況下,預設將使用“spawn”多程序啟動方法。此功能與“fork”啟動方法不相容。

在 3.3 版本中更改: 當其中一個工作程序突然終止時,現在會引發 BrokenProcessPool 錯誤。以前,行為未定義,但對執行器或其 future 的操作通常會凍結或死鎖。

在 3.7 版本中更改: 添加了 mp_context 引數,以允許使用者控制池建立的工作程序的 start_method。

添加了 initializerinitargs 引數。

注意

預設的 multiprocessing 啟動方法(請參閱 上下文和啟動方法)將在 Python 3.14 中更改為不使用 fork。 需要使用 fork 來執行其 ProcessPoolExecutor 的程式碼應透過傳遞 mp_context=multiprocessing.get_context("fork") 引數來顯式指定。

在 3.11 版本中更改: 添加了 max_tasks_per_child 引數,以允許使用者控制池中工作程序的生命週期。

3.12 版本更改: 在 POSIX 系統上,如果您的應用程式有多個執行緒,並且 multiprocessing 上下文使用 "fork" 啟動方法:內部呼叫來生成工作程序的 os.fork() 函式可能會引發 DeprecationWarning。傳遞配置為使用不同啟動方法的 *mp_context*。有關更多說明,請參閱 os.fork() 文件。

3.13 版本更改: max_workers 預設使用 os.process_cpu_count(),而不是 os.cpu_count()

ProcessPoolExecutor 示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 物件

Future 類封裝了可呼叫物件的非同步執行。Future 例項由 Executor.submit() 建立。

class concurrent.futures.Future

封裝了可呼叫物件的非同步執行。Future 例項由 Executor.submit() 建立,除非用於測試,否則不應直接建立。

cancel()

嘗試取消呼叫。如果當前正在執行呼叫或已完成執行且無法取消,則該方法將返回 False,否則將取消呼叫,並且該方法將返回 True

cancelled()

如果成功取消呼叫,則返回 True

running()

如果當前正在執行呼叫且無法取消,則返回 True

done()

如果成功取消呼叫或已完成執行,則返回 True

result(timeout=None)

返回呼叫返回的值。 如果呼叫尚未完成,則此方法將等待最多 *timeout* 秒。 如果呼叫在 *timeout* 秒內未完成,則會引發 TimeoutError。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或為 None,則等待時間沒有限制。

如果 future 在完成之前被取消,則會引發 CancelledError

如果呼叫引發異常,則此方法將引發相同的異常。

exception(timeout=None)

返回呼叫引發的異常。 如果呼叫尚未完成,則此方法將等待最多 *timeout* 秒。 如果呼叫在 *timeout* 秒內未完成,則會引發 TimeoutError。 *timeout* 可以是 int 或 float。 如果未指定 *timeout* 或為 None,則等待時間沒有限制。

如果 future 在完成之前被取消,則會引發 CancelledError

如果呼叫完成時沒有引發異常,則返回 None

add_done_callback(fn)

將可呼叫物件 *fn* 附加到 future。當 future 被取消或完成執行時,將呼叫 *fn*,並將 future 作為其唯一引數。

新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的程序的執行緒中呼叫。 如果可呼叫物件引發 Exception 子類,則會記錄並忽略它。 如果可呼叫物件引發 BaseException 子類,則行為未定義。

如果 future 已完成或已取消,則會立即呼叫 *fn*。

以下 Future 方法旨在用於單元測試和 Executor 實現。

set_running_or_notify_cancel()

僅應由 Executor 實現,在執行與 Future 關聯的工作之前以及單元測試呼叫此方法。

如果該方法返回 False,則 Future 已被取消,即呼叫了 Future.cancel() 並返回了 True。任何等待 Future 完成(即透過 as_completed()wait())的執行緒都將被喚醒。

如果該方法返回 True,則 Future 未被取消,並且已設定為執行狀態,即呼叫 Future.running() 將返回 True

此方法只能呼叫一次,並且不能在呼叫 Future.set_result()Future.set_exception() 之後呼叫。

set_result(result)

將與 Future 關聯的工作的結果設定為 *result*。

此方法僅應由 Executor 實現和單元測試使用。

3.8 版本更改: 如果 Future 已完成,則此方法引發 concurrent.futures.InvalidStateError

set_exception(exception)

將與 Future 相關聯的工作結果設定為 Exception exception

此方法僅應由 Executor 實現和單元測試使用。

3.8 版本更改: 如果 Future 已完成,則此方法引發 concurrent.futures.InvalidStateError

模組函式

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 fs 給出的 Future 例項(可能是由不同的 Executor 例項建立)完成。傳遞給 fs 的重複 futures 將被刪除,並且只會返回一次。返回一個包含兩個集合的命名元組。第一個集合名為 done,包含在等待完成之前完成(已完成或已取消的 futures)的 futures。第二個集合名為 not_done,包含未完成的 futures(掛起或正在執行的 futures)。

timeout 可用於控制返回前等待的最大秒數。timeout 可以是 int 或 float。如果未指定 timeout 或為 None,則等待時間沒有限制。

return_when 指示此函式應何時返回。它必須是以下常量之一

常量

描述

concurrent.futures.FIRST_COMPLETED

當任何 future 完成或被取消時,該函式將返回。

concurrent.futures.FIRST_EXCEPTION

當任何 future 透過引發異常完成時,該函式將返回。如果沒有 future 引發異常,則它等效於 ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

當所有 futures 完成或被取消時,該函式將返回。

concurrent.futures.as_completed(fs, timeout=None)

返回一個迭代器,該迭代器遍歷由 fs 給出的 Future 例項(可能是由不同的 Executor 例項建立),當這些 futures 完成時(已完成或已取消的 futures)產生 futures。fs 給出的任何重複的 futures 將返回一次。在呼叫 as_completed() 之前完成的任何 futures 將首先產生。如果呼叫 __next__() 並且從最初呼叫 as_completed() 後經過 timeout 秒結果仍然不可用,則返回的迭代器會引發 TimeoutErrortimeout 可以是 int 或 float。如果未指定 timeout 或為 None,則等待時間沒有限制。

參見

PEP 3148 – futures - 非同步執行計算

該提案描述了將此功能包含在 Python 標準庫中的情況。

異常類

exception concurrent.futures.CancelledError

當 future 被取消時引發。

exception concurrent.futures.TimeoutError

TimeoutError 的已棄用別名,當 future 操作超出給定的超時時間時引發。

在 3.11 版本中更改:此類被設為 TimeoutError 的別名。

exception concurrent.futures.BrokenExecutor

從此類派生自 RuntimeError,當執行器因某種原因損壞,無法用於提交或執行新任務時,會引發此異常類。

3.7 版本中新增。

exception concurrent.futures.InvalidStateError

當在 future 上執行在當前狀態下不允許的操作時引發。

3.8 版本中新增。

exception concurrent.futures.thread.BrokenThreadPool

從此類派生自 BrokenExecutor,當 ThreadPoolExecutor 的一個工作執行緒初始化失敗時,會引發此異常類。

3.7 版本中新增。

exception concurrent.futures.process.BrokenProcessPool

從此類派生自 BrokenExecutor (以前是 RuntimeError),當 ProcessPoolExecutor 的一個工作執行緒以非乾淨的方式終止時(例如,如果它是從外部殺死的),會引發此異常類。

3.3 版本中新增。