multiprocessing — 基於程序的並行

原始碼: Lib/multiprocessing/


可用性: 非 Android、非 iOS、非 WASI。

該模組在移動平臺WebAssembly 平臺上不受支援。

引言

multiprocessing 是一個支援使用類似於 threading 模組的 API 來建立程序的包。multiprocessing 包同時提供本地和遠端併發,透過使用子程序而非執行緒來有效規避 全域性直譯器鎖。因此,multiprocessing 模組允許程式設計師充分利用給定機器上的多個處理器。它可在 POSIX 和 Windows 上執行。

multiprocessing 模組還引入了在 threading 模組中沒有對應項的 API。一個主要的例子是 Pool 物件,它提供了一種便捷的方式,可以在多個輸入值上並行執行函式,將輸入資料分發到多個程序(資料並行)。以下示例演示了在模組中定義此類函式的常見做法,以便子程序可以成功匯入該模組。這個使用 Pool 的資料並行基本示例:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

將列印到標準輸出

[1, 4, 9]

參見

concurrent.futures.ProcessPoolExecutor 提供了一個更高級別的介面,可以將任務推送到後臺程序而不會阻塞呼叫程序的執行。與直接使用 Pool 介面相比,concurrent.futures API 更容易將工作提交到底層程序池與等待結果分開。

Process 類

multiprocessing 中,透過建立一個 Process 物件,然後呼叫其 start() 方法來生成程序。Process 遵循 threading.Thread 的 API。一個簡單的多程序程式示例是

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

為了顯示涉及的各個程序 ID,這裡有一個擴充套件示例

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

有關為什麼需要 if __name__ == '__main__' 部分的解釋,請參閱 程式設計指南

Process 的引數通常需要能夠從子程序內部進行反序列化。如果您嘗試將上述示例直接輸入 REPL,則可能導致子程序在嘗試在 __main__ 模組中找到 *f* 函式時出現 AttributeError

上下文和啟動方法

根據平臺,multiprocessing 支援三種啟動程序的方式。這些 *啟動方法* 是

spawn(生成)

父程序啟動一個新的 Python 直譯器程序。子程序將只繼承執行程序物件的 run() 方法所需的資源。特別是,父程序中不必要的檔案描述符和控制代碼將不會被繼承。與使用 *fork* 或 *forkserver* 相比,使用此方法啟動程序相當慢。

在 POSIX 和 Windows 平臺上可用。在 Windows 和 macOS 上是預設值。

fork(分叉)

父程序使用 os.fork() 來分叉 Python 直譯器。子程序開始時,實際上與父程序相同。父程序的所有資源都由子程序繼承。請注意,安全地分叉多執行緒程序存在問題。

在 POSIX 系統上可用。

3.14 版本更改: 這不再是任何平臺上的預設啟動方法。需要 *fork* 的程式碼必須透過 get_context()set_start_method() 明確指定。

3.12 版本更改: 如果 Python 能夠檢測到您的程序有多個執行緒,此啟動方法內部呼叫的 os.fork() 函式將引發 DeprecationWarning。請使用不同的啟動方法。有關進一步解釋,請參閱 os.fork() 文件。

forkserver(分叉伺服器)

當程式啟動並選擇 *forkserver* 啟動方法時,會生成一個伺服器程序。從那時起,每當需要一個新程序時,父程序連線到伺服器並請求它分叉一個新程序。分叉伺服器程序是單執行緒的,除非系統庫或預載入的匯入作為副作用生成執行緒,因此通常可以安全地使用 os.fork()。不會繼承不必要的資源。

在支援透過 Unix 管道傳遞檔案描述符的 POSIX 平臺(例如 Linux)上可用。在這些平臺上是預設值。

3.14 版本更改: 這成為 POSIX 平臺上的預設啟動方法。

3.4 版本更改: 在所有 POSIX 平臺上添加了 *spawn*,併為某些 POSIX 平臺添加了 *forkserver*。子程序不再繼承 Windows 上父程序的所有可繼承控制代碼。

3.8 版本更改: 在 macOS 上,*spawn* 啟動方法現在是預設值。*fork* 啟動方法應被視為不安全,因為它可能導致子程序崩潰,因為 macOS 系統庫可能會啟動執行緒。請參閱 bpo-33725

3.14 版本更改: 在 POSIX 平臺上,預設啟動方法從 *fork* 更改為 *forkserver*,以保持效能並避免常見的多執行緒程序不相容性。請參閱 gh-84559

在 POSIX 上使用 *spawn* 或 *forkserver* 啟動方法還將啟動一個 *資源跟蹤器* 程序,該程序跟蹤程式程序建立的未連結的命名系統資源(例如命名訊號量或 SharedMemory 物件)。當所有程序都退出時,資源跟蹤器會取消連結任何剩餘的已跟蹤物件。通常不應該有,但如果程序被訊號殺死,可能會有一些“洩漏”的資源。(無論是洩漏的訊號量還是共享記憶體段都不會在下次重新啟動之前自動取消連結。這對這兩種物件都存在問題,因為系統只允許有限數量的命名訊號量,並且共享記憶體段佔用主記憶體中的一些空間。)

要選擇啟動方法,您可以在主模組的 if __name__ == '__main__' 子句中使用 set_start_method()。例如

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() 不應在程式中多次使用。

或者,您可以使用 get_context() 獲取一個上下文物件。上下文物件與 multiprocessing 模組具有相同的 API,並允許在同一個程式中使用多個啟動方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

請注意,與一個上下文相關的物件可能與不同上下文的程序不相容。特別是,使用 *fork* 上下文建立的鎖不能傳遞給使用 *spawn* 或 *forkserver* 啟動方法啟動的程序。

使用 multiprocessingProcessPoolExecutor 的庫應設計為允許其使用者提供自己的多程序上下文。在庫中使用自己的特定上下文可能導致與庫使用者應用程式的其餘部分不相容。如果您的庫需要特定的啟動方法,請務必記錄。

警告

在 POSIX 系統上,'spawn''forkserver' 啟動方法通常不能與“凍結”的可執行檔案(即,由 PyInstallercx_Freeze 等包生成的可執行檔案)一起使用。'fork' 啟動方法可能有效,如果程式碼不使用執行緒。

程序間交換物件

multiprocessing 支援兩種程序間通訊通道

佇列

Queue 類是 queue.Queue 的近乎克隆。例如

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

佇列是執行緒和程序安全的。放入 multiprocessing 佇列的任何物件都將被序列化。

管道

Pipe() 函式返回一對由管道連線的連線物件,該管道預設是雙工的(雙向)。例如

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Pipe() 返回的兩個連線物件代表管道的兩端。每個連線物件都有 send()recv() 方法(以及其他)。請注意,如果兩個程序(或執行緒)同時嘗試從管道的 *同一* 端讀取或寫入,則管道中的資料可能會損壞。當然,程序同時使用管道的不同端沒有損壞的風險。

send() 方法序列化物件,recv() 重新建立物件。

程序間同步

multiprocessing 包含 threading 中的所有同步原語的等價物。例如,可以使用鎖來確保一次只有一個程序列印到標準輸出

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用鎖,不同程序的輸出可能會完全混淆。

程序間共享狀態

如上所述,在進行併發程式設計時,最好儘可能避免使用共享狀態。在使用多個程序時尤其如此。

但是,如果您確實需要使用一些共享資料,那麼 multiprocessing 提供了幾種實現方式。

共享記憶體

資料可以使用 ValueArray 儲存在共享記憶體對映中。例如,以下程式碼

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

將列印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

建立 numarr 時使用的 'd''i' 引數是 array 模組使用的型別程式碼:'d' 表示雙精度浮點數,'i' 表示有符號整數。這些共享物件將是程序和執行緒安全的。

為了更靈活地使用共享記憶體,可以使用 multiprocessing.sharedctypes 模組,該模組支援建立從共享記憶體分配的任意 ctypes 物件。

伺服器程序

Manager() 返回的管理器物件控制一個伺服器程序,該程序持有 Python 物件並允許其他程序使用代理操縱它們。

Manager() 返回的管理器將支援 listdictsetNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 等型別。例如,

from multiprocessing import Process, Manager

def f(d, l, s):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
    s.add('a')
    s.add('b')

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        s = manager.set()

        p = Process(target=f, args=(d, l, s))
        p.start()
        p.join()

        print(d)
        print(l)
        print(s)

將列印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
{'a', 'b'}

伺服器程序管理器比使用共享記憶體物件更靈活,因為它們可以支援任意物件型別。此外,單個管理器可以透過網路在不同計算機上的程序之間共享。但是,它們比使用共享記憶體慢。

使用工作程序池

Pool 類代表一個工作程序池。它有方法可以通過幾種不同的方式將任務解除安裝到工作程序。

例如:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

請注意,池的方法應僅由建立它的程序使用。

備註

此包中的功能要求子程序可以匯入 __main__ 模組。這在 程式設計指南 中有所介紹,但在此處指出也很重要。這意味著一些示例,例如 multiprocessing.pool.Pool 示例,將無法在互動式直譯器中執行。例如

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(如果您嘗試這樣做,它實際上會以半隨機的方式交錯輸出三個完整的追溯,然後您可能需要以某種方式停止父程序。)

