温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何在python中使用multiprocessing实现多进程并行计算

发布时间:2021-03-05 14:39:18 来源:亿速云 阅读:779 作者:Leah 栏目:开发技术

如何在python中使用multiprocessing实现多进程并行计算?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

python是什么意思

Python是一种跨平台的、具有解释性、编译性、互动性和面向对象的脚本语言,其最初的设计是用于编写自动化脚本,随着版本的不断更新和新功能的添加,常用于用于开发独立的项目和大型项目。

apply apply_async

apply 要逐个执行任务,在python3中已经被弃用,而apply_async是apply的异步执行版本。并行计算一定要采用apply_async函数。

import multiprocessing import time from random import randint, seed def f(num):   seed()   rand_num = randint(0,10) # 每次都随机生成一个停顿时间   time.sleep(rand_num)   return (num, rand_num) start_time = time.time() cores = multiprocessing.cpu_count() pool = multiprocessing.Pool(processes=cores) pool_list = [] result_list = [] start_time = time.time() for xx in xrange(10):   pool_list.append(pool.apply_async(f, (xx, ))) # 这里不能 get, 会阻塞进程 result_list = [xx.get() for xx in pool_list] #在这里不免有人要疑问,为什么不直接在 for 循环中直接 result.get()呢?这是因为pool.apply_async之后的语句都是阻塞执行的,调用 result.get() 会等待上一个任务执行完之后才会分配下一个任务。事实上,获取返回值的过程最好放在进程池回收之后进行,避免阻塞后面的语句。 # 最后我们使用一下语句回收进程池:   pool.close() pool.join() print result_list print '并行花费时间 %.2f' % (time.time() - start_time) print '串行花费时间 %.2f' % (sum([xx[1] for xx in result_list])) #[(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)] #并行花费时间 14.11 #串行花费时间 45.00

map map_async

map_async 是 map的异步执行函数。

相比于 apply_async, map_async 只能接受一个参数。

import time from multiprocessing import Pool def run(fn):  #fn: 函数参数是数据列表的一个元素  time.sleep(1)  return fn*fn if __name__ == "__main__":  testFL = [1,2,3,4,5,6]   print '串行:' #顺序执行(也就是串行执行,单进程)  s = time.time()  for fn in testFL:   run(fn)  e1 = time.time()  print "顺序执行时间:", int(e1 - s)  print '并行:' #创建多个进程,并行执行  pool = Pool(4) #创建拥有5个进程数量的进程池  #testFL:要处理的数据列表,run:处理testFL列表中数据的函数  rl =pool.map(run, testFL)   pool.close()#关闭进程池,不再接受新的进程  pool.join()#主进程阻塞等待子进程的退出  e2 = time.time()  print "并行执行时间:", int(e2-e1)  print rl # 串行: # 顺序执行时间: 6 # 并行: # 并行执行时间: 2 # [1, 4, 9, 16, 25, 36]

Process

采用Process必须注意的是,Process对象来创建进程,每一个进程占据一个CPU,所以要建立的进程必须 小于等于 CPU的个数。

如果启动进程数过多,特别是当遇到CPU密集型任务,会降低并行的效率。

#16.6.1.1. The Process class from multiprocessing import Process, cpu_count import os import time start_time = time.time() def info(title): #   print(title)   if hasattr(os, 'getppid'): # only available on Unix     print 'parent process:', os.getppid()   print 'process id:', os.getpid()   time.sleep(3) def f(name):   info('function f')   print 'hello', name if __name__ == '__main__': #   info('main line')   p_list = [] # 保存Process新建的进程   cpu_num = cpu_count()   for xx in xrange(cpu_num):     p_list.append(Process(target=f, args=('xx_%s' % xx,)))   for xx in p_list:     xx.start()   for xx in p_list:     xx.join()   print('spend time: %.2f' % (time.time() - start_time)) parent process: 11741 # parent process: 11741 # parent process: 11741 # process id: 12249 # process id: 12250 # parent process: 11741 # process id: 12251 # process id: 12252 # hello xx_1 # hello xx_0 # hello xx_2 # hello xx_3 # spend time: 3.04

进程间通信

Process和Pool均支持Queues 和 Pipes 两种类型的通信。

Queue 队列

