Python并行编程 中文版¶

第一章 认识并行计算和Python¶
介绍¶
本章将介绍一些并行编程的架构和编程模型。对于初次接触并行编程技术的程序员来说,这些都是非常有用的概念;对于经验丰富的程序员来说,本章可以作为基础参考。 本章中讲述了并行编程的两种解释,第一种解释是基于系统架构的,第二种解释基于程序示例F。 并行编程对程序员来说一直是一项挑战。 本章讨论并行程序的设计方法的时候,深入讲了这种编程方法。 本章最后简单介绍了Python编程语言。Python的易用和易学、可扩展性和丰富的库以及应用,让它成为了一个全能性的工具,当然,在并行计算方面也得心应手。最后结合在Python中的应用讲了线程和进程。解决一个大问题的一般方法是,将其拆分成若干小的、独立的问题,然后分别解它们。并行的程序也是使用这种方法,用多个处理器同时工作,来完成同一个任务。每一个处理器都做自己的那部分工作(独立的部分)。而且计算过程中处理器之间可能需要交换数据。如果,软件应用要求越来越高的计算能力。提高计算能力有两种思路:提高处理器的时钟速度或增加芯片上的核心数。提高时钟速度就必然会增加散热,然后每瓦特的性能就会降低,甚至可能要求特殊的冷却设备。提高芯片的核心数是更可行的一种方案,因为能源的消耗和散热,第一种方法必然有上限,而且计算能力提高没有特别明显。
为了解决这个问题,计算机硬件供应商的选择是多核心的架构,就是在同一个芯片上放两个或者多个处理器(核心)。GPU制造商也逐渐引进了这种基于多处理器核心的硬件架构。事实上,今天的计算机几乎都是各种多核、异构的计算单元组成的,每一个单元都有多个处理核心。
所以,对我们来说充分利用计算资源就显得至关重要,例如并行计算的程序、技术和工具等。
并行计算的内存架构¶
根据指令的同时执行和数据的同时执行,计算机系统可以分成以下四类:
- 单处理器,单数据 (SISD)
- 单处理器,多数据 (SIMD)
- 多处理器,单数据 (MISD)
- 多处理器,多数据 (MIMD)
这种分类方法叫做“费林分类”:

SISD¶
单处理器单数据就是“单CPU的机器”,它在单一的数据流上执行指令。在SISD中,指令被顺序地执行。
对于每一个“CPU时钟”,CPU按照下面的顺序执行:
- Fetch: CPU 从一片内存区域中(寄存器)获得数据和指令
- Decode: CPU对指令进行解码
- Execute: 该执行在数据上执行,将结果保存在另一个寄存器中
当Execute阶段完成之后,CPU回到步骤1准备执行下一个时钟循环。

运行在这些计算机上的算法是顺序执行的(连续的),不存在任何并行。只有一个CPU的硬件系统就是SISD的例子。
这种架构(冯·诺依曼体系)的主要元素有以下:
- 中心内存单元:存储指令和数据
- CPU:用于从内存单元获得指令/数据,对指令解码并顺序执行它们
- I/O系统:程序的输入和输出流
传统的单处理器计算机都是经典的SISD系统。下图表述了CPU在Fetch、Decode、Execute的步骤中分别用到了哪些单元:

MISD¶
这种模型中,有n个处理器,每一个都有自己的控制单元,共享同一个内存单元。在每一个CPU时钟中,从内存获得的数据会被所有的处理器同时处理,每一个处理器按照自己的控制单元发送的指令处理。在这种情况下,并行实际上是指令层面的并行,多个指令在相同的数据上操作。能够合理利用这种架构的问题模型比较特殊,例如数据加密等。因此,MISD在现实中并没有很多用武之地,更多的是作为一个抽象模型的存在。

SIMD¶
SIMD计算机包括多个独立的处理器,每一个都有自己的局部内存,可以用来存储数据。所有的处理器都在单一指令流下工作;具体说,就是有n个数据流,每个处理器处理一个。所有的处理器同时处理每一步,在不同的数据上执行相同的指令。这是一个数据并行的例子。SIMD架构比MISD架构要实用的多。很多问题都可以用SIMD计算机的架构来解决。这种架构另一个有趣的特性是,这种架构的算法非常好设计,分析和实现。限制是,只有可以被分解成很多个小问题(小问题之间要独立,可以不分先后顺序被相同的指令执行)的问题才可以用这种架构解决。很多超级计算机就是使用这架构设计出来的。例如Connection Machine(1985年的 Thinking Machine)和MPP(NASA-1983).我们在第六章 GPU Python编程中会接触到高级的现代图形处理器(GPU),这种处理器就是内置了很多个SIMD处理单元,使这种架构在今天应用非常广泛。
MIMD¶
在费林分类中,这种计算机是最广泛使用、也是最强大的一个种类。这种架构有n个处理器,n个指令流,n个数据流。每一个处理器都有自己的控制单元和局部内存,让MIMD架构比SIMD架构的计算能力更强。每一个处理器都在独立的控制单元分配的指令流下工作;因此,处理器可以在不同的数据上运行不同的程序,这样可以解决完全不同的子问题甚至是单一的大问题。在MIMD中,架构是通过线程或进程层面的并行来实现的,这也意味着处理器一般是异步工作的。这种类型的计算机通常用来解决那些没有统一结构、无法用SIMD来解决的问题。如今,很多计算机都应用了这中间架构,例如超级计算机,计算机网络等。然而,有一个问题不得不考虑:异步的算法非常难设计、分析和实现。

内存管理¶
内存管理是并行架构需要考虑的另一方面,确切来说是获得数据的方式。无论处理单元多快,如果内存提供指令和数据的速度跟不上,系统性能也不会得到提升。制约内存达到处理器速度级别的响应时间的主要因素是内存存取周期。所谓存取周期就是连续启动两次读或写操作所需间隔的最小时间。处理器的周期通常比内存周期短得多。当处理器传送数据到内存或从内存中获取数据时,内存依旧在一个周期中,其他任何设备(I/O控制器,处理器)都不能使用内存,因为内存必须先对上一个请求作出响应。

为了解决 MIMD 架构访问内存的问题,业界提出了两种内存管理系统。第一种就是人们所熟知的共享内存系统,共享内存系统有大量的虚拟内存空间,而且各个处理器对内存中的数据和指令拥有平等的访问权限。另外一种类型是分布式内存模型,在这种内存模型中,每个处理器都有自己专属的内存,其他处理器都不能访问。共享内存和分布式内存的区别以处理器的角度来说就是内存和虚拟内存体系的不同。每个系统的内存都会分为能独立访问的不同部分。共享内存系统和分布式内存系统的处理单元管理内存访问的方式也不相同。 load R0,i
指令意味着将 i
内存单元的内容加载进 R0
寄存器,但内存管理方式的不同,处理器的处理方式也不尽相同。在共享内存的系统中, i
代表的是内存的全局地址,对系统中的所有处理器来说都指向同一块内存空间。如果两个处理器想同时执行该内存中的指令,它们会向 R0
寄存器载入相同的内容。在分布式内存系统中, i
是局部地址。如果两个处理器同时执行向 R0
载入内容的语句,执行结束之后,不同处理器 R0
寄存器中的值一般情况下是不一样的,因为每个处理器对应的内存单元中的 i
代表的全局地址不一样。对于程序员来说,必须准确的区分共享内存和分布式内存,因为在并行编程中需要考量内存管理方式来决定进程或线程间通讯的方式。对于共享内存系统来说,共享内存能够在内存中构建数据结构并在子进程间通过引用直接访问该数据结构。而对于分布式内存系统来说,必须在每个局部内存保存共享数据的副本。一个处理器会向其他处理器发送含有共享数据的消息从而创建数据副本。这使得分布式内存管理有一个显而易见的缺点,那就是,如果要发送的消息太大,发送过程会耗费相对较长的时间。
共享内存¶
下图展示了共享内存多处理器系统的架构,这里只展示了各部件之间简单的物理连接。总线结构允许任意数量的设备共享一个通道。总线协议最初设计是让单处理器,一个或多个磁盘和磁带控制器通过共享内存进行通讯。可以注意到处理器拥有各自的Cache,Cache中保存着局部内存中有可能被处理器使用的指令或数据。可以想象一下,当一个处理器修改了内存中的数据,同时另外一个处理器正在使用这个数据时,就会出现问题。已修改的值会从处理器的Cache传递到共享内存中,接着,新值会传递到其他处理器的Cache中,其它处理器就不可以使用旧值进行计算。这就是人们所熟知的Cache一致性问题,是内存一致性问题的一种特殊情况,要解决这个问题需要硬件能像多进程编程一样实现处理并发问题 和同步控制 。

共享内存系统的主要特性如下:
- 内存对于所有处理器来说是一样的,例如,所有处理器所对应的相同数据结构都存在于相同的逻辑地址,也就是说可以从相同的内存单元中获得该数据结构。
- 通过控制处理器对共享内存的访问权限可以达到同步控制的效果。实际上,每次只有一个处理器拥有对内存资源的访问权限。
- 当一个任务正在访问共享内存的时候,其它所有任务都不能改变内存单元的内容。
- 共享内存很快,两个任务通讯的时间和读取单个内存单元的时间相等(取决于内存的访问速度)
在共享内存系统中访问内存的方式如下
- 均匀内存访问 (Uniform memory access (UMA) ):这类系统的基本特征是无论对于处理器来说访问任意的内存区域速度是相同的。因此,这些系统也成为对称式多处理器 (symmetric multiprocessor (SMP)) 系统。这类系统实现起来相对简单,但是可扩展性较差,程序员需要通过插入适当的控制、信号量、锁等机制来管理同步,进而在程序中管理资源。
- 非均匀内存访问 (Non-uniform memory access (NUMA)):这类架构将内存分为高速访问区域和低速访问区域。高速访问区域是分配给各个处理器的区域,是用于数据交换的公用区域。这类系统也称为分布式共享内存系统 (Distributed Shared Memory Systems (DSM)) ,这类系统的扩展性很好,但开发难度较高。
- 无远程内存访问 (No remote memory access (NORMA)):对于处理器来说内存在物理上是分布式存在的。每个处理器只能访问其局部私有内存。处理器之间通过消息传递协议进行通讯。
- 仅Cache可访问 (Cache only memory access (COMA)):这类系统中仅有Cache内存。分析 NUMA 架构时,需要注意的是这类系统会把数据的副本保存在Cache中供处理器使用,并且在主存中也保留着重复的数据。COMA 架构可以移除重复的主存数据,而只保留Cache内存。对于处理器来说内存在物理上是分布式存在的。每个处理器只能访问其局部私有内存。处理器之间通过消息传递协议进行通讯。
分布式内存¶
在使用分布式内存的系统中,各个处理器都有其各自的内存,而且每个处理器只能处理属于自己的内存。某些学者把这类系统称为“多计算机系统”,这个名字很真实地反映了组成这类系统的元素能够独立作为一个具有内存和处理器的微型系统,如下图所示:

这种内存管理方式有几个好处。第一,总线和开关级别的的通讯不会发生冲突。每个处理器都可以无视其他处理器的干扰而充分利用局部内存的带宽;第二,没有通用总线意味着没有处理器数量的限制,系统的规模只局限于连接处理器的网络带宽;第三,没有Cache一致性问题的困扰。每个处理器只需要处理属于自己的数据而无须关心上传数据副本的问题。但最大的缺点是,很难实现处理器之间的通讯。如果一个处理器需要其他处理器的数据,这两个处理器必须要通过消息传递协议来交换消息。这样进行通讯会导致速度降低,原因有两个,首先,从一个处理器创建和发送消息到另外一个处理器需要时间;其次,任何处理器都需要停止工作,处理来自其他处理器的消息。面向分布式内存机器的程序必须按照尽量相互独立的任务来组织,任务之间通过消息进行通讯。

分布式内存系统的特性如下:
- 内存通常分布在不同的处理器之中,局部内存只能由对应的处理器访问。
- 同步控制通过在处理器之间转移数据 (也可以是消息本身) 来实现, 同理通讯的实现方式也一样。
- 局部内存的数据分支会影响机器的性能——有必要精确地进行数据分割最小化 CPU 间的通讯。另外,协调数据的分解合成操作的处理器必须与处理部分数据的处理器高效地通讯。
- 消息传递协议用于 CPU 间通过交换数据包通讯。消息是信息的分解单元,他们经过良好的定义,所以处理器之间能够准确地识别出消息地内容。
大规模并行处理 (Massively parallel processing (MPP))¶
MPP 机器由上百个处理器 (在一些机器中达到成千上万个) 通过通讯网络连接而成。世界上最快的计算机几乎都基于这种架构,采用这种架构的计算机系统有:Earth Simulator, Blue Gene, ASCI White, ASCI Red, ASCI Purple 以及 Red Storm 等。
工作站集群¶
工作站集群是指将传统的计算机通过通讯网络连接起来。在集群架构中,一个节点就是集群中的一个计算单元。对于用户来说,集群是完全透明的,掩盖了软硬件的复杂性,使得数据以及应用仿佛从一个节点中得到的。
在这里,会定义三种集群:
- 故障切换集群 (The fail-over cluster) :在这类集群中,会持续检测节点的活动状态,当一个节点出现故障,另外一台机器会马上接管故障节点的工作。这类集群通过这种冗余架构可以保证系统的可用性。
- 负载均衡集群 (The load balancing cluster) :在这类系统中,会将一个作业请求发送给负载较小的节点上执行。这样做可以减少整个处理过程所耗费的时间。
- 高性能计算集群 (The high-performance cluster) :在这类系统中,每个节点都可以提供极高的性能,一个任务依旧分解为若干个子任务交给各个节点处理。任务是并行化的,分配给不同的机器进行处理。
异构架构¶
在同构的超级计算机中采用GPU加速器改变了之前超级计算机的使用规则。即使GPU能够提供高性能计算,但是不能把它看作一个独立的处理单元,因为GPU必须在CPU的配合下才能顺利完成工作。因此,异构计算的程序设计方法很简单,首先CPU通过多种方式计算和控制任务,将计算密集型和具有高并行性的任务分配给图形加速卡执行。CPU和GPU之间不仅可以通过高速总线通讯,也可以通过共享一块虚拟内存或物理内存通讯。事实上,在这类设备上GPU和CPU都没有独立的内存区域,一般是通过由各种编程框架(如CUDA,OpenCL)提供的库来操作内存。这类架构被称之为异构架构,在这种架构中,应用程序可以在单一的地址空间中创建数据结构,然后将任务分配给合适的硬件执行。通过原子性操作,多个任务可以安全地操控同一个内存区域同时避免数据一致性问题。所以,尽管CPU和GPU看起来不能高效联合工作,但通过新的架构可以优化它们之间的交互和提高并行程序的性能。

并行编程模型¶
并行编程模型是作为对硬件和内存架构的抽象而存在的。事实上,这些模式不是特定的,而且和机器的类型或内存的架构无关。他们在理论上能在任何类型的机器上实现。相对于前面的架构细分,这些编程模型会在更高的层面上建立,用于表示软件执行并行计算时必须实现的方式。为了访问内存和分解任务,每一个模型都以它独自的方式和其他处理器共享信息。
需要明白的是没有最好的编程模型,模型的效果如何很大程度上取决于实际的问题。使用范围最广的并行编程模型有:
- 共享内存模型
- 多线程模型
- 分布式内存/消息传递模型
- 数据并行模型
在这节中,会描述这些编程模型的概览。在下一章会更加准确的描述这些编程模型,并会介绍Python中实现这些模型的相应模块。
共享内存模型¶
在这个编程模型中所有任务都共享一个内存空间,对共享资源的读写是 异步的。系统提供一些机制,如锁和信号量,来让程序员控制共享内存的访问权限。使用这个编程模型的优点是,程序员不需要清楚任务之间通讯的细节。但性能方面的一个重要缺点是,了解和管理数据区域变得更加困难;将数据保存在处理器本地才可以节省内存访问,缓存刷新以及多处理器使用相同数据时发生的总线流量。
多线程模型¶
在这个模型中,单个处理器可以有多个执行流程,例如,创建了一个顺序执行任务之后,会创建一系列可以并行执行的任务。通常情况下,这类模型会应用在共享内存架构中。由于多个线程会对共享内存进行操作,所以进行线程间的同步控制是很重要的,作为程序员必须防止多个线程同时修改相同的内存单元。现代的CPU可以在软件和硬件上实现多线程。POSIX 线程就是典型的在软件层面上实现多线程的例子。Intel 的超线程 (Hyper-threading) 技术则在硬件层面上实现多线程,超线程技术是通过当一个线程在停止或等待I/O状态时切换到另外一个线程实现的。使用这个模型即使是非线性的数据对齐也能实现并行性。
消息传递模型¶
消息传递模型通常在分布式内存系统(每一个处理器都有独立的内存空间)中应用。更多的任务可以驻留在一台或多台物理机器上。程序员需要确定并行和通过消息产生的数据交换。实现这个数据模型需要在代码中调用特定的库。于是便出现了大量消息传递模型的实现,最早的实现可以追溯到20世纪80年代,但直到90年代中期才有标准化的模型——实现了名为MPI (the Message Passing Interface, 消息传递接口)的事实标准。MPI 模型是专门为分布式内存设计的,但作为一个并行编程模型,也可以在共享内存机器上跨平台使用。

数据并行模型¶
在这个模型中,有多个任务需要操作同一个数据结构,但每一个任务操作的是数据的不同部分。在共享内存架构中,所有任务都通过共享内存来访问数据;在分布式内存架构中则会将数据分割并且保存到每个任务的局部内存中。为了实现这个模型,程序员必须指定数据的分配方式和对齐方式。现代的GPU在数据已对齐的情况下运行的效率非常高。

