concurrent.futures --- 啟動並行任務

在 3.2 版本加入。

原始碼: Lib/concurrent/futures/thread.py, Lib/concurrent/futures/process.py, and Lib/concurrent/futures/interpreter.py


concurrent.futures 模組提供了一個高階介面,用於非同步執行可呼叫物件。

非同步執行可以透過執行緒池(使用 ThreadPoolExecutorInterpreterPoolExecutor)或獨立的程序池(使用 ProcessPoolExecutor)來執行。它們都實現了相同的介面,該介面由抽象類 Executor 定義。

可用性:非 WASI。

此模組在 WebAssembly 上不起作用或不可用。有關更多資訊,請參閱 WebAssembly 平臺

執行器物件

class concurrent.futures.Executor

一個提供非同步執行方法的抽象類。它不應該被直接使用,而應透過其具體子類使用。

submit(fn, /, *args, **kwargs)

安排可呼叫物件 fnfn(*args, **kwargs) 的方式執行,並返回一個表示該可呼叫物件執行情況的 Future 物件。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)

類似於 map(fn, *iterables),不同之處在於:

  • iterables 是立即收集的,而不是惰性收集的,除非指定了 buffersize 來限制尚未產出結果的已提交任務數量。如果緩衝區已滿,對 iterables 的迭代將暫停,直到從緩衝區中產出一個結果。

  • fn 是非同步執行的,對 fn 的多次呼叫可能會併發進行。

如果呼叫了 __next__(),並且在從最初呼叫 Executor.map() 算起的 timeout 秒後結果仍不可用,則返回的迭代器將引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則沒有等待時間限制。

如果一個 fn 呼叫引發了異常,那麼當從迭代器中獲取其值時,該異常將被引發。

當使用 ProcessPoolExecutor 時,此方法會將 iterables 切分成多個塊,並將它們作為單獨的任務提交到池中。這些塊的(近似)大小可以透過將 chunksize 設定為正整數來指定。對於非常長的可迭代物件,使用較大的 chunksize 值可以顯著提高效能,相比於預設大小 1。對於 ThreadPoolExecutorInterpreterPoolExecutorchunksize 沒有效果。

在 3.5 版更改: 增加了 chunksize 形參。

在 3.14 版更改: 增加了 buffersize 引數。

shutdown(wait=True, *, cancel_futures=False)

通知執行器,在當前待處理的 future 執行完畢後,應釋放其正在使用的任何資源。在 shutdown 後呼叫 Executor.submit()Executor.map() 將會引發 RuntimeError

如果 waitTrue,則此方法將不會返回,直到所有待處理的 future 都執行完畢並且與執行器關聯的資源都已釋放。如果 waitFalse,則此方法將立即返回,與執行器關聯的資源將在所有待處理的 future 執行完畢後釋放。無論 wait 的值如何,整個 Python 程式都不會退出,直到所有待處理的 future 都執行完畢。

如果 cancel_futuresTrue,此方法將取消所有執行器尚未開始執行的待處理 future。任何已完成或正在執行的 future 都不會被取消,無論 cancel_futures 的值如何。

如果 cancel_futureswait 均為 True,則執行器已開始執行的所有 future 將在此方法返回之前完成。剩餘的 future 將被取消。

如果你透過 with 語句將執行器用作上下文管理器,則可以避免顯式呼叫此方法。這會自動關閉 Executor(等待方式如同呼叫 Executor.shutdown() 時將 wait 設定為 True)。

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版更改: 添加了 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一個 Executor 子類,它使用執行緒池來非同步執行呼叫。

當與一個 Future 關聯的可呼叫物件等待另一個 Future 的結果時,可能會發生死鎖。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一個 Executor 子類,它使用一個最多包含 max_workers 個執行緒的池來非同步執行呼叫。

