concurrent.interpreters --- 在同一程序中實現多直譯器

在 3.14 版本加入。

原始碼: Lib/concurrent/interpreters


concurrent.interpreters 模組在底層的 _interpreters 模組之上構建了更高級別的介面。

該模組主要旨在提供一個基本的 API,用於管理直譯器(也稱為“子直譯器”)並在其中執行程式碼。執行操作主要涉及切換到某個直譯器(在當前執行緒中),並呼叫該執行上下文中的函式。

對於併發性,直譯器本身(以及本模組)除了隔離之外,並沒有提供更多功能,而隔離本身作用不大。實際的併發性可透過 執行緒 單獨獲得。請參閱下文

參見

InterpreterPoolExecutor

在一個熟悉的介面中將執行緒與直譯器結合起來。

隔離擴充套件模組

如何更新擴充套件模組以支援多直譯器

PEP 554

PEP 734

PEP 684

可用性:非 WASI。

此模組在 WebAssembly 上不起作用或不可用。有關更多資訊,請參閱 WebAssembly 平臺

關鍵細節

在我們深入探討之前,關於使用多直譯器,有幾個關鍵細節需要牢記:

  • 預設情況下是隔離的

  • 無隱式執行緒

  • 並非所有 PyPI 包都支援在多直譯器中使用

引言

“直譯器”實際上是 Python 執行時的執行上下文。它包含了執行時執行程式所需的所有狀態。這包括匯入狀態和內建函式等。(每個執行緒,即使只有一個主執行緒,除了當前的直譯器外,還有一些與當前異常和位元組碼求值迴圈相關的額外執行時狀態。)

直譯器的概念和功能自 Python 2.2 版以來就已存在,但該功能僅透過 C-API 提供且鮮為人知,並且其隔離性在 3.12 版之前相對不完整。

多直譯器與隔離

一個 Python 實現可能支援在同一程序中使用多個直譯器。CPython 就支援此功能。每個直譯器實際上都與其他直譯器隔離(除了一些經過精心管理的程序全域性例外情況)。

這種隔離主要用於程式中不同邏輯元件之間的強分離,當您希望仔細控制這些元件如何互動時,這種隔離尤其有用。

備註

同一程序中的直譯器在技術上永遠無法彼此嚴格隔離,因為同一程序內的記憶體訪問限制很少。Python 執行時會盡力實現隔離,但擴充套件模組可能會輕易破壞這種隔離。因此,在安全敏感的情況下,不應使用多直譯器,因為它們不應該能夠訪問彼此的資料。

在直譯器中執行

在不同的直譯器中執行涉及在當前執行緒中切換到該直譯器,然後呼叫某個函式。執行時將使用當前直譯器的狀態來執行該函式。concurrent.interpreters 模組提供了一個基本的 API,用於建立和管理直譯器,以及執行切換並呼叫的操作。

該操作不會自動啟動其他執行緒。不過,有一個輔助函式可以實現此功能。還有一個專門的輔助函式,用於在直譯器中呼叫內建的 exec()

當在直譯器中呼叫 exec()(或 eval())時,它們會使用該直譯器的 __main__ 模組作為“全域性”名稱空間。對於不與任何模組關聯的函式也是如此。這與從命令列呼叫的指令碼在 __main__ 模組中執行的方式相同。

併發與並行

如前所述,直譯器本身不提供任何併發性。它們嚴格代表執行時將*在當前執行緒中*使用的隔離執行上下文。這種隔離使它們類似於程序,但它們仍然享有程序內的高效性,就像執行緒一樣。

儘管如此,直譯器確實自然地支援某些型別的併發性。這種隔離有一個強大的副作用。它實現了一種與非同步或執行緒不同的併發方法。這是一種類似於 CSP 或 actor 模型的併發模型,這種模型相對容易理解。

你可以在單個執行緒中利用這種併發模型,在直譯器之間來回切換,類似於 Stackless 的風格。然而,當您將直譯器與多個執行緒結合使用時,這種模型更為有用。這主要涉及啟動一個新執行緒,在該執行緒中切換到另一個直譯器並在那裡執行您想要的程式碼。

Python 中的每個實際執行緒,即使您只在主執行緒中執行,都有其自己的*當前*執行上下文。多個執行緒可以使用同一個直譯器,也可以使用不同的直譯器。

從高層次來看,您可以將執行緒和直譯器的組合視為具有可選共享功能的執行緒。

一個顯著的好處是,直譯器是充分隔離的,它們不共享GIL,這意味著將執行緒與多個直譯器結合可以實現完全的多核並行。(自 Python 3.12 以來就是如此。)

