multiprocessing
— 基於程序的並行¶
原始碼: Lib/multiprocessing/
可用性: 不支援 Android, 不支援 iOS, 不支援 WASI。
此模組不支援 移動平臺 或 WebAssembly 平臺。
簡介¶
multiprocessing
是一個支援使用類似於 threading
模組的 API 來生成程序的包。multiprocessing
包提供本地和遠端併發,透過使用子程序而不是執行緒來有效地繞過 全域性直譯器鎖。因此,multiprocessing
模組允許程式設計師充分利用給定機器上的多個處理器。它可以在 POSIX 和 Windows 上執行。
multiprocessing
模組還引入了在 threading
模組中沒有對應 API 的介面。一個典型的例子是 Pool
物件,它提供了一種方便的方式來並行化一個函式在多個輸入值上的執行,並將輸入資料分發到各個程序(資料並行)。下面的示例演示了在模組中定義此類函式的常見做法,以便子程序可以成功匯入該模組。這個使用 Pool
的資料並行基本示例,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
將會列印到標準輸出
[1, 4, 9]
另請參閱
concurrent.futures.ProcessPoolExecutor
提供了一個更高階的介面,可以將任務推送到後臺程序,而不會阻塞呼叫程序的執行。與直接使用 Pool
介面相比,concurrent.futures
API 更容易將工作提交到底層程序池與等待結果的操作分離。
Process
類¶
在 multiprocessing
中,程序是透過建立 Process
物件,然後呼叫其 start()
方法來生成的。Process
遵循 threading.Thread
的 API。一個簡單的多程序程式示例是
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
為了展示所涉及的各個程序 ID,這裡有一個擴充套件的示例
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
關於為什麼需要 if __name__ == '__main__'
部分的解釋,請參閱 程式設計指南。
上下文和啟動方法¶
根據平臺的不同,multiprocessing
支援三種啟動程序的方法。這些啟動方法是
- spawn
父程序啟動一個新的 Python 直譯器程序。子程序只會繼承執行程序物件的
run()
方法所必需的資源。特別是,父程序中不必要的檔案描述符和控制代碼不會被繼承。與使用 fork 或 forkserver 相比,使用此方法啟動程序速度較慢。在 POSIX 和 Windows 平臺上可用。在 Windows 和 macOS 上預設為此方法。
- fork
父程序使用
os.fork()
來 fork Python 直譯器。當子程序開始時,它實際上與父程序相同。父程序的所有資源都由子程序繼承。請注意,安全地 fork 多執行緒程序是有問題的。在 POSIX 系統上可用。目前在 POSIX 上(macOS 除外)預設為此方法。
注意
在 Python 3.14 中,預設啟動方法將不再是 fork。需要 fork 的程式碼應透過
get_context()
或set_start_method()
顯式指定。在 3.12 版本中變更: 如果 Python 檢測到你的程序有多個執行緒,此啟動方法內部呼叫的
os.fork()
函式將引發一個DeprecationWarning
。請使用不同的啟動方法。有關詳細解釋,請參閱os.fork()
文件。- forkserver
當程式啟動並選擇 *forkserver* 啟動方法時,會生成一個伺服器程序。從那時起,每當需要一個新程序時,父程序都會連線到伺服器並請求它 fork 一個新程序。除非系統庫或預載入的匯入以副作用的方式生成執行緒,否則 fork 伺服器程序是單執行緒的,因此通常可以安全地使用
os.fork()
。不會繼承不必要的資源。在支援透過 Unix 管道傳遞檔案描述符的 POSIX 平臺上可用,例如 Linux。
在 3.4 版本中變更: 在所有 POSIX 平臺上添加了 *spawn*,在某些 POSIX 平臺上添加了 *forkserver*。子程序不再繼承 Windows 上父程序的所有可繼承控制代碼。
在 3.8 版本中變更: 在 macOS 上,*spawn* 啟動方法現在是預設方法。*fork* 啟動方法應被認為是不安全的,因為它可能導致子程序崩潰,因為 macOS 系統庫可能會啟動執行緒。請參閱 bpo-33725。
在 POSIX 上使用 *spawn* 或 *forkserver* 啟動方法還會啟動一個*資源跟蹤器*程序,該程序跟蹤程式程序建立的未連結的命名系統資源(例如命名訊號量或 SharedMemory
物件)。當所有程序都退出後,資源跟蹤器會取消連結任何剩餘的跟蹤物件。通常應該沒有剩餘物件,但如果程序被訊號殺死,可能會有一些“洩漏”的資源。(洩漏的訊號量和共享記憶體段都不會自動取消連結,直到下次重啟。這對於這兩個物件都是有問題的,因為系統只允許有限數量的命名訊號量,並且共享記憶體段佔用主記憶體中的一些空間。)
要選擇啟動方法,請在主模組的 if __name__ == '__main__'
子句中使用 set_start_method()
。例如
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
在程式中不應使用多次。
或者,你可以使用 get_context()
來獲取上下文物件。上下文物件與 multiprocessing 模組具有相同的 API,並允許在同一程式中使用多個啟動方法。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
請注意,與一個上下文相關的物件可能與不同上下文的程序不相容。特別是,使用 *fork* 上下文建立的鎖不能傳遞給使用 *spawn* 或 *forkserver* 啟動方法啟動的程序。
想要使用特定啟動方法的庫可能應該使用 get_context()
,以避免干擾庫使用者的選擇。
警告
在 POSIX 系統上,'spawn'
和 'forkserver'
啟動方法通常不能與“凍結”的可執行檔案(即,由 PyInstaller 和 cx_Freeze 等包生成的二進位制檔案)一起使用。如果程式碼不使用執行緒,'fork'
啟動方法可能會起作用。
程序間交換物件¶
multiprocessing
支援程序之間的兩種型別的通訊通道
佇列
Queue
類幾乎是queue.Queue
的克隆。例如from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()佇列是執行緒和程序安全的。放入
multiprocessing
佇列中的任何物件都將被序列化。
管道
Pipe()
函式返回一對透過管道連線的連線物件,預設情況下是雙向的(雙向)。例如from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()由
Pipe()
返回的兩個連線物件代表管道的兩端。每個連線物件都有send()
和recv()
方法(以及其他方法)。請注意,如果兩個程序(或執行緒)嘗試同時從管道的 *同一* 端讀取或寫入資料,則管道中的資料可能會損壞。當然,如果程序同時使用管道的不同端,則不存在損壞的風險。
send()
方法序列化物件,recv()
方法重新建立物件。
程序之間的同步¶
multiprocessing
包含來自 threading
的所有同步原語的等價物。例如,可以使用鎖來確保一次只有一個程序列印到標準輸出
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用鎖,來自不同程序的輸出很可能會混在一起。
使用工作程序池¶
Pool
類表示一個工作程序池。它具有一些方法,允許以幾種不同的方式將任務解除安裝到工作程序。
例如
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 seconds
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
請注意,池的方法應僅由建立它的程序使用。
注意
此包中的功能要求子程序可以匯入 __main__
模組。這在 程式設計指南 中有介紹,但在此處指出也很有必要。這意味著某些示例(例如 multiprocessing.pool.Pool
示例)在互動式直譯器中將無法正常工作。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
(如果您嘗試這樣做,實際上會以半隨機的方式輸出三個完整的追溯資訊,然後您可能需要以某種方式停止父程序。)
參考¶
multiprocessing
包主要複製了 threading
模組的 API。
Process
和異常¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Process 物件表示在單獨程序中執行的活動。
Process
類具有threading.Thread
的所有方法的等價物。建構函式應始終使用關鍵字引數呼叫。group 應該始終為
None
;它存在僅僅是為了與threading.Thread
相容。target 是要由run()
方法呼叫的可呼叫物件。它預設為None
,這意味著不呼叫任何內容。name 是程序名稱(有關更多詳細資訊,請參閱name
)。args 是目標呼叫的引數元組。kwargs 是目標呼叫的關鍵字引數字典。如果提供,則僅關鍵字 daemon 引數會將程序daemon
標誌設定為True
或False
。如果為None
(預設值),則此標誌將從建立程序繼承。預設情況下,不向 target 傳遞任何引數。args 引數預設為
()
,可用於指定要傳遞給 target 的引數列表或元組。如果子類覆蓋了建構函式,則必須確保在對程序執行任何其他操作之前呼叫基類建構函式 (
Process.__init__()
)。在 3.3 版本中更改: 添加了 daemon 引數。
- run()¶
表示程序活動的方法。
您可以在子類中重寫此方法。標準的
run()
方法使用從 args 和 kwargs 引數中獲取的順序引數和關鍵字引數,呼叫傳遞給物件建構函式作為目標引數的可呼叫物件(如果有)。使用列表或元組作為傳遞給
Process
的 args 引數可以達到相同的效果。示例
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- join([timeout])¶
如果可選引數 timeout 為
None
(預設值),則該方法會阻塞,直到呼叫其join()
方法的程序終止。如果 timeout 是正數,則最多阻塞 timeout 秒。請注意,如果程序終止或方法超時,則該方法返回None
。檢查程序的exitcode
以確定它是否已終止。一個程序可以被多次連線。
程序不能加入自身,因為這會導致死鎖。嘗試在程序啟動之前加入程序是錯誤的。
- name¶
程序的名稱。該名稱是一個字串,僅用於標識目的。它沒有語義。可以為多個程序指定相同的名稱。
初始名稱由建構函式設定。如果未向建構函式提供顯式名稱,則會構造一個形如“Process-N1:N2:…:Nk”的名稱,其中每個 Nk 是其父級的第 N 個子級。
- daemon¶
程序的守護程式標誌,一個布林值。必須在呼叫
start()
之前設定此值。初始值從建立程序繼承。
當程序退出時,它會嘗試終止其所有守護程序子程序。
請注意,不允許守護程序建立子程序。否則,如果守護程序在其父程序退出時被終止,則會留下其子程序孤立。此外,這些不是 Unix 守護程式或服務,它們是普通的程序,如果非守護程序已退出,則會被終止(而不是被連線)。
除了
threading.Thread
API 之外,Process
物件還支援以下屬性和方法:- pid¶
返回程序 ID。在程序生成之前,此值將為
None
。
- exitcode¶
子程序的退出程式碼。如果程序尚未終止,則此值為
None
。如果子程序的
run()
方法正常返回,則退出程式碼將為 0。如果它透過帶有整數引數 N 的sys.exit()
終止,則退出程式碼將為 N。如果子程序由於未在
run()
中捕獲的異常而終止,則退出程式碼將為 1。如果它被訊號 N 終止,則退出程式碼將為負值 -N。
- authkey¶
程序的身份驗證金鑰(位元組字串)。
當
multiprocessing
初始化時,主程序會被分配一個使用os.urandom()
生成的隨機字串。當建立
Process
物件時,它將繼承其父程序的身份驗證金鑰,儘管可以透過將authkey
設定為另一個位元組字串來更改此金鑰。請參閱 身份驗證金鑰。
- sentinel¶
一個系統物件的數字控制代碼,當程序結束時它將變為“就緒”。
如果您想使用
multiprocessing.connection.wait()
一次等待多個事件,則可以使用此值。否則,呼叫join()
更簡單。在 Windows 上,這是一個可與
WaitForSingleObject
和WaitForMultipleObjects
系列 API 呼叫一起使用的 OS 控制代碼。在 POSIX 上,這是一個可與select
模組中的原語一起使用的檔案描述符。3.3 版本中新增。
- terminate()¶
終止程序。在 POSIX 上,這是使用
SIGTERM
訊號完成的;在 Windows 上,使用TerminateProcess()
。請注意,不會執行退出處理程式和 finally 子句等。請注意,程序的後代程序將不會被終止 - 它們將簡單地變為孤立程序。
警告
如果在關聯程序使用管道或佇列時使用此方法,則管道或佇列可能會損壞,並且可能無法被其他程序使用。同樣,如果該程序已獲取鎖或訊號量等,則終止它可能會導致其他程序死鎖。
- kill()¶
與
terminate()
相同,但在 POSIX 上使用SIGKILL
訊號。3.7 版本中新增。
- close()¶
關閉
Process
物件,釋放與其關聯的所有資源。如果底層程序仍在執行,則會引發ValueError
。一旦close()
成功返回,Process
物件的大多數其他方法和屬性將引發ValueError
。3.7 版本中新增。
請注意,
start()
、join()
、is_alive()
、terminate()
和exitcode
方法應僅由建立程序物件的程序呼叫。一些
Process
方法的示例用法>>> import multiprocessing, time, signal >>> mp_context = multiprocessing.get_context('spawn') >>> p = mp_context.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <...Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <...Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <...Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
所有
multiprocessing
異常的基類。
- exception multiprocessing.BufferTooShort¶
當提供的緩衝區物件太小,無法讀取訊息時,
Connection.recv_bytes_into()
引發的異常。如果
e
是BufferTooShort
的例項,則e.args[0]
將以位元組字串的形式給出訊息。
- exception multiprocessing.AuthenticationError¶
當出現身份驗證錯誤時引發。
- exception multiprocessing.TimeoutError¶
當超時時間到期時,具有超時的方法會引發此異常。
管道和佇列¶
當使用多個程序時,通常使用訊息傳遞來實現程序之間的通訊,並避免使用任何同步原語(如鎖)。
對於傳遞訊息,可以使用 Pipe()
(用於兩個程序之間的連線)或佇列(允許多個生產者和消費者)。
Queue
、SimpleQueue
和 JoinableQueue
型別是多生產者、多消費者 FIFO 佇列,建模於標準庫中的 queue.Queue
類。它們的區別在於,Queue
缺少 Python 2.5 的 queue.Queue
類中引入的 task_done()
和 join()
方法。
如果使用 JoinableQueue
,則**必須**對從佇列中刪除的每個任務呼叫 JoinableQueue.task_done()
,否則用於計算未完成任務數量的訊號量可能會最終溢位,從而引發異常。
與其他 Python 佇列實現的一個區別是,multiprocessing
佇列使用 pickle
序列化放入其中的所有物件。get 方法返回的物件是重新建立的物件,它不與原始物件共享記憶體。
請注意,還可以使用管理器物件建立共享佇列 - 請參閱 管理器。
注意
multiprocessing
使用通常的 queue.Empty
和 queue.Full
異常來指示超時。它們在 multiprocessing
名稱空間中不可用,因此需要從 queue
匯入它們。
注意
當一個物件被放入佇列時,該物件會被序列化(pickled),然後後臺執行緒會將序列化後的資料重新整理到下層的管道中。這會產生一些有點出乎意料的後果,但應該不會造成任何實際困難——如果這些問題真的困擾你,那麼你可以改用使用管理器建立的佇列。
在將一個物件放入空佇列後,可能會有一個極小的延遲,之後佇列的
empty()
方法返回False
,並且get_nowait()
可以返回而不引發queue.Empty
異常。如果多個程序正在將物件入隊,則物件可能以亂序的方式在另一端被接收。但是,同一個程序入隊的多個物件之間始終會保持預期的順序。
警告
如果一個程序在使用Queue
時被使用Process.terminate()
或os.kill()
終止,那麼佇列中的資料很可能會損壞。這可能會導致任何其他程序在稍後嘗試使用該佇列時出現異常。
警告
如上所述,如果一個子程序已將專案放入佇列(並且它沒有使用JoinableQueue.cancel_join_thread
),那麼該程序將不會終止,直到所有緩衝的專案都被重新整理到管道中。
這意味著,除非你確定所有放入佇列的專案都已被消耗,否則如果你嘗試加入該程序,可能會出現死鎖。類似地,如果子程序是非守護程序,那麼父程序在退出時嘗試加入其所有非守護子程序時可能會掛起。
請注意,使用管理器建立的佇列沒有這個問題。請參閱程式設計指南。
有關使用佇列進行程序間通訊的示例,請參閱示例。
- multiprocessing.Pipe([duplex])¶
返回一對
(conn1, conn2)
Connection
物件,表示管道的兩端。如果 duplex 為
True
(預設值),則管道是雙向的。如果 duplex 為False
,則管道是單向的:conn1
只能用於接收訊息,而conn2
只能用於傳送訊息。send()
方法使用pickle
序列化物件,而recv()
則重新建立物件。
- class multiprocessing.Queue([maxsize])¶
返回一個使用管道和一些鎖/訊號量實現的程序共享佇列。當程序首次將一個專案放入佇列時,會啟動一個饋送執行緒,該執行緒將物件從緩衝區傳輸到管道中。
標準的
queue.Empty
和queue.Full
異常(來自標準庫的queue
模組)會被引發以指示超時。Queue
實現了queue.Queue
的所有方法,除了task_done()
和join()
。- qsize()¶
返回佇列的近似大小。由於多執行緒/多程序語義,此數字不可靠。
請注意,在像 macOS 這樣未實現
sem_getvalue()
的平臺上,這可能會引發NotImplementedError
異常。
- full()¶
如果佇列已滿,則返回
True
,否則返回False
。由於多執行緒/多程序語義,這是不可靠的。
- put(obj[, block[, timeout]])¶
將 obj 放入佇列。如果可選引數 block 為
True
(預設值)並且 timeout 為None
(預設值),則在有可用空閒槽位之前阻塞。如果 timeout 是一個正數,它會最多阻塞 timeout 秒,並且如果在該時間內沒有可用的空閒槽位,則會引發queue.Full
異常。否則(block 為False
),如果有可用的空閒槽位,則將專案放入佇列,否則引發queue.Full
異常(在這種情況下,timeout 將被忽略)。在 3.8 版本中更改: 如果佇列已關閉,則會引發
ValueError
異常,而不是AssertionError
異常。
- put_nowait(obj)¶
等效於
put(obj, False)
。
- get([block[, timeout]])¶
從佇列中移除並返回一個專案。如果可選引數 block 為
True
(預設值)並且 timeout 為None
(預設值),則在有可用專案之前阻塞。如果 timeout 是一個正數,它會最多阻塞 timeout 秒,並且如果在該時間內沒有可用的專案,則會引發queue.Empty
異常。否則(block 為False
),如果有可用的專案,則返回一個專案,否則引發queue.Empty
異常(在這種情況下,timeout 將被忽略)。在 3.8 版本中更改: 如果佇列已關閉,則會引發
ValueError
異常,而不是OSError
異常。
- get_nowait()¶
等效於
get(False)
。
multiprocessing.Queue
具有一些queue.Queue
中沒有的額外方法。這些方法對於大多數程式碼通常是不必要的。- close()¶
表示當前程序將不再向此佇列放入資料。一旦後臺執行緒將所有緩衝資料重新整理到管道,它就會退出。當佇列被垃圾回收時,會自動呼叫此方法。
- join_thread()¶
加入後臺執行緒。此方法只能在
close()
被呼叫後使用。它會阻塞直到後臺執行緒退出,確保緩衝區中的所有資料都被重新整理到管道。預設情況下,如果程序不是佇列的建立者,則在退出時,它將嘗試加入佇列的後臺執行緒。程序可以呼叫
cancel_join_thread()
來使join_thread()
不執行任何操作。
- cancel_join_thread()¶
防止
join_thread()
阻塞。特別是,這會阻止後臺執行緒在程序退出時自動加入 —— 請參閱join_thread()
。這個方法更好的名稱可能是
allow_exit_without_flush()
。 它很可能會導致排隊的資料丟失,而且你幾乎肯定不需要使用它。 只有當您需要當前程序立即退出,而不等待將排隊的資料重新整理到底層管道,並且您不在乎丟失資料時,它才真正有用。
注意
此類的功能需要在主機作業系統上具有正常工作的共享訊號量實現。如果沒有,則此類的功能將被停用,並且嘗試例項化
Queue
將導致ImportError
。有關其他資訊,請參閱 bpo-3770。對於下面列出的任何特殊佇列型別,情況也是如此。
- class multiprocessing.SimpleQueue¶
它是一個簡化的
Queue
型別,非常接近鎖定的Pipe
。- get()¶
從佇列中移除並返回一個專案。
- put(item)¶
將item放入佇列。
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue
是Queue
的子類,它還具有task_done()
和join()
方法。- task_done()¶
表示先前入隊的任務已完成。由佇列消費者使用。對於每個用於獲取任務的
get()
,隨後呼叫task_done()
會告訴佇列該任務的處理已完成。如果
join()
當前正在阻塞,則當所有專案都已處理完畢時,它將恢復(這意味著對於放入佇列的每個專案,都收到了task_done()
呼叫)。如果呼叫的次數多於放入佇列的專案數,則會引發
ValueError
。
- join()¶
阻塞直到佇列中的所有專案都已被獲取和處理。
每當向佇列新增一個專案時,未完成任務的計數就會增加。每當消費者呼叫
task_done()
以指示該專案已被檢索並且所有工作都已完成時,計數就會減少。當未完成任務的計數降至零時,join()
將解除阻塞。
雜項¶
- multiprocessing.active_children()¶
返回當前程序的所有活動子程序的列表。
呼叫此函式會產生“加入”任何已完成的程序的副作用。
- multiprocessing.cpu_count()¶
返回系統中的 CPU 數量。
此數字不等同於當前程序可以使用的 CPU 數量。 可用 CPU 的數量可以使用
os.process_cpu_count()
(或len(os.sched_getaffinity(0))
)獲取。當無法確定 CPU 數量時,會引發
NotImplementedError
。在 3.13 版本中變更: 返回值也可以使用
-X cpu_count
標誌或PYTHON_CPU_COUNT
覆蓋,因為這僅僅是對os
cpu count API 的封裝。
- multiprocessing.parent_process()¶
返回與
current_process()
的父程序對應的Process
物件。對於主程序,parent_process
將為None
。3.8 版本中新增。
- multiprocessing.freeze_support()¶
新增對使用
multiprocessing
的程式被凍結以生成 Windows 可執行檔案時的支援。(已使用 py2exe、PyInstaller 和 cx_Freeze 進行測試。)需要在主模組的
if __name__ == '__main__'
行之後立即呼叫此函式。例如:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
如果省略了
freeze_support()
行,則嘗試執行凍結的可執行檔案將引發RuntimeError
。在 Windows 以外的任何作業系統上呼叫
freeze_support()
均無效。此外,如果該模組在 Windows 上由 Python 直譯器正常執行(程式未被凍結),則freeze_support()
也無效。
- multiprocessing.get_all_start_methods()¶
返回支援的啟動方法列表,其中第一個是預設方法。可能的啟動方法有
'fork'
,'spawn'
和'forkserver'
。並非所有平臺都支援所有方法。請參閱 上下文和啟動方法。3.4 版本中新增。
- multiprocessing.get_context(method=None)¶
返回一個上下文物件,該物件具有與
multiprocessing
模組相同的屬性。如果 method 為
None
,則返回預設上下文。否則,method 應為'fork'
、'spawn'
或'forkserver'
。如果指定的啟動方法不可用,則引發ValueError
。請參閱 上下文和啟動方法。3.4 版本中新增。
- multiprocessing.get_start_method(allow_none=False)¶
返回用於啟動程序的啟動方法的名稱。
如果啟動方法尚未固定且 allow_none 為 false,則啟動方法固定為預設值並返回名稱。如果啟動方法尚未固定且 allow_none 為 true,則返回
None
。返回值可以是
'fork'
、'spawn'
、'forkserver'
或None
。請參閱 上下文和啟動方法。3.4 版本中新增。
在 3.8 版本中更改: 在 macOS 上,*spawn* 啟動方法現在是預設方法。*fork* 啟動方法應被視為不安全,因為它可能導致子程序崩潰。請參閱 bpo-33725。
- multiprocessing.set_executable(executable)¶
設定啟動子程序時要使用的 Python 直譯器的路徑。(預設情況下使用
sys.executable
)。嵌入器可能需要執行以下操作set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
然後才能建立子程序。
在 3.4 版本中更改: 當使用
'spawn'
啟動方法時,現在在 POSIX 上受支援。在 3.11 版本中更改: 接受 path-like object。
- multiprocessing.set_forkserver_preload(module_names)¶
設定 forkserver 主程序嘗試匯入的模組名稱列表,以便 forked 程序繼承其已匯入的狀態。執行此操作時,任何
ImportError
都將被靜默忽略。這可以用作效能增強,以避免在每個程序中重複工作。為了使其正常工作,必須在 forkserver 程序啟動之前(在建立
Pool
或啟動Process
之前)呼叫它。僅在使用
'forkserver'
啟動方法時才有意義。請參閱 上下文和啟動方法。3.4 版本中新增。
- multiprocessing.set_start_method(method, force=False)¶
設定應用於啟動子程序的方法。method 引數可以是
'fork'
、'spawn'
或'forkserver'
。如果已設定啟動方法且 force 不是True
,則會引發RuntimeError
。如果 method 為None
且 force 為True
,則啟動方法設定為None
。如果 method 為None
且 force 為False
,則上下文設定為預設上下文。請注意,此函式最多應呼叫一次,並且應在主模組的
if __name__ == '__main__'
子句中進行保護。請參閱 上下文和啟動方法。
3.4 版本中新增。
連線物件¶
連線物件允許傳送和接收可 pickle 的物件或字串。可以將它們視為面向訊息的已連線套接字。
連線物件通常使用 Pipe
建立 – 另請參閱 偵聽器和客戶端。
- class multiprocessing.connection.Connection¶
- send(obj)¶
將一個物件傳送到連線的另一端,該物件應使用
recv()
讀取。該物件必須是可序列化的。非常大的 pickle (大約 32 MiB+,雖然取決於作業系統) 可能會引發
ValueError
異常。
- fileno()¶
返回連線使用的檔案描述符或控制代碼。
- close()¶
關閉連線。
當連線被垃圾回收時,會自動呼叫此方法。
- poll([timeout])¶
返回是否有任何資料可供讀取。
如果未指定 timeout,則會立即返回。如果 timeout 是一個數字,則指定阻塞的最大時間(以秒為單位)。如果 timeout 為
None
,則使用無限超時。請注意,可以使用
multiprocessing.connection.wait()
一次輪詢多個連線物件。
- send_bytes(buffer[, offset[, size]])¶
從 類位元組物件 傳送位元組資料作為完整訊息。
如果給出了 offset,則從 buffer 中的該位置讀取資料。如果給出了 size,則將從 buffer 中讀取該位元組數。非常大的緩衝區(大約 32 MiB+,雖然取決於作業系統)可能會引發
ValueError
異常。
- recv_bytes([maxlength])¶
返回從連線另一端傳送的位元組資料的完整訊息,並以字串形式返回。會阻塞直到有資料接收。如果沒有任何內容可接收並且另一端已關閉,則引發
EOFError
。如果指定了 maxlength 並且訊息長度大於 maxlength,則會引發
OSError
,並且連線將不再可讀。
- recv_bytes_into(buffer[, offset])¶
將從連線另一端傳送的位元組資料的完整訊息讀取到 buffer 中,並返回訊息中的位元組數。會阻塞直到有資料接收。如果沒有任何內容可接收並且另一端已關閉,則引發
EOFError
。buffer 必須是可寫的 類位元組物件。如果給出了 offset,則將從該位置開始將訊息寫入緩衝區。Offset 必須是一個非負整數,並且小於 buffer 的長度(以位元組為單位)。
如果緩衝區太短,則會引發
BufferTooShort
異常,並且完整訊息可用作e.args[0]
,其中e
是異常例項。
在 3.3 版本中更改: 現在可以使用
Connection.send()
和Connection.recv()
在程序之間傳輸連線物件本身。連線物件現在還支援上下文管理協議 - 請參見 上下文管理器型別。
__enter__()
返回連線物件,__exit__()
呼叫close()
。
例如
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法會自動反序列化它接收的資料,除非您可以信任傳送訊息的程序,否則這可能存在安全風險。
因此,除非使用 Pipe()
生成了連線物件,否則您應僅在執行某種身份驗證後才使用 recv()
和 send()
方法。請參閱 身份驗證金鑰。
警告
如果程序在嘗試讀取或寫入管道時被終止,則管道中的資料很可能會損壞,因為可能無法確定訊息邊界在哪裡。
同步原語¶
通常,在多程序程式中,同步原語不如在多執行緒程式中那麼必要。請參閱 threading
模組的文件。
請注意,還可以透過使用管理器物件建立同步原語 - 請參閱 管理器。
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
柵欄物件:
threading.Barrier
的克隆。3.3 版本中新增。
- class multiprocessing.BoundedSemaphore([value])¶
有界訊號量物件:與
threading.BoundedSemaphore
非常相似。與其密切對應的物件存在一個細微的區別:它的
acquire
方法的第一個引數被命名為 block,這與Lock.acquire()
一致。注意
在 macOS 上,它與
Semaphore
沒有區別,因為sem_getvalue()
未在該平臺上實現。
- class multiprocessing.Condition([lock])¶
條件變數:
threading.Condition
的別名。如果指定了 lock,則它應該是來自
multiprocessing
的Lock
或RLock
物件。在 3.3 版本中更改: 添加了
wait_for()
方法。
- class multiprocessing.Event¶
是
threading.Event
的一個克隆。
- class multiprocessing.Lock¶
一個非遞迴鎖物件:與
threading.Lock
非常相似。一旦程序或執行緒獲取了鎖,後續任何程序或執行緒嘗試獲取該鎖都會被阻塞,直到該鎖被釋放;任何程序或執行緒都可以釋放該鎖。threading.Lock
的概念和行為(在應用於執行緒時)在multiprocessing.Lock
中被複制,應用於程序或執行緒,除非另有說明。請注意,
Lock
實際上是一個工廠函式,它返回一個使用預設上下文初始化的multiprocessing.synchronize.Lock
例項。Lock
支援 上下文管理器 協議,因此可以在with
語句中使用。- acquire(block=True, timeout=None)¶
獲取鎖,阻塞或非阻塞。
如果 block 引數設定為
True
(預設值),則方法呼叫將阻塞,直到鎖處於未鎖定狀態,然後將其設定為鎖定狀態並返回True
。請注意,第一個引數的名稱與threading.Lock.acquire()
中的名稱不同。如果 block 引數設定為
False
,則方法呼叫不會阻塞。如果鎖當前處於鎖定狀態,則返回False
;否則,將鎖設定為鎖定狀態並返回True
。當使用正浮點值 timeout 呼叫時,只要無法獲取鎖,最多會阻塞 timeout 指定的秒數。使用負值 timeout 呼叫等效於 timeout 為零。使用 timeout 值為
None
(預設值)的呼叫將超時時間設定為無限。請注意,timeout 的負值或None
值的處理方式與threading.Lock.acquire()
中實現的有所不同。如果 block 引數設定為False
,則 timeout 引數沒有實際意義,因此會被忽略。如果已獲取鎖,則返回True
;如果超時時間已過,則返回False
。
- release()¶
釋放鎖。這可以從任何程序或執行緒呼叫,而不僅僅是最初獲取鎖的程序或執行緒。
行為與
threading.Lock.release()
中的行為相同,但當在未鎖定的鎖上呼叫時,會引發ValueError
。
- class multiprocessing.RLock¶
一個遞迴鎖物件:與
threading.RLock
非常相似。遞迴鎖必須由獲取它的程序或執行緒釋放。一旦程序或執行緒獲取了遞迴鎖,同一程序或執行緒可以再次獲取它而不會阻塞;該程序或執行緒必須在每次獲取後釋放一次。請注意,
RLock
實際上是一個工廠函式,它返回一個使用預設上下文初始化的multiprocessing.synchronize.RLock
例項。RLock
支援 上下文管理器 協議,因此可以在with
語句中使用。- acquire(block=True, timeout=None)¶
獲取鎖,阻塞或非阻塞。
當使用 block 引數設定為
True
呼叫時,會阻塞,直到鎖處於未鎖定狀態(不歸任何程序或執行緒所有),除非鎖已歸當前程序或執行緒所有。然後,當前程序或執行緒獲取鎖的所有權(如果它尚未擁有所有權),並且鎖內的遞迴級別加 1,導致返回值為True
。請注意,與threading.RLock.acquire()
的實現相比,此第一個引數的行為存在一些差異,從引數本身的名稱開始。當使用 block 引數設定為
False
呼叫時,不會阻塞。如果鎖已被另一個程序或執行緒獲取(因此被擁有),則當前程序或執行緒不會獲取所有權,並且鎖內的遞迴級別不會更改,導致返回值為False
。如果鎖處於未鎖定狀態,則當前程序或執行緒獲取所有權,並且遞迴級別會遞增,導致返回值為True
。timeout 引數的使用和行為與
Lock.acquire()
中的相同。請注意,timeout 的某些行為與threading.RLock.acquire()
中實現的有所不同。
- release()¶
釋放鎖,遞迴級別遞減。如果在遞減後遞迴級別為零,則將鎖重置為未鎖定狀態(不歸任何程序或執行緒所有),並且如果有任何其他程序或執行緒被阻塞等待鎖變為未鎖定狀態,則允許其中一個繼續進行。如果在遞減後遞迴級別仍然不為零,則鎖保持鎖定狀態,並由呼叫程序或執行緒擁有。
僅當呼叫程序或執行緒擁有鎖時才呼叫此方法。如果此方法由所有者以外的程序或執行緒呼叫,或者如果鎖處於未鎖定(未擁有)狀態,則會引發
AssertionError
。請注意,在這種情況下引發的異常型別與threading.RLock.release()
中實現的有所不同。
- class multiprocessing.Semaphore([value])¶
一個訊號量物件:與
threading.Semaphore
非常相似。與其密切對應的物件存在一個細微的區別:它的
acquire
方法的第一個引數被命名為 block,這與Lock.acquire()
一致。
注意
在 macOS 上,不支援 sem_timedwait
,因此使用超時呼叫 acquire()
將使用睡眠迴圈來模擬該函式的行為。
注意
此軟體包的某些功能需要在主機作業系統上具有正常工作的共享訊號量實現。如果沒有,將停用 multiprocessing.synchronize
模組,並且嘗試匯入它將導致 ImportError
。有關更多資訊,請參閱 bpo-3770。
管理器¶
管理器提供了一種建立可以在不同程序之間共享的資料的方法,包括在不同機器上執行的程序之間透過網路共享。管理器物件控制一個伺服器程序,該程序管理共享物件。其他程序可以透過使用代理來訪問共享物件。
- multiprocessing.Manager()¶
返回一個已啟動的
SyncManager
物件,該物件可用於在程序之間共享物件。返回的管理器物件對應於一個已衍生的子程序,並具有可建立共享物件並返回相應代理的方法。
管理器程序會在它們被垃圾回收或其父程序退出時立即關閉。管理器類在 multiprocessing.managers
模組中定義。
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
建立一個 BaseManager 物件。
一旦建立,應該呼叫
start()
或get_server().serve_forever()
以確保管理器物件引用一個已啟動的管理器程序。address 是管理器程序偵聽新連線的地址。如果 address 為
None
,則會選擇一個任意地址。authkey 是用於檢查到伺服器程序的傳入連線有效性的身份驗證金鑰。如果 authkey 為
None
,則使用current_process().authkey
。否則,使用 authkey,並且它必須是一個位元組字串。serializer 必須是
'pickle'
(使用pickle
序列化)或'xmlrpclib'
(使用xmlrpc.client
序列化)。ctx 是一個上下文物件,或
None
(使用當前上下文)。請參閱get_context()
函式。shutdown_timeout 是一個以秒為單位的超時時間,用於等待管理器使用的程序在
shutdown()
方法中完成。如果關閉超時,則程序被終止。如果終止程序也超時,則程序被強制結束。在 3.11 版本中更改: 添加了 shutdown_timeout 引數。
- start([initializer[, initargs]])¶
啟動一個子程序來啟動管理器。如果 initializer 不是
None
,則子程序會在啟動時呼叫initializer(*initargs)
。
- get_server()¶
返回一個
Server
物件,該物件表示受管理器控制的實際伺服器。Server
物件支援serve_forever()
方法>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
另外還有一個address
屬性。
- connect()¶
將本地管理器物件連線到遠端管理器程序
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
一個類方法,可用於向管理器類註冊型別或可呼叫物件。
typeid 是一個“型別識別符號”,用於標識特定型別的共享物件。這必須是一個字串。
callable 是一個可呼叫物件,用於為該型別識別符號建立物件。如果管理器例項將使用
connect()
方法連線到伺服器,或者如果 create_method 引數為False
,則可以將其保留為None
。proxytype 是
BaseProxy
的子類,用於為具有此 typeid 的共享物件建立代理。如果為None
,則會自動建立一個代理類。exposed 用於指定一系列方法名稱,應允許此 typeid 的代理使用
BaseProxy._callmethod()
訪問。(如果 exposed 為None
,則如果存在proxytype._exposed_
,則會改用它。)在未指定公開列表的情況下,共享物件的所有“公共方法”都將可以訪問。(這裡的“公共方法”是指任何具有__call__()
方法並且其名稱不以'_'
開頭的屬性。)method_to_typeid 是一個對映,用於指定應返回代理的那些公開方法的返回型別。它將方法名稱對映到 typeid 字串。(如果 method_to_typeid 為
None
,則如果存在proxytype._method_to_typeid_
,則會改用它。)如果方法的名稱不是此對映的鍵,或者如果對映為None
,則該方法返回的物件將按值複製。create_method 確定是否應建立名稱為 typeid 的方法,該方法可用於告知伺服器程序建立新的共享物件併為其返回代理。預設情況下為
True
。
BaseManager
例項還具有一個只讀屬性- address¶
管理器使用的地址。
在 3.3 版本中更改: 管理器物件支援上下文管理協議 - 請參閱 上下文管理器型別。
__enter__()
啟動伺服器程序(如果尚未啟動),然後返回管理器物件。__exit__()
呼叫shutdown()
。在以前的版本中,如果管理器的伺服器程序尚未啟動,則
__enter__()
不會啟動該程序。
- class multiprocessing.managers.SyncManager¶
是
BaseManager
的子類,可用於程序同步。此型別的物件由multiprocessing.Manager()
返回。它的方法會建立並返回用於跨程序同步的多種常用資料型別的 代理物件。這主要包括共享列表和字典。
- Barrier(parties[, action[, timeout]])¶
建立一個共享的
threading.Barrier
物件並返回它的代理。3.3 版本中新增。
- BoundedSemaphore([value])¶
建立一個共享的
threading.BoundedSemaphore
物件並返回它的代理。
- Condition([lock])¶
建立一個共享的
threading.Condition
物件並返回它的代理。如果提供了 *lock*,則它應該是
threading.Lock
或threading.RLock
物件的代理。在 3.3 版本中更改: 添加了
wait_for()
方法。
- Event()¶
建立一個共享的
threading.Event
物件並返回它的代理。
- Lock()¶
建立一個共享的
threading.Lock
物件並返回它的代理。
- Queue([maxsize])¶
建立一個共享的
queue.Queue
物件並返回它的代理。
- RLock()¶
建立一個共享的
threading.RLock
物件並返回它的代理。
- Semaphore([value])¶
建立一個共享的
threading.Semaphore
物件並返回它的代理。
- Array(typecode, sequence)¶
建立一個數組並返回它的代理。
- Value(typecode, value)¶
建立一個具有可寫
value
屬性的物件,並返回它的代理。
在 3.6 版本中變更: 共享物件可以巢狀。例如,共享容器物件(如共享列表)可以包含其他共享物件,這些物件都將由
SyncManager
管理和同步。
- class multiprocessing.managers.Namespace¶
是可以向
SyncManager
註冊的型別。名稱空間物件沒有公共方法,但具有可寫屬性。它的表示形式顯示其屬性的值。
但是,當使用名稱空間物件的代理時,以
'_'
開頭的屬性將是代理的屬性,而不是引用物件的屬性>>> mp_context = multiprocessing.get_context('spawn') >>> manager = mp_context.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
自定義管理器¶
要建立自己的管理器,可以建立 BaseManager
的子類,並使用 register()
類方法向管理器類註冊新型別或可呼叫物件。例如
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
使用遠端管理器¶
可以在一臺機器上執行管理器伺服器,並允許客戶端從其他機器上使用它(假設所涉及的防火牆允許這樣做)。
執行以下命令會為單個共享佇列建立一個伺服器,遠端客戶端可以訪問該佇列
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一個客戶端可以按如下方式訪問伺服器
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一個客戶端也可以使用它
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地程序也可以訪問該佇列,使用上面客戶端的程式碼遠端訪問它
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理物件¶
代理是一個 *引用* 共享物件的物件,該共享物件(可能)存在於不同的程序中。共享物件被稱為代理的 *引用物件*。多個代理物件可能具有相同的引用物件。
代理物件具有呼叫其引用物件的相應方法的方法(儘管並非引用物件的每個方法都可以透過代理使用)。透過這種方式,代理的使用方式與引用物件相同
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
請注意,將 str()
應用於代理將返回引用物件的表示形式,而應用 repr()
將返回代理的表示形式。
代理物件的一個重要特性是它們是可序列化的,因此可以在程序之間傳遞。因此,引用物件可以包含 代理物件。這允許對這些託管列表、字典和其他 代理物件 進行巢狀
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
類似地,字典和列表代理可以相互巢狀
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
如果引用物件中包含標準(非代理)list
或 dict
物件,則對這些可變值的修改不會透過管理器傳播,因為代理無法知道何時修改了其中包含的值。但是,在容器代理中儲存值(這會在代理物件上觸發 __setitem__
)會透過管理器傳播,因此要有效地修改此類專案,可以將修改後的值重新分配給容器代理
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
這種方法可能不如對大多數用例使用巢狀的 代理物件 方便,但它也展示了對同步的控制程度。
注意
multiprocessing
中的代理型別不支援按值比較。例如,我們有
>>> manager.list([1,2,3]) == [1,2,3]
False
進行比較時,應改為使用引用物件的副本。
- class multiprocessing.managers.BaseProxy¶
代理物件是
BaseProxy
子類的例項。- _callmethod(methodname[, args[, kwds]])¶
呼叫並返回代理物件的引用物件的方法的結果。
如果
proxy
是一個代理,其引用物件是obj
,那麼表示式proxy._callmethod(methodname, args, kwds)
將會計算表示式
getattr(obj, methodname)(*args, **kwds)
在管理器程序中。
返回的值將是呼叫結果的副本或新共享物件的代理 - 請參閱
BaseManager.register()
的 method_to_typeid 引數的文件。如果呼叫引發異常,則
_callmethod()
將重新引發該異常。如果在管理器程序中引發其他異常,則將其轉換為RemoteError
異常,並由_callmethod()
引發。請特別注意,如果 methodname 沒有被 *公開*,則會引發異常。
以下是
_callmethod()
的使用示例>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
返回引用物件的副本。
如果引用物件不可 pickle,則會引發異常。
- __repr__()¶
返回代理物件的表示。
- __str__()¶
返回引用物件的表示。
清理¶
代理物件使用弱引用回撥,以便在被垃圾回收時,它會從擁有其引用物件的管理器中登出自身。
當不再有任何代理引用共享物件時,該共享物件將從管理器程序中刪除。
程序池¶
可以使用 Pool
類建立一個程序池,該程序池將執行提交給它的任務。
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
一個程序池物件,它控制一個工作程序池,可以向該程序池提交作業。它支援帶有超時和回撥的非同步結果,並具有並行對映實現。
processes 是要使用的工作程序的數量。如果 processes 為
None
,則使用os.process_cpu_count()
返回的數字。如果 initializer 不為
None
,則每個工作程序在啟動時都會呼叫initializer(*initargs)
。maxtasksperchild 是一個工作程序在退出並被新的工作程序替換之前可以完成的任務數,以便釋放未使用的資源。預設的 maxtasksperchild 是
None
,這意味著工作程序將與池的生命週期一樣長。context 可用於指定用於啟動工作程序的上下文。通常,使用函式
multiprocessing.Pool()
或上下文物件的Pool()
方法建立池。在這兩種情況下,都會適當地設定 context。請注意,池物件的方法只能由建立該池的程序呼叫。
警告
multiprocessing.pool
物件具有內部資源,需要透過將池用作上下文管理器或手動呼叫close()
和terminate()
來進行適當的管理(像任何其他資源一樣)。否則可能會導致程序在最終化時掛起。請注意,不正確的是依賴垃圾回收器來銷燬池,因為 CPython 不保證會呼叫池的 finalizer(有關更多資訊,請參見
object.__del__()
)。在 3.2 版本中更改: 添加了 maxtasksperchild 引數。
在 3.4 版本中更改: 添加了 context 引數。
在 3.13 版本中更改: processes 預設使用
os.process_cpu_count()
,而不是os.cpu_count()
。注意
在
Pool
中的工作程序通常在其工作佇列的整個持續時間記憶體活。在其他系統中(如 Apache、mod_wsgi 等)中,為了釋放工作程序持有的資源,常見的一種模式是允許池中的工作程序僅完成一定量的工作,然後退出,進行清理,並生成一個新的程序來替換舊的程序。Pool
的 maxtasksperchild 引數向終端使用者公開了此功能。- apply(func[, args[, kwds]])¶
使用引數 args 和關鍵字引數 kwds 呼叫 func。它會阻塞,直到結果準備就緒。鑑於此會阻塞,
apply_async()
更適合並行執行工作。此外,func 僅在池的一個工作程序中執行。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
apply()
方法的變體,返回一個AsyncResult
物件。如果指定了 callback,則它應該是一個接受單個引數的可呼叫物件。當結果準備就緒時,callback 將應用於該結果,除非呼叫失敗,在這種情況下,將改為應用 error_callback。
如果指定了 error_callback,則它應該是一個接受單個引數的可呼叫物件。如果目標函式失敗,則將使用異常例項呼叫 error_callback。
回撥應立即完成,否則處理結果的執行緒將被阻塞。
- map(func, iterable[, chunksize])¶
是內建函式
map()
的並行等效函式(但它只支援一個可迭代引數,對於多個可迭代引數,請參見starmap()
)。它會阻塞直到結果準備就緒。此方法將可迭代物件分割成若干塊,然後將其作為單獨的任務提交給程序池。可以透過將 chunksize 設定為正整數來指定這些塊的(大致)大小。
請注意,對於非常長的可迭代物件,這可能會導致較高的記憶體使用率。為了獲得更好的效率,請考慮使用帶有顯式 chunksize 選項的
imap()
或imap_unordered()
。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
是
map()
方法的一個變體,它返回一個AsyncResult
物件。如果指定了 callback,則它應該是一個接受單個引數的可呼叫物件。當結果準備就緒時,callback 將應用於該結果,除非呼叫失敗,在這種情況下,將改為應用 error_callback。
如果指定了 error_callback,則它應該是一個接受單個引數的可呼叫物件。如果目標函式失敗,則將使用異常例項呼叫 error_callback。
回撥應立即完成,否則處理結果的執行緒將被阻塞。
- imap(func, iterable[, chunksize])¶
是
map()
的惰性版本。chunksize 引數與
map()
方法使用的引數相同。對於非常長的可迭代物件,使用較大的 chunksize 值可以使作業完成速度快得多,而不是使用預設值1
。此外,如果 chunksize 為
1
,則imap()
方法返回的迭代器的next()
方法有一個可選的 timeout 引數:如果結果無法在 timeout 秒內返回,next(timeout)
將引發multiprocessing.TimeoutError
。
- imap_unordered(func, iterable[, chunksize])¶
與
imap()
相同,但返回的迭代器的結果順序應被認為是任意的。(只有當只有一個工作程序時,才能保證順序是“正確”的。)
- starmap(func, iterable[, chunksize])¶
與
map()
類似,只是 iterable 的元素預計是可迭代的,並且會被解包為引數。因此,
[(1,2), (3, 4)]
的 iterable 會產生[func(1,2), func(3,4)]
。3.3 版本中新增。
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
是
starmap()
和map_async()
的組合,它迭代 iterable 的可迭代物件,並使用解包後的可迭代物件呼叫 func。返回一個結果物件。3.3 版本中新增。
- close()¶
阻止向池提交任何更多工。一旦所有任務都已完成,工作程序將退出。
- terminate()¶
立即停止工作程序,而不完成未完成的工作。當池物件被垃圾回收時,將立即呼叫
terminate()
。
- join()¶
等待工作程序退出。必須在使用
join()
之前呼叫close()
或terminate()
。
在 3.3 版本中變更: 池物件現在支援上下文管理協議 – 請參見 上下文管理器型別。
__enter__()
返回池物件,__exit__()
呼叫terminate()
。
- class multiprocessing.pool.AsyncResult¶
是由
Pool.apply_async()
和Pool.map_async()
返回的結果的類。- get([timeout])¶
當結果到達時返回結果。如果 timeout 不是
None
並且結果沒有在 timeout 秒內到達,則會引發multiprocessing.TimeoutError
。如果遠端呼叫引發了異常,則該異常將由get()
重新引發。
- wait([timeout])¶
等待直到結果可用或直到 timeout 秒過去。
- ready()¶
返回呼叫是否已完成。
- successful()¶
返回呼叫是否在未引發異常的情況下完成。如果結果尚未就緒,則會引發
ValueError
異常。在 3.7 版本中變更: 如果結果尚未就緒,則會引發
ValueError
異常,而不是AssertionError
異常。
以下示例演示瞭如何使用程序池
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
監聽器和客戶端¶
通常,程序之間的訊息傳遞是使用佇列或使用由 Pipe()
返回的 Connection
物件完成的。
但是,multiprocessing.connection
模組提供了一些額外的靈活性。它基本上為處理套接字或 Windows 命名管道提供了一個高階的面向訊息的 API。它還支援使用 hmac
模組進行摘要認證,並支援同時輪詢多個連線。
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
向連線的另一端傳送一條隨機生成的訊息,並等待回覆。
如果回覆與使用 authkey 作為金鑰的訊息摘要匹配,則會向連線的另一端傳送歡迎訊息。否則,會引發
AuthenticationError
異常。
- multiprocessing.connection.answer_challenge(connection, authkey)¶
接收訊息,使用 authkey 作為金鑰計算訊息的摘要,然後將摘要傳送回去。
如果沒有收到歡迎訊息,則會引發
AuthenticationError
異常。
- multiprocessing.connection.Client(address[, family[, authkey]])¶
嘗試建立與使用地址 address 的監聽器的連線,返回一個
Connection
。連線的型別由 family 引數確定,但通常可以省略此引數,因為它通常可以從 address 的格式推斷出來。(請參閱 地址格式)
如果給定了 authkey 且不為
None
,則它應該是一個位元組字串,並將用作基於 HMAC 的身份驗證挑戰的金鑰。如果 authkey 為None
,則不進行身份驗證。如果身份驗證失敗,則會引發AuthenticationError
異常。請參閱 身份驗證金鑰。
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
繫結套接字或 Windows 命名管道的包裝器,它“監聽”連線。
address 是監聽器物件的繫結套接字或命名管道使用的地址。
注意
如果使用 '0.0.0.0' 的地址,則該地址在 Windows 上將不是可連線的端點。 如果需要可連線的端點,則應使用 '127.0.0.1'。
family 是要使用的套接字(或命名管道)的型別。它可以是字串
'AF_INET'
(用於 TCP 套接字)、'AF_UNIX'
(用於 Unix 域套接字) 或'AF_PIPE'
(用於 Windows 命名管道) 中的一個。 在這些中,只有第一個保證可用。 如果 family 為None
,則會從 address 的格式推斷出 family。 如果 address 也為None
,則會選擇預設值。 此預設值是假定為最快的可用 family。 請參閱 地址格式。請注意,如果 family 為'AF_UNIX'
且地址為None
,則將在使用tempfile.mkstemp()
建立的私有臨時目錄中建立套接字。如果監聽器物件使用套接字,則 backlog (預設為 1) 會在繫結後傳遞給套接字的
listen()
方法。如果給定了 authkey 且不為
None
,則它應該是一個位元組字串,並將用作基於 HMAC 的身份驗證挑戰的金鑰。如果 authkey 為None
,則不進行身份驗證。如果身份驗證失敗,則會引發AuthenticationError
異常。請參閱 身份驗證金鑰。- accept()¶
接受監聽器物件的繫結套接字或命名管道上的連線,並返回一個
Connection
物件。如果嘗試身份驗證失敗,則會引發AuthenticationError
異常。
- close()¶
關閉監聽器物件的繫結套接字或命名管道。當監聽器被垃圾回收時,會自動呼叫此方法。但是,建議顯式呼叫它。
監聽器物件具有以下只讀屬性
- address¶
監聽器物件正在使用的地址。
- last_accepted¶
上次接受的連線的來源地址。如果此地址不可用,則為
None
。
在 3.3 版本中變更: 監聽器物件現在支援上下文管理協議 – 請參閱 上下文管理器型別。
__enter__()
返回監聽器物件,而__exit__()
呼叫close()
。
- multiprocessing.connection.wait(object_list, timeout=None)¶
等待直到 object_list 中的物件準備就緒。返回 object_list 中準備就緒的物件的列表。如果 timeout 為浮點數,則呼叫最多會阻塞那麼多秒。如果 timeout 為
None
,則將無限期阻塞。負超時等效於零超時。對於 POSIX 和 Windows,如果物件滿足以下條件,則可以出現在 object_list 中:
一個可讀的
Connection
物件;一個已連線且可讀的
socket.socket
物件;或
當有資料可從連線或套接字物件讀取時,或者另一端已關閉時,該連線或套接字物件就緒。
POSIX:
wait(object_list, timeout)
幾乎等同於select.select(object_list, [], [], timeout)
。區別在於,如果select.select()
被訊號中斷,它可能會引發錯誤號為EINTR
的OSError
,而wait()
不會。Windows: object_list 中的項必須是一個可等待的整數控制代碼(根據 Win32 函式
WaitForMultipleObjects()
的文件定義),或者是一個具有fileno()
方法的物件,該方法返回套接字控制代碼或管道控制代碼。(請注意,管道控制代碼和套接字控制代碼不是可等待的控制代碼。)3.3 版本中新增。
示例
以下伺服器程式碼建立一個監聽器,它使用 'secret password'
作為身份驗證金鑰。然後它等待連線,並向客戶端傳送一些資料
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
以下程式碼連線到伺服器,並接收來自伺服器的一些資料
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
以下程式碼使用 wait()
來等待來自多個程序的訊息
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
地址格式¶
'AF_INET'
地址是一個(hostname, port)
形式的元組,其中 hostname 是一個字串,port 是一個整數。'AF_UNIX'
地址是一個表示檔案系統中檔名的字串。'AF_PIPE'
地址是r'\\.\pipe\PipeName'
形式的字串。要使用Client()
連線到名為 ServerName 的遠端計算機上的命名管道,應使用r'\\ServerName\pipe\PipeName'
形式的地址代替。
請注意,預設情況下,任何以兩個反斜槓開頭的字串都被假定為 'AF_PIPE'
地址,而不是 'AF_UNIX'
地址。
身份驗證金鑰¶
當使用 Connection.recv
時,接收到的資料會自動反序列化。不幸的是,從不受信任的來源反序列化資料存在安全風險。因此,Listener
和 Client()
使用 hmac
模組提供摘要身份驗證。
身份驗證金鑰是一個位元組字串,可以將其視為密碼:一旦建立連線,兩端都會要求證明另一端知道身份驗證金鑰。(證明兩端都使用相同的金鑰不涉及透過連線傳送金鑰。)
如果請求身份驗證但未指定身份驗證金鑰,則使用 current_process().authkey
的返回值(請參閱 Process
)。此值將自動被當前程序建立的任何 Process
物件繼承。這意味著(預設情況下)多程序程式的所有程序將共享一個身份驗證金鑰,該金鑰可用於在它們之間建立連線。
也可以使用 os.urandom()
生成合適的身份驗證金鑰。
日誌記錄¶
提供了一些日誌記錄支援。但是請注意,logging
包不使用程序共享鎖,因此來自不同程序的訊息可能會混淆(取決於處理程式型別)。
- multiprocessing.get_logger()¶
返回
multiprocessing
使用的記錄器。如有必要,將建立一個新的記錄器。首次建立時,記錄器的級別為
logging.NOTSET
,並且沒有預設的處理程式。預設情況下,傳送到此記錄器的訊息不會傳播到根記錄器。請注意,在 Windows 上,子程序只會繼承父程序記錄器的級別 - 記錄器的任何其他自定義設定都不會被繼承。
- multiprocessing.log_to_stderr(level=None)¶
此函式呼叫
get_logger()
,除了返回由 get_logger 建立的記錄器之外,它還新增一個處理程式,該處理程式使用格式'[%(levelname)s/%(processName)s] %(message)s'
將輸出傳送到sys.stderr
。可以透過傳遞level
引數來修改記錄器的levelname
。
以下是啟用日誌記錄的示例會話
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
有關日誌記錄級別的完整表格,請參閱 logging
模組。
multiprocessing.dummy
模組¶
multiprocessing.dummy
複製了 multiprocessing
的 API,但它只不過是 threading
模組的包裝器。
特別是,multiprocessing.dummy
提供的 Pool
函式返回 ThreadPool
的例項,它是 Pool
的子類,支援所有相同的方法呼叫,但使用工作執行緒池而不是工作程序。
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
執行緒池物件,用於控制可向其提交作業的工作執行緒池。
ThreadPool
例項與Pool
例項完全介面相容,並且它們的資源也必須得到妥善管理,可以透過將池用作上下文管理器,或者手動呼叫close()
和terminate()
來實現。processes 是要使用的工作執行緒數。如果 processes 為
None
,則使用os.process_cpu_count()
返回的數字。如果 initializer 不為
None
,則每個工作程序在啟動時都會呼叫initializer(*initargs)
。與
Pool
不同,maxtasksperchild 和 context 不能被提供。注意
ThreadPool
與Pool
共享相同的介面,後者是圍繞程序池設計的,並且早於concurrent.futures
模組的引入。因此,它繼承了一些對於由執行緒支援的池沒有意義的操作,並且它擁有自己的型別來表示非同步作業的狀態,AsyncResult
,這種型別不被任何其他庫理解。使用者通常應首選使用
concurrent.futures.ThreadPoolExecutor
,它具有從一開始就圍繞執行緒設計的更簡單的介面,並且返回與包括asyncio
在內的許多其他庫相容的concurrent.futures.Future
例項。
程式設計指南¶
使用 multiprocessing
時,應遵守某些指南和習慣用法。
所有啟動方法¶
以下內容適用於所有啟動方法。
避免共享狀態
應儘可能避免在程序之間移動大量資料。
最好堅持使用佇列或管道進行程序間通訊,而不是使用較低級別的同步原語。
可 Pickling 性
確保代理方法的引數是可 pickling 的。
代理的執行緒安全性
除非使用鎖保護,否則不要從多個執行緒使用代理物件。
(不同程序使用*相同*代理永遠不會有問題。)
加入殭屍程序
在 POSIX 上,當程序完成但未被加入時,它會變成殭屍。不應該有很多,因為每次啟動新程序(或呼叫
active_children()
時),所有尚未加入的已完成程序都將被加入。另外,呼叫已完成程序的Process.is_alive
將會加入該程序。即使如此,顯式加入你啟動的所有程序可能也是一個好習慣。
最好繼承而不是 pickle/unpickle
當使用 spawn 或 forkserver 啟動方法時,
multiprocessing
中的許多型別需要是可 pickling 的,以便子程序可以使用它們。但是,通常應避免使用管道或佇列將共享物件傳送到其他程序。相反,你應該安排程式,以便需要訪問在其他地方建立的共享資源的程序可以從祖先程序繼承它。
避免終止程序
使用
Process.terminate
方法停止程序可能會導致程序當前正在使用的任何共享資源(例如鎖、訊號量、管道和佇列)損壞或對其他程序不可用。因此,最好只考慮在從不使用任何共享資源的程序上使用
Process.terminate
。
加入使用佇列的程序
請記住,已將專案放入佇列的程序在終止之前會等待,直到“饋送器”執行緒將所有緩衝的專案饋送到基礎管道。(子程序可以呼叫佇列的
Queue.cancel_join_thread
方法來避免此行為。)這意味著,無論何時使用佇列,都需要確保在程序加入之前,最終會刪除已放入佇列中的所有專案。否則,你無法確定將專案放入佇列的程序是否會終止。還要記住,非守護程序將自動加入。
一個會發生死鎖的例子如下
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()這裡的解決方法是交換最後兩行(或直接刪除
p.join()
行)。
顯式將資源傳遞給子程序
在 POSIX 上,使用 fork 啟動方法,子程序可以使用全域性資源利用在父程序中建立的共享資源。但是,最好將該物件作為引數傳遞給子程序的建構函式。
除了使程式碼(可能)與 Windows 和其他啟動方法相容之外,這還可以確保只要子程序仍然處於活動狀態,該物件就不會在父程序中被垃圾回收。如果某些資源在父程序中被垃圾回收時被釋放,這可能很重要。
因此例如
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()應該重寫為
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
注意將 sys.stdin
替換為“類似檔案的物件”
multiprocessing
最初無條件呼叫os.close(sys.stdin.fileno())在
multiprocessing.Process._bootstrap()
方法中——這導致了程序內程序的問題。這已更改為sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)這解決了程序相互衝突導致檔案描述符錯誤的根本問題,但是給將
sys.stdin()
替換為帶有輸出緩衝的“類檔案物件”的應用程式帶來了潛在的危險。這種危險是,如果多個程序在此類檔案物件上呼叫close()
,則可能導致相同的資料多次重新整理到該物件,從而導致損壞。如果你編寫了一個類檔案物件並實現了自己的快取,則可以透過在每次新增到快取時儲存 pid,並在 pid 更改時丟棄快取來使其對 fork 安全。例如
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
spawn 和 forkserver 啟動方法¶
有一些額外的限制不適用於 fork 啟動方法。
更多的可 Pickling 性
確保
Process.__init__()
的所有引數都是可 pickling 的。另外,如果你子類化Process
,請確保在呼叫Process.start
方法時,例項將是可 pickling 的。
全域性變數
請記住,如果在子程序中執行的程式碼嘗試訪問全域性變數,則它看到的值(如果有)可能與呼叫
Process.start
時父程序中的值不同。但是,只是模組級常量的全域性變數不會引起問題。
安全匯入主模組
確保主模組可以被新的 Python 直譯器安全地匯入,而不會引起意外的副作用(例如啟動新程序)。
例如,使用 spawn 或 forkserver 啟動方法執行以下模組將失敗,並出現
RuntimeError
from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()而是應該使用
if __name__ == '__main__':
來保護程式的“入口點”,如下所示:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(如果程式是正常執行而不是被凍結,則可以省略
freeze_support()
行。)這允許新生成的 Python 直譯器安全地匯入模組,然後執行模組的
foo()
函式。如果在主模組中建立了池(pool)或管理器(manager),則適用類似的限制。
示例¶
演示如何建立和使用自定義的管理器和代理
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
使用 Pool
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
一個示例,展示如何使用佇列來向一組工作程序提供任務並收集結果
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()