所有加入 ThreadPoolExecutor 佇列的執行緒都會在直譯器退出前被 join。請注意,執行此操作的退出處理程式在任何使用 atexit 新增的退出處理程式 *之前* 執行。這意味著主執行緒中的異常必須被捕獲和處理,以便向執行緒傳送優雅退出的訊號。因此,建議不要將 ThreadPoolExecutor 用於長時間執行的任務。

initializer 是一個可選的可呼叫物件,在每個工作執行緒啟動時被呼叫;initargs 是傳遞給初始化器的引數元組。如果 initializer 引發異常,所有當前待處理的作業都將引發 BrokenThreadPool,任何向池中提交更多作業的嘗試也將如此。

在 3.5 版更改: 如果 max_workersNone 或未給出,它將預設為機器上的處理器數量乘以 5,假設 ThreadPoolExecutor 通常用於重疊 I/O 而不是 CPU 工作,因此工作執行緒數應高於 ProcessPoolExecutor 的工作執行緒數。

在 3.6 版更改: 添加了 thread_name_prefix 引數,允許使用者控制由池建立的工作執行緒的 threading.Thread 名稱,以便於除錯。

在 3.7 版更改: 添加了 initializerinitargs 引數。

在 3.8 版更改: max_workers 的預設值更改為 min(32, os.cpu_count() + 4)。此預設值至少保留 5 個工作執行緒用於 I/O 密集型任務。它最多利用 32 個 CPU 核心用於釋放 GIL 的 CPU 密集型任務。並且它避免了在多核機器上隱式使用非常大的資源。

ThreadPoolExecutor 現在會在啟動 max_workers 個工作執行緒之前重用空閒的工作執行緒。

在 3.13 版更改: max_workers 的預設值更改為 min(32, (os.process_cpu_count() or 1) + 4)

ThreadPoolExecutor 示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

InterpreterPoolExecutor

InterpreterPoolExecutor 類使用直譯器池來非同步執行呼叫。它是 ThreadPoolExecutor 的子類,這意味著每個工作者都在自己的執行緒中執行。這裡的區別在於每個工作者都有自己的直譯器,並使用該直譯器執行每個任務。

使用直譯器而非僅用執行緒的最大好處是實現了真正的多核並行。每個直譯器都有自己的全域性直譯器鎖,因此在一個直譯器中執行的程式碼可以在一個 CPU 核心上執行,而另一個直譯器中的程式碼可以在不同的核心上無阻塞地執行。

權衡之處在於,為多直譯器編寫併發程式碼可能需要額外的努力。然而,這是因為它迫使你謹慎地考慮直譯器如何以及何時互動,並明確指定哪些資料在直譯器之間共享。這帶來了一些好處,有助於平衡額外的努力,包括真正的多核並行。例如,以這種方式編寫的程式碼可以更容易地推理併發性。另一個主要好處是,你不必處理使用執行緒時的一些主要痛點,比如競爭條件。

每個工作者的直譯器都與所有其他直譯器隔離。“隔離”意味著每個直譯器都有自己的執行時狀態,並且完全獨立執行。例如,如果你在一個直譯器中重定向 sys.stdout,它不會自動重定向到任何其他直譯器。如果你在一個直譯器中匯入一個模組,它不會自動在任何其他直譯器中匯入。你需要在需要它的直譯器中單獨匯入該模組。事實上,在一個直譯器中匯入的每個模組都是一個與不同直譯器中相同模組完全獨立的物件,包括 sysbuiltins,甚至 __main__

隔離意味著一個可變物件或其他資料不能同時被多個直譯器使用。這實際上意味著直譯器之間無法真正共享這類物件或資料。相反,每個直譯器必須有自己的副本,並且你必須手動同步副本之間的任何更改。不可變物件和資料,如內建單例、字串和不可變物件的元組,則沒有這些限制。

在直譯器之間進行通訊和同步最有效的方法是使用專用工具,例如 PEP 734 中提出的工具。一種效率較低的替代方法是使用 pickle 進行序列化,然後透過共享的 socketpipe 傳送位元組。

