multiprocessing — 基於程序的並行

原始碼: Lib/multiprocessing/


可用性: 不支援 Android, 不支援 iOS, 不支援 WASI。

此模組不支援 移動平臺WebAssembly 平臺

簡介

multiprocessing 是一個支援使用類似於 threading 模組的 API 來生成程序的包。multiprocessing 包提供本地和遠端併發,透過使用子程序而不是執行緒來有效地繞過 全域性直譯器鎖。因此,multiprocessing 模組允許程式設計師充分利用給定機器上的多個處理器。它可以在 POSIX 和 Windows 上執行。

multiprocessing 模組還引入了在 threading 模組中沒有對應 API 的介面。一個典型的例子是 Pool 物件,它提供了一種方便的方式來並行化一個函式在多個輸入值上的執行,並將輸入資料分發到各個程序(資料並行)。下面的示例演示了在模組中定義此類函式的常見做法,以便子程序可以成功匯入該模組。這個使用 Pool 的資料並行基本示例,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

將會列印到標準輸出

[1, 4, 9]

另請參閱

concurrent.futures.ProcessPoolExecutor 提供了一個更高階的介面,可以將任務推送到後臺程序,而不會阻塞呼叫程序的執行。與直接使用 Pool 介面相比,concurrent.futures API 更容易將工作提交到底層程序池與等待結果的操作分離。

Process

multiprocessing 中,程序是透過建立 Process 物件,然後呼叫其 start() 方法來生成的。Process 遵循 threading.Thread 的 API。一個簡單的多程序程式示例是

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

為了展示所涉及的各個程序 ID,這裡有一個擴充套件的示例

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

關於為什麼需要 if __name__ == '__main__' 部分的解釋,請參閱 程式設計指南

上下文和啟動方法

根據平臺的不同,multiprocessing 支援三種啟動程序的方法。這些啟動方法

spawn

父程序啟動一個新的 Python 直譯器程序。子程序只會繼承執行程序物件的 run() 方法所必需的資源。特別是,父程序中不必要的檔案描述符和控制代碼不會被繼承。與使用 forkforkserver 相比,使用此方法啟動程序速度較慢。

在 POSIX 和 Windows 平臺上可用。在 Windows 和 macOS 上預設為此方法。

fork

父程序使用 os.fork() 來 fork Python 直譯器。當子程序開始時,它實際上與父程序相同。父程序的所有資源都由子程序繼承。請注意,安全地 fork 多執行緒程序是有問題的。

在 POSIX 系統上可用。目前在 POSIX 上(macOS 除外)預設為此方法。

注意

在 Python 3.14 中,預設啟動方法將不再是 fork。需要 fork 的程式碼應透過 get_context()set_start_method() 顯式指定。

在 3.12 版本中變更: 如果 Python 檢測到你的程序有多個執行緒,此啟動方法內部呼叫的 os.fork() 函式將引發一個 DeprecationWarning。請使用不同的啟動方法。有關詳細解釋,請參閱 os.fork() 文件。

forkserver

當程式啟動並選擇 *forkserver* 啟動方法時,會生成一個伺服器程序。從那時起,每當需要一個新程序時,父程序都會連線到伺服器並請求它 fork 一個新程序。除非系統庫或預載入的匯入以副作用的方式生成執行緒,否則 fork 伺服器程序是單執行緒的,因此通常可以安全地使用 os.fork()。不會繼承不必要的資源。

在支援透過 Unix 管道傳遞檔案描述符的 POSIX 平臺上可用,例如 Linux。

在 3.4 版本中變更: 在所有 POSIX 平臺上添加了 *spawn*,在某些 POSIX 平臺上添加了 *forkserver*。子程序不再繼承 Windows 上父程序的所有可繼承控制代碼。

在 3.8 版本中變更: 在 macOS 上,*spawn* 啟動方法現在是預設方法。*fork* 啟動方法應被認為是不安全的,因為它可能導致子程序崩潰,因為 macOS 系統庫可能會啟動執行緒。請參閱 bpo-33725

在 POSIX 上使用 *spawn* 或 *forkserver* 啟動方法還會啟動一個*資源跟蹤器*程序,該程序跟蹤程式程序建立的未連結的命名系統資源(例如命名訊號量或 SharedMemory 物件)。當所有程序都退出後,資源跟蹤器會取消連結任何剩餘的跟蹤物件。通常應該沒有剩餘物件,但如果程序被訊號殺死,可能會有一些“洩漏”的資源。(洩漏的訊號量和共享記憶體段都不會自動取消連結,直到下次重啟。這對於這兩個物件都是有問題的,因為系統只允許有限數量的命名訊號量,並且共享記憶體段佔用主記憶體中的一些空間。)

要選擇啟動方法,請在主模組的 if __name__ == '__main__' 子句中使用 set_start_method()。例如

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() 在程式中不應使用多次。

或者,你可以使用 get_context() 來獲取上下文物件。上下文物件與 multiprocessing 模組具有相同的 API,並允許在同一程式中使用多個啟動方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

請注意,與一個上下文相關的物件可能與不同上下文的程序不相容。特別是,使用 *fork* 上下文建立的鎖不能傳遞給使用 *spawn* 或 *forkserver* 啟動方法啟動的程序。

想要使用特定啟動方法的庫可能應該使用 get_context(),以避免干擾庫使用者的選擇。

警告

在 POSIX 系統上,'spawn''forkserver' 啟動方法通常不能與“凍結”的可執行檔案(即,由 PyInstallercx_Freeze 等包生成的二進位制檔案)一起使用。如果程式碼不使用執行緒,'fork' 啟動方法可能會起作用。

程序間交換物件

multiprocessing 支援程序之間的兩種型別的通訊通道

佇列

Queue 類幾乎是 queue.Queue 的克隆。例如

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

佇列是執行緒和程序安全的。放入 multiprocessing 佇列中的任何物件都將被序列化。

管道

Pipe() 函式返回一對透過管道連線的連線物件,預設情況下是雙向的(雙向)。例如

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Pipe() 返回的兩個連線物件代表管道的兩端。每個連線物件都有 send()recv() 方法(以及其他方法)。請注意,如果兩個程序(或執行緒)嘗試同時從管道的 *同一* 端讀取或寫入資料,則管道中的資料可能會損壞。當然,如果程序同時使用管道的不同端,則不存在損壞的風險。

send() 方法序列化物件,recv() 方法重新建立物件。

程序之間的同步

multiprocessing 包含來自 threading 的所有同步原語的等價物。例如,可以使用鎖來確保一次只有一個程序列印到標準輸出

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用鎖,來自不同程序的輸出很可能會混在一起。

程序之間共享狀態

如上所述,在進行併發程式設計時,通常最好儘可能避免使用共享狀態。當使用多個程序時,這一點尤其如此。

但是,如果你確實需要使用一些共享資料,那麼 multiprocessing 提供了幾種方法來實現。

共享記憶體

可以使用 ValueArray 將資料儲存在共享記憶體對映中。例如,以下程式碼

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

將列印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

在建立 numarr 時使用的 'd''i' 引數是 array 模組使用的型別程式碼:'d' 表示雙精度浮點數,'i' 表示有符號整數。這些共享物件將是程序和執行緒安全的。

為了更靈活地使用共享記憶體,可以使用 multiprocessing.sharedctypes 模組,該模組支援建立從共享記憶體分配的任意 ctypes 物件。

伺服器程序

Manager() 返回的管理器物件控制一個伺服器程序,該程序持有 Python 物件並允許其他程序使用代理來操作它們。

Manager() 返回的管理器將支援 listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray 等型別。例如,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

將列印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

伺服器程序管理器比使用共享記憶體物件更靈活,因為它們可以被設計為支援任意物件型別。此外,單個管理器可以由網路上不同計算機上的程序共享。但是,它們比使用共享記憶體慢。

使用工作程序池

Pool 類表示一個工作程序池。它具有一些方法,允許以幾種不同的方式將任務解除安裝到工作程序。

例如

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

請注意,池的方法應僅由建立它的程序使用。

注意

此包中的功能要求子程序可以匯入 __main__ 模組。這在 程式設計指南 中有介紹,但在此處指出也很有必要。這意味著某些示例(例如 multiprocessing.pool.Pool 示例)在互動式直譯器中將無法正常工作。例如:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(如果您嘗試這樣做,實際上會以半隨機的方式輸出三個完整的追溯資訊,然後您可能需要以某種方式停止父程序。)

參考

multiprocessing 包主要複製了 threading 模組的 API。

Process 和異常

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process 物件表示在單獨程序中執行的活動。Process 類具有 threading.Thread 的所有方法的等價物。

