事件迴圈

原始碼: Lib/asyncio/events.py, Lib/asyncio/base_events.py


前言

事件迴圈是每個 asyncio 應用程式的核心。事件迴圈執行非同步任務和回撥,執行網路 IO 操作,並執行子程序。

應用程式開發人員通常應該使用高階 asyncio 函式,例如 asyncio.run(),很少需要引用迴圈物件或呼叫其方法。本節主要面向需要更精細控制事件迴圈行為的低階程式碼、庫和框架的作者。

獲取事件迴圈

以下低階函式可用於獲取、設定或建立事件迴圈

asyncio.get_running_loop()

返回當前 OS 執行緒中正在執行的事件迴圈。

如果沒有正在執行的事件迴圈,則引發 RuntimeError

此函式只能從協程或回撥中呼叫。

在 3.7 版本加入。

asyncio.get_event_loop()

獲取當前事件迴圈。

當從協程或回撥(例如使用 call_soon 或類似 API 排程)呼叫時,此函式將始終返回正在執行的事件迴圈。

如果沒有設定正在執行的事件迴圈,該函式將返回 get_event_loop_policy().get_event_loop() 呼叫的結果。

由於此函式行為複雜(尤其是在使用自定義事件迴圈策略時),在協程和回撥中,首選使用 get_running_loop() 函式而非 get_event_loop()

如上所述,考慮使用高階 asyncio.run() 函式,而不是使用這些低階函式手動建立和關閉事件迴圈。

3.14 版本變更: 如果沒有當前事件迴圈,則引發 RuntimeError

備註

asyncio 策略系統已棄用,並將在 Python 3.16 中移除;從那時起,如果存在當前正在執行的事件迴圈,此函式將返回該迴圈,否則將返回由 set_event_loop() 設定的迴圈。

asyncio.set_event_loop(loop)

loop 設定為當前 OS 執行緒的當前事件迴圈。

asyncio.new_event_loop()

建立並返回一個新的事件迴圈物件。

請注意,get_event_loop()set_event_loop()new_event_loop() 函式的行為可以透過設定自定義事件迴圈策略來改變。

目錄

本頁文件包含以下部分

事件迴圈方法

事件迴圈具有以下低階 API

執行和停止迴圈

loop.run_until_complete(future)

執行直到 futureFuture 的一個例項)完成。

如果引數是協程物件,它會被隱式排程為 asyncio.Task 執行。

返回 Future 的結果或引發其異常。

loop.run_forever()

執行事件迴圈,直到呼叫 stop()

如果 stop()run_forever() 之前被呼叫,迴圈將以零超時輪詢 I/O 選擇器一次,執行所有響應 I/O 事件排程的回撥(以及那些已經排程的回撥),然後退出。

如果在 run_forever() 正在執行時呼叫 stop(),迴圈將運行當前批次的回撥然後退出。請注意,在這種情況下,由回撥排程的新回撥不會執行;相反,它們將在下次呼叫 run_forever()run_until_complete() 時執行。

loop.stop()

停止事件迴圈。

loop.is_running()

如果事件迴圈當前正在執行,則返回 True

loop.is_closed()

如果事件迴圈已關閉,則返回 True

loop.close()

關閉事件迴圈。

呼叫此函式時,迴圈不得處於執行狀態。所有待處理的回撥都將被丟棄。

此方法會清空所有佇列並關閉執行器,但不會等待執行器完成。

此方法是冪等且不可逆的。事件迴圈關閉後,不應呼叫其他方法。

async loop.shutdown_asyncgens()

排程所有當前開啟的非同步生成器物件透過 aclose() 呼叫關閉。呼叫此方法後,如果迭代新的非同步生成器,事件迴圈將發出警告。這應該用於可靠地完成所有已排程的非同步生成器。

請注意,使用 asyncio.run() 時無需呼叫此函式。

示例

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

在 3.6 版本加入。

async loop.shutdown_default_executor(timeout=None)

排程預設執行器的關閉,並等待它加入 ThreadPoolExecutor 中的所有執行緒。一旦呼叫此方法,使用 loop.run_in_executor() 使用預設執行器將引發 RuntimeError

timeout 引數指定執行器完成加入所允許的時間(以 float 秒為單位)。如果為預設值 None,則執行器被允許無限時間。

如果達到 timeout,則會發出 RuntimeWarning,並且預設執行器會終止,而不等待其執行緒完成加入。

備註

使用 asyncio.run() 時不要呼叫此方法,因為後者會自動處理預設執行器關閉。

在 3.9 版本中新增。

3.12 版本變更: 添加了 timeout 引數。

排程回撥

loop.call_soon(callback, *args, context=None)

排程 callback 回撥 在事件迴圈的下一次迭代中以 args 引數被呼叫。

返回 asyncio.Handle 的例項,該例項以後可用於取消回撥。

回撥按照註冊順序呼叫。每個回撥都將只被呼叫一次。

可選的僅限關鍵字 context 引數指定 callback 執行的自定義 contextvars.Context。如果未提供 context,回撥將使用當前上下文。

call_soon_threadsafe() 不同,此方法不是執行緒安全的。

loop.call_soon_threadsafe(callback, *args, context=None)

執行緒安全的 call_soon() 變體。當從另一個執行緒排程回撥時,必須 使用此函式,因為 call_soon() 不是執行緒安全的。

此函式可以安全地從可重入上下文或訊號處理程式中呼叫,但是,在此類上下文中使用返回的控制代碼是不安全或無益的。

如果在已關閉的迴圈上呼叫,則引發 RuntimeError。這可能發生在主應用程式關閉時的一個輔助執行緒上。

請參閱文件的併發和多執行緒部分。

3.7 版本變更: 添加了 context 僅限關鍵字引數。有關詳細資訊,請參閱 PEP 567

備註

大多數 asyncio 排程函式不允許傳遞關鍵字引數。要做到這一點,請使用 functools.partial()

# will schedule "print("Hello", flush=True)"
loop.call_soon(
    functools.partial(print, "Hello", flush=True))

