```python
from multiprocessing import Pool

def run(data, index, size):
    # data: the full input; index: this shard's index; size: number of processes
    ...
    return data  # results can be returned here and collected later

data = ...  # placeholder: the data to be split across the processes
processor = 15
res = []
p = Pool(processor)
for i in range(processor):
    res.append(p.apply_async(run, args=(data, i, processor,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
for i in res:
    print(i.get())  # use get() to retrieve each worker's result
```
There are three things worth noting here, starting with this call:
```python
res.append(p.apply_async(run, args=(data, i, processor,)))
```
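`apply_async` submits the call to a pool worker and returns immediately with an `AsyncResult`; calling `get()` on that object later blocks until the result is ready. To see the pattern in isolation, here is a minimal, self-contained variant (the squaring workload and the stride-based shard split are illustrative choices, not part of the original):

```python
from multiprocessing import Pool

def run(data, index, size):
    # each worker handles every `size`-th element starting at `index`
    shard = data[index::size]
    return [x * x for x in shard]

if __name__ == '__main__':
    data = list(range(10))
    processor = 4
    with Pool(processor) as p:
        # apply_async returns an AsyncResult at once; the work runs in the pool
        res = [p.apply_async(run, args=(data, i, processor)) for i in range(processor)]
        shards = [r.get() for r in res]  # get() blocks until each result is ready
    print(shards)  # [[0, 16, 64], [1, 25, 81], [4, 36], [9, 49]]
```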
The same pattern also works when the work arrives as a list of tasks rather than a fixed number of shards; the pool keeps `processor` workers busy and queues the rest:

```python
import numpy as np
from multiprocessing import Pool

def run(task, index, size):
    # task: one unit of work; index: task index; size: number of processes
    ...
    return task  # results can be returned here and collected later

processor = 2
res = []
tasks = [...] * 100  # placeholder: the list of tasks to distribute
p = Pool(processor)
for i, task in enumerate(tasks):
    res.append(p.apply_async(run, args=(task, i, processor,)))
    print(str(i) + ' task submitted !')
p.close()
p.join()
results = []
for r in res:
    results.append(r.get())  # use get() to retrieve each task's result
np.save("***.npy", results)  # assuming numpy's save was intended, given the .npy extension
```
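If each task is independent and only the return values matter, the same submit-and-collect loop can also be expressed with `Pool.map`, which dispatches everything, blocks until all tasks finish, and preserves input order (a sketch; the doubling workload is illustrative):

```python
from multiprocessing import Pool

def double(task):
    return task * 2

if __name__ == '__main__':
    tasks = list(range(100))
    with Pool(2) as p:
        results = p.map(double, tasks)  # blocks until done, keeps input order
    print(results[:5])  # [0, 2, 4, 6, 8]
```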
A complete example: split a DataFrame of users into shards, write one file per shard, then have each process read back and handle its own shard before the results are concatenated:

```python
from multiprocessing import Pool
import pandas as pd
import math

data = pd.DataFrame({'user_id': [1, 2, 3, 4], 'item_id': [6, 7, 8, 9]})
users = pd.DataFrame(data['user_id'].unique(), columns=['user_id'])
processor = 4
l_data = len(users)
size = math.ceil(l_data / processor)
res = []

def run(i):
    # each worker reads back its own shard
    data = pd.read_csv('../data/user_' + str(i) + '.csv')
    # todo: per-shard processing goes here
    return data

# split the users into `processor` shards and write one file per shard
for i in range(processor):
    start = size * i
    end = (i + 1) * size if (i + 1) * size < l_data else l_data
    user = users[start:end]
    t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)
    t_data.to_csv('../data/user_' + str(i) + '.csv', index=False)
    print(len(t_data))
del data, l_data, users  # free the parent's copies before starting the workers

p = Pool(processor)  # create the pool after run() is defined, so workers can find it
for i in range(processor):
    res.append(p.apply_async(run, args=(i,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
data = pd.concat([i.get() for i in res])  # reassemble the processed shards
```
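One portability caveat: on platforms that use the spawn start method (Windows, and macOS since Python 3.8), worker processes re-import the script, so the `Pool` creation and the submission loop should sit under an `if __name__ == '__main__':` guard, and `run` must be defined at module level before the pool is created. The snippets above otherwise assume the default fork start method on Linux.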