建構函式應始終使用關鍵字引數呼叫。group 應該始終為 None;它存在僅僅是為了與 threading.Thread 相容。target 是要由 run() 方法呼叫的可呼叫物件。它預設為 None,這意味著不呼叫任何內容。name 是程序名稱(有關更多詳細資訊,請參閱 name)。args 是目標呼叫的引數元組。kwargs 是目標呼叫的關鍵字引數字典。如果提供,則僅關鍵字 daemon 引數會將程序 daemon 標誌設定為 TrueFalse。如果為 None (預設值),則此標誌將從建立程序繼承。

預設情況下,不向 target 傳遞任何引數。args 引數預設為 (),可用於指定要傳遞給 target 的引數列表或元組。

如果子類覆蓋了建構函式,則必須確保在對程序執行任何其他操作之前呼叫基類建構函式 (Process.__init__())。

在 3.3 版本中更改: 添加了 daemon 引數。

run()

表示程序活動的方法。

您可以在子類中重寫此方法。標準的 run() 方法使用從 argskwargs 引數中獲取的順序引數和關鍵字引數,呼叫傳遞給物件建構函式作為目標引數的可呼叫物件(如果有)。

使用列表或元組作為傳遞給 Processargs 引數可以達到相同的效果。

示例

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

開始程序的活動。

每個程序物件最多隻能呼叫一次。它安排在單獨的程序中呼叫物件的 run() 方法。

join([timeout])

如果可選引數 timeoutNone (預設值),則該方法會阻塞,直到呼叫其 join() 方法的程序終止。如果 timeout 是正數,則最多阻塞 timeout 秒。請注意,如果程序終止或方法超時,則該方法返回 None。檢查程序的 exitcode 以確定它是否已終止。

一個程序可以被多次連線。

程序不能加入自身,因為這會導致死鎖。嘗試在程序啟動之前加入程序是錯誤的。

name

程序的名稱。該名稱是一個字串,僅用於標識目的。它沒有語義。可以為多個程序指定相同的名稱。

初始名稱由建構函式設定。如果未向建構函式提供顯式名稱,則會構造一個形如“Process-N1:N2:…:Nk”的名稱,其中每個 Nk 是其父級的第 N 個子級。

is_alive()

返回程序是否處於活動狀態。

粗略地說,從 start() 方法返回到子程序終止時,程序物件都處於活動狀態。

daemon

程序的守護程式標誌,一個布林值。必須在呼叫 start() 之前設定此值。

初始值從建立程序繼承。

當程序退出時,它會嘗試終止其所有守護程序子程序。

請注意,不允許守護程序建立子程序。否則,如果守護程序在其父程序退出時被終止,則會留下其子程序孤立。此外,這些不是 Unix 守護程式或服務,它們是普通的程序,如果非守護程序已退出,則會被終止(而不是被連線)。

除了 threading.Thread API 之外,Process 物件還支援以下屬性和方法:

pid

返回程序 ID。在程序生成之前,此值將為 None

exitcode

子程序的退出程式碼。如果程序尚未終止,則此值為 None

如果子程序的 run() 方法正常返回,則退出程式碼將為 0。如果它透過帶有整數引數 Nsys.exit() 終止,則退出程式碼將為 N

如果子程序由於未在 run() 中捕獲的異常而終止,則退出程式碼將為 1。如果它被訊號 N 終止,則退出程式碼將為負值 -N

authkey

程序的身份驗證金鑰(位元組字串)。

multiprocessing 初始化時,主程序會被分配一個使用 os.urandom() 生成的隨機字串。

當建立 Process 物件時,它將繼承其父程序的身份驗證金鑰,儘管可以透過將 authkey 設定為另一個位元組字串來更改此金鑰。

請參閱 身份驗證金鑰

sentinel

一個系統物件的數字控制代碼,當程序結束時它將變為“就緒”。

如果您想使用 multiprocessing.connection.wait() 一次等待多個事件,則可以使用此值。否則,呼叫 join() 更簡單。

在 Windows 上,這是一個可與 WaitForSingleObjectWaitForMultipleObjects 系列 API 呼叫一起使用的 OS 控制代碼。在 POSIX 上,這是一個可與 select 模組中的原語一起使用的檔案描述符。

3.3 版本中新增。

terminate()

終止程序。在 POSIX 上,這是使用 SIGTERM 訊號完成的;在 Windows 上,使用 TerminateProcess()。請注意,不會執行退出處理程式和 finally 子句等。

請注意,程序的後代程序將不會被終止 - 它們將簡單地變為孤立程序。

警告

如果在關聯程序使用管道或佇列時使用此方法,則管道或佇列可能會損壞,並且可能無法被其他程序使用。同樣,如果該程序已獲取鎖或訊號量等,則終止它可能會導致其他程序死鎖。

kill()

terminate() 相同,但在 POSIX 上使用 SIGKILL 訊號。

3.7 版本中新增。

close()

關閉 Process 物件,釋放與其關聯的所有資源。如果底層程序仍在執行,則會引發 ValueError。一旦 close() 成功返回,Process 物件的大多數其他方法和屬性將引發 ValueError

3.7 版本中新增。

請注意,start()join()is_alive()terminate()exitcode 方法應僅由建立程序物件的程序呼叫。

一些 Process 方法的示例用法

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

所有 multiprocessing 異常的基類。

exception multiprocessing.BufferTooShort

當提供的緩衝區物件太小,無法讀取訊息時,Connection.recv_bytes_into() 引發的異常。

如果 eBufferTooShort 的例項,則 e.args[0] 將以位元組字串的形式給出訊息。

exception multiprocessing.AuthenticationError

當出現身份驗證錯誤時引發。

exception multiprocessing.TimeoutError

當超時時間到期時,具有超時的方法會引發此異常。

管道和佇列

當使用多個程序時,通常使用訊息傳遞來實現程序之間的通訊,並避免使用任何同步原語(如鎖)。

對於傳遞訊息,可以使用 Pipe()(用於兩個程序之間的連線)或佇列(允許多個生產者和消費者)。

QueueSimpleQueueJoinableQueue 型別是多生產者、多消費者 FIFO 佇列,建模於標準庫中的 queue.Queue 類。它們的區別在於,Queue 缺少 Python 2.5 的 queue.Queue 類中引入的 task_done()join() 方法。

如果使用 JoinableQueue,則**必須**對從佇列中刪除的每個任務呼叫 JoinableQueue.task_done(),否則用於計算未完成任務數量的訊號量可能會最終溢位,從而引發異常。

與其他 Python 佇列實現的一個區別是,multiprocessing 佇列使用 pickle 序列化放入其中的所有物件。get 方法返回的物件是重新建立的物件,它不與原始物件共享記憶體。

請注意,還可以使用管理器物件建立共享佇列 - 請參閱 管理器

注意

multiprocessing 使用通常的 queue.Emptyqueue.Full 異常來指示超時。它們在 multiprocessing 名稱空間中不可用,因此需要從 queue 匯入它們。

注意

當一個物件被放入佇列時,該物件會被序列化(pickled),然後後臺執行緒會將序列化後的資料重新整理到下層的管道中。這會產生一些有點出乎意料的後果,但應該不會造成任何實際困難——如果這些問題真的困擾你,那麼你可以改用使用管理器建立的佇列。

  1. 在將一個物件放入空佇列後,可能會有一個極小的延遲,之後佇列的empty()方法返回False,並且get_nowait()可以返回而不引發queue.Empty異常。

  2. 如果多個程序正在將物件入隊,則物件可能以亂序的方式在另一端被接收。但是,同一個程序入隊的多個物件之間始終會保持預期的順序。

警告

如果一個程序在使用Queue時被使用Process.terminate()os.kill()終止,那麼佇列中的資料很可能會損壞。這可能會導致任何其他程序在稍後嘗試使用該佇列時出現異常。

警告

如上所述,如果一個子程序已將專案放入佇列(並且它沒有使用JoinableQueue.cancel_join_thread),那麼該程序將不會終止,直到所有緩衝的專案都被重新整理到管道中。

這意味著,除非你確定所有放入佇列的專案都已被消耗,否則如果你嘗試加入該程序,可能會出現死鎖。類似地,如果子程序是非守護程序,那麼父程序在退出時嘗試加入其所有非守護子程序時可能會掛起。

請注意,使用管理器建立的佇列沒有這個問題。請參閱程式設計指南

有關使用佇列進行程序間通訊的示例,請參閱示例

multiprocessing.Pipe([duplex])

返回一對 (conn1, conn2) Connection 物件,表示管道的兩端。

如果 duplexTrue (預設值),則管道是雙向的。如果 duplexFalse,則管道是單向的:conn1 只能用於接收訊息,而 conn2 只能用於傳送訊息。

send() 方法使用 pickle 序列化物件,而 recv() 則重新建立物件。

class multiprocessing.Queue([maxsize])

返回一個使用管道和一些鎖/訊號量實現的程序共享佇列。當程序首次將一個專案放入佇列時,會啟動一個饋送執行緒,該執行緒將物件從緩衝區傳輸到管道中。