class concurrent.futures.InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一個 ThreadPoolExecutor 的子類,它使用一個最多包含 max_workers 個執行緒的池來非同步執行呼叫。每個執行緒都在自己的直譯器中執行任務。工作直譯器彼此隔離,這意味著每個直譯器都有自己的執行時狀態,並且它們不能共享任何可變物件或其他資料。每個直譯器都有自己的全域性直譯器鎖,這意味著使用此執行器執行的程式碼具有真正的多核並行性。

可選的 initializerinitargs 引數與 ThreadPoolExecutor 的含義相同:初始化器在每個工作者建立時執行,但在這種情況下,它是在工作者的直譯器中執行的。執行器在將 initializerinitargs 傳送給工作者的直譯器時,會使用 pickle 對它們進行序列化。

備註

執行器可能會將來自 initializer 的未捕獲異常替換為 ExecutionFailed

來自父類 ThreadPoolExecutor 的其他注意事項也適用於此。

submit()map() 的工作方式與正常情況類似,只是工作者在將可呼叫物件和引數傳送到其直譯器時會使用 pickle 進行序列化。同樣,工作者在返回結果時也會序列化返回值。

當工作者的當前任務引發未捕獲的異常時,工作者總是嘗試按原樣保留該異常。如果成功,它還會將 __cause__ 設定為相應的 ExecutionFailed 例項,該例項包含原始異常的摘要。在少數情況下,如果工作者無法按原樣保留原始異常,它會直接保留相應的 ExecutionFailed 例項。

ProcessPoolExecutor

ProcessPoolExecutor 類是 Executor 的一個子類,它使用程序池來非同步執行呼叫。ProcessPoolExecutor 使用 multiprocessing 模組,這使得它能夠繞過全域性直譯器鎖,但這也意味著只有可 pickle 的物件才能被執行和返回。

__main__ 模組必須能被工作者子程序匯入。這意味著 ProcessPoolExecutor 在互動式直譯器中將無法工作。

從提交給 ProcessPoolExecutor 的可呼叫物件中呼叫 ExecutorFuture 的方法將導致死鎖。

請注意,根據 multiprocessing.Process 的要求,函式和引數需要是可 pickle 的限制,在使用 submit()map()ProcessPoolExecutor 時同樣適用。在 REPL 中定義的函式或 lambda 表示式預計將無法工作。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

一個 Executor 子類,它使用一個最多包含 max_workers 個程序的池來非同步執行呼叫。如果 max_workersNone 或未給出,它將預設為 os.process_cpu_count()。如果 max_workers 小於或等於 0,則將引發 ValueError。在 Windows 上,max_workers 必須小於或等於 61。否則將引發 ValueError。如果 max_workersNone,則即使有更多可用的處理器,選擇的預設值也最多為 61mp_context 可以是 multiprocessing 上下文或 None。它將用於啟動工作程序。如果 mp_contextNone 或未給出,則使用預設的 multiprocessing 上下文。參見上下文和啟動方法

initializer 是一個可選的可呼叫物件,在每個工作程序啟動時被呼叫;initargs 是傳遞給初始化器的引數元組。如果 initializer 引發異常,所有當前待處理的作業都將引發 BrokenProcessPool,任何向池中提交更多作業的嘗試也將如此。

max_tasks_per_child 是一個可選引數,指定單個程序在退出並被一個新的工作程序替換之前可以執行的最大任務數。預設情況下,max_tasks_per_childNone,這意味著工作程序將與池一樣長壽。當指定了最大值時,在沒有 mp_context 引數的情況下,預設將使用 "spawn" 多程序啟動方法。此功能與 "fork" 啟動方法不相容。

在 3.3 版更改: 當其中一個工作程序突然終止時,現在會引發 BrokenProcessPool 錯誤。以前,行為是未定義的,但對執行器或其 future 的操作通常會凍結或死鎖。

