蛇舞歌词,芮成钢专访朴槿惠,游园不值古诗
目录
小子今天想来谈谈“并行计算”,作为一个非科班人员,我为什么去捣鼓这么一个在科班里也比较专业的问题了。这就要说下我前几天做的一个作业了,当时我用python写了个程序,结果运行了一天,这个速度可让我愁了,我还怎么优化,怎么交作业啊。于是小子就去各大论坛寻丹问药了,终于让我发现可以用并行计算来最大化压榨电脑的cpu,提升计算效率,而且python里有multiprocessing这个库可以提供并行计算接口,于是小子花1天时间改进程序,终于在规定时间内做出了自己满意的结果,上交了作业。之后,小子对并行计算充满了兴趣,于是又重新在google上游历了一番,大致弄清了gpu、cpu、进程、线程、并行计算、分布式计算等概念,也把python的multiprocessing耍了一遍,现在小子也算略有心得了,所以来此立碑,以示后来游客。
小子本文分为四部分,一是大数据时代现状,其二是面对挑战的方法,然后是用python写并行程序,最后是multiprocessing实战。
当前我们正处于大数据时代,每天我们会通过手机、电脑等设备不断的将自己的数据传到互联网上。据统计,youtube上每分钟就会增加500多小时的视频,面对如此海量的数据,如何高效的存储与处理它们就成了当前最大的挑战。
但在这个对硬件要求越来越高的时代,cpu却似乎并不这么给力了。自2013年以来,处理器频率的增长速度逐渐放缓了,目前cpu的频率主要分布在3~4ghz。这个也是可以理解的,毕竟摩尔定律都生效了50年了,如果它老人家还如此给力,那我们以后就只要静等处理器频率提升,什么计算问题在未来那都不是话下了。实际上cpu与频率是于能耗密切相关的,我们之前可以通过加电压来提升频率,但当能耗太大,散热问题就无法解决了,所以频率就逐渐稳定下来了,而intel与amd等大制造商也将目标转向了多核芯片,目前普通桌面pc也达到了4~8核。
咱们有了多核cpu,以及大量计算设备,那我们怎么来用它们应对大数据时代的挑战了。那就要提到下面的方法了。
并行(parallelism)是指程序运行时的状态,如果在同时刻有多个“工作单位”运行,则所运行的程序处于并行状态。图一是并行程序的示例,开始并行后,程序从主线程分出许多小的线程并同步执行,此时每个线程在各个独立的cpu进行运行,在所有线程都运行完成之后,它们会重新合并为主线程,而运行结果也会进行合并,并交给主线程继续处理。
什么时候用并行计算:
gpu即图形处理器核心(graphics processing unit),它是显卡的心脏,显卡上还有显存,gpu与显存类似与cpu与内存。
gpu与cpu有不同的设计目标,cpu需要处理所有的计算指令,所以它的单元设计得相当复杂;而gpu主要为了图形“渲染”而设计,渲染即进行数据的列处理,所以gpu天生就会为了更快速地执行复杂算术运算和几何运算的。
gpu相比与cpu有如下优势:
gpu目前在处理深度学习上用得十分多,英伟达(nvidia)目前也花大精力去开发适合深度学习的gpu。现在上百层的神经网络已经很常见了,面对如此庞大的计算量,cpu可能需要运算几天,而gpu却可以在几小时内算完,这个差距已经足够别人比我们多打几个比赛,多发几篇论文了。
说到分布式计算,我们就先说下下google的3篇论文,原文可以直接点链接去下载:
google在2003~2006年发表了这三篇论文之后,一时之间引起了轰动,但是google并没有将mapreduce开源。在这种情况下hadoop就出现了,doug cutting在google的3篇论文的理论基础上开发了hadoop,此后hadoop不断走向成熟,目前facebook、ibm、imageshack等知名公司都在使用hadoop运行他们的程序。
分布式计算的优势:
可以集成诸多低配的计算机(成千上万台)进行高并发的储存与计算,从而达到与超级计算机媲美的处理能力。
在介绍如何使用python写并行程序之前,我们需要先补充几个概念,分别是进程、线程与全局解释器锁(global interpreter lock, gil)。
进程(process):
线程(threading):
进程与线程有两个主要的不同点,其一是进程包含线程,线程使用进程的内存空间,当然线程也有自己的私有空间,但容量小;其二是进程有各自独立的内存空间,互不干扰,而线程是共享内存空间。
图三展示了进程、线程与cpu之间的关系。在图三中,进程一与进程二都含有3个线程,cpu会按照线程来分配任务,如图中4个cpu同时执行前4个线程,后两个标红线程处于等待状态,在cpu运行完当前线程时,等待的线程会被唤醒并进入cpu执行。通常,进程含有的线程数越多,则它占用cpu的时间会越长。
gil是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 gil 的解释器也只允许同一时间执行一个线程。python的cpython解释器(普遍使用的解释器)使用gil,在一个python解释器进程内可以执行多线程程序,但每次一个线程执行时就会获得全局解释器锁,使得别的线程只能等待,由于gil几乎释放的同时就会被原线程马上获得,那些等待线程可能刚唤醒,所以经常造成线程不平衡享受cpu资源,此时多线程的效率比单线程还要低下。在python的官方文档里,它是这样解释gil的:
in cpython, the global interpreter lock, or gil, is a mutex that prevents multiple native threads from executing python bytecodes at once. this lock is necessary mainly because cpython’s memory management is not thread-safe. (however, since the gil exists, other features have grown to depend on the guarantees that it enforces.)
可以说它的初衷是很好的,为了保证线程间的数据安全性;但是随着时代的发展,gil却成为了python并行计算的最大障碍,但这个时候gil已经遍布cpython的各个角落,修改它的工作量太大,特别是对这种开源性的语音来说。但幸好gil只锁了线程,我们可以再新建解释器进程来实现并行,那这就是multiprocessing的工作了。
multiprocessing是python里的多进程包,通过它,我们可以在python程序里建立多进程来执行任务,从而进行并行计算。如下所述:
the multiprocessing package offers both local and remote concurrency, effectively side-stepping the global interpreter lock by using subprocesses instead of threads.
我们接下来介绍下multiprocessing的各个接口:
multiprocessing.process(target=none, args=()) target: 可以被run()调用的函数,简单来说就是进程中运行的函数 args: 是target的参数 process的方法: start(): 开始启动进程,在创建process之后执行 join([timeout]):阻塞目前父进程,直到调用join方法的进程执行完或超时(timeout),才继续执行父进程 terminate():终止进程,不论进程有没有执行完,尽量少用。
示例1
from multiprocessing import process def f(name): print 'hello', name if __name__ == '__main__': p = process(target=f, args=('bob',)) # p进程执行f函数,参数为'bob',注意后面的“,” p.start() # 进程开始 p.join() # 阻塞主线程,直至p进程执行结束
class multiprocessing.pool([processes]) processes是进程池中的进程数,默认是本机的cpu数量 方法: apply(func[, args[, kwds]])进程池中的进程进行func函数操作,操作时会阻塞进程,直至生成结果。 apply_async(func[, args[, kwds[, callback]]])与apply类似,但是不会阻塞进程 map(func, iterable[, chunksize])进程池中的进程进行映射操作 map_async(func, iterable[, chunksize[, callback]]) imap(func, iterable[, chunksize]):返回有序迭代器 imap_unordered(func, iterable[, chunsize]):返回无序迭代器 close():禁止进程池再接收任务 terminate():强行终止进程池,不论是否有任务在执行 join():在close()或terminate()之后进行,等待进程退出
示例2
from multiprocessing import pool def f(x): return x*x if __name__ == '__main__': p = pool(5) # 创建有5个进程的进程池 print(p.map(f, [1, 2, 3])) # 将f函数的操作给进程池
multiprocessing.pipe([duplex]) 返回两个连接对象(conn1, conn2),两个连接对象分别访问pipe的头和尾,进行读写操作 duplex: true(default),创建的pipe是双向的,也即两端都可以进行读写;若为false,则pipe是单向的,仅可以在一端读,另一端写,此时与queue类似。 multiprocessing.queue([maxsize]) qsize():返回queue中member数量 empty():如果queue是空的,则返回true full():如果queue中member数量达到maxsize,则返回true put(obj):将一个object放入到queue中 get():从队列中取出一个object并将它从queue中移除,fifo原则 close():关闭队列,并将缓存的object写入pipe
示例
from multiprocessing import pool import time def f(x): return x*x if __name__ == '__main__': pool = pool(processes=4) # start 4 worker processes result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process print result.get(timeout=1) # prints "100" unless your computer is *very* slow print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) print it.next() # prints "0" print it.next() # prints "1" print it.next(timeout=1) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print result.get(timeout=1) # raises multiprocessing.timeouterror
当一个进程获得(acquire)锁之后,其它进程在想获得锁就会被禁止,可以保护数据,进行同步处理。 acquire(block=true, timeout=none):尝试获取一个锁,如果block为true,则会在获得锁之后阻止其它进程再获取锁。 release():释放锁
共享内存通常需要配合进程锁来处理,保证处理的顺序相同。
multiprocessing.value(typecode_or_type, *args[, lock]) 返回一个ctype对象, 创建c = value(‘d’, 3.14),调用c.value() multiprocessing.array(typecode_or_type, size_or_initializer, *, lock=true) 返回一个ctype数组,只能是一维的 array(‘i’, [1, 2, 3, 4])
type code | c type | python type | minimum size in bytes |
---|---|---|---|
'b' |
signed char | int | 1 |
'b' |
unsigned char | int | 1 |
'u' |
py_unicode | unicode character | 2 |
'h' |
signed short | int | 2 |
'h' |
unsigned short | int | 2 |
'i' |
signed int | int | 2 |
'i' |
unsigned int | int | 2 |
'l' |
signed long | int | 4 |
'l' |
unsigned long | int | 4 |
'q' |
signed long long | int | 8 |
'q' |
unsigned long long | int | 8 |
'f' |
float | float | 4 |
'd' |
double | float | 8 |
multiprocessing.active_children():返回当前进程的所有子进程 multiprocessing.cpu_count():返回本计算机的cpu数量 multiprocessing.current_process():返回当前进程
windows平台另需注意:
process、lock与value尝试:
import multiprocessing as mp import time def job(v, num, l): l.acquire() # 锁住 for _ in range(5): time.sleep(0.1) v.value += num # 获取共享内存 print(v.value) l.release() # 释放 def multicore(): l = mp.lock() # 定义一个进程锁 #l = 1 v = mp.value('i', 0) # 定义共享内存 p1 = mp.process(target=job, args=(v,1,l)) # 需要将lock传入 p2 = mp.process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join() if __name__=='__main__': multicore()
上述代码即对共享内存叠加5次,p1进程每次叠加1,p2进程每次叠加3,为了避免p1与p2在运行时抢夺共享数据v,在进程执行时锁住了该进程,从而保证了执行的顺序。我测试了三个案例:
可以发现,没锁的情况下调整join可以取得与加锁类似的结果,这是因为join即是阻塞主进程,直至当前进程结束才回到主进程,若将p1.join()放到p1.start()后面,则会马上阻塞主进程,使得p2要稍后才开始,这与锁的效果一样。
如果如上述代码所示,p1.join()在p2.start()后面,虽然是p1先join(),但这时只是阻塞了主进程,而p2是兄弟进程,它已经开始了,p1就不能阻止它了,所以这时如果没锁的话p1与p2就是并行了,运行时间就是一半,但因为它们争抢共享变量,所以输出就变得不确定了。
pool
import multiprocessing as mp #import pdb def job(i): return i*i def multicore(): pool = mp.pool() #pdb.set_trace() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get获得结果 print(res.get()) # 迭代器,i=0时apply一次,i=1时apply一次等等 multi_res = [pool.apply_async(job, (i,)) for i in range(10)] # 从迭代器中取出 print([res.get() for res in multi_res]) multicore()
pool其实非常好用,特别是map与apply_async。通过pool这个接口,我们只有指定可以并行的函数与函数参数列表,它就可以自动帮我们创建多进程池进行并行计算,真的不要太方便。pool特别适用于数据并行模型,假如是消息传递模型那还是建议自己通过process来创立进程吧。
小子这次主要是按自己的理解把并行计算理了下,对进程、线程、cpu之间的关系做了下阐述,并把python的multiprocessing这个包拎了拎,个人感觉这个里面还大有学问,上次我一个师兄用python的process来控制单次迭代的运行时间(运行超时就跳过这次迭代,进入下一次迭代)也是让我涨了见识,后面还要多多学习啊。
感谢您花费宝贵的时间阅读到这里,希望能有所收获,也欢迎在评论区进行交流。
推荐好文:
python多进程的理解 multiprocessing process join run(推荐好文)
多进程 multiprocessing
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
新手学习Python2和Python3中print不同的用法
Python基于os.environ从windows获取环境变量
网友评论