參考

multiprocessing 包大部分複製了 threading 模組的 API。

Process 和異常

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process 物件代表在單獨程序中執行的活動。Process 類具有 threading.Thread 所有方法的等價物。

建構函式應始終使用關鍵字引數呼叫。*group* 應始終為 None;它僅為了與 threading.Thread 相容而存在。*target* 是要由 run() 方法呼叫的可呼叫物件。它預設為 None,表示不呼叫任何內容。*name* 是程序名稱(有關更多詳細資訊,請參閱 name)。*args* 是目標呼叫的引數元組。*kwargs* 是目標呼叫的關鍵字引數字典。如果提供,僅關鍵字引數 *daemon* 將程序 daemon 標誌設定為 TrueFalse。如果為 None(預設值),此標誌將從建立程序繼承。

預設情況下,不向 *target* 傳遞任何引數。*args* 引數(預設為 ())可用於指定要傳遞給 *target* 的引數列表或元組。

如果子類重寫建構函式,它必須確保在對程序執行任何其他操作之前呼叫基類建構函式(super().__init__())。

備註

通常,Process 的所有引數都必須是可序列化的。當嘗試從 REPL 使用本地定義的 *target* 函式建立 Process 或使用 concurrent.futures.ProcessPoolExecutor 時,經常會觀察到這一點。

傳遞在當前 REPL 會話中定義的可呼叫物件會導致子程序透過未捕獲的 AttributeError 異常終止,因為 *target* 必須在可匯入模組中定義才能在反序列化期間載入。

子程序中這種不可捕獲的錯誤示例

>>> import multiprocessing as mp
>>> def knigit():
...     print("Ni!")
...
>>> process = mp.Process(target=knigit)
>>> process.start()
>>> Traceback (most recent call last):
  File ".../multiprocessing/spawn.py", line ..., in spawn_main
  File ".../multiprocessing/spawn.py", line ..., in _main
AttributeError: module '__main__' has no attribute 'knigit'
>>> process
<SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>

請參閱 spawn 和 forkserver 啟動方法。儘管如果使用 "fork" 啟動方法,此限制不成立,但截至 Python 3.14,它不再是任何平臺上的預設值。請參閱 上下文和啟動方法。另請參閱 gh-132898

3.3 版本更改: 添加了 *daemon* 引數。

run()

表示程序活動的 方法。

您可以在子類中重寫此方法。標準的 run() 方法會呼叫傳遞給物件建構函式作為 *target* 引數的可呼叫物件(如果有),並使用 *args* 和 *kwargs* 引數中的順序引數和關鍵字引數。

將列表或元組作為傳遞給 Process 的 *args* 引數會達到相同的效果。

示例

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

開始程序的活動。

每個程序物件最多隻能呼叫一次此方法。它會安排物件的 run() 方法在單獨的程序中被呼叫。

join([timeout])

如果可選引數 *timeout* 為 None(預設值),則該方法會阻塞,直到呼叫其 join() 方法的程序終止。如果 *timeout* 是正數,它最多阻塞 *timeout* 秒。請注意,如果其程序終止或方法超時,該方法將返回 None。檢查程序的 exitcode 以確定它是否終止。

一個程序可以被多次連線。

一個程序不能連線自身,因為這會導致死鎖。在程序啟動之前嘗試連線程序是錯誤的。

name

程序的名稱。名稱是僅用於標識目的的字串。它沒有語義。多個程序可以被賦予相同的名稱。

初始名稱由建構函式設定。如果未向建構函式提供顯式名稱,則會構造一個形式為“Process-N1:N2:…:Nk”的名稱,其中每個 Nk 是其父程序的第 N 個子程序。

is_alive()

返回程序是否存活。

大致來說,一個程序物件從 start() 方法返回的那一刻起,直到子程序終止,都是存活的。

daemon

程序的守護程序標誌,一個布林值。這必須在呼叫 start() 之前設定。

初始值從建立程序繼承。

當一個程序退出時,它會嘗試終止所有其守護程序子程序。

請注意,守護程序不允許建立子程序。否則,如果守護程序在其父程序退出時被終止,它將使其子程序成為孤兒。此外,這些 不是 Unix 守護程序或服務,它們是正常的程序,如果非守護程序退出,它們將被終止(而不是連線)。

除了 threading.Thread API 之外,Process 物件還支援以下屬性和方法

pid

返回程序 ID。在程序生成之前,這將是 None

exitcode

子程序的退出程式碼。如果程序尚未終止,則此值為 None

如果子程序的 run() 方法正常返回,退出程式碼將為 0。如果它透過 sys.exit() 並帶有一個整數引數 N 終止,退出程式碼將為 N

如果子程序由於 run() 中未捕獲的異常而終止,退出程式碼將為 1。如果它被訊號 N 終止,退出程式碼將為負值 -N

authkey

程序的認證金鑰(一個位元組字串)。

multiprocessing 初始化時,主程序使用 os.urandom() 分配一個隨機字串。

當建立 Process 物件時,它將繼承其父程序的認證金鑰,儘管可以透過將 authkey 設定為另一個位元組字串來更改此金鑰。

參見 認證金鑰

sentinel

一個系統物件的數字控制代碼,當程序結束時,該物件將變為“就緒”。

如果您想使用 multiprocessing.connection.wait() 同時等待多個事件,則可以使用此值。否則,呼叫 join() 更簡單。

在 Windows 上,這是一個可與 WaitForSingleObjectWaitForMultipleObjects 系列 API 呼叫一起使用的作業系統控制代碼。在 POSIX 上,這是一個檔案描述符,可與 select 模組中的原語一起使用。

在 3.3 版本加入。

interrupt()

終止程序。在 POSIX 上使用 SIGINT 訊號。Windows 上的行為未定義。

預設情況下,這透過引發 KeyboardInterrupt 來終止子程序。可以透過在子程序中為 SIGINT 設定相應的訊號處理程式 signal.signal() 來改變此行為。

注意:如果子程序捕獲並丟棄 KeyboardInterrupt,則程序將不會被終止。

注意:預設行為還會將 exitcode 設定為 1,就像在子程序中引發了未捕獲的異常一樣。要獲得不同的 exitcode,您可以簡單地捕獲 KeyboardInterrupt 並呼叫 exit(your_code)

在 3.14 版本加入。

terminate()

終止程序。在 POSIX 上使用 SIGTERM 訊號完成;在 Windows 上使用 TerminateProcess()。請注意,退出處理程式和 finally 子句等將不會執行。

請注意,程序的後代程序將不會被終止——它們將簡單地成為孤兒程序。

警告

如果當關聯程序正在使用管道或佇列時使用此方法,則管道或佇列可能會損壞,並可能無法被其他程序使用。同樣,如果程序已獲取鎖或訊號量等,則終止它可能會導致其他程序死鎖。

kill()

terminate() 相同,但在 POSIX 上使用 SIGKILL 訊號。

在 3.7 版本加入。

close()

關閉 Process 物件,釋放與之關聯的所有資源。如果底層程序仍在執行,則會引發 ValueError。一旦 close() 成功返回,Process 物件的其他大多數方法和屬性都將引發 ValueError

在 3.7 版本加入。

請注意,start()join()is_alive()terminate()exitcode 方法只能由建立程序物件的程序呼叫。

Process 的一些方法的使用示例

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

所有 multiprocessing 異常的基類。

exception multiprocessing.BufferTooShort

當提供的緩衝區物件對於讀取的訊息來說太小時,由 Connection.recv_bytes_into() 引發的異常。

如果 eBufferTooShort 的例項,則 e.args[0] 將以位元組字串形式給出訊息。

exception multiprocessing.AuthenticationError

當出現認證錯誤時引發。

exception multiprocessing.TimeoutError

當超時過期時,由帶有超時的各種方法引發。

管道和佇列

在使用多個程序時,通常使用訊息傳遞在程序之間進行通訊,並避免使用任何同步原語,例如鎖。

對於傳遞訊息,可以使用 Pipe()(用於兩個程序之間的連線)或佇列(允許多個生產者和消費者)。

QueueSimpleQueueJoinableQueue 型別是多生產者、多消費者的 FIFO 佇列,它們模仿標準庫中的 queue.Queue 類。它們的不同之處在於 Queue 缺少 Python 2.5 的 queue.Queue 類中引入的 task_done()join() 方法。

如果使用 JoinableQueue,則對於從佇列中刪除的每個任務,必須呼叫 JoinableQueue.task_done(),否則用於計算未完成任務數量的訊號量最終可能會溢位,從而引發異常。

與其它 Python 佇列實現的一個區別是,multiprocessing 佇列使用 pickle 序列化所有放入其中的物件。透過 get 方法返回的物件是一個重新建立的物件,它不與原始物件共享記憶體。

請注意,也可以透過使用管理器物件建立共享佇列——參見 管理器

備註

multiprocessing 使用通常的 queue.Emptyqueue.Full 異常來指示超時。它們在 multiprocessing 名稱空間中不可用,因此您需要從 queue 中匯入它們。

