Python3標準庫:selectors I/O多路復用抽象

1. selectors I/O多路復用抽象

selectors模塊在select中平臺特定的I/O監視函數之上提供了一個平臺獨立的抽象層。

1.1 操作模型

selectors中的API是基于事件的,與select中的poll()類似。它有多個實現,并且這個模塊會自動設置別名DefaultSelector來指示對當前系統配置最為高效的一個實現。

選擇器對象提供了一些方法,可以指定在一個套接字上查找哪些事件,然后以一種平臺獨立的方式讓調用者等待事件。注冊對事件的興趣會創建一個SelectorKey,其中包含套接字、所注冊事件的有關信息,可能還有可選的應用數據。選擇器的所有者調用它的select()方法來了解事件。返回值是一個鍵對象序列和一個指示發生了哪些事件的位掩碼。使用選擇器的程序要反復調用select(),然后適當地處理事件。

1.2 回送服務器

這里給出的回送服務器例子使用了Selectorkey中的應用數據來注冊發生新事件時要調用的一個回調函數。主循環從鍵得到這個回調,并把套接字和事件掩碼傳遞給該回調。服務器啟動時,其會注冊當主服務器套接字上發生讀事件時要調用的accept()函數。接受連接會產生一個新的套接字,然后注冊read()函數作為讀事件的一個回調。

import selectorsimport socketmysel = selectors.DefaultSelector()keep_running = Truedef read(connection, mask): "Callback for read events" global keep_running client_address = connection.getpeername() print("read({})".format(client_address)) data = connection.recv(1024) if data: # A readable client socket has data print(" received {!r}".format(data)) connection.sendall(data) else: # Interpret empty result as closed connection print(" closing") mysel.unregister(connection) connection.close() # Tell the main loop to stop keep_running = Falsedef accept(sock, mask): "Callback for new connections" new_connection, addr = sock.accept() print("accept({})".format(addr)) new_connection.setblocking(False) mysel.register(new_connection, selectors.EVENT_READ, read)server_address = ("localhost", 9999)print("starting up on {} port {}".format(*server_address))server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server.setblocking(False)server.bind(server_address)server.listen(5)mysel.register(server, selectors.EVENT_READ, accept)while keep_running: print("waiting for I/O") for key, mask in mysel.select(timeout=1): callback = key.data callback(key.fileobj, mask)print("shutting down")mysel.close()

如果read()沒有從套接字接收到任何數據,那么當連接的另一端關閉時,它會中斷讀事件而不是發送數據。之后,會從選擇器刪除這個套接字,并將其關閉。由于這只是一個示例程序,所以這個服務器與唯一的客戶結束通信后還會關閉服務器自身。

1.3 回送客戶

下面的回送客戶例子會處理主循環中的所有I/O事件,而不是使用回調。它會建立選擇器來報告套接字上的讀事件,并報告套接字什么時候準備好可以發送數據。由于它查看兩種類型的事件,所以客戶必須通過查看掩碼值來檢查發生了哪個事件。所有數據都發出后,它會修改選擇器配置,只在有可讀取的數據時才會報告。

import selectorsimport socketmysel = selectors.DefaultSelector()keep_running = Trueoutgoing = [ b"It will be repeated.", b"This is the message. ",]bytes_sent = 0bytes_received = 0# Connecting is a blocking operation, so call setblocking()# after it returns.server_address = ("localhost", 9999)print("connecting to {} port {}".format(*server_address))sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.connect(server_address)sock.setblocking(False)# Set up the selector to watch for when the socket is ready# to send data as well as when there is data to read.mysel.register( sock, selectors.EVENT_READ | selectors.EVENT_WRITE,)while keep_running: print("waiting for I/O") for key, mask in mysel.select(timeout=1): connection = key.fileobj client_address = connection.getpeername() print("client({})".format(client_address)) if mask & selectors.EVENT_READ: print(" ready to read") data = connection.recv(1024) if data: # A readable client socket has data print(" received {!r}".format(data)) bytes_received += len(data) # Interpret empty result as closed connection, # and also close when we have received a copy # of all of the data sent. keep_running = not ( data or (bytes_received and (bytes_received == bytes_sent)) ) if mask & selectors.EVENT_WRITE: print(" ready to write") if not outgoing: # We are out of messages, so we no longer need to # write anything. Change our registration to let # us keep reading responses from the server. print(" switching to read-only") mysel.modify(sock, selectors.EVENT_READ) else: # Send the next message. next_msg = outgoing.pop() print(" sending {!r}".format(next_msg)) sock.sendall(next_msg) bytes_sent += len(next_msg)print("shutting down")mysel.unregister(connection)connection.close()mysel.close()

這個客戶不僅跟蹤它發出的數據量,還會跟蹤接收的數據量。當這些值一致而且非0時,客戶退出處理循環,并妥善地關閉,它將從選擇器刪除套接字,并關閉套接字和選擇器。

1.4 服務器和客戶

要在不同的終端窗口運行客戶和服務器,使它們能夠相互通信。服務器輸出顯示了入站連接和數據,以及發回給客戶的響應。

客戶輸入顯示了發出的信息和從服務器得到的響應。

免責聲明:本文僅代表文章作者的個人觀點,與本站無關。其原創性、真實性以及文中陳述文字和內容未經本站證實,對本文以及其中全部或者部分內容文字的真實性、完整性和原創性本站不作任何保證或承諾,請讀者僅作參考,并自行核實相關內容。

http://image95.pinlue.com/image/84.jpg
分享
評論
首頁
安微11选五走势图