while inputs: # select.select() 接收的前三个参数的值都是列表, 会从第一个参数列表中读取数据, 向第二个参数列表中发出数据, 第三个列表包含了可能有错误的对象。 # select.select() 返回的也是三个参数, 包含了对应满足条件的对象, 分别是可读对象、可写对象、异常对象 readable, writeable, exceptional = select.select(inputs, outputs, inputs) for s in readable: if s is server: conn, addr = s.accept() # 接受客户端连接 print(f'Connected by {addr}') conn.setblocking(0) # 设置连接为非阻塞 inputs.append(conn) message_queues[conn] = Queue() else: # 接收数据, 将得到的数据存到对应连接的队列中 data = s.recv(1024) if data: print(f'received "{data}" from {s.getpeername()}') message_queues[s].put(data) if s notin outputs: outputs.append(s) else: # 如果没有可读取的数据, 说明客户端已经断开, 从 inputs 和 message_queues 中移除, 并且关闭套接字 if s in outputs: outputs.remove(s) inputs.remove(s) del message_queues[s] s.close()
# 遍历可写对象列表 for s in writeable: try: # 如果有准备好的可写入的套接字, 就从对应连接的队列里获取收到的消息, 然后发送回给客户端 next_msg = message_queues[s].get_nowait() except Empty: outputs.remove(s) else: s.send(bytes(f'Server received {next_msg}', 'utf-8'))
# 移除错误的套接字 for s in exceptional: inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
客户端例子(client.py)
import socket
HOST = '127.0.0.1' PORT = 5000
messages = [ 'This is ', 'the message.', 'It will be send ', 'in parts', ]
for index, message in enumerate(messages): _, is_odd = divmod(index, 2) outgoing_data = message.encode()
for index, s in enumerate(socks): if divmod(index, 2)[1] != is_odd: continue print(f'{s.getsockname()}: sending {outgoing_data}') s.send(outgoing_data)
for index, s in enumerate(socks): if divmod(index, 2)[1] != is_odd: continue data = s.recv(1024) print(f'{s.getsockname()}: recevied {data}') ifnot data: s.close()
启动服务端和客户端
启动服务端:
> python select_server.py Server start at: 127.0.0.1:5000 Connected by ('127.0.0.1', 14886) Connected by ('127.0.0.1', 14887) received "b'This is '"from ('127.0.0.1', 14886) received "b'the message.'"from ('127.0.0.1', 14887) received "b'It will be send '"from ('127.0.0.1', 14886) received "b'in parts'"from ('127.0.0.1', 14887)
启动客户端:
>python client.py connecting to 127.0.0.1 port 5000 ('127.0.0.1', 14886): sending b'This is ' ('127.0.0.1', 14886): recevied b"Server received b'This is '" ('127.0.0.1', 14887): sending b'the message.' ('127.0.0.1', 14887): recevied b"Server received b'the message.'" ('127.0.0.1', 14886): sending b'It will be send ' ('127.0.0.1', 14886): recevied b"Server received b'It will be send '" ('127.0.0.1', 14887): sending b'in parts' ('127.0.0.1', 14887): recevied b"Server received b'in parts'"
whileTrue: events = epoller.poll(TIME_OUT) for fd, flag in events: s = fd_to_socket[fd] if s is server: conn, addr = s.accept() print(f'Connected by {addr}') conn.setblocking(0) epoller.register(conn, select.EPOLLIN) fd_to_socket[conn.fileno()] = conn message_queues[conn] = Queue() elif flag & (select.POLLIN | select.POLLPRI): data = s.recv(1024) if data: print(f'received "{data}" from {s.getpeername()}') message_queues[s].put(data) epoller.modify(s, select.EPOLLOUT) else: epoller.unregister(s) s.close() elif flag & select.POLLHUP: epoller.unregister(s) s.close() elif flag & select.POLLOUT: try: next_msg = message_queues[s].get_nowait() except Empty: epoller.modify(s, select.EPOLLIN) else: s.send(bytes(f'Server received {next_msg}', 'utf-8')) elif flag & select.POLLERR: poller.unregister(s) s.close() del message_queues[s]
defread(conn, mask): data = conn.recv(1024) if data: print(f'received "{data}" from {conn.getpeername()}') conn.send(bytes(f'Server received {data}', 'utf-8')) else: sel.unregister(conn) conn.close()