队列遵循先进先出的原则,可以在各个进程间使用。

# 16.6.1.2. Exchanging objects between processes # Queues from multiprocessing import Process, Queue def f(q):   q.put([42, None, 'hello']) if __name__ == '__main__':   q = Queue()   p = Process(target=f, args=(q,))   p.start()   print q.get()  # prints "[42, None, 'hello']"   p.join()

pipe

from multiprocessing import Process, Pipe def f(conn):   conn.send([42, None, 'hello'])   conn.close() if __name__ == '__main__':   parent_conn, child_conn = Pipe()   p = Process(target=f, args=(child_conn,))   p.start()   print parent_conn.recv()  # prints "[42, None, 'hello']"   p.join()

queue 与 pipe比较

Pipe() can only have two endpoints.

Queue() can have multiple producers and consumers.

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

参考:

https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue

共享资源

多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。

在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。

此时我们可以通过共享内存和Manager的方法来共享资源。

但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

共享内存

共享内存仅适用于 Process 类,不能用于进程池 Pool

# 16.6.1.4. Sharing state between processes # Shared memory from multiprocessing import Process, Value, Array def f(n, a):   n.value = 3.1415927   for i in range(len(a)):     a[i] = -a[i] if __name__ == '__main__':   num = Value('d', 0.0)   arr = Array('i', range(10))   p = Process(target=f, args=(num, arr))   p.start()   p.join()   print num.value   print arr[:] # 3.1415927 # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Manager Class

Manager Class 既可以用于Process 也可以用于进程池 Pool。

from multiprocessing import Manager, Process def f(d, l, ii):   d[ii] = ii   l.append(ii) if __name__ == '__main__':   manager = Manager()   d = manager.dict()   l = manager.list(range(10))   p_list = []    for xx in range(4):     p_list.append(Process(target=f, args=(d, l, xx)))   for xx in p_list:     xx.start()   for xx in p_list:     xx.join()   print d   print l # {0: 0, 1: 1, 2: 2, 3: 3} # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]

补充:python程序多进程运行时间计算/多进程写数据/多进程读数据

import time time_start=time.time() time_end=time.time() print('time cost',time_end-time_start,'s')

单位为秒,也可以换算成其他单位输出

注意写测试的时候,函数名要以test开头,否则运行不了。

多线程中的问题:

1)多线程存数据:

def test_save_features_to_db(self):     df1 = pd.read_csv('/home/sc/PycharmProjects/risk-model/xg_test/statis_data/shixin_company.csv')     com_list = df1['company_name'].values.tolist()     # com_list = com_list[400015:400019]     # print 'test_save_features_to_db'     # print(com_list)     p_list = [] # 进程列表     i = 1     p_size = len(com_list)     for company_name in com_list:       # 创建进程       p = Process(target=self.__save_data_iter_method, args=[company_name])       # p.daemon = True       p_list.append(p)       # 间歇执行进程       if i % 20 == 0 or i == p_size: # 20页处理一次, 最后一页处理剩余         for p in p_list:           p.start()         for p in p_list:           p.join() # 等待进程结束         p_list = [] # 清空进程列表       i += 1

总结:多进程写入的时候,不需要lock,也不需要返回值。

核心p = Process(target=self.__save_data_iter_method, args=[company_name]),其中target指向多进程的一次完整的迭代,arg则是该迭代的输入。

注意写法args=[company_name]才对,原来写成:args=company_name,args=(company_name)会报如下错:只需要1个参数,而给出了34个参数。

多进程外层循环则是由输入决定的,有多少个输入就为多少次循环,理解p.start和p.join;