標準的 queue.Emptyqueue.Full 異常(來自標準庫的 queue 模組)會被引發以指示超時。

Queue 實現了 queue.Queue 的所有方法,除了 task_done()join()

qsize()

返回佇列的近似大小。由於多執行緒/多程序語義,此數字不可靠。

請注意,在像 macOS 這樣未實現 sem_getvalue() 的平臺上,這可能會引發 NotImplementedError 異常。

empty()

如果佇列為空,則返回 True,否則返回 False。由於多執行緒/多程序語義,這是不可靠的。

可能會在關閉的佇列上引發 OSError。(不保證)

full()

如果佇列已滿,則返回 True,否則返回 False。由於多執行緒/多程序語義,這是不可靠的。

put(obj[, block[, timeout]])

將 obj 放入佇列。如果可選引數 blockTrue (預設值)並且 timeoutNone (預設值),則在有可用空閒槽位之前阻塞。如果 timeout 是一個正數,它會最多阻塞 timeout 秒,並且如果在該時間內沒有可用的空閒槽位,則會引發 queue.Full 異常。否則(blockFalse),如果有可用的空閒槽位,則將專案放入佇列,否則引發 queue.Full 異常(在這種情況下,timeout 將被忽略)。

在 3.8 版本中更改: 如果佇列已關閉,則會引發 ValueError 異常,而不是 AssertionError 異常。

put_nowait(obj)

等效於 put(obj, False)

get([block[, timeout]])

從佇列中移除並返回一個專案。如果可選引數 blockTrue (預設值)並且 timeoutNone (預設值),則在有可用專案之前阻塞。如果 timeout 是一個正數,它會最多阻塞 timeout 秒,並且如果在該時間內沒有可用的專案,則會引發 queue.Empty 異常。否則(block 為 False),如果有可用的專案,則返回一個專案,否則引發 queue.Empty 異常(在這種情況下,timeout 將被忽略)。

在 3.8 版本中更改: 如果佇列已關閉,則會引發 ValueError 異常,而不是 OSError 異常。

get_nowait()

等效於 get(False)

multiprocessing.Queue 具有一些 queue.Queue 中沒有的額外方法。這些方法對於大多數程式碼通常是不必要的。

close()

表示當前程序將不再向此佇列放入資料。一旦後臺執行緒將所有緩衝資料重新整理到管道,它就會退出。當佇列被垃圾回收時,會自動呼叫此方法。

join_thread()

加入後臺執行緒。此方法只能在 close() 被呼叫後使用。它會阻塞直到後臺執行緒退出,確保緩衝區中的所有資料都被重新整理到管道。

預設情況下,如果程序不是佇列的建立者,則在退出時,它將嘗試加入佇列的後臺執行緒。程序可以呼叫 cancel_join_thread() 來使 join_thread() 不執行任何操作。

cancel_join_thread()

防止 join_thread() 阻塞。特別是,這會阻止後臺執行緒在程序退出時自動加入 —— 請參閱 join_thread()

這個方法更好的名稱可能是 allow_exit_without_flush()。 它很可能會導致排隊的資料丟失,而且你幾乎肯定不需要使用它。 只有當您需要當前程序立即退出,而不等待將排隊的資料重新整理到底層管道,並且您不在乎丟失資料時,它才真正有用。

注意

此類的功能需要在主機作業系統上具有正常工作的共享訊號量實現。如果沒有,則此類的功能將被停用,並且嘗試例項化 Queue 將導致 ImportError。有關其他資訊,請參閱 bpo-3770。對於下面列出的任何特殊佇列型別,情況也是如此。

class multiprocessing.SimpleQueue

它是一個簡化的 Queue 型別,非常接近鎖定的 Pipe

close()

關閉佇列:釋放內部資源。

佇列關閉後不得再使用。 例如,不得再呼叫 get(), put()empty() 方法。

在 3.9 版本中新增。

empty()

如果佇列為空,則返回 True,否則返回 False

如果 SimpleQueue 已關閉,則始終引發 OSError

get()

從佇列中移除並返回一個專案。

put(item)

item放入佇列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue 的子類,它還具有 task_done()join() 方法。

task_done()

表示先前入隊的任務已完成。由佇列消費者使用。對於每個用於獲取任務的 get(),隨後呼叫 task_done() 會告訴佇列該任務的處理已完成。

如果 join() 當前正在阻塞,則當所有專案都已處理完畢時,它將恢復(這意味著對於放入佇列的每個專案,都收到了 task_done() 呼叫)。

如果呼叫的次數多於放入佇列的專案數,則會引發 ValueError

join()

阻塞直到佇列中的所有專案都已被獲取和處理。

每當向佇列新增一個專案時,未完成任務的計數就會增加。每當消費者呼叫 task_done() 以指示該專案已被檢索並且所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join() 將解除阻塞。

雜項

multiprocessing.active_children()

返回當前程序的所有活動子程序的列表。

呼叫此函式會產生“加入”任何已完成的程序的副作用。

multiprocessing.cpu_count()

返回系統中的 CPU 數量。

此數字不等同於當前程序可以使用的 CPU 數量。 可用 CPU 的數量可以使用 os.process_cpu_count() (或 len(os.sched_getaffinity(0)))獲取。

當無法確定 CPU 數量時,會引發 NotImplementedError

在 3.13 版本中變更: 返回值也可以使用 -X cpu_count 標誌或 PYTHON_CPU_COUNT 覆蓋,因為這僅僅是對 os cpu count API 的封裝。

multiprocessing.current_process()

返回與當前程序對應的 Process 物件。

類似於 threading.current_thread()

multiprocessing.parent_process()

返回與 current_process() 的父程序對應的 Process 物件。對於主程序,parent_process 將為 None

3.8 版本中新增。

multiprocessing.freeze_support()

新增對使用 multiprocessing 的程式被凍結以生成 Windows 可執行檔案時的支援。(已使用 py2exePyInstallercx_Freeze 進行測試。)

需要在主模組的 if __name__ == '__main__' 行之後立即呼叫此函式。例如:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略了 freeze_support() 行,則嘗試執行凍結的可執行檔案將引發 RuntimeError

在 Windows 以外的任何作業系統上呼叫 freeze_support() 均無效。此外,如果該模組在 Windows 上由 Python 直譯器正常執行(程式未被凍結),則 freeze_support() 也無效。

multiprocessing.get_all_start_methods()

返回支援的啟動方法列表,其中第一個是預設方法。可能的啟動方法有 'fork''spawn''forkserver'。並非所有平臺都支援所有方法。請參閱 上下文和啟動方法

3.4 版本中新增。

multiprocessing.get_context(method=None)

返回一個上下文物件,該物件具有與 multiprocessing 模組相同的屬性。

如果 methodNone,則返回預設上下文。否則,method 應為 'fork''spawn''forkserver'。如果指定的啟動方法不可用,則引發 ValueError。請參閱 上下文和啟動方法

3.4 版本中新增。

multiprocessing.get_start_method(allow_none=False)

返回用於啟動程序的啟動方法的名稱。

如果啟動方法尚未固定且 allow_none 為 false,則啟動方法固定為預設值並返回名稱。如果啟動方法尚未固定且 allow_none 為 true,則返回 None

返回值可以是 'fork''spawn''forkserver'None。請參閱 上下文和啟動方法

3.4 版本中新增。

在 3.8 版本中更改: 在 macOS 上,*spawn* 啟動方法現在是預設方法。*fork* 啟動方法應被視為不安全,因為它可能導致子程序崩潰。請參閱 bpo-33725

multiprocessing.set_executable(executable)

設定啟動子程序時要使用的 Python 直譯器的路徑。(預設情況下使用 sys.executable)。嵌入器可能需要執行以下操作

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

然後才能建立子程序。

在 3.4 版本中更改: 當使用 'spawn' 啟動方法時,現在在 POSIX 上受支援。

在 3.11 版本中更改: 接受 path-like object

multiprocessing.set_forkserver_preload(module_names)

設定 forkserver 主程序嘗試匯入的模組名稱列表,以便 forked 程序繼承其已匯入的狀態。執行此操作時,任何 ImportError 都將被靜默忽略。這可以用作效能增強,以避免在每個程序中重複工作。

為了使其正常工作,必須在 forkserver 程序啟動之前(在建立 Pool 或啟動 Process 之前)呼叫它。

僅在使用 'forkserver' 啟動方法時才有意義。請參閱 上下文和啟動方法

3.4 版本中新增。

multiprocessing.set_start_method(method, force=False)

設定應用於啟動子程序的方法。method 引數可以是 'fork''spawn''forkserver'。如果已設定啟動方法且 force 不是 True,則會引發 RuntimeError。如果 methodNoneforceTrue,則啟動方法設定為 None。如果 methodNoneforceFalse,則上下文設定為預設上下文。

