multiprocessing.shared_memory — 跨程序直接訪問的共享記憶體

原始碼: Lib/multiprocessing/shared_memory.py

3.8 版本加入。


該模組提供了一個類 SharedMemory,用於分配和管理共享記憶體,以便在多核或對稱多處理器 (SMP) 機器上被一個或多個程序訪問。為了幫助管理共享記憶體的生命週期,尤其是在不同的程序之間,還提供了 BaseManager 的子類 SharedMemoryManager,該子類在 multiprocessing.managers 模組中。

在此模組中,共享記憶體是指“POSIX 風格”的共享記憶體塊(儘管不一定明確地以此方式實現),而不是指“分散式共享記憶體”。這種風格的共享記憶體允許不同的程序潛在地讀寫易失性記憶體的公共(或共享)區域。程序通常被限制為只能訪問它們自己的程序記憶體空間,但共享記憶體允許在程序之間共享資料,從而避免了在程序之間傳送包含該資料的訊息的需要。與透過磁碟或套接字或其他需要序列化/反序列化和複製資料的通訊方式共享資料相比,透過記憶體直接共享資料可以提供顯著的效能優勢。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0, *, track=True)

建立 SharedMemory 類的例項,用於建立新的共享記憶體塊或附加到現有的共享記憶體塊。每個共享記憶體塊都分配有一個唯一的名稱。透過這種方式,一個程序可以使用特定的名稱建立一個共享記憶體塊,而另一個程序可以使用相同的名稱附加到該共享記憶體塊。

作為跨程序共享資料的資源,共享記憶體塊的生存時間可能比建立它們的原始程序更長。當一個程序不再需要訪問其他程序可能仍然需要的共享記憶體塊時,應該呼叫 close() 方法。當任何程序都不再需要共享記憶體塊時,應呼叫 unlink() 方法以確保正確清理。

引數:
  • name (str | None) – 請求的共享記憶體的唯一名稱,指定為字串。當建立新的共享記憶體塊時,如果為名稱提供 None(預設值),則會生成一個新的名稱。

  • create (bool) – 控制是建立新的共享記憶體塊 (True) 還是附加到現有的共享記憶體塊 (False)。

  • size (int) – 建立新的共享記憶體塊時請求的位元組數。由於某些平臺選擇根據該平臺的記憶體頁面大小分配記憶體塊,因此共享記憶體塊的實際大小可能大於或等於請求的大小。當附加到現有的共享記憶體塊時,將忽略 size 引數。

  • track (bool) – 當 True 時,在作業系統不自動執行此操作的平臺上,使用資源跟蹤器程序註冊共享記憶體塊。即使所有其他有權訪問記憶體的程序在沒有這樣做的情況下退出,資源跟蹤器也可以確保正確清理共享記憶體。使用 multiprocessing 工具從公共祖先建立的 Python 程序共享一個資源跟蹤器程序,並且這些程序之間會自動處理共享記憶體段的生命週期。當啟用 track 時,以任何其他方式建立的 Python 程序都將收到它們自己的資源跟蹤器。這將導致共享記憶體被第一個終止的程序的資源跟蹤器刪除。為了避免此問題,當已經有另一個程序負責記賬時,subprocess 的使用者或獨立的 Python 程序應將 track 設定為 False。在 Windows 上會忽略 track,Windows 有自己的跟蹤,並且當所有控制代碼都關閉時會自動刪除共享記憶體。

在 3.13 版本中更改: 添加了 track 引數。

close()

從此例項關閉到共享記憶體的檔案描述符/控制代碼。一旦不再需要從此例項訪問共享記憶體塊,就應該呼叫 close()。根據作業系統,即使所有控制代碼都已關閉,底層記憶體也可能不會被釋放。為了確保正確清理,請使用 unlink() 方法。

刪除底層的共享記憶體塊。無論有多少控制代碼(即使在其他程序中),每個共享記憶體塊都應只調用一次此方法。unlink()close() 可以以任何順序呼叫,但在 unlink() 之後嘗試訪問共享記憶體塊中的資料可能會導致記憶體訪問錯誤,具體取決於平臺。

此方法在 Windows 上不起作用,在 Windows 上刪除共享記憶體塊的唯一方法是關閉所有控制代碼。

buf

共享記憶體塊內容的記憶體檢視。

name

對共享記憶體塊的唯一名稱的只讀訪問。

size

對共享記憶體塊的位元組大小的只讀訪問。

以下示例演示了 SharedMemory 例項的底層用法

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下示例演示了 SharedMemory 類與 NumPy 陣列 的實際用法,從兩個不同的 Python shell 訪問相同的 numpy.ndarray

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