直譯器間通訊

實際上,只有當我們有辦法在多個直譯器之間進行通訊時,它們才有用。這通常涉及某種形式的訊息傳遞,但甚至可以意味著以某種精心管理的方式共享資料。

考慮到這一點,concurrent.interpreters 模組提供了一個 queue.Queue 的實現,可透過 create_queue() 獲得。

“共享”物件

任何在直譯器之間實際共享的資料都會失去由 GIL 提供的執行緒安全性。在擴充套件模組中有多種方法可以處理這個問題。然而,從 Python 程式碼的角度來看,缺乏執行緒安全性意味著物件實際上無法被共享,只有少數例外。相反,必須建立一個副本,這意味著可變物件將不會保持同步。

預設情況下,大多數物件在傳遞給另一個直譯器時會透過 pickle 進行復制。幾乎所有的不可變內建物件要麼是直接共享的,要麼是高效複製的。例如:

有少數 Python 型別確實在直譯器之間共享可變資料:

參考

該模組定義了以下函式:

concurrent.interpreters.list_all()

返回一個包含 Interpreter 物件的 list,每個物件對應一個現有的直譯器。

concurrent.interpreters.get_current()

返回當前執行的直譯器的 Interpreter 物件。

concurrent.interpreters.get_main()

返回主直譯器的 Interpreter 物件。這是執行時建立的用於執行 REPL 或命令列中給定指令碼的直譯器。它通常是唯一的直譯器。

concurrent.interpreters.create()

初始化一個新的(空閒的)Python 直譯器,併為其返回一個 Interpreter 物件。

concurrent.interpreters.create_queue()

初始化一個新的跨直譯器佇列,併為其返回一個 Queue 物件。

直譯器物件

class concurrent.interpreters.Interpreter(id)

當前程序中的單個直譯器。

通常情況下,不應直接呼叫 Interpreter。而是應該使用 create() 或其他模組函式。

id

(只讀)

底層直譯器的 ID。

whence

(只讀)

一個描述直譯器來源的字串。

is_running()

如果直譯器當前正在其 __main__ 模組中執行程式碼,則返回 True,否則返回 False

close()

終結並銷燬直譯器。

prepare_main(ns=None, **kwargs)

在直譯器的 __main__ 模組中繫結物件。

一些物件是實際共享的,一些是高效複製的,但大多數是透過 pickle 複製的。請參閱 “共享”物件

exec(code, /, dedent=True)

在直譯器中執行給定的原始碼(在當前執行緒中)。

call(callable, /, *args, **kwargs)

返回在直譯器中執行給定函式的結果(在當前執行緒中)。

call_in_thread(callable, /, *args, **kwargs)

在直譯器中執行給定的函式(在一個新執行緒中)。

異常

exception concurrent.interpreters.InterpreterError

這個異常是 Exception 的子類,在發生與直譯器相關的錯誤時引發。

exception concurrent.interpreters.InterpreterNotFoundError

這個異常是 InterpreterError 的子類,在目標直譯器不存在時引發。

exception concurrent.interpreters.ExecutionFailed

這個異常是 InterpreterError 的子類,在執行的程式碼引發未捕獲的異常時引發。

excinfo

在另一個直譯器中引發的異常的基本快照。

exception concurrent.interpreters.NotShareableError

這個異常是 TypeError 的子類,當一個物件無法傳送到另一個直譯器時引發。

直譯器間通訊

class concurrent.interpreters.Queue(id)

一個底層跨直譯器佇列的包裝器,它實現了 queue.Queue 介面。底層的佇列只能透過 create_queue() 建立。

一些物件是實際共享的,一些是高效複製的,但大多數是透過 pickle 複製的。請參閱 “共享”物件

id

(只讀)

佇列的 ID。

exception concurrent.interpreters.QueueEmptyError

這個異常是 queue.Empty 的子類,當佇列為空時,會由 Queue.get()Queue.get_nowait() 引發。

exception concurrent.interpreters.QueueFullError

這個異常是 queue.Full 的子類,當佇列已滿時,會由 Queue.put()Queue.put_nowait() 引發。

基本用法

建立一個直譯器並在其中執行程式碼

from concurrent import interpreters

interp = interpreters.create()

# Run in the current OS thread.

interp.exec('print("spam!")')

interp.exec("""if True:
    print('spam!')
    """)

from textwrap import dedent
interp.exec(dedent("""
    print('spam!')
    """))

def run(arg):
    return arg

res = interp.call(run, 'spam!')
print(res)

def run():
    print('spam!')

interp.call(run)

# Run in new OS thread.

t = interp.call_in_thread(run)
t.join()