当前位置: 移动技术网 > IT编程>脚本编程>Python > 荐 【操作系统知识】python实现多进程

荐 【操作系统知识】python实现多进程

2020年07月11日  | 移动技术网IT编程  | 我要评论
5. python实现多进程Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次返回一次,但是fork()调用一次返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回进程的ID。一个父进程可以fork出多个子进程,所以父进程要记下每个子进程的ID,而子进程只主要调用getppid()就可以拿到父进程的ID。父进程、子进程执行的顺序没有规律,完全取决于操作系统的调度

5. python实现多进程

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次返回一次,但是fork()调用一次返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回进程的ID。一个父进程可以fork出多个子进程,所以父进程要记下每个子进程的ID,而子进程只主要调用getppid()就可以拿到父进程的ID。

父进程、子进程执行的顺序没有规律,完全取决于操作系统的调度算法。各个进程有独立的运行空间,不共享全局变量;有时会因父进程提前退出,子进程的父进程和开始的不一致。

# os模块封装了常见的系统调用

import os

print(f"Process {os.getpid()} start...")  # unix/linux/mac系统中

pid = os.fork()

if pid < 0:
    print("fork()调用失败")
if pid == 0:
    print(f"父进程pid: {os.getppid()}, 子进程pid: {os.gerpid()}.")  # 子进程在系统的pid不为0
else:
    print(f"子进程pid: {os.getpid()}, 父进程pid: {pid}")  # 父进程返回子进程的ID

# 运行结果
'''
Process 3362 start...
父进程pid:3362, 子进程pid:3363
子进程pid:3363, 子进程pid:3362
'''

由于Windows没有fork调用,而Python是跨平台的,所以可以调用multiprocessing模块中的Process类来实现跨平台的多进程。

Process()的语法:Process([group[, target[, name [, args[, kwargs]]]]]),group参数未使用,默认为None;参数target表示这个进程实例所调用的对象;args表示调用这个对象的位置和参数元组;kwargs表示调用对象的关键子参数字典;name子进程名称。

Process属性方法

方法/属性 说明
start() 启动进程,调用进程中的run()方法。
run() 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法。
timinate() 强制终止进程,不会进行任何清理操作。如果该进程终止前,创建了自进程,那么该子进程在其强制结束后变为僵尸进程;如果进程还保存了一个锁,那么也将不会被释放,进而导致死锁。使用时,要注意。
is_alive() 判断某进程是否存活,存活返回True,否则False。
join([timeout]) 主线程等待子线程终止。timeout为可选超时时间;p.join()只能join住start开启的进程,而不能join住run开启的进程
daemon() 默认值False, 如果设置为True,代表该进程为后台守护进程;当该进程的父进程终止时,该进程也随之终止;并且设置为True后,该进程不能创建子进程,设置该属性必须在start()之前。
exitcode 进程运行时为None,如果为-N,表示信号N结束了。
authkey 进程身份验证,默认是由os.urandom()随机生成32字符的字符串。这个键的用途是设计涉及网络链接的低层进程间的通信提供安全性,这类连接只有在具有相同身份验证才能成功。
from multiprocessing import Process
import os
import time

# 子进程要执行的代码
def run_proc(name):
    time.sleep(3)
    print(f"子进程{name}运行中,pid={os.getpid()},父进程的pid={os.getppid()}")

if __name__ == "__main__":
    print(f"父进程是{os.getpid()}.")
    p = Process(target=run_proc, args=('test',))
    print("子进程开始启动.")
    p.start()  # 启动进程
    p.join()  # 阻塞当前进程,直到调用join方法的那个进程执行完,在继续执行当前进程
    print("子进程运行结束")

# 结果
'''
父进程是12324.
子进程开始启动.
子进程test运行中,pid=4472,父进程的pid=12324
子进程运行结束
'''

创建自己的进程类,继承于Process类:

from multiprocessing import Process
import os
import time

class MyProcess(Process):
    def __init__(self, interval):
        super().__init__()
        self.interval = interval

    def run(self):
        print("子进程")
        start_time = time.time()
        time.sleep(self.interval)
        stop_time = time.time()
        print(f"子进程id: {os.getpid()}, 父进程id:{os.getppid()}, 共执行了{stop_time-start_time: .2f} 秒.")

