|
| 1 | +下面我们将使用多进程解决多输入情况下的斐波那契数列问题,而不是之前我们使用的多线程的方法。 |
| 2 | +multiprocessing_fibonacci.py程序使用multiprocessing模块,为了顺利执行,还导入了如下模块: |
| 3 | +```python |
| 4 | +import sys, time, random, re, requests |
| 5 | +import concurrent.futures |
| 6 | +from multiprocessing import, cpu_count, current_process, Manager |
| 7 | +``` |
| 8 | + |
| 9 | +上面一些导入模块在之前的章节中提及过,然而,下面中的一些模块我们需要特别注意: |
| 10 | +* cpu_count: 该方法允许获得机器cpu的数量 |
| 11 | +* current_process: 该方法可以获得当前进程的信息,比如进程名称 |
| 12 | +* Manager: 该类通过对象的方式允许功在多进程之间共享python对象 |
| 13 | + |
| 14 | +下面的代码中我们可以注意到第一个方法有点不同,它将产生15个1到20之间的整数,这些整数将被当作fibo_dict的key使用。 |
| 15 | +接下来让我们一起来看producer_task方法,如下: |
| 16 | +```python |
| 17 | +def producer_task(q, fibo_dict): |
| 18 | + for i in range(5): |
| 19 | + value = random.randint(1, 20) |
| 20 | + fibo_dict[value] = None |
| 21 | + |
| 22 | + print("Producer [%s] putting value [%d] into queue.." % (current_process().name, value)) |
| 23 | + q.put(value) |
| 24 | + |
| 25 | +``` |
| 26 | + |
| 27 | +下面将定义一个函数来计算fibo_dict中key对应的斐波那契数列值,和之前章节介绍计算斐波那契序列值不同的是,这里把fibo_dict当作参数传入不同的processes。 |
| 28 | + |
| 29 | +下面是consumer_task方法,如下: |
| 30 | +```python |
| 31 | +def consumer_task(q, fibo_dict): |
| 32 | + while not q.empty(): |
| 33 | + value = q.get(True, 0.05) |
| 34 | + a, b = 0, 1 |
| 35 | + for item in range(value): |
| 36 | + a, b = b, a+b |
| 37 | + fibo_dict[value] = a |
| 38 | + print("consumer [%s] getting value [%d] from queue..." % (current_process().name, value)) |
| 39 | +``` |
| 40 | + |
| 41 | +更进一步,我们来看main函数中的代码,main函数中下面几个变量被定义: |
| 42 | +* data_queue: 该参数由multiprocessing.Queueu来创建,是进程安全的 |
| 43 | +* number_of_cpus: 该参数由multiprocessing.cpu_count方法获得,获得机器cpu的个数 |
| 44 | +* fibo_dict: 这个字典类型变量从Manager实例获得,保存多进程计算结果 |
| 45 | + |
| 46 | +然后,我们将创建producer进程,并传入data_queue队列,data_queue队列值由producer_task方法获得: |
| 47 | +```python |
| 48 | +producer = Process(target=producer_task, args=(data_queue, fibo_dict)) |
| 49 | +producer.start() |
| 50 | +producer.join() |
| 51 | +``` |
| 52 | + |
| 53 | +我们可以注意到Process实例的初始化过程和我们之前的Thread实例初始化过程类似。初始化函数接收target参数作为进程中要执行的函数,和args参数作为target传入的函数的参数。接下来我们通过start方式开始进程,然后使用join方法,等待producer进程执行完毕。 |
| 54 | + |
| 55 | +下面一块代码中,我们将定义consumer_list队列,存入初始化过的consumer进程。使用list存储consumer对象的原因是在所有进程结束开始后调用join方法。循环中的每一个worker被调用后,下一个worker将等待上一个worker执行完毕后才开始执行,下面代码将描述这一过程: |
| 56 | +```python |
| 57 | +consumer_list = [] |
| 58 | +cpu = cpu_count() |
| 59 | +print(cpu) |
| 60 | +for i in range(cpu): |
| 61 | + consumer = Process(target=consumer_task, args=(data_queue, fibo_dict)) |
| 62 | + consumer.start() |
| 63 | + consumer_list.append(consumer) |
| 64 | +[consumer.join() for consumer in consumer_list] |
| 65 | +``` |
| 66 | +最终我们将迭代输出fibo_dict中的结果,如下面截图所示: |
0 commit comments