請注意,此函式最多應呼叫一次,並且應在主模組的 if __name__ == '__main__' 子句中進行保護。

請參閱 上下文和啟動方法

3.4 版本中新增。

連線物件

連線物件允許傳送和接收可 pickle 的物件或字串。可以將它們視為面向訊息的已連線套接字。

連線物件通常使用 Pipe 建立 – 另請參閱 偵聽器和客戶端

class multiprocessing.connection.Connection
send(obj)

將一個物件傳送到連線的另一端,該物件應使用 recv() 讀取。

該物件必須是可序列化的。非常大的 pickle (大約 32 MiB+,雖然取決於作業系統) 可能會引發 ValueError 異常。

recv()

返回使用 send() 從連線另一端傳送的物件。會阻塞直到有資料接收。如果沒有任何內容可接收並且另一端已關閉,則引發 EOFError

fileno()

返回連線使用的檔案描述符或控制代碼。

close()

關閉連線。

當連線被垃圾回收時,會自動呼叫此方法。

poll([timeout])

返回是否有任何資料可供讀取。

如果未指定 timeout,則會立即返回。如果 timeout 是一個數字,則指定阻塞的最大時間(以秒為單位)。如果 timeoutNone,則使用無限超時。

請注意,可以使用 multiprocessing.connection.wait() 一次輪詢多個連線物件。

send_bytes(buffer[, offset[, size]])

類位元組物件 傳送位元組資料作為完整訊息。

如果給出了 offset,則從 buffer 中的該位置讀取資料。如果給出了 size,則將從 buffer 中讀取該位元組數。非常大的緩衝區(大約 32 MiB+,雖然取決於作業系統)可能會引發 ValueError 異常。

recv_bytes([maxlength])

返回從連線另一端傳送的位元組資料的完整訊息,並以字串形式返回。會阻塞直到有資料接收。如果沒有任何內容可接收並且另一端已關閉,則引發 EOFError

如果指定了 maxlength 並且訊息長度大於 maxlength,則會引發 OSError,並且連線將不再可讀。

在 3.3 版本中更改: 此函式過去引發 IOError,現在是 OSError 的別名。

recv_bytes_into(buffer[, offset])

將從連線另一端傳送的位元組資料的完整訊息讀取到 buffer 中,並返回訊息中的位元組數。會阻塞直到有資料接收。如果沒有任何內容可接收並且另一端已關閉,則引發 EOFError

buffer 必須是可寫的 類位元組物件。如果給出了 offset,則將從該位置開始將訊息寫入緩衝區。Offset 必須是一個非負整數,並且小於 buffer 的長度(以位元組為單位)。

如果緩衝區太短,則會引發 BufferTooShort 異常,並且完整訊息可用作 e.args[0],其中 e 是異常例項。

在 3.3 版本中更改: 現在可以使用 Connection.send()Connection.recv() 在程序之間傳輸連線物件本身。

連線物件現在還支援上下文管理協議 - 請參見 上下文管理器型別__enter__() 返回連線物件,__exit__() 呼叫 close()

例如

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() 方法會自動反序列化它接收的資料,除非您可以信任傳送訊息的程序,否則這可能存在安全風險。

因此,除非使用 Pipe() 生成了連線物件,否則您應僅在執行某種身份驗證後才使用 recv()send() 方法。請參閱 身份驗證金鑰

警告

如果程序在嘗試讀取或寫入管道時被終止,則管道中的資料很可能會損壞,因為可能無法確定訊息邊界在哪裡。

同步原語

通常,在多程序程式中,同步原語不如在多執行緒程式中那麼必要。請參閱 threading 模組的文件。

請注意,還可以透過使用管理器物件建立同步原語 - 請參閱 管理器

class multiprocessing.Barrier(parties[, action[, timeout]])

柵欄物件:threading.Barrier 的克隆。

3.3 版本中新增。

class multiprocessing.BoundedSemaphore([value])

有界訊號量物件:與 threading.BoundedSemaphore 非常相似。

與其密切對應的物件存在一個細微的區別:它的 acquire 方法的第一個引數被命名為 block,這與 Lock.acquire() 一致。

注意

在 macOS 上,它與 Semaphore 沒有區別,因為 sem_getvalue() 未在該平臺上實現。

class multiprocessing.Condition([lock])

條件變數:threading.Condition 的別名。

如果指定了 lock,則它應該是來自 multiprocessingLockRLock 物件。

在 3.3 版本中更改: 添加了 wait_for() 方法。

class multiprocessing.Event

threading.Event 的一個克隆。

class multiprocessing.Lock

一個非遞迴鎖物件:與 threading.Lock 非常相似。一旦程序或執行緒獲取了鎖,後續任何程序或執行緒嘗試獲取該鎖都會被阻塞,直到該鎖被釋放;任何程序或執行緒都可以釋放該鎖。 threading.Lock 的概念和行為(在應用於執行緒時)在 multiprocessing.Lock 中被複制,應用於程序或執行緒,除非另有說明。

請注意,Lock 實際上是一個工廠函式,它返回一個使用預設上下文初始化的 multiprocessing.synchronize.Lock 例項。

Lock 支援 上下文管理器 協議,因此可以在 with 語句中使用。

acquire(block=True, timeout=None)

獲取鎖,阻塞或非阻塞。

如果 block 引數設定為 True(預設值),則方法呼叫將阻塞,直到鎖處於未鎖定狀態,然後將其設定為鎖定狀態並返回 True。請注意,第一個引數的名稱與 threading.Lock.acquire() 中的名稱不同。

如果 block 引數設定為 False,則方法呼叫不會阻塞。如果鎖當前處於鎖定狀態,則返回 False;否則,將鎖設定為鎖定狀態並返回 True

當使用正浮點值 timeout 呼叫時,只要無法獲取鎖,最多會阻塞 timeout 指定的秒數。使用負值 timeout 呼叫等效於 timeout 為零。使用 timeout 值為 None(預設值)的呼叫將超時時間設定為無限。請注意,timeout 的負值或 None 值的處理方式與 threading.Lock.acquire() 中實現的有所不同。如果 block 引數設定為 False,則 timeout 引數沒有實際意義,因此會被忽略。如果已獲取鎖,則返回 True;如果超時時間已過,則返回 False

release()

釋放鎖。這可以從任何程序或執行緒呼叫,而不僅僅是最初獲取鎖的程序或執行緒。

行為與 threading.Lock.release() 中的行為相同,但當在未鎖定的鎖上呼叫時,會引發 ValueError

class multiprocessing.RLock

一個遞迴鎖物件:與 threading.RLock 非常相似。遞迴鎖必須由獲取它的程序或執行緒釋放。一旦程序或執行緒獲取了遞迴鎖,同一程序或執行緒可以再次獲取它而不會阻塞;該程序或執行緒必須在每次獲取後釋放一次。

請注意,RLock 實際上是一個工廠函式,它返回一個使用預設上下文初始化的 multiprocessing.synchronize.RLock 例項。

RLock 支援 上下文管理器 協議,因此可以在 with 語句中使用。

acquire(block=True, timeout=None)

獲取鎖,阻塞或非阻塞。

當使用 block 引數設定為 True 呼叫時,會阻塞,直到鎖處於未鎖定狀態(不歸任何程序或執行緒所有),除非鎖已歸當前程序或執行緒所有。然後,當前程序或執行緒獲取鎖的所有權(如果它尚未擁有所有權),並且鎖內的遞迴級別加 1,導致返回值為 True。請注意,與 threading.RLock.acquire() 的實現相比,此第一個引數的行為存在一些差異,從引數本身的名稱開始。

當使用 block 引數設定為 False 呼叫時,不會阻塞。如果鎖已被另一個程序或執行緒獲取(因此被擁有),則當前程序或執行緒不會獲取所有權,並且鎖內的遞迴級別不會更改,導致返回值為 False。如果鎖處於未鎖定狀態,則當前程序或執行緒獲取所有權,並且遞迴級別會遞增,導致返回值為 True

timeout 引數的使用和行為與 Lock.acquire() 中的相同。請注意,timeout 的某些行為與 threading.RLock.acquire() 中實現的有所不同。

release()

釋放鎖,遞迴級別遞減。如果在遞減後遞迴級別為零,則將鎖重置為未鎖定狀態(不歸任何程序或執行緒所有),並且如果有任何其他程序或執行緒被阻塞等待鎖變為未鎖定狀態,則允許其中一個繼續進行。如果在遞減後遞迴級別仍然不為零,則鎖保持鎖定狀態,並由呼叫程序或執行緒擁有。

僅當呼叫程序或執行緒擁有鎖時才呼叫此方法。如果此方法由所有者以外的程序或執行緒呼叫,或者如果鎖處於未鎖定(未擁有)狀態,則會引發 AssertionError。請注意,在這種情況下引發的異常型別與 threading.RLock.release() 中實現的有所不同。