multiprocessing.managers.BaseManager 的子類,可用於跨程序管理共享記憶體塊。

呼叫 start() 方法會在 SharedMemoryManager 例項上啟動一個新程序。這個新程序的唯一目的是管理透過它建立的所有共享記憶體塊的生命週期。要觸發釋放該程序管理的所有共享記憶體塊,請呼叫該例項上的 shutdown() 方法。這將觸發對該程序管理的所有 SharedMemory 物件呼叫 unlink() 方法,然後停止程序本身。透過 SharedMemoryManager 建立 SharedMemory 例項,可以避免手動跟蹤和觸發共享記憶體資源的釋放。

此類提供了建立和返回 SharedMemory 例項以及建立由共享記憶體支援的類似列表的物件 (ShareableList) 的方法。

有關繼承的 *address* 和 *authkey* 可選輸入引數的說明以及如何使用它們從其他程序連線到現有的 SharedMemoryManager 服務,請參閱 BaseManager

SharedMemory(size)

建立並返回一個新的 SharedMemory 物件,其大小為指定的 *size* (以位元組為單位)。

ShareableList(sequence)

建立並返回一個新的 ShareableList 物件,該物件由輸入 *sequence* 中的值初始化。

以下示例演示了 SharedMemoryManager 的基本機制

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下示例描述了使用 SharedMemoryManager 物件的一種可能更方便的模式,透過 with 語句來確保所有共享記憶體塊在不再需要後被釋放

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

with 語句中使用 SharedMemoryManager 時,使用該管理器建立的共享記憶體塊將在 with 語句的程式碼塊執行完畢後全部釋放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

提供一個可變的類似列表的物件,其中儲存的所有值都儲存在共享記憶體塊中。這會將可儲存的值限制為以下內建資料型別

  • int (有符號 64 位)

  • float

  • bool

  • str (當編碼為 UTF-8 時,每個小於 10M 位元組)

  • bytes (每個小於 10M 位元組)

  • None

它與內建的 list 型別也明顯不同,因為這些列表無法更改其總長度(即,沒有 append()insert() 等),並且不支援透過切片動態建立新的 ShareableList 例項。

sequence 用於填充包含值的新 ShareableList。設定為 None 以透過其唯一的共享記憶體名稱附加到已存在的 ShareableList

name 是請求的共享記憶體的唯一名稱,如 SharedMemory 的定義中所述。在附加到現有的 ShareableList 時,請指定其共享記憶體塊的唯一名稱,同時將 *sequence* 設定為 None

注意

對於 bytesstr 值存在已知問題。如果它們以 \x00 空位元組或字元結尾,則在透過索引從 ShareableList 中獲取它們時,這些位元組或字元可能會被靜默刪除。這種 .rstrip(b'\x00') 行為被認為是錯誤,將來可能會消失。請參閱 gh-106939

對於尾隨空值的刪除是一個問題的應用程式,透過始終無條件地在儲存此類值時在其末尾附加一個額外的非 0 位元組,並在獲取時無條件地刪除它來解決此問題

>>> from multiprocessing import shared_memory
>>> nul_bug_demo = shared_memory.ShareableList(['?\x00', b'\x03\x02\x01\x00\x00\x00'])
>>> nul_bug_demo[0]
'?'
>>> nul_bug_demo[1]
b'\x03\x02\x01'
>>> nul_bug_demo.shm.unlink()
>>> padded = shared_memory.ShareableList(['?\x00\x07', b'\x03\x02\x01\x00\x00\x00\x07'])
>>> padded[0][:-1]
'?\x00'
>>> padded[1][:-1]
b'\x03\x02\x01\x00\x00\x00'
>>> padded.shm.unlink()
count(value)

返回 *value* 出現的次數。

index(value)

返回 *value* 的第一個索引位置。如果不存在 *value*,則引發 ValueError

format

只讀屬性,包含所有當前儲存的值使用的 struct 打包格式。

shm

儲存值的 SharedMemory 例項。

以下示例演示了 ShareableList 例項的基本用法

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

以下示例描述了一個、兩個或多個程序如何透過提供其背後的共享記憶體塊的名稱來訪問相同的 ShareableList

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

以下示例演示瞭如果需要,可以 pickle 和 unpickle ShareableList(和底層 SharedMemory)物件。 請注意,它仍然是同一個共享物件。之所以會這樣,是因為反序列化的物件具有相同的唯一名稱,並且只是附加到具有相同名稱的現有物件(如果該物件仍然存在)

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()