傳輸和協議

前言

傳輸和協議由底層事件迴圈 API 使用,例如 loop.create_connection()。它們使用基於回撥的程式設計風格,並能夠實現高效能的網路或 IPC 協議(例如 HTTP)。

本質上,傳輸和協議應該只在庫和框架中使用,而永遠不應在高階 asyncio 應用程式中使用。

此文件頁面涵蓋了傳輸協議

簡介

在最高級別,傳輸關注於位元組如何傳輸,而協議確定傳輸哪些位元組(以及在某種程度上何時傳輸)。

換句話說:傳輸是套接字(或類似的 I/O 端點)的抽象,而協議是從傳輸的角度來看應用程式的抽象。

另一種觀點是,傳輸和協議介面共同定義了使用網路 I/O 和程序間 I/O 的抽象介面。

傳輸物件和協議物件之間始終存在 1:1 的關係:協議呼叫傳輸方法來發送資料,而傳輸呼叫協議方法來傳遞已接收的資料。

大多數面向連線的事件迴圈方法(例如 loop.create_connection())通常接受一個 *protocol_factory* 引數,用於為接受的連線建立一個 *Protocol* 物件,該連線由 *Transport* 物件表示。此類方法通常返回一個 (transport, protocol) 元組。

內容

此文件頁面包含以下部分

傳輸

原始碼: Lib/asyncio/transports.py


傳輸是 asyncio 提供的類,用於抽象各種型別的通訊通道。

傳輸物件始終由 asyncio 事件迴圈 例項化。

asyncio 實現了 TCP、UDP、SSL 和子程序管道的傳輸。傳輸上可用的方法取決於傳輸的型別。

傳輸類不是執行緒安全的

傳輸層次結構

class asyncio.BaseTransport

所有傳輸的基類。包含所有 asyncio 傳輸共享的方法。

class asyncio.WriteTransport(BaseTransport)

用於只寫連線的基本傳輸。

WriteTransport 類的例項是從 loop.connect_write_pipe() 事件迴圈方法返回的,並且也被子程序相關的方法(如 loop.subprocess_exec())使用。

class asyncio.ReadTransport(BaseTransport)

用於只讀連線的基本傳輸。

ReadTransport 類的例項是從 loop.connect_read_pipe() 事件迴圈方法返回的,並且也被子程序相關的方法(如 loop.subprocess_exec())使用。

class asyncio.Transport(WriteTransport, ReadTransport)

表示雙向傳輸的介面,例如 TCP 連線。

使用者不直接例項化傳輸;他們呼叫一個實用程式函式,向其傳遞一個協議工廠和其他建立傳輸和協議所需的資訊。

Transport 類的例項是從或被事件迴圈方法(如 loop.create_connection()loop.create_unix_connection()loop.create_server()loop.sendfile() 等)返回或使用。

class asyncio.DatagramTransport(BaseTransport)

用於資料報 (UDP) 連線的傳輸。

DatagramTransport 類的例項是從 loop.create_datagram_endpoint() 事件迴圈方法返回的。

class asyncio.SubprocessTransport(BaseTransport)

表示父程序與其子作業系統程序之間連線的抽象。

SubprocessTransport 類的例項透過事件迴圈方法 loop.subprocess_shell()loop.subprocess_exec() 返回。

基礎傳輸

BaseTransport.close()

關閉傳輸。

如果傳輸具有用於傳出資料的緩衝區,則緩衝資料將非同步重新整理。不會再接收到任何資料。在重新整理所有緩衝資料後,將呼叫協議的 protocol.connection_lost() 方法,並使用 None 作為其引數。傳輸一旦關閉就不應再使用。

BaseTransport.is_closing()

如果傳輸正在關閉或已關閉,則返回 True

BaseTransport.get_extra_info(name, default=None)

返回有關傳輸或其使用的底層資源的資訊。

name 是一個字串,表示要獲取的特定於傳輸的資訊。

如果資訊不可用,或者如果傳輸不支援使用給定的第三方事件迴圈實現或在當前平臺上查詢資訊,則 default 是要返回的值。

例如,以下程式碼嘗試獲取傳輸的底層套接字物件

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

可以在某些傳輸上查詢的資訊類別

BaseTransport.set_protocol(protocol)

設定一個新協議。

僅當兩個協議都明確支援切換時,才應切換協議。

BaseTransport.get_protocol()

返回當前協議。

只讀傳輸

ReadTransport.is_reading()

如果傳輸正在接收新資料,則返回 True

3.7 版本新增。

ReadTransport.pause_reading()

暫停傳輸的接收端。在呼叫 resume_reading() 之前,不會將任何資料傳遞給協議的 protocol.data_received() 方法。

