select&selectors

作者 耿嘉豪 日期 2018-06-01 阅读量
select&selectors

select/selectors 模块

现实场景中是多客户端甚至多服务端, 那怎么让服务端支持响应多个客户端的请求呢? socket.setblocking() 可以设置非阻塞, 但是单纯设置非阻塞还不够, 还需要 I/O 复用

I/O 复用

操作系统提供了一个功能, 当你的某个 socket 可读或者可写的时候, 它可以给你一个通知。这样当配合非阻塞的 socket 使用时, 只有当系统通知我哪个描述符可读了, 才去执行 read 操作, 这样可以保证每次 read 都能读到有效数据而不做纯返回 -1 之类的无用功。写操作类似。操作系统的这个功能通过支持 I/O 多路复用的系统调用来使用, 这些函数都可以同时监视多个描述符的读写就绪状态, 这样, 多个描述符的 I/O 操作都能在一个线程内并发交替地顺序完成, 这就叫 I/O 多路复用, 这里的复用指的是复用同一个线程。

这里的描述符是指文件描述符, 不是 Python 描述符, 文件描述符是一个用于表述指向文件的引用的抽象化的概念, 内核会给每个访问的文件分配文件描述符, 它本质上是一个非负整数, 在打开或者新建文件时返回。读写文件都要通过文件描述符。

Linux 系统一切皆是文件, 所以套接字也不例外, 它是可读可写可控制可关闭的文件描述符。

I/O 多路复用适用场景

  1. 当客户处理多个描述符时(一般是交互式输入和网络套接字), 必须使用 I/O 复用。

  2. 当一个用户同时处理多个套接字时。这种情况是可能的, 但是很少出现。

  3. 如果一个 TCP 服务器既要处理监听套接字, 又要处理已连接套接字, 一般也要用到 I/O 复用。

  4. 如果一个服务器既要处理 TCP, 又要处理 UDP, 一般要使用 I/O 复用。

  5. 如果一个服务器要处理多个服务或多个协议, 一般要使用 I/O 复用。

与多进程/多线程相比, I/O 多路复用最大的优势是系统开销小, 系统不必创建和维护线程和进程。

支持 I/O 多路复用的系统调用

  1. select

  2. poll

  3. epoll(Linux 特有)

  4. kqueue(FreeBSD 下提供, 基本原理和 epoll 一样)

select 实现

select 服务端例子(select_server.py)

import socket
import select
from queue import Queue, Empty


HOST = '127.0.0.1'
PORT = 5000


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0) # 设为非阻塞
server.bind((HOST, PORT)) # 绑定套接字到本地 IP 与端口
server.listen(5) # 监听连接

inputs = [server]
outputs = []
message_queues = {}
print(f'Server start at: {HOST}:{PORT}')


while inputs:
# select.select() 接收的前三个参数的值都是列表, 会从第一个参数列表中读取数据, 向第二个参数列表中发出数据, 第三个列表包含了可能有错误的对象。
# select.select() 返回的也是三个参数, 包含了对应满足条件的对象, 分别是可读对象、可写对象、异常对象
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
for s in readable:
if s is server:
conn, addr = s.accept() # 接受客户端连接
print(f'Connected by {addr}')
conn.setblocking(0) # 设置连接为非阻塞
inputs.append(conn)
message_queues[conn] = Queue()
else:
# 接收数据, 将得到的数据存到对应连接的队列中
data = s.recv(1024)
if data:
print(f'received "{data}" from {s.getpeername()}')
message_queues[s].put(data)
if s not in outputs:
outputs.append(s)
else:
# 如果没有可读取的数据, 说明客户端已经断开, 从 inputs 和 message_queues 中移除, 并且关闭套接字
if s in outputs:
outputs.remove(s)
inputs.remove(s)
del message_queues[s]
s.close()

# 遍历可写对象列表
for s in writeable:
try:
# 如果有准备好的可写入的套接字, 就从对应连接的队列里获取收到的消息, 然后发送回给客户端
next_msg = message_queues[s].get_nowait()
except Empty:
outputs.remove(s)
else:
s.send(bytes(f'Server received {next_msg}', 'utf-8'))

# 移除错误的套接字
for s in exceptional:
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del message_queues[s]

客户端例子(client.py)

import socket


HOST = '127.0.0.1'
PORT = 5000


messages = [
'This is ',
'the message.',
'It will be send ',
'in parts',
]


socks = [
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]

print(f'connecting to {HOST} port {PORT}')

for s in socks:
s.connect((HOST, PORT))

for index, message in enumerate(messages):
_, is_odd = divmod(index, 2)
outgoing_data = message.encode()

for index, s in enumerate(socks):
if divmod(index, 2)[1] != is_odd:
continue
print(f'{s.getsockname()}: sending {outgoing_data}')
s.send(outgoing_data)

for index, s in enumerate(socks):
if divmod(index, 2)[1] != is_odd:
continue
data = s.recv(1024)
print(f'{s.getsockname()}: recevied {data}')
if not data:
s.close()

启动服务端和客户端