備註

當一個物件被放入佇列時,該物件會被 pickle 序列化,然後一個後臺執行緒會將序列化的資料重新整理到底層管道中。這會產生一些令人驚訝的後果,但應該不會造成任何實際困難——如果它們真的困擾您,那麼您可以改用由 管理器 建立的佇列。

  1. 將物件放入空佇列後,可能會有極小的延遲,之後佇列的 empty() 方法會返回 False,並且 get_nowait() 可以返回而不會引發 queue.Empty

  2. 如果多個程序正在入隊物件,則物件可能會在另一端以亂序接收。但是,由同一程序入隊的物件將始終相對於彼此以預期順序排列。

警告

如果一個程序在使用 Process.terminate()os.kill() 嘗試使用 Queue 時被殺死,則佇列中的資料很可能會損壞。這可能導致任何其他程序在稍後嘗試使用佇列時獲得異常。

警告

如上所述,如果子程序已將專案放入佇列中(並且未使用 JoinableQueue.cancel_join_thread),則該程序在所有緩衝專案重新整理到管道之前不會終止。

這意味著如果你嘗試連線該程序,你可能會死鎖,除非你確定所有已放入佇列的專案都已被消費。同樣,如果子程序是非守護程序,則父程序在退出時嘗試連線所有非守護子程序時可能會掛起。

請注意,使用管理器建立的佇列沒有此問題。請參閱 程式設計指南

有關佇列用於程序間通訊的示例,請參閱 示例

multiprocessing.Pipe([duplex])

返回一對 (conn1, conn2)Connection 物件,表示管道的兩端。

如果 duplexTrue (預設),則管道是雙向的。如果 duplexFalse,則管道是單向的:conn1 只能用於接收訊息,conn2 只能用於傳送訊息。

send() 方法使用 pickle 序列化物件,而 recv() 則重新建立物件。

class multiprocessing.Queue([maxsize])

返回一個使用管道和一些鎖/訊號量實現的程序共享佇列。當一個程序首次將專案放入佇列時,會啟動一個 feeder 執行緒,該執行緒將物件從緩衝區傳輸到管道中。

標準庫 queue 模組中通常的 queue.Emptyqueue.Full 異常用於表示超時。

Queue 實現了 queue.Queue 的所有方法,除了 task_done()join()

qsize()

返回佇列的大致大小。由於多執行緒/多程序語義,這個數字不可靠。

請注意,這可能會在 macOS 等平臺上引發 NotImplementedError,因為 sem_getvalue() 未實現。

empty()

如果佇列為空,則返回 True,否則返回 False。由於多執行緒/多程序語義,這不可靠。

在關閉的佇列上可能引發 OSError。(不保證)

full()

如果佇列已滿,則返回 True,否則返回 False。由於多執行緒/多程序語義,這不可靠。

put(obj[, block[, timeout]])

將 obj 放入佇列。如果可選引數 blockTrue(預設值)且 timeoutNone(預設值),則必要時阻塞直到有空閒槽可用。如果 timeout 是正數,它最多阻塞 timeout 秒,如果在此時間內沒有空閒槽可用,則引發 queue.Full 異常。否則(blockFalse),如果立即有空閒槽可用,則將專案放入佇列,否則引發 queue.Full 異常(在這種情況下,timeout 被忽略)。

3.8 版本發生變化: 如果佇列已關閉,則會引發 ValueError 而不是 AssertionError

put_nowait(obj)

等同於 put(obj, False)

get([block[, timeout]])

從佇列中移除並返回一個專案。如果可選引數 blockTrue(預設值)且 timeoutNone(預設值),則必要時阻塞直到有專案可用。如果 timeout 是正數,它最多阻塞 timeout 秒,如果在此時間內沒有專案可用,則引發 queue.Empty 異常。否則(blockFalse),如果立即有專案可用,則返回一個專案,否則引發 queue.Empty 異常(在這種情況下,timeout 被忽略)。

3.8 版本發生變化: 如果佇列已關閉,則會引發 ValueError 而不是 OSError

get_nowait()

等同於 get(False)

multiprocessing.Queue 還有一些 queue.Queue 中沒有的額外方法。這些方法對於大多數程式碼通常是不必要的。

close()

關閉佇列:釋放內部資源。

佇列一旦關閉,就不能再使用。例如,get()put()empty() 方法不能再呼叫。

後臺執行緒在將所有緩衝資料重新整理到管道後將退出。這在佇列被垃圾回收時會自動呼叫。

join_thread()

等待後臺執行緒。這隻能在呼叫 close() 之後使用。它會阻塞直到後臺執行緒退出,確保緩衝區中的所有資料都已重新整理到管道。

預設情況下,如果一個程序不是佇列的建立者,那麼在退出時它會嘗試加入佇列的後臺執行緒。該程序可以呼叫 cancel_join_thread() 使 join_thread() 不執行任何操作。

cancel_join_thread()

阻止 join_thread() 阻塞。特別是,這會阻止後臺執行緒在程序退出時自動被連線——參見 join_thread()

這個方法的一個更好的名稱可能是 allow_exit_without_flush()。它可能會導致入隊資料丟失,而且你幾乎肯定不需要使用它。它實際上只在你需要當前程序立即退出而無需等待將入隊資料重新整理到底層管道,並且你不關心資料丟失時才存在。

備註

此類的功能需要主機作業系統上可用的共享訊號量實現。如果沒有,此類的功能將被停用,並且嘗試例項化 Queue 將導致 ImportError。有關更多資訊,請參見 bpo-3770。以下列出的任何專用佇列型別也同樣如此。

class multiprocessing.SimpleQueue

它是一種簡化的 Queue 型別,與加鎖的 Pipe 非常相似。

close()

關閉佇列:釋放內部資源。

佇列一旦關閉,就不能再使用。例如,get()put()empty() 方法不能再呼叫。

在 3.9 版本中新增。

empty()

如果佇列為空,則返回 True,否則返回 False

如果 SimpleQueue 已關閉,則總是引發 OSError

get()

從佇列中移除並返回一個專案。

put(item)

item 放入佇列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue 的子類,它額外具有 task_done()join() 方法。

task_done()

指示先前排隊的任務已完成。由佇列消費者使用。對於每個用於獲取任務的 get() 呼叫,後續對 task_done() 的呼叫會告知佇列該任務的處理已完成。

如果當前 join() 正在阻塞,它將在所有專案都被處理後恢復(這意味著對於佇列中每個已被 put() 的專案,都收到了一個 task_done() 呼叫)。

如果呼叫次數多於佇列中放置的專案數,則會引發 ValueError

join()

阻塞直到佇列中的所有專案都已被獲取和處理。

每當一個專案被新增到佇列時,未完成任務的計數就會增加。每當消費者呼叫 task_done() 以指示該專案已被檢索並且其所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join() 解除阻塞。

雜項

multiprocessing.active_children()

返回當前程序的所有活動子程序的列表。

呼叫此函式會產生“連線”任何已完成的程序的副作用。

multiprocessing.cpu_count()

返回系統中的 CPU 數量。

此數字不等於當前程序可以使用的 CPU 數量。可用 CPU 的數量可以透過 os.process_cpu_count()(或 len(os.sched_getaffinity(0)))獲取。

當無法確定 CPU 數量時,會引發 NotImplementedError

3.13 版本中的變化: 返回值也可以使用 -X cpu_count 標誌或 PYTHON_CPU_COUNT 進行覆蓋,因為它只是 os CPU 計數 API 的一個包裝器。

multiprocessing.current_process()

返回與當前程序對應的 Process 物件。

類似於 threading.current_thread()

multiprocessing.parent_process()

返回與 current_process() 的父程序對應的 Process 物件。對於主程序,parent_process 將為 None

在 3.8 版本加入。

multiprocessing.freeze_support()

為使用 multiprocessing 的程式在凍結以生成可執行檔案時新增支援。(已使用 py2exePyInstallercx_Freeze 進行測試。)

需要在主模組的 if __name__ == '__main__' 行之後立即呼叫此函式。例如

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略 freeze_support() 行,則嘗試執行凍結的可執行檔案將引發 RuntimeError

當啟動方法不是 spawn 時,呼叫 freeze_support() 沒有效果。此外,如果模組是由 Python 直譯器正常執行(程式未被凍結),則 freeze_support() 沒有效果。

multiprocessing.get_all_start_methods()

返回支援的啟動方法列表,其中第一個是預設方法。可能的啟動方法是 'fork''spawn''forkserver'。並非所有平臺都支援所有方法。參見 上下文和啟動方法

在 3.4 版本加入。

multiprocessing.get_context(method=None)

返回一個上下文物件,該物件具有與 multiprocessing 模組相同的屬性。

如果 methodNone,則返回預設上下文。請注意,如果尚未設定全域性啟動方法,此操作將將其設定為預設方法。否則 method 應該為 'fork''spawn''forkserver'。如果指定的啟動方法不可用,則會引發 ValueError。參見 上下文和啟動方法

在 3.4 版本加入。

multiprocessing.get_start_method(allow_none=False)

