0%

【python】multiprocessing进程间的通信(一)队列

multiprocessing模块支持进程间通信的三种主要形式:队列管道共享内存。下面我们来学习一下队列的使用情况。

队列(multiprocessing.queue)

队列是线程和进程安全的,也就是一次只能有一个进程或线程进行操作。

示例:将子进程产生的信息统一写入父进程,这里是使用pool来创建进程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from multiprocessing import Pool, RLock, freeze_support,Queue,Pipe
import multiprocessing
import functools
import datetime
from time import sleep
from tqdm import tqdm
from TS_logs import TS_log_info
from TS_util import (
TS_util_get_tradelist,
TS_util_get_real_date,
TS_util_get_next_datetime,
TS_util_get_now_time,
TS_util_date_convert,
trade_date_sse
)


def log_c(q,codelist,n):
for trade_date in codelist[n::8]:
msg = '{} #Job{} Trying updating data of trade date {} .' \
.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),n,trade_date)
sleep(0.05)
q.put(msg)


L = list(range(8))

if __name__ == '__main__':
freeze_support() # for Windows support
manager = multiprocessing.Manager()
# 父进程创建Queue,并传给各个子进程:
q = manager.Queue()
tqdm.set_lock(RLock())
codelist = trade_date_sse
func = functools.partial(log_c,q,codelist)
p = Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),))
p.map(func, L)
p.close()
p.join()
while not q.empty():
TS_log_info(q.get())

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.