事件迴圈¶
原始碼: Lib/asyncio/events.py, Lib/asyncio/base_events.py
序言
事件迴圈是每個 asyncio 應用程式的核心。事件迴圈執行非同步任務和回撥,執行網路 IO 操作,並執行子程序。
應用程式開發人員通常應使用高階 asyncio 函式,例如 asyncio.run()
,並且很少需要引用迴圈物件或呼叫其方法。本節主要針對需要對事件迴圈行為進行更精細控制的底層程式碼、庫和框架的作者。
獲取事件迴圈
可以使用以下底層函式來獲取、設定或建立事件迴圈
- asyncio.get_running_loop()¶
返回當前作業系統執行緒中正在執行的事件迴圈。
如果沒有正在執行的事件迴圈,則引發
RuntimeError
。此函式只能從協程或回撥中呼叫。
3.7 版本新增。
- asyncio.get_event_loop()¶
獲取當前事件迴圈。
從協程或回撥(例如,使用 call_soon 或類似的 API 排程)呼叫時,此函式將始終返回正在執行的事件迴圈。
如果沒有設定正在執行的事件迴圈,該函式將返回
get_event_loop_policy().get_event_loop()
呼叫的結果。由於此函式具有相當複雜的行為(尤其是在使用自定義事件迴圈策略時),因此在協程和回撥中,優先使用
get_running_loop()
函式而不是get_event_loop()
。如上所述,考慮使用更高級別的
asyncio.run()
函式,而不是使用這些較低級別的函式手動建立和關閉事件迴圈。自 3.12 版本棄用: 如果當前沒有事件迴圈,則會發出棄用警告。在未來的某些 Python 版本中,這將變成錯誤。
- asyncio.set_event_loop(loop)¶
將 loop 設定為當前作業系統執行緒的當前事件迴圈。
- asyncio.new_event_loop()¶
建立並返回一個新的事件迴圈物件。
請注意,get_event_loop()
, set_event_loop()
, 和 new_event_loop()
函式的行為可以透過 設定自定義事件迴圈策略 來改變。
目錄
此文件頁面包含以下部分
事件迴圈方法 部分是事件迴圈 API 的參考文件;
回撥控制代碼 部分記錄了從諸如
loop.call_soon()
和loop.call_later()
等排程方法返回的Handle
和TimerHandle
例項;伺服器物件 部分記錄了從諸如
loop.create_server()
等事件迴圈方法返回的型別;事件迴圈實現 部分記錄了
SelectorEventLoop
和ProactorEventLoop
類;示例 部分展示瞭如何使用一些事件迴圈 API。
事件迴圈方法¶
事件迴圈具有以下底層 API
執行和停止迴圈¶
- loop.run_until_complete(future)¶
執行直到 future (
Future
的例項)完成。如果引數是 協程物件,則會隱式地排程為
asyncio.Task
執行。返回 Future 的結果或引發其異常。
- loop.run_forever()¶
執行事件迴圈,直到呼叫
stop()
。如果在呼叫
run_forever()
之前呼叫了stop()
,則迴圈將以零超時輪詢 I/O 選擇器一次,執行所有響應 I/O 事件排程的回撥(以及那些已經排程的回撥),然後退出。如果在
run_forever()
執行時呼叫了stop()
,則迴圈將運行當前批次的回撥然後退出。請注意,在這種情況下,回撥排程的新的回撥將不會執行;相反,它們將在下次呼叫run_forever()
或run_until_complete()
時執行。
- loop.stop()¶
停止事件迴圈。
- loop.is_running()¶
如果事件迴圈當前正在執行,則返回
True
。
- loop.is_closed()¶
如果事件迴圈已關閉,則返回
True
。
- loop.close()¶
關閉事件迴圈。
呼叫此函式時,迴圈不得正在執行。任何掛起的回撥都將被丟棄。
此方法清除所有佇列並關閉執行器,但不等待執行器完成。
此方法是冪等的且不可逆的。事件迴圈關閉後,不應呼叫其他方法。
- 協程 loop.shutdown_asyncgens()¶
安排所有當前開啟的 非同步生成器 物件透過
aclose()
呼叫關閉。呼叫此方法後,如果迭代新的非同步生成器,事件迴圈將發出警告。這應該用於可靠地完成所有計劃的非同步生成器。請注意,當使用
asyncio.run()
時,無需呼叫此函式。示例
try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
在 3.6 版本中新增。
- 協程 loop.shutdown_default_executor(timeout=None)¶
安排關閉預設執行器,並等待它加入
ThreadPoolExecutor
中的所有執行緒。一旦呼叫此方法,使用loop.run_in_executor()
的預設執行器將引發RuntimeError
。timeout 引數指定執行器完成加入所允許的時間量(以
float
秒為單位)。 使用預設值None
時,允許執行器無限量的時間。如果達到 timeout,則會發出
RuntimeWarning
,並且預設執行器會終止,而無需等待其執行緒完成加入。注意
使用
asyncio.run()
時不要呼叫此方法,因為後者會自動處理預設執行器的關閉。在 3.9 版本中新增。
在 3.12 版本中更改: 添加了 timeout 引數。
排程回撥¶
- loop.call_soon(callback, *args, context=None)¶
安排 callback 回撥 在事件迴圈的下一次迭代中與 args 引數一起呼叫。
返回
asyncio.Handle
的例項,該例項可以稍後用於取消回撥。回撥按照註冊的順序呼叫。每個回撥將精確呼叫一次。
可選的僅關鍵字 context 引數指定 callback 在其中執行的自定義
contextvars.Context
。如果未提供 context,回撥將使用當前上下文。與
call_soon_threadsafe()
不同,此方法不是執行緒安全的。
- loop.call_soon_threadsafe(callback, *args, context=None)¶
call_soon()
的執行緒安全變體。從另一個執行緒排程回撥時,必須使用此函式,因為call_soon()
不是執行緒安全的。如果在已關閉的迴圈上呼叫,則引發
RuntimeError
。這可能會在主應用程式關閉時在輔助執行緒上發生。請參閱文件的 併發和多執行緒 部分。
在 3.7 版本中更改: 添加了僅關鍵字 context 引數。有關詳細資訊,請參閱 PEP 567。
注意
大多數 asyncio
排程函式不允許傳遞關鍵字引數。為此,請使用 functools.partial()
# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))
使用 partial 物件通常比使用 lambdas 更方便,因為 asyncio 可以在除錯和錯誤訊息中更好地呈現 partial 物件。
排程延遲迴調¶
事件迴圈提供了用於排程回撥函式在未來某個時間點呼叫的機制。事件迴圈使用單調時鐘來跟蹤時間。
- loop.call_later(delay, callback, *args, context=None)¶
安排在給定的 delay 秒數(可以是整數或浮點數)後呼叫callback。
返回一個
asyncio.TimerHandle
的例項,該例項可用於取消回撥。callback 將被精確呼叫一次。如果兩個回撥安排在完全相同的時間執行,它們的呼叫順序是未定義的。
可選的位置引數 args 將在呼叫時傳遞給回撥。如果希望使用關鍵字引數呼叫回撥,請使用
functools.partial()
。可選的僅關鍵字引數 context 允許為 callback 指定一個自定義的
contextvars.Context
來執行。當沒有提供 context 時,將使用當前上下文。3.7 版本更改: 添加了僅關鍵字引數 context。有關更多詳細資訊,請參閱 PEP 567。
3.8 版本更改: 在 Python 3.7 及更早版本中使用預設事件迴圈實現時,delay 不能超過一天。這個問題已在 Python 3.8 中得到修復。
- loop.call_at(when, callback, *args, context=None)¶
安排在給定的絕對時間戳 when(整數或浮點數)呼叫 callback,使用與
loop.time()
相同的時間參考。此方法的行為與
call_later()
相同。返回一個
asyncio.TimerHandle
的例項,該例項可用於取消回撥。3.7 版本更改: 添加了僅關鍵字引數 context。有關更多詳細資訊,請參閱 PEP 567。
3.8 版本更改: 在 Python 3.7 及更早版本中使用預設事件迴圈實現時,when 和當前時間之間的差值不能超過一天。這個問題已在 Python 3.8 中得到修復。
注意
3.8 版本更改: 在 Python 3.7 及更早版本中,超時(相對 delay 或絕對 when)不應超過一天。這個問題已在 Python 3.8 中得到修復。
另請參閱
asyncio.sleep()
函式。
建立 Future 和 Task¶
- loop.create_future()¶
建立一個附加到事件迴圈的
asyncio.Future
物件。這是在 asyncio 中建立 Future 的首選方法。這允許第三方事件迴圈提供 Future 物件的替代實現(具有更好的效能或檢測能力)。
3.5.2 版本新增。
- loop.create_task(coro, *, name=None, context=None)¶
-
第三方事件迴圈可以使用自己的
Task
子類來實現互操作性。在這種情況下,結果型別是Task
的子類。如果提供了 name 引數且不為
None
,則使用Task.set_name()
將其設定為任務的名稱。可選的僅關鍵字引數 context 允許為 coro 指定一個自定義的
contextvars.Context
來執行。當沒有提供 context 時,將建立當前上下文的副本。3.8 版本更改: 添加了 name 引數。
3.11 版本更改: 添加了 context 引數。
- loop.set_task_factory(factory)¶
設定一個任務工廠,該工廠將由
loop.create_task()
使用。如果 factory 為
None
,將設定預設的任務工廠。否則,factory 必須是一個可呼叫物件,其簽名與(loop, coro, context=None)
匹配,其中 loop 是對活動事件迴圈的引用,而 coro 是一個協程物件。可呼叫物件必須返回一個與asyncio.Future
相容的物件。
- loop.get_task_factory()¶
如果正在使用預設任務工廠,則返回任務工廠或
None
。
開啟網路連線¶
- coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None, all_errors=False)¶
開啟一個流式傳輸連線到由 host 和 port 指定的給定地址。
套接字族可以是
AF_INET
或AF_INET6
,具體取決於 host (或提供的 family 引數)。套接字型別將為
SOCK_STREAM
。protocol_factory 必須是一個可呼叫物件,返回一個 asyncio 協議 實現。
此方法將嘗試在後臺建立連線。成功後,它將返回一個
(transport, protocol)
對。底層操作的時間順序概要如下:
連線已建立,併為其建立了一個 傳輸。
protocol_factory 在不帶引數的情況下被呼叫,並期望返回一個 協議 例項。
透過呼叫協議例項的
connection_made()
方法,將其與傳輸耦合。成功後,返回一個
(transport, protocol)
元組。
建立的傳輸是一個與實現相關的雙向流。
其他引數:
ssl:如果給定且不為 false,則建立 SSL/TLS 傳輸(預設情況下建立普通的 TCP 傳輸)。如果 ssl 是一個
ssl.SSLContext
物件,則此上下文用於建立傳輸;如果 ssl 是True
,則使用從ssl.create_default_context()
返回的預設上下文。另請參閱
server_hostname 設定或覆蓋將用於匹配目標伺服器證書的主機名。只有當 ssl 不是
None
時才應傳遞。預設情況下,使用 host 引數的值。如果 host 為空,則沒有預設值,您必須為 server_hostname 傳遞一個值。如果 server_hostname 是一個空字串,則停用主機名匹配(這是一個嚴重的安全風險,可能導致中間人攻擊)。family、proto、flags 是可選的地址族、協議和標誌,將傳遞給 getaddrinfo() 以進行 host 解析。如果給定,這些都應是來自相應
socket
模組常量的整數。happy_eyeballs_delay,如果給定,則為此連線啟用 Happy Eyeballs。它應該是一個浮點數,表示在並行啟動下一個嘗試之前,等待連線嘗試完成的時間(以秒為單位)。這是 RFC 8305 中定義的“連線嘗試延遲”。RFC 建議的合理預設值為
0.25
(250 毫秒)。interleave 控制當主機名解析為多個 IP 地址時地址的重新排序。如果
0
或未指定,則不進行重新排序,並且地址將按照getaddrinfo()
返回的順序進行嘗試。如果指定一個正整數,則地址將按地址族交錯,並且給定的整數將被解釋為 RFC 8305 中定義的“第一個地址族計數”。如果未指定 happy_eyeballs_delay,則預設為0
;如果指定了 happy_eyeballs_delay,則預設為1
。如果給定 sock,則它應該是現有、已連線的
socket.socket
物件,供傳輸使用。如果給定了 sock,則不應指定 host、port、family、proto、flags、happy_eyeballs_delay、interleave 和 local_addr 中的任何一個。注意
sock 引數將套接字的所有權轉移給建立的傳輸。要關閉套接字,請呼叫傳輸的
close()
方法。local_addr,如果給定,則是一個
(local_host, local_port)
元組,用於在本地繫結套接字。local_host 和 local_port 的查詢方式與 host 和 port 類似,使用getaddrinfo()
。ssl_handshake_timeout 是(對於 TLS 連線)等待 TLS 握手完成的時間(以秒為單位),超過該時間將中止連線。如果為
None
(預設值),則為60.0
秒。ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),超過該時間將中止連線。如果為
None
(預設值),則為30.0
秒。all_errors 確定在無法建立連線時引發哪些異常。預設情況下,只會引發一個
Exception
:如果只有一個異常或所有錯誤都具有相同的訊息,則會引發第一個異常,或者引發一個帶有組合錯誤訊息的OSError
。當all_errors
為True
時,將引發一個包含所有異常的ExceptionGroup
(即使只有一個異常)。
在 3.5 版本中更改: 為
ProactorEventLoop
添加了對 SSL/TLS 的支援。在 3.6 版本中更改: 預設情況下,為所有 TCP 連線設定套接字選項 socket.TCP_NODELAY。
在 3.7 版本中更改: 添加了 ssl_handshake_timeout 引數。
在 3.8 版本中更改: 添加了 happy_eyeballs_delay 和 interleave 引數。
Happy Eyeballs 演算法:雙棧主機成功。當伺服器的 IPv4 路徑和協議工作正常,但伺服器的 IPv6 路徑和協議不工作時,與僅使用 IPv4 的客戶端相比,雙棧客戶端應用程式會遇到明顯的連線延遲。這是不希望的,因為它會導致雙棧客戶端的使用者體驗更差。本文件指定了用於減少這種使用者可見延遲的演算法的要求,並提供了一種演算法。
在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 引數。
在 3.12 版本中更改: 添加了 all_errors。
另請參閱
open_connection()
函式是一種高階替代 API。它返回一對 (StreamReader
,StreamWriter
),可以直接在 async/await 程式碼中使用。
- 協程 loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)¶
建立資料報連線。
套接字族可以是
AF_INET
,AF_INET6
, 或AF_UNIX
,具體取決於 host(或提供的 family 引數)。套接字型別將為
SOCK_DGRAM
。protocol_factory 必須是一個可呼叫物件,返回一個 協議 實現。
成功後,返回一個
(transport, protocol)
元組。其他引數:
local_addr,如果給定,則是一個
(local_host, local_port)
元組,用於在本地繫結套接字。local_host 和 local_port 的查詢方式與 host 和 port 類似,使用getaddrinfo()
。如果給定 remote_addr,則它是一個
(remote_host, remote_port)
元組,用於將套接字連線到遠端地址。remote_host 和 remote_port 將使用getaddrinfo()
進行查詢。family、proto、flags 是可選的地址族、協議和標誌,它們將被傳遞給
getaddrinfo()
用於 host 解析。如果給定,這些都應該是來自相應socket
模組常量的整數。reuse_port 告訴核心允許此端點繫結到與其他現有端點繫結的同一埠,只要它們在建立時都設定了此標誌。此選項在 Windows 和一些 Unix 系統上不受支援。如果未定義 socket.SO_REUSEPORT 常量,則不支援此功能。
allow_broadcast 告訴核心允許此端點向廣播地址傳送訊息。
可以可選地指定 sock,以便使用一個預先存在的、已連線的
socket.socket
物件供傳輸使用。如果指定了,則應該省略 local_addr 和 remote_addr(必須為None
)。注意
sock 引數將套接字的所有權轉移給建立的傳輸。要關閉套接字,請呼叫傳輸的
close()
方法。
請參閱 UDP 回聲客戶端協議 和 UDP 回聲伺服器協議 示例。
在 3.4.4 版本中更改: 添加了 family、proto、flags、reuse_address、reuse_port、allow_broadcast 和 sock 引數。
在 3.8 版本中更改: 添加了對 Windows 的支援。
在 3.8.1 版本中更改: 不再支援 reuse_address 引數,因為使用 socket.SO_REUSEADDR 會對 UDP 造成嚴重的安全問題。顯式傳遞
reuse_address=True
將引發異常。當多個具有不同 UID 的程序使用
SO_REUSEADDR
將套接字分配給相同的 UDP 套接字地址時,傳入的資料包可以在套接字之間隨機分配。對於支援的平臺,可以使用 reuse_port 作為類似功能的替代品。使用 reuse_port 時,將使用 socket.SO_REUSEPORT,它可以專門阻止具有不同 UID 的程序將套接字分配給相同的套接字地址。
在 3.11 版本中更改: 自 Python 3.8.1、3.7.6 和 3.6.10 起停用的 reuse_address 引數已完全刪除。
- coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
建立一個 Unix 連線。
套接字族將是
AF_UNIX
;套接字型別將是SOCK_STREAM
。成功後,返回一個
(transport, protocol)
元組。path 是 Unix 域套接字的名稱,並且是必需的,除非指定了 sock 引數。支援抽象 Unix 套接字、
str
、bytes
和Path
路徑。有關此方法引數的資訊,請參閱
loop.create_connection()
方法的文件。可用性:Unix。
在 3.7 版本中更改: 添加了 ssl_handshake_timeout 引數。 path 引數現在可以是 路徑類物件。
在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 引數。
建立網路伺服器¶
- coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
建立一個 TCP 伺服器(套接字型別為
SOCK_STREAM
),該伺服器偵聽 host 地址的 port。返回一個
Server
物件。引數
protocol_factory 必須是一個可呼叫物件,返回一個 協議 實現。
host 引數可以設定為幾種型別,這些型別確定伺服器將偵聽的位置
如果 host 是一個字串,則 TCP 伺服器將繫結到由 host 指定的單個網路介面。
如果 host 是一個字串序列,則 TCP 伺服器將繫結到該序列指定的所有網路介面。
如果 host 是一個空字串或
None
,則假定所有介面,並且將返回多個套接字的列表(最有可能一個用於 IPv4,另一個用於 IPv6)。
port 引數可以設定為指定伺服器應偵聽的埠。如果為
0
或None
(預設值),則將選擇一個未使用的隨機埠(請注意,如果 host 解析為多個網路介面,則將為每個介面選擇一個不同的隨機埠)。可以將 family 設定為
socket.AF_INET
或AF_INET6
,以強制套接字使用 IPv4 或 IPv6。如果未設定,則將根據主機名確定 family(預設為AF_UNSPEC
)。flags 是
getaddrinfo()
的位掩碼。可以可選地指定 sock 以使用預先存在的套接字物件。如果指定了,則不得指定 host 和 port。
注意
sock 引數將套接字的所有權轉移到建立的伺服器。要關閉套接字,請呼叫伺服器的
close()
方法。backlog 是傳遞給
listen()
的最大排隊連線數(預設為 100)。可以將 ssl 設定為
SSLContext
例項,以在接受的連線上啟用 TLS。reuse_address 告訴核心重用
TIME_WAIT
狀態的本地套接字,而無需等待其自然超時到期。如果未指定,則在 Unix 上將自動設定為True
。reuse_port 告訴核心允許此端點繫結到與其他現有端點繫結的同一埠,只要它們在建立時都設定了此標誌。此選項在 Windows 上不受支援。
將 keep_alive 設定為
True
透過啟用訊息的定期傳輸來保持連線活動。
在 3.13 版本中更改: 添加了 keep_alive 引數。
ssl_handshake_timeout(對於 TLS 伺服器)是等待 TLS 握手完成然後中止連線的時間(以秒為單位)。 如果
None
(預設值),則為60.0
秒。ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),超過該時間將中止連線。如果為
None
(預設值),則為30.0
秒。start_serving 設定為
True
(預設值) 會使建立的伺服器立即開始接受連線。當設定為False
時,使用者應該等待Server.start_serving()
或Server.serve_forever()
,以使伺服器開始接受連線。
在 3.5 版本中更改: 為
ProactorEventLoop
添加了對 SSL/TLS 的支援。3.5.1 版本更改: host 引數可以是一個字串序列。
3.6 版本更改: 添加了 ssl_handshake_timeout 和 start_serving 引數。預設情況下,所有 TCP 連線都會設定套接字選項 socket.TCP_NODELAY。
在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 引數。
另請參閱
start_server()
函式是一個更高層的替代 API,它返回一對StreamReader
和StreamWriter
,可以在 async/await 程式碼中使用。
- 協程 loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)¶
類似於
loop.create_server()
,但適用於AF_UNIX
套接字族。path 是 Unix 域套接字的名稱,是必需的,除非提供了 sock 引數。支援抽象 Unix 套接字,
str
,bytes
和Path
路徑。如果 cleanup_socket 為 true,則當伺服器關閉時,Unix 套接字將自動從檔案系統中刪除,除非在建立伺服器後套接字已被替換。
有關此方法的引數的資訊,請參閱
loop.create_server()
方法的文件。可用性:Unix。
3.7 版本更改: 添加了 ssl_handshake_timeout 和 start_serving 引數。path 引數現在可以是
Path
物件。在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 引數。
3.13 版本更改: 添加了 cleanup_socket 引數。
- 協程 loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
將已接受的連線包裝到傳輸/協議對中。
此方法可供在 asyncio 之外接受連線但使用 asyncio 處理這些連線的伺服器使用。
引數
protocol_factory 必須是一個可呼叫物件,返回一個 協議 實現。
sock 是從
socket.accept
返回的預先存在的套接字物件。注意
sock 引數將套接字的所有權轉移給建立的傳輸。要關閉套接字,請呼叫傳輸的
close()
方法。可以將 ssl 設定為
SSLContext
以便在接受的連線上啟用 SSL。ssl_handshake_timeout (對於 SSL 連線) 是在中止連線之前等待 SSL 握手完成的時間(以秒為單位)。如果為
None
(預設值),則為60.0
秒。ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),超過該時間將中止連線。如果為
None
(預設值),則為30.0
秒。
返回一個
(transport, protocol)
對。在 3.5.3 版本中新增。
在 3.7 版本中更改: 添加了 ssl_handshake_timeout 引數。
在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 引數。
傳輸檔案¶
- 協程 loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)¶
透過 transport 傳送 file。返回傳送的總位元組數。
如果可用,該方法使用高效能的
os.sendfile()
。file 必須是以二進位制模式開啟的常規檔案物件。
offset 指定從何處開始讀取檔案。如果指定了 count,則它是要傳輸的總位元組數,而不是將檔案傳送到 EOF。即使此方法引發錯誤,檔案位置也會始終更新,並且可以使用
file.tell()
來獲取實際傳送的位元組數。當平臺不支援 sendfile 系統呼叫時 (例如 Windows 或 Unix 上的 SSL 套接字),將 fallback 設定為
True
使 asyncio 手動讀取和傳送檔案。如果系統不支援 sendfile 系統呼叫並且 fallback 為
False
,則引發SendfileNotAvailableError
。3.7 版本新增。
TLS 升級¶
- 協程 loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
將現有的基於傳輸的連線升級到 TLS。
建立 TLS 編碼器/解碼器例項,並將其插入 transport 和 protocol 之間。該編碼器/解碼器實現面向 transport 的協議和麵向 protocol 的傳輸。
返回建立的雙介面例項。在 await 之後,protocol 必須停止使用原始 transport,並且只能與返回的物件通訊,因為編碼器會快取 protocol 端的資料,並零星地與 transport 交換額外的 TLS 會話資料包。
在某些情況下 (例如,當傳遞的傳輸已經關閉時),這可能會返回
None
。引數
transport 和 protocol 是諸如
create_server()
和create_connection()
之類的方法返回的例項。sslcontext:
SSLContext
的配置例項。當正在升級伺服器端連線時(例如由
create_server()
建立的連線),傳遞True
。server_hostname:設定或覆蓋目標伺服器的證書將與之匹配的主機名。
ssl_handshake_timeout 是(對於 TLS 連線)等待 TLS 握手完成的時間(以秒為單位),超過該時間將中止連線。如果為
None
(預設值),則為60.0
秒。ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),超過該時間將中止連線。如果為
None
(預設值),則為30.0
秒。
3.7 版本新增。
在 3.11 版本中更改: 添加了 ssl_shutdown_timeout 引數。
監視檔案描述符¶
- loop.add_reader(fd, callback, *args)¶
開始監視 fd 檔案描述符的讀取可用性,並在 fd 可用於讀取後,使用指定的引數呼叫 callback。
- loop.remove_reader(fd)¶
停止監視 fd 檔案描述符的讀取可用性。如果 fd 之前正在被監視讀取,則返回
True
。
- loop.add_writer(fd, callback, *args)¶
開始監視 fd 檔案描述符的寫入可用性,並在 fd 可用於寫入時呼叫帶有指定引數的 callback。
使用
functools.partial()
將關鍵字引數傳遞給 callback。
- loop.remove_writer(fd)¶
停止監視 fd 檔案描述符的寫入可用性。如果 fd 之前正在被監視寫入,則返回
True
。
有關這些方法的一些限制,請參閱 平臺支援 部分。
直接使用套接字物件¶
通常,使用基於傳輸的 API(例如 loop.create_connection()
和 loop.create_server()
)的協議實現比直接使用套接字的實現更快。但是,在某些效能不是關鍵的情況下,直接使用 socket
物件會更方便。
- 協程 loop.sock_recv(sock, nbytes)¶
從 sock 接收最多 nbytes 個位元組。
socket.recv()
的非同步版本。以位元組物件形式返回接收到的資料。
sock 必須是一個非阻塞套接字。
在 3.7 版本中更改: 儘管此方法始終被記錄為協程方法,但在 Python 3.7 之前的版本中,它返回一個
Future
。 從 Python 3.7 開始,它是一個async def
方法。
- 協程 loop.sock_recv_into(sock, buf)¶
從 sock 接收資料到 buf 緩衝區。 模仿阻塞式
socket.recv_into()
方法。返回寫入緩衝區的位元組數。
sock 必須是一個非阻塞套接字。
3.7 版本新增。
- 協程 loop.sock_recvfrom(sock, bufsize)¶
從 sock 接收最大 bufsize 的資料報。
socket.recvfrom()
的非同步版本。返回 (接收到的資料, 遠端地址) 的元組。
sock 必須是一個非阻塞套接字。
在 3.11 版本中新增。
- 協程 loop.sock_recvfrom_into(sock, buf, nbytes=0)¶
從 sock 接收最大 nbytes 的資料報到 buf 中。
socket.recvfrom_into()
的非同步版本。返回 (接收到的位元組數,遠端地址) 的元組。
sock 必須是一個非阻塞套接字。
在 3.11 版本中新增。
- 協程 loop.sock_sendall(sock, data)¶
將 data 傳送到 sock 套接字。
socket.sendall()
的非同步版本。此方法會繼續傳送到套接字,直到 data 中的所有資料都已傳送或發生錯誤為止。成功時返回
None
。 發生錯誤時,將引發異常。此外,無法確定連線的接收端成功處理了多少資料(如果有)。sock 必須是一個非阻塞套接字。
在 3.7 版本中更改: 儘管此方法始終被記錄為協程方法,但在 Python 3.7 之前它返回一個
Future
。 從 Python 3.7 開始,它是一個async def
方法。
- 協程 loop.sock_sendto(sock, data, address)¶
將資料報從 sock 傳送到 address。
socket.sendto()
的非同步版本。返回傳送的位元組數。
sock 必須是一個非阻塞套接字。
在 3.11 版本中新增。
- 協程 loop.sock_connect(sock, address)¶
將 sock 連線到 address 的遠端套接字。
socket.connect()
的非同步版本。sock 必須是一個非阻塞套接字。
在 3.5.2 版本中更改:
address
不再需要解析。sock_connect
將嘗試透過呼叫socket.inet_pton()
來檢查 address 是否已被解析。 如果沒有,將使用loop.getaddrinfo()
來解析 address。
- 協程 loop.sock_accept(sock)¶
接受一個連線。模仿阻塞式
socket.accept()
方法。該套接字必須繫結到一個地址並偵聽連線。返回值是一對
(conn, address)
,其中 conn 是一個新的套接字物件,可用於在該連線上傳送和接收資料,而 address 是繫結到連線另一端套接字的地址。sock 必須是一個非阻塞套接字。
在 3.7 版本中更改: 儘管此方法始終被記錄為協程方法,但在 Python 3.7 之前它返回一個
Future
。 從 Python 3.7 開始,它是一個async def
方法。另請參閱
- 協程 loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)¶
如果可能,使用高效能
os.sendfile
傳送檔案。 返回傳送的總位元組數。socket.sendfile()
的非同步版本。sock 必須是非阻塞
socket.SOCK_STREAM
socket
。file 必須是以二進位制模式開啟的常規檔案物件。
offset 指定從何處開始讀取檔案。如果指定了 count,則它是要傳輸的總位元組數,而不是將檔案傳送到 EOF。即使此方法引發錯誤,檔案位置也會始終更新,並且可以使用
file.tell()
來獲取實際傳送的位元組數。當設定為
True
時,如果平臺不支援 sendfile 系統呼叫(例如 Windows 或 Unix 上的 SSL 套接字),則 fallback 會使 asyncio 手動讀取和傳送檔案。如果系統不支援 sendfile 系統呼叫且 fallback 為
False
,則引發SendfileNotAvailableError
。sock 必須是一個非阻塞套接字。
3.7 版本新增。
DNS¶
- 協程 loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)¶
socket.getaddrinfo()
的非同步版本。
- 協程 loop.getnameinfo(sockaddr, flags=0)¶
socket.getnameinfo()
的非同步版本。
注意
getaddrinfo 和 getnameinfo 內部都透過迴圈的預設執行緒池執行器利用它們的同步版本。當此執行器飽和時,這些方法可能會遇到延遲,更高級別的網路庫可能會將其報告為超時增加。為了緩解這種情況,請考慮為其他使用者任務使用自定義執行器,或設定具有更多工作執行緒的預設執行器。
在 3.7 版本中變更: getaddrinfo 和 getnameinfo 方法始終被記錄為返回一個協程,但在 Python 3.7 之前,它們實際上返回的是 asyncio.Future
物件。從 Python 3.7 開始,這兩個方法都是協程。
使用管道¶
- 協程 loop.connect_read_pipe(protocol_factory, pipe)¶
在事件迴圈中註冊 pipe 的讀取端。
protocol_factory 必須是一個可呼叫物件,返回一個 asyncio 協議 實現。
pipe 是一個 類檔案物件。
返回對
(transport, protocol)
,其中 transport 支援ReadTransport
介面,protocol 是由 protocol_factory 例項化的物件。使用
SelectorEventLoop
事件迴圈,pipe 被設定為非阻塞模式。
- 協程 loop.connect_write_pipe(protocol_factory, pipe)¶
在事件迴圈中註冊 pipe 的寫入端。
protocol_factory 必須是一個可呼叫物件,返回一個 asyncio 協議 實現。
pipe 是 類檔案物件。
返回對
(transport, protocol)
,其中 transport 支援WriteTransport
介面,protocol 是由 protocol_factory 例項化的物件。使用
SelectorEventLoop
事件迴圈,pipe 被設定為非阻塞模式。
注意
SelectorEventLoop
不支援 Windows 上的上述方法。對於 Windows,請改用 ProactorEventLoop
。
另請參閱
Unix 訊號¶
- loop.add_signal_handler(signum, callback, *args)¶
將 callback 設定為 signum 訊號的處理程式。
回撥將由 loop 呼叫,以及該事件迴圈的其他排隊回撥和可執行協程。與使用
signal.signal()
註冊的訊號處理程式不同,使用此函式註冊的回撥允許與事件迴圈互動。如果訊號編號無效或不可捕獲,則引發
ValueError
。如果設定處理程式時出現問題,則引發RuntimeError
。使用
functools.partial()
將關鍵字引數傳遞給 callback。與
signal.signal()
一樣,此函式必須在主執行緒中呼叫。
- loop.remove_signal_handler(sig)¶
移除 sig 訊號的處理程式。
如果訊號處理程式被移除,則返回
True
,如果未為給定訊號設定處理程式,則返回False
。可用性:Unix。
另請參閱
signal
模組。
線上程或程序池中執行程式碼¶
- 可等待物件 loop.run_in_executor(executor, func, *args)¶
安排在指定的執行器中呼叫 func。
executor 引數應為
concurrent.futures.Executor
例項。如果 executor 為None
,則使用預設執行器。可以使用loop.set_default_executor()
設定預設執行器,否則,如果需要,run_in_executor()
將惰性初始化並使用concurrent.futures.ThreadPoolExecutor
。示例
import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open('/dev/urandom', 'rb') as f: return f.read(100) def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. Run in the default loop's executor: result = await loop.run_in_executor( None, blocking_io) print('default thread pool', result) # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print('custom thread pool', result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print('custom process pool', result) if __name__ == '__main__': asyncio.run(main())
請注意,由於
multiprocessing
的特殊性(由ProcessPoolExecutor
使用),選項 3 需要入口點保護 (if __name__ == '__main__'
)。請參閱 安全匯入主模組。此方法返回一個
asyncio.Future
物件。使用
functools.partial()
將關鍵字引數傳遞給 func。在 3.5.3 版本中更改:
loop.run_in_executor()
不再配置它建立的執行緒池執行器的max_workers
,而是將其留給執行緒池執行器(ThreadPoolExecutor
)來設定預設值。
- loop.set_default_executor(executor)¶
將 executor 設定為
run_in_executor()
使用的預設執行器。executor 必須是ThreadPoolExecutor
的例項。在 3.11 版本中更改: executor 必須是
ThreadPoolExecutor
的例項。
錯誤處理 API¶
允許自定義事件迴圈中如何處理異常。
- loop.set_exception_handler(handler)¶
將 handler 設定為新的事件迴圈異常處理程式。
如果 handler 為
None
,則將設定預設異常處理程式。 否則,handler 必須是與(loop, context)
簽名匹配的可呼叫物件,其中loop
是對活動事件迴圈的引用,而context
是一個dict
物件,其中包含異常的詳細資訊(有關 context 的詳細資訊,請參閱call_exception_handler()
文件)。如果代表
Task
或Handle
呼叫處理程式,則它在該任務或回撥控制代碼的contextvars.Context
中執行。在 3.12 版本中更改: 可以在異常發生的任務或控制代碼的
Context
中呼叫處理程式。
- loop.get_exception_handler()¶
返回當前的異常處理程式,如果沒有設定自定義異常處理程式,則返回
None
。3.5.2 版本新增。
- loop.default_exception_handler(context)¶
預設異常處理程式。
當發生異常且未設定異常處理程式時,將呼叫此函式。 可以由想要推遲到預設處理程式行為的自定義異常處理程式呼叫此函式。
context 引數的含義與
call_exception_handler()
中的含義相同。
- loop.call_exception_handler(context)¶
呼叫當前的事件迴圈異常處理程式。
context 是一個
dict
物件,其中包含以下鍵(在未來的 Python 版本中可能會引入新鍵)“message”:錯誤訊息;
“exception”(可選):異常物件;
“future”(可選):
asyncio.Future
例項;“task”(可選):
asyncio.Task
例項;“handle”(可選):
asyncio.Handle
例項;“protocol”(可選):協議 例項;
“transport”(可選):傳輸 例項;
“socket”(可選):
socket.socket
例項;- “asyncgen”(可選):導致異常的非同步生成器。
異常。
注意
不應在子類化的事件迴圈中過載此方法。 對於自定義異常處理,請使用
set_exception_handler()
方法。
啟用除錯模式¶
- loop.get_debug()¶
獲取事件迴圈的除錯模式(
bool
)。如果環境變數
PYTHONASYNCIODEBUG
設定為非空字串,則預設值為True
,否則為False
。
- loop.set_debug(enabled: bool)¶
設定事件迴圈的除錯模式。
在 3.7 版本中更改: 現在還可以使用新的 Python 開發模式 來啟用除錯模式。
- loop.slow_callback_duration¶
此屬性可用於設定被認為是“慢”的最短執行時間(以秒為單位)。 啟用除錯模式後,將記錄“慢”的回撥。
預設值為 100 毫秒。
另請參閱
執行子程序¶
此小節中描述的方法是低階的。 在常規的 async/await 程式碼中,請考慮改用高階的 asyncio.create_subprocess_shell()
和 asyncio.create_subprocess_exec()
便利函式。
注意
在 Windows 上,預設事件迴圈 ProactorEventLoop
支援子程序,而 SelectorEventLoop
則不支援。 有關詳細資訊,請參閱 Windows 上的子程序支援。
- coroutine loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)¶
從 args 指定的一個或多個字串引數建立子程序。
args 必須是由以下表示的字串列表
第一個字串指定程式可執行檔案,其餘字串指定引數。 字串引數共同構成程式的
argv
。這類似於標準庫
subprocess.Popen
類,使用shell=False
呼叫,並將字串列表作為第一個引數傳遞;但是,當Popen
接收一個作為字串列表的引數時,subprocess_exec 接收多個字串引數。protocol_factory 必須是一個可呼叫物件,返回
asyncio.SubprocessProtocol
類的子類。其他引數
stdin 可以是以下任何一項
類似檔案的物件
現有檔案描述符(正整數),例如使用
os.pipe()
建立的檔案描述符使用
subprocess.PIPE
常量(預設值),它將建立一個新的管道並連線它,使用值
None
,這將使子程序從當前程序繼承檔案描述符使用
subprocess.DEVNULL
常量,它表示將使用特殊的os.devnull
檔案
stdout 可以是以下任何一個
類似檔案的物件
使用
subprocess.PIPE
常量(預設值),它將建立一個新的管道並連線它,使用值
None
,這將使子程序從當前程序繼承檔案描述符使用
subprocess.DEVNULL
常量,它表示將使用特殊的os.devnull
檔案
stderr 可以是以下任何一個
類似檔案的物件
使用
subprocess.PIPE
常量(預設值),它將建立一個新的管道並連線它,使用值
None
,這將使子程序從當前程序繼承檔案描述符使用
subprocess.DEVNULL
常量,它表示將使用特殊的os.devnull
檔案使用
subprocess.STDOUT
常量,它將把標準錯誤流連線到程序的標準輸出流
所有其他關鍵字引數都將傳遞給
subprocess.Popen
,不做任何解釋,除了 *bufsize*、*universal_newlines*、*shell*、*text*、*encoding* 和 *errors*,這些引數都不應指定。asyncio
子程序 API 不支援將流解碼為文字。可以使用bytes.decode()
將流返回的位元組轉換為文字。
如果作為 *stdin*、*stdout* 或 *stderr* 傳遞的類檔案物件表示一個管道,則該管道的另一端應使用
connect_write_pipe()
或connect_read_pipe()
進行註冊,以便與事件迴圈一起使用。有關其他引數的文件,請參閱
subprocess.Popen
類的建構函式。返回一個
(transport, protocol)
對,其中 *transport* 符合asyncio.SubprocessTransport
基類,而 *protocol* 是由 *protocol_factory* 例項化的物件。
- 協程 loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)¶
從 *cmd* 建立一個子程序,*cmd* 可以是
str
或一個編碼為 檔案系統編碼 的bytes
字串,使用平臺的“shell”語法。這類似於標準庫
subprocess.Popen
類在呼叫時使用shell=True
。*protocol_factory* 必須是可呼叫的,返回
SubprocessProtocol
類的子類。有關其餘引數的詳細資訊,請參閱
subprocess_exec()
。返回一個
(transport, protocol)
對,其中 *transport* 符合asyncio.SubprocessTransport
基類,而 *protocol* 是由 *protocol_factory* 例項化的物件。
注意
應用程式有責任確保所有空格和特殊字元都被適當地引用,以避免 shell 注入 漏洞。可以使用 shlex.quote()
函式來正確轉義將用於構造 shell 命令的字串中的空格和特殊字元。
回撥控制代碼¶
- class asyncio.Handle¶
由
loop.call_soon()
、loop.call_soon_threadsafe()
返回的回撥包裝器物件。- get_context()¶
返回與控制代碼關聯的
contextvars.Context
物件。在 3.12 版本中新增。
- cancel()¶
取消回撥。如果回撥已被取消或執行,則此方法無效。
- cancelled()¶
如果回撥被取消,則返回
True
。3.7 版本新增。
- class asyncio.TimerHandle¶
由
loop.call_later()
和loop.call_at()
返回的回撥包裝器物件。此類是
Handle
的子類。- when()¶
以
float
秒的形式返回計劃的回撥時間。該時間是一個絕對時間戳,使用與
loop.time()
相同的時間參考。3.7 版本新增。
伺服器物件¶
伺服器物件由 loop.create_server()
、loop.create_unix_server()
、start_server()
和 start_unix_server()
函式建立。
請勿直接例項化 Server
類。
- class asyncio.Server¶
Server 物件是非同步上下文管理器。當在
async with
語句中使用時,可以保證當async with
語句完成時,Server 物件已關閉且不接受新連線。srv = await loop.create_server(...) async with srv: # some code # At this point, srv is closed and no longer accepts new connections.
在 3.7 版本中更改: 自 Python 3.7 起,Server 物件是一個非同步上下文管理器。
在 3.11 版本中更改: 此類在 Python 3.9.11、3.10.3 和 3.11 中作為
asyncio.Server
公開。- close()¶
停止服務:關閉偵聽套接字並將
sockets
屬性設定為None
。表示現有傳入客戶端連線的套接字保持開啟狀態。
伺服器是非同步關閉的;使用
wait_closed()
協程等待直到伺服器關閉(並且沒有更多連線處於活動狀態)。
- close_clients()¶
關閉所有現有的傳入客戶端連線。
在所有關聯的傳輸上呼叫
close()
。為了避免與新客戶端連線發生競爭,在關閉伺服器時,應該在呼叫
close_clients()
之前呼叫close()
。3.13 版本中新增。
- abort_clients()¶
立即關閉所有現有的傳入客戶端連線,無需等待掛起的操作完成。
在所有關聯的傳輸上呼叫
abort()
。為了避免與新客戶端連線發生競爭,在關閉伺服器時,應該在呼叫
abort_clients()
之前呼叫close()
。3.13 版本中新增。
- get_loop()¶
返回與伺服器物件關聯的事件迴圈。
3.7 版本新增。
- 協程 start_serving()¶
開始接受連線。
此方法是冪等的,因此可以在伺服器已經提供服務時呼叫。
loop.create_server()
和asyncio.start_server()
的僅限關鍵字引數 start_serving 允許建立一個最初不接受連線的 Server 物件。在這種情況下,可以使用Server.start_serving()
或Server.serve_forever()
使伺服器開始接受連線。3.7 版本新增。
- 協程 serve_forever()¶
開始接受連線,直到協程被取消。取消
serve_forever
任務會導致伺服器關閉。如果伺服器已經在接受連線,則可以呼叫此方法。每個 Server 物件只能存在一個
serve_forever
任務。示例
async def client_connected(reader, writer): # Communicate with the client with # reader/writer streams. For example: await reader.readline() async def main(host, port): srv = await asyncio.start_server( client_connected, host, port) await srv.serve_forever() asyncio.run(main('127.0.0.1', 0))
3.7 版本新增。
- is_serving()¶
如果伺服器正在接受新連線,則返回
True
。3.7 版本新增。
- sockets¶
伺服器正在監聽的類 socket 物件列表,
asyncio.trsock.TransportSocket
。在 3.7 版本中更改: 在 Python 3.7 之前,
Server.sockets
曾經直接返回伺服器套接字的內部列表。在 3.7 中,返回該列表的副本。
事件迴圈實現¶
asyncio 提供了兩種不同的事件迴圈實現:SelectorEventLoop
和 ProactorEventLoop
。
預設情況下,asyncio 配置為使用 EventLoop
。
- class asyncio.SelectorEventLoop¶
基於
selectors
模組的AbstractEventLoop
的子類。使用適用於給定平臺的最有效的 selector。也可以手動配置要使用的確切選擇器實現
import asyncio import selectors class MyPolicy(asyncio.DefaultEventLoopPolicy): def new_event_loop(self): selector = selectors.SelectSelector() return asyncio.SelectorEventLoop(selector) asyncio.set_event_loop_policy(MyPolicy())
可用性: Unix,Windows。
- class asyncio.ProactorEventLoop¶
用於 Windows 的
AbstractEventLoop
的子類,使用“I/O 完成埠”(IOCP)。可用性: Windows。
另請參閱
- class asyncio.EventLoop¶
給定平臺上可用的
AbstractEventLoop
的最高效子類的別名。它是 Unix 上的
SelectorEventLoop
和 Windows 上的ProactorEventLoop
的別名。3.13 版本中新增。
示例¶
請注意,本節中的所有示例都**有意**展示瞭如何使用低級別的事件迴圈 API,例如 loop.run_forever()
和 loop.call_soon()
。現代的 asyncio 應用程式很少需要以這種方式編寫;請考慮使用高階函式,如 asyncio.run()
。
使用 call_soon() 的 Hello World¶
一個使用 loop.call_soon()
方法來排程回撥的示例。回撥顯示 "Hello World"
,然後停止事件迴圈
import asyncio
def hello_world(loop):
"""A callback to print 'Hello World' and stop the event loop"""
print('Hello World')
loop.stop()
loop = asyncio.new_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
另請參閱
使用協程和 run()
函式建立的類似的 Hello World 示例。
使用 call_later() 顯示當前日期¶
一個每秒顯示當前日期的回撥示例。回撥使用 loop.call_later()
方法在 5 秒後重新排程自身,然後停止事件迴圈
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
loop.call_later(1, display_date, end_time, loop)
else:
loop.stop()
loop = asyncio.new_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
try:
loop.run_forever()
finally:
loop.close()
監視檔案描述符的讀取事件¶
等待檔案描述符使用 loop.add_reader()
方法接收到一些資料,然後關閉事件迴圈
import asyncio
from socket import socketpair
# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.new_event_loop()
def reader():
data = rsock.recv(100)
print("Received:", data.decode())
# We are done: unregister the file descriptor
loop.remove_reader(rsock)
# Stop the event loop
loop.stop()
# Register the file descriptor for read event
loop.add_reader(rsock, reader)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
try:
# Run the event loop
loop.run_forever()
finally:
# We are done. Close sockets and the event loop.
rsock.close()
wsock.close()
loop.close()
另請參閱
一個類似的示例,使用了傳輸層、協議和
loop.create_connection()
方法。另一個類似的示例,使用了高階的
asyncio.open_connection()
函式和流。
為 SIGINT 和 SIGTERM 設定訊號處理器¶
(此 signals
示例僅在 Unix 上有效。)
使用 loop.add_signal_handler()
方法,為訊號 SIGINT
和 SIGTERM
註冊處理器
import asyncio
import functools
import os
import signal
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
async def main():
loop = asyncio.get_running_loop()
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
functools.partial(ask_exit, signame, loop))
await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
asyncio.run(main())