0%

【python】multiprocessing进程间的通信(二)管道

multiprocessing.Pipe()用来创建管道,返回两个连接对象,代表管道的两端,一般用于进程或者线程之间的通信,不同于os.pipe(),os.pipe()主要用来创建两个文件描述符,一个读,一个写,是单向的。而multiprocessing.Pipe()则可以双向通信。

管道(multiprocessing.Pipe)

multiprocessing.``Pipe([duplex])

返回一对 Connection 对象 (conn1, conn2) , 分别表示管道的两端。

如果 duplex 被置为 True (默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。

示例:将子进程产生的信息统一写入父进程,这里是使用pool来创建进程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from multiprocessing import Pool, RLock, freeze_support,Queue,Pipe
import multiprocessing
import functools
import datetime
from time import sleep
from tqdm import tqdm
from TS_logs import TS_log_info
from TS_util import (
TS_util_get_tradelist,
TS_util_get_real_date,
TS_util_get_next_datetime,
TS_util_get_now_time,
TS_util_date_convert,
trade_date_sse
)


def log_c(child_conn,codelist,n):
for trade_date in codelist[n::8][0:17]:
msg = '{} #Job{} trade date {}.' \
.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),n,trade_date)
child_conn.send(msg)
print('workers exit')

L = list(range(8))

if __name__ == '__main__':
freeze_support() # for Windows support
parent_conn,child_conn = Pipe()
tqdm.set_lock(RLock())
codelist = trade_date_sse
func = functools.partial(log_c,child_conn,codelist)
p = Pool(initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),))
p.map(func, L)

while parent_conn.poll(0.001):
print(parent_conn.recv())
p.close()
p.join()
parent_conn.close()
child_conn.close()

当子进程发送一定量的数据之后会卡住,没有任何报错和提示,卡住不动!!测试大概是7000字节左右。原因尚未找到。按文档看,应该与OS有关,最大应该可达32M。