if __name__ == "__main__":
    print("主进程")
    startTime = time.time()
    p = MyProcess(2)
    p.start()
    p.join()
    stopTime = time.time()
    print(f"子进程结束,花费了{stopTime-startTime: .2f}")

# 结果
'''
主进程
子进程
子进程id: 3008, 父进程id:13084, 共执行了 2.00 秒.
子进程结束,花费了 2.79
'''

如果需要创建很多进程,需要用到进程池Pool方法。初始化进程时,可以指定一个最大进程数(默认为CPU核数),当池中的进程数达到最大值时,该请求就会等待,直到池中有进程结束,才会创建新的进程来执行。

from multiprocessing import Pool
import os
import time
import random

# 进程类
def worker(msg):
    start_time = time.time()
    print(f"{msg}开始执行,执行进程id为{os.getpid()},父进程id为{os.getppid()}")
    time.sleep(random.random()*2)
    stop_time = time.time()
    print(msg, f"执行完毕,耗时{stop_time-start_time: .2f}秒。")

if __name__ == "__main__":
    print("----开始执行----")
    start = time.time()
    pool = Pool(3)
    for i in range(0, 10):
        pool.apply_async(worker, (i,))  # 异步非阻塞
    pool.close()  # 关闭进程池,关闭后不再就收新请求
    pool.join()  # 必须放在close语句后面
    stop = time.time()
    print(f"执行结束,总耗时{stop-start:.2f}")

# 结果
'''
----开始执行----
0开始执行,执行进程id为4640,父进程id为7116
1开始执行,执行进程id为8,父进程id为7116
2开始执行,执行进程id为7728,父进程id为7116
0 执行完毕,耗时 0.09秒。
3开始执行,执行进程id为4640,父进程id为7116
2 执行完毕,耗时 0.10秒。
4开始执行,执行进程id为7728,父进程id为7116
4 执行完毕,耗时 0.04秒。
5开始执行,执行进程id为7728,父进程id为7116
3 执行完毕,耗时 0.20秒。
6开始执行,执行进程id为4640,父进程id为7116
6 执行完毕,耗时 0.12秒。
7开始执行,执行进程id为4640,父进程id为7116
5 执行完毕,耗时 1.38秒。
8开始执行,执行进程id为7728,父进程id为7116
1 执行完毕,耗时 1.95秒。
9开始执行,执行进程id为8,父进程id为7116
8 执行完毕,耗时 0.48秒。
7 执行完毕,耗时 1.81秒。
9 执行完毕,耗时 1.58秒。
执行结束,总耗时4.63
'''

如果使用进程池pool创建进程的话,就需要使用Manager().Queue()

from multiprocessing import Manager, Pool
import os

# queue,实现多进程间的数据传递,其实是个消息队列
def write(q):
    print(f"--开始执行写进程 {os.getpid()}--")
    for value in ['A', 'B', 'C']:
        print(f"把 {value} 放入队列")
        q.put(value)

def read(q):
    print(f"--开始执行读进程 {os.getpid()}--")
    for i in range(q.qsize()):
        print(f"从队列读取 {q.get(True)}")

if __name__ == "__main__":
    print(f"--开始执行主进程 {os.getpid()}--")
    q = Manager().Queue()
    p = Pool()
    p.apply(write, args=(q,))  # apply 阻塞,上个进程结束才会执行下一个进程
    p.apply(read, args=(q,))
    p.close()  # 关闭pool 不再接收请求
    p.join()  # 主进程阻塞,等待子进程运行结束,必须在close以后
    print("--主进程结束--")

# 结果
'''
--开始执行主进程 6012--
--开始执行写进程 11524--
把 A 放入队列
把 B 放入队列
把 C 放入队列
--开始执行读进程 11524--
从队列读取 A
从队列读取 B
从队列读取 C
--主进程结束--
'''

当子进程不是自身,而是一个外部进程时,创建子进程后,还需要控制子进程的输入和输出。subprocess模块可以非常方便的启动子进程,然后可以通过communicate()方法输入。

python 中subprocess

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode("gbk"))
print('exit code:', p.returncode)

# 结果
'''
$ nslookup
默认服务器:  dnspai-public-dns.dnspai.com
Address:  101.226.4.6

> > 服务器:  dnspai-public-dns.dnspai.com
Address:  101.226.4.6

python.org	MX preference = 50, mail exchanger = mail.python.org
> 
exit code: 0
'''

本文地址:https://blog.csdn.net/holysll/article/details/107226353

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网