当前位置: 移动技术网 > IT编程>脚本编程>Python > python进程-进阶

python进程-进阶

2018年08月27日  | 移动技术网IT编程  | 我要评论

elkelake,鹿山大院,真武傲剑

进程同步(multiprocessing.lock(锁机制)、multiprocessing.semaphore(信号量机制)、multiprocessing.event(事件机制))

  在计算机中,有一些硬件和软件,例如处理器、打印机等,都属于竞争类资源,当有需求时,很多进程都要争抢这些资源,而对于这类资源,就属于临界资源。当多进程共同处理某一个数据时,这个数据也就属于一个临界资源。操作系统对计算机内各种资源都使其在竞争中有序化,但是对于数据来说,尤其是用户动态产生的数据,当处理时就变成了临界资源,所以我们作为程序猿来说,需要对临界资源加以保护,否则就会出现数据混乱现象。这是在提高程序效率的优势下,带来的一个隐患。 

multiprocessing.lock(锁机制)

  当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。

from multiprocessing import process
import random
import time

def func(addr):
    print('我是%s'%addr)
    time.sleep(random.random())
    print('谢谢!')

if __name__ == '__main__':
    l = ['四川的','湖南的','河南的','江苏的']
    for addr in l:
        p = process(target=func,args=(addr,))
        p.start()
    time.sleep(2)
    print('\n\n我选%s'%random.choice(l))
# 关于抢占输出资源的事情,是指多进程并发执行时,并不是一个进程执行完任务后其他进程再执行。
# 比如 此程序会输出:我是四川的  我是河南的 我是江苏的 谢谢!谢谢!我是湖南的 谢谢! 谢谢!
# 而不是 : 我是四川的 谢谢! 我是河南的 谢谢! ...

多进程关于抢占输出资源的事情
多进程关于抢占输出资源的事情
from multiprocessing import process
import random
import time
from multiprocessing import lock
def func(addr,lock):
    lock.acquire()
    print('我是%s'%addr)
    time.sleep(random.random())
    print('谢谢!')
    lock.release()

if __name__ == '__main__':
    lock = lock()
    l = ['四川的','湖南的','河南的','江苏的']
    for addr in l:
        p = process(target=func,args=(addr,lock))
        p.start()
    time.sleep(4)
    print('\n\n我选%s'%random.choice(l))

使用锁维护输出资源
使用锁维护输出资源

  上面这种情况,使用了加锁的形式确保了程序的顺序执行,但是执行又变成了串行,降低了效率,但是不得不说,它确保了数据的安全性。

      下面举例来说锁的重要性:模拟12306抢票问题。模拟银行账户的存取款问题。

# 注意,文件中存储需要以{'c':1}这种形式,c的引号一定要带
# 否则json识别不出来
# 此代码的效果,并发执行,但是多进程同时读写同一个文件数据,造成数据混乱

from multiprocessing import process,lock
import json
import time

