multiprocessing.shared_memory — 用於跨程序直接訪問的共享記憶體

原始碼: Lib/multiprocessing/shared_memory.py

在 3.8 版本加入。


此模組提供了一個類 SharedMemory,用於分配和管理多核或對稱多處理器 (SMP) 機器上一個或多個程序可訪問的共享記憶體。為了協助共享記憶體的生命週期管理,尤其是在不同程序之間,BaseManager 的一個子類 SharedMemoryManager 也包含在 multiprocessing.managers 模組中。

在此模組中,共享記憶體指的是“POSIX 風格”的共享記憶體塊(儘管不一定明確地如此實現),而不是指“分散式共享記憶體”。這種風格的共享記憶體允許不同的程序可能讀取和寫入易失記憶體的公共(或共享)區域。程序通常僅限於訪問其自己的程序記憶體空間,但共享記憶體允許在程序之間共享資料,從而避免了在程序之間傳送包含該資料的訊息的需要。與透過磁碟、套接字或其他需要資料序列化/反序列化和複製的通訊方式共享資料相比,透過記憶體直接共享資料可以提供顯著的效能優勢。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0, *, track=True)

建立 SharedMemory 類的例項,用於建立新的共享記憶體塊或附加到現有的共享記憶體塊。每個共享記憶體塊都分配有一個唯一的名稱。透過這種方式,一個程序可以使用特定名稱建立一個共享記憶體塊,而另一個程序可以使用相同的名稱附加到該共享記憶體塊。

作為跨程序共享資料的資源,共享記憶體塊的生命週期可能長於建立它們的原始程序。當一個程序不再需要訪問某個共享記憶體塊(但其他程序可能仍需要)時,應呼叫 close() 方法。當任何程序都不再需要某個共享記憶體塊時,應呼叫 unlink() 方法以確保正確清理。

引數:
  • name (str | None) – 所請求共享記憶體的唯一名稱,指定為字串。當建立新的共享記憶體塊時,如果為名稱提供 None (預設值),將生成一個新名稱。

  • create (bool) – 控制是建立新的共享記憶體塊 (True) 還是附加到現有的共享記憶體塊 (False)。

  • size (int) – 建立新的共享記憶體塊時請求的位元組數。由於某些平臺選擇根據其記憶體頁大小分配記憶體塊,因此共享記憶體塊的實際大小可能大於或等於請求的大小。當附加到現有共享記憶體塊時,將忽略 size 引數。

  • track (bool) – 當為 True 時,在作業系統不自動執行此操作的平臺上,將共享記憶體塊註冊到資源跟蹤器程序。資源跟蹤器確保即使所有其他有權訪問記憶體的程序在未執行此操作的情況下退出,也能正確清理共享記憶體。使用 multiprocessing 設施從共同祖先建立的 Python 程序共享一個資源跟蹤器程序,並且共享記憶體段的生命週期在這些程序之間自動處理。以任何其他方式建立的 Python 程序在啟用 track 訪問共享記憶體時將獲得自己的資源跟蹤器。這將導致共享記憶體被第一個終止的程序的資源跟蹤器刪除。為避免此問題,當已存在另一個程序執行簿記時,subprocess 或獨立 Python 程序的使用者應將 track 設定為 Falsetrack 在 Windows 上被忽略,Windows 有自己的跟蹤機制,並在所有控制代碼都關閉時自動刪除共享記憶體。

3.13 版本中的變化: 添加了 track 引數。

close()

關閉此例項到共享記憶體的檔案描述符/控制代碼。一旦不再需要從此例項訪問共享記憶體塊,就應呼叫 close()。根據作業系統,即使所有控制代碼都已關閉,底層記憶體也可能不會被釋放。為確保正確清理,請使用 unlink() 方法。

刪除底層共享記憶體塊。每個共享記憶體塊只能呼叫一次此方法,無論有多少控制代碼指向它,即使在其他程序中也是如此。unlink()close() 可以按任意順序呼叫,但在 unlink() 之後嘗試訪問共享記憶體塊中的資料可能會導致記憶體訪問錯誤,具體取決於平臺。

此方法在 Windows 上無效,在 Windows 上,刪除共享記憶體塊的唯一方法是關閉所有控制代碼。

buf

共享記憶體塊內容的 memoryview。

name

對共享記憶體塊唯一名稱的只讀訪問。

size

對共享記憶體塊大小(以位元組為單位)的只讀訪問。

