多进程编程
multiprocessing
由于 GIL(全局解释锁) 的原因, 多线程并不能充分利用多核处理器, 如果是一个 CPU 计算型的任务, 应该使用多进程模块 multiprocessing, 它的工作方式与线程库不同, 但是两种库的接口相似。multiprocessing 给每个进程赋予了单独的 Python 解释器, 这样就规避了 GIL 所带来的问题。
import multiprocessing
def worker(): print('Worker')
if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker) jobs.append(p) p.start()
Worker Worker Worker Worker Worker
|
目标函数也支持传入参数:
import multiprocessing
def worker(num): print(f'Worker: {num}')
if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i, )) jobs.append(p) p.start()
Worker: 0 Worker: 1 Worker: 3 Worker: 2 Worker: 4
|
守护进程
和多线程模块一样, 多进程也是可以设置守护进程的, 守护进程可以一直运行而不影响主程序的结束。
import multiprocessing import time
def daemon(): p = multiprocessing.current_process() print(f'Starting: {p.name} {p.pid}') time.sleep(2) print(f'Exiting: {p.name} {p.pid}')
def non_daemon(): p = multiprocessing.current_process() print(f'Starting: {p.name} {p.pid}') print(f'Exiting: {p.name} {p.pid}')
if __name__ == '__main__': d = multiprocessing.Process( name = 'daemon', target = daemon, daemon=True )
n = multiprocessing.Process( name = 'non-daemon', target = non_daemon, )
d.start() time.sleep(1) n.start()
Starting: daemon 7068 Starting: non-daemon 10116 Exiting: non-daemon 10116
|
通过上面代码的输出可以看到, 守护进程没有完成程序就结束了。如果需要等待守护进程完成工作后再结束, 可以使用 join() 方法:
import multiprocessing import time
def daemon(): p = multiprocessing.current_process() print(f'Starting: {p.name} {p.pid}') time.sleep(2) print(f'Exiting: {p.name} {p.pid}')
def non_daemon(): p = multiprocessing.current_process() print(f'Starting: {p.name} {p.pid}') print(f'Exiting: {p.name} {p.pid}')
if __name__ == '__main__': d = multiprocessing.Process( name = 'daemon', target = daemon, daemon=True )
n = multiprocessing.Process( name = 'non-daemon', target = non_daemon, )
d.start() time.sleep(1) n.start()
d.join() n.join()
Starting: daemon 2928 Starting: non-daemon 13376 Exiting: non-daemon 13376 Exiting: daemon 2928
|
join() 方法接收一个超时的参数, 默认是 none, 表示会一直阻塞, 可以设置一个超时时间, 在这个时间内没有完成, 就会阻塞:
import multiprocessing import time
def daemon(): p = multiprocessing.current_process() print(f'Starting: {p.name} {p.pid}') time.sleep(2) print(f'Exiting: {p.name} {p.pid}')
def non_daemon(): p = multiprocessing.current_process() print(f'Starting: {p.name} {p.pid}') print(f'Exiting: {p.name} {p.pid}')
if __name__ == '__main__': d = multiprocessing.Process( name = 'daemon', target = daemon, daemon=True )
n = multiprocessing.Process( name = 'non-daemon', target = non_daemon, )
d.start() time.sleep(1) n.start()
d.join(1) print('d.is_alive()', d.is_alive()) n.join()
Starting: daemon 7272 Starting: non-daemon 5872 Exiting: non-daemon 5872 d.is_alive() True
|
进程池
任务的执行周期决定了 CPU 核数和任务的分配算法, 使用多进程编程 Pool 是一个很灵活的保证效率的方法:
from functools import lru_cache from multiprocessing import Pool
@lru_cache(maxsize=None) def fib(n): if n < 2: return n return fib(n-1) + fib(n-2)
if __name__ == '__main__': pool = Pool(2) pool.map(fib, [35] * 2)
|
dummy
multiprocessing.dummy 这个子模块虽然在多进程模块的代码中, 但是接口和多线程的接口基本是一样的, 如果分不清一个任务是 CPU 密集型还是 I/O 密集型, 可以使用如下方法去试:
from multiprocessing import Pool from multiprocessing.dummy import Pool
|
这种兼容的方式, 便于在多线程/多进程之间切换。
Queue(队列)
多线程里面有 Queue 模块实现队列, 多进程的 multiprocessing 里面包含了 Queue 这个类, 它是线程和进程安全的。下面是一个生产者/消费者的例子, 用到了两个队列, 一个队列用于存储完成的任务, 另外一个用于存储任务完成后的结果:
import time from multiprocessing import Process, JoinableQueue, Queue from random import random
def double(n): return n * 2
def producer(in_queue): while 1: wait_time = random() time.sleep(wait_time) in_queue.put((double, wait_time)) if wait_time > 0.9: in_queue.put(None) print('停止生产') break
def consumer(in_queue, out_queue): while 1: task = in_queue.get() if task is None: break func, arg = task result = func(arg) in_queue.task_done() out_queue.put(result)
if __name__ == '__main__': tasks_queue = JoinableQueue() results_queue = Queue() processes = []
p = Process(target=producer, args=(tasks_queue, )) p.start() processes.append(p)
p = Process(target=consumer, args=(tasks_queue, results_queue)) p.start() processes.append(p)
tasks_queue.join()
for p in processes: p.join()
while True: if results_queue.empty(): break result = results_queue.get() print(f'Result: {result}')
停止生产 Result: 1.3500119015795484 Result: 1.7651301976930043 Result: 1.6336519677702004 Result: 0.06429843269363 Result: 0.29352347406759494 Result: 1.0097954936153397 Result: 0.19863644698178606 Result: 0.9589181928209678 Result: 1.4618869426710388 Result: 1.6837862156424794 Result: 0.8653351112396082 Result: 1.5958573192798793 Result: 0.15849993035736087 Result: 1.3471427672620973 Result: 1.7492282062851205 Result: 0.27695109993667644 Result: 0.7201581558818728 Result: 1.9614106580291402
|
进程间共享状态
multiprocessing 提供了在进程之间共享状态的方案, 主要有两种: 共享内存 和 服务器进程。
共享内存
共享内存主要通过 Value 和 Array 来实现, 在多个进程之间共享一份数据, 常见的共享类型有下面这些:
>>> from multiprocessing.sharedctypes import typecode_to_type >>> typecode_to_type {'c': <class 'ctypes.c_char'>, # 左边是缩写, 右边是全称 'u': <class 'ctypes.c_wchar'>, 'b': <class 'ctypes.c_byte'>, 'B': <class 'ctypes.c_ubyte'>, 'h': <class 'ctypes.c_short'>, 'H': <class 'ctypes.c_ushort'>, 'i': <class 'ctypes.c_long'>, 'I': <class 'ctypes.c_ulong'>, 'l': <class 'ctypes.c_long'>, 'L': <class 'ctypes.c_ulong'>, 'f': <class 'ctypes.c_float'>, 'd': <class 'ctypes.c_double'>}
|
下面是一个共享内存的实现示例:
from multiprocessing import Process, Lock from multiprocessing.sharedctypes import Value, Array from ctypes import Structure, c_bool, c_double
class Point(Structure): _fields_ = [('x', c_double), ('y', c_double)]
def modify(n, b, s, arr, A): n.value **= 2 b.value = True s.value = s.value.upper() arr[0] = 10 for a in A: a.x **= 2 a.y **= 2
if __name__ == '__main__': lock = Lock()
n = Value('i', 7) b = Value(c_bool, False, lock=False) s = Array('c', b'hello world', lock=lock) arr = Array('i', range(5), lock=True) A = Array(Point, [(1.525, -6.25), (-5.75, 2.5)], lock=lock)
p = Process(target=modify, args=(n, b, s, arr, A)) p.start() p.join()
print(n.value) print(b.value) print(s.value) print(arr[:]) print([(a.x, a.y) for a in A])
49 True b'HELLO WORLD' [10, 1, 2, 3, 4] [(2.3256249999999996, 39.0625), (33.0625, 6.25)]
|
服务器进程
一个 multiprocessing 的 Manager 对象会控制一个服务器的进程, 其他进程可以通过代理的方式来访问这个服务器进程, 常见的共享方式有以下几种:
Namespace: 创建一个可分享的命名空间。
Value/Array: 和共享内存中共享 ctypes 对象的方式一样。
dict/list: 创建一个可分享的 dict/list, 支持对应数据结构的方法。
Condition/Event/Lock/Queue/Semaphore: 创建一个可分享的对应同步原语的对象。
from multiprocessing import Manager, Process
def modify(ns, lproxy, dproxy): ns.a **= 2 lproxy.extend(['b', 'c']) dproxy['b'] = 1
if __name__ == '__main__': manager = Manager() ns = manager.Namespace() ns.a = 1 lproxy = manager.list() lproxy.append('a') dproxy = manager.dict() dproxy['b'] = 0
p = Process(target=modify, args=(ns, lproxy, dproxy)) p.start() print(f'PID: {p.pid}') p.join()
print(ns.a) print(lproxy) print(dproxy)
PID: 6556 1 ['a', 'b', 'c'] {'b': 1}
|
分布式的进程间通信
使用 manager 可以实现一个简单的分布式的不同服务器之间不同进程的通信, 也就是一个简单的 client/server 模型:
from multiprocessing.managers import BaseManager
class RemoteManager(BaseManager): pass
if __name__ == '__main__': host = '127.0.0.1' port = 5000 authkey = b'secret'
shared_list = []
RemoteManager.register('get_list', callable=lambda: shared_list) mgr = RemoteManager(address=(host, port), authkey=authkey) server = mgr.get_server() server.serve_forever()
|
from multiprocessing.managers import BaseManager
class RemoteManager(BaseManager): pass
if __name__ == '__main__': host = '127.0.0.1' port = 5000 authkey = b'secret'
RemoteManager.register('get_list') mgr = RemoteManager(address=(host, port), authkey=authkey) mgr.connect()
l = mgr.get_list() print(l) l.append(1) print(mgr.get_list())
|
> python remote_server.py
> python client.py [] [1]
|