在 3.7 版本中更改: 此方法是冪等的,即當傳輸已暫停或關閉時也可以呼叫。

ReadTransport.resume_reading()

恢復接收端。如果有一些資料可供讀取,則將再次呼叫協議的 protocol.data_received() 方法。

在 3.7 版本中更改: 此方法是冪等的,即當傳輸已經在讀取時也可以呼叫。

只寫傳輸

WriteTransport.abort()

立即關閉傳輸,而無需等待掛起的操作完成。緩衝資料將丟失。不會再接收到任何資料。協議的 protocol.connection_lost() 方法最終將使用 None 作為其引數呼叫。

WriteTransport.can_write_eof()

如果傳輸支援 write_eof(),則返回 True,否則返回 False

WriteTransport.get_write_buffer_size()

返回傳輸使用的輸出緩衝區的當前大小。

WriteTransport.get_write_buffer_limits()

獲取用於寫入流量控制的 *高水位線* 和 *低水位線*。返回一個元組 (low, high),其中 *low* 和 *high* 是正位元組數。

使用 set_write_buffer_limits() 設定限制。

3.4.2 版本新增。

WriteTransport.set_write_buffer_limits(high=None, low=None)

設定用於寫入流量控制的 *高水位線* 和 *低水位線*。

這兩個值(以位元組數為單位)控制何時呼叫協議的 protocol.pause_writing()protocol.resume_writing() 方法。如果指定了,則低水位線必須小於或等於高水位線。highlow 都不能為負數。

當緩衝區大小大於或等於 high 值時,會呼叫 pause_writing()。如果寫入已暫停,則當緩衝區大小小於或等於 low 值時,會呼叫 resume_writing()

預設值是特定於實現的。如果只給出高水位線,則低水位線預設為小於或等於高水位線的特定於實現的值。將 high 設定為零也會強制 low 為零,並導致只要緩衝區變為非空就呼叫 pause_writing()。將 low 設定為零會導致只有當緩衝區為空時才呼叫 resume_writing()。將任一限制設定為零通常不是最佳的,因為它減少了同時進行 I/O 和計算的機會。

使用 get_write_buffer_limits() 獲取限制。

WriteTransport.write(data)

將一些 data 位元組寫入傳輸。

此方法不會阻塞;它會緩衝資料並安排非同步傳送。

WriteTransport.writelines(list_of_data)

將資料位元組列表(或任何可迭代物件)寫入傳輸。這在功能上等同於對可迭代物件產生的每個元素呼叫 write(),但可能會更有效地實現。

WriteTransport.write_eof()

在重新整理所有緩衝資料後,關閉傳輸的寫入端。仍然可以接收資料。

如果傳輸(例如 SSL)不支援半關閉連線,則此方法可能會引發 NotImplementedError

資料報傳輸

DatagramTransport.sendto(data, addr=None)

data 位元組傳送到由 addr 給出的遠端對等方(傳輸相關的目標地址)。如果 addrNone,則資料將傳送到在傳輸建立時給出的目標地址。

此方法不會阻塞;它會緩衝資料並安排非同步傳送。

3.13 版本更改: 可以呼叫此方法並傳遞一個空的位元組物件來發送零長度的資料報。用於流控制的緩衝區大小計算也已更新,以考慮資料報頭。

DatagramTransport.abort()

立即關閉傳輸,而無需等待掛起的操作完成。緩衝資料將丟失。不會再接收到任何資料。協議的 protocol.connection_lost() 方法最終將使用 None 作為其引數呼叫。

子程序傳輸

SubprocessTransport.get_pid()

以整數形式返回子程序的程序 ID。

SubprocessTransport.get_pipe_transport(fd)

返回與整數檔案描述符 fd 對應的通訊管道的傳輸。

  • 0:標準輸入 (stdin) 的可讀流式傳輸,如果子程序不是使用 stdin=PIPE 建立的,則為 None

  • 1:標準輸出 (stdout) 的可寫流式傳輸,如果子程序不是使用 stdout=PIPE 建立的,則為 None

  • 2:標準錯誤 (stderr) 的可寫流式傳輸,如果子程序不是使用 stderr=PIPE 建立的,則為 None

  • 其他 fdNone

SubprocessTransport.get_returncode()

以整數形式返回子程序的返回碼,如果它尚未返回,則返回 None,這類似於 subprocess.Popen.returncode 屬性。

SubprocessTransport.kill()

殺死子程序。

在 POSIX 系統上,該函式會向子程序傳送 SIGKILL 訊號。在 Windows 上,此方法是 terminate() 的別名。