def __save_data_iter_method(self, com):     # time_start = time.time()     # print(com)     f_d_t = ShiXinFeaturesDealSvc()     res = f_d_t.get_time_features(company_name=com)     # 是否失信     shixin_label = res.shixin_label     key1 = res.shixin_time     if key1:       public_at = res.shixin_time       company_name = res.time_map_features[key1].company_name       # print(company_name)       established_years = res.time_map_features[key1].established_years       industry_dx_rate = res.time_map_features[key1].industry_dx_rate       regcap_change_cnt = res.time_map_features[key1].regcap_change_cnt       share_change_cnt = res.time_map_features[key1].share_change_cnt       industry_dx_cnt = res.time_map_features[key1].industry_dx_cnt       address_change_cnt = res.time_map_features[key1].address_change_cnt       fr_change_cnt = res.time_map_features[key1].fr_change_cnt       judgedoc_cnt = res.time_map_features[key1].judgedoc_cnt       bidding_cnt = res.time_map_features[key1].bidding_cnt       trade_mark_cnt = res.time_map_features[key1].trade_mark_cnt       network_share_cancel_cnt = res.time_map_features[key1].network_share_cancel_cnt       cancel_cnt = res.time_map_features[key1].cancel_cnt       industry_all_cnt = res.time_map_features[key1].industry_all_cnt       network_share_zhixing_cnt = res.time_map_features[key1].network_share_zhixing_cnt       network_share_judge_doc_cnt = res.time_map_features[key1].network_share_judge_doc_cnt       net_judgedoc_defendant_cnt = res.time_map_features[key1].net_judgedoc_defendant_cnt       judge_doc_cnt = res.time_map_features[key1].judge_doc_cnt       f_d_do = ShixinFeaturesDto(company_name=company_name, established_years=established_years,                     industry_dx_rate=industry_dx_rate, regcap_change_cnt=regcap_change_cnt,                     share_change_cnt=share_change_cnt, industry_all_cnt=industry_all_cnt,                     industry_dx_cnt=industry_dx_cnt, address_change_cnt=address_change_cnt,                     fr_change_cnt=fr_change_cnt, judgedoc_cnt=judgedoc_cnt,                     bidding_cnt=bidding_cnt, trade_mark_cnt=trade_mark_cnt,                     network_share_cancel_cnt=network_share_cancel_cnt, cancel_cnt=cancel_cnt,                     network_share_zhixing_cnt=network_share_zhixing_cnt,                     network_share_judge_doc_cnt=network_share_judge_doc_cnt,                     net_judgedoc_defendant_cnt=net_judgedoc_defendant_cnt,                     judge_doc_cnt=judge_doc_cnt, public_at=public_at, shixin_label=shixin_label)       # time_end = time.time()       # print('totally cost', time_end - time_start)       self.cfdbsvc.save_or_update_features(f_d_do) def save_or_update_features(self, shixin_features_dto):     """     添加或更新:     插入一行数据, 如果不存在则插入,存在则更新     """     self._pg_util = PgUtil()     p_id = None     if isinstance(shixin_features_dto, ShixinFeaturesDto):       p_id = str(uuid.uuid1())       self._pg_util.execute_sql(         self.s_b.insert_or_update_row(           self.model.COMPANY_NAME,           {             self.model.ID: p_id,             # 公司名             self.model.COMPANY_NAME: shixin_features_dto.company_name,             # 失信时间             self.model.PUBLIC_AT: shixin_features_dto.public_at,             self.model.SHIXIN_LABEL : shixin_features_dto.shixin_label,             self.model.ESTABLISHED_YEARS: shixin_features_dto.established_years,              self.model.INDUSTRY_DX_RATE: shixin_features_dto.industry_dx_rate,              self.model.REGCAP_CHANGE_CNT: shixin_features_dto.regcap_change_cnt,              self.model.SHARE_CHANGE_CNT: shixin_features_dto.share_change_cnt,              self.model.INDUSTRY_ALL_CNT: shixin_features_dto.industry_all_cnt,              self.model.INDUSTRY_DX_CNT: shixin_features_dto.industry_dx_cnt,              self.model.ADDRESS_CHANGE_CNT: shixin_features_dto.address_change_cnt,              self.model.NETWORK_SHARE_CANCEL_CNT: shixin_features_dto.network_share_cancel_cnt,             self.model.CANCEL_CNT: shixin_features_dto.cancel_cnt,              self.model.NETWORK_SHARE_ZHIXING_CNT: shixin_features_dto.network_share_zhixing_cnt,             self.model.FR_CHANGE_CNT: shixin_features_dto.fr_change_cnt,              self.model.JUDGEDOC_CNT: shixin_features_dto.judgedoc_cnt,              self.model.NETWORK_SHARE_JUDGE_DOC_CNT: shixin_features_dto.network_share_judge_doc_cnt,             self.model.BIDDING_CNT: shixin_features_dto.bidding_cnt,              self.model.TRADE_MARK_CNT: shixin_features_dto.trade_mark_cnt,              self.model.JUDGE_DOC_CNT: shixin_features_dto.judge_doc_cnt            },           [self.model.ADDRESS_CHANGE_CNT,self.model.BIDDING_CNT,self.model.CANCEL_CNT,            self.model.ESTABLISHED_YEARS,self.model.FR_CHANGE_CNT,self.model.INDUSTRY_ALL_CNT,            self.model.INDUSTRY_DX_RATE,self.model.INDUSTRY_DX_CNT,self.model.JUDGE_DOC_CNT,            self.model.JUDGEDOC_CNT,self.model.NETWORK_SHARE_CANCEL_CNT,self.model.NETWORK_SHARE_JUDGE_DOC_CNT,            self.model.NETWORK_SHARE_ZHIXING_CNT,self.model.REGCAP_CHANGE_CNT,self.model.TRADE_MARK_CNT,            self.model.SHARE_CHANGE_CNT,self.model.SHIXIN_LABEL,self.model.PUBLIC_AT]         )       )     return p_id