启动服务端:

> python select_server.py
Server start at: 127.0.0.1:5000
Connected by ('127.0.0.1', 14886)
Connected by ('127.0.0.1', 14887)
received "b'This is '" from ('127.0.0.1', 14886)
received "b'the message.'" from ('127.0.0.1', 14887)
received "b'It will be send '" from ('127.0.0.1', 14886)
received "b'in parts'" from ('127.0.0.1', 14887)

启动客户端:

>python client.py
connecting to 127.0.0.1 port 5000
('127.0.0.1', 14886): sending b'This is '
('127.0.0.1', 14886): recevied b"Server received b'This is '"
('127.0.0.1', 14887): sending b'the message.'
('127.0.0.1', 14887): recevied b"Server received b'the message.'"
('127.0.0.1', 14886): sending b'It will be send '
('127.0.0.1', 14886): recevied b"Server received b'It will be send '"
('127.0.0.1', 14887): sending b'in parts'
('127.0.0.1', 14887): recevied b"Server received b'in parts'"

通过输出可以看到, 消息分成几部分从不同的连接发送出去, 服务端会从对应的连接返回给客户端

select 的缺点

  1. 单个进程所打开的文件描述符数量是有一定限制的, 通常默认值是 1024。这对于高并发的网络服务来说太小了。

  2. 对 socket 进行扫描时是线性扫描, 即采用轮询的方法, 效率较低, 套接字数量多时会浪费很多 CPU 时间。

  3. 需要维护一个用来存放大量数据的数据结构, 这样会使得用户空间和内核空间在传递该结构时复制开销大。

poll 实现

poll 的实现和 select 相似, 只是描述集合的方式不同。poll 是基于链表存储文件描述符, 它不限制文件描述符的数量。

poll 的特点是水平触发, 如果报告了文件描述符之后, 没有被处理, 那么下次 poll 会再次报告这个文件描述符, 如果用户永远不处理这个事件, 就会导致每次都会有这个事件-从内核到用户的拷贝, 比较耗费性能。但是水平触发相对安全, 除非用户处理完毕, 否则事件不会丢。

import socket
import select
from queue import Queue, Empty


HOST = '127.0.0.1'
PORT = 5000


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.setblocking(0)
server.bind((HOST, PORT))
server.listen(5)


message_queues = {}

TIME_OUT = 500 # 超时时间 0.5 秒
fd_to_socket = { # 一个文件描述符到套接字的映射
server.fileno(): server,
}
READ_ONLY = (
select.POLLIN |
select.POLLPRI |
select.POLLHUP |
select.POLLERR
) # 4 种事件的并集
READ_WRITE = READ_ONLY | select.POLLOUT


poller = select.poll()

# 给 server 套接字注册, 它会关注 READ_ONLY 列出的 4 种事件
poller.register(server, READ_ONLY)

print(f'Server start at: {HOST}:{PORT}')


while True:
events = poller.poll(TIME_OUT)
for fd, flag in events:
s = fd_to_socket[fd]
if flag & (select.POLLIN | select.POLLPRI): # 输入准备就绪, 也就是可读了
if s is server:
conn, addr = s.accept()
print(f'Connected by {addr}')
conn.setblocking(0)
fd_to_socket[conn.fileno()] = conn
poller.register(conn, READ_ONLY) # 新注册的套接字都关注 READ_ONLY 时间
message_queues[conn] = Queue()
else:
data = s.recv(1024)
if data:
print(f'received "{data}" from {s.getpeername()}')
message_queues[s].put(data)
poller.modify(s, READ_WRITE) # 从缓冲区获取内容后, 也关注 POLLOUT 事件了
else:
poller.unregister(s) # 没有可用数据的套接字说明客户端关闭了, 取消注册
s.close()
elif flag & select.POLLHUP: # 套接字关闭了
poller.unregister(s)
s.close()
elif flag & select.POLLOUT: # 可写
# 取出消息, 修改消息状态
try:
next_msg = message_queues[s].get_nowait()
except Empty:
# 修改套接字关注的时间类型, 因为它已经恢复不可写状态了
poller.modify(s, READ_ONLY)
else:
s.send(bytes(f'Server received {next_msg}', 'utf-8'))
elif flag & select.POLLERR: # 错误的套接字
poller.unregister(s)
s.close()
del message_queues[s]

Python 中, 用一个类来实现 poll, 由这个类管理所有监视的套接字, 套接字会通过 register() 方法注册, 同时利用标志位指示套接字关注的事件。

poll 标志位

POLLIN 有数据读取
POLLPRI 有优先级数据读取
POLLOUT 能够输出
POLLHUP 挂起
POLLERR 错误
POLLNVAL 套接字未打开

水平触发 Level Triggered(LT) / 边缘触发 Edge Triggered(ET)

和水平触发相对的是边缘触发, 当内核有事件到达, 只会通知用户一次, 无论用户是否处理, 以后都不再通知, 这样减少了拷贝过程, 提高了性能, 但是相对来说, 如果用户忘记了, 就会产生事件丢失的情况。