以下示例演示了 SharedMemory 例項的低階使用

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下示例演示了 SharedMemory 類與 NumPy 陣列 的實際應用,從兩個不同的 Python shell 訪問相同的 numpy.ndarray

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

multiprocessing.managers.BaseManager 的子類,可用於管理跨程序的共享記憶體塊。

SharedMemoryManager 例項呼叫 start() 會啟動一個新程序。此新程序的唯一目的是管理透過它建立的所有共享記憶體塊的生命週期。要觸發釋放由該程序管理的所有共享記憶體塊,請在該例項上呼叫 shutdown()。這會觸發對由該程序管理的所有 SharedMemory 物件呼叫 unlink(),然後停止該程序本身。透過 SharedMemoryManager 建立 SharedMemory 例項,我們避免了手動跟蹤和觸發釋放共享記憶體資源的需要。

此類提供了建立和返回 SharedMemory 例項以及建立由共享記憶體支援的類列表物件 (ShareableList) 的方法。

請參閱 BaseManager 以瞭解繼承的 addressauthkey 可選輸入引數的描述以及它們如何用於從其他程序連線到現有 SharedMemoryManager 服務。

SharedMemory(size)

建立一個新的 SharedMemory 物件並返回它,其大小為指定的 size 位元組。

ShareableList(sequence)

建立一個新的 ShareableList 物件並返回它,該物件由輸入 sequence 中的值初始化。

以下示例演示了 SharedMemoryManager 的基本機制

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下示例描述了一種使用 SharedMemoryManager 物件更方便的模式,透過 with 語句確保所有共享記憶體塊在不再需要時都被釋放

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

當在 with 語句中使用 SharedMemoryManager 時,使用該管理器建立的所有共享記憶體塊都會在 with 語句的程式碼塊執行完畢時被釋放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

提供一個可變類列表物件,其中儲存的所有值都儲存在共享記憶體塊中。這限制了可儲存值為以下內建資料型別

  • int(有符號 64 位)

  • 浮點數

  • bool

  • str(編碼為 UTF-8 時每個小於 10M 位元組)

  • bytes(每個小於 10M 位元組)

  • None

它還與內建的 list 型別顯著不同,這些列表不能改變其總長度(例如不支援 append()insert() 等),並且不支援透過切片動態建立新的 ShareableList 例項。

sequence 用於填充新的 ShareableList 值。設定為 None 則透過其唯一的共享記憶體名稱附加到已存在的 ShareableList

name 是請求共享記憶體的唯一名稱,如 SharedMemory 的定義中所述。當附加到現有 ShareableList 時,請指定其共享記憶體塊的唯一名稱,同時將 sequence 設定為 None

備註

對於 bytesstr 值存在一個已知問題。如果它們以 \x00 空位元組或字元結尾,當透過索引從 ShareableList 獲取它們時,這些位元組或字元可能會被 靜默刪除。這種 .rstrip(b'\x00') 行為被認為是一個 bug,將來可能會消失。請參閱 gh-106939

對於尾部空值截斷會成為問題的應用程式,可以透過在儲存此類值時始終無條件地在其末尾附加一個額外的非 0 位元組並在獲取時無條件地將其移除來解決此問題

>>> from multiprocessing import shared_memory
>>> nul_bug_demo = shared_memory.ShareableList(['?\x00', b'\x03\x02\x01\x00\x00\x00'])
>>> nul_bug_demo[0]
'?'
>>> nul_bug_demo[1]
b'\x03\x02\x01'
>>> nul_bug_demo.shm.unlink()
>>> padded = shared_memory.ShareableList(['?\x00\x07', b'\x03\x02\x01\x00\x00\x00\x07'])
>>> padded[0][:-1]
'?\x00'
>>> padded[1][:-1]
b'\x03\x02\x01\x00\x00\x00'
>>> padded.shm.unlink()
count(value)

返回 value 出現的次數。

index(value)

返回 value 的第一個索引位置。如果 value 不存在,則引發 ValueError

format

只讀屬性,包含所有當前儲存值使用的 struct 打包格式。

shm

儲存值的 SharedMemory 例項。

以下示例演示了 ShareableList 例項的基本用法

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

以下示例描述了透過提供其背後共享記憶體塊的名稱,一個、兩個或多個程序如何訪問相同的 ShareableList

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

以下示例演示了 ShareableList(以及底層 SharedMemory)物件在需要時可以進行 pickle 和 unpickle。請注意,它仍將是相同的共享物件。之所以會這樣,是因為反序列化的物件具有相同的唯一名稱,並且只是附加到具有相同名稱的現有物件(如果該物件仍然存在)

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()