使用部分物件通常比使用 lambda 更方便,因為 asyncio 可以在除錯和錯誤訊息中更好地呈現部分物件。

排程延遲迴調

事件迴圈提供了排程回撥函式在未來某個時間點被呼叫的機制。事件迴圈使用單調時鐘來跟蹤時間。

loop.call_later(delay, callback, *args, context=None)

排程 callback 在給定的 delay 秒數(可以是整數或浮點數)後被呼叫。

返回 asyncio.TimerHandle 的例項,可用於取消回撥。

callback 將只被呼叫一次。如果兩個回撥被排程在完全相同的時間,則它們的呼叫順序是未定義的。

可選的位置引數 args 將在回撥被呼叫時傳遞給它。如果你想讓回撥使用關鍵字引數呼叫,請使用 functools.partial()

可選的僅限關鍵字 context 引數允許為 callback 執行指定自定義 contextvars.Context。如果未提供 context,則使用當前上下文。

備註

為了效能,使用 loop.call_later() 排程的回撥可能會提前最多一個時鐘解析度執行(請參閱 time.get_clock_info('monotonic').resolution)。

3.7 版本變更: 添加了 context 僅限關鍵字引數。有關詳細資訊,請參閱 PEP 567

3.8 版本變更: 在 Python 3.7 及更早版本中使用預設事件迴圈實現時,delay 不能超過一天。這已在 Python 3.8 中修復。

loop.call_at(when, callback, *args, context=None)

排程 callback 在給定絕對時間戳 when (整數或浮點數)時呼叫,使用與 loop.time() 相同的時間參考。

此方法的行為與 call_later() 相同。

返回 asyncio.TimerHandle 的例項,可用於取消回撥。

備註

為了效能,使用 loop.call_at() 排程的回撥可能會提前最多一個時鐘解析度執行(請參閱 time.get_clock_info('monotonic').resolution)。

3.7 版本變更: 添加了 context 僅限關鍵字引數。有關詳細資訊,請參閱 PEP 567

3.8 版本變更: 在 Python 3.7 及更早版本中使用預設事件迴圈實現時,when 和當前時間之間的差異不能超過一天。這已在 Python 3.8 中修復。

loop.time()

根據事件迴圈的內部單調時鐘,返回當前時間,作為 float 值。

備註

3.8 版本變更: 在 Python 3.7 及更早版本中,超時(相對 delay 或絕對 when)不應超過一天。這已在 Python 3.8 中修復。

參見

asyncio.sleep() 函式。

建立 Futures 和 Tasks

loop.create_future()

建立附加到事件迴圈的 asyncio.Future 物件。

這是在 asyncio 中建立 Futures 的首選方式。這允許第三方事件迴圈提供 Future 物件的替代實現(具有更好的效能或檢測)。

3.5.2 版本新增。

loop.create_task(coro, *, name=None, context=None, eager_start=None, **kwargs)

排程 協程 coro 的執行。返回 Task 物件。

第三方事件迴圈可以使用自己的 Task 子類進行互操作。在這種情況下,結果型別是 Task 的子類。

完整的函式簽名與 Task 建構函式(或工廠)的簽名大致相同——此函式的所有關鍵字引數都透過該介面傳遞。

如果提供了 name 引數且不為 None,則使用 Task.set_name() 將其設定為任務的名稱。

可選的僅限關鍵字 context 引數允許為 coro 執行指定自定義 contextvars.Context。如果未提供 context,則建立當前上下文的副本。

可選的僅限關鍵字 eager_start 引數允許指定任務是在呼叫 create_task 時急切執行,還是稍後排程。如果未傳遞 eager_start,則將使用 loop.set_task_factory() 設定的模式。

3.8 版本變更: 添加了 name 引數。

3.11 版本變更: 添加了 context 引數。

3.13.3 版本變更: 添加了 kwargs,它傳遞任意額外的引數,包括 namecontext

3.13.4 版本變更: 回滾了傳遞 namecontext(如果為 None)的更改,同時仍然傳遞其他任意關鍵字引數(以避免破壞與 3.13.3 的向後相容性)。

3.14 版本變更: 所有 kwargs 現在都已傳遞。eager_start 引數適用於急切任務工廠。

loop.set_task_factory(factory)

設定一個任務工廠,該工廠將由 loop.create_task() 使用。

如果 factoryNone,則將設定預設任務工廠。否則,factory 必須是可呼叫物件,其簽名匹配 (loop, coro, **kwargs),其中 loop 是活動事件迴圈的引用,而 coro 是協程物件。可呼叫物件必須傳遞所有 kwargs,並返回一個 asyncio.Task 相容物件。

3.13.3 版本變更: 要求將所有 kwargs 傳遞給 asyncio.Task

3.13.4 版本變更: name 不再傳遞給任務工廠。如果 contextNone,則不再傳遞給任務工廠。

3.14 版本變更: namecontext 現在再次無條件地傳遞給任務工廠。

loop.get_task_factory()

返回任務工廠,如果正在使用預設工廠,則返回 None

開啟網路連線

async loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None, all_errors=False)

開啟到由 hostport 指定的給定地址的流傳輸連線。

套接字家族可以是 AF_INETAF_INET6,具體取決於 host(或 family 引數,如果提供)。

套接字型別將是 SOCK_STREAM

protocol_factory 必須是一個可呼叫物件,返回一個 asyncio 協議 實現。

此方法將在後臺嘗試建立連線。成功時,它返回一個 (transport, protocol) 對。

底層操作的時間順序概述如下

  1. 連線建立併為其建立傳輸

  2. protocol_factory 在沒有引數的情況下被呼叫,並期望返回一個 協議 例項。

  3. 透過呼叫其 connection_made() 方法,協議例項與傳輸耦合。

  4. 成功時返回一個 (transport, protocol) 元組。

建立的傳輸是依賴於實現的雙向流。

其他引數

  • ssl:如果給定且不為 false,則建立 SSL/TLS 傳輸(預設建立純 TCP 傳輸)。如果 sslssl.SSLContext 物件,則此上下文用於建立傳輸;如果 sslTrue,則使用從 ssl.create_default_context() 返回的預設上下文。

  • server_hostname 設定或覆蓋目標伺服器證書將與之匹配的主機名。僅當 ssl 不為 None 時才應傳遞。預設情況下使用 host 引數的值。如果 host 為空,則沒有預設值,並且您必須為 server_hostname 傳遞一個值。如果 server_hostname 是一個空字串,則停用主機名匹配(這是一個嚴重的安全風險,可能導致中間人攻擊)。

  • familyprotoflags 是可選的地址家族、協議和標誌,用於透過 getaddrinfo() 進行 host 解析。如果給定,這些都應該來自相應的 socket 模組常量的整數。

  • happy_eyeballs_delay,如果給定,則為此連線啟用 Happy Eyeballs。它應該是一個浮點數,表示等待連線嘗試完成的時間(以秒為單位),然後並行啟動下一次嘗試。這是 RFC 8305 中定義的“連線嘗試延遲”。RFC 推薦的合理預設值是 0.25(250 毫秒)。

  • interleave 控制當主機名解析為多個 IP 地址時地址的重新排序。如果為 0 或未指定,則不進行重新排序,並按 getaddrinfo() 返回的順序嘗試地址。如果指定正整數,則地址按地址族交錯,並且給定整數被解釋為 RFC 8305 中定義的“第一地址族計數”。如果未指定 happy_eyeballs_delay,則預設值為 0;如果指定,則預設值為 1

  • sock,如果給定,應該是一個現有的、已連線的 socket.socket 物件,用於傳輸。如果給定 sock,則不應指定 hostportfamilyprotoflagshappy_eyeballs_delayinterleavelocal_addr

    備註

    sock 引數將套接字的所有權轉移給建立的傳輸。要關閉套接字,請呼叫傳輸的 close() 方法。

  • local_addr,如果給定,是一個 (local_host, local_port) 元組,用於在本地繫結套接字。local_hostlocal_port 使用 getaddrinfo() 查詢,類似於 hostport

  • ssl_handshake_timeout 是(對於 TLS 連線)等待 TLS 握手完成的時間(以秒為單位),在此之前將中止連線。60.0 秒(如果為 None(預設))。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),在此之前將中止連線。30.0 秒(如果為 None(預設))。

  • all_errors 決定在無法建立連線時引發哪些異常。預設情況下,只引發一個 Exception:如果只有一個錯誤或所有錯誤訊息相同,則為第一個異常;或者,如果錯誤訊息合併,則為單個 OSError。當 all_errorsTrue 時,將引發一個包含所有異常(即使只有一個)的 ExceptionGroup

3.5 版本變更: ProactorEventLoop 中添加了對 SSL/TLS 的支援。

3.6 版本變更: 預設情況下,所有 TCP 連線都設定了套接字選項 socket.TCP_NODELAY

3.7 版本變更: 添加了 ssl_handshake_timeout 引數。

3.8 版本變更: 添加了 happy_eyeballs_delayinterleave 引數。

Happy Eyeballs 演算法:雙棧主機的成功。當伺服器的 IPv4 路徑和協議工作,但伺服器的 IPv6 路徑和協議不工作時,雙棧客戶端應用程式會經歷顯著的連線延遲,與僅 IPv4 客戶端相比。這是不可取的,因為它會導致雙棧客戶端的使用者體驗更差。本文件指定了減少這種使用者可見延遲的演算法要求,並提供了一種演算法。

更多資訊:https://datatracker.ietf.org/doc/html/rfc6555

3.11 版本變更: 添加了 ssl_shutdown_timeout 引數。

3.12 版本變更: 添加了 all_errors

參見

open_connection() 函式是一個高階替代 API。它返回一對 (StreamReader, StreamWriter),可以直接在 async/await 程式碼中使用。

async loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)

建立一個數據報連線。

套接字家族可以是 AF_INETAF_INET6AF_UNIX,具體取決於 host(或 family 引數,如果提供)。

套接字型別將是 SOCK_DGRAM

protocol_factory 必須是一個可呼叫物件,返回一個 協議 實現。

成功時返回一個 (transport, protocol) 元組。

其他引數

  • local_addr,如果給定,是一個 (local_host, local_port) 元組,用於在本地繫結套接字。local_hostlocal_port 使用 getaddrinfo() 查詢。

    備註

    在 Windows 上,當使用帶有 local_addr=None 的 proactor 事件迴圈時,執行它將引發帶有 errno.WSAEINVALOSError

  • remote_addr,如果給定,是一個 (remote_host, remote_port) 元組,用於將套接字連線到遠端地址。remote_hostremote_port 使用 getaddrinfo() 查詢。

  • familyprotoflags 是可選的地址家族、協議和標誌,用於透過 getaddrinfo() 進行 host 解析。如果給定,這些都應該來自相應的 socket 模組常量的整數。

  • reuse_port 告訴核心允許此端點繫結到與其他現有端點相同的埠,只要它們在建立時都設定了此標誌。此選項在 Windows 和某些 Unixes 上不受支援。如果未定義 socket.SO_REUSEPORT 常量,則不支援此功能。

  • allow_broadcast 告訴核心允許此端點向廣播地址傳送訊息。

  • 可以可選地指定 sock 以使用一個預先存在的、已連線的 socket.socket 物件供傳輸使用。如果指定,則應省略 local_addrremote_addr(必須為 None)。

    備註

    sock 引數將套接字的所有權轉移給建立的傳輸。要關閉套接字,請呼叫傳輸的 close() 方法。

請參閱UDP echo 客戶端協議UDP echo 伺服器協議示例。

3.4.4 版本變更: 添加了 familyprotoflagsreuse_addressreuse_portallow_broadcastsock 引數。

3.8 版本變更: 添加了對 Windows 的支援。

3.8.1 版本變更: reuse_address 引數不再受支援,因為使用 socket.SO_REUSEADDR 對 UDP 構成嚴重的安全隱患。顯式傳遞 reuse_address=True 將引發異常。