如何设计一个并行程序¶
并行算法的设计是基于一系列操作的,在编程的过程中必须执行这些操作来准确地完成工作而不会产生部分结果或错误结果。并行算法地大致操作如下:
- 任务分解 (Task decomposition)
- 任务分配 (Task assignment)
- 聚合 (Agglomeration)
- 映射 (Mapping)
任务分解¶
第一阶段,将软件程序分解为可以在不同处理器执行的多个任务或一系列指令以实现并行性。下面展示了两个方法来实现程序分解:
- 按范围分解 (Domain decomposition):使用这个分解方法,程序所需的数据会被分解;处理器会使用同一个程序处理不同的数据。这个方法一般在需要处理大量数据的情况下使用。
- 按功能分解 (Functional decomposition):使用这个分解方法会将问题分解为几个任务,每个任务会对可利用的数据执行不同的操作。
任务分配¶
在这个步骤中,并行程序将任务分配给各种处理器的机制是确定的。这个阶段非常重要,因为在这阶段会向各个处理器之间分配工作。负载均衡是这个阶段的关键,所有处理器都应该保持工作状态,避免长时间的空闲。为了实现这个效果,程序员必须考虑异构系统的可能性,异构系统会尝试将任务分配给相对更适合的处理器。最后,为了让并行程序有更高的效率,必须尽量减少处理器之间的通讯,因为处理器之间的通讯通常是程序变慢和资源消耗的源头。
聚合¶
聚合,就是为了提升性能将小任务合并成大任务的过程。如果设计过程的前两个阶段是将分解问题得到的任务数量大大超过处理器可接受的程度,或者计算机不是专门设计用于处理大量小任务 (如GPU的架构就非常适合处理数百万甚至上亿任务),那么过分解会导致严重的效率下降。一般情况下,这是因为任务需要跟处理它的处理器或线程进行通讯。大多数的通讯的消耗不仅包括跟传输数据量相称的部分,还包括进行通讯的固定部分 (如建立 TCP 连接的延迟)。如果分解的任务过小,固定消耗可能比数据量还大,可以说这样的设计是低效的。
映射¶
在并行算法设计的映射阶段,会指定任务由哪个处理器处理。这阶段的目标是减少总体的执行时间。在这里需要经常做取舍,因为下面两个相互矛盾的策略:
- 需要频繁通讯的任务应该由同一个处理器处理以增强局部性。
- 可以并行执行的任务应该由多个处理器处理以增强并行性。
这就是所谓的映射问题,也是一个NP完全问题——一般情况下不能再多项式时间内解决的问题。在大小相等和通讯模式容易定义的任务中,映射很直接 (在这里也可以执行聚合的步骤来合并映射到相同处理器的任务)。但是如果任务的通讯模式难以预测或者每个任务的工作量都不一样,设计一个高效的映射和聚合架构就会变得有难度。由于存在这些问题,负载均衡算法会在运行时识别聚合和映射策略。最难的问题是在程序执行期间通信量或任务数量改变的问题。针对这些问题,可以使用在执行过程中周期性运行的动态负载均衡算法。
动态映射¶
无论是全局还是局部,对于不同的问题都有不同的负载均衡算法。全局算法需要对即将指向的计算有全局的认识,这样通常会增加很多开销。局部算法只需要依靠正在解决的问题的局部信息,对比全局算法能够减少很多开销,但在寻找最佳聚合和映射的能力较差。然而,即使映射的结果较差,节省的开销一般还是能减少执行时间。如果任务除了执行开始和结束几乎不和其它任务进行通讯,那么可以使用任务调度算法简单地把任务分配给空转的处理器。在任务调度算法中,会维护一个任务池,任务池中包含了待执行的任务,工作单元 (一般是处理器) 会从中取出任务执行。
接下来会解释这个模型中的三个通用方法。
管理单元/工作单元 (Manager/worker)¶
这是一个基础的动态映射架构,在这个架构中工作单元会连接到一个中央管理单元中。管理单元不停发送任务给工作单元并收集运算结果。这个策略在处理器数量相对较小的情况下表现最好。
分层的管理单元/工作单元 (Hierarchical manager/worker)¶
这是拥有半分布式布局的管理单元/工作单元的变种;工作单元会按组划分,每一组都有器管理单元。当工作单元从组管理单元获取任务时,组管理单元会和中央管理单元通讯 (或者组管理单元之间直接通讯)。通过提前获取任务可以提升这个基础策略的性能,这就导致了通讯和计算重迭进行。这样就可以在多个管理单元之间传播负载,如果所有工作单元都向同一个管理单元请求任务,这种策略本身就可以应付大量的处理器。
去中心化 (Decentralize)¶
在这个架构中,所有东西都是去中心化的。每个处理器都维护着自己的任务池并且直接和其它处理器通讯请求任务。处理器有很多种方式选择处理器请求任务,选择哪种方式有待解决的问题决定。
如何评估并行程序的性能¶
并行编程的发展产生了对性能指标和并行程序评估软件的需求,通过评估性能才能确定该算法是否方便快捷。实际上,并行计算的重点是在相对较短的时间内解决体量较大的问题。为了能够达到这个目的,需要考虑的因素有:使用的硬件类型,问题的可并行程度和采用的编程模型等。为了加速算法评估过程,引进了基本概念分析,也就是将并行算法和原始的顺序执行做对比。通过分析和确定线程数量和/或使用的处理器数量来确定性能。
为了进行分析,在这里介绍几个性能指标:加速比,效率和扩展性。
阿姆德尔定律 (Ahmdal’s law) 引入了并行计算的极限,来评估串行算法并行化的效率。古斯塔夫森定律 (Gustafson’s law) 也做了相似的评估。
加速比¶
加速比用于衡量使用并行方式解决问题的收益。假设使用单个处理单元解决这个问题需要的时间为 ,使用
个相同的处理单元解决这个问题的时间为
,那么加速比
。如果
,加速比为线性,也就是说执行速度随着处理器数量的增加而加快。当然,这只是一个理想状态。当
为最佳串行算法的执行时间,加速比是绝对的,而当
为并行算法在单个处理器上的执行时间,那么加速比是相对的。
下面概括了上述的情况:
为线性加速比,也是理想加速比。
为真实加速比
为超线性加速比
效率¶
在理想状态下,如果一个并行系统有 个处理单元,那么它能提供的加速比等于
。然而,这几乎是不可能达到的。处理单元空转和通讯通常会浪费一些时间。效率通常是用于评价处理器在执行任务时是否被充分利用的性能指标,它跟通讯和同步所耗费的时间作比较。
假设效率为 ,可以通过
算出。拥有线性加速比的算法的效率
;在其它情况下,
会小于1。下面会定义三种情况:
- 当
,为线性加速比。
- 当
,为真实情况。
- 当
,可以确定这是一个有问题的低效并行算法。
伸缩性¶
伸缩性用于度量并行机器高效运行的能力,代表跟处理器数量成比例的计算能力 (执行速度)。如果问题的规模和处理器的数量同时增加,性能不会下降。在依靠各种因素叠加的可伸缩系统中,可以保持相同的效率或者有更高的效率。
阿姆德尔定律 (Ahmdal’s law)¶
阿姆德尔定律广泛使用于处理器设计和并行算法设计。它指出程序能达到的最大加速比被程序的串行部分限制。 中
指程序的串行部分。它的意思是,例如一个程序90%的代码都是并行的,但仍存在10%的串行代码,那么系统中即使由无限个处理器能达到的最大加速比仍为9。
古斯塔夫森定律 (Gustafson’s law)¶
古斯塔夫森定律在考虑下面的情况之后得出的:
- 当问题的规模增大时,程序的串行部分保持不变。
- 当增加处理器的数量时,每个处理器执行的任务仍然相同。
古斯塔夫森定律指出了加速比 ,
为处理器的数量,
为加速比,
是并行处理器中的非并行的部分。作为对比,阿姆德尔定律将单个处理器的执行时间作为定量跟并行执行时间相比。因此阿姆德尔定律是基于固定的问题规模提出的,它假设程序的整体工作量不会随着机器规模 (也就是处理器数量) 而改变。古斯塔夫森定律补充了阿姆德尔定律没有考虑解决问题所需的资源总量的不足。古斯塔夫森定律解决了这个问题, 它表明设定并行解决方案所允许耗费的时间的最佳方式是考虑所有的计算资源和基于这类信息。
介绍Python¶
Python是一种动态的解释型语言,应用场景广泛。它具有以下特性:
- 简明、易读的语法
- 丰富的标准库。通过第三方的软件模块,我们可以方便地添加数据类型、函数和对象
- 上手简单,开发和调试速度快。Python代码的开发速度可能比C/C++快10倍
- 基于Exception的错误处理机制
- 强大的自省功能
- 丰富的文档和活跃的社区
Python也可以作为一种胶水语言。通过Python,擅长不同编程语言的程序员可以在同一个项目中合作。例如开发一个数据型的应用时,C/C++程序员可以从底层实现高效的数值计算算法,而数据科学家可以通过Python调用这些算法,而不用花时间去学习底层的编程语言,C/C++程序员也不需要去理解科学数据层面的东西。
你可以从这里查看更多相关内容: https://www.python.org/doc/essays/omg-darpa-mcc-position/
准备工作¶
Python可以从这里下载:https://www.python.org/downloads/
虽然用NotePad或TextEdit就可以写Python代码,但是如果用集成开发环境(Integrated Development Environment, IDE)的话,编辑和调试会更方便。
目前已经有很多专门为Python设计的IDE,包括IDEL( https://docs.python.org/3/library/idle.html ),PyCharm( https://www.jetbrains.com/pycharm/ ),Sublime Textd(https://www.sublimetext.com/)等。
如何做…¶
下面来通过一些简短的代码熟悉一下Python。 >>>
符号是Python解释器的提示符。
整数类型的操作:
>>> # This is a comment >>> width = 20 >>> height = 5*9 >>> width * height 900
鉴于这是我们第一次展示代码,下面贴一下代码在Python解释器中的样子:
下面来看一下其他的例子:
复数(译者注:这里原书
abs(a) = 5
,应该是错了):>>> a=1.5+0.5j >>> a.real 1.5 >>> a.imag 0.5 >>> abs(a) # sqrt(a.real**2 + a.imag**2) 1.5811388300841898
字符串操作:
>>> word = 'Help' + 'A' >>> word 'HelpA' >>> word[4] 'A' >>> word[0:2] 'He' >>> word[-1] # 最后一个字符 'A'
列表(list)操作:
>>> a = ['spam', 'eggs', 100, 1234] >>> a[0] 'spam' >>> a[3] 1234 >>> a[-2] 100 >>> a[1:-1] ['eggs', 100] >>> len(a) 4
while
循环:# Fibonacci series: >>> while b < 10: ... print b ... a, b = b, a+b ... 1 1 2 3 5 8
if
命令: 首先我们用input()
从键盘读入一个整数:>>>x = int(input("Please enter an integer here: ")) Please enter an integer here:
然后在输入的数字中使用
if
进行判断:>>>if x < 0: ... print ('the number is negative') ...elif x == 0: ... print ('the number is zero') ...elif x == 1: ... print ('the number is one') ...else: ... print ('More') ...
for
循环::>>> # Measure some strings: ... a = ['cat', 'window', 'defenestrate'] >>> for x in a: ... print (x, len(x)) ... cat 3 window 6 defenestrate 12
定义函数:
>>> def fib(n): # 生成n以内的菲波那切数列 ... """Print a Fibonacci series up to n.""" ... a, b = 0, 1 ... while b < n: ... print(b), ... a, b = b, a+b >>> # Now call the function we just defined: ... fib(2000) 1 1 2 3 5 8 13 21 34 55 89 144 233 377 610 987 1597
导入模块:
>>> import math >>> math.sin(1) 0.8414709848078965 >>> from math import * >>> log(1) 0.0
定义类:
>>> class Complex: ... def __init__(self, realpart, imagpart): ... self.r = realpart ... self.i = imagpart ... >>> x = Complex(3.0, -4.5) >>> x.r, x.i (3.0, -4.5)
并行世界的Python¶
作为一种解释型的语言,Python的速度并不算慢。如果对速度有很高的要求的话,可以选择用更快的语言实现,比如C或C++,然后用Python调用。Python的一种常见应用场景是实现高级的逻辑。Python的解释器就是用C语言写的,即CPython。解释器将Python转换成一种中间语言,叫做Python字节码,类似于汇编语言,但是包含一些更高级的指令。当一个运行一个Python程序的时候,评估循环不断将Python字节码转换成机器码。解释型语言的好处是方便编程和调试,但是程序的运行速度慢。其中的一种解决办法是,用C语言实现一些第三方的库,然后在Python中使用。另一种方法是使用即时编译器来替换Cpython,例如PyPy,PyPy对代码生成和Python的运行速度做了优化。但是在本书中,我们将研究第三种方法。Python提供了很多可以利用并行的模块,在后面的章节中,我们将着重讨论这些并行编程的模块。
接下来,本章将介绍两种基本概念:线程和进程,以及它们在Python中的表现。
介绍线程和进程¶
进程是应用程序的一个执行实例,比如,在桌面上双击浏览器图标将会运行一个浏览器。线程是一个控制流程,可以在进程内与其他活跃的线程同时执行。“控制流程”指的是顺序执行一些机器指令。进程可以包含多个线程,所以开启一个浏览器,操作系统将创建一个进程,并开始执行这个进程的主线程。每一个线程将独立执行一系列的指令(通常就是一个函数),并且和其他线程并行执行。然而,同一个进程内的线程可以共享一些地址空间和数据结构。线程也被称作“轻量进程”,因为它和进程有许多共同点,比如都是可以和其他控制流程同时运行的控制流程,说它“轻量”是因为实现一个进程比线程要繁重的多。重申一遍,不同于进程,多个线程可以共享很多资源,特别是地址空间和数据结构等。
总结一下:
- 进程可以包含多个并行运行的线程。
- 通常,操作系统创建和管理线程比进程更能节省CPU的资源。线程用于一些小任务,进程用于繁重的任务——运行应用程序。
- 同一个进程下的线程共享地址空间和其他资源,进程之间相互独立。
在深入研究通过线程和进程管理并行的Python模块之前,我们先来看一下Python中是如何使用这两者的。
开始在Python中使用进程¶
在大多数操作系统中,每个程序在一个进程中运行。通常,我们通过双击一个应用程序的图标来启动程序。在本节中,我们简单地展示如何在Python中开启一个进程。进程有自己的地址空间,数据栈和其他的辅助数据来追踪执行过程;系统会管理所有进程的执行,通过调度程序来分配计算资源等。
如何做…¶
执行第一个示例,我们需要敲入下面两个代码文件:
called_Process.py
calling_Process.py
你可以使用Python IDE(3.3.0)来编辑下面的文件:
called_Process
的代码如下:
print("Hello Python Parallel Cookbook!!")
closeInput = raw_input("Press ENTER to exit")
print"Closing calledProcess"
calling_Process
的代码如下:
## The following modules must be imported
import os
import sys
## this is the code to execute
program = "python"
print("Process calling")
arguments = ["called_Process.py"]
## we call the called_Process.py script
os.execvp(program, (program,) + tuple(arguments))
print("Good Bye!!")
运行例子的方法是,用Python IDE打开 calling_Process
程序然后按下F5.
在Python shell看到的输出如下:

同时,系统的终端将看到如下输出:

我们有两个进程运行,按下Enter可以关闭系统终端。
如何做…¶
在前面的例子中, execvp
函数开启了一个新的进程,替换了当前的进程。注意”Good Bye”永远不会打印出来。相反,它会在当前的系统路径中搜索 called_Process
,将第二个参数的内容作为独立的变量传给程序,然后在当前环境上下文中执行。called_Process``中的 ``input()
仅仅用来管理当前系统的闭包。本节展示了基于进程的并行,我们在后面会介绍更多通过进程(multiprocessing模块)管理并行的方法。
开始在Python中使用线程¶
如前面章节提到的那样,基于线程的并行是编写并行程序的标准方法。然而,Python解释器并不完全是线程安全的。为了支持多线程的Python程序,CPython使用了一个叫做全局解释器锁(Global Interpreter Lock, GIL)的技术。这意味着同一时间只有一个线程可以执行Python代码;执行某一个线程一小段时间之后,Python会自动切换到下一个线程。GIL并没有完全解决线程安全的问题,如果多个线程试图使用共享数据,还是可能导致未确定的行为。
在本节中,我们将展示如何在Python程序中创建一个线程。
如何做…¶
我们需要 helloPythonWithThreads.py
来执行第一个例子:
# To use threads you need import Thread using the following code:
from threading import Thread
# Also we use the sleep function to make the thread "sleep"
from time import sleep
# To create a thread in Python you'll want to make your class work as a thread.
# For this, you should subclass your class from the Thread class
class CookBook(Thread):
def __init__(self):
Thread.__init__(self)
self.message = "Hello Parallel Python CookBook!!\n"
# this method prints only the message
def print_message(self):
print(self.message)
# The run method prints ten times the message
def run(self):
print("Thread Starting\n")
x = 0
while (x < 10):
self.print_message()
sleep(2)
x += 1
print("Thread Ended\n")
# start the main process
print("Process Started")
# create an instance of the HelloWorld class
hello_Python = CookBook()
# print the message...starting the thread
hello_Python.start()
# end the main process
print("Process Ended")
运行上面的代码,需要用Python IDE打开 helloPythonWithThreads.py
然后按下 F5.
在Python shell中你将看到以下输出:

讨论¶
主程序执行结束的时候,线程依然会每个两秒钟就打印一次信息。此例子证实了线程是在父进程下执行的一个子任务。
需要注意的一点是,永远不要留下任何线程在后台默默运行。否则在大型程序中这将给你带来无限痛苦。
第二章 基于线程的并行¶
介绍¶
目前,在软件应用中使用最广泛的并发编程范例是多线程。通常,一个应用有一个进程,分成多个独立的线程,并行运行、互相配合,执行不同类型的任务。
虽然这种模式存在一些缺点,有很多潜在的问题,但是多线程的应用依然非常广泛。
现在几乎所有的操作系统都支持多线程,几乎所有的编程语言都有相应的多线程机制,可以在应用中通过线程实现并发。
所以,使用多线程编程来实现并发的并用是个不错的选择。然而,多线程并不是唯一的选择,有不少其他的方案的表现比多线程好的多。
线程是独立的处理流程,可以和系统的其他线程并行或并发地执行。多线程可以共享数据和资源,利用所谓的共享内存空间。线程和进程的具体实现取决于你要运行的操作系统,但是总体来讲,我们可以说线程是包含在进程中的,同一进程的多个不同的线程可以共享相同的资源。相比而言,进程之间不会共享资源。
每一个线程基本上包含3个元素:程序计数器,寄存器和栈。与同一进程的其他线程共享的资源基本上包括数据和系统资源。每一个线程也有自己的运行状态,可以和其他线程同步,这点和进程一样。线程的状态大体上可以分为ready,running,blocked。线程的典型应用是应用软件的并行化——为了充分利用现代的多核处理器,使每个核心可以运行单个线程。相比于进程,使用线程的优势主要是性能。相比之下,在进程之间切换上下文要比在统一进程的多线程之间切换上下文要重的多。
多线程编程一般使用共享内容空间进行线程间的通讯。这就使管理内容空间成为多线程编程的重点和难点。
使用Python的线程模块¶
Python通过标准库的 threading
模块来管理线程。这个模块提供了很多不错的特性,让线程变得无比简单。实际上,线程模块提供了几种同时运行的机制,实现起来非常简单。
线程模块的主要组件如下:
- 线程对象
- Lock对象
- RLock对象
- 信号对象
- 条件对象
- 事件对象
在接下来的子章节中,我们将通过例子尝试这些由线程库提供的特性。以下实例基于Python 3.3(兼容Python 2.7)。
如何定义一个线程¶
使用线程最简单的一个方法是,用一个目标函数实例化一个Thread然后调用 start()
方法启动它。Python的threading模块提供了 Thread()
方法在不同的线程中运行函数或处理过程等。
class threading.Thread(group=None,
target=None,
name=None,
args=(),
kwargs={})
上面的代码中:
group
: 一般设置为None
,这是为以后的一些特性预留的target
: 当线程启动的时候要执行的函数name
: 线程的名字,默认会分配一个唯一名字Thread-N
args
: 传递给target
的参数,要使用tuple类型kwargs
: 同上,使用字典类型dict
创建线程的方法非常实用,通过`target`参数、`arg`和`kwarg`告诉线程应该做什么。下面这个例子传递一个数字给线程(这个数字正好等于线程号码),目标函数会打印出这个数字。
如何做…¶
让我们看一下如何通过threading模块创建线程,只需要几行代码就可以了:
import threading
def function(i):
print ("function called by thread %i\n" % i)
return
threads = []
for i in range(5):
t = threading.Thread(target=function , args=(i, ))
threads.append(t)
t.start()
t.join()
上面的代码运行结果如下:

注意,输出的顺序可能和上图不同。事实上,多个线程可能同时向 stdout
打印结果,所以输出顺序无法事先确定。
(译者注:这段代码怀疑存在错误,因为写了 t.join()
,这意味着,t线程结束之前并不会看到后续的线程,换句话说,主线程会调用t线程,然后等待t线程完成再执行for循环开启下一个t线程,事实上,这段代码是顺序运行的,实际运行顺序永远是01234顺序出现,不会出现图中结果)
讨论¶
导入内置threading模块,简单地使用python命令就可以了:
import threading
在主程序中,我们使用目标函数 function
初始化了一个线程对象 Thread
。同时还传入了用于打印的一个参数:
t = threading.Thread(target=function , args=(i, ))
线程被创建之后并不会马上运行,需要手动调用 start()
, join()
让调用它的线程一直等待直到执行结束(即阻塞调用它的主线程, t
线程执行结束,主线程才会继续执行):
t.start()
t.join()
如何确定当前的线程¶
使用参数来确认或命名线程是笨拙且没有必要的。每一个 Thread
实例创建的时候都有一个带默认值的名字,并且可以修改。在服务端通常一个服务进程都有多个线程服务,负责不同的操作,这时候命名线程是很实用的。
如何做…¶
为了演示如何确定正在运行的线程,我们创建了三个目标函数,并且引入了 time
在运行期间挂起2s,让结果更明显。
import threading
import time
def first_function():
print(threading.currentThread().getName() + str(' is Starting '))
time.sleep(2)
print (threading.currentThread().getName() + str(' is Exiting '))
return
def second_function():
print(threading.currentThread().getName() + str(' is Starting '))
time.sleep(2)
print (threading.currentThread().getName() + str(' is Exiting '))
return
def third_function():
print(threading.currentThread().getName() + str(' is Starting '))
time.sleep(2)
print(threading.currentThread().getName() + str(' is Exiting '))
return
if __name__ == "__main__":
t1 = threading.Thread(name='first_function', target=first_function)
t2 = threading.Thread(name='second_function', target=second_function)
t3 = threading.Thread(name='third_function', target=third_function)
t1.start()
t2.start()
t3.start()
输出如下图所示:

讨论¶
我们使用目标函数实例化线程。同时,我们传入 name
参数作为线程的名字,如果不传这个参数,将使用默认的参数:
t1 = threading.Thread(name='first_function', target=first_function)
t2 = threading.Thread(name='second_function', target=second_function)
t3 = threading.Thread(target=third_function)
(译者注:这里的代码和上面的不一样,可能作者本意是第三个线程不加参数来测试默认的行为,如果改为这里的代码,那么线程3将会输出的是 Thread-1 is Starting
以及 Thread-1 is Exiting
,读者可以自行尝试)
最后调用 start()
和 join()
启动它们。
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
如何实现一个线程¶
使用threading模块实现一个新的线程,需要下面3步:
- 定义一个
Thread
类的子类 - 重写
__init__(self [,args])
方法,可以添加额外的参数 - 最后,需要重写
run(self, [,args])
方法来实现线程要做的事情
当你创建了新的 Thread
子类的时候,你可以实例化这个类,调用 start()
方法来启动它。线程启动之后将会执行 run()
方法。
如何做…¶
为了在子类中实现线程,我们定义了 myThread
类。其中有两个方法需要手动实现:
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print("Starting " + self.name)
print_time(self.name, self.counter, 5)
print("Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
# 译者注:原书中使用的thread,但是Python3中已经不能使用thread,以_thread取代,因此应该
# import _thread
# _thread.exit()
thread.exit()
time.sleep(delay)
print("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# Start new Threads
thread1.start()
thread2.start()
# 以下两行为译者添加,如果要获得和图片相同的结果,
# 下面两行是必须的。疑似原作者的疏漏
thread1.join()
thread2.join()
print("Exiting Main Thread")
上面的代码输出结果如下:

讨论¶
threading
模块是创建和管理线程的首选形式。每一个线程都通过一个继承 Thread
类,重写 run()
方法来实现逻辑,这个方法是线程的入口。在主程序中,我们创建了多个 myThread
的类型实例,然后执行 start()
方法启动它们。调用 Thread.__init__
构造器方法是必须的,通过它我们可以给线程定义一些名字或分组之类的属性。调用 start()
之后线程变为活跃状态,并且持续直到 run()
结束,或者中间出现异常。所有的线程都执行完成之后,程序结束。
join()
命令控制主线程的终止。
使用Lock进行线程同步¶
当两个或以上对共享内存的操作发生在并发线程中,并且至少有一个可以改变数据,又没有同步机制的条件下,就会产生竞争条件,可能会导致执行无效代码、bug、或异常行为。
竞争条件最简单的解决方法是使用锁。锁的操作非常简单,当一个线程需要访问部分共享内存时,它必须先获得锁才能访问。此线程对这部分共享资源使用完成之后,该线程必须释放锁,然后其他线程就可以拿到这个锁并访问这部分资源了。
很显然,避免竞争条件出现是非常重要的,所以我们要保证,在同一时刻只有一个线程允许访问共享内存。
尽管原理很简单,但是这样使用是work的。
然而,在实际使用的过程中,我们发现这个方法经常会导致一种糟糕的死锁现象。当不同的线程要求得到一个锁时,死锁就会发生,这时程序不可能继续执行,因为它们互相拿着对方需要的锁。

为了简化问题,我们设有两个并发的线程( 线程A 和 线程B ),需要 资源1 和 资源2 .假设 线程A 需要 资源1 , 线程B 需要 资源2 .在这种情况下,两个线程都使用各自的锁,目前为止没有冲突。现在假设,在双方释放锁之前, 线程A 需要 资源2 的锁, 线程B 需要 资源1 的锁,没有资源线程不会继续执行。鉴于目前两个资源的锁都是被占用的,而且在对方的锁释放之前都处于等待且不释放锁的状态。这是死锁的典型情况。所以如上所说,使用锁来解决同步问题是一个可行却存在潜在问题的方案。
本节中,我们描述了Python的线程同步机制, lock()
。通过它我们可以将共享资源某一时刻的访问限制在单一线程或单一类型的线程上,线程必须得到锁才能使用资源,并且之后必须允许其他线程使用相同的资源。
如何做…¶
下面的例子展示了如何通过 lock()
管理线程。在下面的代码中,我们有两个函数: increment()
和 decrement()
。第一个函数对共享资源执行加1的操作,另一个函数执行减1.两个函数分别使用线程封装。除此之外,每一个函数都有一个循环重复执行操作。我们想要保证,通过对共享资源的管理,执行结果是共享资源最后等于初始值0.
代码如下:
# -*- coding: utf-8 -*-
import threading
shared_resource_with_lock = 0
shared_resource_with_no_lock = 0
COUNT = 100000
shared_resource_lock = threading.Lock()
# 有锁的情况
def increment_with_lock():
global shared_resource_with_lock
for i in range(COUNT):
shared_resource_lock.acquire()
shared_resource_with_lock += 1
shared_resource_lock.release()
def decrement_with_lock():
global shared_resource_with_lock
for i in range(COUNT):
shared_resource_lock.acquire()
shared_resource_with_lock -= 1
shared_resource_lock.release()
# 没有锁的情况
def increment_without_lock():
global shared_resource_with_no_lock
for i in range(COUNT):
shared_resource_with_no_lock += 1
def decrement_without_lock():
global shared_resource_with_no_lock
for i in range(COUNT):
shared_resource_with_no_lock -= 1
if __name__ == "__main__":
t1 = threading.Thread(target=increment_with_lock)
t2 = threading.Thread(target=decrement_with_lock)
t3 = threading.Thread(target=increment_without_lock)
t4 = threading.Thread(target=decrement_without_lock)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
print ("the value of shared variable with lock management is %s" % shared_resource_with_lock)
print ("the value of shared variable with race condition is %s" % shared_resource_with_no_lock)
代码执行的结果如下:

可以看出,如果有锁来管理线程的话,我们会得到正确的结果。这里要注意,没有锁的情况下并不一定会得到错误的结果,但是重复执行多次,总会出现错误的结果。而有锁的情况结果总会是正确的。
讨论¶
在主程序中,我们有以下步骤:
t1 = threading.Thread(target=increment_with_lock)
t2 = threading.Thread(target=decrement_with_lock)
启动线程:
t1.start()
t2.start()
然后阻塞主线程直到所有线程完成:
t1.join()
t2.join()
在 increment_with_lock()
函数和 decrement_with_lock()
函数中,可以看到我们使用了lock语句。当你需要使用资源的时候,调用 acquire()
拿到锁(如果锁暂时不可用,会一直等待直到拿到),最后调用 release()
:
shared_resource_lock.acquire()
shared_resource_with_lock -= 1
shared_resource_lock.release()
让我们总结一下:
- 锁有两种状态: locked(被某一线程拿到)和unlocked(可用状态)
- 我们有两个方法来操作锁:
acquire()
和release()
需要遵循以下规则:
- 如果状态是unlocked, 可以调用
acquire()
将状态改为locked - 如果状态是locked,
acquire()
会被block直到另一线程调用release()
释放锁 - 如果状态是unlocked, 调用
release()
将导致RuntimError
异常 - 如果状态是locked, 可以调用
release()
将状态改为unlocked
了解更多¶
尽管理论上行得通,但是锁的策略不仅会导致有害的僵持局面。还会对应用程序的其他方面产生负面影响。这是一种保守的方法,经常会引起不必要的开销,也会限制程序的可扩展性和可读性。更重要的是,有时候需要对多进程共享的内存分配优先级,使用锁可能和这种优先级冲突。最后,从实践的经验来看,使用锁的应用将对debug带来不小的麻烦。所以,最好使用其他可选的方法确保同步读取共享内存,避免竞争条件。
使用RLock进行线程同步¶
如果你想让只有拿到锁的线程才能释放该锁,那么应该使用 RLock()
对象。和 Lock()
对象一样, RLock()
对象有两个方法: acquire()
和 release()
。当你需要在类外面保证线程安全,又要在类内使用同样方法的时候 RLock()
就很实用了。
(译者注:RLock原作解释的太模糊了,译者在此擅自添加一段。RLock其实叫做“Reentrant Lock”,就是可以重复进入的锁,也叫做“递归锁”。这种锁对比Lock有是三个特点:1. 谁拿到谁释放。如果线程A拿到锁,线程B无法释放这个锁,只有A可以释放;2. 同一线程可以多次拿到该锁,即可以acquire多次;3. acquire多少次就必须release多少次,只有最后一次release才能改变RLock的状态为unlocked)
如何做…¶
在示例代码中,我们引入了 Box
类,有 add()
方法和 remove()
方法,提供了进入 execute()
方法的入口。 execute()
的执行由 Rlock()
控制:
import threading
import time
class Box(object):
lock = threading.RLock()
def __init__(self):
self.total_items = 0
def execute(self, n):
Box.lock.acquire()
self.total_items += n
Box.lock.release()
def add(self):
Box.lock.acquire()
self.execute(1)
Box.lock.release()
def remove(self):
Box.lock.acquire()
self.execute(-1)
Box.lock.release()
## These two functions run n in separate
## threads and call the Box's methods
def adder(box, items):
while items > 0:
print("adding 1 item in the box")
box.add()
time.sleep(1)
items -= 1
def remover(box, items):
while items > 0:
print("removing 1 item in the box")
box.remove()
time.sleep(1)
items -= 1
## the main program build some
## threads and make sure it works
if __name__ == "__main__":
items = 5
print("putting %s items in the box " % items)
box = Box()
t1 = threading.Thread(target=adder, args=(box, items))
t2 = threading.Thread(target=remover, args=(box, items))
t1.start()
t2.start()
t1.join()
t2.join()
print("%s items still remain in the box " % box.total_items)
运行结果如下:

讨论¶
主程序的代码几乎和之前的例子一样。两个线程 t1
和 t2
分别分配了 adder()
函数和 remover()
函数。当item的数量大于0的时候,函数工作。调用 RLock()
的位置是在 Box
类内:
class Box(object):
lock = threading.RLock()
adder()
和 remover()
两个函数在 Box
类内操作items,即调用 Box
类的方法: add()
和 remove()
。每一次方法调用,都会有一次拿到资源然后释放资源的过程。至于 lock()
对象, RLock()
对象有 acquire()
和 release()
方法可以拿到或释放资源;然后每一次方法调用中,我们都有以下操作:
Box.lock.acquire()
# ...do something
Box.lock.release()
使用信号量进行线程同步¶
信号量由E.Dijkstra发明并第一次应用在操作系统中,信号量是由操作系统管理的一种抽象数据类型,用于在多线程中同步对共享资源的使用。本质上说,信号量是一个内部数据,用于标明当前的共享资源可以有多少并发读取。
同样的,在threading模块中,信号量的操作有两个函数,即 acquire()
和 release()
,解释如下:
- 每当线程想要读取关联了信号量的共享资源时,必须调用
acquire()
,此操作减少信号量的内部变量, 如果此变量的值非负,那么分配该资源的权限。如果是负值,那么线程被挂起,直到有其他的线程释放资源。 - 当线程不再需要该共享资源,必须通过
release()
释放。这样,信号量的内部变量增加,在信号量等待队列中排在最前面的线程会拿到共享资源的权限。

虽然表面上看信号量机制没什么明显的问题,如果信号量的等待和通知操作都是原子的,确实没什么问题。但如果不是,或者两个操作有一个终止了,就会导致糟糕的情况。
举个例子,假设有两个并发的线程,都在等待一个信号量,目前信号量的内部值为1。假设第线程A将信号量的值从1减到0,这时候控制权切换到了线程B,线程B将信号量的值从0减到-1,并且在这里被挂起等待,这时控制权回到线程A,信号量已经成为了负值,于是第一个线程也在等待。
这样的话,尽管当时的信号量是可以让线程访问资源的,但是因为非原子操作导致了所有的线程都在等待状态。
准备工作¶
下面的代码展示了信号量的使用,我们有两个线程, producer()
和 consumer()
,它们使用共同的资源,即item。 producer()
的任务是生产item, consumer()
的任务是消费item。
当item还没有被生产出来, consumer()
一直等待,当item生产出来, producer()
线程通知消费者资源可以使用了。
如何做…¶
在以下的代码中,我们使用生产者-消费者模型展示通过信号量的同步。当生产者生产出item,便释放信号量。然后消费者拿到资源进行消费。
# -*- coding: utf-8 -*-
"""Using a Semaphore to synchronize threads"""
import threading
import time
import random
# The optional argument gives the initial value for the internal
# counter;
# it defaults to 1.
# If the value given is less than 0, ValueError is raised.
semaphore = threading.Semaphore(0)
def consumer():
print("consumer is waiting.")
# Acquire a semaphore
semaphore.acquire()
# The consumer have access to the shared resource
print("Consumer notify : consumed item number %s " % item)
def producer():
global item
time.sleep(10)
# create a random item
item = random.randint(0, 1000)
print("producer notify : produced item number %s" % item)
# Release a semaphore, incrementing the internal counter by one.
# When it is zero on entry and another thread is waiting for it
# to become larger than zero again, wake up that thread.
semaphore.release()
if __name__ == '__main__':
for i in range (0,5) :
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print("program terminated")
程序会运行5轮,结果如下:

讨论¶
信号量被初始化为0,此信号量唯一的目的是同步两个或多个线程。在这里,我们的线程必须并行运行,所以需要信号量同步:
semaphore = threading.Semaphore(0)
这个操作和lock中的机制非常相似, producer()
完成创建item之后,释放资源:
semaphore.release()
信号量的 release()
可以提高计数器然后通知其他的线程。同样的, consumer()
方法可以通过下面的方法拿到资源:
semaphore.acquire()
如果信号量的计数器到了0,就会阻塞 acquire()
方法,直到得到另一个线程的通知。如果信号量的计数器大于0,就会对这个值-1然后分配资源。
最后,拿到数据并打印输出:
print("Consumer notify : consumed item number %s " % item)
了解更多¶
信号量的一个特殊用法是互斥量。互斥量是初始值为1的信号量,可以实现数据、资源的互斥访问。
信号量在支持多线程的编程语言中依然应用很广,然而这可能导致死锁的情况。例如,现在有一个线程t1先等待信号量s1,然后等待信号量s2,而线程t2会先等待信号量s2,然后再等待信号量s1,这样就可能会发生死锁,导致t1等待s2,但是t2在等待s1。
使用条件进行线程同步¶
条件指的是应用程序状态的改变。这是另一种同步机制,其中某些线程在等待某一条件发生,其他的线程会在该条件发生的时候进行通知。一旦条件发生,线程会拿到共享资源的唯一权限。
准备工作¶
解释条件机制最好的例子还是生产者-消费者问题。在本例中,只要缓存不满,生产者一直向缓存生产;只要缓存不空,消费者一直从缓存取出(之后销毁)。当缓冲队列不为空的时候,生产者将通知消费者;当缓冲队列不满的时候,消费者将通知生产者。
如何做…¶
为了演示条件机制,我们将再一次使用生产者-消费者的例子:
from threading import Thread, Condition
import time
items = []
condition = Condition()
class consumer(Thread):
def __init__(self):
Thread.__init__(self)
def consume(self):
global condition
global items
condition.acquire()
if len(items) == 0:
condition.wait()
print("Consumer notify : no item to consume")
items.pop()
print("Consumer notify : consumed 1 item")
print("Consumer notify : items to consume are " + str(len(items)))
condition.notify()
condition.release()
def run(self):
for i in range(0, 20):
time.sleep(2)
self.consume()
class producer(Thread):
def __init__(self):
Thread.__init__(self)
def produce(self):
global condition
global items
condition.acquire()
if len(items) == 10:
condition.wait()
print("Producer notify : items producted are " + str(len(items)))
print("Producer notify : stop the production!!")
items.append(1)
print("Producer notify : total items producted " + str(len(items)))
condition.notify()
condition.release()
def run(self):
for i in range(0, 20):
time.sleep(1)
self.produce()
if __name__ == "__main__":
producer = producer()
consumer = consumer()
producer.start()
consumer.start()
producer.join()
consumer.join()
运行的结果如下:

讨论¶
(译者在这里添加一段。乍一看这段代码好像会死锁,因为 condition.acquire()
之后就在 .wait()
了,好像会一直持有锁。其实 .wait()
会将锁释放,然后等待其他线程 .notify()
之后会重新尝试获得锁。但是要注意 .notify()
并不会自动释放锁,所以代码中有两行,先 .notify()
然后再 .release()
。
译者画了一张图,方便大家理解。这里的过程应该是这样子的(注意 wait()
里面实际有一个释放锁重新获得锁的过程):

译者的私货完毕,建议看一下官方文档: https://docs.python.org/3/library/threading.html )
消费者通过拿到锁来修改共享的资源 items[]
:
condition.acquire()
如果list的长度为0,那么消费者就进入等待状态:
if len(items) == 0:
condition.wait()
否则就通过 pop
操作消费一个item:
items.pop()
然后,消费者的状态被通知给生产者,同时共享资源释放:
condition.notify()
condition.release()
生产者拿到共享资源,然后确认缓冲队列是否已满(在我们的这个例子中,最大可以存放10个item),如果已经满了,那么生产者进入等待状态,直到被唤醒:
condition.acquire()
if len(items) == 10:
condition.wait()
如果队列没有满,就生产1个item,通知状态并释放资源:
condition.notify()
condition.release()
了解更多¶
Python对条件同步的实现很有趣。如果没有已经存在的锁传给构造器的话,内部的 _Condition
会创建一个 RLock()
对象。同时,这个RLock也会通过 acquire()
和 release()
管理:
class _Condition(_Verbose):
def __init__(self, lock=None, verbose=None):
_Verbose.__init__(self, verbose)
if lock is None:
lock = RLock()
self.__lock = lock
(以下又是笔者的私货,最近看到一道面试题是这样的,开3个线程按照顺序打印ABC 10次。正好是 Condition 的使用场景。我把我写的代码贴在这里供大家参考。
# -*- coding: utf-8 -*-
"""
Three threads print A B C in order.
"""
from threading import Thread, Condition
condition = Condition()
current = "A"
class ThreadA(Thread):
def run(self):
global current
for _ in range(10):
with condition:
while current != "A":
condition.wait()
print("A")
current = "B"
condition.notify_all()
class ThreadB(Thread):
def run(self):
global current
for _ in range(10):
with condition:
while current != "B":
condition.wait()
print("B")
current = "C"
condition.notify_all()
class ThreadC(Thread):
def run(self):
global current
for _ in range(10):
with condition:
while current != "C":
condition.wait()
print("C")
current = "A"
condition.notify_all()
a = ThreadA()
b = ThreadB()
c = ThreadC()
a.start()
b.start()
c.start()
a.join()
b.join()
c.join()
原理很简单,就是线程拿到锁先检查是不是自己渴望的状态。比如打印“B”的线程,渴望的状态 current = 'B'
然后打印出B,将状态改成 C
,这样就成了打印“C”的线程渴望的状态。
但是这里不能唤醒指定的线程,只好唤醒所有的线程,让他们自己再检查一遍状态了。)
使用事件进行线程同步¶
事件是线程之间用于通讯的对象。有的线程等待信号,有的线程发出信号。基本上事件对象都会维护一个内部变量,可以通过 set()
方法设置为 true
,也可以通过 clear()
方法设置为 false
。 wait()
方法将会阻塞线程,直到内部变量为 true
。
如何做…¶
为了理解通过事件对象实现的线程同步,让我们再一次回到生产者-消费者问题上:
# -*- coding: utf-8 -*-
import time
from threading import Thread, Event
import random
items = []
event = Event()
class consumer(Thread):
def __init__(self, items, event):
Thread.__init__(self)
self.items = items
self.event = event
def run(self):
while True:
time.sleep(2)
self.event.wait()
item = self.items.pop()
print('Consumer notify : %d popped from list by %s' % (item, self.name))
class producer(Thread):
def __init__(self, items, event):
Thread.__init__(self)
self.items = items
self.event = event
def run(self):
global item
for i in range(100):
time.sleep(2)
item = random.randint(0, 256)
self.items.append(item)
print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
print('Producer notify : event set by %s' % self.name)
self.event.set()
print('Produce notify : event cleared by %s '% self.name)
self.event.clear()
if __name__ == '__main__':
t1 = producer(items, event)
t2 = consumer(items, event)
t1.start()
t2.start()
t1.join()
t2.join()
下图是我运行程序时候的运行结果。 线程t1在list最后添加值,然后设置event来通知消费者。消费者通过 wait()
阻塞,直到收到信号的时候从list中取出元素消费。

讨论¶
producer
类初始化时定义了item的list和 Event
,与条件对象时候的例子不同,这里的list并不是全局的,而是通过参数传入的:
class consumer(Thread):
def __init__(self, items, event):
Thread.__init__(self)
self.items = items
self.event = event
在run方法中,每当item创建, producer
类将新item添加到list末尾然后发出事件通知。使用事件有两步,第一步:
self.event.set()
第二步:
self.event.clear()
consumer
类初始化时也定义了item的list和 Event()
。当item进来的时候,它将其取出:
def run(self):
while True:
time.sleep(2)
self.event.wait()
item = self.items.pop()
print('Consumer notify : %d popped from list by %s' % (item, self.name))
下图可以帮我们认识 producer
和 consumer
:

使用with语法¶
Python从2.5版本开始引入了 with
语法。此语法非常实用,在有两个相关的操作需要在一部分代码块前后分别执行的时候,可以使用 with
语法自动完成。同事,使用 with
语法可以在特定的地方分配和释放资源,因此, with
语法也叫做“上下文管理器”。在threading模块中,所有带有 acquire()
方法和 release()
方法的对象都可以使用上下文管理器。
也就是说,下面的对象可以使用 with
语法:
- Lock
- RLock
- Condition
- Semaphore
准备工作¶
在本节中,我们将使用 with
语法简单地尝试这四个对象。
如何做…¶
下面的例子展示了 with
语法的基本用法,我们有一系列的同步原语,下面尝试用 with
来使用它们:
import threading
import logging
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
def threading_with(statement):
with statement:
logging.debug('%s acquired via with' % statement)
def threading_not_with(statement):
statement.acquire()
try:
logging.debug('%s acquired directly' % statement )
finally:
statement.release()
if __name__ == '__main__':
# let's create a test battery
lock = threading.Lock()
rlock = threading.RLock()
condition = threading.Condition()
mutex = threading.Semaphore(1)
threading_synchronization_list = [lock, rlock, condition, mutex]
# in the for cycle we call the threading_with e threading_no_with function
for statement in threading_synchronization_list :
t1 = threading.Thread(target=threading_with, args=(statement,))
t2 = threading.Thread(target=threading_not_with, args=(statement,))
t1.start()
t2.start()
t1.join()
t2.join()
下图展示了使用 with
的每一个函数以及用在了什么地方:

讨论¶
在主程序中,我们定义了一个list, threading_synchronization_list
,包含要测试的线程同步使用的对象:
lock = threading.Lock()
rlock = threading.RLock()
condition = threading.Condition()
mutex = threading.Semaphore(1)
threading_synchronization_list = [lock, rlock, condition, mutex]
定义之后,我们可以在 for
循环中测试每一个对象:
for statement in threading_synchronization_list :
t1 = threading.Thread(target=threading_with, args=(statement,))
t2 = threading.Thread(target=threading_not_with, args=(statement,))
最后,我们有两个目标函数,其中 threading_with
测试了 with
语法:
def threading_with(statement):
with statement:
logging.debug('%s acquired via with' % statement)
了解更多¶
在本例中,我们使用了Python的logging模块进行输出:
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)
使用 % (threadName)
可以在每次输出的信息都加上线程的名字。logging模块是线程安全的。这样我们可以区分出不同线程的输出。
译者注:译者在博客上写过一篇有关Python的with语句的文章,可以参考一下:https://www.kawabangga.com/posts/2010
使用 queue
进行线程通信¶
前面我们已经讨论到,当线程之间如果要共享资源或数据的时候,可能变的非常复杂。如你所见,Python的threading模块提供了很多同步原语,包括信号量,条件变量,事件和锁。如果可以使用这些原语的话,应该优先考虑使用这些,而不是使用queue(队列)模块。队列操作起来更容易,也使多线程编程更安全,因为队列可以将资源的使用通过单线程进行完全控制,并且允许使用更加整洁和可读性更高的设计模式。
Queue常用的方法有以下四个:
put()
: 往queue中放一个itemget()
: 从queue删除一个item,并返回删除的这个itemtask_done()
: 每次item被处理的时候需要调用这个方法join()
: 所有item都被处理之前一直阻塞
如何做…¶
在本例中,我们将学习如何在threading模块中使用queue。同样,本例中将会有两个实体试图共享临界资源,一个队列。代码如下:
from threading import Thread, Event
from queue import Queue
import time
import random
class producer(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self) :
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
print('Producer notify: item N° %d appended to queue by %s' % (item, self.name))
time.sleep(1)
class consumer(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
item = self.queue.get()
print('Consumer notify : %d popped from queue by %s' % (item, self.name))
self.queue.task_done()
if __name__ == '__main__':
queue = Queue()
t1 = producer(queue)
t2 = consumer(queue)
t3 = consumer(queue)
t4 = consumer(queue)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
代码的运行结果如下:

讨论¶
首先,我们创建一个生产者类。由于我们使用队列存放数字,所以不需要用来存放数字的list了。
class producer(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
producer
类生产整数,然后通过一个 for
循环将整数放到队列中:
def run(self) :
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
print('Producer notify: item N° %d appended to queue by %s' % (item, self.name))
time.sleep(1)
生产者使用 Queue.put(item [,block[, timeout]])
来往queue中插入数据。Queue是同步的,在插入数据之前内部有一个内置的锁机制。
可能发生两种情况:
- 如果
block
为True
,timeout
为None
(这也是默认的选项,本例中使用默认选项),那么可能会阻塞掉,直到出现可用的位置。如果timeout
是正整数,那么阻塞直到这个时间,就会抛出一个异常。 - 如果
block
为False
,如果队列有闲置那么会立即插入,否则就立即抛出异常(timeout
将会被忽略)。本例中,put()
检查队列是否已满,然后调用wait()
开始等待。
消费者从队列中取出整数然后用 task_done()
方法将其标为任务已处理。
消费者使用 Queue.get([block[, timeout]])
从队列中取回数据,queue内部也会经过锁的处理。如果队列为空,消费者阻塞。
最后,在主程序中,我们创建线程t作为生产者,t1, t2, t3作为消费者:
if __name__ == '__main__':
queue = Queue()
t1 = producer(queue)
t2 = consumer(queue)
t3 = consumer(queue)
t4 = consumer(queue)
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
生产者和消费者之间的操作可以用下图来描述:

评估多线程应用的性能¶
在本节中,我们将验证GIL的影响,评估多线程应用的性能。前文已经介绍过,GIL是CPython解释器引入的锁,GIL在解释器层面阻止了真正的并行运行。解释器在执行任何线程之前,必须等待当前正在运行的线程释放GIL。事实上,解释器会强迫想要运行的线程必须拿到GIL才能访问解释器的任何资源,例如栈或Python对象等。这也正是GIL的目的——阻止不同的线程并发访问Python对象。这样GIL可以保护解释器的内存,让垃圾回收工作正常。但事实上,这却造成了程序员无法通过并行执行多线程来提高程序的性能。如果我们去掉CPython的GIL,就可以让多线程真正并行执行。GIL并没有影响多处理器并行的线程,只是限制了一个解释器只能有一个线程在运行。
如何做…¶
下面的代码是用来评估多线程应用性能的简单工具。下面的每一个测试都循环调用函数100次,重复执行多次,取速度最快的一次。在 for
循环中,我们调用 non_threaded
和 threaded
函数。同时,我们会不断增加调用次数和线程数来重复执行这个测试。我们会尝试使用1,2,3,4和8线程数来调用线程。在非线程的测试中,我们顺序调用函数与对应线程数一样多的次数。为了保持简单,度量的指标使用Python的内建模块timer。
代码如下:
from threading import Thread
class threads_object(Thread):
def run(self):
function_to_run()
class nothreads_object(object):
def run(self):
function_to_run()
def non_threaded(num_iter):
funcs = []
for i in range(int(num_iter)):
funcs.append(nothreads_object())
for i in funcs:
i.run()
def threaded(num_threads):
funcs = []
for i in range(int(num_threads)):
funcs.append(threads_object())
for i in funcs:
i.start()
for i in funcs:
i.join()
def function_to_run():
pass
def show_results(func_name, results):
print("%-23s %4.6f seconds" % (func_name, results))
if __name__ == "__main__":
import sys
from timeit import Timer
repeat = 100
number = 1
num_threads = [1, 2, 4, 8]
print('Starting tests')
for i in num_threads:
t = Timer("non_threaded(%s)" % i, "from __main__ import non_threaded")
best_result = min(t.repeat(repeat=repeat, number=number))
show_results("non_threaded (%s iters)" % i, best_result)
t = Timer("threaded(%s)" % i, "from __main__ import threaded")
best_result = min(t.repeat(repeat=repeat, number=number))
show_results("threaded (%s threads)" % i, best_result)
print('Iterations complete')
讨论¶
我们一共进行了四次测试(译者注:原文是three,我怀疑原作者不识数,原文的3个线程数也没有写在代码里),每一次都会使用不同的function进行测试,只要改变 function_to_run()
就可以了。
测试用的机器是 Core 2 Duo CPU – 2.33Ghz。
第一次测试¶
在第一次测试中,我们使用了一个简单的空函数:
def function_to_run():
pass
下图展示了我们测试的每个机制的运行速度:

通过结果可以发现,使用线程的开销要比不使用线程的开销大的多。特别的,我们发现随着线程的数量增加,带来的开销是成比例的。4个线程的运行时间是0.0007143秒,8个线程的运行时间是0.001397秒。
第二次测试¶
多线程比较常用的一个用途是处理数字,下面的测试计算斐波那契数列,注意这个例子中没有共享的资源,只是测试生成数字数列:
def function_to_run():
a, b = 0, 1
for i in range(10000):
a, b = b, a + b
输出如下:

在输出中可以看到,提高线程的数量并没有带来收益。因为GIL和线程管理代码的开销,多线程运行永远不可能比函数顺序执行更快。再次提醒一下:GIL只允许解释器一次执行一个线程。
第三次测试¶
下面的测试是读1kb的数据1000次,测试用的函数如下:
def function_to_run():
fh=open("C:\\CookBookFileExamples\\test.dat","rb")
size = 1024
for i in range(1000):
fh.read(size)
测试的结果如下:

我们终于看到多线程比非多线程跑的好的情况了,而且多线程只用了一半的时间。这给我们的启示是,多线程并不是一个标准。一般,我们将会将多线程放入一个队列中,将它们放到一边,执行其他任务。使用多线程执行同一个相同的任务有时候很有用,但用到的时候很少,除非需要大量处理数据输入。
第四次测试¶
在最后的测试中,我们使用 urllib.request
测试,这是一个Python模块,可以发送URL请求。此模块基于 socket
,使用C语言编写并且是线程安全的。
下面的代码尝试读取 https://www.packpub.com
的主页并且读取前1k的数据:
def function_to_run():
import urllib.request
for i in range(10):
with urllib.request.urlopen("https://www.packtpub.com/")as f:
f.read(1024)
运行结果如下:

可以看到,在 I/O 期间,GIL释放了。多线程执行比单线程快的多。鉴于大多数应用需要很多I/O操作,GIL并没有限制程序员在这方面使用多线程对程序进行性能优化。
了解更多¶
你应该记住,增加线程并不会提高应用启动的时间,但是可以支持并发。例如,一次性创建一个线程池,并重用worker会很有用。这可以让我们切分一个大的数据集,用同样的函数处理不同的部分(生产者消费者模型)。上面这些测试并不是并发应用的模型,只是尽量简单的测试。那么GIL会成为试图发挥多线程应用潜能的纯Python开发的瓶颈吗?是的。线程是编程语言的架构,CPython解释器是线程和操作系统的桥梁。这就是为什么Jython,IronPython没有GIL的原因(译者注:Pypy也没有),因为它不是必要的。
第三章 基于进程的并行¶
译者注:本章有关进程,用到了 mpi4py
这个库,我在 Mac OS 上安装遇到些问题,最后没有安装成功,本章的翻译可能有潜在的问题,代码我没有自己运行过,读者请留意。
介绍¶
在之前的章节中,我们见识了如何用线程实现并发的应用。本章节将会介绍基于进程的并行。本章的重点将会集中在Python的 multiprocessing
和 mpi4py
这两个模块上。
multiprocessing
是Python标准库中的模块,实现了共享内存机制,也就是说,可以让运行在不同处理器核心的进程能读取共享内存。
mpi4py
库实现了消息传递的编程范例(设计模式)。简单来说,就是进程之间不靠任何共享信息来进行通讯(也叫做shared nothing),所有的交流都通过传递信息代替。
这方面与使用共享内存通讯,通过加锁或类似机制实现互斥的技术行成对比。在信息传递的代码中,进程通过 send()
和 receive
进行交流。
在Python多进程的官方文档中,明确指出 multiprocessing
模块要求,使用此模块的函数的main模块对子类来说必须是可导入的( https://docs.python.org/3.3/library/multiprocessing.html )。
__main__
在IDLE中并不是可以导入的,即使你在IDLE中将文件当做一个脚本来运行。为了能正确使用此模块,本章我们将在命令行使用下面的命令运行脚本:
python multiprocessing example.py
这里, multiprocessing_example.py
是脚本的文件名。本章使用的解释器是Python3.3(实际上使用Python2.7也是可以的)。
(译者注,抱歉,这段译者无法理解原文和Python文档的意思。不过通过实验,我发现要多进程运行一个函数,这个函数必须从外部文件导入。比如说下面这样就不行:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
上面脚本来自Python官网文档。如果稍作修改,将需要多进程运行的目标函数放到文件里导入运行,就没有问题了:
In [1]: from multiprocessing import Pool
In [2]: p = Pool(5)
In [4]: import func
In [5]: p.map(func.f, [1,2,3])
Out[5]: [1, 4, 9]
译者注结束)
如何产生一个进程¶
“产生”(spawn)的意思是,由父进程创建子进程。父进程既可以在产生子进程之后继续异步执行,也可以暂停等待子进程创建完成之后再继续执行。Python的multiprocessing库通过以下几步创建进程:
- 创建进程对象
- 调用
start()
方法,开启进程的活动 - 调用
join()
方法,在进程结束之前一直等待
如何做…¶
下面的例子创建了5个进程,每一个进程都分配了 foo(i)
函数, i
表示进程的id:
# -*- coding: utf-8 -*-
import multiprocessing
def foo(i):
print ('called function in process: %s' %i)
return
if __name__ == '__main__':
Process_jobs = []
for i in range(5):
p = multiprocessing.Process(target=foo, args=(i,))
Process_jobs.append(p)
p.start()
p.join()
执行本例需要打开命令行,到文件 spawn_a_process.py
(脚本名字)所在的目录下,然后输入下面的命令执行:
python spawn_a_process.py
我们会得到以下结果:
$ python process_2.py
called function in process: 0
called function in process: 1
called function in process: 2
called function in process: 3
called function in process: 4
讨论¶
按照本节前面提到的步骤,创建进程对象首先需要引入multiprocessing模块:
import multiprocessing
然后,我们在主程序中创建进程对象:
p = multiprocessing.Process(target=foo, args=(i,))
最后,我们调用 start()
方法启动:
p.start()
进程对象的时候需要分配一个函数,作为进程的执行任务,本例中,这个函数是 foo()
。我们可以用元组的形式给函数传递一些参数。最后,使用进程对象调用 join()
方法。
如果没有 join()
,主进程退出之后子进程会留在idle中,你必须手动杀死它们。
了解更多¶
这是因为,子进程创建的时候需要导入包含目标函数的脚本。通过在 __main__
代码块中实例化进程对象,我们可以预防无限递归调用。最佳实践是在不同的脚本文件中定义目标函数,然后导入进来使用。所以上面的代码可以修改为:
import multiprocessing
import target_function
if __name__ == '__main__':
Process_jobs = []
for i in range(5):
p = multiprocessing.Process(target=target_function.function,args=(i,))
Process_jobs.append(p)
p.start()
p.join()
target_function.py
的内容如下:
def function(i):
print('called function in process: %s' %i)
return
输出和上面一样。
如何为一个进程命名¶
在上一节的例子中,我们创建了一个进程,并为其分配了目标函数和函数变量。然而如果能给进程分配一个名字,那么debug的时候就更方便了。
如何做…¶
命名进程的方法和前一章中介绍的命名线程差不多(参考第二章,基于线程的并行,第四节,如何确定当前的线程)。
下面的代码在主程序中创建了一个有名字的进程和一个没有名字的进程,目标函数都是 foo()
函数。
# 命名一个进程
import multiprocessing
import time
def foo():
name = multiprocessing.current_process().name
print("Starting %s \n" % name)
time.sleep(3)
print("Exiting %s \n" % name)
if __name__ == '__main__':
process_with_name = multiprocessing.Process(name='foo_process', target=foo)
process_with_name.daemon = True # 注意原代码有这一行,但是译者发现删掉这一行才能得到正确输出
process_with_default_name = multiprocessing.Process(target=foo)
process_with_name.start()
process_with_default_name.start()
运行上面的代码,打开终端输入:
python naming_process.py
输出的结果如下:
$ python naming_process.py
Starting foo_process
Starting Process-2
Exiting foo_process
Exiting Process-2
讨论¶
这个过程和命名线程很像。命名进程需要为进程对象提供 name
参数:
process_with_name = multiprocessing.Process(name='foo_process', target=foo)
在本例子中,进程的名字就是 foo_function
。如果子进程需要知道父进程的名字,可以使用以下声明:
name = multiprocessing.current_process().name
然后就能看见父进程的名字。
如何在后台运行一个进程¶
如果需要处理比较巨大的任务,又不需要人为干预,将其作为后台进程执行是个非常常用的编程模型。此进程又可以和其他进程并发执行。通过Python的multiprocessing模块的后台进程选项,我们可以让进程在后台运行。
如何做…¶
运行后台进程,可以参照以下代码:
import multiprocessing
import time
def foo():
name = multiprocessing.current_process().name
print("Starting %s " % name)
time.sleep(3)
print("Exiting %s " % name)
if __name__ == '__main__':
background_process = multiprocessing.Process(name='background_process', target=foo)
background_process.daemon = True
NO_background_process = multiprocessing.Process(name='NO_background_process', target=foo)
NO_background_process.daemon = False
background_process.start()
NO_background_process.start()
运行上面的脚本,需要使用下面的命令:
python background_process.py
最后的输出如下:
$ python background_process.py
Starting NO_background_process
Exiting NO_background_process
讨论¶
为了在后台运行进程,我们设置 daemon
参数为 True
background_process.daemon = True
在非后台运行的进程会看到一个输出,后台运行的没有输出,后台运行进程在主进程结束之后会自动结束。
了解更多¶
注意,后台进程不允许创建子进程。否则,当后台进程跟随父进程退出的时候,子进程会变成孤儿进程。另外,它们并不是Unix的守护进程或服务(daemons or services),所以当非后台进程退出,它们会被终结。
如何杀掉一个进程¶
我们可以使用 terminate()
方法立即杀死一个进程。另外,我们可以使用 is_alive()
方法来判断一个进程是否还存活。
如何做…¶
在本例中,创建一个目标函数为 foo()
的进程。启动之后,我们通过 terminate()
方法杀死它。
# 杀死一个进程
import multiprocessing
import time
def foo():
print('Starting function')
time.sleep(0.1)
print('Finished function')
if __name__ == '__main__':
p = multiprocessing.Process(target=foo)
print('Process before execution:', p, p.is_alive())
p.start()
print('Process running:', p, p.is_alive())
p.terminate()
print('Process terminated:', p, p.is_alive())
p.join()
print('Process joined:', p, p.is_alive())
print('Process exit code:', p.exitcode)
输出如下:

讨论¶
我们创建了一个线程,然后用 is_alive()
方法监控它的声明周期。然后通过调用 terminate()
方法结束进程。
最后,我们通过读进程的 ExitCode
状态码(status code)验证进程已经结束, ExitCode
可能的值如下:
- == 0: 没有错误正常退出
- > 0: 进程有错误,并以此状态码退出
- < 0: 进程被
-1 *
的信号杀死并以此作为 ExitCode 退出
在我们的例子中,输出的 ExitCode
是 -15
。负数表示子进程被数字为15的信号杀死。
如何在子类中使用进程¶
实现一个自定义的进程子类,需要以下三步:
- 定义
Process
的子类 - 覆盖
__init__(self [,args])
方法来添加额外的参数 - 覆盖
run(self, [.args])
方法来实现Process
启动的时候执行的任务
创建 Porcess
子类之后,你可以创建它的实例并通过 start()
方法启动它,启动之后会运行 run()
方法。
如何做…¶
我们将使用子类的形式重写之前的例子:
# -*- coding: utf-8 -*-
# 自定义子类进程
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print ('called run method in process: %s' % self.name)
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = MyProcess()
jobs.append(p)
p.start()
p.join()
输入以下命令运行脚本:
python subclass_process.py
运行结果如下:
$ python subclass.py
called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5
讨论¶
每一个继承了 Process
并重写了 run()
方法的子类都代表一个进程。此方法是进程的入口:
class MyProcess(multiprocessing.Process):
def run(self):
print ('called run method in process: %s' % self.name)
return
在主程序中,我们创建了一些 MyProcess()
的子类。当 start()
方法被调用的时候进程开始执行:
p = MyProcess()
p.start()
join()
命令可以让主进程等待其他进程结束最后退出。
如何在进程之间交换对象¶
并行应用常常需要在进程之间交换数据。Multiprocessing库有两个Communication Channel可以交换对象:队列(queue)和管道(pipe)。

使用队列交换对象¶
我们可以通过队列数据结构来共享对象。
Queue
返回一个进程共享的队列,是线程安全的,也是进程安全的。任何可序列化的对象(Python通过 pickable
模块序列化对象)都可以通过它进行交换。
如何做…¶
在下面的例子中,我们将展示如何使用队列来实现生产者-消费者问题。 Producer
类生产item放到队列中,然后 Consumer
类从队列中移除它们。
import multiprocessing
import random
import time
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
print("Process Producer : item %d appended to queue %s" % (item, self.name))
time.sleep(1)
print("The size of queue is %s" % self.queue.qsize())
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
if self.queue.empty():
print("the queue is empty")
break
else:
time.sleep(2)
item = self.queue.get()
print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
time.sleep(1)
if __name__ == '__main__':
queue = multiprocessing.Queue()
process_producer = Producer(queue)
process_consumer = Consumer(queue)
process_producer.start()
process_consumer.start()
process_producer.join()
process_consumer.join()
运行结果如下(译者注:macOS High Sierra运行失败,错误是 NotImplementedError
可能是因为 self._sem._semlock._get_value()
没有实现):
C:\Python CookBook\Chapter 3 - Process Based Parallelism\Example Codes
Chapter 3>python using_queue.py
Process Producer : item 69 appended to queue producer-1
The size of queue is 1
Process Producer : item 168 appended to queue producer-1
The size of queue is 2
Process Consumer : item 69 popped from by consumer-2
Process Producer : item 235 appended to queue producer-1
The size of queue is 2
Process Producer : item 152 appended to queue producer-1
The size of queue is 3
Process Producer : item 213 appended to queue producer-1
Process Consumer : item 168 popped from by consumer-2
The size of queue is 3
Process Producer : item 35 appended to queue producer-1
The size of queue is 4
Process Producer : item 218 appended to queue producer-1
The size of queue is 5
Process Producer : item 175 appended to queue producer-1
Process Consumer : item 235 popped from by consumer-2
The size of queue is 5
Process Producer : item 140 appended to queue producer-1
The size of queue is 6
Process Producer : item 241 appended to queue producer-1
The size of queue is 7
Process Consumer : item 152 popped from by consumer-2
Process Consumer : item 213 popped from by consumer-2
Process Consumer : item 35 popped from by consumer-2
Process Consumer : item 218 popped from by consumer-2
Process Consumer : item 175 popped from by consumer-2
Process Consumer : item 140 popped from by consumer-2
Process Consumer : item 241 popped from by consumer-2
the queue is empty
如何做…¶
我们使用 multiprocessing
类在主程序中创建了 Queue
的实例:
if __name__ == '__main__':
queue = multiprocessing.Queue()
然后我们创建了两个进程,生产者和消费者, Queue
对象作为一个属性。
process_producer = Producer(queue)
process_consumer = Consumer(queue)
生产者类负责使用 put()
方法放入10个item:
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
消费者进程负责使用 get()
方法从队列中移除item,并且确认队列是否为空,如果为空,就执行 break
跳出 while
循环:
def run(self):
while True:
if self.queue.empty():
print("the queue is empty")
break
else:
time.sleep(2)
item = self.queue.get()
print('Process Consumer : item %d popped from by %s \n' % (item, self.name))
time.sleep(1)
了解更多¶
队列还有一个 JoinableQueue
子类,它有以下两个额外的方法:
task_done()
: 此方法意味着之前入队的一个任务已经完成,比如,get()
方法从队列取回item之后调用。所以此方法只能被队列的消费者调用。join()
: 此方法将进程阻塞,直到队列中的item全部被取出并执行。
( Microndgt 注:因为使用队列进行通信是一个单向的,不确定的过程,所以你不知道什么时候队列的元素被取出来了,所以使用task_done来表示队列里的一个任务已经完成。
这个方法一般和join一起使用,当队列的所有任务都处理之后,也就是说put到队列的每个任务都调用了task_done方法后,join才会完成阻塞。)
如何做…¶
下面是管道用法的一个简单示例。这里有一个进程管道从0到9发出数字,另一个进程接收数字并进行平方计算。
import multiprocessing
def create_items(pipe):
output_pipe, _ = pipe
for item in range(10):
output_pipe.send(item)
output_pipe.close()
def multiply_items(pipe_1, pipe_2):
close, input_pipe = pipe_1
close.close()
output_pipe, _ = pipe_2
try:
while True:
item = input_pipe.recv()
output_pipe.send(item * item)
except EOFError:
output_pipe.close()
if __name__== '__main__':
# 第一个进程管道发出数字
pipe_1 = multiprocessing.Pipe(True)
process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
process_pipe_1.start()
# 第二个进程管道接收数字并计算
pipe_2 = multiprocessing.Pipe(True)
process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
process_pipe_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print("End")
程序的输出如下:

讨论¶
Pipe()
函数返回一对通过双向管道连接起来的对象。在本例中, out_pipe
包含数字0-9,通过目标函数 create_items()
产生:
def create_items(pipe):
output_pipe, _ = pipe
for item in range(10):
output_pipe.send(item)
output_pipe.close()
在第二个进程中,我们有两个管道,输入管道和包含结果的输出管道:
process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
最后打印出结果:
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print("End")
进程如何同步¶
多个进程可以协同工作来完成一项任务。通常需要共享数据。所以在多进程之间保持数据的一致性就很重要了。需要共享数据协同的进程必须以适当的策略来读写数据。相关的同步原语和线程的库很类似。
进程的同步原语如下:
- Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个Lock对象有两个方法,
acquire()
和release()
,来控制共享数据的读写权限。 - Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。
Event
对象有两个方法,set()
和clear()
,来管理自己内部的变量。 - Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法:
wait()
用来等待进程,notify_all()
用来通知所有等待此条件的进程。 - Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
- Rlock: 递归锁对象。其用途和方法同
Threading
模块一样。 - Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。
如何做…¶
下面的代码展示了如何使用 barrier()
函数来同步两个进程。我们有4个进程,进程1和进程2由barrier语句管理,进程3和进程4没有同步策略。
import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime
def test_with_barrier(synchronizer, serializer):
name = multiprocessing.current_process().name
synchronizer.wait()
now = time()
with serializer:
print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
def test_without_barrier():
name = multiprocessing.current_process().name
now = time()
print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
if __name__ == '__main__':
synchronizer = Barrier(2)
serializer = Lock()
Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
运行下面的代码,将看到进程1和进程2在同一时间打印:
$ python process_barrier.py
process p1 - test_with_barrier ----> 2015-05-09 11:11:33.291229
process p2 - test_with_barrier ----> 2015-05-09 11:11:33.291229
process p3 - test_without_barrier ----> 2015-05-09 11:11:33.310230
process p4 - test_without_barrier ----> 2015-05-09 11:11:33.333231
(译者注:译者在实际运行了10次,没有一次时间是相同的,感觉这个地方同一时间打印出来的影响因素很多。只能看到 with_barrier
的进程1和2比 without_barrier
的进程3和4时间差的小很多。)
讨论¶
在主程序中,我们创建了四个进程,然后我们需要一个锁和一个barrier来进程同步。barrier声明的第二个参数代表要管理的进程总数:
if __name__ == '__main__':
synchronizer = Barrier(2)
serializer = Lock()
Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer,serializer)).start()
Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
Process(name='p4 - test_without_barrier', target=test_without_barrier).start()
test_with_barrier
函数调用了barrier的 wait()
方法:
def test_with_barrier(synchronizer, serializer):
name = multiprocessing.current_process().name
synchronizer.wait()
当两个进程都调用 wait()
方法的时候,它们会一起继续执行:
now = time()
with serializer:
print("process %s ----> %s" % (name, datetime.fromtimestamp(now)))
下面这幅图表示了barrier如何同步两个进程:

如何在进程之间管理状态¶
Python的多进程模块提供了在所有的用户间管理共享信息的管理者(Manager)。一个管理者对象控制着持有Python对象的服务进程,并允许其它进程操作共享对象。
管理者有以下特性:
- 它控制着管理共享对象的服务进程
- 它确保当某一进程修改了共享对象之后,所有的进程拿到额共享对象都得到了更新
如何做…¶
下面来看一个再进程之间共享对象的例子:
首先,程序创建了一个管理者的字典,在
n
个taskWorkers
之间共享,每个worker更新字典的某一个index。所有的worker完成之后,新的列表打印到
stdout
:import multiprocessing def worker(dictionary, key, item): dictionary[key] = item print("key = %d value = %d" % (key, item)) if __name__ == '__main__': mgr = multiprocessing.Manager() dictionary = mgr.dict() jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)] for j in jobs: j.start() for j in jobs: j.join() print('Results:', dictionary)
(译者注: 源代码中少了一行print,译者加上了,能得到和书中输出一样的结果)
运行结果如下:
$ python manager.py
key = 0 value = 0
key = 3 value = 6
key = 2 value = 4
key = 1 value = 2
key = 4 value = 8
key = 5 value = 10
key = 8 value = 16
key = 6 value = 12
key = 7 value = 14
key = 9 value = 18
Results: {0: 0, 3: 6, 2: 4, 1: 2, 4: 8, 5: 10, 8: 16, 6: 12, 7: 14, 9: 18}
讨论¶
我们在先声明了一个manager:
mgr = multiprocessing.Manager()
下面一行创建了 dictionary
类型的一个数据结构:
dictionary = mgr.dict()
然后,启动多进程:
jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)]
for j in jobs:
j.start()
这里,目标函数 taskWorker
往字典中添加一个item:
def worker(dictionary, key, item):
dictionary[key] = item
最后,我们得到字典所有的值并打印出来:
for j in jobs:
j.join()
print('Results:', dictionary)
如何使用进程池¶
多进程库提供了 Pool
类来实现简单的多进程任务。 Pool
类有以下方法:
apply()
: 直到得到结果之前一直阻塞。apply_async()
: 这是apply()
方法的一个变体,返回的是一个result对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。map()
: 这是内置的map()
函数的并行版本。在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。map_async()
: 这是map()
方法的一个变体,返回一个result对象。如果指定了回调函数,回调函数应该是callable的,并且只接受一个参数。当result准备好时会自动调用回调函数(除非调用失败)。回调函数应该立即完成,否则,持有result的进程将被阻塞。
如何做…¶
下面的例子展示了如果通过进程池来执行一个并行应用。我们创建了有4个进程的进程池,然后使用 map()
方法进行一个简单的计算。
import multiprocessing
def function_square(data):
result = data*data
return result
if __name__ == '__main__':
inputs = list(range(100))
pool = multiprocessing.Pool(processes=4)
pool_outputs = pool.map(function_square, inputs)
pool.close()
pool.join()
print ('Pool :', pool_outputs)
计算的结果如下:
$ python poll.py
('Pool :', [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801])
如何做…¶
multiprocessing.Pool
方法在输入元素上应用 function_square
方法来执行简单的计算。并行的进程数量是4:
pool = multiprocessing.Pool(processes=4)
pool.map
方法将一些独立的任务提交给进程池:
pool_outputs = pool.map(function_square, inputs)
input
是一个从 0
到 100
的list:
inputs = list(range(100))
计算的结果存储在 pool_outputs
中。最后的结果打印出来:
print ('Pool :', pool_outputs)
需要注意的是, pool.map()
方法的结果和Python内置的 map()
结果是相同的,不同的是 pool.map()
是通过多个并行进程计算的。
使用Python的mpi4py模块¶
Python 提供了很多MPI模块写并行程序。其中 mpi4py
是一个又有意思的库。它在MPI-1/2顶层构建,提供了面向对象的接口,紧跟C++绑定的 MPI-2。MPI的C语言用户可以无需学习新的接口就可以上手这个库。所以,它成为了Python中最广泛使用的MPI库。
此模块包含的主要应用有:
- 点对点通讯
- 集体通讯
- 拓扑
准备工作¶
在Windows中安装 mpi4py
的过程如下(其他操作系统可以参考 http://mpi4py.scipy.org/docs/usrman/install.html ):
- 从MPI软件库( http://www.mpich.org/downloads/ )下载
mpich
。

- 右键图标,选择“以管理员身份运行”。
- 以管理员身份运行
msiexec /i mpich_installation_file.msi
安装MPICH2。 - 安装的时候,选择 “为所有用户安装” 。
- 运行
wmpiconfig
存储用户名密码,使用你的windows登录的用户名和密码。 - 添加
C:\Program Files\MPICH2\bin
到系统路径,无需重启。 - 使用
smpd- status
检查smpd
,应该返回smpd running on $hostname$
。 - 到
$MPICHROOT\examples
文件夹使用mpiexec -n 4 cpi
运行cpi.exe
检查运行环境。 - 从 https://pip.pypa.io/en/stable/installing.html 下载Python包管理工具
pip
。它会在你的Python环境生成一个pip.exe
。

在命令行安装
mpi4py
:C:> pip install mpi4py
如何做…¶
让我们通过打印“Hello world”来开始MPI之旅:
# hello.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
print("hello world from process ", rank)
通过下面的命令执行代码:
C:> mpiexec -n 5 python helloWorld_MPI.py
执行结果将得到如下的输出:
('hello world from process ', 1)
('hello world from process ', 0)
('hello world from process ', 2)
('hello world from process ', 3)
('hello world from process ', 4)
讨论¶
在MPI中,并行程序中不同进程用一个非负的整数来区别,叫做rank。如果我们有p个进程,那么rank会从 0
到 p-1
分配。MPI中拿到rank的函数如下:
rank = comm.Get_rank()
这个函数返回调用它的进程的rank。 comm
叫做交流者(Communicator),用于区别不同的进程集合:
comm = MPI.COMM_WORLD

了解更多¶
需要注意是,插图中的输出顺序并不是确定的,你不一定能得到插图的输出结果。多进程在同一时间启动,操作系统会决定运行的顺序。但是从中我们可以看出,MPI在每个进程中运行相同的二进制代码,每一个进程都执行相同的指令。
点对点通讯¶
MPI提供的最实用的一个特性是点对点通讯。两个不同的进程之间可以通过点对点通讯交换数据:一个进程是接收者,一个进程是发送者。
Python的 mpi4py
通过下面两个函数提供了点对点通讯功能:
Comm.Send(data, process_destination)
: 通过它在交流组中的排名来区分发送给不同进程的数据Comm.Recv(process_source)
: 接收来自源进程的数据,也是通过在交流组中的排名来区分的
Comm
变量表示交流者,定义了可以互相通讯的进程组:
comm = MKPI.COMM_WORLD
如何做…¶
下面的例子展示了如何使用 comm.send
和 comm.recv
指令在不同的进程之间交换信息。
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is : " , rank)
if rank == 0:
data = 10000000
destination_process = 4
comm.send(data,dest=destination_process)
print("sending data % s " % data + "to process % d" % destination_process)
if rank == 1:
destination_process = 8
data = "hello"
comm.send(data,dest=destination_process)
print("sending data % s :" % data + "to process % d" % destination_process)
if rank == 4:
data = comm.recv(source = 0)
print("data received is = % s" % data)
if rank == 8:
data1 = comm.recv(source = 1)
print("data1 received is = % s" % data1)
运行脚本的命令如下:
$ mpiexec -n 9 python pointToPointCommunication.py
得到的输出如下:
('my rank is : ', 5)
('my rank is : ', 1)
sending data hello :to process 8
('my rank is : ', 3)
('my rank is : ', 0)
sending data 10000000 to process 4
('my rank is : ', 2)
('my rank is : ', 7)
('my rank is : ', 4)
data received is = 10000000
('my rank is : ', 8)
data1 received is = hello
('my rank is : ', 6)
讨论¶
我们将最大进程数设置为9来运行程序。所以在交流者组 comm
中,我们可以有9个互相通讯的进程。:
comm = MPI.COMM_WORLD
同时,我们使用 rand
值来区分每个进程:
rank = comm.rand
我们有两个发送者进程和两个接受者进程。rank值为0的进程会发送数据给rank值为4的接受者:
if rank==0:
data= 10000000
destination_process = 4
comm.send(data,dest=destination_process)
同样的,我们可以必须指定rank值为4的进程为接收者。然后我们指定rank变量来调用 comm.recv
命令。
...
if rank == 4:
data = comm.recv(source=0)
对于另外一组发送者和接收者,我们指定rank为1的作为发送者,rank为8的作为接收者。与上一组只有一点不同,这一组发送的数据类型是String。
if rank==1:
destination_process = 8
data= "hello"
comm.send(data,dest=destination_process)
对于接收者来说,需要指定发送者的rank。
if rank==8:
data1=comm.recv(source=1)
下图展示了 mpi4py
的点对点通讯协议:

整个过程分为两部分,发送者发送数据,接收者接收数据,二者必须都指定发送方/接收方。
了解更多¶
comm.send()
和 comm.recv()
函数都是阻塞的函数。他们会一直阻塞调用者,知道数据使用完成。同时在MPI中,有两种方式发送和接收数据:
- buffer模式
- 同步模式
在buffer模式中,只要需要发送的数据被拷贝到buffer中,执行权就会交回到主程序,此时数据并非已经发送/接收完成。在同步模式中,只有函数真正的结束发送/接收任务之后才会返回。
避免死锁问题¶
我们经常需要面临的一个问题是死锁,这是两个或多个进程都在阻塞并等待对方释放自己想要的资源的情况。 mpi4py
没有提供特定的功能来解决这种情况,但是提供了一些程序员必须遵守的规则,来避免死锁问题。
如何做…¶
让我们先分析下面的Python代码,下面的代码中介绍了一种典型的死锁问题;我们有两个进程,一个 rank
等于1,另一个 rank
等于5,他们互相通讯,并且每一个都有发送和接收的函数。
from mpi4py import MPI
comm=MPI.COMM_WORLD
rank = comm.rank
print("my rank is : " , rank)
if rank==1:
data_send= "a"
destination_process = 5
source_process = 5
data_received=comm.recv(source=source_process)
comm.send(data_send,dest=destination_process)
print("sending data %s " %data_send + "to process %d" %destination_process)
print("data received is = %s" %data_received)
if rank==5:
data_send= "b"
destination_process = 1
source_process = 1
data_received=comm.recv(source=source_process)
comm.send(data_send,dest=destination_process)
print("sending data %s :" %data_send + "to process %d" %destination_process)
print("data received is = %s" %data_received)
讨论¶
如果我们尝试运行这个程序(只有两个进程足够了),会发现这两个进程都不会完成:
$ mpiexec -n 9 python deadLockProblems.py
('my rank is : ', 8)
('my rank is : ', 3)
('my rank is : ', 2)
('my rank is : ', 7)
('my rank is : ', 0)
('my rank is : ', 4)
('my rank is : ', 6)
此时连个进程都在等待对方,都被阻塞住了。会发生这种情况是因为MPI的 comm.recv()
函数和 comm.send()
函数都是阻塞的。它们的调用者都在等待它们完成。对 comm.send()
MPI来说,只有数据发出之后函数才会结束,对于 comm.recv()
函数来说,只有接收到数据函数才会结束。为了解决这个问题,我们可以将这连个函数这样写:
if rank==1:
data_send= "a"
destination_process = 5
source_process = 5
comm.send(data_send,dest=destination_process)
data_received=comm.recv(source=source_process)
if rank==5:
data_send= "b"
destination_process = 1
source_process = 1
data_received=comm.recv(source=source_process)
comm.send(data_send,dest=destination_process)
虽然这个解决方法从逻辑上纠正了,但是并不保证一定可以避免死锁问题。鉴于通讯是发生在buffer的, comm.send()
函数将要发送的数据完全拷贝到buffer里,只有buffer里有完整的数据之后程序才能继续运行。否则,依然会产生死锁:发送者不能发送,因为buffer已经提交但是接收到不能接收者不能接收数据,因为它被 comm.send()
阻塞住了。因此,我们可以交换一下发送者和接收者的顺序来解决这个问题。
if rank==1:
data_send= "a"
destination_process = 5
source_process = 5
comm.send(data_send,dest=destination_process)
data_received=comm.recv(source=source_process)
if rank==5:
data_send= "b"
destination_process = 1
source_process = 1
comm.send(data_send,dest=destination_process)
data_received=comm.recv(source=source_process)
最后,我们得到正确的输出如下:
$ mpiexec -n 9 python deadLockProblems.py
('my rank is : ', 7)
('my rank is : ', 0)
('my rank is : ', 8)
('my rank is : ', 1)
sending data a to process 5
data received is = b
('my rank is : ', 5)
sending data b :to process 1
data received is = a
('my rank is : ', 2)
('my rank is : ', 3)
('my rank is : ', 4)
('my rank is : ', 6)
了解更多¶
此并非是解决死锁问题的唯一方案。举个例子,有一个特定的函数统一了向一特定进程发消息和从一特定进程接收消息的功能,叫做 Sendrecv
Sendrecv(self, sendbuf, int dest=0, int sendtag=0, recvbuf=None, int source=0, int recvtag=0, Status status=None)
可以看到,这个函数的参数同 comm.send()
MPI 以及 comm.recv()
MPI 相同。同时在这个函数里,整个函数都是阻塞的,相比于交给子系统来负责检查发送者和接收者之间的依赖,可以避免死锁问题。用这个方案改写之前的例子如下:
if rank==1:
data_send= "a"
destination_process = 5
source_process = 5
data_received=comm.sendrecv(data_send,dest=destination_process,source =source_process)
if rank==5:
data_send= "b"
destination_process = 1
source_process = 1
data_received=comm.sendrecv(data_send,dest=destination_process, source=source_process)
集体通讯:使用broadcast通讯¶
在并行代码的开发中,我们会经常发现需要在多个进程间共享某个变量运行时的值,或操作多个进程提供的变量(可能具有不同的值)。
为了解决这个问题,使用了通讯数。举例说,如果进程0要发送信息给进程1和进程2,同时也会发送信息给进程3,4,5,6,即使这些进程并不需要这些信息。
另外,MPI库提供了在多个进程之间交换信息的方法,针对执行的机器做了优化。

将所有进程变成通讯者的这种方法叫做集体交流。因此,一个集体交流通常是2个以上的进程。我们也可以叫它广播——一个进程发送消息给其他的进程。 mpi4py
模块通过以下的方式提供广播的功能:
buf = comm.bcast(data_to_share, rank_of_root_process)
这个函数将root消息中包含的信息发送给属于 comm
通讯组其他的进程,每个进程必须通过相同的 root
和 comm
来调用它。
如何做…¶
下面来通过一个例子理解广播函数。我们有一个root进程, rank
等于0,保存自己的数据 variable_to_share
,以及其他定义在通讯组中的进程。
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
variable_to_share = 100
else:
variable_to_share = None
variable_to_share = comm.bcast(variable_to_share, root=0)
print("process = %d" %rank + " variable shared = %d " %variable_to_share)
和一个拥有10个进程的通讯组的执行输出结果如下:
C:\>mpiexec -n 10 python broadcast.py
process = 0 variable shared = 100
process = 8 variable shared = 100
process = 2 variable shared = 100
process = 3 variable shared = 100
process = 4 variable shared = 100
process = 5 variable shared = 100
process = 9 variable shared = 100
process = 6 variable shared = 100
process = 1 variable shared = 100
process = 7 variable shared = 100
讨论¶
rank
等于0的root进程初始化了一个变量, variable_to_share
,值为100.这个变量将通过通讯组发送给其他进程。:
if rank == 0:
variable_to_share = 100
为了发送消息,我们声明了一个广播:
variable_to_share = comm.bcast(variable_to_share, root=0)
这里,函数的变量是要发送的数据和发送者的进程。当我们执行代码的时候,在我们的例子中,我们有一个10个进程的通讯组, variable_to_share
变量将发送给组中的其他进程。最后 print
函数打印出来运行的进程和它们的变量:
print("process = %d" %rank + " variable shared = %d " %variable_to_share)
了解更多¶
集体通讯允许组中的多个进程同时进行数据交流。在 mpi4py
模块中,只提供了阻塞版本的集体通讯(阻塞调用者,直到缓存中的数据全部安全发送。)
广泛应用的集体通讯应该是:
- 组中的进程提供通讯的屏障
- 通讯方式包括:
- 将一个进程的数据广播到组中其他进程中
- 从其他进程收集数据发给一个进程
- 从一个进程散播数据散播到其他进程中
- 减少操作
集体通讯:使用scatter通讯¶
scatter函数和广播很像,但是有一个很大的不同, comm.bcast
将相同的数据发送给所有在监听的进程, comm.scatter
可以将数据放在数组中,发送给不同的进程。下图展示了scatter的功能:

comm.scatter
函数接收一个array,根据进程的rank将其中的元素发送给不同的进程。比如第一个元素将发送给进程0,第二个元素将发送给进程1,等等。 mpi4py
中的函数原型如下:
recvbuf = comm.scatter(sendbuf, rank_of_root_process)
如何做…¶
在下面的例子中,我们将观察数据是如何通过 scatter
发送给不同的进程的:
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
array_to_share = [1, 2, 3, 4 ,5 ,6 ,7, 8 ,9 ,10]
else:
array_to_share = None
recvbuf = comm.scatter(array_to_share, root=0)
print("process = %d" %rank + " recvbuf = %d " %recvbuf)
运行代码的输出如下:
C:\>mpiexec -n 10 python scatter.py
process = 0 variable shared = 1
process = 4 variable shared = 5
process = 6 variable shared = 7
process = 2 variable shared = 3
process = 5 variable shared = 6
process = 3 variable shared = 4
process = 7 variable shared = 8
process = 1 variable shared = 2
process = 8 variable shared = 9
process = 9 variable shared = 10
讨论¶
rank为0的进程将 array_to_share
的数据发送给其他进程:
array_to_share = [1, 2, 3, 4 ,5 ,6 ,7, 8 ,9 ,10]
recvbuf
参数表示第i个变量将会通过 comm.scatter
发送给第i个进程:
recvbuf = comm.scatter(array_to_share, root=0)
这里需要注意, comm.scatter
有一个限制,发送数据的列表中元素的个数必须和接收的进程数相等。举个例子,如果列表中的个数比进程数多,就会看到如下错误:
C:\> mpiexec -n 3 python scatter.py
Traceback (most recent call last):
File "scatter.py", line 13, in <module>
recvbuf = comm.scatter(array_to_share, root=0)
File "Comm.pyx", line 874, in mpi4py.MPI.Comm.scatter (c:\users\utente\appdata\local\temp\pip-build-h14iaj\mpi4py\src\mpi4py.MPI.c:73400)
File "pickled.pxi", line 658, in mpi4py.MPI.PyMPI_scatter (c:\users\utente\appdata\local\temp\pip-build-h14iaj\mpi4py\src\mpi4py.MPI.c:34035)
File "pickled.pxi", line 129, in mpi4py.MPI._p_Pickle.dumpv (c:\users\utente\appdata\local\temp\pip-build-h14iaj\mpi4py\src\mpi4py.MPI.c:28325)
ValueError: expecting 3 items, got 10 mpiexec aborting job...
job aborted:
rank: node: exit code[: error message]
0: Utente-PC: 123: mpiexec aborting job
1: Utente-PC: 123
2: Utente-PC: 123
了解更多¶
mpi4py
还提供了两个其他的函数来散布数据:
comm.scatter(sendbuf, recvbuf, root=0)
: 在communicator中从一个进程向其他的进程发送数据。comm.scatterv(sendbuf, recvbuf, root=0)
: 将数据从一个进程发送到组中的其他进程,在发送端提供不同数量的数据和偏移。
sendbuf
和 recvbuf
参数必须以list的形式给出(用于点对点的 comm.send
函数):
buf = [data, data_size, data_type]
data
必须是一个buffer-like的对象,size等于 data_size
,类型是 data_type
。
集体通讯:使用gather通讯¶
gather
函数基本上是反向的 scatter
,即手机所有进程发送向root进程的数据。 mpi4py
实现的 gather
函数如下:
recvbuf = comm.gather(sendbuf, rank_of_root_process)
这里, sendbuf
是要发送的数据, rank_of_root_process
代表要接收数据进程。

如何做…¶
在接下来的例子中,我们想实现上图表示的过程。每一个进程都构建自己的数据,发送给root进程(rank为0)。
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
data = (rank+1)**2
data = comm.gather(data, root=0)
if rank == 0:
print ("rank = %s " %rank + "...receiving data to other process")
for i in range(1, size):
data[i] = (i+1)**2
value = data[i]
print(" process %s receiving %s from process %s" % (rank , value , i))
最后,我们用5个进程来演示:
C:\>mpiexec -n 5 python gather.py
rank = 0 ...receiving data to other process
process 0 receiving 4 from process 1
process 0 receiving 9 from process 2
process 0 receiving 16 from process 3
process 0 receiving 25 from process 4
结果正如图中一样,root进程收到了其他四个进程的数据。
讨论¶
首先,我们有n个进程发送各自的数据:
data = (rank+1)**2
如果rank是0,就在array中收集数据:
if rank == 0:
print ("rank = %s " %rank + "...receiving data to other process")
for i in range(1, size):
data[i] = (i+1)**2
value = data[i]
print(" process %s receiving %s from process %s" % (rank , value , i))
数据由下面的函数产生:
data = (rank+1)**2
了解更多¶
mpi4py
提供了下面的函数收集数据:
- gathering to one task:
comm.Gather
,comm.Gatherv
, 和comm.gather
- gathering to all tasks:
comm.Allgather
,comm.Allgatherv
, 和comm.allgather
使用Alltoall通讯¶
Alltoall
集体通讯结合了 scatter
和 gather
的功能。在 mpi4py
中,有以下三种类型的 Alltoall
集体通讯。
comm.Alltoall(sendbuf, recvbuf)
:comm.Alltoallv(sendbuf, recvbuf)
:comm.Alltoallw(sendbuf, recvbuf)
:
如何做…¶
在下面的例子中,我们将看到 mpi4py
是如何实现 comm.Alltoall
的。我们定义了进程的通讯者组,进程可以在组中接收或发送数据,格式为数字数据的数组。
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
a_size = 1
senddata = (rank+1)*numpy.arange(size,dtype=int)
recvdata = numpy.empty(size*a_size,dtype=int)
comm.Alltoall(senddata,recvdata)
print(" process %s sending %s receiving %s" % (rank , senddata , recvdata))
运行代码,设定通讯者组的进程数为5,输出如下:
C:\>mpiexec -n 5 python alltoall.py
process 0 sending [0 1 2 3 4] receiving [0 0 0 0 0]
process 1 sending [0 2 4 6 8] receiving [1 2 3 4 5]
process 2 sending [0 3 6 9 12] receiving [2 4 6 8 10]
process 3 sending [0 4 8 12 16] receiving [3 6 9 12 15]
process 4 sending [0 5 10 15 20] receiving [4 8 12 16 20]
如何做…¶
comm.alltoall
方法将 task j
的中 sendbuf
的第i个对象拷贝到 task i
中 recvbuf
的第j个对象(接收者收到的对象和发送者一一对应,发送者发送的对象和接收者一一对应,译者注)。
下图可以表示这个发送过程。

从图中我们可以观察到:
- P0 包含数据 [0 1 2 3 4],它将 0 赋值给自己, 1 传给进程 P1 , 2 传给进程 P2 , 3 传给进程 P3 , 4 传给进程 P4 。
- 以此类推……
了解更多¶
All-to-all 定制通讯也叫做全部交换。这种操作经常用于各种并行算法中,比如快速傅里叶变换,矩阵变换,样本排序以及一些数据库的 Join 操作。
简化操作¶
同 comm.gather
一样, comm.reduce
接收一个数组,每一个元素是一个进程的输入,然后返回一个数组,每一个元素是进程的输出,返回给 root 进程。输出的元素中包含了简化的结果。
在 mpi4py
中,我们将简化操作定义如下:
comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)
这里需要注意的是,这里有个参数 op
和 comm.gather
不同,它代表你想应用在数据上的操作, mpi4py
模块代表定义了一系列的简化操作,其中一些如下:
MPI.MAX
: 返回最大的元素MPI.MIN
: 返回最小的元素MPI.SUM
: 对所有元素相加MPI.PROD
: 对所有元素相乘MPI.LAND
: 对所有元素进行逻辑操作MPI.MAXLOC
: 返回最大值,以及拥有它的进程MPI.MINLOC
: 返回最小值,以及拥有它的进程
如何做…¶
现在,我们用 MPI.SUM
实验一下对结果进行相加的操作。每一个进程维护一个大小为 3 的数组,我们用 numpy
来操作这些数组:
import numpy
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.size
rank = comm.rank
array_size = 3
recvdata = numpy.zeros(array_size, dtype=numpy.int)
senddata = (rank+1)*numpy.arange(size,dtype=numpy.int)
print("process %s sending %s " % (rank , senddata))
comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)
print('on task', rank, 'after Reduce: data = ', recvdata)
我们用通讯组进程数为 3 来运行,等于维护的数组的大小。输出的结果如下:
C:\>mpiexec -n 3 python reduction2.py
process 2 sending [0 3 6]
on task 2 after Reduce: data = [0 0 0]
process 1 sending [0 2 4]
on task 1 after Reduce: data = [0 0 0]
process 0 sending [0 1 2]
on task 0 after Reduce: data = [ 0 6 12]
讨论¶
为了演示相加简化操作,我们使用 comm.Reduce
语句,并将含有 recvbuf
的 rank 设置为 0, recvdata
代表了最后的计算结果:
comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)
我们的 op = MPI.SUM
选项,将在所有的列上面应用求和操作。下图表示了这个步骤:

操作的过程如下:
- 进程 P0 发出数据数组 [0 1 2]
- 进程 P1 发出数据数组 [0 2 4]
- 进程 P2 发出数据数组 [0 3 6]
简化操作将每个 task 的第 i 个元素相加,然后放回到 P0 进程的第 i 个元素中。在接收操作中, P0 收到数据 [0 6 12]。
(译注:“简化”翻译的可能不太合适)
如何优化通讯¶
拓扑是 MPI 提供的一个有趣功能。如前所述,所有通信功能(点对点或集体)都是指一组进程。我们一直使用包含所有进程的 MPI_COMM_WORLD
组。它为大小为n的通信组的每个进程分配 0 - n-1 的一个rank。但是,MPI允许我们为通信器分配虚拟拓扑。它为不同的进程定义了特定的标签分配。这种机制可以提高执行性能。实际上,如果构建虚拟拓扑,那么每个节点将只与其虚拟邻居进行通信,从而优化性能。
例如,如果排名是随机分配的,则消息可能会在到达目的地之前被迫传递给许多其他节点。除了性能问题之外,虚拟拓扑还可确保代码更清晰可读。 MPI提供两种建筑拓扑。第一个构造创建笛卡尔拓扑,而后者可以创建任何类型的拓扑。具体来说,在第二种情况下,我们必须提供要构建的图形的邻接矩阵。我们将只处理笛卡尔拓扑,通过它可以构建多种广泛使用的结构:网格,环形,环形等等。用于创建笛卡尔拓扑的函数如下所示:
comm.Create_cart((number_of_rows,number_of_columns))
这里, number_of_rows
和 number_of_columns
指定了栅格的行列数。
如何做…¶
在下面的例子中,我们将展示如何实现一个 M x N 的笛卡尔拓扑。同时,我们也定义了一系列坐标展示进程是如何操作的:
from mpi4py import MPI
import numpy as np
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3
neighbour_processes = [0,0,0,0]
if __name__ == "__main__":
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
grid_rows = int(np.floor(np.sqrt(comm.size)))
grid_column = comm.size // grid_rows
if grid_rows*grid_column > size:
grid_column -= 1
if grid_rows*grid_column > size:
grid_rows -= 1
if (rank == 0) :
print("Building a %d x %d grid topology:" % (grid_rows, grid_column) )
cartesian_communicator = comm.Create_cart( (grid_rows, grid_column), periods=(True, True), reorder=True)
my_mpi_row, my_mpi_col = cartesian_communicator.Get_coords( cartesian_communicator.rank )
neighbour_processes[UP], neighbour_processes[DOWN] = cartesian_communicator.Shift(0, 1)
neighbour_processes[LEFT], neighbour_processes[RIGHT] = cartesian_communicator.Shift(1, 1)
print ("Process = %s row = %s column = %s ----> neighbour_processes[UP] = %s neighbour_processes[DOWN] = %s neighbour_processes[LEFT] =%s neighbour_processes[RIGHT]=%s" % (
rank, my_mpi_row, my_mpi_col,neighbour_processes[UP],
neighbour_processes[DOWN], neighbour_processes[LEFT],
neighbour_processes[RIGHT]))
运行以上代码得到的结果如下:
C:\>mpiexec -n 4 python virtualTopology.py
Building a 2 x 2 grid topology:
Process = 0 row = 0 column = 0 ---->
neighbour_processes[UP] = -1
neighbour_processes[DOWN] = 2
neighbour_processes[LEFT] =-1
neighbour_processes[RIGHT]=1
Process = 1 row = 0 column = 1 ---->
neighbour_processes[UP] = -1
neighbour_processes[DOWN] = 3
neighbour_processes[LEFT] =0
neighbour_processes[RIGHT]=-1
Process = 2 row = 1 column = 0 ---->
neighbour_processes[UP] = 0
neighbour_processes[DOWN] = -1
neighbour_processes[LEFT] =-1
neighbour_processes[RIGHT]=3
Process = 3 row = 1 column = 1 ---->
neighbour_processes[UP] = 1
neighbour_processes[DOWN] = -1
neighbour_processes[LEFT] =2
neighbour_processes[RIGHT]=-1
对于每一个进程,输出结果都是:如果 neighbour_processes = -1
,那么没有临近的拓扑,否则, neighbour_processes
显示最近的进程。
讨论¶
最后的拓扑是 2x2 的网状结构,大小为4,和进程数一样:
grid_rows = int(np.floor(np.sqrt(comm.size)))
grid_column = comm.size // grid_rows
if grid_rows*grid_column > size:
grid_column -= 1
if grid_rows*grid_column > size:
grid_rows -= 1
然后,建立笛卡尔拓扑:
cartesian_communicator = comm.Create_cart( (grid_rows, grid_column), periods=(True, True), reorder=True)
通过 Get_coords()
方法,我们可以确定一个进程的坐标:
my_mpi_row, my_mpi_col = cartesian_communicator.Get_coords( cartesian_communicator.rank )
上面的拓扑可以用下图表示:

了解更多¶
如果要得到一个 M x N 的环形拓扑,我们需要如下代码:
cartesian_communicator = comm.Create_cart( (grid_rows, grid_column), periods=(True, True), reorder=True)
输入将如下所示:
C:\>mpiexec -n 4 python VirtualTopology.py
Building a 2 x 2 grid topology:
Process = 0 row = 0 column = 0 ---->
neighbour_processes[UP] = 2
neighbour_processes[DOWN] = 2
neighbour_processes[LEFT] =1
neighbour_processes[RIGHT]=1
Process = 1 row = 0 column = 1 ---->
neighbour_processes[UP] = 3
neighbour_processes[DOWN] = 3
neighbour_processes[LEFT] =0
neighbour_processes[RIGHT]=0
Process = 2 row = 1 column = 0 ---->
neighbour_processes[UP] = 0
neighbour_processes[DOWN] = 0
neighbour_processes[LEFT] =3 neighbour_processes[RIGHT]=3
Process = 3 row = 1 column = 1 ---->
neighbour_processes[UP] = 1
neighbour_processes[DOWN] = 1
neighbour_processes[LEFT] =2
neighbour_processes[RIGHT]=2
拓扑图形如下所示:

第四章 异步编程¶
介绍¶
除了顺序执行和并行执行的模型之外,还有第三种模型,叫做异步模型,这是事件驱动模型的基础。异步活动的执行模型可以只有一个单一的主控制流,能在单核心系统和多核心系统中运行。
在并发执行的异步模型中,许多任务被穿插在同一时间线上,所有的任务都由一个控制流执行(单一线程)。任务的执行可能被暂停或恢复,中间的这段时间线程将会去执行其他任务。下面的这幅图可以清楚地表达这个概念。