返回用於啟動程序的啟動方法的名稱。

如果尚未設定全域性啟動方法且 allow_noneFalse,則將啟動方法設定為預設方法並返回其名稱。如果尚未設定啟動方法且 allow_noneTrue,則返回 None

返回值可以是 'fork''spawn''forkserver'None。參見 上下文和啟動方法

在 3.4 版本加入。

3.8 版本中的變化: 在 macOS 上,spawn 啟動方法現在是預設方法。fork 啟動方法應被視為不安全,因為它可能導致子程序崩潰。參見 bpo-33725

multiprocessing.set_executable(executable)

設定啟動子程序時使用的 Python 直譯器的路徑。(預設使用 sys.executable)。嵌入者可能需要做一些類似以下的事情

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

才能建立子程序。

3.4 版本中的變化: 現在在 POSIX 上使用 'spawn' 啟動方法時受支援。

3.11 版本中的變化: 接受 類路徑物件

multiprocessing.set_forkserver_preload(module_names)

設定一個模組名稱列表,供 forkserver 主程序嘗試匯入,以便它們已匯入的狀態可由分叉程序繼承。在此過程中發生的任何 ImportError 都將被靜默忽略。這可以作為一種效能增強,以避免在每個程序中重複工作。

為此,必須在 forkserver 程序啟動之前(在建立 Pool 或啟動 Process 之前)呼叫它。

僅在使用 'forkserver' 啟動方法時有意義。參見 上下文和啟動方法

在 3.4 版本加入。

multiprocessing.set_start_method(method, force=False)

設定用於啟動子程序的方法。method 引數可以是 'fork''spawn''forkserver'。如果啟動方法已經設定且 force 不是 True,則引發 RuntimeError。如果 methodNoneforceTrue,則啟動方法設定為 None。如果 methodNoneforceFalse,則上下文設定為預設上下文。

請注意,此函式最多隻能呼叫一次,並且應在主模組的 if __name__ == '__main__' 子句中受到保護。

參見 上下文和啟動方法

在 3.4 版本加入。

連線物件

連線物件允許傳送和接收可 pickle 的物件或字串。它們可以被視為面向訊息的連線套接字。

連線物件通常使用 Pipe 建立——另請參見 監聽器和客戶端

class multiprocessing.connection.Connection
send(obj)

向連線的另一端傳送一個物件,該物件應該使用 recv() 讀取。

該物件必須是可 pickle 的。非常大的 pickle(大約 32 MiB+,儘管這取決於作業系統)可能會引發 ValueError 異常。

recv()

返回使用 send() 從連線的另一端傳送的物件。阻塞直到有東西可以接收。如果沒有可接收的資料且另一端已關閉,則引發 EOFError

fileno()

返回連線使用的檔案描述符或控制代碼。

close()

關閉連線。

當連線被垃圾回收時會自動呼叫此方法。

poll([timeout])

返回是否有任何資料可供讀取。

如果未指定 timeout,則立即返回。如果 timeout 是一個數字,則指定阻塞的最大時間(秒)。如果 timeoutNone,則使用無限超時。

請注意,可以使用 multiprocessing.connection.wait() 同時輪詢多個連線物件。

send_bytes(buffer[, offset[, size]])

類位元組物件 傳送位元組資料作為完整訊息。

如果給定了 offset,則從 buffer 中的該位置讀取資料。如果給定了 size,則將從 buffer 中讀取這麼多位元組。非常大的緩衝區(大約 32 MiB+,儘管這取決於作業系統)可能會引發 ValueError 異常

recv_bytes([maxlength])

返回從連線的另一端傳送的完整位元組資料訊息作為字串。阻塞直到有東西可以接收。如果沒有可接收的資料且另一端已關閉,則引發 EOFError

如果指定了 maxlength 並且訊息長度超過 maxlength,則會引發 OSError,並且連線將不再可讀。

3.3 版本中的變化: 此函式以前會引發 IOError,現在它是 OSError 的別名。

recv_bytes_into(buffer[, offset])

將從連線另一端傳送的完整位元組資料訊息讀取到 buffer 中,並返回訊息中的位元組數。阻塞直到有資料可接收。如果沒有剩餘資料可接收且另一端已關閉,則引發 EOFError

buffer 必須是可寫的 類位元組物件。如果指定了 offset,則訊息將從緩衝區的該位置寫入。offset 必須是一個非負整數,小於 buffer 的長度(以位元組為單位)。

如果緩衝區太短,則會引發 BufferTooShort 異常,並且完整訊息可在 e.args[0] 中獲取,其中 e 是異常例項。

3.3 版本中的變化: 連線物件現在可以透過 Connection.send()Connection.recv() 在程序之間傳輸。

連線物件現在也支援上下文管理協議——參見 上下文管理器型別__enter__() 返回連線物件,而 __exit__() 呼叫 close()

例如:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

由於 Connection.recv() 方法會自動解包它接收到的資料,這可能帶來安全風險,除非您信任傳送訊息的程序。

因此,除非連線物件是使用 Pipe() 產生的,否則您應該在執行某種形式的身份驗證之後才使用 recv()send() 方法。參見 認證金鑰

警告

如果一個程序在嘗試讀寫管道時被終止,則管道中的資料很可能會損壞,因為可能無法確定訊息邊界的位置。

同步原語

通常,同步原語在多程序程式中不像在多執行緒程式中那樣必要。請參閱 threading 模組的文件。

請注意,也可以透過使用管理器物件來建立同步原語——參見 管理器

class multiprocessing.Barrier(parties[, action[, timeout]])

一個屏障物件:threading.Barrier 的克隆。

在 3.3 版本加入。

class multiprocessing.BoundedSemaphore([value])

一個有界訊號量物件:與 threading.BoundedSemaphore 緊密相似。

與其緊密相似的類有一個唯一的區別:其 acquire 方法的第一個引數名為 block,這與 Lock.acquire() 保持一致。

備註

在 macOS 上,這與 Semaphore 沒有區別,因為 sem_getvalue() 在該平臺上未實現。

class multiprocessing.Condition([lock])

一個條件變數:threading.Condition 的別名。

如果指定了 lock,它應該是一個來自 multiprocessing 模組的 LockRLock 物件。

3.3 版更改: 添加了 wait_for() 方法。

class multiprocessing.Event

threading.Event 的克隆。

class multiprocessing.Lock

一個非遞迴鎖物件:與 threading.Lock 緊密相似。一旦某個程序或執行緒獲得了鎖,任何其他程序或執行緒的後續獲取嘗試都將被阻塞,直到鎖被釋放;任何程序或執行緒都可以釋放它。threading.Lock 應用於執行緒的概念和行為,在這裡被複制到 multiprocessing.Lock 並應用於程序或執行緒,但另有說明的除外。

請注意,Lock 實際上是一個工廠函式,它返回一個用預設上下文初始化的 multiprocessing.synchronize.Lock 例項。

Lock 支援 上下文管理器 協議,因此可以在 with 語句中使用。

acquire(block=True, timeout=None)

獲取鎖,阻塞或非阻塞。

block 引數設定為 True(預設值)時,方法呼叫將阻塞,直到鎖處於未鎖定狀態,然後將其設定為鎖定狀態並返回 True。請注意,這個第一個引數的名稱與 threading.Lock.acquire() 中的不同。

block 引數設定為 False 時,方法呼叫不阻塞。如果鎖當前處於鎖定狀態,則返回 False;否則將鎖設定為鎖定狀態並返回 True

當以一個正浮點值作為 timeout 呼叫時,最多阻塞 timeout 指定的秒數,只要鎖無法被獲取。當以負值作為 timeout 呼叫時,等同於 timeout 為零。當以 None(預設值)作為 timeout 呼叫時,超時週期設定為無限。請注意,對 timeout 的負值或 None 值的處理與 threading.Lock.acquire() 中實現的行為不同。如果 block 引數設定為 False,則 timeout 引數沒有實際意義,因此會被忽略。如果鎖已被獲取,則返回 True,如果超時週期已過,則返回 False

release()

釋放鎖。這可以從任何程序或執行緒呼叫,而不僅僅是最初獲取鎖的程序或執行緒。

行為與 threading.Lock.release() 相同,不同之處在於,當在一個未鎖定(已釋放)的鎖上呼叫時,會引發 ValueError

locked()

返回一個布林值,指示此物件當前是否已鎖定。

在 3.14 版本加入。

class multiprocessing.RLock

一個可重入鎖物件:與 threading.RLock 緊密相似。可重入鎖必須由獲取它的程序或執行緒釋放。一旦一個程序或執行緒獲取了一個可重入鎖,同一個程序或執行緒可以在不阻塞的情況下再次獲取它;該程序或執行緒必須為每次獲取釋放一次。

請注意,RLock 實際上是一個工廠函式,它返回一個用預設上下文初始化的 multiprocessing.synchronize.RLock 例項。

RLock 支援 上下文管理器 協議,因此可以在 with 語句中使用。

acquire(block=True, timeout=None)

獲取鎖,阻塞或非阻塞。

