进程池Pool & 进程间通信Queue

进程池Pool

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from multiprocessing import Pool
import os
import random
import time

def worker(num):
# for i in range(5):
# print('---进程号: %d ---num=%d----'%(os.getpid(), num))
# time.sleep(1)

print('---进程号: %d ---num=%d----'%(os.getpid(), num))
time.sleep(2)

if __name__ == '__main__':

pool = Pool(3) # 创建进程池, 进程数=3

for i in range(10):
print("----添加任务:%d-----"%i)
pool.apply_async(worker, (i,)) # 非堵塞式添加
# pool.apply(worker, (i,)) # 堵塞式添加
# 向进程池中添加任务数=10, 若添加的函数有参数,以元组的方式传入

pool.close() # 关闭进程池,不再添加新任务
pool.join() # 等待子进程都结束,主进程再结束
# 不然 主进程会在添加完任务后就结束

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
----添加任务:0-----
----添加任务:1-----
----添加任务:2-----
----添加任务:3-----
----添加任务:4-----
----添加任务:5-----
----添加任务:6-----
----添加任务:7-----
----添加任务:8-----
----添加任务:9-----
---进程号: 13608 ---num=0----
---进程号: 17324 ---num=1----
---进程号: 2716 ---num=2----
---进程号: 13608 ---num=3----
---进程号: 17324 ---num=4----
---进程号: 2716 ---num=5----
---进程号: 13608 ---num=6----
---进程号: 17324 ---num=7----
---进程号: 2716 ---num=8----
---进程号: 13608 ---num=9----

非堵塞式添加和堵塞式添加

阻塞与非阻塞指的是程序的两种运行状态

  • 阻塞:遇到IO就发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且立刻释放CPU资源
  • 非阻塞(就绪态或运行态):没有遇到IO操作,或者通过某种手段让程序即便是遇到IO操作也不会停在原地,执行其他操作,力求尽可能多的占有CPU

pool.apply 和 pool.apply_async 的区别

  • apply :阻塞式,需要等待当前子进程执行完毕后,在执行下一个子进程,耗时长。
  • apply_async : 非阻塞式,async是异步的意思,不用等待当前运行的子进程执行完毕,随时根据系统调度来进行进程切换,耗时短。

进程间通信Queue

Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。
multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序

Queue的使用

在内存中开辟一个队列模型,用来存放消息,任何拥有队列对象的进程都可以进行消息存放和取出

初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);

  • Queue.qsize():返回当前队列包含的消息数量;

  • Queue.empty():如果队列为空,返回True,反之False ;

  • Queue.full():如果队列满了,返回True,反之False;

  • Queue.get([block[,timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;

    • 1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出”Queue.Empty”异常; (如 Queue.get(block=True,timeout=2))
    • 2)如果block值为False,消息列队如果为空,则会立刻抛出”Queue.Empty”异常;
  • Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;

    • 1)如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出”Queue.Full”异常;(如 Queue.put(‘xxx’block=True,timeout=2))
    • 2)如果block值为False,消息列队如果没有空间可写入,则会立刻抛出”Queue.Full”异常;
  • Queue.get_nowait():相当Queue.get(False);
  • Queue.put_nowait(item):相当Queue.put(item, False);

Queue实例

在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from multiprocessing import Process, Queue
import os, time

def write_data(queue):
for value in ['Message1', 'Message2', 'Message3']:
print("Put %s to queue..." %value)
queue.put(value) # 加入消息队列
time.sleep(2)

def read_data(queue):
while True:
if not queue.empty(): # 判断消息队列是否为空
value = queue.get(True) # 默认值为True,获取消息队列中的一个值
print("Get %s to queue..." %value)
time.sleep(2)
else:
break

if __name__ == '__main__':
# 当前父进程创建Queue,供两个子进程读写
q = Queue() # 初始化队列 ,未传入参数时,即可不受限制接受消息
pw = Process(target=write_data, args=(q,)) # 创建写进程
pr = Process(target=read_data, args=(q,)) # 创建读进程
# 写进程pw开启
pw.start()
# 等待写进程pw结束
pw.join()
# 读进程pr开启
pr.start()
# 等待度进程pr结束
pr.join()
print("...Done")

执行结果:

1
2
3
4
5
6
7
Put Message1 to queue...
Put Message2 to queue...
Put Message3 to queue...
Get Message1 to queue...
Get Message2 to queue...
Get Message3 to queue...
...Done

进程池通信Queue

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的实例演示了进程池中的进程如何通信:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from multiprocessing import Manager, Pool
import os, time

def reader(queue):
print("reader启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
for i in range(queue.qsize()):
print("reader从Queue获取到消息:%s "% queue.get(True))


def writer(queue):
print("writer启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
for i in ['Message1', 'Message2', 'Message3', 'Message4']:
queue.put(i)

if __name__ == '__main__':
print("(%s) start"%os.getpid())
q = Manager().Queue() #使用Manager中的Queue来初始化
po = Pool()
#使用阻塞模式创建进程,这样就不需要在reader中使用死循环了,可以让writer完全执行完成后,再用reader去读取
po.apply(writer,(q,))
po.apply(reader,(q,))
po.close()
po.join()
print("(%s) End"%os.getpid())

执行结果:

1
2
3
4
5
6
7
8
(12196) start
writer启动(6812),父进程为(12196)
reader启动(7992),父进程为(12196)
reader从Queue获取到消息:Message1
reader从Queue获取到消息:Message2
reader从Queue获取到消息:Message3
reader从Queue获取到消息:Message4
(12196) End