class multiprocessing.Semaphore([value])

一個訊號量物件:與 threading.Semaphore 非常相似。

與其密切對應的物件存在一個細微的區別:它的 acquire 方法的第一個引數被命名為 block,這與 Lock.acquire() 一致。

注意

在 macOS 上,不支援 sem_timedwait,因此使用超時呼叫 acquire() 將使用睡眠迴圈來模擬該函式的行為。

注意

此軟體包的某些功能需要在主機作業系統上具有正常工作的共享訊號量實現。如果沒有,將停用 multiprocessing.synchronize 模組,並且嘗試匯入它將導致 ImportError。有關更多資訊,請參閱 bpo-3770

共享 ctypes 物件

可以使用共享記憶體建立可由子程序繼承的共享物件。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回一個從共享記憶體分配的 ctypes 物件。預設情況下,返回值實際上是該物件的同步包裝器。可以透過 Valuevalue 屬性訪問物件本身。

typecode_or_type 決定返回物件的型別:它可以是一個 ctypes 型別,也可以是 array 模組使用的那種單字元型別碼。*args 會傳遞給該型別的建構函式。

如果 lockTrue(預設值),則會建立一個新的遞迴鎖物件,用於同步對值的訪問。如果 lock 是一個 LockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問不會自動受到鎖的保護,因此它不一定是“程序安全的”。

諸如 += 這類涉及讀取和寫入的操作不是原子的。因此,例如,如果要原子地遞增一個共享值,僅僅執行以下操作是不夠的:

counter.value += 1

假設關聯的鎖是遞迴的(預設情況下是遞迴的),你可以改為執行以下操作:

with counter.get_lock():
    counter.value += 1

請注意,lock 是一個僅限關鍵字的引數。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回一個從共享記憶體分配的 ctypes 陣列。預設情況下,返回值實際上是該陣列的同步包裝器。

typecode_or_type 決定返回陣列元素的型別:它可以是一個 ctypes 型別,也可以是 array 模組使用的那種單字元型別碼。如果 size_or_initializer 是一個整數,則它決定陣列的長度,並且陣列最初將被置零。否則,size_or_initializer 是一個序列,該序列用於初始化陣列,並且其長度決定陣列的長度。

如果 lockTrue(預設值),則會建立一個新的鎖物件,用於同步對值的訪問。如果 lock 是一個 LockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問不會自動受到鎖的保護,因此它不一定是“程序安全的”。

請注意,lock 是一個僅限關鍵字的引數。

請注意,ctypes.c_char 陣列具有 valueraw 屬性,允許您使用它來儲存和檢索字串。

multiprocessing.sharedctypes 模組

multiprocessing.sharedctypes 模組提供了用於從共享記憶體分配 ctypes 物件的函式,這些物件可以被子程序繼承。

注意

雖然可以在共享記憶體中儲存指標,但請記住,這將引用特定程序的地址空間中的位置。但是,該指標很可能在第二個程序的上下文中無效,並且嘗試從第二個程序取消引用該指標可能會導致崩潰。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回一個從共享記憶體分配的 ctypes 陣列。

typecode_or_type 決定返回陣列元素的型別:它可以是一個 ctypes 型別,也可以是 array 模組使用的那種單字元型別碼。如果 size_or_initializer 是一個整數,則它決定陣列的長度,並且陣列最初將被置零。否則,size_or_initializer 是一個序列,該序列用於初始化陣列,並且其長度決定陣列的長度。

請注意,設定和獲取元素可能不是原子的——請改用 Array() 來確保使用鎖自動同步訪問。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回一個從共享記憶體分配的 ctypes 物件。

typecode_or_type 決定返回物件的型別:它可以是一個 ctypes 型別,也可以是 array 模組使用的那種單字元型別碼。*args 會傳遞給該型別的建構函式。

請注意,設定和獲取值可能不是原子的——請改用 Value() 來確保使用鎖自動同步訪問。

請注意,ctypes.c_char 陣列具有 valueraw 屬性,允許您使用它來儲存和檢索字串——請參閱 ctypes 的文件。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray() 相同,不同之處在於,根據 lock 的值,可能會返回一個程序安全的同步包裝器,而不是原始的 ctypes 陣列。

如果 lockTrue(預設值),則會建立一個新的鎖物件,用於同步對值的訪問。如果 lock 是一個 LockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問不會自動受到鎖的保護,因此它不一定是“程序安全的”。

請注意,lock 是一個僅限關鍵字的引數。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue() 相同,不同之處在於,根據 lock 的值,可能會返回一個程序安全的同步包裝器,而不是原始的 ctypes 物件。

如果 lockTrue(預設值),則會建立一個新的鎖物件,用於同步對值的訪問。如果 lock 是一個 LockRLock 物件,則將使用該物件來同步對值的訪問。如果 lockFalse,則對返回物件的訪問不會自動受到鎖的保護,因此它不一定是“程序安全的”。

請注意,lock 是一個僅限關鍵字的引數。

multiprocessing.sharedctypes.copy(obj)

返回一個從共享記憶體分配的 ctypes 物件,它是 ctypes 物件 obj 的副本。

multiprocessing.sharedctypes.synchronized(obj[, lock])

返回一個 ctypes 物件的程序安全包裝器物件,該物件使用 lock 來同步訪問。如果 lockNone(預設值),則會自動建立一個 multiprocessing.RLock 物件。

同步包裝器除了它包裝的物件的方法外,還將有兩個方法:get_obj() 返回包裝的物件,get_lock() 返回用於同步的鎖物件。

請注意,透過包裝器訪問 ctypes 物件可能比訪問原始 ctypes 物件慢得多。

在 3.5 版本中更改: 同步物件支援 上下文管理器 協議。

下表比較了從共享記憶體建立共享 ctypes 物件的語法與常規 ctypes 語法。(在表中,MyStructctypes.Structure 的某個子類。)

ctypes

使用型別的 sharedctypes