如上图所示,任务(不同的颜色表示不同的任务)可能被其他任务插入,但是都处在同一个线程下。这表明,当某一个任务执行的时候,其他的任务都暂停了。与多线程编程模型很大的一点不同是, 多线程由操作系统决定在时间线上什么时候挂起某个活动或恢复某个活动,而在异步并发模型中,程序员必须假设线程可能在任何时间被挂起和替换。
程序员可以将任务编写成许多可以间隔执行的小步骤, 这样的话如果一个任务需要另一个任务的输出,那么被依赖的任务必须接收它的输入。
使用Python的 concurrent.futures
模块¶
Python3.2带来了 concurrent.futures
模块,这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。
此模块由以下部分组成:
concurrent.futures.Executor
: 这是一个虚拟基类,提供了异步执行的方法。submit(function, argument)
: 调度函数(可调用的对象)的执行,将argument
作为参数传入。map(function, argument)
: 将argument
作为参数执行函数,以 异步 的方式。shutdown(Wait=True)
: 发出让执行者释放所有资源的信号。concurrent.futures.Future
: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。
Executor是抽象类,可以通过子类访问,即线程或进程的 ExecutorPools
。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的launcher或executor。
使用线程池和进程池¶
线程池或进程池是用于在程序中优化和简化线程/进程的使用。通过池,你可以提交任务给executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务;另一部分是一系列的进程或线程,用于执行这些任务。池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。它减少了创建创建线程和进程的开销,提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。