當多個具有不同 UID 的程序使用 SO_REUSEADDR 將套接字分配給相同的 UDP 套接字地址時,傳入的資料包可能會隨機分佈在套接字之間。

對於受支援的平臺,reuse_port 可用作類似功能的替代。使用 reuse_port,將使用 socket.SO_REUSEPORT,它專門防止具有不同 UID 的程序將套接字分配給相同的套接字地址。

3.11 版本變更: reuse_address 引數,自 Python 3.8.1、3.7.6 和 3.6.10 以來已停用,現已完全移除。

async loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

建立 Unix 連線。

套接字家族將是 AF_UNIX;套接字型別將是 SOCK_STREAM

成功時返回一個 (transport, protocol) 元組。

path 是 Unix 域套接字的名稱,除非指定了 sock 引數,否則為必需。支援抽象 Unix 套接字、strbytesPath 路徑。

有關此方法引數的資訊,請參閱 loop.create_connection() 方法的文件。

可用性: Unix。

3.7 版本變更: 添加了 ssl_handshake_timeout 引數。path 引數現在可以是類路徑物件

3.11 版本變更: 添加了 ssl_shutdown_timeout 引數。

建立網路伺服器

async loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

建立一個 TCP 伺服器(套接字型別 SOCK_STREAM),監聽 host 地址的 port 埠。

返回 Server 物件。

引數

  • protocol_factory 必須是一個可呼叫物件,返回一個 協議 實現。

  • host 引數可以設定為多種型別,這些型別決定了伺服器將在何處監聽

    • 如果 host 是字串,TCP 伺服器將繫結到由 host 指定的單個網路介面。

    • 如果 host 是字串序列,TCP 伺服器將繫結到序列中指定的所有網路介面。

    • 如果 host 是空字串或 None,則假定所有介面,並將返回多個套接字的列表(很可能一個用於 IPv4,另一個用於 IPv6)。

  • port 引數可以設定為指定伺服器應監聽的埠。如果為 0None(預設值),將選擇一個隨機未使用的埠(請注意,如果 host 解析為多個網路介面,則每個介面將選擇不同的隨機埠)。

  • family 可以設定為 socket.AF_INETAF_INET6 以強制套接字使用 IPv4 或 IPv6。如果未設定,family 將從主機名確定(預設為 AF_UNSPEC)。

  • flagsgetaddrinfo() 的位掩碼。

  • 可以可選地指定 sock 以使用預先存在的套接字物件。如果指定,則不得指定 hostport

    備註

    sock 引數將套接字的所有權轉移給建立的伺服器。要關閉套接字,請呼叫伺服器的 close() 方法。

  • backlog 是傳遞給 listen() 的最大排隊連線數(預設為 100)。

  • ssl 可以設定為 SSLContext 例項,以在接受的連線上啟用 TLS。

  • reuse_address 告訴核心重用處於 TIME_WAIT 狀態的本地套接字,而無需等待其自然超時過期。如果未指定,則在 Unix 上會自動設定為 True

  • reuse_port 告訴核心允許此端點繫結到與其他現有端點相同的埠,只要它們在建立時都設定了此標誌。此選項在 Windows 上不受支援。

  • keep_alive 設定為 True 可透過啟用訊息的定期傳輸來保持連線處於活動狀態。

3.13 版本變更: 添加了 keep_alive 引數。

  • ssl_handshake_timeout 是(對於 TLS 伺服器)等待 TLS 握手完成的時間(以秒為單位),在此之前將中止連線。60.0 秒(如果為 None(預設))。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),在此之前將中止連線。30.0 秒(如果為 None(預設))。

  • start_serving 設定為 True(預設值)會導致建立的伺服器立即開始接受連線。當設定為 False 時,使用者應等待 Server.start_serving()Server.serve_forever(),以使伺服器開始接受連線。

3.5 版本變更: ProactorEventLoop 中添加了對 SSL/TLS 的支援。

3.5.1 版本變更: host 引數可以是一個字串序列。

3.6 版本變更: 添加了 ssl_handshake_timeoutstart_serving 引數。預設情況下,所有 TCP 連線都設定了套接字選項 socket.TCP_NODELAY

3.11 版本變更: 添加了 ssl_shutdown_timeout 引數。

參見

start_server() 函式是一個更高階的替代 API,它返回一對 StreamReaderStreamWriter,可以在 async/await 程式碼中使用。

async loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)

類似於 loop.create_server(),但適用於 AF_UNIX 套接字家族。

path 是 Unix 域套接字的名稱,除非提供了 sock 引數,否則為必需。支援抽象 Unix 套接字、strbytesPath 路徑。

如果 cleanup_socket 為真,則當伺服器關閉時,Unix 套接字將自動從檔案系統中移除,除非在伺服器建立後套接字已被替換。

有關此方法引數的資訊,請參閱 loop.create_server() 方法的文件。

可用性: Unix。

3.7 版本變更: 添加了 ssl_handshake_timeoutstart_serving 引數。path 引數現在可以是 Path 物件。

3.11 版本變更: 添加了 ssl_shutdown_timeout 引數。

3.13 版本變更: 添加了 cleanup_socket 引數。

async loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

將已接受的連線包裝成傳輸/協議對。

此方法可用於在 asyncio 外部接受連線但使用 asyncio 處理它們的伺服器。

引數

  • protocol_factory 必須是一個可呼叫物件,返回一個 協議 實現。

  • sock 是從 socket.accept 返回的現有套接字物件。

    備註

    sock 引數將套接字的所有權轉移給建立的傳輸。要關閉套接字,請呼叫傳輸的 close() 方法。

  • ssl 可以設定為 SSLContext 以在接受的連線上啟用 SSL。

  • ssl_handshake_timeout 是(對於 SSL 連線)等待 SSL 握手完成的時間(以秒為單位),在此之前將中止連線。60.0 秒(如果為 None(預設))。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),在此之前將中止連線。30.0 秒(如果為 None(預設))。