epoll

epoll 方案解决了 select 的几个问题:

  1. epoll 支持的上限是最大可以打开文件的数目。这个数量是很大的, 完全可以满足现在对于高并发的需求。

  2. epoll 的解决方法不像 select 和 poll 每次对所有进行遍历轮询所有集合, 而是在注册新的事件的时候, 为每个指定一个回调函数, 当设备就绪时, 调用这个回调函数, 这个回调函数就会把就绪的加入到一个就绪表中(所以 epoll 实际只需要遍历就绪表)。

  3. epoll 的解决方法是每次注册新的事件到 epoll 中, 会把所有的拷贝进内核, 而不是在等待的时候重复拷贝, 保证了每个在整个过程中只会拷贝一次。

epoll 例子

import socket
import select
from queue import Queue, Empty


HOST = '127.0.0.1'
PORT = 5000


server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

server.setblocking(0)
server.bind((HOST, PORT))
server.listen(5)


message_queues = {}

TIME_OUT = 500 # 超时时间 0.5 秒

fd_to_socket = {
server.fileno(): server,
}


epoller = select.epoll()

epoller.register(server, select.EPOLLIN)

print(f'Server start at: {HOST}:{PORT}')


while True:
events = epoller.poll(TIME_OUT)
for fd, flag in events:
s = fd_to_socket[fd]
if s is server:
conn, addr = s.accept()
print(f'Connected by {addr}')
conn.setblocking(0)
epoller.register(conn, select.EPOLLIN)
fd_to_socket[conn.fileno()] = conn
message_queues[conn] = Queue()
elif flag & (select.POLLIN | select.POLLPRI):
data = s.recv(1024)
if data:
print(f'received "{data}" from {s.getpeername()}')
message_queues[s].put(data)
epoller.modify(s, select.EPOLLOUT)
else:
epoller.unregister(s)
s.close()
elif flag & select.POLLHUP:
epoller.unregister(s)
s.close()
elif flag & select.POLLOUT:
try:
next_msg = message_queues[s].get_nowait()
except Empty:
epoller.modify(s, select.EPOLLIN)
else:
s.send(bytes(f'Server received {next_msg}', 'utf-8'))
elif flag & select.POLLERR:
poller.unregister(s)
s.close()
del message_queues[s]

selectors 模块

selectors 是 python3.4 开始加入的一个模块, 它封装了 select 模块下的 select、epoll 等系统级 I/O 外部接口, 抽象成了 Selector 类, 它规定了统一的接口。

该模块定义了一个 BaseSelector 的抽象基类, 以及它的一些子类:

  1. SelectSelector

  2. PollSelector

  3. EpollSelector

  4. DevpollSelector

  5. KqueueSelector

这些子类分别对应 select 模块下的 I/O 复用方案。

DefaultSelector 类是以上子类的别名, 它会自动选择当前环境中最有效的 Selector。

标志位如下:

  1. EVENT_READ 可读

  2. EVENT_WRITE 可写

selectors 模块服务端写法

import socket
import selectors
from queue import Queue, Empty


HOST = '127.0.0.1'
PORT = 5000


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
sock.bind((HOST, PORT))
sock.listen(5)


sel = selectors.DefaultSelector()
message_queues = {}


print(f'Server start at: {HOST}:{PORT}')


sel.register(
sock,
selectors.EVENT_READ | selectors.EVENT_WRITE,
)


while True:
for key, mask in sel.select(timeout=0.5):
conn = key.fileobj
if conn is sock:
conn, addr = sock.accept()
print(f'Connected by {addr}')
conn.setblocking(0)
message_queues[conn] = Queue()
sel.register(
conn, selectors.EVENT_READ | selectors.EVENT_WRITE)
elif mask & selectors.EVENT_READ:
data = conn.recv(1024)
if data:
print(f'received "{data}" from {conn.getpeername()}')
message_queues[conn].put(data)
elif mask & selectors.EVENT_WRITE:
try:
next_msg = message_queues[conn].get_nowait()
except Empty:
pass
else:
conn.send(bytes(f'Server received {next_msg}', 'utf-8'))
sel.modify(sock, selectors.EVENT_READ) # 从可写切换到可读状态

Python3 时代, 最简洁的方法是下面这种回调形式, 把读取和写入的部分抽象出来:

import socket
import selectors


HOST = '127.0.0.1'
PORT = 5000

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
sock.bind((HOST, PORT))
sock.listen(5)


sel = selectors.DefaultSelector()


print(f'Server start at: {HOST}:{PORT}')


def read(conn, mask):
data = conn.recv(1024)
if data:
print(f'received "{data}" from {conn.getpeername()}')
conn.send(bytes(f'Server received {data}', 'utf-8'))
else:
sel.unregister(conn)
conn.close()


def accept(sock, mask):
conn, addr = sock.accept()
print(f'Connected by {addr}')
conn.setblocking(0)
sel.register(conn, selectors.EVENT_READ, read)


sel.register(sock, selectors.EVENT_READ, accept)


while True:
events = sel.select(0.5)
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)