准备工作¶
current.Futures
模块提供了两种 Executor
的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是:
concurrent.futures.ThreadPoolExecutor(max_workers)
concurrent.futures.ProcessPoolExecutor(max_workers)
max_workers
参数表示最多有多少个worker并行执行任务。
如何做…¶
下面的示例代码展示了线程池和进程池的功能。这里的任务是,给一个list number_list
,包含1到10。对list中的每一个数字,乘以1+2+3…+10000000的和(这个任务只是为了消耗时间)。
下面的代码分别测试了:
- 顺序执行
- 通过有5个worker的线程池执行
- 通过有5个worker的进程池执行
(译者注:原文的代码是错误的,这里贴出的代码以及运行结果是修改后的,详见: 关于第四章第2节书中程序的疑问 #16 ,感谢 @Microndgt 提出) 代码如下::
import concurrent.futures
import time
number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def evaluate_item(x):
# 计算总和,这里只是为了消耗时间
result_item = count(x)
# 打印输入和输出结果
return result_item
def count(number) :
for i in range(0, 10000000):
i=i+1
return i * number
if __name__ == "__main__":
# 顺序执行
start_time = time.time()
for item in number_list:
print(evaluate_item(item))
print("Sequential execution in " + str(time.time() - start_time), "seconds")
# 线程池执行
start_time_1 = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
print ("Thread pool execution in " + str(time.time() - start_time_1), "seconds")
# 进程池
start_time_2 = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(evaluate_item, item) for item in number_list]
for future in concurrent.futures.as_completed(futures):
print(future.result())
print ("Process pool execution in " + str(time.time() - start_time_2), "seconds")
运行这个代码,我们可以看到运行时间的输出::
$ python3 pool.py
10000000
20000000
30000000
40000000
50000000
60000000
70000000
80000000
90000000
100000000
Sequential execution in 7.936585903167725 seconds
10000000
30000000
40000000
20000000
50000000
70000000
90000000
100000000
80000000
60000000
Thread pool execution in 7.633088827133179 seconds
40000000
50000000
10000000
30000000
20000000
70000000
90000000
60000000
80000000
100000000
Process pool execution in 4.787093639373779 seconds
讨论¶
我们创建了一个list存放10个数字,然后使用一个循环计算从1加到10000000,打印出和与 number_list
的乘积。:
def evaluate_item(x):
# 计算总和,这里只是为了消耗时间
result_item = count(x)
# 打印输入和输出结果
print ("item " + str(x) + " result " + str(result_item))
def count(number) :
for i in range(0, 10000000):
i=i+1
return i * number
在主要程序中,我们先使用顺序执行跑了一次程序::
if __name__ == "__main__":
# 顺序执行
start_time = time.clock()
for item in number_list:
evaluate_item(item)
print("Sequential execution in " + str(time.clock() - start_time), "seconds")
然后,我们使用了 futures.ThreadPoolExecutor
模块的线程池跑了一次::
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for item in number_list:
executor.submit(evaluate_item, item)
print ("Thread pool execution in " + str(time.clock() - start_time_1), "seconds")
ThreadPoolExecutor
使用线程池中的一个线程执行给定的任务。池中一共有5个线程,每一个线程从池中取得一个任务然后执行它。当任务执行完成,再从池中拿到另一个任务。
当所有的任务执行完成后,打印出执行用的时间::
print ("Thread pool execution in " + str(time.clock() - start_time_1), "seconds")
最后,我们又用 ProcessPoolExecutor
跑了一次程序::
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
for item in number_list:
executor.submit(evaluate_item, item)
如同 ThreadPoolExecutor
一样, ProcessPoolExecutor
是一个executor,使用一个线程池来并行执行任务。然而,和 ThreadPoolExecutor
不同的是, ProcessPoolExecutor
使用了多核处理的模块,让我们可以不受GIL的限制,大大缩短执行时间。
了解更多¶
几乎所有需要处理多个客户端请求的服务应用都会使用池。然而,也有一些应用要求任务需要立即执行,或者要求对任务的线程有更多的控制权,这种情况下,池不是一个最佳选择。
使用Asyncio管理事件循环¶
Python的Asyncio模块提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语。此模块的主要组件和概念包括:
- 事件循环: 在Asyncio模块中,每一个进程都有一个事件循环。
- 协程: 这是子程序的泛化概念。协程可以在执行期间暂停,这样就可以等待外部的处理(例如IO)完成之后,从之前暂停的地方恢复执行。
- Futures: 定义了
Future
对象,和concurrent.futures
模块一样,表示尚未完成的计算。 - Tasks: 这是Asyncio的子类,用于封装和管理并行模式下的协程。
本节中重点讨论事件,事实上,异步编程的上下文中,事件无比重要。因为事件的本质就是异步。
什么是事件循环¶
在计算系统中,可以产生事件的实体叫做事件源,能处理事件的实体叫做事件处理者。此外,还有一些第三方实体叫做事件循环。它的作用是管理所有的事件,在整个程序运行过程中不断循环执行,追踪事件发生的顺序将它们放到队列中,当主线程空闲的时候,调用相应的事件处理者处理事件。最后,我们可以通过下面的伪代码来理解事件循环::
while (1) {
events = getEvents();
for (e in events)
processEvent(e);
}
所有的事件都在 while
循环中捕捉,然后经过事件处理者处理。事件处理的部分是系统唯一活跃的部分,当一个事件处理完成,流程继续处理下一个事件。
准备工作¶
Asyncio提供了一下方法来管理事件循环:
loop = get_event_loop()
: 得到当前上下文的事件循环。loop.call_later(time_delay, callback, argument)
: 延后time_delay
秒再执行callback
方法。loop.call_soon(callback, argument)
: 尽可能快调用callback
,call_soon()
函数结束,主线程回到事件循环之后就会马上调用callback
。loop.time()
: 以float类型返回当前时间循环的内部时间。asyncio.set_event_loop()
: 为当前上下文设置事件循环。asyncio.new_event_loop()
: 根据此策略创建一个新的时间循环并返回。loop.run_forever()
: 在调用stop()
之前将一直运行。
如何做…¶
下面的代码中,我们将展示如何使用Asyncio库提供的时间循环创建异步模式的应用。
import asyncio
import datetime
import time
def function_1(end_time, loop):
print ("function_1 called")
if (loop.time() + 1.0) < end_time:
loop.call_later(1, function_2, end_time, loop)
else:
loop.stop()
def function_2(end_time, loop):
print ("function_2 called ")
if (loop.time() + 1.0) < end_time:
loop.call_later(1, function_3, end_time, loop)
else:
loop.stop()
def function_3(end_time, loop):
print ("function_3 called")
if (loop.time() + 1.0) < end_time:
loop.call_later(1, function_1, end_time, loop)
else:
loop.stop()
def function_4(end_time, loop):
print ("function_5 called")
if (loop.time() + 1.0) < end_time:
loop.call_later(1, function_4, end_time, loop)
else:
loop.stop()
loop = asyncio.get_event_loop()
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
# loop.call_soon(function_4, end_loop, loop)
loop.run_forever()
loop.close()
运行结果如下::
python3 event.py
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called
讨论¶
在这个例子中,我们定义了三个异步的任务,相继执行,入下图所示的顺序。

首先,我们要得到这个事件循环::
loop = asyncio.get_event_loop()
然后我们通过 call_soon
方法调用了 function_1()
函数。
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
让我们来看一下 function_1()
的定义::
def function_1(end_time, loop):
print ("function_1 called")
if (loop.time() + 1.0) < end_time:
loop.call_later(1, function_2, end_time, loop)
else:
loop.stop()
这个函数通过以下参数定义了应用的异步行为:
end_time
: 定义了function_1()
可以运行的最长时间,并通过call_later
方法传入到function_2()
中作为参数loop
: 之前通过get_event_loop()
方法得到的事件循环
function_1()
的任务非常简单,只是打印出函数名字。当然,里面也可以写非常复杂的操作。
print ("function_1 called")
任务执行结束之后,它将会比较 loop.time()
+1s和设定的运行时间,如果没有超过,使用 call_later
在1秒之后执行 function_2()
。
if (loop.time() + 1.0) < end_time:
loop.call_later(1, function_2, end_time, loop)
else:
loop.stop()
function_2()
和 function_3()
的作用类似。
如果运行的时间超过了设定,时间循环终止。
loop.run_forever()
loop.close()
使用Asyncio管理协程¶
在上文提到的例子中,我们看到当一个程序变得很大而且复杂时,将其划分为子程序,每一部分实现特定的任务是个不错的方案。子程序不能单独执行,只能在主程序的请求下执行,主程序负责协调使用各个子程序。协程就是子程序的泛化。和子程序一样的事,协程只负责计算任务的一步;和子程序不一样的是,协程没有主程序来进行调度。这是因为协程通过管道连接在一起,没有监视函数负责顺序调用它们。在协程中,执行点可以被挂起,可以被从之前挂起的点恢复执行。通过协程池就可以插入到计算中:运行第一个任务,直到它返回(yield)执行权,然后运行下一个,这样顺着执行下去。
这种插入的控制组件就是前文介绍的事件循环。它持续追踪所有的协程并执行它们。
协程的另外一些重要特性如下:
- 协程可以有多个入口点,并可以yield多次
- 协程可以将执行权交给其他协程
yield表示协程在此暂停,并且将执行权交给其他协程。因为协程可以将值与控制权一起传递给另一个协程,所以“yield一个值”就表示将值传给下一个执行的协程。
准备工作¶
使用Asyncio定义协程非常简单,只需要一个装饰器即可:
import asyncio
@asyncio.coroutine
def coroutine_function(function_arguments):
# DO_SOMETHING
如何做…¶
在这个例子中,我们将看到如何使用Asyncio的协程来模拟有限状态机。有限状态机(finite state machine or automaton, FSA)是一个数学模型,不仅在工程领域应用广泛,在科学领域也很著名,例如数学和计算机科学等。我们要模拟的状态机如下图所示:

在上图中,可以看到我们的系统有 S1, S2, S3, S4 四个状态, 0 和 1 是状态机可以从一个状态到另一个状态的值(这个过程叫做转换)。例如在本实验中,只有当只为1的时候, S0 可以转换到 S1 ,当只为0的时候, S0 可以转换到 S2 .Python代码如下,状态模拟从 S0 开始,叫做 初始状态 ,最后到 S4 ,叫做 结束状态 。
# Asyncio Finite State Machine
import asyncio
import time
from random import randint
@asyncio.coroutine
def StartState():
print("Start State called \n")
input_value = randint(0, 1)
time.sleep(1)
if (input_value == 0):
result = yield from State2(input_value)
else:
result = yield from State1(input_value)
print("Resume of the Transition : \nStart State calling " + result)
@asyncio.coroutine
def State1(transition_value):
outputValue = str("State 1 with transition value = %s \n" % transition_value)
input_value = randint(0, 1)
time.sleep(1)
print("...Evaluating...")
if input_value == 0:
result = yield from State3(input_value)
else :
result = yield from State2(input_value)
result = "State 1 calling " + result
return outputValue + str(result)
@asyncio.coroutine
def State2(transition_value):
outputValue = str("State 2 with transition value = %s \n" % transition_value)
input_value = randint(0, 1)
time.sleep(1)
print("...Evaluating...")
if (input_value == 0):
result = yield from State1(input_value)
else :
result = yield from State3(input_value)
result = "State 2 calling " + result
return outputValue + str(result)
@asyncio.coroutine
def State3(transition_value):
outputValue = str("State 3 with transition value = %s \n" % transition_value)
input_value = randint(0, 1)
time.sleep(1)
print("...Evaluating...")
if (input_value == 0):
result = yield from State1(input_value)
else :
result = yield from EndState(input_value)
result = "State 3 calling " + result
return outputValue + str(result)
@asyncio.coroutine
def EndState(transition_value):
outputValue = str("End State with transition value = %s \n" % transition_value)
print("...Stop Computation...")
return outputValue
if __name__ == "__main__":
print("Finite State Machine simulation with Asyncio Coroutine")
loop = asyncio.get_event_loop()
loop.run_until_complete(StartState())
运行代码,我们可以看到类似以下输出(译注,运行结果随机,这里为译者运行的三次结果).
$ python3 coroutines.py
Finite State Machine simulation with Asyncio Coroutine
Start State called
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Stop Computation...
Resume of the Transition :
Start State calling State 2 with transition value = 0
State 2 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 3 with transition value = 1
State 3 calling End State with transition value = 1
$ python3 coroutines.py
Finite State Machine simulation with Asyncio Coroutine
Start State called
...Evaluating...
...Evaluating...
...Stop Computation...
Resume of the Transition :
Start State calling State 2 with transition value = 0
State 2 calling State 3 with transition value = 1
State 3 calling End State with transition value = 1
$ python3 coroutines.py
Finite State Machine simulation with Asyncio Coroutine
Start State called
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Evaluating...
...Stop Computation...
Resume of the Transition :
Start State calling State 1 with transition value = 1
State 1 calling State 2 with transition value = 1
State 2 calling State 1 with transition value = 0
State 1 calling State 3 with transition value = 0
State 3 calling State 1 with transition value = 0
State 1 calling State 2 with transition value = 1
State 2 calling State 3 with transition value = 1
State 3 calling End State with transition value = 1
讨论¶
每一个状态都由装饰器装饰:
@asyncio.coroutine
例如, S0 的定义如下所示:
@asyncio.coroutine
def StartState():
print("Start State called \n")
input_value = randint(0, 1)
time.sleep(1)
if (input_value == 0):
result = yield from State2(input_value)
else:
result = yield from State1(input_value)
print("Resume of the Transition : \nStart State calling " + result)
通过 random
模块的 randint(0, 1)
函数生成了 input_value
的值,决定了下一个转换状态。此函数随机生成1或0:
input_value = randint(0, 1)
得到 input_value
的值之后,通过 yield from
命令调用下一个协程。
if (input_value == 0):
result = yield from State2(input_value)
else:
result = yield from State1(input_value)
result
是下一个协程返回的string,这样我们在计算的最后就可以重新构造出计算过程。
启动事件循环的代码如下:
if __name__ == "__main__":
print("Finite State Machine simulation with Asyncio Coroutine")
loop = asyncio.get_event_loop()
loop.run_until_complete(StartState())
使用Asyncio控制任务¶
Asyncio是用来处理事件循环中的异步进程和并发任务执行的。它还提供了 asyncio.Task()
类,可以在任务中使用协程。它的作用是,在同一事件循环中,运行某一个任务的同时可以并发地运行多个任务。当协程被包在任务中,它会自动将任务和事件循环连接起来,当事件循环启动的时候,任务自动运行。这样就提供了一个可以自动驱动协程的机制。
准备工作¶
Asyncio模块为我们提供了 asyncio.Task(coroutine)
方法来处理计算任务,它可以调度协程的执行。任务对协程对象在事件循环的执行负责。如果被包裹的协程要从future yield,那么任务会被挂起,等待future的计算结果。
当future计算完成,被包裹的协程将会拿到future返回的结果或异常(exception)继续执行。另外,需要注意的是,事件循环一次只能运行一个任务,除非还有其它事件循环在不同的线程并行运行,此任务才有可能和其他任务并行。当一个任务在等待future执行的期间,事件循环会运行一个新的任务。
"""
Asyncio using Asyncio.Task to execute three math function in parallel
"""
import asyncio
@asyncio.coroutine
def factorial(number):
f = 1
for i in range(2, number + 1):
print("Asyncio.Task: Compute factorial(%s)" % (i))
yield from asyncio.sleep(1)
f *= i
print("Asyncio.Task - factorial(%s) = %s" % (number, f))
@asyncio.coroutine
def fibonacci(number):
a, b = 0, 1
for i in range(number):
print("Asyncio.Task: Compute fibonacci (%s)" % (i))
yield from asyncio.sleep(1)
a, b = b, a + b
print("Asyncio.Task - fibonacci(%s) = %s" % (number, a))
@asyncio.coroutine
def binomialCoeff(n, k):
result = 1
for i in range(1, k+1):
result = result * (n-i+1) / i
print("Asyncio.Task: Compute binomialCoeff (%s)" % (i))
yield from asyncio.sleep(1)
print("Asyncio.Task - binomialCoeff(%s , %s) = %s" % (n, k, result))
if __name__ == "__main__":
tasks = [asyncio.Task(factorial(10)),
asyncio.Task(fibonacci(10)),
asyncio.Task(binomialCoeff(20, 10))]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
如何做…¶
在下面的代码中,我们展示了三个可以被 Asyncio.Task()
并发执行的数学函数。
运行的结果如下:
python3 task.py
Asyncio.Task: Compute factorial(2)
Asyncio.Task: Compute fibonacci (0)
Asyncio.Task: Compute binomialCoeff (1)
Asyncio.Task: Compute factorial(3)
Asyncio.Task: Compute fibonacci (1)
Asyncio.Task: Compute binomialCoeff (2)
Asyncio.Task: Compute factorial(4)
Asyncio.Task: Compute fibonacci (2)
Asyncio.Task: Compute binomialCoeff (3)
Asyncio.Task: Compute factorial(5)
Asyncio.Task: Compute fibonacci (3)
Asyncio.Task: Compute binomialCoeff (4)
Asyncio.Task: Compute factorial(6)
Asyncio.Task: Compute fibonacci (4)
Asyncio.Task: Compute binomialCoeff (5)
Asyncio.Task: Compute factorial(7)
Asyncio.Task: Compute fibonacci (5)
Asyncio.Task: Compute binomialCoeff (6)
Asyncio.Task: Compute factorial(8)
Asyncio.Task: Compute fibonacci (6)
Asyncio.Task: Compute binomialCoeff (7)
Asyncio.Task: Compute factorial(9)
Asyncio.Task: Compute fibonacci (7)
Asyncio.Task: Compute binomialCoeff (8)
Asyncio.Task: Compute factorial(10)
Asyncio.Task: Compute fibonacci (8)
Asyncio.Task: Compute binomialCoeff (9)
Asyncio.Task - factorial(10) = 3628800
Asyncio.Task: Compute fibonacci (9)
Asyncio.Task: Compute binomialCoeff (10)
Asyncio.Task - fibonacci(10) = 55
Asyncio.Task - binomialCoeff(20 , 10) = 184756.0
讨论¶
在这个例子中,我们定义了三个协程, factorial
, fibonacci
和 binomialCoeff
,每一个都带有 asyncio.coroutine
装饰器:
@asyncio.coroutine
def factorial(number):
do Something
@asyncio.coroutine
def fibonacci(number):
do Something
@asyncio.coroutine
def binomialCoeff(n, k):
do Something
为了能并行执行这三个任务,我们将其放到一个task的list中:
if __name__ == "__main__":
tasks = [asyncio.Task(factorial(10)),
asyncio.Task(fibonacci(10)),
asyncio.Task(binomialCoeff(20, 10))]
得到事件循环:
loop = asyncio.get_event_loop()
然后运行任务:
loop.run_until_complete(asyncio.wait(tasks))
这里, asyncio.wait(tasks)
表示运行直到所有给定的协程都完成。
最后,关闭事件循环:
loop.close()
使用Asyncio和Futures¶
Asyncio 模块的另一个重要的组件是 Future
类。它和 concurrent.futures.Futures
很像,但是针对Asyncio的事件循环做了很多定制。 asyncio.Futures
类代表还未完成的结果(有可能是一个Exception)。所以综合来说,它是一种抽象,代表还没有做完的事情。
实际上,必须处理一些结果的回调函数被加入到了这个类的实例中。
准备工作¶
要操作Asyncio中的 Future
,必须进行以下声明:
import asyncio
future = asyncio.Future()
基本的方法有:
cancel()
: 取消future的执行,调度回调函数result()
: 返回future代表的结果exception()
: 返回future中的Exceptionadd_done_callback(fn)
: 添加一个回调函数,当future执行的时候会调用这个回调函数remove_done_callback(fn)
: 从“call whten done”列表中移除所有callback的实例set_result(result)
: 将future标为执行完成,并且设置result的值set_exception(exception)
: 将future标为执行完成,并设置Exception
如何做…¶
下面的例子展示了 Future
类是如何管理两个协程的,第一个协程 first_coroutine
计算前n个数的和, 第二个协程 second_coroutine
计算n的阶乘,代码如下:
# -*- coding: utf-8 -*-
"""
Asyncio.Futures - Chapter 4 Asynchronous Programming
"""
import asyncio
import sys
@asyncio.coroutine
def first_coroutine(future, N):
"""前n个数的和"""
count = 0
for i in range(1, N + 1):
count = count + i
yield from asyncio.sleep(3)
future.set_result("first coroutine (sum of N integers) result = " + str(count))
@asyncio.coroutine
def second_coroutine(future, N):
count = 1
for i in range(2, N + 1):
count *= i
yield from asyncio.sleep(4)
future.set_result("second coroutine (factorial) result = " + str(count))
def got_result(future):
print(future.result())
if __name__ == "__main__":
N1 = int(sys.argv[1])
N2 = int(sys.argv[2])
loop = asyncio.get_event_loop()
future1 = asyncio.Future()
future2 = asyncio.Future()
tasks = [
first_coroutine(future1, N1),
second_coroutine(future2, N2)]
future1.add_done_callback(got_result)
future2.add_done_callback(got_result)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
输出如下:
$ python asy.py 1 1
first coroutine (sum of N integers) result = 1
second coroutine (factorial) result = 1
$ python asy.py 2 2
first coroutine (sum of N integers) result = 3
second coroutine (factorial) result = 2
$ python asy.py 3 3
first coroutine (sum of N integers) result = 6
second coroutine (factorial) result = 6
$ python asy.py 4 4
first coroutine (sum of N integers) result = 10
second coroutine (factorial) result = 24
讨论¶
在主程序中,我们通过定义future对象和协程联系在一起:
if __name__ == "__main__":
...
future1 = asyncio.Future()
future2 = asyncio.Future()
定义tasks的时候,将future对象作为变量传入协程中:
tasks = [
first_coroutine(future1, N1),
second_coroutine(future2, N2)]
最后,添加一个 future
执行时的回调函数:
def got_result(future):
print(future.result())
在我们传入future的协程中,在计算之后我们分别添加了3s、4s的睡眠时间:
yield from asyncio.sleep(4)
然后,我们将future标为完成,通过 future.set_result()
设置结果。
了解更多¶
交换两个协程睡眠的时间,协程2会比1更早得到结果:
$ python asy.py 3 3
second coroutine (factorial) result = 6
first coroutine (sum of N integers) result = 6
$ python asy.py 4 4
second coroutine (factorial) result = 24
first coroutine (sum of N integers) result = 10
第五章 分布式Python编程¶
介绍¶
分布式计算基本的思想是将一个大任务分散成几个小任务,交给分布式网络中的计算机去完成。在分布式计算的环境中,必须保证网络中的计算机的可用性(避免网络延迟、非预知的崩溃等)。所以就需要可以可持续的监控架构。
采用这种技术产生的根本问题是对各种类型的流量(数据,任务,命令等)进行适当的分配。此外,还有一个分布式系统基础特征产生的问题:网络由不同操作系统的计算机组成,很多互不兼容。实际上,随着时间的推移,为了使用分布式环境中的不同资源,已经可以识别不同的计算模型。他们的目标是提供一个框架,为一个分布式应用的不同处理器之间描述合作方法。
我们可以说,基本上各种模型的不同点是分布式系统能提供的容量的大小。最常用的模型是服务器/客户端模型。它可以让不同的进程运行在不同的计算机上,通过交换消息进行实时的合作。相对于前一个模型有很大的提高,前一个模型要求将所有的文件分发到所有的机器上,从而进行离线的计算。客户端/服务器模型典型的实现是通过远程的程序调用(这是本地调用的扩展),或者通过分布式对象的程序(面向对象的中间件)。本章将介绍一些Python提供的类似的计算架构。然后我们会介绍一些用OO的方法或远程调用实现了分布式架构的库,例如 Celery,SCOOP, Pyro4 和 RPyC,也有一些使用了不同方法的库,例如 PyCSP 和Disco,Python版本的 MapReduce 算法。
使用Celery实现分布式任务¶
Celery 是一个 Python 框架,用来管理分布式任务的,遵循面向对象的中间件方法。它的主要 feature 是可以将许多小任务分布到一个大型的计算集群中,最后将任务的结果收集起来,组成整体的解决方案。
要使用 Celery,我们需要下面几种组件:
- Celery 模块(废话!!)
- 一个消息代理(Message Broker)。这是一个独立于 Celery 的一个中间件,用来和分布式的 worker 收发消息。它会处理在通讯网络中的信息交换。这种中间件的消息发送方案不再是点对点的,而是面向消息的方式,最终名的是发布者/订阅者的范式。