在 3.7 版更改: 添加了 mp_context 引數,允許使用者控制由池建立的工作程序的 start_method。

添加了 initializerinitargs 引數。

在 3.11 版更改: 添加了 max_tasks_per_child 引數,允許使用者控制池中工作程序的生命週期。

在 3.12 版更改: 在 POSIX 系統上,如果您的應用程式有多個執行緒,並且 multiprocessing 上下文使用 "fork" 啟動方法:內部呼叫以生成工作程序的 os.fork() 函式可能會引發 DeprecationWarning。請傳遞一個配置為使用不同啟動方法的 mp_context。有關進一步解釋,請參閱 os.fork() 文件。

在 3.13 版更改: max_workers 預設使用 os.process_cpu_count(),而不是 os.cpu_count()

在 3.14 版更改: 預設程序啟動方法(見上下文和啟動方法)已不再是 fork。如果你需要為 ProcessPoolExecutor 使用 fork 啟動方法,你必須顯式傳遞 mp_context=multiprocessing.get_context("fork")

terminate_workers()

透過對每個活動工作程序呼叫 Process.terminate,嘗試立即終止所有活動的工作程序。在內部,它還將呼叫 Executor.shutdown() 以確保與執行器關聯的所有其他資源都被釋放。

呼叫此方法後,呼叫者不應再向執行器提交任務。

在 3.14 版本加入。

kill_workers()

透過對每個活動工作程序呼叫 Process.kill,嘗試立即殺死所有活動的工作程序。在內部,它還將呼叫 Executor.shutdown() 以確保與執行器關聯的所有其他資源都被釋放。

呼叫此方法後,呼叫者不應再向執行器提交任務。

在 3.14 版本加入。

ProcessPoolExecutor 示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 物件

Future 類封裝了可呼叫物件的非同步執行。Future 例項由 Executor.submit() 建立。

class concurrent.futures.Future

封裝可呼叫物件的非同步執行。Future 例項由 Executor.submit() 建立,除了用於測試外,不應直接建立。

cancel()

嘗試取消呼叫。如果呼叫當前正在執行或已完成執行且無法取消,則該方法將返回 False,否則呼叫將被取消,並且該方法將返回 True

cancelled()

如果呼叫被成功取消,則返回 True

running()

如果呼叫當前正在執行且無法取消,則返回 True

done()

如果呼叫被成功取消或完成執行,則返回 True

result(timeout=None)

返回呼叫返回的值。如果呼叫尚未完成,則此方法將等待最多 timeout 秒。如果呼叫在 timeout 秒內未完成,則將引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則沒有等待時間限制。

如果在完成前 future 被取消,則將引發 CancelledError

如果呼叫引發了異常,此方法將引發相同的異常。

exception(timeout=None)

返回呼叫引發的異常。如果呼叫尚未完成,則此方法將等待最多 timeout 秒。如果呼叫在 timeout 秒內未完成,則將引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則沒有等待時間限制。

如果在完成前 future 被取消,則將引發 CancelledError

如果呼叫完成而沒有引發異常,則返回 None

add_done_callback(fn)

將可呼叫物件 fn 附加到 future。當 future 被取消或完成執行時,將以 future 作為其唯一引數呼叫 fn

新增的可呼叫物件按照新增的順序被呼叫,並且總是在新增它們的程序的執行緒中被呼叫。如果可呼叫物件引發 Exception 子類,它將被記錄並忽略。如果可呼叫物件引發 BaseException 子類,則行為是未定義的。

如果 future 已經完成或被取消,fn 將立即被呼叫。

以下 Future 方法旨在用於單元測試和 Executor 實現。

set_running_or_notify_cancel()

此方法只應由 Executor 實現和單元測試在執行與 Future 相關的工作之前呼叫。

如果方法返回 False,則 Future 已被取消,即 Future.cancel() 被呼叫並返回了 True。任何等待 Future 完成的執行緒(即透過 as_completed()wait())都將被喚醒。