返回一個 (transport, protocol) 對。

3.5.3 版本中新增。

3.7 版本變更: 添加了 ssl_handshake_timeout 引數。

3.11 版本變更: 添加了 ssl_shutdown_timeout 引數。

傳輸檔案

async loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)

透過 transport 傳送 file。返回傳送的總位元組數。

如果可用,此方法使用高效能 os.sendfile()

file 必須是已二進位制模式開啟的常規檔案物件。

offset 指明從檔案何處開始讀取。如果指定了 count,則表示要傳輸的總位元組數,而不是傳送檔案直到 EOF。檔案位置始終會更新,即使此方法引發錯誤,並且可以使用 file.tell() 獲取實際傳送的位元組數。

fallback 設定為 True 會使 asyncio 在平臺不支援 sendfile 系統呼叫時(例如 Windows 或 Unix 上的 SSL 套接字)手動讀取和傳送檔案。

如果系統不支援 sendfile 系統呼叫且 fallbackFalse,則引發 SendfileNotAvailableError

在 3.7 版本加入。

TLS 升級

async loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

將現有的基於傳輸的連線升級為 TLS。

建立 TLS 編碼器/解碼器例項,並將其插入到 transportprotocol 之間。編碼器/解碼器實現面向 transport 的協議和麵向 protocol 的傳輸。

返回建立的雙介面例項。在 await 之後,protocol 必須停止使用原始 transport,並且只與返回的物件通訊,因為編碼器會快取 protocol 側的資料,並偶爾與 transport 交換額外的 TLS 會話包。

在某些情況下(例如,當傳入的傳輸已關閉時),這可能會返回 None

引數

  • 透過 create_server()create_connection() 等方法返回的 transportprotocol 例項。

  • sslcontext:一個已配置的 SSLContext 例項。

  • server_side:當伺服器端連線正在升級時(例如由 create_server() 建立的連線),傳遞 True

  • server_hostname:設定或覆蓋用於匹配目標伺服器證書的主機名。

  • ssl_handshake_timeout 是(對於 TLS 連線)等待 TLS 握手完成的時間(以秒為單位),在此之前將中止連線。60.0 秒(如果為 None(預設))。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成的時間(以秒為單位),在此之前將中止連線。30.0 秒(如果為 None(預設))。

在 3.7 版本加入。

3.11 版本變更: 添加了 ssl_shutdown_timeout 引數。

監控檔案描述符

loop.add_reader(fd, callback, *args)

開始監控 fd 檔案描述符的讀可用性,一旦 fd 可用於讀取,就呼叫帶有指定引數的 callback

fd 註冊的任何現有回撥都將被取消並替換為 callback

loop.remove_reader(fd)

停止監控 fd 檔案描述符的讀可用性。如果 fd 之前正在被監控以進行讀取,則返回 True

loop.add_writer(fd, callback, *args)

開始監控 fd 檔案描述符的寫可用性,一旦 fd 可用於寫入,就呼叫帶有指定引數的 callback

fd 註冊的任何現有回撥都將被取消並替換為 callback

使用 functools.partial() 傳遞關鍵字引數callback

loop.remove_writer(fd)

停止監控 fd 檔案描述符的寫可用性。如果 fd 之前正在被監控以進行寫入,則返回 True

有關這些方法的一些限制,請參閱 平臺支援 部分。

直接使用套接字物件

通常,使用基於傳輸的 API(例如 loop.create_connection()loop.create_server())的協議實現比直接使用套接字實現的協議更快。然而,在某些用例中,效能不是關鍵,直接使用 socket 物件更方便。

async loop.sock_recv(sock, nbytes)

sock 接收最多 nbytessocket.recv() 的非同步版本。

將接收到的資料作為位元組物件返回。

sock 必須是非阻塞套接字。

在 3.7 版本中更改: 儘管此方法始終被記錄為協程方法,但在 Python 3.7 之前的版本返回一個 Future。從 Python 3.7 開始,它是一個 async def 方法。

async loop.sock_recv_into(sock, buf)

sock 接收資料到 buf 緩衝區。模仿阻塞的 socket.recv_into() 方法。

返回寫入緩衝區的位元組數。

sock 必須是非阻塞套接字。

在 3.7 版本加入。

async loop.sock_recvfrom(sock, bufsize)

sock 接收最大 bufsize 的資料報。 socket.recvfrom() 的非同步版本。

返回一個元組 (接收到的資料, 遠端地址)。

sock 必須是非阻塞套接字。

在 3.11 版本中新增。

async loop.sock_recvfrom_into(sock, buf, nbytes=0)

sock 接收最多 nbytes 的資料報到 buf 中。socket.recvfrom_into() 的非同步版本。

返回一個元組 (接收到的位元組數, 遠端地址)。

sock 必須是非阻塞套接字。

在 3.11 版本中新增。

async loop.sock_sendall(sock, data)

data 傳送至 sock 套接字。socket.sendall() 的非同步版本。

此方法會持續向套接字傳送資料,直到 data 中的所有資料都已傳送或發生錯誤。None 在成功時返回。發生錯誤時,會引發異常。此外,無法確定連線的接收端成功處理了多少資料(如果有的話)。

sock 必須是非阻塞套接字。

在 3.7 版本中更改: 儘管該方法一直被記錄為協程方法,但在 Python 3.7 之前它返回一個 Future。從 Python 3.7 開始,這是一個 async def 方法。

async loop.sock_sendto(sock, data, address)

將資料報從 sock 傳送至 addresssocket.sendto() 的非同步版本。

返回傳送的位元組數。

sock 必須是非阻塞套接字。

在 3.11 版本中新增。

async loop.sock_connect(sock, address)

sock 連線到 address 處的遠端套接字。

socket.connect() 的非同步版本。

sock 必須是非阻塞套接字。

在 3.5.2 版本中更改: address 不再需要解析。sock_connect 將嘗試透過呼叫 socket.inet_pton() 來檢查 address 是否已解析。如果未解析,將使用 loop.getaddrinfo() 來解析 address