Celery 支持很多消息代理,最有名的是 RebbitMQ 和 Redis。
如何做…¶
安装 Celery,我们需要 pip 包管理器。在命令行通过以下命令安装:
pip install celery
之后,我们必须安装消息代理。在本书中,我们使用 RabbitMQ 来做消息代理,这是一个面向消息的中间件(也叫做代理人消息),它实现了 高级消息队列协议(AMQP) 。RabbitMQ 服务端使用 Erlang 写的,基于 Open Telecom Platform(OPT) 框架来管理集群和故障转移。安装 RabbitMQ 的步骤如下:
- 下载安装 Erlang:http://www.erlang.org/download.html
- 下载并运行 RabbitMQ 安装器:http://www.rabbitmq.com/download.html
下载完成后,安装好 RabbitMQ 然后用默认配置运行即可。
最后,我们会安装 Flower( http://flower.readthedocs.org/ ),这是一个基于 web 的任务监控和管理工具(进行中的任务,任务细节和图表、状态等)。安装方法和 Celery 一样,在命令行输入:
pip install -U flower
然后,我们可以验证一下 Celery 的安装,在命令行输入:
celery --version
会出现(译者使用的版本比原书高):
4.2.1 (windowlicker)
Celery 的用法非常简单,和显示的一样:
Usage: celery <command> [options]
显示的配置如下:
usage: celery <command> [options]
Show help screen and exit.
positional arguments:
args
optional arguments:
-h, --help show this help message and exit
--version show program's version number and exit
Global Options:
-A APP, --app APP
-b BROKER, --broker BROKER
--result-backend RESULT_BACKEND
--loader LOADER
--config CONFIG
--workdir WORKDIR
--no-color, -C
--quiet, -q
---- -- - - ---- Commands- -------------- --- ------------
+ Main:
| celery worker
| celery events
| celery beat
| celery shell
| celery multi
| celery amqp
+ Remote Control:
| celery status
| celery inspect --help
| celery inspect active
| celery inspect active_queues
| celery inspect clock
| celery inspect conf [include_defaults=False]
| celery inspect memdump [n_samples=10]
| celery inspect memsample
| celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]
| celery inspect ping
| celery inspect query_task [id1 [id2 [... [idN]]]]
| celery inspect registered [attr1 [attr2 [... [attrN]]]]
| celery inspect report
| celery inspect reserved
| celery inspect revoked
| celery inspect scheduled
| celery inspect stats
| celery control --help
| celery control add_consumer <queue> [exchange [type [routing_key]]]
| celery control autoscale [max [min]]
| celery control cancel_consumer <queue>
| celery control disable_events
| celery control election
| celery control enable_events
| celery control heartbeat
| celery control pool_grow [N=1]
| celery control pool_restart
| celery control pool_shrink [N=1]
| celery control rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
| celery control revoke [id1 [id2 [... [idN]]]]
| celery control shutdown
| celery control terminate <signal> [id1 [id2 [... [idN]]]]
| celery control time_limit <task_name> <soft_secs> [hard_secs]
+ Utils:
| celery purge
| celery list
| celery call
| celery result
| celery migrate
| celery graph
| celery upgrade
+ Debugging:
| celery report
| celery logtool
---- -- - - --------- -- - -------------- --- ------------
Type 'celery <command> --help' for help using a specific command.
了解更多¶
有关 Celery 的更多细节,可以访问官方主页: http://www.celeryproject.org/
如何使用Celery创建任务¶
在本节中,我们将展示如何使用 Celery 模块创建一个任务。Celery 提供了下面的方法来调用任务:
apply_async(args[, kwargs[, ...]])
: 发送一个任务消息delay(*args, **kwargs)
: 发送一个任务消息的快捷方式,但是不支持设置一些执行信息
delay
方法用起来更方便,因为它可以像普通函数一样被调用:
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
如果要使用 apply_async
的话,你需要这样写:
task.apply_async (args=[arg1, arg2] kwargs={'kwarg1': 'x','kwarg2': 'y'})
如何做…¶
我们通过以下简单的两个脚本来执行一个任务:
# addTask.py: Executing a simple task
from celery import Celery
app = Celery('addTask', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
第二个脚本如下:
# addTask_main.py : RUN the AddTask example with
import addTask
if __name__ == '__main__':
result = addTask.add.delay(5,5)
这里重申以下,RabbitMQ 服务会在安装之后自动启动,所以这里我们只需要启动 Celery 服务就可以了,启动的命令如下:
celery -A addTask worker --loglevel=info
命令的输出如下:

其中,有个警告告诉我们关闭 pickle 序列化工具,可以避免一些安全隐患。pickle 作为默认的序列化工作是因为它用起来很方便(通过它可以将很复杂的 Python 对象当做函数变量传给任务)。不管你用不用 pickle ,如果想要关闭警告的话,可以设置 CELERY_ACCEPT_CONTENT
变量。详细信息可以参考:http://celery.readthedocs.org/en/latest/configuration.html 。
现在,让我们执行 addTask_main.py
脚本来添加一个任务:

最后,第一个命令的输出会显示:

在最后一行,可以看到结果是10,和我们的期望一样。
讨论¶
让我们先来看 addTask.py
这个脚本。在前两行的代码中,我们创建了一个 Celery 的应用实例,然后用 RabbitMQ 服务作为消息代理:
from celery import Celery
app = Celery('addTask', broker='amqp://guest@localhost//')
Celery 函数的第一个变量是当前 module 的名字( addTask.py
) 第二个变量是消息代理的信息,一个可以连接代理的 broker(RabbitMQ). 然后,我们声明了任务。每一个任务必须用 @app.task
来装饰。
这个装饰器帮助 Celery 标明了哪些函数可以通过任务队列调度。在装饰器后面,我们定义了 worker 可以执行的任务。我们的第一个任务很简单,只是计算两个数的和:
@app.task
def add(x, y):
return x + y
在第二个脚本中, AddTask_main.py
,我们通过 delay()
方法来调用任务:
if __name__ == '__main__':
result = addTask.add.delay(5,5)
记住,这个方法只是 apply_async()
的一个快捷方式,通过 apply_async()
方法我们可以更精确地控制任务执行。
了解更多¶
如果 RabbitMQ 是默认配置的话,Celery 也可以通过 amqp://scheme
来连接。
使用SCOOP进行科学计算¶
Scalable Concurrent Operations in Python (SCOOP) 是一个可扩展的 Python 并行计算库,可以将并行的任务(Python 的 Futures
)放到各种各样的计算节点上执行。它基于 ØMQ 架构,提供了一种在分布式系统中管理 Futures 的方法。SCOOP 主要的应用场景是科学计算,尽可能利用所有的结算资源来执行大量的分布式任务。
在将 Futures 分发这方面,SCOOP 使用了 Broker 模式的变体。

这个通信系统的中心是 Broker,Broker 和所有的节点通讯,在它们之间传输信息。Futures 由各个节点创建,而不是由中心化的 Broker 创建。这种方案让系统的拓扑结构更加可靠,性能更高。事实上,Broker 占用的主要资源是 I/O ,CPU 使用很小。
准备工作¶
SCOOP 的源代码在 https://github.com/soravux/scoop/ 。这个库的依赖如下:
- Python >= 2.6 or >= 3.2
- Distribute >= 0.6.2 or setuptools >= 0.7
- Greenlet >= 0.3.4
- pyzmq >= 13.1.0 and libzmq >= 3.2.0
- SSH for remote execution
SCOOP 支持 Linux, Mac, 和 Windows 平台。和 Disco 一样,它的远程访问需要 SSH 的支持,而且必须在每个节点上都可以免密登陆。有关 SCOOP 完整的安装说明,可以参考文档: http://scoop.readthedocs.org/en/0.7/install.html 。
在 Window 上安装 SCOOP ,简单的使用 pip 命令就可以了:
pip install SCOOP
或者直接在 SCOOP 的源代码文件夹中使用:
Python setup.py install
如何做…¶
SCOOP 内置了很多适用于科学计算场景的功能,可以解决很多需要很多算力的科学问题。本文将以蒙特卡罗算法为例子。要说明白这个算法将占用很大的篇幅,但是在本例子中,只是想以并行执行一个蒙卡特罗算法解决问题展示 SCOOP。下面以计算 π 为例:
import math
from random import random
from scoop import futures
from time import time
def evaluate_points_in_circle(attempts):
points_fallen_in_unit_disk = 0
for i in range (0,attempts) :
x = random()
y = random()
radius = math.sqrt(x*x + y*y)
#the test is ok if the point fall in the unit circle
if radius < 1 :
#if ok the number of points in a disk is increased
points_fallen_in_unit_disk = \
points_fallen_in_unit_disk + 1
return points_fallen_in_unit_disk
def pi_calculus_with_Montecarlo_Method(workers, attempts):
print("number of workers %i - number of attempts %i" % (workers,attempts))
bt = time()
#in this point we call scoop.futures.map function
#the evaluate_number_of_points_in_unit_circle \
#function is executed in an asynchronously way
#and several call this function can be made concurrently
evaluate_task = \
futures.map(evaluate_points_in_circle,
[attempts] * workers)
taskresult= sum(evaluate_task)
print ("%i points fallen in a unit disk after " \
%(taskresult/attempts))
piValue = (4. * taskresult/ float(workers * attempts))
computationalTime = time() - bt
print("value of pi = " + str(piValue))
print ("error percentage = " + \
str((((abs(piValue - math.pi)) * 100) / math.pi)))
print("total time: " + str(computationalTime))
if __name__ == "__main__":
for i in range (1,4):
# let's fix the numbers of workers...only two,
# but it could be much greater
pi_calculus_with_Montecarlo_Method(i*1000, i*1000)
print(" ")
运行这短代码的命令如下:
python –m scoop name_file.py
这段代码的输出如下:
C:\Python CookBook\Chapter 5 - Distributed Python\chapter 5 - codes>python -m scoop pi_calculus_with_montecarlo_method.py
[2015-06-01 15:16:32,685] launcher INFO SCOOP 0.7.2 dev on win32 using Python 3.3.0 (v3.3.0:bd8afb90ebf2, Sep 29 2012, 10:55:48) [MSC v.1600 32 bit (Intel)], API: 1013
[2015-06-01 15:16:32,685] launcher INFO Deploying 2 worker(s) over 1 host(s).
[2015-06-01 15:16:32,685] launcher INFO Worker d--istribution:
[2015-06-01 15:16:32,686] launcher INFO 127.0.0.1:1 + origin
Launching 2 worker(s) using an unknown shell.
number of workers 1000 - number of attempts 1000
785 points fallen in a unit disk after
value of pi = 3.140636
error percentage = 0.03045122952842962
total time: 10.258585929870605
number of workers 2000 - number of attempts 2000
1570 points fallen in a unit disk after
value of pi = 3.141976
error percentage = 0.012202295220195048
total time: 20.451170206069946
number of workers 3000 - number of attempts 3000
2356 points fallen in a unit disk after
value of pi = 3.1413777777777776
error percentage = 0.006839709526630775
total time: 32.3558509349823
[2015-06-01 15:17:36,894] launcher (127.0.0.1:59239) INFO
process is done.
[2015-06-01 15:17:36,896] launcher (127.0.0.1:59239) INFO
cleaning spawned subprocesses.
如果我们增加 attempts 的次数和 worker 的数量,就可以提高 π 的精度。

如何做…¶
前面的代码只是蒙卡特罗方法计算 π 的一种实现。 evaluate_points_in_circle()
函数随机的产生点的坐标 (x, y)
,然后判断此点是否落在单位面积的内切圆内。
每当判断点落在圆的面积内的时候, points_fallen_in_unit_disk
变量的值加 1. 当内循环结束的时候,这个值就表示最终落在圆的面积内点的数量。这个数字足够计算 pi 的值了。事实上,点落在圆内的实际概率是 π / 4 ,这是圆的面积和单位面积的比例。圆的面积是 π,单位面积是 4.
最后,通过计算落在圆内的点的数量 taskresult
,和尝试的次数 workers * attempts
的比例,就可以得到 π / 4
的值,当然也就得到最终 π 的值了。
piValue = (4. * Taskresult/ float(workers * attempts))
SCOOP 函数如下:
futures.map(evaluate_points_in_circle, [attempts] * workers)
这行代码会交给 SCOOP 来将计算任务分发给多个节点,并收集计算结果。它将会并发地调用 evaluate_points_in_circle
。
通过 SCOOP 使用 map 函数¶
当处理 list 或其他类型的序列时,一种很常用的操作是对序列中的每一个元素都执行相同的操作,然后收集结果。举例说,通过 Python IDLE 可以对一个 list 这样更新:
>>>items = [1,2,3,4,5,6,7,8,9,10]
>>>updated_items = []
>>>for x in items:
>>> updated_items.append(x*2)
>>> updated_items
>>> [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
这是比较常见的一个操作,但是 Python 内置了一些功能可以更优雅地完成类似操作。
Python 的函数 map(aFunction, aSequence)
将在序列的每一个元素上调用传入的函数,并将结果以 list 的形式返回。用这个函数完成上面的操作:
>>>items = [1,2,3,4,5,6,7,8,9,10]
>>>def multiplyFor2(x):return x*2
>>>print(list(map(multiplyFor2,items)))
>>>[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
这里,我们将自定义的 multiplyFor2
函数传给 map
. 它将对 items
中所有的元素执行此函数,最后以 list 的形式返回结果。
我们也可以将 lambda 函数(不用绑定变量名的匿名函数)当做参数传给 map
函数。这段代码就变成以下这样:
>>>items = [1,2,3,4,5,6,7,8,9,10]
>>>print(list(map(lambda x:x*2,items)))
>>>[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
内置的 map 函数比手动写 for 循环的性能更高。
准备工作¶
SCOOP 模块提供了多个 map 函数可以将异步计算任务下发到多个计算节点:
futures.map(func, iterables, kargs)
: 此函数返回一个生成器,可以按照输入的顺序遍历结果。可以说是内置map
函数的一个并行执行版本。futures.map_as_completed(func, iterables, kargs)
: 每当有结果出现时,就立刻 yield 出来。futures.scoop.futures.mapReduce(mapFunc, reductionOp, iterables, kargs)
: map 函数执行过后可以通过此函数执行 reduction 函数。返回结果是一个元素。
如何做…¶
在这个例子中,我们将对比 SCOOP 实现 MapReduce 和 Python 内置函数实现:
"""
Compare SCOOP MapReduce with a serial implementation
"""
import operator
import time
from scoop import futures
def simulateWorkload(inputData):
time.sleep(0.01)
return sum(inputData)
def CompareMapReduce():
mapScoopTime = time.time()
res = futures.mapReduce(
simulateWorkload,
operator.add,
list([a] * a for a in range(1000)),
)
mapScoopTime = time.time() - mapScoopTime
print("futures.map in SCOOP executed in {0:.3f}s with result:{1}".format(
mapScoopTime, res))
mapPythonTime = time.time()
res = sum(map(simulateWorkload, list([a] * a for a in range(1000))))
mapPythonTime = time.time() - mapPythonTime
print("map Python executed in: {0:.3f}s with result: {1}".format(
mapPythonTime, res))
if __name__ == '__main__':
CompareMapReduce()
这段代码通过以下命令来执行:
python -m scoop map_reduce.py
> [2015-06-12 20:13:25,602] launcher INFO SCOOP 0.7.2 dev on win32
using Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 24 2015, 22:43:06) [MSC
v.1600 32 bit (Intel)], API: 1013
[2015-06-12 20:13:25,602] launcher INFO Deploying 2 worker(s) over 1
host(s).
[2015-06-12 20:13:25,602] launcher INFO Worker d--istribution:
[2015-06-12 20:13:25,602] launcher INFO 127.0.0.1: 1 + origin
Launching 2 worker(s) using an unknown shell.
futures.map in SCOOP executed in 8.459s with result: 332833500
map Python executed in: 10.034s with result: 332833500
[2015-06-12 20:13:45,344] launcher (127.0.0.1:2559) INFO
is done.
[2015-06-12 20:13:45,368] launcher (127.0.0.1:2559) INFO
cleaning spawned subprocesses.
讨论¶
在这个例子中,我们将 SCOOP 实现的 MapReduce 和 Python 内置函数的 MapReduce 来对比。主要的函数是 CompareMapReduce()
,里面有两种实现和对时间的统计。程序结构如下:
mapScoopTime = tme.time()
#Run SCOOP MapReduce
mapScoopTime = time.time() – mapScoopTime
mapPythonTime = time.time()
#Run serial MapReduce
mapPythonTime = time.time() - mapPythonTime
在输出中,我们打印了执行时间:
futures.map in SCOOP executed in 8.459s with result: 332833500
map Python executed in: 10.034s with result: 332833500
为了得到比较明显的时间比较,我们在 simulatedWordload
函数中引入了 time.sleep
来延长计算时间。
def simulateWorkload(inputData, chose=None):
time.sleep(0.01)
return sum(inputData)
SCOOP 版本的 MapReduce 如下:
res = futures.mapReduce(
simulateWorkload,
operator.add,
list([a] * a for a in range(1000)),
)
futures.mapReduce
函数需要以下参数:
simulateWork
: 这是要执行的 Futures,注意这个 callable 必须有返回值。operator.add
: 此函数将会在 reduce 操作的时候调用,必须接收两个参数,返回一个值。list(...)
: 一个可迭代对象,其中每一个元素都会传给 callable 对象作为 Future。
使用 Python 内置函数实现的 MapReduce 如下:
res = sum(map(simulateWorkload,
list([a] * a for a in range(1000))))
Python 内置的 map()
函数接受两个参数: simulateWorkload
函数和可迭代的 list()
对象。Reduce 操作我们只是简单地用 Python 内置的 sum()
函数。
使用Pyro4进行远程方法调用¶
Python Remote Objects (Pyro4) 实现了类似 Java 的远程方法调用(Remote Method Invocation, RMI). 可以调用一个远程对象(存在于另一个进程中,甚至是另一台机器上),就像调用本地对象一样(处于和调用者一样的进程)。从概念的角度讲,RMI 的技术可以追溯到远程过程调用(remote procedure call,RPC),RMI 是远程过程调用技术针对面向对象范式进行改造——方法替换过程。在面向对象系统中,对远程方法调用使用这样一种机制可以在项目的统一性和对称性上有很多优势,因为这样我们可以复用同一应用不同对象或方法之间调用的模型。

从图中可以看出,Pyro4 用客户端/服务器的方式来管理和分发对象。Pyro4 可以将客户端调用转换为远程对象调用。在调用的过程中,有两个重要的角色,一个是客户端,一个是服务客户端调用的服务器。Pyro4 以分布式的形式提供这种服务。
准备工作¶
安装非常简单,使用 pip 即可:
pip install pyro
或者可以从 https://github.com/irmen/Pyro4 下载源代码,使用 setup.py
安装。
本例中将使用 Python3.3 版本和 Windows 系统。
如何做…¶
在本例中,我们将使用 Pyro4 构建简单的服务器客户端通信。其中, server
部分的代码如下:
import Pyro4
class Server(object):
def welcomeMessage(self, name):
return ("Hi welcome " + str (name))
def startServer():
server = Server()
daemon = Pyro4.Daemon()
ns = Pyro4.locateNS()
uri = daemon.register(server)
ns.register("server", uri)
print("Ready. Object uri =", uri)
daemon.requestLoop()
if __name__ == "__main__":
startServer()
client.py
的代码如下:
import Pyro4
uri = input("What is the Pyro uri of the greeting object? ").strip()
name = input("What is your name? ").strip()
server = Pyro4.Proxy("PYRONAME:server")
print(server.welcomeMessage(name))
我们需要跑一个 name server 来运行这段代码,从命令行输入:
python -m Pyro4.naming
将看到一下信息:

看到这样的输入就表示 name server 已经成功运行了。接下来要分别在两个窗口中启动 Server 和 Client。运行 Server,使用下面的命令:
python server.py
可以看到类似下面的输出:

然后使用下面的命令运行客户端:
python client.py
将会看到类似下面的输出:
insert the PYRO4 server URI (help : PYRONAME:server)
这表示你需要输入 Pyro4 服务的名字,输入 PYRONAME: server
即可。
insert the PYRO4 server URI (help : PYRONAME:server) PYRONAME:server
然后将会看到以下命令要求输入名字:
What is your name? Rashmi
最后,你将会看到欢迎信息: Hi welcome Rashmi
.类似下面这样。

讨论¶
Server 中包含可以被远程调用的对象,在我们的例子中,这个对象只有一个方法 welcomeMessage()
返回一个字符串。
class Server(object):
def welcomeMessage(self, name):
return ("Hi welcome " + str (name))
要启动这个 Server,必须按照以下步骤:
- 实例化一个 Server 对象(名字叫
server
):server = Server()
。 - 启动 Pyro4 守护进程:
daemon = Pyro4.Daemon()
. Pyro4 的守护进程对象将收到的请求分发到合适的对象。每一个 Server 都必须有一个守护进程,来管理对象的调用。每一个 Server 的守护进程都知道其他进程的可调用对象。 - 然后我们必须运行一个 name server,并且拿到这个 name server 的地址:
ns = Pyro4.locateNS()
。 - 然后将这个 server 注册为 Pyro4 的对象,只有 Pyro4 的守护进程才知道这个对象:
uri = daemon.register(server)
. 它返回注册对象的 URI。 - 最后,我们可以在 name server 中给这个对象注册一个名字。
- 这个函数的最后是调用守护进程的
eventloop
,它会启动 server 的事件循环,等待调用。
Pyro4 的 API 让开发者可以通过透明的方式分发对象。客户端请求服务器执行 welcomeMessage()
方法。远程调用先会创建一个代理对象,Pyro4 的客户端会通过代理对象将方法调用转发到远程对象,并将结果转发回调用的代码。
server = Pyro4.Proxy("PYRONAME:server")
现在,我们就可以调用 server 的方法,打印欢迎信息了。
print(server.welcomeMessage(name))
使用 Pyro4 链接对象¶
在本章中,我们将展示如何使用 Pyro4 创建互相调用的对象链。假设我们想创建如下的分布式架构:

如图所示,我们有四个对象:一个客户端,和依照三个链式拓扑配置的 Server。客户端将请求转发到 Server1 开始链式调用,然后转发到 Server2。然后调用对象链中的下一个对象 Server3. Server3 最后调用 Server1 结束。
通过我们将要展示的这个例子,可以看出管理远程对象多么容易,可以很方便地拓扑出更加复杂的架构。
如何做…¶
使用 Pyro4 实现这样一个对象链,我们需要 5 个 Python 脚本。第一个是客户端 Client, 代码如下。
from __future__ import print_function
import Pyro4
obj = Pyro4.core.Proxy("PYRONAME:example.chain.A")
print("Result=%s" % obj.process(["hello"]))
在 Server 的代码中,当前 Server 用 this
变量表示,下一个要调用的 Server 用变量 that
表示。
server_1.py
的代码如下:
from __future__ import print_function
import Pyro4
import chainTopology
this = "1"
next = "2"
servername = "example.chainTopology." + this
daemon = Pyro4.core.Daemon()
obj = chainTopology.Chain(this, next)
uri = daemon.register(obj)
ns = Pyro4.naming.locateNS()
ns.register(servername, uri)
# enter the service loop.
print("server_%s started " % this)
daemon.requestLoop()
server_2.py
的代码如下:
from __future__ import print_function
import Pyro4
import chainTopology
this = "2"
next = "3"
servername = "example.chainTopology." + this
daemon = Pyro4.core.Daemon()
obj = chain.chainTopology(this, next)
uri = daemon.register(obj)
ns = Pyro4.naming.locateNS()
ns.register(servername, uri)
# enter the service loop.
print("server_%s started " % this)
daemon.requestLoop()
server_3.py
的代码如下:
from __future__ import print_function
import Pyro4
import chainTopology
this = "3"
next = "1"
servername = "example.chainTopology." + this
daemon = Pyro4.core.Daemon()
obj = chain.chainTopology(this, next)
uri = daemon.register(obj)
ns = Pyro4.naming.locateNS()
ns.register(servername, uri)
# enter the service loop.
print("server_%s started " % this)
daemon.requestLoop()
最后是 chain
对象,代码如下:
chainTopology.py:
from __future__ import print_function
import Pyro4
class Chain(object):
def __init__(self, name, next):
self.name = name
self.nextName = next
self.next = None
def process(self, message):
if self.next is None:
self.next = Pyro4.core.Proxy("PYRONAME:example.chain." + self.nextName)
if self.name in message:
print("Back at %s; the chain is closed!" % self.name)
return ["complete at " + self.name]
else:
print("%s forwarding the message to the object %s" \
% (self.name, self.nextName))
message.append(self.name)
result = self.next.process(message)
result.insert(0, "passed on from " + self.name)
return result
执行这段代码,首先要启动 Pyro4 name server:
C:>python -m Pyro4.naming
Not starting broadcast server for localhost.
NS running on localhost:9090 (127.0.0.1)
Warning: HMAC key not set. Anyone can connect to this server!
URI = PYRO:Pyro.NameServer@localhost:9090
然后运行其他三个 server,在三个不同的命令行中运行 python server_name.py
命令。
server_1 的信息如下:

server_2 的信息如下:

server_3 的信息如下:

最后可以运行 client.py
了:

上面的的信息中显示出对象被在三个服务器中转发,最后回到 server_1
的时候任务完成。当对象被转发到下一个服务器时,我们可以关注一下对象服务器的行为。通过在 server_1
的输出中,我们可以观察发生了什么。

以及 server_2
的输出:

和 server_3
的输出:

讨论¶
在这个例子中的核心是我们在 chainTopology.py
中定义的 Chain
类。通过它三个服务器可以互相通讯,每一个 server 可以调用 class 来确定链中的下一个元素是谁(参考 chainTopology.py
中的 process
方法)。同时,它通过 Pyro4.core.proxy
来调用下一个元素:
if self.next is None:
self.next = Pyro4.core.Proxy("PYRONAME:example.chainTopology." + self.nextName)
如果调用链结束(在 server_3
调用 server_1
的时候),结束信息将会打印出来。
if self.name in message:
print("Back at %s; the chain is closed!" % self.name)
return ["complete at " + self.name]
如果有下一个元素,就打印出以下的信息:
print("%s forwarding the message to the object %s" \
% (self.name, self.nextName))
message.append(self.name)
result = self.next.process(message)
result.insert(0, "passed on from " + self.name)
return result
Server 的代码除了链中上一个元素和下一个元素的位置不同,其他的都是相同的,比如,下面是 server_1
的代码:
this = "1"
next = "2"
剩下的代码和之前的例子相同,是如何和链中下一个节点通讯的:
servername = "example.chainTopology." + this
daemon = Pyro4.core.Daemon()
obj = chain.chainTopology(this, next)
uri = daemon.register(obj)
ns = Pyro4.naming.locateNS()
ns.register(servername, uri)
# enter the service loop.
print("server_%s started " % this)
daemon.requestLoop()
最后,在客户端的代码中,我们通过调用链中 server_1
的代码来触发整个调用:
obj = Pyro4.core.Proxy("PYRONAME:example.chainTopology.1")
使用Pyro4部署客户端-服务器应用¶
在本节中,我们会用 Pyro4 学习如何写一个简单的客户端-服务器应用。这里提供的示例并不完整,但是可以成功运行。
一个客户端-服务器程序就是,在一个网络内,客户端连接上服务器来请求特定的服务,可以与其他的客户端共享软件/硬件资源,并使用相同的协议。在本节的系统中,server 管理一个在线购物网站,客户端负责已注册的用户连接到 server 来购物。
如何做…¶
为了让例子尽量简单一些,本例只有三个脚本。第一个代表 client
对象,用来管理用户。第二个脚本是 shop
对象,第二个脚本是 server
.
Server 端的代码如下( server.py
):
"""The Shops server"""
from __future__ import print_function
import Pyro4
import shop
ns = Pyro4.naming.locateNS()
daemon = Pyro4.core.Daemon()
uri = daemon.register(shop.Shop())
ns.register("example.shop.Shop", uri)
print(list(ns.list(prefix="example.shop.").keys()))
daemon.requestLoop()
Client 端的代码如下 ( client.py
):
from __future__ import print_function
import sys
import Pyro4
# A Shop client.
class client(object):
def __init__(self, name, cash):
self.name = name
self.cash = cash
def doShopping_deposit_cash(self, Shop):
print("\n*** %s is doing shopping with %s:" % (self.name, Shop.name()))
print("Log on")
Shop.logOn(self.name)
print("Deposit money %s" % self.cash)
Shop.deposit(self.name, self.cash)
print("balance=%.2f" % Shop.balance(self.name))
print("Deposit money %s" % self.cash)
Shop.deposit(self.name, 50)
print("balance=%.2f" % Shop.balance(self.name))
print("Log out")
Shop.logOut(self.name)
def doShopping_buying_a_book(self, Shop):
print("\n*** %s is doing shopping with %s:" % (self.name, Shop.name()))
print("Log on")
Shop.logOn(self.name)
print("Deposit money %s" % self.cash)
Shop.deposit(self.name, self.cash)
print("balance=%.2f" % Shop.balance(self.name))
print("%s is buying a book for %s$" % (self.name, 37))
Shop.buy(self.name, 37)
print("Log out")
Shop.logOut(self.name)
if __name__ == "__main__":
ns = Pyro4.naming.locateNS()
uri = ns.lookup("example.shop.Shop")
print(uri)
Shop = Pyro4.core.Proxy(uri)
meeta = client("Meeta", 50)
rashmi = client("Rashmi", 100)
rashmi.doShopping_buying_a_book(Shop)
meeta.doShopping_deposit_cash(Shop)
print("")
print("")
print("")
print("")
print("The accounts in the %s:" % Shop.name())
accounts = Shop.allAccounts()
for name in accounts.keys():
print(" %s : %.2f" % (name, accounts[name]))
Shop 对象的代码如下 ( shop.py
):
class Account(object):
def __init__(self):
self._balance = 0.0
def pay(self, price):
self._balance -= price
def deposit(self, cash):
self._balance += cash
def balance(self):
return self._balance
class Shop(object):
def __init__(self):
self.accounts = {}
self.clients = ["Meeta", "Rashmi", "John", "Ken"]
def name(self):
return "BuyAnythingOnline"
def logOn(self, name):
if name in self.clients:
self.accounts[name] = Account()
else:
self.clients.append(name)
self.accounts[name] = Account()
def logOut(self, name):
print("logout %s" % name)
def deposit(self, name, amount):
try:
return self.accounts[name].deposit(amount)
except KeyError:
raise KeyError("unknown account")
def balance(self, name):
try:
return self.accounts[name].balance()
except KeyError:
raise KeyError("unknown account")
def allAccounts(self):
accs = {}
for name in self.accounts.keys():
accs[name] = self.accounts[name].balance()
return accs
def buy(self, name, price):
balance = self.accounts[name].balance()
self.accounts[name].pay(price)
下面开始执行这段代码,首先启动 Pyro4 的 name server:
C:>python -m Pyro4.naming
Not starting broadcast server for localhost.
NS running on localhost:9090 (127.0.0.1)
Warning: HMAC key not set. Anyone can connect to this server!
URI = PYRO:Pyro.NameServer@localhost:9090
然后,使用 python server.py
命令启动 Server. 命令行的显示如下图所示。

最后,用下面的命令启动客户端,模拟用户的动作。
python client.py
命令行的输出将如下所示:
C:\Users\Utente\Desktop\Python CookBook\Python Parallel Programming
INDEX\Chapter 5 - Distributed Python\
chapter 5 - codes\banks>python client.py
PYRO:obj_8c4a5b4ae7554c2c9feee5b0113902e0@localhost:59225
*** Rashmi is doing shopping with BuyAnythingOnline:
Log on
Deposit money 100
balance=100.00
Rashmi is buying a book for 37$
Log out
*** Meeta is doing shopping with BuyAnythingOnline:
Log on
Deposit money 50
balance=50.00
Deposit money 50
balance=100.00
Log out
The accounts in the BuyAnythingOnline:
Meeta : 100.00
Rashmi : 63.00
输出表示两个用户的会话, Meeta
和 Rashmi
.
讨论¶
客户端应用必须找到 Shop()
对象,通过如下的调用:
ns = Pyro4.naming.locateNS()
然后打开一个通讯的 channel:
daemon = Pyro4.core.Daemon()
uri = daemon.register(shop.Shop())
ns.register("example.shop.Shop", uri)
daemon.requestLoop()
shop.py
有处理账户和购物的脚本。 shop
类将会管理每一个账户,它提供了登陆登出,管理用户的余额,以及处理购买动作:
class Shop(object):
def logOn(self, name):
if name in self.clients:
self.accounts[name] = Account()
else:
self.clients.append(name)
self.accounts[name] = Account()
def logOut(self, name):
print("logout %s" % name)
def deposit(self, name, amount):
try:
return self.accounts[name].deposit(amount)
except KeyError:
raise KeyError("unknown account")
def balance(self, name):
try:
return self.accounts[name].balance()
except KeyError:
raise KeyError("unknown account")
def buy(self, name, price):
balance = self.accounts[name].balance()
self.accounts[name].pay(price)
每一个顾客都有自己的 Account
对象,提供存款管理。
class Account(object):
def __init__(self):
self._balance = 0.0
def pay(self, price):
self._balance -= price
def deposit(self, cash):
self._balance += cash
def balance(self):
return self._balance
最后, client.py
有一个启动模拟客户行为的类。在 main 函数中,我们模拟了两个用户, Rashmi
和 Meeta
:
meeta = client('Meeta',50)
rashmi = client('Rashmi',100)
rashmi.doShopping_buying_a_book(Shop)
meeta.doShopping_deposit_cash(Shop)
他们在网站上存上一些现金,然后开始购物:
Rashmi 买了一本书:
def doShopping_buying_a_book(self, Shop):
Shop.logOn(self.name)
Shop.deposit(self.name, self.cash)
Shop.buy(self.name,37)
Shop.logOut(self.name)
Meeta 分两次充了 $100 到账户中:
def doShopping_deposit_cash(self, Shop):
Shop.logOn(self.name)
Shop.deposit(self.name, self.cash)
Shop.deposit(self.name, 50)
Shop.logOut(self.name)
最后程序打印出两个用户的存款:
print("The accounts in the %s:" % Shop.name())
accounts = Shop.allAccounts()
for name in accounts.keys():
print(" %s : %.2f" % (name, accounts[name]))
PyCSP和通信顺序进程¶
PyCSP 是基于通信顺序进程的一个 Python 模块。通信顺序进程是通过消息处理建立并发程序的编程范式。PyCSP 模块的特点是:
- 进程之间交换消息
- 线程之间可以共享内存
- 通过 channels 来实现交换消息
Channels 可以做: - 进程之间传值 - 进程同步
PyCSP 允许多种不同的 channel 类型: One2One, One2Any, Any2One, Any2Any. 这些名字代表多少 readers 和 writers 可以额通过 channel 通讯。
准备工作¶
(译者注:本文使用的 python-csp 已经发生了比较大的变化, 文中的代码可能已经无法运行)
PyCSP 可以通过 pip 用以下命令安装:
pip install python-csp
Github 也有库的源码: https://github.com/futurecore/python-csp .
下载之后在项目目录执行以下命令来安装:
python setup.py install
在本例子中,我们使用 Python2.7 .
如何做…¶
在第一个例子中,我们会先介绍 PyCSP 中的基本概念,processes 和 channels. 我们将定义两个进程,分别是 counter 和 printer. 下面我们来看如何定义这两个进程之间的通信过程。
参考下面的代码:
# -*- coding: utf-8 -*-
from pycsp.parallel import *
@process
def processCounter(cout, limit):
for i in xrange(limit):
cout(i)
poison(cout)
@process
def processPrinter(cin):
while True:
print cin(),
A = Channel('A')
Parallel(
processCounter(A.writer(), limit=5),
processPrinter(A.reader())
)
shutdown()
在 Python2.7 中的运行结果如下:
Python 2.7.9 (default, Dec 10 2014, 12:28:03) [MSC v.1500 64 bit (AMD64)] on win32
Type "copyright", "credits" or "license()" for more information.
>>> ========================RESTART ==========================
>>>
0 1 2 3 4
讨论¶
// TODO
了解更多¶
CSP是一种用于描述并发进程交互的语言。在数学中被称为代数过程。它在实践中被用作规范和验证各种系统的竞争条件的工具。 CSP启发的编程语言Occam的现在被广泛用作并行编程语言。
对 CSP 有兴趣的同学,建议阅读霍尔的原著: http://www.usingcsp.com/cspbook.pdf .
使用Disco进行MapReduce¶
使用RPyC远程调用¶
Remote Python Call(RPyC)是一个用作远程过程调用,同时也可以用作分布式计算的Python模块。其基础RPC主要是提供一种将控制从当前程序(客户端)转移到其他程序(服务器)的机制,类似于在一个主程序里去调用一个子程序。这种方式的优点是它拥有非常简单的语义,知识以及熟悉的集中式函数调用。在一个程序调用里, 客户端进程会挂起,直到服务器完成计算返回结果后才会继续执行。这种方法之所以高效是因为客户端-服务器的通信表现为过程调用而不是传输层调用,因此所有网络操作的细节通过将应用程序置于被称作存根(stubs)的本地过程对应用程序隐藏。RPyC的主要特性是:
- 语法透明,远程过程调用和本地调用有一样的语法
- 语义透明,远程过程调用和本地调用是语义一致的
- 可以处理同步和异步通信
- 对称通信协议意味着不论是客户端还是服务器都可以处理一个请求

准备工作¶
使用pip来安装非常容易。在你的命令行终端里,键入下面的命令: pip install rpyc
。
另外你可以去 https://github.com/tomerfiliba/rpyc 下载完整的包(是一个 .zip
文件)。下载完成之后在包的根目录里执行以下命令: python setup.py install
。
安装完成之后,你可以浏览这个库。在我们的例子里,我们将在同一台机器 localhost
上运行一个客户端和服务器。使用 rpyc
运行一个服务器非常简单:在rpyc包的目录 ../rpyc-master/bin
里执行 rpyc_classic.py
: python rpyc_classic.py
。
在运行这个脚本之后,你可以看到在命令提示符上有如下信息:
INFO:SLAVE/18812:server started on [0.0.0.0]:18812
如何做…¶
我们现在已经可以开始我们第一个例子了:重定向一个远程处理的stdout:
import rpyc
import sys
c = rpyc.classic.connect("localhost")
c.execute("print('hi python cookbook')")
c.modules.sys.stdout = sys.stdout
c.execute("print('hi here')")
通过运行这个脚本,你会在服务器端看到重定向的输出:
INFO:SLAVE/18812:server started on [0.0.0.0]:18812
INFO:SLAVE/18812:accepted 127.0.0.1:6279
INFO:SLAVE/18812:welcome [127.0.0.1]:6279
hi python cookbook
(译者注:在执行 c.modules.sys.stdout = sys.stdout
之后,print将会输出到客户端的命令行里)
讨论¶
第一步是执行一个客户端去连接服务器:
import rpyc
c = rpyc.classic.connect("localhost")
这里,客户端的语句 rpyc.classic.connect(host, port)
根据给定的host和port来创建一个套接字。套接字定义了连接的端点。 rpyc
使用套接字和其他程序通信,其可以分布在不同的计算机上。
下来,我们执行了这条语句: c.execute("print('hi python cookbook')")
。
这条语句就会在服务器上执行print语句(远程的 exec
语句)。
第六章 Python GPU编程¶
介绍¶
图形处理器是一种擅长渲染多边形单元的图像的电子元件。虽然GPU原先的设计目的是渲染图像,但随着不断优化,GPU已经变得越来越复杂,在实时渲染和离线渲染方面的效率也更高。除此之外,GPU也逐渐开始应用在科学计算领域。GPU 的特点是架构高度并行,所以擅长快速处理超大数据集。随着近几年硬件性能的快速提升,以及它的可编程性,GPU 很快引起了学术界的注意。GPU 不再仅仅用于图形渲染领域,人们开始探索它在其他方面的可能性。传统的 GPU 是固定的功能设备,渲染流程是固定于硬件上的。这就限制了图形程序员去使用不同的、更高效、质量更高的渲染算法。所以,现代 GPU 都是建立在百万个轻量级并行核心上的,可以使用 shaders 来编程定义图形渲染。这在图形渲染和游戏方面是划时代的进步。有了这么多可以编程的核心,GPU 厂商逐渐开始开发并行编程模型。每个 GPU 由多个流式处理器(Streaming Multiprocessor, SM)组成,它代表并行的第一层逻辑级别,每一个 SM 都是并行独立、互不干扰地工作。

每个 SM 又被分成一组流处理器(Stream Processors, SP),每一个 SP 都有一个可以执行的核心,可以运行一个线程。SP 是执行逻辑的最小单位,表示更精细的并行度。SM和SP的概念本质上是结构性的,但这可以概述 GPU 的 SP 的组织逻辑,即以特定执行模式为特征的逻辑单元组合在一起。一组中所有核心同时运行相同的指令,属于我们在本书第一章中描述的单指令多数据(SIMD)模型。
每一个 SM 都有多个寄存器,寄存器可以被认为是一小块读写速度很快的、临时的、本地的(不同核心之间无法共享)的内存。每个核心可以使用这些寄存器来存储常用的值。有一个学科叫做图形处理单元上的通用计算(general-purpose computing on graphics processing units, GP-GPU),致力于研究 GPU 的高速并行计算能力。之前介绍过,GPU 和传统的处理器架构很不同,所以它们也面临不同性质的问题,需要不同的编程技术。GPU 最亮眼的特点是有大量的核心,我们可以同时执行很多个线程计算单元,它们同时执行相同的操作。想象这样一个场景,你需要把数据分成很多小的部分,然后在这些小的部分上执行相同的操作,这时候这种计算模式就效率很高。相反,如果你的操作必须严格按照一定的顺序来执行,这种架构的发挥余地就不大了。在其他不能将计算平均地分成很多小的部分的情况,也不适用于这种架构。GPU 的这种编程范式被称为流计算(Stream Processing), 因为数据可以看做是一个巨大的数据流,相同的操作不断在这个流上面操作。
目前,能发挥 GPU 的这种计算能力最好的解决方案是 CUDA 和 OpenCL 这两个库。下面几节中,我们将用 Python 来展示这些库的用法。