如果方法返回 True,則 Future 未被取消,並已進入執行狀態,即呼叫 Future.running() 將返回 True

此方法只能呼叫一次,並且不能在 Future.set_result()Future.set_exception() 被呼叫後呼叫。

set_result(result)

將與 Future 關聯的工作結果設定為 result

此方法只應由 Executor 實現和單元測試使用。

在 3.8 版更改: 如果 Future 已經完成,此方法將引發 concurrent.futures.InvalidStateError

set_exception(exception)

將與 Future 關聯的工作結果設定為 Exception exception

此方法只應由 Executor 實現和單元測試使用。

在 3.8 版更改: 如果 Future 已經完成,此方法將引發 concurrent.futures.InvalidStateError

模組函式

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 fs 給出的 Future 例項(可能由不同的 Executor 例項建立)完成。給 fs 的重複 future 會被移除,且只返回一次。返回一個由兩個集合組成的命名元組。第一個集合名為 done,包含在等待完成前完成的 future(已完成或已取消的 future)。第二個集合名為 not_done,包含未完成的 future(待處理或正在執行的 future)。

timeout 可用於控制返回前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則沒有等待時間限制。

return_when 指示此函式何時應返回。它必須是以下常量之一:

常量

描述

concurrent.futures.FIRST_COMPLETED

當任何一個 future 完成或被取消時,該函式將返回。

concurrent.futures.FIRST_EXCEPTION

當任何一個 future 因引發異常而完成時,該函式將返回。如果沒有 future 引發異常,則它等同於 ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

當所有 future 完成或被取消時,該函式將返回。

concurrent.futures.as_completed(fs, timeout=None)

返回一個迭代器,該迭代器遍歷由 fs 給出的 Future 例項(可能由不同的 Executor 例項建立),並在 future 完成(完成或取消)時產生它們。由 fs 提供的任何重複 future 將只返回一次。在呼叫 as_completed() 之前已完成的任何 future 將首先被產生。如果呼叫了 __next__(),並且在從最初呼叫 as_completed() 算起的 timeout 秒後結果仍不可用,則返回的迭代器將引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則沒有等待時間限制。

參見

PEP 3148 – future - 非同步執行計算

描述將此功能納入 Python 標準庫的提案。

異常類

exception concurrent.futures.CancelledError

當 future 被取消時引發。

exception concurrent.futures.TimeoutError

TimeoutError 的已棄用別名,當 future 操作超過給定超時時引發。

在 3.11 版更改: 這個類成為了 TimeoutError 的別名。

exception concurrent.futures.BrokenExecutor

派生自 RuntimeError,當執行器因某種原因損壞,無法用於提交或執行新任務時,會引發此異常類。

在 3.7 版本加入。

exception concurrent.futures.InvalidStateError

在對一個處於不允許該操作的當前狀態的 future 執行操作時引發。

在 3.8 版本加入。

exception concurrent.futures.thread.BrokenThreadPool

派生自 BrokenExecutor,當 ThreadPoolExecutor 的一個工作執行緒初始化失敗時,會引發此異常類。

在 3.7 版本加入。

exception concurrent.futures.interpreter.BrokenInterpreterPool

派生自 BrokenThreadPool,當 InterpreterPoolExecutor 的一個工作執行緒初始化失敗時,會引發此異常類。

在 3.14 版本加入。

exception concurrent.futures.interpreter.ExecutionFailed

當給定的初始化器失敗時,從 InterpreterPoolExecutor 中引發;或者當提交的任務中存在未捕獲的異常時,從 submit() 中引發。

在 3.14 版本加入。

exception concurrent.futures.process.BrokenProcessPool

派生自 BrokenExecutor(以前是 RuntimeError),當 ProcessPoolExecutor 的一個工作程序以非正常方式終止時(例如,如果它被從外部殺死),會引發此異常類。

在 3.3 版本加入。