async loop.sock_accept(sock)

接受連線。模仿阻塞的 socket.accept() 方法。

套接字必須繫結到一個地址並監聽連線。返回值是一個 (conn, address) 對,其中 conn 是一個可用於在該連線上傳送和接收資料的 套接字物件,而 address 是連線另一端繫結到套接字的地址。

sock 必須是非阻塞套接字。

在 3.7 版本中更改: 儘管該方法一直被記錄為協程方法,但在 Python 3.7 之前它返回一個 Future。從 Python 3.7 開始,這是一個 async def 方法。

async loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)

如果可能,使用高效能 os.sendfile 傳送檔案。返回傳送的總位元組數。

socket.sendfile() 的非同步版本。

sock 必須是一個非阻塞的 socket.SOCK_STREAM socket

file 必須是以二進位制模式開啟的常規檔案物件。

offset 指明從檔案何處開始讀取。如果指定了 count,則表示要傳輸的總位元組數,而不是傳送檔案直到 EOF。檔案位置始終會更新,即使此方法引發錯誤,並且可以使用 file.tell() 獲取實際傳送的位元組數。

fallback 設定為 True 時,如果平臺不支援 sendfile 系統呼叫(例如 Windows 或 Unix 上的 SSL 套接字),asyncio 會手動讀取和傳送檔案。

如果系統不支援 sendfile 系統呼叫且 fallbackFalse,則引發 SendfileNotAvailableError

sock 必須是非阻塞套接字。

在 3.7 版本加入。

DNS

async loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)

socket.getaddrinfo() 的非同步版本。

async loop.getnameinfo(sockaddr, flags=0)

socket.getnameinfo() 的非同步版本。

備註

getaddrinfogetnameinfo 都透過迴圈的預設執行緒池執行器在內部使用其同步版本。當此執行器飽和時,這些方法可能會遇到延遲,這可能會被更高階的網路庫報告為超時增加。為緩解此問題,請考慮為其他使用者任務使用自定義執行器,或設定具有更多 worker 的預設執行器。

在 3.7 版本中更改: getaddrinfogetnameinfo 方法一直被記錄為返回一個協程,但在 Python 3.7 之前,它們實際上返回 asyncio.Future 物件。從 Python 3.7 開始,這兩個方法都是協程。

使用管道

async loop.connect_read_pipe(protocol_factory, pipe)

在事件迴圈中註冊 pipe 的讀端。

protocol_factory 必須是一個可呼叫物件,返回一個 asyncio 協議 實現。

pipe 是一個 檔案類物件

返回對 (transport, protocol),其中 transport 支援 ReadTransport 介面,protocol 是由 protocol_factory 例項化的物件。

對於 SelectorEventLoop 事件迴圈,pipe 被設定為非阻塞模式。

async loop.connect_write_pipe(protocol_factory, pipe)

在事件迴圈中註冊 pipe 的寫端。

protocol_factory 必須是一個可呼叫物件,返回一個 asyncio 協議 實現。

pipe檔案類物件

返回對 (transport, protocol),其中 transport 支援 WriteTransport 介面,protocol 是由 protocol_factory 例項化的物件。

對於 SelectorEventLoop 事件迴圈,pipe 被設定為非阻塞模式。

備註

SelectorEventLoop 不支援 Windows 上的上述方法。在 Windows 上請改用 ProactorEventLoop

Unix 訊號

loop.add_signal_handler(signum, callback, *args)

設定 callback 作為 signum 訊號的處理程式。

回撥將由 loop 呼叫,連同該事件迴圈的其他排隊回撥和可執行協程。與使用 signal.signal() 註冊的訊號處理程式不同,使用此函式註冊的回撥允許與事件迴圈互動。

如果訊號編號無效或無法捕獲,則引發 ValueError。如果設定處理程式時出現問題,則引發 RuntimeError

使用 functools.partial() 傳遞關鍵字引數callback

signal.signal() 一樣,此函式必須在主執行緒中呼叫。

loop.remove_signal_handler(sig)

移除 sig 訊號的處理程式。

如果訊號處理程式被移除,則返回 True;如果未為給定訊號設定處理程式,則返回 False

可用性: Unix。

參見

signal 模組。

線上程或程序池中執行程式碼

awaitable loop.run_in_executor(executor, func, *args)

安排在指定的執行器中呼叫 func

executor 引數應為一個 concurrent.futures.Executor 例項。如果 executorNone,則使用預設執行器。預設執行器可以透過 loop.set_default_executor() 設定,否則,如果需要,run_in_executor() 會延遲初始化並使用 concurrent.futures.ThreadPoolExecutor

示例

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

    # 4. Run in a custom interpreter pool:
    with concurrent.futures.InterpreterPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom interpreter pool', result)

if __name__ == '__main__':
    asyncio.run(main())

請注意,由於 multiprocessing 的特殊性(由 ProcessPoolExecutor 使用),選項 3 需要入口點保護 (if __name__ == '__main__')。請參閱 安全匯入主模組

此方法返回一個 asyncio.Future 物件。

使用 functools.partial() 傳遞關鍵字引數func

在 3.5.3 版本中更改: loop.run_in_executor() 不再配置其建立的執行緒池執行器的 max_workers,而是將其留給執行緒池執行器 (ThreadPoolExecutor) 來設定預設值。

loop.set_default_executor(executor)

executor 設定為 run_in_executor() 使用的預設執行器。 executor 必須是 ThreadPoolExecutor 的例項,其中包括 InterpreterPoolExecutor

在 3.11 版本中更改: executor 必須是 ThreadPoolExecutor 的例項。

錯誤處理 API

允許自定義事件迴圈中異常的處理方式。

loop.set_exception_handler(handler)

handler 設定為新的事件迴圈異常處理程式。

如果 handlerNone,則將設定預設異常處理程式。否則,handler 必須是一個可呼叫物件,其簽名匹配 (loop, context),其中 loop 是對活動事件迴圈的引用,context 是一個包含異常詳細資訊的 dict 物件(有關上下文的詳細資訊,請參閱 call_exception_handler() 文件)。