def check(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    print('第%s个人在查票,余票为%s' % (i, dic['c']))
    pay(i,l)

def pay(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(0.5)# 模拟网络延迟,当购买过程中也会有网络延迟
    if dic['c']:
        print('第%s个人买到票了 '%i)
        dic['c'] -= 1
    else:
        print('第%s个人没买到票'%i)
    with open('a.txt','w') as f:
        json.dump(dic,f)

if __name__ == '__main__':
    l = lock()
    for i in range(10):
        p = process(target=check,args=(i+1,l))
        p.start()

多个人同时抢票
多个人同时抢票

很明显,上述例子中,因为多进程同时对一个临界资源(a.txt文件)进行了读写操作,使文件内数据混乱,也造成了余票为1张,但是很多人都抢到票的假象。那就加锁来解决它吧

from multiprocessing import process,lock
import json
import time

def check(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    print('第%s个人在查票,余票为%s' % (i, dic['c']))
    l.acquire()
    pay(i,l)# 为什么在这里加锁? 因为每个人都可以查票,读取数据,不会造成数据混乱,但是当买票的时候,就需要对临界资源的写入,所以对写操作加锁,使某一个进程在写文件时候,其他进程不能碰此文件。
    l.release()

def pay(i,l):
    with open('a.txt','r',encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(0.5)# 模拟网络延迟,当购买过程中也会有网络延迟
    if dic['c']:
        print('第%s个人买到票了 '%i)
        dic['c'] -= 1
    else:
        print('第%s个人没买到票'%i)
    with open('a.txt','w') as f:
        json.dump(dic,f)

if __name__ == '__main__':
    l = lock()
    for i in range(10):
        p = process(target=check,args=(i+1,l))
        p.start()
加锁解决买票问题

关于银行存取款的问题。同一个账户,某个人一直存,某个人在同一时间一直取,如果不对数据进行保护起来,就会造成的一种数据混乱问题。

from multiprocessing import process, lock,value

def save_money(num):
    for i in range(100):
        time.sleep(0.05)
        num.value += 1

def draw_money(num):
    for i in range(100):
        time.sleep(0.05)
        num.value -= 1

if __name__ == '__main__':
    num = value('i',1000)# 多进程中共享数据,一个int类型的数据,1000
    man = process(target=save_money,args=(num,))
    woman = process(target=draw_money,args=(num,))
    man.start()
    woman.start()
    time.sleep(6)
    print(num.value)
from multiprocessing import process, lock,value

def save_money(num,l):
    for i in range(100):
        time.sleep(0.05)
        l.acquire()
        num.value += 1
        l.release()

def draw_money(num,l):
    for i in range(100):
        time.sleep(0.05)
        l.acquire()# 在操作存取款的数据时,先将数据锁住,不允许其他人更改此数据
        num.value -= 1
        l.release()

if __name__ == '__main__':
    l = lock()
    num = value('i',1000)# 多进程中共享数据,一个int类型的数据,1000
    man = process(target=save_money,args=(num,l))
    woman = process(target=draw_money,args=(num,l))
    man.start()
    woman.start()
    time.sleep(6)
    print(num.value)

这样才对!!!

multiprocessing.semaphore(信号量机制)

上述讲的lock,属于互斥锁,也就是一把钥匙配备一把锁,同时只允许锁住某一个数据。而信号量则是多把钥匙配备多把锁,也就是说同时允许锁住多个数据。

比如在一个粉红发廊,里边有5位服务人员,那么这个发廊最多就同时允许进入5位客人,当又有第6位客人来的时候,就需要在门外等待;当服务人员服务完某位客人后,才允许后续的人再进来一个,换句话说,这个发廊最多同时接待5位客人,多的客人必须等待。

信号量同步基于内部计数器,用户初始化一个计数器初值(比如上述例子中就初始化为5),每调用一次acquire(),计数器减1;每调用一次release(),计数器加1。当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(dijkstra)信号量概念p()和v()的python实现。信号量同步机制适用于访问像服务器这样的有限资源。信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

举个栗子:

from multiprocessing import semaphore
from multiprocessing import process
import time
import random

def sing(i,se):
    se.acquire()# 每次进来一位客人,信号量内部计数器减1
    print('%s进入小黑屋'%i)
    time.sleep(random.randint(1,3))
    print('%s交钱走人'%i)
    se.release()# 每次离开一位客人,信号量内部计数器加1


if __name__ == '__main__':
    se = semaphore(5)# 初始化5把钥匙配备5把锁
    for i in range(10): # 模拟10个人要进入小黑屋子
        p = process(target=sing,args=(i,se))
        p.start()

multiprocessing.event(事件机制)

python中的事件机制,主要用于主进程控制其他进程的执行,事件主要提供了三个方法 set、wait、clear。

    e = event()
    e.set() #将is_set()设为true
    e.clear() # 将is_set()设为false
    e.wait() #判断is_set的bool值,如果bool为true,则非阻塞,bool值为false,则阻塞
    e.is_set() # 标识
    事件是通过is_set()的bool值,去标识e.wait() 的阻塞状态
    当is_set()的bool值为false时,e.wait()是阻塞状态
    当is_set()的bool值为true时,e.wait()是非阻塞状态
    当使用set()时,是把is_set的bool变为true
    当使用clear()时,是把is_set的bool变为false

举个栗子:

from multiprocessing import process, event
import time


def tra(e):
    while 1: #红绿灯需要一直亮着,要么红灯,要么绿灯
        if e.is_set(): #true代表绿灯了,表示可以过车
            time.sleep(5)#睡5秒,让车在这5秒的时间内通过
            print('\033[31m红灯亮\033[0m')#绿灯亮5秒后提示红灯亮
            e.clear()#把is_set设置为false
        else:
            time.sleep(5)#此时代表红灯亮,应该红灯亮5秒.在此等5秒
            print('\033[32m绿灯亮\033[0m')#红灯亮够5秒该绿灯亮了
            e.set()#将is_set设置为true

def car(i,e):
    e.wait()#车等在红绿灯,此时要看是红灯还剩绿灯,如果is_set = true 就可以过车
    print('第%s辆车过去了' % i)


if __name__ == '__main__':
    e = event()
    triff_light = process(target=tra,args=(e,))#信号灯的进程
    triff_light.start()
    for i in range(50):#描述50辆车的进程
        if i % 3 == 0:
            time.sleep(2)
        car = process(target=car,args=(i+1,e,))
        car.start()

生产者消费者模型

第一种:

from multiprocessing import queue,process



def producer(q,product):
    for i in range(20):
        info = product + '的娃娃%s号' % i
        q.put(info)
    q.put(none)


def consumer(q,name):
    while 1:
        info = q.get()
        if info:
            print('%s拿走了%s' % (name,info))
        else:
            break

if __name__ == '__main__':
    q = queue(20)
    p_pro = process(target=producer,args=(q,'炫彩'))
    p_con = process(target=consumer,args=(q,'corn'))
    p_pro.start()
    p_con.start()

第二种:

from multiprocessing import queue,process

def producer(q,product):
    for i in range(20):
        info = product + '的娃娃%s号' % str(i)
        q.put(info)


def consumer(q,name,color):
    while 1:
        info = q.get()
        if info:
            print('%s,%s拿走了%s\033[0m' % (color,name,info))
        else:# 当消费者获得队列中数据时,如果获得的是none,就是获得到了生产者不再生产数据的标识
            break# 此时消费者结束即可

if __name__ == '__main__':
    q = queue()
    p_pro1 = process(target=producer,args=(q,'炫彩'))
    p_pro2 = process(target=producer,args=(q,'苍井井'))
    p_pro3 = process(target=producer,args=(q,'波多多'))
    p_con1 = process(target=consumer,args=(q,'alex','\033[31m'))
    p_con2 = process(target=consumer,args=(q,'wusir','\033[32m'))
    p_l = [p_con1,p_con2,p_pro1,p_pro2,p_pro3]
    [i.start() for i in p_l]
    p_pro1.join()
    p_pro2.join()
    p_pro3.join()
    q.put(none)# 几个消费者就要接受几个结束标识
    q.put(none)

进程间通信——队列和管道(multiprocess.queue、multiprocess.pipe)

进程间通信--ipc(inter-process communication)

队列(multiprocess.queue) 

    import queue  # 不能进行多进程之间的数据传输

(1)from multiprocessing import queue 借助queue解决生产者消费者模型,队列是安全的。
  q = queue(num)
  num : 队列的最大长度
  q.get()# 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
  q.put()# 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待

  q.get_nowait()# 不阻塞,如果有数据直接获取,没有数据就报错
  q.put_nowait()# 不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错

(2)from multiprocessing import joinablequeue#可连接的队列
  joinablequeue是继承queue,所以可以使用queue中的方法
  并且joinablequeue又多了两个方法
  q.join()# 用于生产者。等待 q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据
  q.task_done() # 用于消费者,是指每消费队列中一个数据,就给join返回一个标识。

管道(multiprocess.pipe)

   from multiprocessing import pipe

   con1,con2 = pipe()

 管道是不安全的.

 管道是用于多进程之间通信的一种方式.

 如果在单进程中使用管道,那么就是con1收数据,就是con2发数据.

            如果是con1发数据,就是con2收数据

 如果在多进程中使用管道,那么就必须是父进程使用con1收,子进程就必须使用con2发

                  父进程使用con1发,子进程就必须使用con2收

                  父进程使用con2收,子进程就必须使用con1发

                  父进程使用con2发,子进程就必须使用con1收

在管道中有一个著名的错误叫做eoferror.是指,父进程如果关闭了发送端,子进程还继续接收数据,就会产生eoferror错误

 

进程间的共享内存(value,manager)

 from multiprocessing import manager
 m = manager()
 num = m.dict({键 : 值})
 num = m.list([1,2,3])

 进程池

 含义:

   进程池:一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。
 因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数
 开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让
 cpu帮你调度它。

 进程池还会帮程序员去管理池中的进程。

 方法:

 1).map(func,iterable)

    func:进程池中的进程执行的任务函数
    iterable: 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数

 2).apply(func,args=())同步执行任务

    func:进程池中的进程执行的任务函数
    args: 可迭代对象型的参数,是传给任务函数的参数
    同步处理任务时,不需要close和join
    同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)

 3).apply_async(func,args=(),callback=none)异步执行任务

    func:进程池中的进程执行的任务函数
    args: 可迭代对象型的参数,是传给任务函数的参数
    callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的
    异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)
    异步处理任务时,必须要加上close和join

回调函数

  进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网