0%

python | 使用多进程处理大规模文件

最近,有一个深度学习的任务,需要预处理数据,但是,单 cpu 跑起来实在是太慢了,而我又有一台 20 cpu 的机器,那,为啥不用起多进程呢?

干起来!


背景介绍


因为要处理大规模的脑电数据,这属于计算密集型,所以,我选择了多进程处理数据。


环境介绍


  • ubuntu 18.04

参考资料



业务需求


在某一个文件夹内,有多个文件,我想用多个进程,单独的读取每一个文件,然后对文件内容进行处理,每一个进程处理过后的结果,保存在一个 list 内,当所有进程都处理完之后,把 list 进行保存。

  • 多进程间没有顺序关系

代码实现


数据分片

可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并。

应用场景

  • 多进程爬虫
  • 类 mapreduce 任务

缺点是子进程会拷贝父进程所有状态,内存浪费严重。

先上一版模板代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

from multiprocessing import Pool

def run(data, index, size): # data 传入数据,index 数据分片索引,size进程数
...
return data # 可以返回数据,在后面收集起来

processor = 15
res = []
p = Pool(processor)
for i in range(processor):
res.append(p.apply_async(run, args=(data, i, processor,)))
print(str(i) + ' processor started !')
p.close()
p.join()
for i in res:
print(i.get()) # 使用get获得多进程处理的结果

这里面有三个值得注意的地方

  • res.append(p.apply_async(run, args=(data, i, processor,)))
    • res 添加的是进程本身,而不是返回的数据
  • for i in res
    • 这里面的 get 才是上面进程得到的数据

我们的任务场景可以用下面的代码替代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Pool

all = []

def run(data, index, size): # data 传入数据,index 数据分片索引,size进程数
...
return data # 可以返回数据,在后面收集起来

processor = 2
res = []
tisks = [*] * 100
p = Pool(processor)
for tisk in tisks:
res.append(p.apply_async(run, args=(data, i, processor,)))
print(str(i) + ' processor started !')
p.close()
p.join()
for i in res:
all.append(i.get()) # 使用get获得多进程处理的结果
save(all,"***.npy")

这里面有两个亮点

  • 当任务数大于进程数的时候
    • 任务会依次添加进进程池里面,然后运行
  • all 来收集所有的返回数据,然后保存

以下我没运行过

分文件处理

当内存受限时,不能再继续使用数据分片,因为子进程会拷贝父进程的所有状态,导致内存的浪费。这时候可以考虑先把大文件分片保存到磁盘,然后del 释放掉数据,接着在多进程处理的函数里面分别读取,这样子进程就会分别读取需要处理的数据,而不会占用大量内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Pool
import pandas as pd
import math
data=pd.DataFrame({'user_id':[1,2,3,4],'item_id':[6,7,8,9]})
users=pd.DataFrame(data['user_id'].unique(),columns=['user_id'])
processor=4
p=Pool(processor)
l_data = len(users)
size = math.ceil(l_data / processor)
res = []
def run(i):
data=pd.read_csv('../data/user_'+str(i)+'.csv')
#todo
return data

for i in range(processor):
start = size * i
end = (i + 1) * size if (i + 1) * size < l_data else l_data
user = users[start:end]
t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)
t_data.to_csv('../data/user_'+str(i)+'.csv',index=False)
print(len(t_data))

del data,l_data,users
for i in range(processor):
res.append(p.apply_async(run, args=(i,)))
print(str(i) + ' processor started !')
p.close()
p.join()
data = pd.concat([i.get() for i in res])

多进程数据共享

当需要修改共享的数据时,那么这个时候可以使用数据共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

from multiprocessing import Process, Manager
# 每个子进程执行的函数
# 参数中,传递了一个用于多进程之间数据共享的特殊字典
def func(i, d):
d[i] = i + 100
print(d.values())
# 在主进程中创建特殊字典
m = Manager()
d = m.dict()
for i in range(5):
# 让子进程去修改主进程的特殊字典
p = Process(target=func, args=(i, d))
p.start()
p.join()
------------
[100]
[100, 101]
[100, 101, 102, 103]
[100, 101, 102, 103]
请我喝杯咖啡吧~