如果處理程式是代表 TaskHandle 呼叫的,它將在該任務或回撥控制代碼的 contextvars.Context 中執行。

在 3.12 版本中更改: 處理程式可以在異常源自的任務或控制代碼的 Context 中呼叫。

loop.get_exception_handler()

返回當前的異常處理程式,如果沒有設定自定義異常處理程式,則返回 None

3.5.2 版本新增。

loop.default_exception_handler(context)

預設異常處理程式。

當發生異常且未設定異常處理程式時,會呼叫此方法。自定義異常處理程式可以呼叫此方法,以推遲到預設處理程式行為。

context 引數與 call_exception_handler() 中的含義相同。

loop.call_exception_handler(context)

呼叫當前事件迴圈異常處理程式。

context 是一個 dict 物件,包含以下鍵(未來 Python 版本可能會引入新鍵)

  • 'message': 錯誤訊息;

  • 'exception' (可選): 異常物件;

  • 'future' (可選): asyncio.Future 例項;

  • 'task' (可選): asyncio.Task 例項;

  • 'handle' (可選): asyncio.Handle 例項;

  • 'protocol' (可選): 協議 例項;

  • 'transport' (可選): 傳輸 例項;

  • 'socket' (可選): socket.socket 例項;

  • 'source_traceback' (可選): 原始碼的追溯資訊;

  • 'handle_traceback' (可選): 控制代碼的追溯資訊;

  • 'asyncgen' (可選): 導致異常的非同步生成器。

    異常。

備註

此方法不應在子類化的事件迴圈中過載。對於自定義異常處理,請使用 set_exception_handler() 方法。

啟用除錯模式

loop.get_debug()

獲取事件迴圈的除錯模式 (bool)。

如果環境變數 PYTHONASYNCIODEBUG 設定為非空字串,則預設值為 True,否則為 False

loop.set_debug(enabled: bool)

設定事件迴圈的除錯模式。

在 3.7 版本中更改: 新的 Python 開發模式 現在也可以用於啟用除錯模式。

loop.slow_callback_duration

此屬性可用於設定被視為“慢”的最小執行持續時間(以秒為單位)。當啟用除錯模式時,將記錄“慢”回撥。

預設值為 100 毫秒。

執行子程序

本小節中描述的方法是低階的。在常規的 async/await 程式碼中,請考慮改用高階的 asyncio.create_subprocess_shell()asyncio.create_subprocess_exec() 便利函式。

備註

在 Windows 上,預設事件迴圈 ProactorEventLoop 支援子程序,而 SelectorEventLoop 不支援。詳情請參閱 Windows 上的子程序支援

async loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

args 指定的一個或多個字串引數建立子程序。

args 必須是一個由

第一個字串指定程式可執行檔案,其餘字串指定引數。字串引數共同構成程式的 argv

這類似於標準庫 subprocess.Popen 類在呼叫時將 shell=False 且字串列表作為第一個引數傳入;然而,Popen 接受一個字串列表作為單個引數,而 subprocess_exec 接受多個字串引數。

protocol_factory 必須是一個可呼叫物件,返回 asyncio.SubprocessProtocol 類的子類。

其他引數

  • stdin 可以是以下任意一種

    • 檔案類物件

    • 現有檔案描述符(正整數),例如使用 os.pipe() 建立的那些

    • subprocess.PIPE 常量(預設值),它將建立一個新管道並連線它,

    • None,它將使子程序繼承此程序的檔案描述符

    • subprocess.DEVNULL 常量,表示將使用特殊的 os.devnull 檔案

  • stdout 可以是以下任意一種

    • 檔案類物件

    • subprocess.PIPE 常量(預設值),它將建立一個新管道並連線它,

    • None,它將使子程序繼承此程序的檔案描述符

    • subprocess.DEVNULL 常量,表示將使用特殊的 os.devnull 檔案

  • stderr 可以是以下任意一種

    • 檔案類物件

    • subprocess.PIPE 常量(預設值),它將建立一個新管道並連線它,

    • None,它將使子程序繼承此程序的檔案描述符

    • subprocess.DEVNULL 常量,表示將使用特殊的 os.devnull 檔案

    • subprocess.STDOUT 常量,它將標準錯誤流連線到程序的標準輸出流

  • 所有其他關鍵字引數都未經解釋地傳遞給 subprocess.Popen,除了 bufsizeuniversal_newlinesshelltextencodingerrors,這些引數根本不應指定。

    asyncio 子程序 API 不支援將流解碼為文字。可以使用 bytes.decode() 將從流返回的位元組轉換為文字。

如果作為 stdinstdoutstderr 傳遞的檔案類物件表示管道,則此管道的另一端應使用 connect_write_pipe()connect_read_pipe() 註冊以用於事件迴圈。

有關其他引數的文件,請參閱 subprocess.Popen 類的建構函式。

返回一對 (transport, protocol),其中 transport 符合 asyncio.SubprocessTransport 基類,protocol 是由 protocol_factory 例項化的物件。

async loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

cmd 建立一個子程序,cmd 可以是 strbytes 字串,編碼為 檔案系統編碼,使用平臺的“shell”語法。

這類似於標準庫 subprocess.Popen 類在呼叫時將 shell=True

protocol_factory 必須是一個可呼叫物件,返回 SubprocessProtocol 類的子類。

有關其餘引數的更多詳細資訊,請參閱 subprocess_exec()

返回一對 (transport, protocol),其中 transport 符合 SubprocessTransport 基類,protocol 是由 protocol_factory 例項化的物件。

備註

應用程式有責任確保所有空格和特殊字元都經過適當引用,以避免 shell 注入漏洞。可以使用 shlex.quote() 函式正確轉義用於構造 shell 命令的字串中的空格和特殊字元。

回撥控制代碼

class asyncio.Handle

loop.call_soon()loop.call_soon_threadsafe() 返回的回撥包裝器物件。

get_context()

返回與控制代碼關聯的 contextvars.Context 物件。

3.12 新版功能.

cancel()

取消回撥。如果回撥已取消或已執行,此方法無效。

cancelled()

如果回撥已取消,則返回 True

在 3.7 版本加入。

class asyncio.TimerHandle

loop.call_later()loop.call_at() 返回的回撥包裝器物件。

此類是 Handle 的子類。

when()

返回以 float 秒錶示的計劃回撥時間。

該時間是一個絕對時間戳,使用與 loop.time() 相同的時間參考。

在 3.7 版本加入。

伺服器物件

伺服器物件由 loop.create_server()loop.create_unix_server()start_server()start_unix_server() 函式建立。

請勿直接例項化 Server 類。

class asyncio.Server

Server 物件是非同步上下文管理器。當在 async with 語句中使用時,可以保證在 async with 語句完成後,Server 物件已關閉並且不接受新連線。

srv = await loop.create_server(...)

async with srv:
    # some code

# At this point, srv is closed and no longer accepts new connections.

在 3.7 版本中更改: 從 Python 3.7 開始,Server 物件是非同步上下文管理器。

在 3.11 版本中更改: 此類別在 Python 3.9.11、3.10.3 和 3.11 中作為 asyncio.Server 公開。

close()

停止服務:關閉監聽套接字並將 sockets 屬性設定為 None

代表現有傳入客戶端連線的套接字保持開啟狀態。

伺服器非同步關閉;使用 wait_closed() 協程等待伺服器關閉(且沒有更多活動連線)。

close_clients()

關閉所有現有的傳入客戶端連線。

對所有關聯的傳輸呼叫 close()

在關閉伺服器時,應在 close_clients() 之前呼叫 close(),以避免與新客戶端連線發生競態。

在 3.13 版本加入。

abort_clients()

立即關閉所有現有傳入客戶端連線,而不等待未決操作完成。

對所有關聯的傳輸呼叫 abort()

在關閉伺服器時,應在 abort_clients() 之前呼叫 close(),以避免與新客戶端連線發生競態。

在 3.13 版本加入。

get_loop()

返回與伺服器物件關聯的事件迴圈。

在 3.7 版本加入。

async start_serving()

開始接受連線。

此方法是冪等的,因此可以在伺服器已執行時呼叫。

loop.create_server()asyncio.start_server()start_serving 僅關鍵字引數允許建立最初不接受連線的 Server 物件。在這種情況下,可以使用 Server.start_serving()Server.serve_forever() 使 Server 開始接受連線。

在 3.7 版本加入。

async serve_forever()

開始接受連線,直到協程被取消。serve_forever 任務的取消會導致伺服器關閉。

如果伺服器已接受連線,則可以呼叫此方法。每個 Server 物件只能存在一個 serve_forever 任務。

示例

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))

在 3.7 版本加入。

is_serving()

如果伺服器正在接受新連線,則返回 True

在 3.7 版本加入。

async wait_closed()

等待 close() 方法完成且所有活動連線結束。

sockets

伺服器正在監聽的類套接字物件列表,asyncio.trsock.TransportSocket

在 3.7 版本中更改: 在 Python 3.7 之前,Server.sockets 直接返回伺服器套接字的內部列表。在 3.7 中,返回該列表的副本。

事件迴圈實現

asyncio 提供了兩種不同的事件迴圈實現:SelectorEventLoopProactorEventLoop

預設情況下,asyncio 配置為使用 EventLoop

class asyncio.SelectorEventLoop

一個基於 selectors 模組的 AbstractEventLoop 子類。

使用給定平臺可用的最有效的 選擇器。也可以手動配置要使用的確切選擇器實現。

import asyncio
import selectors

async def main():
   ...

loop_factory = lambda: asyncio.SelectorEventLoop(selectors.SelectSelector())
asyncio.run(main(), loop_factory=loop_factory)

可用性: Unix, Windows。

class asyncio.ProactorEventLoop

一個針對 Windows 的 AbstractEventLoop 子類,它使用“I/O 完成埠”(IOCP)。

可用性: Windows。

class asyncio.EventLoop

給定平臺可用的最有效的 AbstractEventLoop 子類的別名。

在 Unix 上,它是 SelectorEventLoop 的別名;在 Windows 上,它是 ProactorEventLoop 的別名。

在 3.13 版本加入。

class asyncio.AbstractEventLoop

asyncio 相容事件迴圈的抽象基類。

事件迴圈方法 部分列出了 AbstractEventLoop 的替代實現應該定義的所有方法。

示例

請注意,本節中的所有示例都有意展示瞭如何使用低階事件迴圈 API,例如 loop.run_forever()loop.call_soon()。現代 asyncio 應用程式很少需要以這種方式編寫;考慮使用高階函式,例如 asyncio.run()

使用 call_soon() 的 Hello World

一個使用 loop.call_soon() 方法安排回撥的示例。回撥顯示 "Hello World",然後停止事件迴圈。

import asyncio

def hello_world(loop):
    """A callback to print 'Hello World' and stop the event loop"""
    print('Hello World')
    loop.stop()

loop = asyncio.new_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

參見

一個類似的 Hello World 示例,透過協程和 run() 函式建立。

使用 call_later() 顯示當前日期

一個每秒顯示當前日期的回撥示例。該回調使用 loop.call_later() 方法在 5 秒後重新安排自身,然後停止事件迴圈。

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.new_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

參見

一個類似的 當前日期 示例,透過協程和 run() 函式建立。

監視檔案描述符的讀取事件

使用 loop.add_reader() 方法等待檔案描述符接收到資料,然後關閉事件迴圈。

import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

loop = asyncio.new_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())

    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)

    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

try:
    # Run the event loop
    loop.run_forever()
finally:
    # We are done. Close sockets and the event loop.
    rsock.close()
    wsock.close()
    loop.close()

參見

為 SIGINT 和 SIGTERM 設定訊號處理程式

(此 signals 示例僅在 Unix 上有效。)

使用 loop.add_signal_handler() 方法註冊訊號 SIGINTSIGTERM 的處理程式。

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())