當以 block 引數設定為 True 呼叫時,阻塞直到鎖處於未鎖定狀態(不被任何程序或執行緒擁有),除非鎖已被當前程序或執行緒擁有。當前程序或執行緒然後獲取鎖的所有權(如果它還沒有所有權),並且鎖內部的遞迴級別增加一,導致返回值為 True。請注意,與 threading.RLock.acquire() 的實現相比,這個第一個引數的行為有幾個不同之處,從引數本身的名稱開始。

當以 block 引數設定為 False 呼叫時,不阻塞。如果鎖已被其他程序或執行緒獲取(並因此被擁有),當前程序或執行緒不獲取所有權,並且鎖內的遞迴級別不改變,導致返回值為 False。如果鎖處於未鎖定狀態,當前程序或執行緒獲取所有權,並且遞迴級別遞增,導致返回值為 True

timeout 引數的使用和行為與 Lock.acquire() 中相同。請注意,timeout 的某些行為與 threading.RLock.acquire() 中實現的行為不同。

release()

釋放鎖,遞減遞迴級別。如果遞減後遞迴級別為零,則將鎖重置為未鎖定(不被任何程序或執行緒擁有),並且如果任何其他程序或執行緒正在阻塞等待鎖變為未鎖定,則允許其中一個程序或執行緒繼續。如果遞減後遞迴級別仍非零,則鎖保持鎖定狀態並由呼叫程序或執行緒擁有。

僅當呼叫程序或執行緒擁有鎖時才呼叫此方法。如果此方法由非所有者的程序或執行緒呼叫,或者如果鎖處於未鎖定(未擁有)狀態,則會引發 AssertionError。請注意,在這種情況下引發的異常型別與 threading.RLock.release() 中實現的行為不同。

locked()

返回一個布林值,指示此物件當前是否已鎖定。

在 3.14 版本加入。

class multiprocessing.Semaphore([value])

一個訊號量物件:與 threading.Semaphore 緊密相似。

與其緊密相似的類有一個唯一的區別:其 acquire 方法的第一個引數名為 block,這與 Lock.acquire() 保持一致。

備註

在 macOS 上,sem_timedwait 不受支援,因此使用超時呼叫 acquire() 將使用休眠迴圈來模擬該函式的行為。

備註

此軟體包的某些功能需要在主機作業系統上具有功能正常的共享訊號量實現。如果沒有,multiprocessing.synchronize 模組將被停用,嘗試匯入它將導致 ImportError。有關更多資訊,請參閱 bpo-3770

共享 ctypes 物件

可以使用共享記憶體建立可由子程序繼承的共享物件。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回從共享記憶體分配的 ctypes 物件。預設情況下,返回值實際上是該物件的同步包裝器。物件本身可以透過 Valuevalue 屬性訪問。

typecode_or_type 決定返回物件的型別:它要麼是一個 ctypes 型別,要麼是一個 array 模組使用的單字元型別碼。*args 傳遞給該型別的建構函式。

如果 lockTrue(預設值),則會建立一個新的可重入鎖物件來同步對值的訪問。如果 lockLockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問將不會自動受鎖保護,因此它不一定“程序安全”。

諸如 += 等涉及讀寫操作的操作不是原子性的。因此,例如,如果你想原子地遞增一個共享值,僅僅這樣做是不夠的:

counter.value += 1

假設關聯的鎖是遞迴的(預設情況下是),你可以這樣做:

with counter.get_lock():
    counter.value += 1

請注意,lock 是一個僅限關鍵字的引數。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回從共享記憶體分配的 ctypes 陣列。預設情況下,返回值實際上是該陣列的同步包裝器。

typecode_or_type 決定返回陣列元素的型別:它要麼是一個 ctypes 型別,要麼是一個 array 模組使用的單字元型別碼。如果 size_or_initializer 是一個整數,則它決定了陣列的長度,並且陣列將最初歸零。否則,size_or_initializer 是一個用於初始化陣列的序列,其長度決定了陣列的長度。

如果 lockTrue(預設值),則會建立一個新的鎖物件來同步對值的訪問。如果 lockLockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問將不會自動受鎖保護,因此它不一定“程序安全”。

請注意,lock 是一個僅限關鍵字的引數。

請注意,ctypes.c_char 陣列具有 valueraw 屬性,允許使用它來儲存和檢索字串。

multiprocessing.sharedctypes 模組

multiprocessing.sharedctypes 模組提供了從共享記憶體分配 ctypes 物件的函式,這些物件可以由子程序繼承。

備註

雖然可以在共享記憶體中儲存指標,但請記住這將引用特定程序地址空間中的位置。然而,該指標在第二個程序的上下文中很可能是無效的,並且嘗試從第二個程序解引用該指標可能會導致崩潰。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回從共享記憶體分配的 ctypes 陣列。

typecode_or_type 決定返回陣列元素的型別:它要麼是一個 ctypes 型別,要麼是一個 array 模組使用的單字元型別碼。如果 size_or_initializer 是一個整數,則它決定了陣列的長度,並且陣列將最初歸零。否則 size_or_initializer 是一個用於初始化陣列的序列,其長度決定了陣列的長度。

請注意,設定和獲取元素可能不是原子性的——請改用 Array() 以確保使用鎖自動同步訪問。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回從共享記憶體分配的 ctypes 物件。

typecode_or_type 決定返回物件的型別:它要麼是一個 ctypes 型別,要麼是一個 array 模組使用的單字元型別碼。*args 傳遞給該型別的建構函式。

請注意,設定和獲取值可能不是原子性的——請改用 Value() 以確保使用鎖自動同步訪問。

請注意,ctypes.c_char 陣列具有 valueraw 屬性,允許使用它來儲存和檢索字串——請參閱 ctypes 的文件。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray() 相同,不同之處在於,根據 lock 的值,可能會返回一個程序安全的同步包裝器,而不是原始的 ctypes 陣列。

如果 lockTrue(預設值),則會建立一個新的鎖物件來同步對值的訪問。如果 lockLockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問將不會自動受鎖保護,因此它不一定“程序安全”。

請注意,lock 是一個僅限關鍵字的引數。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue() 相同,不同之處在於,根據 lock 的值,可能會返回一個程序安全的同步包裝器,而不是原始的 ctypes 物件。

如果 lockTrue(預設值),則會建立一個新的鎖物件來同步對值的訪問。如果 lockLockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問將不會自動受鎖保護,因此它不一定“程序安全”。

請注意,lock 是一個僅限關鍵字的引數。

multiprocessing.sharedctypes.copy(obj)

返回一個從共享記憶體分配的 ctypes 物件,它是 ctypes 物件 obj 的副本。

multiprocessing.sharedctypes.synchronized(obj[, lock])

返回 ctypes 物件的程序安全包裝器物件,該包裝器使用 lock 同步訪問。如果 lockNone(預設值),則會自動建立一個 multiprocessing.RLock 物件。

同步包裝器除了包裝物件的方法外,還有兩個方法:get_obj() 返回被包裝的物件,get_lock() 返回用於同步的鎖物件。

請注意,透過包裝器訪問 ctypes 物件可能比直接訪問原始 ctypes 物件慢得多。

3.5 版更改: 同步物件支援 上下文管理器 協議。

下表比較了使用共享記憶體建立共享 ctypes 物件的語法與普通 ctypes 語法的區別。(在表中 MyStructctypes.Structure 的某個子類。)

ctypes

使用型別時的 sharedctypes

使用型別碼時的 sharedctypes

c_double(2.4)

RawValue(c_double, 2.4)

RawValue('d', 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray('h', 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray('i', (9, 2, 8))

下面是一個示例,其中子程序修改了多個 ctypes 物件

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

列印的結果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

管理器

管理器提供了一種建立可以在不同程序之間共享資料的方式,包括透過網路在不同機器上執行的程序之間共享資料。管理器物件控制一個管理 共享物件 的伺服器程序。其他程序可以使用代理訪問共享物件。

multiprocessing.Manager()

返回一個已啟動的 SyncManager 物件,可用於在程序之間共享物件。返回的管理器物件對應於一個已派生的子程序,並具有建立共享物件和返回相應代理的方法。

管理器程序會在它們被垃圾回收或其父程序退出時立即關閉。管理器類定義在 multiprocessing.managers 模組中

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

建立一個 BaseManager 物件。

建立後,應呼叫 start()get_server().serve_forever(),以確保管理器物件引用已啟動的管理器程序。

address 是管理器程序監聽新連線的地址。如果 addressNone,則會選擇一個任意地址。

authkey 是身份驗證金鑰,用於檢查傳入到伺服器程序的連線的有效性。如果 authkeyNone,則使用 current_process().authkey。否則,使用 authkey,它必須是一個位元組字串。

serializer 必須是 'pickle'(使用 pickle 序列化)或 'xmlrpclib'(使用 xmlrpc.client 序列化)。

ctx 是一個上下文物件,或 None(使用當前上下文)。參見 get_context() 函式。

shutdown_timeout 是一個以秒為單位的超時時間,用於等待管理器使用的程序在 shutdown() 方法中完成。如果關閉超時,則終止程序。如果終止程序也超時,則殺死程序。

3.11 版本中的變化: 添加了 shutdown_timeout 引數。

start([initializer[, initargs]])

啟動一個子程序來啟動管理器。如果 initializer 不是 None,那麼子程序啟動時將呼叫 initializer(*initargs)

get_server()

返回一個 Server 物件,該物件代表管理器控制下的實際伺服器。Server 物件支援 serve_forever() 方法。

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 還有一個 address 屬性。

connect()

將本地管理器物件連線到遠端管理器程序。

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

停止管理器使用的程序。這僅在已使用 start() 啟動伺服器程序時才可用。

可以多次呼叫此方法。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

一個類方法,可用於向管理器類註冊型別或可呼叫物件。

typeid 是一個“型別識別符號”,用於標識特定型別的共享物件。它必須是一個字串。

callable 是一個可呼叫物件,用於為此型別識別符號建立物件。如果管理器例項將使用 connect() 方法連線到伺服器,或者如果 create_method 引數為 False,則可以將其保留為 None

proxytypeBaseProxy 的子類,用於為此 typeid 建立共享物件的代理。如果為 None,則會自動建立一個代理類。

exposed 用於指定一組方法名稱,對於此 typeid 的代理應允許使用 BaseProxy._callmethod() 訪問這些方法。(如果 exposedNone,則改為使用 proxytype._exposed_(如果存在)。)如果未指定暴露列表,則共享物件的所有“公共方法”都將可訪問。(此處“公共方法”指任何具有 __call__() 方法且名稱不以 '_' 開頭的屬性。)

method_to_typeid 是一個對映,用於指定那些應該返回代理的暴露方法的返回型別。它將方法名稱對映到 typeid 字串。(如果 method_to_typeidNone,則改為使用 proxytype._method_to_typeid_(如果存在)。)如果方法名稱不是此對映的鍵,或者如果對映為 None,則方法返回的物件將按值複製。

create_method 決定是否應該建立一個名稱為 typeid 的方法,該方法可用於告訴伺服器程序建立一個新的共享物件並返回其代理。預設情況下,它為 True

BaseManager 例項還具有一個只讀屬性

address

管理器使用的地址。

3.3 版本中的變化: 管理器物件支援上下文管理協議——參見 上下文管理器型別__enter__() 啟動伺服器程序(如果尚未啟動),然後返回管理器物件。__exit__() 呼叫 shutdown()

在以前的版本中,如果管理器的伺服器程序尚未啟動,__enter__() 不會啟動它。

class multiprocessing.managers.SyncManager

BaseManager 的子類,可用於程序同步。multiprocessing.Manager() 返回此類物件。

它的方法為一些常用的資料型別建立並返回 代理物件,這些資料型別可以在程序間同步。這尤其包括共享列表和字典。

Barrier(parties[, action[, timeout]])

建立一個共享的 threading.Barrier 物件並返回其代理。

在 3.3 版本加入。

BoundedSemaphore([value])

建立一個共享的 threading.BoundedSemaphore 物件並返回其代理。

Condition([lock])

建立一個共享的 threading.Condition 物件並返回其代理。

如果提供了 lock,則它應該是一個 threading.Lockthreading.RLock 物件的代理。

3.3 版更改: 添加了 wait_for() 方法。

Event()

建立一個共享的 threading.Event 物件並返回其代理。

Lock()

建立一個共享的 threading.Lock 物件並返回其代理。

Namespace()

建立一個共享的 Namespace 物件並返回其代理。

Queue([maxsize])

建立一個共享的 queue.Queue 物件並返回其代理。

RLock()

建立一個共享的 threading.RLock 物件並返回其代理。

Semaphore([value])

建立一個共享的 threading.Semaphore 物件並返回其代理。

Array(typecode, sequence)

建立一個數組並返回其代理。

Value(typecode, value)

建立一個具有可寫 value 屬性的物件並返回其代理。

dict()
dict(mapping)
dict(sequence)

建立一個共享的 dict 物件並返回其代理。

list()
list(sequence)

建立一個共享的 list 物件並返回其代理。

set()
set(sequence)
set(mapping)

建立一個共享的 set 物件並返回其代理。

3.14 版本新增: 添加了 set 支援。

3.6 版本中的變化: 共享物件可以巢狀。例如,一個共享容器物件(如共享列表)可以包含其他共享物件,這些物件都將由 SyncManager 管理和同步。

class multiprocessing.managers.Namespace

一種可以向 SyncManager 註冊的型別。

名稱空間物件沒有公共方法,但有可寫屬性。它的表示顯示其屬性的值。

然而,在使用名稱空間物件的代理時,以 '_' 開頭的屬性將是代理的屬性,而不是引用物件的屬性。

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

自定義管理器

要建立自己的管理器,可以建立一個 BaseManager 的子類,並使用 register() 類方法向管理器類註冊新的型別或可呼叫物件。例如:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

使用遠端管理器

可以在一臺機器上執行管理器伺服器,並讓客戶端從其他機器上使用它(假設相關的防火牆允許)。

執行以下命令會建立一個用於單個共享佇列的伺服器,遠端客戶端可以訪問它:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一個客戶端可以如下訪問伺服器:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一個客戶端也可以使用它:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地程序也可以訪問該佇列,使用上面客戶端的程式碼來遠端訪問它:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

代理物件

代理是一個**引用**共享物件(假定位於不同程序中)的物件。共享物件被稱為代理的**引用物件**。多個代理物件可以具有相同的引用物件。

代理物件具有呼叫其引用物件相應方法的方法(儘管並非引用物件的每個方法都一定能透過代理訪問)。這樣,代理可以像其引用物件一樣使用:

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

請注意,對代理應用 str() 將返回引用物件的表示,而應用 repr() 將返回代理的表示。

代理物件的一個重要特性是它們是可序列化的,因此可以在程序之間傳遞。因此,引用物件可以包含 代理物件。這允許這些託管列表、字典和其他 代理物件 的巢狀。

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

同樣,字典和列表代理可以相互巢狀:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

如果標準(非代理)listdict 物件包含在引用物件中,則對這些可變值的修改不會透過管理器傳播,因為代理無法知道其中包含的值何時被修改。但是,將值儲存在容器代理中(這會觸發代理物件上的 __setitem__)會透過管理器傳播,因此要有效修改此類項,可以將被修改的值重新分配給容器代理:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

對於大多數用例來說,這種方法可能不如使用巢狀的 代理物件 方便,但也展示了對同步的控制級別。

備註

multiprocessing 中的代理型別不支援按值比較。因此,例如,我們有:

>>> manager.list([1,2,3]) == [1,2,3]
False

進行比較時,應該直接使用引用物件的副本。

class multiprocessing.managers.BaseProxy

代理物件是 BaseProxy 子類的例項。

_callmethod(methodname[, args[, kwds]])

呼叫並返回代理引用物件方法的執行結果。

如果 proxy 是一個代理,其引用物件是 obj,那麼表示式

proxy._callmethod(methodname, args, kwds)

將計算表示式

getattr(obj, methodname)(*args, **kwds)

在管理器程序中。

返回的值將是呼叫結果的副本,或者是新共享物件的代理——請參閱 BaseManager.register()method_to_typeid 引數文件。

如果在呼叫時引發異常,則 _callmethod() 將重新引發該異常。如果管理器程序中引發了其他異常,則將其轉換為 RemoteError 異常,並由 _callmethod() 重新引發。

特別注意,如果 methodname 未被**暴露**,則會引發異常。

使用 _callmethod() 的示例:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回引用物件的副本。

如果引用物件不可序列化,則會引發異常。

__repr__()

返回代理物件的表示。

__str__()

返回引用物件的表示。

清理

代理物件使用弱引用回撥,這樣當它被垃圾回收時,它會從擁有其引用物件的管理器中登出自己。

當不再有任何代理引用共享物件時,該物件將從管理器程序中刪除。

程序池

可以使用 Pool 類建立程序池,以執行提交給它的任務。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一個程序池物件,控制著一個工作程序池,可以將任務提交給它。它支援帶超時和回撥的非同步結果,並具有並行對映實現。

processes 是要使用的工人程序數。如果 processesNone,則使用 os.process_cpu_count() 返回的數字。

如果 initializer 不是 None,則每個工作程序啟動時將呼叫 initializer(*initargs)

maxtasksperchild 是一個工作程序在退出並被一個新的工作程序替換之前可以完成的任務數量,以便釋放未使用的資源。預設的 maxtasksperchildNone,這意味著工作程序將與池一樣長壽。

context 可用於指定啟動工作程序所用的上下文。通常,池是使用函式 multiprocessing.Pool() 或上下文物件的 Pool() 方法建立的。在這兩種情況下,context 都被適當地設定。

請注意,池物件的方法只能由建立池的程序呼叫。

警告

multiprocessing.pool 物件具有需要透過將池用作上下文管理器或手動呼叫 close()terminate() 來正確管理(像任何其他資源一樣)的內部資源。未能這樣做可能導致程序在終結時掛起。

請注意,依賴垃圾回收器銷燬池是不正確的,因為 CPython 不保證會呼叫池的終結器(更多資訊請參見 object.__del__())。

3.2 版本中已更改: 添加了 maxtasksperchild 引數。

3.4 版本中已更改: 添加了 context 引數。

3.13 版本中已更改: processes 預設使用 os.process_cpu_count(),而不是 os.cpu_count()

備註

Pool 中的工作程序通常在池的工作佇列的整個生命週期記憶體在。其他系統(如 Apache、mod_wsgi 等)中常見的釋放工作程序所佔用的資源模式是允許池中的工作程序在退出、清理並生成新程序替換舊程序之前僅完成一定量的工作。Poolmaxtasksperchild 引數向終端使用者公開了此功能。

apply(func[, args[, kwds]])

使用引數 args 和關鍵字引數 kwds 呼叫 func。它會阻塞直到結果就緒。考慮到這種阻塞,apply_async() 更適合並行執行工作。此外,func 只在池中的一個工作程序中執行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() 方法的一個變體,它返回一個 AsyncResult 物件。

如果指定了 callback,它應該是一個接受單個引數的可呼叫物件。當結果就緒時,callback 會應用於它,除非呼叫失敗,在這種情況下會改為應用 error_callback

如果指定了 error_callback,它應該是一個接受單個引數的可呼叫物件。如果目標函式失敗,則會使用異常例項呼叫 error_callback

回撥函式應該立即完成,否則處理結果的執行緒將阻塞。

map(func, iterable[, chunksize])

內建函式 map() 的並行等效版本(它只支援一個 iterable 引數,對於多個可迭代物件請參閱 starmap())。它會阻塞直到結果就緒。

此方法將可迭代物件切分成若干塊,並將其作為單獨的任務提交給程序池。這些塊的(近似)大小可以透過將 chunksize 設定為正整數來指定。

請注意,對於非常長的可迭代物件,這可能會導致高記憶體使用率。為了提高效率,請考慮使用帶有顯式 chunksize 選項的 imap()imap_unordered()

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() 方法的一個變體,它返回一個 AsyncResult 物件。

如果指定了 callback,它應該是一個接受單個引數的可呼叫物件。當結果就緒時,callback 會應用於它,除非呼叫失敗,在這種情況下會改為應用 error_callback

如果指定了 error_callback,它應該是一個接受單個引數的可呼叫物件。如果目標函式失敗,則會使用異常例項呼叫 error_callback

回撥函式應該立即完成,否則處理結果的執行緒將阻塞。

imap(func, iterable[, chunksize])

map() 的一個更懶惰的版本。

chunksize 引數與 map() 方法使用的引數相同。對於非常長的可迭代物件,使用較大的 chunksize 值可以使作業完成速度比使用預設值 1 快得多

此外,如果 chunksize1,則 imap() 方法返回的迭代器的 next() 方法有一個可選的 timeout 引數:如果結果不能在 timeout 秒內返回,next(timeout) 將引發 multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap() 相同,只是返回的迭代器中結果的順序應被視為任意。(只有當只有一個工作程序時,順序才保證是“正確”的。)

starmap(func, iterable[, chunksize])

類似於 map(),只是 iterable 的元素預期是作為引數解包的可迭代物件。

因此,一個 [(1,2), (3, 4)]iterable 會得到 [func(1,2), func(3,4)]

在 3.3 版本加入。

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

starmap()map_async() 的組合,它遍歷可迭代物件中的可迭代物件,並使用解包後的可迭代物件呼叫 func。返回一個結果物件。

在 3.3 版本加入。

close()

阻止向池提交更多工。一旦所有任務完成,工作程序將退出。

terminate()

立即停止工作程序,不完成未完成的工作。當池物件被垃圾回收時,terminate() 將立即被呼叫。

join()

等待工作程序退出。在使用 join() 之前必須呼叫 close()terminate()

3.3 版本中已更改: 池物件現在支援上下文管理協議 – 參見 上下文管理器型別__enter__() 返回池物件,__exit__() 呼叫 terminate()

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async() 返回的結果的類。

get([timeout])

結果到達時返回結果。如果 timeout 不是 None 並且結果未在 timeout 秒內到達,則會引發 multiprocessing.TimeoutError。如果遠端呼叫引發異常,則 get() 將重新引發該異常。

wait([timeout])

等待直到結果可用或直到 timeout 秒過去。

ready()

返回呼叫是否已完成。

successful()

返回呼叫是否成功完成而未引發異常。如果結果尚未就緒,則會引發 ValueError

3.7 版本中已更改: 如果結果尚未就緒,則引發 ValueError 而不是 AssertionError

以下示例演示了池的使用

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

監聽器和客戶端

通常,程序間的訊息傳遞是透過佇列或使用 Pipe() 返回的 Connection 物件完成的。

然而,multiprocessing.connection 模組提供了一些額外的靈活性。它基本上為處理套接字或 Windows 命名管道提供了高階的面向訊息的 API。它還支援使用 hmac 模組進行摘要認證,並支援同時輪詢多個連線。

multiprocessing.connection.deliver_challenge(connection, authkey)

向連線的另一端傳送一個隨機生成的訊息並等待回覆。

如果回覆與使用 authkey 作為金鑰的訊息摘要匹配,則向連線的另一端傳送歡迎訊息。否則會引發 AuthenticationError

multiprocessing.connection.answer_challenge(connection, authkey)

接收訊息,使用 authkey 作為金鑰計算訊息的摘要,然後將摘要傳送回去。

如果未收到歡迎訊息,則會引發 AuthenticationError

multiprocessing.connection.Client(address[, family[, authkey]])

嘗試建立與使用地址 address 的監聽器的連線,返回一個 Connection

連線的型別由 family 引數決定,但通常可以省略,因為它通常可以從 address 的格式中推斷出來。(參見 地址格式

如果給出了 authkey 且不為 None,它應該是一個位元組字串,並將用作基於 HMAC 認證挑戰的金鑰。如果 authkeyNone,則不進行認證。如果認證失敗,則會引發 AuthenticationError。參見 認證金鑰

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

一個已繫結套接字或 Windows 命名管道的包裝器,它“監聽”連線。

address 是監聽器物件繫結的套接字或命名管道要使用的地址。

備註

如果使用地址“0.0.0.0”,則該地址在 Windows 上將不是可連線的端點。如果需要可連線的端點,應使用“127.0.0.1”。

family 是要使用的套接字(或命名管道)的型別。它可以是字串 'AF_INET' (用於 TCP 套接字),'AF_UNIX' (用於 Unix 域套接字) 或 'AF_PIPE' (用於 Windows 命名管道)。其中只有第一個保證可用。如果 familyNone,則從 address 的格式推斷 family。如果 address 也是 None,則選擇一個預設值。此預設值被認為是可用的最快的 family。請參閱 地址格式。請注意,如果 family'AF_UNIX' 並且 address 是 None,則套接字將使用 tempfile.mkstemp() 建立的私有臨時目錄中建立。

如果監聽器物件使用套接字,則 backlog (預設為 1) 在套接字繫結後傳遞給套接字的 listen() 方法。

如果給出了 authkey 且不為 None,它應該是一個位元組字串,並將用作基於 HMAC 認證挑戰的金鑰。如果 authkeyNone,則不進行認證。如果認證失敗,則會引發 AuthenticationError。參見 認證金鑰

accept()

接受監聽器物件的繫結套接字或命名管道上的連線,並返回一個 Connection 物件。如果嘗試認證失敗,則會引發 AuthenticationError

close()

關閉監聽器物件的繫結套接字或命名管道。當監聽器被垃圾回收時,會自動呼叫此方法。然而,建議顯式呼叫它。

監聽器物件具有以下只讀屬性

address

監聽器物件正在使用的地址。

last_accepted

上次接受連線的地址。如果不可用,則為 None

3.3 版本中已更改: 監聽器物件現在支援上下文管理協議 – 參見 上下文管理器型別__enter__() 返回監聽器物件,__exit__() 呼叫 close()

multiprocessing.connection.wait(object_list, timeout=None)

等待 object_list 中的某個物件就緒。返回 object_list 中已就緒的那些物件的列表。如果 timeout 是浮點數,則呼叫最多阻塞該秒數。如果 timeoutNone,則會無限期阻塞。負數超時等效於零超時。

對於 POSIX 和 Windows,如果物件是以下型別,它就可以出現在 object_list 中:

當有資料可從中讀取,或者另一端已關閉時,連線或套接字物件就緒。

POSIX: wait(object_list, timeout) 幾乎等同於 select.select(object_list, [], [], timeout)。區別在於,如果 select.select() 被訊號中斷,它可能會引發錯誤號為 EINTROSError,而 wait() 則不會。

Windows: object_list 中的項必須是可等待的整數控制代碼(根據 Win32 函式 WaitForMultipleObjects() 文件中使用的定義)或者可以是具有 fileno() 方法的物件,該方法返回套接字控制代碼或管道控制代碼。(請注意,管道控制代碼和套接字控制代碼不是可等待的控制代碼。)

在 3.3 版本加入。

示例:

以下伺服器程式碼建立一個使用 'secret password' 作為認證金鑰的監聽器。然後它等待連線並向客戶端傳送一些資料

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下程式碼連線到伺服器並從伺服器接收一些資料

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下程式碼使用 wait() 同時等待來自多個程序的訊息

from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

地址格式

  • 一個 'AF_INET' 地址是一個 (hostname, port) 形式的元組,其中 hostname 是一個字串,而 port 是一個整數。

  • 一個 'AF_UNIX' 地址是一個表示檔案系統上檔名的字串。

  • 一個 'AF_PIPE' 地址是 r'\\.\pipe\PipeName' 形式的字串。要使用 Client() 連線到遠端計算機 ServerName 上的命名管道,應該使用 r'\\ServerName\pipe\PipeName' 形式的地址。

請注意,任何以兩個反斜槓開頭的字串預設都被認為是 'AF_PIPE' 地址,而不是 'AF_UNIX' 地址。

認證金鑰

當使用 Connection.recv 時,接收到的資料會自動解封裝。不幸的是,從不受信任的來源解封裝資料存在安全風險。因此,ListenerClient() 使用 hmac 模組提供摘要認證。

認證金鑰是一個位元組字串,可以看作是一個密碼:一旦建立連線,兩端都將要求對方提供知道認證金鑰的證據。(證明兩端使用相同的金鑰 涉及在連線上傳輸金鑰。)

如果請求認證但未指定認證金鑰,則使用 current_process().authkey 的返回值(參見 Process)。當前程序建立的任何 Process 物件都將自動繼承此值。這意味著(預設情況下)多程序程式的所有程序將共享一個認證金鑰,該金鑰可用於在它們之間建立連線。

也可以透過使用 os.urandom() 生成合適的認證金鑰。

日誌記錄

提供了一些日誌記錄支援。但是,請注意 logging 包不使用程序共享鎖,因此(取決於處理程式型別)不同程序的訊息可能會混淆。

multiprocessing.get_logger()

返回 multiprocessing 使用的日誌記錄器。如果需要,將建立一個新的日誌記錄器。

首次建立時,日誌記錄器的級別為 logging.NOTSET,沒有預設處理程式。傳送到此日誌記錄器的訊息預設不會傳播到根日誌記錄器。

請注意,在 Windows 上,子程序只會繼承父程序日誌記錄器的級別——日誌記錄器的任何其他自定義都不會被繼承。

multiprocessing.log_to_stderr(level=None)

此函式呼叫 get_logger(),但除了返回由 get_logger 建立的日誌記錄器之外,它還添加了一個處理程式,使用格式 '[%(levelname)s/%(processName)s] %(message)s' 將輸出傳送到 sys.stderr。您可以透過傳遞 level 引數來修改日誌記錄器的 levelname

下面是一個開啟日誌記錄的示例會話

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有關日誌記錄級別的完整表格,請參閱 logging 模組。

multiprocessing.dummy 模組

multiprocessing.dummy 複製了 multiprocessing 的 API,但它僅僅是 threading 模組的一個包裝器。

特別是,由 multiprocessing.dummy 提供的 Pool 函式返回 ThreadPool 的一個例項,它是 Pool 的一個子類,支援所有相同的方法呼叫,但使用的是工作執行緒池而不是工作程序池。

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

一個執行緒池物件,它控制一個工作執行緒池,可以將作業提交給這些工作執行緒。 ThreadPool 例項與 Pool 例項完全介面相容,並且它們的資源也必須得到妥善管理,要麼透過將池用作上下文管理器,要麼手動呼叫 close()terminate()

processes 是要使用的工作執行緒數。如果 processesNone,則使用 os.process_cpu_count() 返回的數字。

如果 initializer 不是 None,則每個工作程序啟動時將呼叫 initializer(*initargs)

Pool 不同,不能提供 maxtasksperchildcontext

備註

ThreadPool 共享與 Pool 相同的介面,該介面是圍繞程序池設計的,並且早於 concurrent.futures 模組的引入。因此,它繼承了一些對由執行緒支援的池沒有意義的操作,並且它有自己的型別來表示非同步作業的狀態,即 AsyncResult,該型別不被任何其他庫所理解。

使用者通常應優先使用 concurrent.futures.ThreadPoolExecutor,它具有更簡單的介面,從一開始就是圍繞執行緒設計的,並且返回 concurrent.futures.Future 例項,這些例項與許多其他庫相容,包括 asyncio

程式設計指南

在使用 multiprocessing 時,應遵循某些指南和慣例。

所有啟動方法

以下適用於所有啟動方法。

避免共享狀態

應儘可能避免在程序之間傳輸大量資料。

最好堅持使用佇列或管道進行程序間通訊,而不是使用較低級別的同步原語。

可封裝性

確保代理方法的所有引數都是可封裝的。

代理的執行緒安全性

除非您使用鎖保護代理物件,否則不要從多個執行緒使用它。

(不同的程序使用 同一個 代理永遠不會有問題。)

連線殭屍程序

在 POSIX 上,當一個程序完成但尚未被連線時,它會成為殭屍程序。殭屍程序的數量不應該太多,因為每次啟動新程序(或呼叫 active_children())時,所有已完成但尚未連線的程序都將被連線。此外,呼叫已完成程序的 Process.is_alive 也將連線該程序。即便如此,顯式連線您啟動的所有程序可能仍是良好的實踐。

繼承優於封裝/解封裝

當使用 spawnforkserver 啟動方法時,multiprocessing 中的許多型別需要可封裝,以便子程序可以使用它們。但是,通常應避免使用管道或佇列將共享物件傳送到其他程序。相反,您應該安排程式,以便需要訪問在其他地方建立的共享資源的程序可以從祖先程序繼承它。

避免終止程序

使用 Process.terminate 方法停止程序可能會導致該程序當前使用的任何共享資源(例如鎖、訊號量、管道和佇列)損壞或對其他程序不可用。

因此,最好只考慮對從不使用任何共享資源的程序使用 Process.terminate

連線使用佇列的程序

請記住,已將項放入佇列的程序在終止之前會等待,直到“饋送器”執行緒將所有緩衝項饋送到底層管道。(子程序可以呼叫佇列的 Queue.cancel_join_thread 方法來避免此行為。)

這意味著無論何時使用佇列,您都需要確保在連線程序之前,所有已放入佇列的項最終都會被移除。否則,您無法確定已將項放入佇列的程序是否會終止。還要記住,非守護程序會自動連線。

以下是一個會死鎖的示例

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

這裡的解決方法是交換最後兩行(或簡單地刪除 p.join() 行)。

顯式地將資源傳遞給子程序

在 POSIX 上,使用 fork 啟動方法時,子程序可以使用在父程序中建立的共享資源(透過全域性資源)。然而,最好將物件作為引數傳遞給子程序的建構函式。

除了使程式碼(可能)與 Windows 和其他啟動方法相容之外,這還確保只要子程序仍然存活,物件就不會在父程序中被垃圾回收。如果父程序中物件被垃圾回收時釋放了一些資源,這可能很重要。

例如

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

應該改寫為

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

小心不要用“類檔案物件”替換 sys.stdin

multiprocessing 最初無條件地呼叫

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中——這導致了程序內程序的問題。這已更改為

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

這解決了程序相互衝突導致壞檔案描述符錯誤的基本問題,但給將 sys.stdin() 替換為帶有輸出緩衝的“類檔案物件”的應用程式帶來了潛在危險。這種危險是,如果多個程序在此類檔案物件上呼叫 close(),則可能導致相同的資料多次重新整理到物件中,從而導致損壞。

如果您編寫一個類檔案物件並實現自己的快取,您可以透過在每次向快取追加時儲存程序 ID,並在程序 ID 更改時丟棄快取來使其成為 fork-safe。例如

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有關更多資訊,請參閱 bpo-5155bpo-5313bpo-5331

spawnforkserver 啟動方法

有一些額外的限制不適用於 fork 啟動方法。

更強的可封裝性

確保 Process 的所有引數都是可封裝的。此外,如果您子類化 Process.__init__,您必須確保在呼叫 Process.start 方法時例項是可封裝的。

全域性變數

請記住,如果在子程序中執行的程式碼嘗試訪問全域性變數,那麼它看到的值(如果有)可能與在呼叫 Process.start 時父程序中的值不同。

然而,僅僅是模組級別常量的全域性變數不會引起任何問題。

主模組的安全匯入

確保新的 Python 直譯器可以安全地匯入主模組,而不會導致意外的副作用(例如啟動新程序)。

例如,使用 spawnforkserver 啟動方法執行以下模組將因 RuntimeError 而失敗

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

相反,應該透過使用 if __name__ == '__main__': 來保護程式的“入口點”,如下所示

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(如果程式正常執行而不是凍結,則可以省略 freeze_support() 行。)

這允許新生成的 Python 直譯器安全地匯入模組,然後執行模組的 foo() 函式。

如果在主模組中建立了池或管理器,也適用類似的限制。

示例

演示如何建立和使用自定義管理器和代理

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

一個示例,展示如何使用佇列將任務饋送給一組工作程序並收集結果

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()