函数中重新初始化了self._pg_util = PgUtil(),否则会报ssl error 和ssl decryption 的错误,背后原因有待研究!

**2)多进程取数据——(思考取数据为何要多进程)**   def flush_process(self, lock): #需要传入lock;     """     运行待处理的方法队列     :type lock Lock     :return 返回一个dict     """     # process_pool = Pool(processes=20)     # data_list = process_pool.map(one_process, self.__process_data_list)     #     # for (key, value) in data_list:     #     # 覆盖上期变量     self.__dct_share = self.__manager.Value('tmp', {}) # 进程共享变量     p_list = [] # 进程列表     i = 1     p_size = len(self.__process_data_list)     for process_data in self.__process_data_list:  **#循环遍历需要同时查找的公司列表!!!self.__process_data_list包含多个process_data,每个process_data包含三种属性?类对象也可以循环????**       # 创建进程       p = Process(target=self.__one_process, args=(process_data, lock)) #参数需要lock       # p.daemon = True       p_list.append(p)       # 间歇执行进程       if i % 20 == 0 or i == p_size: # 20页处理一次, 最后一页处理剩余         for p in p_list:           p.start()         for p in p_list:           p.join() # 等待进程结束         p_list = [] # 清空进程列表       i += 1     # end for     self.__process_data_list = [] # 清空订阅     return self.__dct_share.value  def __one_process(self, process_data, lock):  #迭代函数     """     处理进程     :param process_data: 方法和参数集等     :param lock: 保护锁     """     fcn = process_data.fcn     params = process_data.params     data_key = process_data.data_key     if isinstance(params, tuple):       data = fcn(*params) #**注意:*params 与 params区别**     else:       data = fcn(params)     with lock:       temp_dct = dict(self.__dct_share.value)       if data_key not in temp_dct:         temp_dct[data_key] = []       temp_dct[data_key].append(data)       self.__dct_share.value = temp_dct

主程序调用:

def exe_process(self, company_name, open_from, time_nodes):     """     多进程执行pre订阅的数据     :param company_name: 公司名     :return:     """     mul_process_helper = MulProcessHelper()     lock = Lock()     self.__get_time_bidding_statistic(company_name, mul_process_helper)     data = mul_process_helper.flush_process(lock)     return data  def __get_time_bidding_statistic(self, company_name, mul_process_helper):     # 招投标信息     process_data = ProcessData(f_e_t_svc.get_bidding_statistic_time_node_api, company_name,                   self.__BIDDING_STATISTIC_TIME) **#此处怎么理解?ProcessData是一个类!!!**     mul_process_helper.add_process_data_list(process_data)  #同时调用多个api???将api方法当做迭代????用于同时查找多个公司????  def add_process_data_list(self, process_data):     """     添加用于进程处理的方法队列     :type process_data ProcessData     :param process_data:     :return:     """     self.__process_data_list.append(process_data)  class ProcessData(object):   """   用于进程处理的的数据   """   def __init__(self, fcn, params, data_key):     self.fcn = fcn # 方法     self.params = params # 参数     self.data_key = data_key # 存储到进程共享变量中的名字

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI