傳輸和協議¶
前言
傳輸和協議由低階事件迴圈 API 使用,例如 loop.create_connection()
。它們使用基於回撥的程式設計風格,並實現了網路或 IPC 協議(例如 HTTP)的高效能實現。
本質上,傳輸和協議應僅在庫和框架中使用,而絕不應在高階 asyncio 應用程式中使用。
引言
在最高層,傳輸關注位元組是如何傳輸的,而協議決定要傳輸哪些位元組(以及在某種程度上何時傳輸)。
換句話說:傳輸是套接字(或類似 I/O 端點)的抽象,而協議是從傳輸的角度來看的應用程式的抽象。
另一種觀點是,傳輸和協議介面共同定義了使用網路 I/O 和程序間 I/O 的抽象介面。
傳輸物件和協議物件之間總是存在 1:1 的關係:協議呼叫傳輸方法傳送資料,而傳輸呼叫協議方法將接收到的資料傳遞給它。
大多數面向連線的事件迴圈方法(例如 loop.create_connection()
)通常接受一個 protocol_factory 引數,用於為接受的連線建立一個 Protocol 物件,該連線由一個 Transport 物件表示。此類方法通常返回一個 (transport, protocol)
元組。
目錄
此文件頁面包含以下部分
傳輸 部分文件化了 asyncio 的
BaseTransport
、ReadTransport
、WriteTransport
、Transport
、DatagramTransport
和SubprocessTransport
類。協議 部分文件化了 asyncio 的
BaseProtocol
、Protocol
、BufferedProtocol
、DatagramProtocol
和SubprocessProtocol
類。示例 部分展示瞭如何使用傳輸、協議和低階事件迴圈 API。
傳輸¶
原始碼: Lib/asyncio/transports.py
傳輸是 asyncio
提供的類,用於抽象各種通訊通道。
傳輸物件總是由 asyncio 事件迴圈 例項化。
asyncio 為 TCP、UDP、SSL 和子程序管道實現了傳輸。傳輸上可用的方法取決於傳輸的型別。
傳輸類不是執行緒安全的。
傳輸層級結構¶
- class asyncio.BaseTransport¶
所有傳輸的基類。包含所有 asyncio 傳輸共享的方法。
- class asyncio.WriteTransport(BaseTransport)¶
只寫連線的基礎傳輸。
WriteTransport 類的例項由
loop.connect_write_pipe()
事件迴圈方法返回,也由子程序相關方法(如loop.subprocess_exec()
)使用。
- class asyncio.ReadTransport(BaseTransport)¶
只讀連線的基礎傳輸。
ReadTransport 類的例項由
loop.connect_read_pipe()
事件迴圈方法返回,也由子程序相關方法(如loop.subprocess_exec()
)使用。
- class asyncio.Transport(WriteTransport, ReadTransport)¶
表示雙向傳輸(例如 TCP 連線)的介面。
使用者不直接例項化傳輸;他們呼叫一個實用函式,傳入一個協議工廠和建立傳輸和協議所需的其他資訊。
Transport 類的例項由事件迴圈方法(如
loop.create_connection()
、loop.create_unix_connection()
、loop.create_server()
、loop.sendfile()
等)返回或使用。
- class asyncio.DatagramTransport(BaseTransport)¶
資料報 (UDP) 連線的傳輸。
DatagramTransport 類的例項由
loop.create_datagram_endpoint()
事件迴圈方法返回。
- class asyncio.SubprocessTransport(BaseTransport)¶
表示父程序與其子 OS 程序之間連線的抽象。
SubprocessTransport 類的例項由事件迴圈方法
loop.subprocess_shell()
和loop.subprocess_exec()
返回。
基礎傳輸¶
- BaseTransport.close()¶
關閉傳輸。
如果傳輸有用於傳出資料的緩衝區,則緩衝的資料將非同步重新整理。不再接收資料。在所有緩衝資料重新整理後,協議的
protocol.connection_lost()
方法將以None
作為其引數被呼叫。傳輸一旦關閉就不應再使用。
- BaseTransport.is_closing()¶
如果傳輸正在關閉或已關閉,則返回
True
。
- BaseTransport.get_extra_info(name, default=None)¶
返回有關傳輸或其使用的底層資源的資訊。
name 是一個字串,表示要獲取的特定於傳輸的資訊。
如果資訊不可用,或者傳輸不支援使用給定的第三方事件迴圈實現或在當前平臺上查詢它,則 default 是要返回的值。
例如,以下程式碼嘗試獲取傳輸的底層套接字物件
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
可以在某些傳輸上查詢的資訊類別
socket
'peername'
:套接字連線到的遠端地址,socket.socket.getpeername()
的結果(錯誤時為None
)'socket'
:socket.socket
例項'sockname'
:套接字自身的地址,socket.socket.getsockname()
的結果
SSL 套接字
'compression'
:正在使用的壓縮演算法的字串表示,如果連線未壓縮則為None
;ssl.SSLSocket.compression()
的結果'cipher'
:一個包含正在使用的密碼名稱、定義其使用的 SSL 協議版本以及正在使用的金鑰位數的三元組;ssl.SSLSocket.cipher()
的結果'peercert'
:對端證書;ssl.SSLSocket.getpeercert()
的結果'sslcontext'
:ssl.SSLContext
例項'ssl_object'
:ssl.SSLObject
或ssl.SSLSocket
例項
管道
'pipe'
:管道物件
subprocess
'subprocess'
:subprocess.Popen
例項
- BaseTransport.set_protocol(protocol)¶
設定新協議。
協議切換隻能在兩個協議都文件化支援該切換時進行。
- BaseTransport.get_protocol()¶
返回當前協議。
只讀傳輸¶
- ReadTransport.is_reading()¶
如果傳輸正在接收新資料,則返回
True
。在 3.7 版本加入。
- ReadTransport.pause_reading()¶
暫停傳輸的接收端。在呼叫
resume_reading()
之前,不會將資料傳遞給協議的protocol.data_received()
方法。3.7 版本中的變更: 該方法是冪等的,即當傳輸已暫停或關閉時,可以呼叫它。
- ReadTransport.resume_reading()¶
恢復接收端。如果有一些資料可供讀取,協議的
protocol.data_received()
方法將再次被呼叫。3.7 版本中的變更: 該方法是冪等的,即當傳輸已在讀取時,可以呼叫它。
只寫傳輸¶
- WriteTransport.abort()¶
立即關閉傳輸,不等待掛起操作完成。緩衝資料將丟失。不再接收資料。協議的
protocol.connection_lost()
方法最終將以None
作為其引數被呼叫。
- WriteTransport.can_write_eof()¶
如果傳輸支援
write_eof()
,則返回True
;否則返回False
。
- WriteTransport.get_write_buffer_size()¶
返回傳輸使用的輸出緩衝區的當前大小。
- WriteTransport.get_write_buffer_limits()¶
獲取寫入流控制的高水位線和低水位線。返回一個元組
(low, high)
,其中 low 和 high 是正數字節數。使用
set_write_buffer_limits()
設定限制。在 3.4.2 版本中新增。
- WriteTransport.set_write_buffer_limits(high=None, low=None)¶
設定寫入流控制的高水位線和低水位線。
這兩個值(以位元組數衡量)控制何時呼叫協議的
protocol.pause_writing()
和protocol.resume_writing()
方法。如果指定,低水位線必須小於或等於高水位線。high 和 low 都不能為負。當緩衝區大小大於或等於 high 值時,呼叫
pause_writing()
。如果已暫停寫入,則當緩衝區大小小於或等於 low 值時,呼叫resume_writing()
。預設值是實現特定的。如果只給出了高水位線,則低水位線預設為一個實現特定的值,小於或等於高水位線。將 high 設定為零也會強制 low 為零,並導致只要緩衝區變為非空,就呼叫
pause_writing()
。將 low 設定為零會導致只有當緩衝區為空時才呼叫resume_writing()
。將任一限制設定為零通常不是最佳選擇,因為它減少了併發進行 I/O 和計算的機會。使用
get_write_buffer_limits()
獲取限制。
- WriteTransport.write(data)¶
向傳輸寫入一些 data 位元組。
此方法不阻塞;它將資料緩衝並安排其非同步傳送。
- WriteTransport.writelines(list_of_data)¶
向傳輸寫入一個數據位元組列表(或任何可迭代物件)。這在功能上等同於對可迭代物件產生的每個元素呼叫
write()
,但可以更有效地實現。
- WriteTransport.write_eof()¶
在重新整理所有緩衝資料後關閉傳輸的寫入端。資料可能仍然被接收。
如果傳輸(例如 SSL)不支援半關閉連線,此方法可能會引發
NotImplementedError
。
資料報傳輸¶
- DatagramTransport.sendto(data, addr=None)¶
將 data 位元組傳送到由 addr 給定的遠端對端(與傳輸相關的目標地址)。如果 addr 為
None
,則資料將傳送到建立傳輸時指定的目標地址。此方法不阻塞;它將資料緩衝並安排其非同步傳送。
3.13 版本中的變更: 此方法可以與空位元組物件一起呼叫,以傳送零長度資料報。用於流控制的緩衝區大小計算也已更新,以考慮資料報頭部。
- DatagramTransport.abort()¶
立即關閉傳輸,不等待掛起操作完成。緩衝資料將丟失。不再接收資料。協議的
protocol.connection_lost()
方法最終將以None
作為其引數被呼叫。
子程序傳輸¶
- SubprocessTransport.get_pid()¶
返回子程序的程序 ID,一個整數。
- SubprocessTransport.get_pipe_transport(fd)¶
返回與整數檔案描述符 fd 對應的通訊管道的傳輸。
- SubprocessTransport.get_returncode()¶
返回子程序的返回碼,一個整數;如果子程序尚未返回,則返回
None
,這類似於subprocess.Popen.returncode
屬性。
- SubprocessTransport.kill()¶
終止子程序。
在 POSIX 系統上,該函式向子程序傳送 SIGKILL。在 Windows 上,此方法是
terminate()
的別名。另請參見
subprocess.Popen.kill()
。
- SubprocessTransport.send_signal(signal)¶
向子程序傳送 signal 編號,如
subprocess.Popen.send_signal()
所示。
協議¶
asyncio 提供了一組抽象基類,應使用它們來實現網路協議。這些類旨在與 傳輸 一起使用。
抽象基礎協議類的子類可以實現部分或全部方法。所有這些方法都是回撥:它們在某些事件發生時由傳輸呼叫,例如收到一些資料時。基礎協議方法應由相應的傳輸呼叫。
基礎協議¶
- class asyncio.BaseProtocol¶
所有協議共享方法的基協議。
- class asyncio.Protocol(BaseProtocol)¶
用於實現流式協議(TCP、Unix 套接字等)的基類。
- class asyncio.BufferedProtocol(BaseProtocol)¶
用於實現帶手動控制接收緩衝區的流式協議的基類。
- class asyncio.DatagramProtocol(BaseProtocol)¶
用於實現資料報 (UDP) 協議的基類。
- class asyncio.SubprocessProtocol(BaseProtocol)¶
用於實現與子程序通訊(單向管道)的協議的基類。
基礎協議¶
所有 asyncio 協議都可以實現基礎協議回撥。
連接回調
連接回調在所有協議上被呼叫,每個成功連線只調用一次。所有其他協議回撥只能在這兩個方法之間被呼叫。
- BaseProtocol.connection_made(transport)¶
建立連線時呼叫。
transport 引數是表示連線的傳輸。協議負責儲存對其傳輸的引用。
流量控制回撥
流量控制回撥可以由傳輸呼叫,以暫停或恢復協議執行的寫入操作。
有關更多詳細資訊,請參閱 set_write_buffer_limits()
方法的文件。
- BaseProtocol.pause_writing()¶
當傳輸的緩衝區超出高水位線時呼叫。
- BaseProtocol.resume_writing()¶
當傳輸的緩衝區低於低水位線時呼叫。
如果緩衝區大小等於高水位線,則不會呼叫 pause_writing()
:緩衝區大小必須嚴格超出。
相反,當緩衝區大小等於或低於低水位線時,會呼叫 resume_writing()
。這些終止條件對於確保當任何一個標記為零時一切按預期進行很重要。
流式協議¶
事件方法,例如 loop.create_server()
、loop.create_unix_server()
、loop.create_connection()
、loop.create_unix_connection()
、loop.connect_accepted_socket()
、loop.connect_read_pipe()
和 loop.connect_write_pipe()
接受返回流式協議的工廠。
- Protocol.data_received(data)¶
收到一些資料時呼叫。data 是一個非空的位元組物件,包含傳入的資料。
資料是否被緩衝、分塊或重組取決於傳輸。通常,您不應該依賴特定的語義,而應該使您的解析通用和靈活。但是,資料總是按正確的順序接收。
在連線開啟期間,此方法可以被呼叫任意次數。
但是,
protocol.eof_received()
最多被呼叫一次。一旦呼叫了eof_received()
,就不再呼叫data_received()
。
- Protocol.eof_received()¶
當另一端發出訊號表示不再發送任何資料時呼叫(例如透過呼叫
transport.write_eof()
,如果另一端也使用 asyncio)。此方法可以返回一個假值(包括
None
),在這種情況下,傳輸將自行關閉。相反,如果此方法返回一個真值,則由所使用的協議決定是否關閉傳輸。由於預設實現返回None
,因此它隱式關閉連線。一些傳輸,包括 SSL,不支援半關閉連線,在這種情況下,此方法返回真值將導致連線關閉。
狀態機
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
緩衝流式協議¶
在 3.7 版本加入。
緩衝協議可與支援 流式協議 的任何事件迴圈方法一起使用。
BufferedProtocol
實現允許顯式手動分配和控制接收緩衝區。然後,事件迴圈可以使用協議提供的緩衝區來避免不必要的資料複製。這可以顯著提高接收大量資料的協議的效能。複雜的協議實現可以顯著減少緩衝區分配的數量。
以下回調在 BufferedProtocol
例項上被呼叫
- BufferedProtocol.get_buffer(sizehint)¶
呼叫以分配新的接收緩衝區。
sizehint 是返回緩衝區的推薦最小大小。返回小於或大於 sizehint 建議的緩衝區是可接受的。當設定為 -1 時,緩衝區大小可以是任意的。返回零大小的緩衝區是錯誤的。
get_buffer()
必須返回一個實現 緩衝區協議 的物件。
- BufferedProtocol.buffer_updated(nbytes)¶
當緩衝區用接收到的資料更新時呼叫。
nbytes 是寫入緩衝區的總位元組數。
- BufferedProtocol.eof_received()¶
請參閱
protocol.eof_received()
方法的文件。
get_buffer()
在連線期間可以被呼叫任意次數。但是,protocol.eof_received()
最多被呼叫一次,如果被呼叫,get_buffer()
和 buffer_updated()
將不再在此之後被呼叫。
狀態機
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
資料報協議¶
資料報協議例項應由傳遞給 loop.create_datagram_endpoint()
方法的協議工廠構造。
- DatagramProtocol.datagram_received(data, addr)¶
收到資料報時呼叫。data 是一個位元組物件,包含傳入的資料。addr 是傳送資料的對端地址;具體格式取決於傳輸。
- DatagramProtocol.error_received(exc)¶
當之前的傳送或接收操作引發
OSError
時呼叫。exc 是OSError
例項。此方法在很少見的情況下被呼叫,即當傳輸(例如 UDP)檢測到資料報無法傳遞給其接收方時。然而,在許多情況下,無法傳遞的資料報將被靜默丟棄。
備註
在 BSD 系統(macOS、FreeBSD 等)上,資料報協議不支援流控制,因為沒有可靠的方法來檢測因寫入過多資料包而導致的傳送失敗。
套接字總是顯示“就緒”狀態,多餘的資料包會被丟棄。可能會或可能不會引發 OSError
,其中 errno
設定為 errno.ENOBUFS
;如果引發,它將報告給 DatagramProtocol.error_received()
,否則將被忽略。
子程序協議¶
子程序協議例項應由傳遞給 loop.subprocess_exec()
和 loop.subprocess_shell()
方法的協議工廠構造。
- SubprocessProtocol.pipe_data_received(fd, data)¶
當子程序向其 stdout 或 stderr 管道寫入資料時呼叫。
fd 是管道的整數檔案描述符。
data 是一個非空的位元組物件,包含接收到的資料。
- SubprocessProtocol.pipe_connection_lost(fd, exc)¶
當與子程序通訊的某個管道關閉時呼叫。
fd 是已關閉的整數檔案描述符。
- SubprocessProtocol.process_exited()¶
子程序退出時呼叫。
它可以在
pipe_data_received()
和pipe_connection_lost()
方法之前被呼叫。
示例¶
TCP 回顯伺服器¶
使用 loop.create_server()
方法建立一個 TCP 回顯伺服器,回顯接收到的資料,並關閉連線
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
EchoServerProtocol,
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
參見
使用流的 TCP 回顯伺服器 示例使用了高階 asyncio.start_server()
函式。
TCP 回顯客戶端¶
一個 TCP 回顯客戶端,使用 loop.create_connection()
方法傳送資料,並等待連線關閉
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
參見
使用流的 TCP 回顯客戶端 示例使用了高階 asyncio.open_connection()
函式。
UDP 回顯伺服器¶
一個 UDP 回顯伺服器,使用 loop.create_datagram_endpoint()
方法,回顯接收到的資料
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
transport, protocol = await loop.create_datagram_endpoint(
EchoServerProtocol,
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
UDP 回顯客戶端¶
一個 UDP 回顯客戶端,使用 loop.create_datagram_endpoint()
方法,傳送資料並在收到答覆時關閉傳輸
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
連線現有套接字¶
使用 loop.create_connection()
方法和協議等待套接字接收資料
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport;
# connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create a pair of connected sockets
rsock, wsock = socket.socketpair()
# Register the socket to wait for data.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate the reception of data from the network.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
參見
監視檔案描述符以進行讀取事件 示例使用了低階 loop.add_reader()
方法來註冊檔案描述符。
註冊開放套接字以使用流等待資料 示例使用了協程中由 open_connection()
函式建立的高階流。
loop.subprocess_exec() 和 SubprocessProtocol¶
一個子程序協議的示例,用於獲取子程序的輸出並等待子程序退出。
子程序由 loop.subprocess_exec()
方法建立
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
self.pipe_closed = False
self.exited = False
def pipe_connection_lost(self, fd, exc):
self.pipe_closed = True
self.check_for_exit()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exited = True
# process_exited() method can be called before
# pipe_connection_lost() method: wait until both methods are
# called.
self.check_for_exit()
def check_for_exit(self):
if self.pipe_closed and self.exited:
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await exit_future
# Close the stdout pipe.
transport.close()
# Read the output which was collected by the
# pipe_data_received() method of the protocol.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
另請參見使用高階 API 編寫的相同示例。