Here are a few simple examples showing how a thread pool and the main thread execute relative to each other.
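In my quantitative trading work I have the following scenario. The main thread fetches data (collecting all the symbols together), and a thread pool then processes that data, with each worker thread handling one symbol.
If one of the workers detects a profit opportunity, it goes on to execute the strategy logic for that symbol.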
The examples below use `time.sleep(1)` to simulate processing the data and `time.sleep(20)` to simulate finding a profit and executing the strategy.
Method 1

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=5)


def t(i, pool_run_key):
    print(i)
    if i == 5:
        time.sleep(20)  # simulate a profit being found and the strategy running
    time.sleep(1)  # simulate processing the data
    pool_run_key[i] = False


if __name__ == '__main__':
    pool_run_key = {}
    while 1:

        # Submit one task per symbol on every pass, without waiting for results.
        tasks = []
        for i in range(10):
            tasks.append(pool.submit(t, i, pool_run_key))

        time.sleep(0.1)
        print("=============")
```
Output

```
0
1
2
3
4
=============
=============
=============
=============
=============
=============
=============
=============
=============
56
7

8
9
=============
=============
=============
=============
=============
=============
=============
=============
=============
=============
0
1
2
3
=============
=============
=============
=============
=============
=============
=============
=============
=============
=============
4
5
6
7
=============
=============
=============
=============
=============
=============
```
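The main thread and the thread pool each run on their own.
At first glance it looks as if `time.sleep(20)` never happened for `i = 5`, because the other numbers keep printing. In fact it does run: it blocks only the worker that picked up task 5, which is why the later batches print four numbers at a time instead of five.
Also, even though the pool has fewer workers than the `for` loop has iterations, the `for` loop still runs to completion before the next step. That is because `pool.submit` only adds the task to the pool's queue and returns immediately; it does not wait for a free worker. Since this loop submits 10 tasks every 0.1 seconds while the pool finishes at most about 5 per second, the backlog of queued tasks keeps growing.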
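One way to check whether the `for` loop merely queues tasks, rather than waiting for free workers, is to time the submit loop by itself. Below is a minimal sketch with shortened sleeps; the `slow_task` and `time_submit_loop` helpers, the pool size of 2, and the 0.2 second delay are my own assumptions for illustration and are not part of the code above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(i):
    # Hypothetical stand-in for per-symbol processing.
    time.sleep(0.2)
    return i


def time_submit_loop(n_tasks=10, max_workers=2):
    """Return (seconds spent submitting, futures not yet done, results)."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        start = time.monotonic()
        futures = [pool.submit(slow_task, i) for i in range(n_tasks)]
        submit_seconds = time.monotonic() - start
        # Right after submitting, the tasks are still running or queued.
        pending = sum(1 for f in futures if not f.done())
        # Collect the results so every task has finished before we return.
        results = [f.result() for f in futures]
    return submit_seconds, pending, results


if __name__ == "__main__":
    submit_seconds, pending, results = time_submit_loop()
    print(f"submit loop took {submit_seconds:.4f}s, {pending} tasks still pending")
```

If `submit` waited for a free worker, submitting 10 tasks of 0.2 seconds each to 2 workers would take most of a second. Here the loop should return almost immediately, with the tasks still pending.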
Method 2

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

pool = ThreadPoolExecutor(max_workers=5)


def t(i, pool_run_key):
    print(i)
    if i == 5:
        time.sleep(20)  # simulate a profit being found and the strategy running
    time.sleep(1)  # simulate processing the data
    pool_run_key[i] = False


if __name__ == '__main__':
    pool_run_key = {}
    while 1:

        tasks = []
        for i in range(10):
            tasks.append(pool.submit(t, i, pool_run_key))

        # Block until every task submitted in this pass has finished.
        for result in as_completed(tasks):
            data = result.result()

        time.sleep(1)
        print("=============")
```
Output

```
/Users/licong/anaconda3/envs/py3.9/bin/python /Users/licong/python/Rhino/Rhino-Gateway/t.py
0
1
2
3
4
56
7

89

=============
01

2
3
4
5
6
78

9
=============
0
1
2
3
4
5
6
78

9
```
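The main thread waits until every task in the pool has finished before it starts the next round.
This is very inefficient: each round lasts as long as its slowest task, so the 21 seconds spent on `i = 5` hold up every other symbol.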
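To make the cost of waiting concrete, here is a small sketch that times one round in which a single task is slow. The sleeps are scaled down, and the `work` and `time_one_round` helpers with their parameters are hypothetical names I made up for this illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(i, slow_index, slow_seconds):
    # One task stands in for "profit found, run the strategy".
    time.sleep(slow_seconds if i == slow_index else 0.05)
    return i


def time_one_round(n_tasks=4, slow_index=2, slow_seconds=0.5):
    """Return (round duration in seconds, completion order of task ids)."""
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        start = time.monotonic()
        futures = [
            pool.submit(work, i, slow_index, slow_seconds)
            for i in range(n_tasks)
        ]
        # as_completed yields futures as they finish, but the loop only
        # ends once the slowest future is done.
        order = [f.result() for f in as_completed(futures)]
        elapsed = time.monotonic() - start
    return elapsed, order


if __name__ == "__main__":
    elapsed, order = time_one_round()
    print(f"round took {elapsed:.2f}s, completion order: {order}")
```

The fast tasks finish early, yet the round cannot end before the slow one does, so the round time is roughly the slow task's duration.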
Method 3 (the one I use)

```python
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=5)


def t(i, pool_run_key):
    print(i)
    if i == 5:
        time.sleep(20)  # simulate a profit being found and the strategy running
    time.sleep(1)  # simulate processing the data
    pool_run_key[i] = False  # mark this symbol as free again


if __name__ == '__main__':
    pool_run_key = {}
    while 1:

        for i in range(10):
            # Only submit a symbol if no task for it is queued or running.
            if pool_run_key.setdefault(i, False) is False:
                pool_run_key[i] = True
                pool.submit(t, i, pool_run_key)

        time.sleep(1)
        print("=============")
```
Output

```
01

23

4
5
6
7
8
9
=============
0
1
2
=============
3
46
7

8
=============
0
1
2
3
=============
9
4
67

```
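The main thread and the thread pool each run on their own. The `pool_run_key` flag keeps at most one task per symbol queued or running, so while `i = 5` is sleeping it is simply skipped, the other symbols keep being processed, and the backlog never exceeds one task per symbol.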
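One caveat with the flag dictionary: if `t` raised an exception before reaching its last line, `pool_run_key[i]` would stay `True` and that symbol would never be submitted again. Below is a sketch of one possible variation, not the code I actually run: it tracks in-flight keys in a set and releases them from a done callback, which fires whether the task succeeds or fails. `InFlightGuard`, `submit_once`, `is_busy`, `failing_task`, and `demo` are all hypothetical names introduced for this example.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class InFlightGuard:
    """Hypothetical helper: allow at most one pending task per key."""

    def __init__(self, pool):
        self._pool = pool
        self._lock = threading.Lock()
        self._in_flight = set()

    def submit_once(self, key, fn, *args):
        """Submit fn(*args) unless a task for key is still pending.

        Returns the Future, or None if the key was skipped.
        """
        with self._lock:
            if key in self._in_flight:
                return None
            self._in_flight.add(key)
        try:
            future = self._pool.submit(fn, *args)
        except BaseException:
            self._release(key)
            raise
        # The callback runs when the task finishes, fails, or is cancelled.
        future.add_done_callback(lambda _f: self._release(key))
        return future

    def is_busy(self, key):
        with self._lock:
            return key in self._in_flight

    def _release(self, key):
        with self._lock:
            self._in_flight.discard(key)


def failing_task():
    # Simulated strategy code that blows up after some work.
    time.sleep(0.2)
    raise RuntimeError("simulated failure")


def demo():
    with ThreadPoolExecutor(max_workers=2) as pool:
        guard = InFlightGuard(pool)
        first = guard.submit_once("BAD", failing_task)
        duplicate = guard.submit_once("BAD", failing_task)  # skipped
    # Leaving the with-block joins the workers, so the callback has run.
    return first, duplicate, guard.is_busy("BAD")


if __name__ == "__main__":
    first, duplicate, busy = demo()
    print(type(first.exception()).__name__, duplicate, busy)
```

The main loop would call `guard.submit_once(i, t, i)` in place of the `setdefault` check. The trade-off is a little more machinery in exchange for a key that is always released.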