使用型別碼的 sharedctypes

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(‘d’, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(‘h’, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(‘i’, (9, 2, 8))

下面是一個示例,其中一些 ctypes 物件被子程序修改

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

列印的結果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

管理器

管理器提供了一種建立可以在不同程序之間共享的資料的方法,包括在不同機器上執行的程序之間透過網路共享。管理器物件控制一個伺服器程序,該程序管理共享物件。其他程序可以透過使用代理來訪問共享物件。

multiprocessing.Manager()

返回一個已啟動的 SyncManager 物件,該物件可用於在程序之間共享物件。返回的管理器物件對應於一個已衍生的子程序,並具有可建立共享物件並返回相應代理的方法。

管理器程序會在它們被垃圾回收或其父程序退出時立即關閉。管理器類在 multiprocessing.managers 模組中定義。

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

建立一個 BaseManager 物件。

一旦建立,應該呼叫 start()get_server().serve_forever() 以確保管理器物件引用一個已啟動的管理器程序。

address 是管理器程序偵聽新連線的地址。如果 addressNone,則會選擇一個任意地址。

authkey 是用於檢查到伺服器程序的傳入連線有效性的身份驗證金鑰。如果 authkeyNone,則使用 current_process().authkey。否則,使用 authkey,並且它必須是一個位元組字串。

serializer 必須是 'pickle' (使用 pickle 序列化)或 'xmlrpclib' (使用 xmlrpc.client 序列化)。

ctx 是一個上下文物件,或 None (使用當前上下文)。請參閱 get_context() 函式。

shutdown_timeout 是一個以秒為單位的超時時間,用於等待管理器使用的程序在 shutdown() 方法中完成。如果關閉超時,則程序被終止。如果終止程序也超時,則程序被強制結束。

在 3.11 版本中更改: 添加了 shutdown_timeout 引數。

start([initializer[, initargs]])

啟動一個子程序來啟動管理器。如果 initializer 不是 None,則子程序會在啟動時呼叫 initializer(*initargs)

get_server()

返回一個 Server 物件,該物件表示受管理器控制的實際伺服器。Server 物件支援 serve_forever() 方法

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 另外還有一個 address 屬性。

connect()

將本地管理器物件連線到遠端管理器程序

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

停止管理器使用的程序。僅當已使用 start() 啟動伺服器程序時,此方法才可用。

可以多次呼叫此方法。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

一個類方法,可用於向管理器類註冊型別或可呼叫物件。

typeid 是一個“型別識別符號”,用於標識特定型別的共享物件。這必須是一個字串。

callable 是一個可呼叫物件,用於為該型別識別符號建立物件。如果管理器例項將使用 connect() 方法連線到伺服器,或者如果 create_method 引數為 False,則可以將其保留為 None

proxytypeBaseProxy 的子類,用於為具有此 typeid 的共享物件建立代理。如果為 None,則會自動建立一個代理類。

exposed 用於指定一系列方法名稱,應允許此 typeid 的代理使用 BaseProxy._callmethod() 訪問。(如果 exposedNone,則如果存在 proxytype._exposed_,則會改用它。)在未指定公開列表的情況下,共享物件的所有“公共方法”都將可以訪問。(這裡的“公共方法”是指任何具有 __call__() 方法並且其名稱不以 '_' 開頭的屬性。)

method_to_typeid 是一個對映,用於指定應返回代理的那些公開方法的返回型別。它將方法名稱對映到 typeid 字串。(如果 method_to_typeidNone,則如果存在 proxytype._method_to_typeid_,則會改用它。)如果方法的名稱不是此對映的鍵,或者如果對映為 None,則該方法返回的物件將按值複製。

create_method 確定是否應建立名稱為 typeid 的方法,該方法可用於告知伺服器程序建立新的共享物件併為其返回代理。預設情況下為 True

BaseManager 例項還具有一個只讀屬性

address

管理器使用的地址。

在 3.3 版本中更改: 管理器物件支援上下文管理協議 - 請參閱 上下文管理器型別__enter__() 啟動伺服器程序(如果尚未啟動),然後返回管理器物件。__exit__() 呼叫 shutdown()

在以前的版本中,如果管理器的伺服器程序尚未啟動,則 __enter__() 不會啟動該程序。

class multiprocessing.managers.SyncManager

BaseManager 的子類,可用於程序同步。此型別的物件由 multiprocessing.Manager() 返回。

它的方法會建立並返回用於跨程序同步的多種常用資料型別的 代理物件。這主要包括共享列表和字典。

Barrier(parties[, action[, timeout]])

建立一個共享的 threading.Barrier 物件並返回它的代理。

3.3 版本中新增。

BoundedSemaphore([value])

建立一個共享的 threading.BoundedSemaphore 物件並返回它的代理。

Condition([lock])

建立一個共享的 threading.Condition 物件並返回它的代理。

如果提供了 *lock*,則它應該是 threading.Lockthreading.RLock 物件的代理。

在 3.3 版本中更改: 添加了 wait_for() 方法。

Event()

建立一個共享的 threading.Event 物件並返回它的代理。

Lock()

建立一個共享的 threading.Lock 物件並返回它的代理。

Namespace()

建立一個共享的 Namespace 物件並返回它的代理。

Queue([maxsize])

建立一個共享的 queue.Queue 物件並返回它的代理。

RLock()

建立一個共享的 threading.RLock 物件並返回它的代理。

Semaphore([value])

建立一個共享的 threading.Semaphore 物件並返回它的代理。

Array(typecode, sequence)

建立一個數組並返回它的代理。

Value(typecode, value)

建立一個具有可寫 value 屬性的物件,並返回它的代理。

dict()
dict(mapping)
dict(sequence)

建立一個共享的 dict 物件並返回它的代理。

list()
list(sequence)

建立一個共享的 list 物件並返回它的代理。

在 3.6 版本中變更: 共享物件可以巢狀。例如,共享容器物件(如共享列表)可以包含其他共享物件,這些物件都將由 SyncManager 管理和同步。

class multiprocessing.managers.Namespace

是可以向 SyncManager 註冊的型別。

名稱空間物件沒有公共方法,但具有可寫屬性。它的表示形式顯示其屬性的值。

但是,當使用名稱空間物件的代理時,以 '_' 開頭的屬性將是代理的屬性,而不是引用物件的屬性

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

自定義管理器

要建立自己的管理器,可以建立 BaseManager 的子類,並使用 register() 類方法向管理器類註冊新型別或可呼叫物件。例如

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

使用遠端管理器

可以在一臺機器上執行管理器伺服器,並允許客戶端從其他機器上使用它(假設所涉及的防火牆允許這樣做)。

執行以下命令會為單個共享佇列建立一個伺服器,遠端客戶端可以訪問該佇列

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一個客戶端可以按如下方式訪問伺服器

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一個客戶端也可以使用它

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地程序也可以訪問該佇列,使用上面客戶端的程式碼遠端訪問它

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

代理物件

代理是一個 *引用* 共享物件的物件,該共享物件(可能)存在於不同的程序中。共享物件被稱為代理的 *引用物件*。多個代理物件可能具有相同的引用物件。

代理物件具有呼叫其引用物件的相應方法的方法(儘管並非引用物件的每個方法都可以透過代理使用)。透過這種方式,代理的使用方式與引用物件相同

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

請注意,將 str() 應用於代理將返回引用物件的表示形式,而應用 repr() 將返回代理的表示形式。

代理物件的一個重要特性是它們是可序列化的,因此可以在程序之間傳遞。因此,引用物件可以包含 代理物件。這允許對這些託管列表、字典和其他 代理物件 進行巢狀

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

類似地,字典和列表代理可以相互巢狀

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

如果引用物件中包含標準(非代理)listdict 物件,則對這些可變值的修改不會透過管理器傳播,因為代理無法知道何時修改了其中包含的值。但是,在容器代理中儲存值(這會在代理物件上觸發 __setitem__)會透過管理器傳播,因此要有效地修改此類專案,可以將修改後的值重新分配給容器代理

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

這種方法可能不如對大多數用例使用巢狀的 代理物件 方便,但它也展示了對同步的控制程度。

注意

multiprocessing 中的代理型別不支援按值比較。例如,我們有

>>> manager.list([1,2,3]) == [1,2,3]
False

進行比較時,應改為使用引用物件的副本。

class multiprocessing.managers.BaseProxy

代理物件是 BaseProxy 子類的例項。

_callmethod(methodname[, args[, kwds]])

呼叫並返回代理物件的引用物件的方法的結果。

如果 proxy 是一個代理,其引用物件是 obj,那麼表示式

proxy._callmethod(methodname, args, kwds)

將會計算表示式

getattr(obj, methodname)(*args, **kwds)

在管理器程序中。

返回的值將是呼叫結果的副本或新共享物件的代理 - 請參閱 BaseManager.register()method_to_typeid 引數的文件。

如果呼叫引發異常,則 _callmethod() 將重新引發該異常。如果在管理器程序中引發其他異常,則將其轉換為 RemoteError 異常,並由 _callmethod() 引發。

請特別注意,如果 methodname 沒有被 *公開*,則會引發異常。

以下是 _callmethod() 的使用示例

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回引用物件的副本。

如果引用物件不可 pickle,則會引發異常。

__repr__()

返回代理物件的表示。

__str__()

返回引用物件的表示。

清理

代理物件使用弱引用回撥,以便在被垃圾回收時,它會從擁有其引用物件的管理器中登出自身。

當不再有任何代理引用共享物件時,該共享物件將從管理器程序中刪除。

程序池

可以使用 Pool 類建立一個程序池,該程序池將執行提交給它的任務。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一個程序池物件,它控制一個工作程序池,可以向該程序池提交作業。它支援帶有超時和回撥的非同步結果,並具有並行對映實現。

processes 是要使用的工作程序的數量。如果 processesNone,則使用 os.process_cpu_count() 返回的數字。

如果 initializer 不為 None,則每個工作程序在啟動時都會呼叫 initializer(*initargs)

maxtasksperchild 是一個工作程序在退出並被新的工作程序替換之前可以完成的任務數,以便釋放未使用的資源。預設的 maxtasksperchildNone,這意味著工作程序將與池的生命週期一樣長。

context 可用於指定用於啟動工作程序的上下文。通常,使用函式 multiprocessing.Pool() 或上下文物件的 Pool() 方法建立池。在這兩種情況下,都會適當地設定 context

請注意,池物件的方法只能由建立該池的程序呼叫。

警告

multiprocessing.pool 物件具有內部資源,需要透過將池用作上下文管理器或手動呼叫 close()terminate() 來進行適當的管理(像任何其他資源一樣)。否則可能會導致程序在最終化時掛起。

請注意,不正確的是依賴垃圾回收器來銷燬池,因為 CPython 不保證會呼叫池的 finalizer(有關更多資訊,請參見 object.__del__())。

在 3.2 版本中更改: 添加了 maxtasksperchild 引數。

在 3.4 版本中更改: 添加了 context 引數。

在 3.13 版本中更改: processes 預設使用 os.process_cpu_count(),而不是 os.cpu_count()

注意

Pool 中的工作程序通常在其工作佇列的整個持續時間記憶體活。在其他系統中(如 Apache、mod_wsgi 等)中,為了釋放工作程序持有的資源,常見的一種模式是允許池中的工作程序僅完成一定量的工作,然後退出,進行清理,並生成一個新的程序來替換舊的程序。Poolmaxtasksperchild 引數向終端使用者公開了此功能。

apply(func[, args[, kwds]])

使用引數 args 和關鍵字引數 kwds 呼叫 func。它會阻塞,直到結果準備就緒。鑑於此會阻塞,apply_async() 更適合並行執行工作。此外,func 僅在池的一個工作程序中執行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() 方法的變體,返回一個 AsyncResult 物件。

如果指定了 callback,則它應該是一個接受單個引數的可呼叫物件。當結果準備就緒時,callback 將應用於該結果,除非呼叫失敗,在這種情況下,將改為應用 error_callback

如果指定了 error_callback,則它應該是一個接受單個引數的可呼叫物件。如果目標函式失敗,則將使用異常例項呼叫 error_callback

回撥應立即完成,否則處理結果的執行緒將被阻塞。

map(func, iterable[, chunksize])

是內建函式 map() 的並行等效函式(但它只支援一個可迭代引數,對於多個可迭代引數,請參見 starmap())。它會阻塞直到結果準備就緒。

此方法將可迭代物件分割成若干塊,然後將其作為單獨的任務提交給程序池。可以透過將 chunksize 設定為正整數來指定這些塊的(大致)大小。

請注意,對於非常長的可迭代物件,這可能會導致較高的記憶體使用率。為了獲得更好的效率,請考慮使用帶有顯式 chunksize 選項的 imap()imap_unordered()

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

map() 方法的一個變體,它返回一個 AsyncResult 物件。

如果指定了 callback,則它應該是一個接受單個引數的可呼叫物件。當結果準備就緒時,callback 將應用於該結果,除非呼叫失敗,在這種情況下,將改為應用 error_callback

如果指定了 error_callback,則它應該是一個接受單個引數的可呼叫物件。如果目標函式失敗,則將使用異常例項呼叫 error_callback

回撥應立即完成,否則處理結果的執行緒將被阻塞。

imap(func, iterable[, chunksize])

map() 的惰性版本。

chunksize 引數與 map() 方法使用的引數相同。對於非常長的可迭代物件,使用較大的 chunksize 值可以使作業完成速度快得多,而不是使用預設值 1

此外,如果 chunksize1,則 imap() 方法返回的迭代器的 next() 方法有一個可選的 timeout 引數:如果結果無法在 timeout 秒內返回,next(timeout) 將引發 multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap() 相同,但返回的迭代器的結果順序應被認為是任意的。(只有當只有一個工作程序時,才能保證順序是“正確”的。)

starmap(func, iterable[, chunksize])

map() 類似,只是 iterable 的元素預計是可迭代的,並且會被解包為引數。

因此,[(1,2), (3, 4)]iterable 會產生 [func(1,2), func(3,4)]

3.3 版本中新增。

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

starmap()map_async() 的組合,它迭代 iterable 的可迭代物件,並使用解包後的可迭代物件呼叫 func。返回一個結果物件。

3.3 版本中新增。

close()

阻止向池提交任何更多工。一旦所有任務都已完成,工作程序將退出。

terminate()

立即停止工作程序,而不完成未完成的工作。當池物件被垃圾回收時,將立即呼叫 terminate()

join()

等待工作程序退出。必須在使用 join() 之前呼叫 close()terminate()

在 3.3 版本中變更: 池物件現在支援上下文管理協議 – 請參見 上下文管理器型別__enter__() 返回池物件,__exit__() 呼叫 terminate()

class multiprocessing.pool.AsyncResult

是由 Pool.apply_async()Pool.map_async() 返回的結果的類。

get([timeout])

當結果到達時返回結果。如果 timeout 不是 None 並且結果沒有在 timeout 秒內到達,則會引發 multiprocessing.TimeoutError。如果遠端呼叫引發了異常,則該異常將由 get() 重新引發。

wait([timeout])

等待直到結果可用或直到 timeout 秒過去。

ready()

返回呼叫是否已完成。

successful()

返回呼叫是否在未引發異常的情況下完成。如果結果尚未就緒,則會引發 ValueError 異常。

在 3.7 版本中變更: 如果結果尚未就緒,則會引發 ValueError 異常,而不是 AssertionError 異常。

以下示例演示瞭如何使用程序池

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

監聽器和客戶端

通常,程序之間的訊息傳遞是使用佇列或使用由 Pipe() 返回的 Connection 物件完成的。

但是,multiprocessing.connection 模組提供了一些額外的靈活性。它基本上為處理套接字或 Windows 命名管道提供了一個高階的面向訊息的 API。它還支援使用 hmac 模組進行摘要認證,並支援同時輪詢多個連線。

multiprocessing.connection.deliver_challenge(connection, authkey)

向連線的另一端傳送一條隨機生成的訊息,並等待回覆。

如果回覆與使用 authkey 作為金鑰的訊息摘要匹配,則會向連線的另一端傳送歡迎訊息。否則,會引發 AuthenticationError 異常。

multiprocessing.connection.answer_challenge(connection, authkey)

接收訊息,使用 authkey 作為金鑰計算訊息的摘要,然後將摘要傳送回去。

如果沒有收到歡迎訊息,則會引發 AuthenticationError 異常。

multiprocessing.connection.Client(address[, family[, authkey]])

嘗試建立與使用地址 address 的監聽器的連線,返回一個 Connection

連線的型別由 family 引數確定,但通常可以省略此引數,因為它通常可以從 address 的格式推斷出來。(請參閱 地址格式

如果給定了 authkey 且不為 None,則它應該是一個位元組字串,並將用作基於 HMAC 的身份驗證挑戰的金鑰。如果 authkeyNone,則不進行身份驗證。如果身份驗證失敗,則會引發 AuthenticationError 異常。請參閱 身份驗證金鑰

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

繫結套接字或 Windows 命名管道的包裝器,它“監聽”連線。

address 是監聽器物件的繫結套接字或命名管道使用的地址。

注意

如果使用 '0.0.0.0' 的地址,則該地址在 Windows 上將不是可連線的端點。 如果需要可連線的端點,則應使用 '127.0.0.1'。

family 是要使用的套接字(或命名管道)的型別。它可以是字串 'AF_INET' (用於 TCP 套接字)、'AF_UNIX' (用於 Unix 域套接字) 或 'AF_PIPE' (用於 Windows 命名管道) 中的一個。 在這些中,只有第一個保證可用。 如果 familyNone,則會從 address 的格式推斷出 family。 如果 address 也為 None,則會選擇預設值。 此預設值是假定為最快的可用 family。 請參閱 地址格式。請注意,如果 family'AF_UNIX' 且地址為 None,則將在使用 tempfile.mkstemp() 建立的私有臨時目錄中建立套接字。

如果監聽器物件使用套接字,則 backlog (預設為 1) 會在繫結後傳遞給套接字的 listen() 方法。

如果給定了 authkey 且不為 None,則它應該是一個位元組字串,並將用作基於 HMAC 的身份驗證挑戰的金鑰。如果 authkeyNone,則不進行身份驗證。如果身份驗證失敗,則會引發 AuthenticationError 異常。請參閱 身份驗證金鑰

accept()

接受監聽器物件的繫結套接字或命名管道上的連線,並返回一個 Connection 物件。如果嘗試身份驗證失敗,則會引發 AuthenticationError 異常。

close()

關閉監聽器物件的繫結套接字或命名管道。當監聽器被垃圾回收時,會自動呼叫此方法。但是,建議顯式呼叫它。

監聽器物件具有以下只讀屬性

address

監聽器物件正在使用的地址。

last_accepted

上次接受的連線的來源地址。如果此地址不可用,則為 None

在 3.3 版本中變更: 監聽器物件現在支援上下文管理協議 – 請參閱 上下文管理器型別__enter__() 返回監聽器物件,而 __exit__() 呼叫 close()

multiprocessing.connection.wait(object_list, timeout=None)

等待直到 object_list 中的物件準備就緒。返回 object_list 中準備就緒的物件的列表。如果 timeout 為浮點數,則呼叫最多會阻塞那麼多秒。如果 timeoutNone,則將無限期阻塞。負超時等效於零超時。

對於 POSIX 和 Windows,如果物件滿足以下條件,則可以出現在 object_list 中:

當有資料可從連線或套接字物件讀取時,或者另一端已關閉時,該連線或套接字物件就緒。

POSIX: wait(object_list, timeout) 幾乎等同於 select.select(object_list, [], [], timeout)。區別在於,如果 select.select() 被訊號中斷,它可能會引發錯誤號為 EINTROSError,而 wait() 不會。

Windows: object_list 中的項必須是一個可等待的整數控制代碼(根據 Win32 函式 WaitForMultipleObjects() 的文件定義),或者是一個具有 fileno() 方法的物件,該方法返回套接字控制代碼或管道控制代碼。(請注意,管道控制代碼和套接字控制代碼不是可等待的控制代碼。)

3.3 版本中新增。

示例

以下伺服器程式碼建立一個監聽器,它使用 'secret password' 作為身份驗證金鑰。然後它等待連線,並向客戶端傳送一些資料

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下程式碼連線到伺服器,並接收來自伺服器的一些資料

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下程式碼使用 wait() 來等待來自多個程序的訊息

from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

地址格式

  • 'AF_INET' 地址是一個 (hostname, port) 形式的元組,其中 hostname 是一個字串,port 是一個整數。

  • 'AF_UNIX' 地址是一個表示檔案系統中檔名的字串。

  • 'AF_PIPE' 地址是 r'\\.\pipe\PipeName' 形式的字串。要使用 Client() 連線到名為 ServerName 的遠端計算機上的命名管道,應使用 r'\\ServerName\pipe\PipeName' 形式的地址代替。

請注意,預設情況下,任何以兩個反斜槓開頭的字串都被假定為 'AF_PIPE' 地址,而不是 'AF_UNIX' 地址。

身份驗證金鑰

當使用 Connection.recv 時,接收到的資料會自動反序列化。不幸的是,從不受信任的來源反序列化資料存在安全風險。因此,ListenerClient() 使用 hmac 模組提供摘要身份驗證。

身份驗證金鑰是一個位元組字串,可以將其視為密碼:一旦建立連線,兩端都會要求證明另一端知道身份驗證金鑰。(證明兩端都使用相同的金鑰涉及透過連線傳送金鑰。)

如果請求身份驗證但未指定身份驗證金鑰,則使用 current_process().authkey 的返回值(請參閱 Process)。此值將自動被當前程序建立的任何 Process 物件繼承。這意味著(預設情況下)多程序程式的所有程序將共享一個身份驗證金鑰,該金鑰可用於在它們之間建立連線。

也可以使用 os.urandom() 生成合適的身份驗證金鑰。

日誌記錄

提供了一些日誌記錄支援。但是請注意,logging 包不使用程序共享鎖,因此來自不同程序的訊息可能會混淆(取決於處理程式型別)。

multiprocessing.get_logger()

返回 multiprocessing 使用的記錄器。如有必要,將建立一個新的記錄器。

首次建立時,記錄器的級別為 logging.NOTSET,並且沒有預設的處理程式。預設情況下,傳送到此記錄器的訊息不會傳播到根記錄器。

請注意,在 Windows 上,子程序只會繼承父程序記錄器的級別 - 記錄器的任何其他自定義設定都不會被繼承。

multiprocessing.log_to_stderr(level=None)

此函式呼叫 get_logger(),除了返回由 get_logger 建立的記錄器之外,它還新增一個處理程式,該處理程式使用格式 '[%(levelname)s/%(processName)s] %(message)s' 將輸出傳送到 sys.stderr。可以透過傳遞 level 引數來修改記錄器的 levelname

以下是啟用日誌記錄的示例會話

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有關日誌記錄級別的完整表格,請參閱 logging 模組。

multiprocessing.dummy 模組

multiprocessing.dummy 複製了 multiprocessing 的 API,但它只不過是 threading 模組的包裝器。

特別是,multiprocessing.dummy 提供的 Pool 函式返回 ThreadPool 的例項,它是 Pool 的子類,支援所有相同的方法呼叫,但使用工作執行緒池而不是工作程序。

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

執行緒池物件,用於控制可向其提交作業的工作執行緒池。ThreadPool 例項與 Pool 例項完全介面相容,並且它們的資源也必須得到妥善管理,可以透過將池用作上下文管理器,或者手動呼叫 close()terminate() 來實現。

processes 是要使用的工作執行緒數。如果 processesNone,則使用 os.process_cpu_count() 返回的數字。

如果 initializer 不為 None,則每個工作程序在啟動時都會呼叫 initializer(*initargs)

Pool 不同,maxtasksperchildcontext 不能被提供。

注意

ThreadPoolPool 共享相同的介面,後者是圍繞程序池設計的,並且早於 concurrent.futures 模組的引入。因此,它繼承了一些對於由執行緒支援的池沒有意義的操作,並且它擁有自己的型別來表示非同步作業的狀態,AsyncResult,這種型別不被任何其他庫理解。

使用者通常應首選使用 concurrent.futures.ThreadPoolExecutor,它具有從一開始就圍繞執行緒設計的更簡單的介面,並且返回與包括 asyncio 在內的許多其他庫相容的 concurrent.futures.Future 例項。

程式設計指南

使用 multiprocessing 時,應遵守某些指南和習慣用法。

所有啟動方法

以下內容適用於所有啟動方法。

避免共享狀態

應儘可能避免在程序之間移動大量資料。

最好堅持使用佇列或管道進行程序間通訊,而不是使用較低級別的同步原語。

可 Pickling 性

確保代理方法的引數是可 pickling 的。

代理的執行緒安全性

除非使用鎖保護,否則不要從多個執行緒使用代理物件。

(不同程序使用*相同*代理永遠不會有問題。)

加入殭屍程序

在 POSIX 上,當程序完成但未被加入時,它會變成殭屍。不應該有很多,因為每次啟動新程序(或呼叫 active_children() 時),所有尚未加入的已完成程序都將被加入。另外,呼叫已完成程序的 Process.is_alive 將會加入該程序。即使如此,顯式加入你啟動的所有程序可能也是一個好習慣。

最好繼承而不是 pickle/unpickle

當使用 spawnforkserver 啟動方法時,multiprocessing 中的許多型別需要是可 pickling 的,以便子程序可以使用它們。但是,通常應避免使用管道或佇列將共享物件傳送到其他程序。相反,你應該安排程式,以便需要訪問在其他地方建立的共享資源的程序可以從祖先程序繼承它。

避免終止程序

使用 Process.terminate 方法停止程序可能會導致程序當前正在使用的任何共享資源(例如鎖、訊號量、管道和佇列)損壞或對其他程序不可用。

因此,最好只考慮在從不使用任何共享資源的程序上使用 Process.terminate

加入使用佇列的程序

請記住,已將專案放入佇列的程序在終止之前會等待,直到“饋送器”執行緒將所有緩衝的專案饋送到基礎管道。(子程序可以呼叫佇列的 Queue.cancel_join_thread 方法來避免此行為。)

這意味著,無論何時使用佇列,都需要確保在程序加入之前,最終會刪除已放入佇列中的所有專案。否則,你無法確定將專案放入佇列的程序是否會終止。還要記住,非守護程序將自動加入。

一個會發生死鎖的例子如下

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

這裡的解決方法是交換最後兩行(或直接刪除 p.join() 行)。

顯式將資源傳遞給子程序

在 POSIX 上,使用 fork 啟動方法,子程序可以使用全域性資源利用在父程序中建立的共享資源。但是,最好將該物件作為引數傳遞給子程序的建構函式。

除了使程式碼(可能)與 Windows 和其他啟動方法相容之外,這還可以確保只要子程序仍然處於活動狀態,該物件就不會在父程序中被垃圾回收。如果某些資源在父程序中被垃圾回收時被釋放,這可能很重要。

因此例如

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

應該重寫為

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

注意將 sys.stdin 替換為“類似檔案的物件”

multiprocessing 最初無條件呼叫

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中——這導致了程序內程序的問題。這已更改為

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

這解決了程序相互衝突導致檔案描述符錯誤的根本問題,但是給將 sys.stdin() 替換為帶有輸出緩衝的“類檔案物件”的應用程式帶來了潛在的危險。這種危險是,如果多個程序在此類檔案物件上呼叫 close(),則可能導致相同的資料多次重新整理到該物件,從而導致損壞。

如果你編寫了一個類檔案物件並實現了自己的快取,則可以透過在每次新增到快取時儲存 pid,並在 pid 更改時丟棄快取來使其對 fork 安全。例如

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有關更多資訊,請參見 bpo-5155bpo-5313bpo-5331

spawnforkserver 啟動方法

有一些額外的限制不適用於 fork 啟動方法。

更多的可 Pickling 性

確保 Process.__init__() 的所有引數都是可 pickling 的。另外,如果你子類化 Process,請確保在呼叫 Process.start 方法時,例項將是可 pickling 的。

全域性變數

請記住,如果在子程序中執行的程式碼嘗試訪問全域性變數,則它看到的值(如果有)可能與呼叫 Process.start 時父程序中的值不同。

但是,只是模組級常量的全域性變數不會引起問題。

安全匯入主模組

確保主模組可以被新的 Python 直譯器安全地匯入,而不會引起意外的副作用(例如啟動新程序)。

例如,使用 spawnforkserver 啟動方法執行以下模組將失敗,並出現 RuntimeError

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

而是應該使用 if __name__ == '__main__': 來保護程式的“入口點”,如下所示:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(如果程式是正常執行而不是被凍結,則可以省略 freeze_support() 行。)

這允許新生成的 Python 直譯器安全地匯入模組,然後執行模組的 foo() 函式。

如果在主模組中建立了池(pool)或管理器(manager),則適用類似的限制。

示例

演示如何建立和使用自定義的管理器和代理

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

一個示例,展示如何使用佇列來向一組工作程序提供任務並收集結果

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()