另請參閱 subprocess.Popen.kill()

SubprocessTransport.send_signal(signal)

subprocess.Popen.send_signal() 中所示,將 signal 編號傳送到子程序。

SubprocessTransport.terminate()

停止子程序。

在 POSIX 系統上,此方法會向子程序傳送 SIGTERM 訊號。在 Windows 上,會呼叫 Windows API 函式 TerminateProcess() 來停止子程序。

另請參閱 subprocess.Popen.terminate()

SubprocessTransport.close()

透過呼叫 kill() 方法殺死子程序。

如果子程序尚未返回,則關閉 stdinstdoutstderr 管道的傳輸。

協議

原始碼: Lib/asyncio/protocols.py


asyncio 提供了一組抽象基類,應該使用這些基類來實現網路協議。這些類旨在與 傳輸 一起使用。

抽象基協議類的子類可以實現某些或所有方法。所有這些方法都是回撥:它們由傳輸在某些事件上呼叫,例如當收到一些資料時。基礎協議方法應由相應的傳輸呼叫。

基礎協議

class asyncio.BaseProtocol

具有所有協議共享的方法的基礎協議。

class asyncio.Protocol(BaseProtocol)

用於實現流協議(TCP、Unix 套接字等)的基類。

class asyncio.BufferedProtocol(BaseProtocol)

用於實現具有手動控制接收緩衝區的流協議的基類。

class asyncio.DatagramProtocol(BaseProtocol)

用於實現資料報 (UDP) 協議的基類。

class asyncio.SubprocessProtocol(BaseProtocol)

用於實現與子程序(單向管道)通訊的協議的基類。

基礎協議

所有 asyncio 協議都可以實現基礎協議回撥。

連接回調

連接回調會在所有協議上呼叫,每次成功連線只調用一次。所有其他協議回撥只能在這兩個方法之間呼叫。

BaseProtocol.connection_made(transport)

當建立連線時呼叫。

transport 引數是表示連線的傳輸物件。協議負責儲存對其傳輸的引用。

BaseProtocol.connection_lost(exc)

當連線丟失或關閉時呼叫。

引數是一個異常物件或 None。後者表示收到常規的 EOF,或者連線被此連線方中止或關閉。

流量控制回撥

傳輸層可以呼叫流量控制回撥來暫停或恢復協議執行的寫入操作。

有關更多詳細資訊,請參閱 set_write_buffer_limits() 方法的文件。

BaseProtocol.pause_writing()

當傳輸的緩衝區超過高水位線時呼叫。

BaseProtocol.resume_writing()

當傳輸的緩衝區降到低水位線以下時呼叫。

如果緩衝區大小等於高水位線,則不會呼叫 pause_writing():緩衝區大小必須嚴格超過高水位線。

相反,當緩衝區大小等於或低於低水位線時,會呼叫 resume_writing()。這些結束條件對於確保當任何一個標記為零時按預期執行非常重要。

流式協議

事件方法,例如 loop.create_server()loop.create_unix_server()loop.create_connection()loop.create_unix_connection()loop.connect_accepted_socket()loop.connect_read_pipe()loop.connect_write_pipe() 接受返回流式協議的工廠方法。

Protocol.data_received(data)

當收到一些資料時呼叫。data 是一個非空的 bytes 物件,其中包含傳入的資料。

資料是緩衝的、分塊的還是重新組裝的取決於傳輸。一般來說,你不應該依賴於特定的語義,而是使你的解析是通用的和靈活的。但是,始終按正確的順序接收資料。

當連線開啟時,該方法可以被呼叫任意次數。

但是,protocol.eof_received() 最多被呼叫一次。一旦呼叫 eof_received(),則不會再呼叫 data_received()

Protocol.eof_received()

當另一端發出訊號表明它不會發送更多資料時呼叫(例如,如果另一端也使用 asyncio,則透過呼叫 transport.write_eof())。

此方法可能會返回假值(包括 None),在這種情況下,傳輸將自行關閉。相反,如果此方法返回真值,則使用的協議將確定是否關閉傳輸。由於預設實現返回 None,因此它隱式地關閉連線。

一些傳輸協議(包括 SSL)不支援半關閉連線,在這種情況下,從此方法返回真值將導致連線關閉。

狀態機

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

緩衝流式協議

3.7 版本新增。

緩衝協議可以與任何支援 流式協議 的事件迴圈方法一起使用。

BufferedProtocol 的實現允許顯式手動分配和控制接收緩衝區。然後,事件迴圈可以使用協議提供的緩衝區來避免不必要的資料複製。這可以為接收大量資料的協議帶來顯著的效能提升。複雜的協議實現可以顯著減少緩衝區分配的數量。

以下回調在 BufferedProtocol 例項上呼叫

BufferedProtocol.get_buffer(sizehint)

呼叫以分配新的接收緩衝區。

sizehint 是返回緩衝區建議的最小大小。返回比 sizehint 建議的更小或更大的緩衝區是可以接受的。當設定為 -1 時,緩衝區大小可以是任意的。返回大小為零的緩衝區是錯誤的。

get_buffer() 必須返回一個實現 緩衝區協議 的物件。

BufferedProtocol.buffer_updated(nbytes)

當緩衝區用接收到的資料更新時呼叫。

nbytes 是寫入緩衝區的總位元組數。

BufferedProtocol.eof_received()

請參閱 protocol.eof_received() 方法的文件。

get_buffer() 可以在連線期間被呼叫任意次數。但是,protocol.eof_received() 最多被呼叫一次,如果呼叫,get_buffer()buffer_updated() 在其之後將不會再被呼叫。

狀態機

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

資料報協議

資料報協議例項應該由傳遞給 loop.create_datagram_endpoint() 方法的協議工廠構造。

DatagramProtocol.datagram_received(data, addr)

當收到資料報時呼叫。data 是一個包含傳入資料的 bytes 物件。addr 是傳送資料的對等方的地址;具體格式取決於傳輸方式。

DatagramProtocol.error_received(exc)

當先前的傳送或接收操作引發 OSError 時呼叫。excOSError 例項。

當傳輸(例如 UDP)檢測到資料報無法傳遞給其接收者時,會在極少數情況下呼叫此方法。但在許多情況下,無法傳遞的資料報將被靜默丟棄。

注意

在 BSD 系統(macOS、FreeBSD 等)上,資料報協議不支援流量控制,因為沒有可靠的方法來檢測因寫入過多資料包而導致的傳送失敗。

套接字始終顯示為“就緒”,多餘的資料包將被丟棄。 可能會或可能不會引發將 errno 設定為 errno.ENOBUFSOSError;如果引發,它將報告給 DatagramProtocol.error_received(),否則將被忽略。

子程序協議

子程序協議例項應該由傳遞給 loop.subprocess_exec()loop.subprocess_shell() 方法的協議工廠構造。

SubprocessProtocol.pipe_data_received(fd, data)

當子程序將資料寫入其 stdout 或 stderr 管道時呼叫。

fd 是管道的整數檔案描述符。

data 是一個包含接收資料的非空 bytes 物件。

SubprocessProtocol.pipe_connection_lost(fd, exc)

當與子程序通訊的管道之一關閉時呼叫。

fd 是已關閉的整數檔案描述符。

SubprocessProtocol.process_exited()

當子程序退出時呼叫。

它可以在 pipe_data_received()pipe_connection_lost() 方法之前呼叫。

示例

TCP 回聲伺服器

使用 loop.create_server() 方法建立一個 TCP 回聲伺服器,將接收到的資料傳送回去,並關閉連線

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        EchoServerProtocol,
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

另請參閱

使用流的 TCP 回聲伺服器 示例使用了高層級的 asyncio.start_server() 函式。

TCP 回聲客戶端

使用 loop.create_connection() 方法建立一個 TCP 回聲客戶端,傳送資料,並等待直到連線關閉

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

另請參閱

使用流的 TCP 回聲客戶端 示例使用了高層級的 asyncio.open_connection() 函式。

UDP 回聲伺服器

使用 loop.create_datagram_endpoint() 方法建立一個 UDP 回聲伺服器,將接收到的資料傳送回去

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        EchoServerProtocol,
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

UDP 回聲客戶端

使用 loop.create_datagram_endpoint() 方法建立一個 UDP 回聲客戶端,傳送資料並在收到答案後關閉傳輸

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

連線現有套接字

使用帶協議的 loop.create_connection() 方法,等待套接字接收資料

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

另請參閱

監視檔案描述符的讀取事件 示例使用了低層級的 loop.add_reader() 方法來註冊 FD。

使用流的 註冊一個開啟的套接字以等待使用流的資料 示例使用了由協程中的 open_connection() 函式建立的高層級流。

loop.subprocess_exec() 和 SubprocessProtocol

一個子程序協議的示例,用於獲取子程序的輸出並等待子程序退出。

子程序由 loop.subprocess_exec() 方法建立

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() method can be called before
        # pipe_connection_lost() method: wait until both methods are
        # called.
        self.check_for_exit()

    def check_for_exit(self):
        if self.pipe_closed and self.exited:
            self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

另請參閱使用高層級 API 編寫的 相同示例