Python并发编程
一个运行的程序称作进程。每个进程都有自己的系统状态,包括内存、已打开文件列表、用于跟踪正在执行的指令的程序计数器以及用于保存函数的局部变量的调用栈。通常在一个控制流序列中, 进程逐条执行语句,这一般称为进程的主线程。在任何一个给定的时刻,程序都只做一件事情。 程序可以使用库函数创建新的进程,比如os或subprocess模块中的函数(例如os.fork()、 subprocess.Popen等)。这些叫做子进程的进程是作为完全独立的实体运行的。 每个子进程都有自己的私有系统状态和执行主线程。因为子进程是独立的,所以它可以与原始进程并发执行。也就是说,创建子进程的进程可以继续处理别的事情,同时子进程在后台执行它自己的任务。
尽管进程是孤立的,但它们可以彼此通信,这称为进程间通信(Interprocess Communication, IPC)。 进程间通信的最常见形式是基于消息传递。一条消息就是一块原始字节的缓存。然后,就可以使用像 send()和recv()这样的简单操作,通过一个I/O通道(如管道或网络套接字)来传输或接收消息。另一种不太常见的IPC机制依赖于内存映射区域(参见nmap模块)。借助内存映射,进程可以创建共享的内存区域。如果对这些内存区域进行修改,那么査看这些区域的所有进程都能看到这些修改。 如果需要同时处理多个任务,可以在一个应用程序内使用多个进程,每个进程负责处理一部分工 作。
另一种将工作细分为多个任务的方法是使用线程。线程类似于进程,也具有自己的控制流和执行栈。但线程运行在创建它的进程内部,它们共享所有的数据和系统资源。当应用程序需要并发执行多个任务时,可以使用线程,但可能存在大量需要在各个任务之间共享的系统状态。
使用多个进程或线程时,主机操作系统负责安排它们的工作。安排的具体做法是:给每个进程(线程)分配一个小的时间片,并在所有活动任务之间快速循环——给每个任务分配一部分可用的CPU周期。例如,如果系统同时运行10个活动的进程,操作系统将给每个进程分配大约1/10的CPU时间,同时在进程之间快速循环。在具有多个CPU核心的系统上,操作系统在安排进程时可以尽可能使用每个 CPU,从而并行执行进程。 编写使用并发执行的程序原本就很复杂,其复杂性的一个主要原因就是同步和访问共享数据。也就是说,多个任务同时更新一个数据结构可能导致数据损坏和程序状态不一致(这个问题的正式说法是竞争条件)。要解决这些问题,并发程序必须找出关键的代码段,并使用互斥锁和其他类似的同步手段保护它们。例如,如果不同的线程尝试同时向一个文件写入数据,可以使用互斥锁来同步它们的
在大多数系统上,Python同时支持消息传递和基于线程的并发编程。Python线程受到的限制有很多,Python解释器使用了内部的GIL (Global Interpreter Lock,全局解释器锁定),在任意指定的时刻只允许单个 Python线程执行。无论系统上存在多少个可用的CPU核心,这限制了 Python程序只能在一个处理器上 运行。如果一个应用程序的大部分是I/O受限(bounded)的,那么使用线程一般没有问题,因为额外的处理器对于花费大多数时间等待事件的程序帮助不大。对于涉及大量CPU处理的应用程序而言,使用线程来细分工作没有任何好处,反而还会降低程序的运行速度。
避免大量使用线程的较为常见的做法是把这类应用程序重新构造为异步事件处理系统。例如,中心的事件循环可以监控使用select模块的所有I/O,并将异步事件分离给大量I/O 处理器。这是诸如asyncore这样的库模块和诸如Twisted等流行的第三方模块的基础。
如果要在Python中进行各种类型的并发编程,消息传递很可能是必须熟练掌握的概念。 即便使用线程,常推荐的方法也是将应用程序的结构设计为大量独立的线程集合,这些线程通过消息队列交换数据。这种特别的方法往往很少出错,因为它极大地减少了对使用锁定和其他同步手段的需求。消息传递还会自然扩展到网络和分布式系统中。消息传递的抽象也与髙级Python功能(如协程)有关。例如,协程是可以接收并处 理发送给它的消息的函数。因此,掌握了消息传递之后,写出的程序会较之前更灵活。
该模块为在子进程中运行任务、通信和共享数据,以及执行各种形式的同步提供支持。接口风格与threading模块类似,但要注意进程之间没有任何共享状态。
- 确保进程之间传递的所有数据都能够序列化
- 避免使用共享数据,尽可能使用消息传递和队列。使用消息传递时,不必过于担心同步、锁定和其他问题。当进程的数量增长时,它往往还能提供更好的扩展
- 在必须行在单独进程中的函数内部,不要使用全局变量而应当显式地传递参数
- 尽量不要在同一个程序中混合使用线程和多线程处理
- 特別要注意关闭进程的方式。一般而言,需要显式地关闭进程,并使用一种定义良好的终止模式,而不要仅仅依赖于垃圾收集或者被迫使用terminate ()操作强制终止子进程
- 管理器和代理的使用与分布式计算中的多个概念密切相关(例如,分布式对象)
- 尽管此模块可以工作在Windows上,但还是应该仔细阅读官方文档中的各种微妙细节
- 最重要的一点是:尽量让事情变得简单
进程是multiprocessing模块的核心,其构造函数如下:
1 2 3 4 5 6 |
Process([group [, target [, name [, args [, kwargs ]]]]]) # group 预留参数 # target 进程启动时需要执行的可调用对象 # name 进程名称的描述性字符串 # args 传递给target的位置参数的元组 # kwargs 传递个target的关键字参数的字典 |
Process具有以下实例方法、属性:
方法/属性 | 说明 |
is_alive() | 如果进程仍在运行,返回True |
join([timeout]) | 等待该进程停止,最大timeout |
run() | 继承Process并改写run()方法,其效果与传递target一样 |
start() | 启动进程 |
terminate() | 强制终止进程,不会进行任何清理操作。如果进程创建了子进程,子进程变为僵尸进程;如果进程持有锁定则可能导致死锁 |
autokey | 进程的身份验证键。除非显式设定,这是由os.urandom()函数生成的32字符的字符串 |
daemon |
一个Boolean标志,指示进程是否是后台进程。当创建它的Python进程终止时,后台(Daemonic)进程将自动终止。另外,禁止后台进程创建自己的新进程。daemon的值必须在使用start()函数 启动进程之前进行设置 |
exitcode |
进程的整数退出代码。如果进程仍然在运行,它的值为None。如果值为负数,则-N表示进程由信号N所终止 |
name |
进程的名称 |
pid |
进程的整数ID |
简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import multiprocessing import time def clock(interval): while True: print "Current time is %s" % time.ctime() time.sleep(interval) if __name__ == '__main__': p = multiprocessing.Process( target=clock, args=(15,) ) p.start() #使用扩展子类的方式 class ClockProcess(multiprocessing.Process): def __init__(self,interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): while True: print "Current time is %s" % time.ctime() time.sleep(self.interval) |
multiprocessing模块支持进程间通信的两种主要形式:管道和队列,这两种方法都是使用消息传递实现的。
使用Queue([maxsize])可以创建队列,maxsize是队列中允许的最大项数,省略表示无穷大,具有以下实例方法:
方法 | 说明 |
cancel_join_thread() | 不在进程退出时自动join后台线程。可防止join_thread()方法阻塞 |
close() | 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get ()操作 上,关闭生产者中的队列不会导致get ()方法返回错误 |
empty() | 如果调用此方法时队列为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入了新的项目 |
full() | 如果队列已满,则返回True。由于线程的存在,结果也可能不可靠 |
get([block [, timeout]]) | 返回队列中的一个项。 如果队列为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True。 如果设置为False,可能引发Queue.Empty异常。timeout是可选超时时间,在阻塞模式中如果在指定的时间间隔内没有项变为可用,引发Queue.Empty异常 |
get_nowait() | 等价于get(False) |
join_thread() | 当前线程join队列的后台线程。用于在调用close()方法之后,等待所有队列项被消耗 |
put(item [, block [, timeout]]) | 将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常 |
put_nowait() | 等价于put(item, False) |
qsize() | 返回队列中目前项目的正确数量。此函数的结果并不可靠 |
使用JoinableQueue([maxsize])可以创建可连接的共享队列,允许消费者通知生产者消息项已经被成功处理,具有额外的方法:
方法 | 说明 |
task_done() | 消费者使用此方法发出信号,表示通过get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常 |
join() | 生产者使用此方法进行阻塞,直到队列中的所有项目均被处理。阻塞将持续到为队列中的每个项目均调用task_done()方法为止。 |
使用队列通信的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import multiprocessing def consumer(inputq): while True: item = inputq.get() if item == -1: break #特殊标记时终止 print item inputq.task_done() def produce(seq, outputq): for item in seq: outputq.put(item) if __name__ == '__main__': q = multiprocessing.JoinableQueue() cons_p = multiprocessing.Process(target=consumer, args=(q,)) cons_p.daemon = True cons_p.start() #这里可以继续创建多个消费者进程,但是每个消息只能被一个进程消费 seq = [0, 1, 2, 3, 4 , -1] #特殊标记,提示消费者流程结束 produce(seq, q) q.join() # 当前进程等待队列被消耗完毕 |
可以在进程之间创建一根管道:
1 2 3 |
#创建管道的函数:duplex表示是否双向通信,默认True #如果duplex=False,那么conn1只能接收,conn2只能发送 (conn1,conn2) = Pipe([duplex]) |
Pipe函数的返回值为Connection对象的元组,Connection对象具有以下属性和方法:
方法/属性 | 说明 |
close() | 关闭连接。如果connection被垃圾收集,将自动调用此方法 |
fileno() | 返回连接使用的整数文件描述符 |
poll([timeout]) | 如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达 |
recv() | 接收send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常 |
recv_bytes([maxlength]) | 接收send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。 如果进入的消息超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常 |
recv_bytes_into(buffer [, offset]) | 接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(bytearry对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常 |
c.send(obi) | 通过连接发送对象。obj是与序列化兼容的任意对象 |
send_bytes (buffer [, offset [, size]]) | 通过连接发送字节数据缓冲区。buffer为支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,size是要发送字节数。结果数据以单条消息的形式发出,然后调用recv_bytes()进行接收 |
如果生产者或消费者中都没有使用管道的某个端点,就应将其关闭。如果忘记执行这越步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的, 必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果, 除非消费者也关闭了相同的管道端点。
使用管道的生产者——消费者的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import multiprocessing def consumer(pipe): outputp, inputp = pipe inputp.close() # 关闭消费者的输入通道 while True: try: item = outputp.recv() except EOFError: break print item print "Consumer done" def produce(seq, inputp): for item in seq: inputp.send(item) if __name__ == '__main__': (outputp, inputp) = multiprocessing.Pipe(True) cons_p = multiprocessing.Process(target=consumer, args=((outputp, inputp),)) cons_p.start() outputp.close() # 关闭生产者的输出通道 seq = [0, 1, 2, 3, 4] produce(seq, inputp) # 关闭输入通道 inputp.close() # 等待消费者进程关闭 cons_p.join() |
双向通信的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import multiprocessing def server(pipe): serverp, clientp = pipe clientp.close() #服务器不会使用管道的另一端 while True: try: x, y = serverp.recv() #接收请求消息 except EOFError: #如果客户端关闭了管道另一端 print "Pipe endpoint closed by peer" break result = x + y serverp.send(result) # 返回处理结果 print "Server done" if __name__ == '__main__': (serverp, clientp) = multiprocessing.Pipe(True) s = multiprocessing.Process(target=server, args=((serverp, clientp),)) s.start() #客户端逻辑部分 serverp.close() #客户端不会使用管道的另一端 clientp.send((1, 1)) #发送请求消息 print clientp.recv() #打印处理结果 clientp.send(('Hello',' World')) print clientp.recv() clientp.close() s.join() #等待子进程结束 |
Pool类可用于创建进程池,可以把各种数据处理任务提交给进程池处理:
1 2 3 4 5 |
#构造函数 Pool( [numprocess [, initializer [, initargs]]]) # numprocess 池中的进程数,省略则使用cpu_count()的值 # initializer 进程启动时执行的可调用对象,默认None # initargs 传递给initializer的参数 |
Pool的实例支持以下方法:
方法 | 说明 |
apply(func [, args [, kwargs]]) | 在池中一个工作进程中执行func(*args, **kwargs)并返回结果 |
apply_async(func [, args [, kwargs, [cb]]]) | 在池中一个工作进程中异步执行func(*args, **kwargs)并返回一个AsyncResult对象,该对象可用于稍后获取结果。cb为一可调用对象,当异步执行的结果可用时,会回调之 |
close() | 关闭进程池,防止进行进一步操作 |
terminate() | 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作,如果进程池北垃圾回收,该函数自动调用 |
join() | 等待所有工作进程退出。此方法只能在close()或terminate()方法之后调用 |
map(func, iterable [, chunksize]) | 将可调用对象func应用给iterable中的所有项目,然后以列表的形式返回结果。通过将iterable划分为多块并将工作分派给工作进程,可以并行地执行这项操作。chunksize为每块中的项目数 |
map_async(func, iterable [, chunkslze [, cb]]) | 同map()函数,但结果的返回是异步的。返回值是AsyncResult类的实例 |
imap(func, iterable [, chunksize]) | map()函数的版本之一,返回迭代器而非结果列表 |
imap_unordered (iterable [, chunksize ]) | 同imap()函数,但从工作进程接收结果时,返回结果的次序是任意的 |
AsyncResult的实例支持以下方法:
方法 | 说明 |
get([timeout]) | 返回结果,如果有必要则等待结果到达。timeout是可选的超时。如果结果在指定时间内没有到达,将引发TimeoutError异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发 |
ready() | 如果调用完成,返回True |
successful() | 如果调用完成且没有引发异常,返回True。如果在结果就绪之前调用此方法,将引发AssertionError异常 |
wait ([timeout]) | 等待结果变为可用。timeout是可选的超时 |
通常,进程之间彼此是完全孤立的,唯一的通信方式是队列或管道。可以使用两个对象来表示共享数据。这些对象使用了共享内存(通过mmap模块)使访问多个进程成为可能。可以创建以下类型的共享数据:
共享数据类型 | 说明 |
Value(typecode, arg1, ... argN, lock) |
在共享内存中创建ctypes对象,typecode可以是类似array模块的类型代码,或者ctypes模块的类型对象(例如ctypes.c_int)。参数arg1...argN传递给类型的构造函数,lock必须使用关键字参数调用,如果为True(默认)则会创建一个新的锁定来包含对值的访问,如果传入现有锁对象(Lock或者RLock),则该锁用于同步访问。 可以使用value属性访问底层的数值 |
RawValue (typecode, arg1, ... argN) | 同Value对象,但不存在锁定。 |
Array(typecode, initializer, lock) | 在共享内存中创建ctypes数组。typecode描述了数组的内容,意义与Value()函数中的相同。 initializer要么是设罝数组初始大小的整数,要么是项目序列,其值和大小用于初始化数组。lock 是只能使用关键字调用的参数,意义与Value()函数中相同。如果a是Array创建的共享数组的实例,可使用标准的Python索引、切片和迭代操作访问它的内容,其中每种操作均由锁定进行同步。对于字节字符串,a还具有value属性,可以把整个数组当作一个字符串进行访问 |
RawArray (typecode, initializer) | 同Array,但不存在锁定 |
可以使用以下功能与threading模块类似的同步原语:
原语 | 说明 |
Lock | 互斥锁 |
RLock | 可重入的互斥锁(同一进程可以多次获得它,同时不会造成阻塞) |
Semaphore | 信号量 |
BoundedSemaphore | 有边界的信号量 |
Event | 事件 |
Condition | 条件变量 |
结合信号量来把数组发送给另外一个进程的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
import multiprocessing class FloatChannel(object): def __init__(self, maxsize): self.buffer = multiprocessing.RawArray('d', maxsize) self.buffer_len = multiprocessing.Value('i') self.empty = multiprocessing.Semaphore(1) self.full = multiprocessing.Semaphore(0) def send(self, values): # 等待缓冲区为空的信号 self.empty.acquire() n = len(values) self.buffer_len = n self.buffer[:n] = values # 设置缓冲区已满的标记 self.full.release() def recv(self): self.full.acquire() # 复制缓冲区中的值,注意可以使用切片语法 values = self.buffer[:self.buffer_len.value] # 设置缓冲区已空的标记 self.empty.release() return values def consumer(count , fc): for i in xrange(count): fc.recv() if __name__ == '__main__': fc = FloatChannel(100000) p = multiprocessing.Process(target=consumer, args=(1000, fc)) p.start() values = [float(x) for x in xrange(100000)] for i in xrange(1000): fc.send(values) print "Produce done" p.join() |
上一节的数据同步,只能使用共享值、数组等基本类型,如果需要使用Python对象则无能为力。Python提供了“管理器”这种独立的子进程,可以用于管理多个进程之间的共享对象。“管理器”作为服务器进程来运行,其他进程则通过代理访问共享对象,相当于客户端。
使用Manager()函数可以创建一个管理器,其返回位于multiprocessing.managers模块中的SyncManager对象
使用multiprocessing模块的程序可以与运行在同一台计算机上的其他进程或者位于远程系统 上的进程进行消息传递。可以使用Client来连接到其它进程:
1 2 3 4 5 6 |
#位于multiprocessing.connections子模块 connections.Client (address [, family [, authenticate [, authkey]]]) #address 代表网络地址的元组(hostname, port) #family 表示地址格式的字符串,省略则从address自动推断 #authenticate 是否使用摘要身份验证 #authkey 包含身份验证密钥的字符串,省略则使用current_process().authkey |
对方进程必须作为服务器,处于监听状态:
1 |
connections.Listener([address, [,family [, backlog[, authenticate [,authkey]]]]]) |
Listener的实例具有以下方法或属性:
方法/属性 | 说明 |
accept() | 接受一个新连接,并返回一个Connection对象。如果身份验证失败,将引发AuthenticationError |
address | 侦听器正在使用的地址 |
close() | 关闭侦听器正在使用的管道或套接字 |
last_accepted | 接受的最后一个客户端的地址 |
下面是一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
def server(): from multiprocessing.connection import Listener serv = Listener(('127.0.0.1', 15000), authkey="12345") while True: conn = serv.accept() while True: try : x, y = conn.recv() except EOFError: # 如果客户端断开连接 print "Connection closed by peer" break result = x + y conn.send(result) conn.close() if __name__ == '__main__': import multiprocessing s = multiprocessing.Process(target=server) s.start() from multiprocessing.connection import Client conn = Client(('127.0.0.1', 15000), authkey="12345") conn.send((1, 1)) print conn.recv() conn.close() |
函数 | 说明 |
active_children() | 列出所有活动子进程的Process对象 |
cpu_count() | 返回系统上的CPU数量,如果能够确定的话 |
current_prpcess() | 返回当前进程的Process对象 |
freeze_support() | 在使用各种打包工具(如py2exe)进行冻结的应用程序中,此函数应该作为主程序的首行。使用此函数可以防止与在冻结的应用程序中启动子进程相关的运行时错误 |
get_logger() | 返回与多进程处理模块相关的日志记录对象,如果它不存在则创建之。返回的记录器不会把消息传播给根记录器,级别logging.NOTSET,会将所有日志消息打印到标准错误上 |
set_executable(executable) | 设置用于执行子进程的Python可执行程序的名称。这个函数只定义在Windows上 |
该模块提供Thread类和各种同步原语,用于编写多线程的程序。
Thread类用于表本单独的控制线程。使用下面的函数可以创建一个新线程:
1 2 3 4 5 |
Thread (group=None, target=None, name=None, args=(), kwarga={}) # group 预留字段 # target 可调用对象,线程启动时,其run方法将调用该对象 # name 线程的名称 # args、kwargs 传递给target的参数 |
线程支持以下实例方法和属性:
方法/属性 | 说明 |
start() | 通过在一个单独的控制线程中调用run ()方法,启动线程。此方法只能调用一次 |
run() | 线程启动时将调用此方法。默认情况下,它将调用传递到构造函数中的目标函数。还可以在Thread 的子类中重新定义此方法 |
join([timeout]) | 等待直到线程终止或者出现超时为止。timeout是一个浮点数,用于指定以秒为单位的超时时间。 线程不能join自身,而且在线程启动之前就连接它将出现错误 |
is_alive() | 如果线程是活动的,返回True,否则返回False。从start()方法返回的那一刻开始,线程就是活动的,直到它的run()方法终止为止 |
name | 线程名称 |
ident | 整数线程标识符。如果线程尚未启动,它的值为None |
daemon | 线程的布尔型后台标志。必须在调用start ()方法之前设置这个标志,它的初始值从创建线程的后台状态继承而来。当不存在任何活动的非后台线程时,整个Python程序将退出。所有程序都有一个主线程,代表初始的控制线程,它不是后台线程 |
下面是一个使用线程的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import threading import time class ClockThread(threading.Thread): #除了__init__、run以外,改写别的方法 def __init__(self, interval): threading.Thread.__init__(self) #子类如果扩展__init__,必须调用Thread的构造函数 self.daemon = True self.interval = interval def run(self): while True: print "Current time is %s" % time.ctime() time.sleep(self.interval) if __name__ == '__main__': t = ClockThread(15) t.start() time.sleep(150) |
用于延迟执行某一函数:
1 2 3 4 |
Timer(interval, func [, args [, kwargs]]) #interval 延迟执行的秒数 #func 需要执行的函数 #args、kwargs 函数参数 |
Timer具有以下实例方法:
方法 | 说明 |
start() | 启动定时器 |
cancel() | 如果函数尚未执行,可用于取消定时器 |
锁定是一个同步原语,状态是“已锁定”或“未锁定”之一。两个方法acquire() 和release()用于修改锁定的状态。如果状态为已锁定,尝试获取锁定将被阻塞,直到锁定被释放为 止。如果有多个线程等待获取锁定,当锁定被释放时,只有一个线程能获得它。等待线程获得锁定的顺序没有定义
使用构造函数Lock()可以创建锁的实例,初始状态为未锁定。具有以下实例方法:
方法 | 说明 |
acquire([blocking]) | 获取锁定,如果有必要,需要阻塞到锁定释放为止。如果提供参数blocking并将它设为False,当无法获取锁定时将立即返回False,否则立即返回True |
release () | 释放一个锁定。当锁定处于未锁定状态时,或者从与原本调用acquire()方法的线程不同的线程调用此方法,将出现错误 |
使用诸如Lock、RLock或Semphore之类的锁定原语时,必须多加小心。锁定的错误管理经常导致死锁或者竞态条件。可以使用下面代码中提到的两种方式保证锁的正确释放:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import threading lock = threading.Lock() #方法一:使用try-finally语句 try: lock.acquire() #执行同步操作 finally: lock.release() #方法二:使用上下文管理协议 #进入时自动获取锁定,离开时自动释放锁定 with lock: #执行同步操作 pass |
此外,应当尽量避免同时获取多个锁定,容易导致死锁。
可重入锁定是一个类似于Lock同步原语,但同一个线程可以多次获取它。这允许拥有锁定的线程执行嵌套的acquire()和release()操作。在这种情况下,只有最外面的release()操作才能将锁定重置为未锁定状态。进行若干次lock()后,必须在进行对应次数的release()才能彻底释放锁。
信号量是一个基于计数器的同步原语,每次调用acquire()方法时此计数器减1,每次调用 release()方法时此计数器加1。如果计数器为0, acquire ()方法将会阻塞,直到其他线程调用release ()方法为止。
可以使用Semaphore([value])来创建信号量,其中value为计数器的初始值,默认为1。信号量包含以下实例方法:
方法 | 说明 |
acquire([blocking]) | 获取信号量。如果进入时内部计数器大于0,此方法将把它的值减1,然后立即返回。如果它的值为0,此方法将阻塞,直到另一个线程调用release()方法。blocking参数的行为与Lock类似 |
release () | 通过将内部计数器的值加1来释放一个信号量。如果计数器为0,而且另一个线程正在等待,该线程将被唤醒。如果有多个线程正在等待,只能从它的acquire()调用返回其中一个。线程释放的顺序并不确定。 |
BoundedSemaphore的行为与Semaphore相同,但是限制但release()操作的次数不能超过acquire()的次数。
事件用于在线程之间通信。一个线程发出“事件”信号,一个或多个其他线程等待它,具有以下实例方法:
方法 | 说明 |
is_set() | 只有当内部标志为True时才返回True |
set () | 将内部标志置为True。等待它变为True的所有线程都将被唤醒 |
clear() | 将内部知志重置为False |
wait( [timeout] ) | 阻塞直到内部标志为True。如果进入时内部标志为True,此方法将立即返回。否则,它将阻塞直到另个线程调用set()方法将志置为True,或者直到出现可选的超时。timeout是个浮点数,用于指定以秒为单位的超时期限 |
条件变量是构建在另一个锁定上的同步原语,当需要线程关注特定的状态变化或事件的发生时将使用这个锁定。典型的用法是生产者-消费者场景,其中一个线程生产的数据供另一个线程使用。使用构造函数Condition([lock])可以创建条件变量,其中lock是Lock或者RLock的实例,如果不提供则创建新的锁供食用。Condition具有以下实例方法:
方法 | 说明 |
acquire (*args) |
获取底层锁定。此方法将调用底层锁定上对应的acquire方法 |
release () | 释放底层锁定。此方法将调用底层锁定上对应的release()方法 |
wait( [timeout]) | 等待直到获得通知或出现超时为止。此方法在调用线程已经获取锁定之后调用。调用时,将释放底层锁定,而且线程将进入睡眠状态,直到另一个线程在条件变量上执行notify()或notifyAll() 方法将其唤醒为止。在线程被唤醒之后,线程将重新获取锁定,方法也会返回,timeout是浮点数, 单位为秒。如果这段时限耗尽,线程将被唤醒,锁定将被重新获取 |
notify([n]) | 唤醒一个或多个等待此条件变量的线程。此方法只会在调用线程已经获取锁定之后调用,而且如果没有正在等待的线程,它就什么也不做。n指定要唤醒的线程数量,默认为1。被唤醒的线程在它们 新获取锁定之前不会从wait ()调用返回 |
notify_all() | 唤醒所有等待此条件的线程 |
条件变量的简单例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import threading c = threading.Condition() def producer(): while True: c.acquire() produce_item() c.notify() c.release() #使用条件变量时需要注意的是,如果存在多个线程等待同一个条件,notify()操作可能唤醒它们 #中的一个或多个(这种行为通常取决于底层的操作系统)。因此,始终有这样的可能:某个线程被 #唤醒后,其等待的条件已经不满足,因此可能需要循环继续等待下一个信号 def consumer(): while True: c.acquire() while not item_is_available(): #释放锁定,生产者可以获取并生产物品 c.wait() c.release() consume_item() |
线程没有任何方法可用于强制终止或挂起。这是设计上的原因,因为如果某个线程已经获取了锁定,在它能够释放锁定之前强制终止或挂起它,将导致整个应用程序出现死锁。此外,终止时一般不能简单地“释放锁定”,因为复杂的线程同步经常涉及锁定和解除锁定操作,而这些操作在执行时的次序要十分精确。 如果要为终止或挂起提供支持,需要自己构建这些功能。一般的做法是在循环中运行线程,这个循环的作用是定期检査线程状态以决定它是否应该终止
函数 | 说明 |
active_count() |
返回当前活动的Thread对象数量 |
current_thread() | 返回对应于调用者的控制线程的Thread对象 |
enumerate() | 列出当前所有活动的Thread对象 |
local() | 返回local对象,用于保存线程本地的数据,应该保证此对象在每个线程中是唯一的 |
setprofile(func) | 设置一个配置文件函数,用于已创建的所有线程。func在每个线程开始运行之前被传递给 sys.setprofile()函数 |
settrace(func) | 设置一个跟踪函数,用于已创建的所有线程。func在每个线程开始运行之前被传递给sys.settrace()函数 |
stack_size([size]) | 返回创建新线程时使用的栈大小。size 的值可以是32768 (32KB)或更大,而且是4096 (4KB)的倍数,这样可移植性更好 |
Python解释器被一个锁定保护,该锁定只允许一次执行一个线程,即便存在多个可用的处理器。 在计算密集型程序中,这严重限制了线程的作用。事实上,在计算密集型程序中使用线程,经常比仅仅按照顺序执行同样的工作慢得多。因此,实际上应该只在主要关注I/O的程序,如网络服务器中使用线程。对于计算密集程度更高的任务,最好使用C扩展模块或multiprocessing模块来代替。C扩展具有释放解释器锁定和并行运行的选项,可以做到当释放锁定时不与解释器进行交互。 multiprocessing模块将工作分派给不受锁定限制的单独子进程
queue模块(在Python 2中叫Queue)实现了各种多生产者——多消费者队列,可用于在执行的多个线程之间安全地交换信息。 该模块定义了3种不同的队列类:
队列类型 | 说明 |
Queue([maxsize]) | 创建一个FIFO队列。maxsize是队列中可以放入的项目的最大数量。如果省略maxsize参数或将它置为0,队列大小无限制 |
ListQueue([maxsize]) | 创建一个LIFO队列(栈) |
PriorityQueue([maxsize]) | 创建一个优先级队列,其中项目按照优先级从低到髙依次排好。使用这种队列时,项目应该是 (priority, data)形式的元组,其中priority是一个数字 |
队列具有以下实例方法:
方法 | 说明 |
qsize() |
返回队列的正确大小。因为其他线程可能正在更新队列,此方法返回的数字不完全可靠
|
empty() | 如果队列为空,返回True,否则返回False |
full() | 如果队列已满,返回True,否则返回False |
put (item [,block [, timeout]]) | 将item放入队列。如果可选参数block为True (默认值),调用者将被阻塞直到队列中出现可用的空闲位置为止。否则队列满时将引发Full异常。timeout提供可选的超时值, 单位为秒。如果出现超时,将引发Full异常 |
put_nowait (item) | 等价于g.put (item, False) |
get ([block [, timeout]]) | 从队列中删除一项,然后返回这个项目。如果可选参数block为True (默认值),调用者将阻塞, 直到队列中出现可用的空闲位置。否则队列为空时将引发Empty异常,timeout提供可选的超时值,单位为秒。如果出现超时,将引发Empty异常。 |
get_nowait () | 等价于get (0)方法 |
task_done() | 队列中数据的消费者用来指示对于项目的处理已经结束。如果使用此方法,那么从队列中删除的每一项都应该调用一次 |
join() | 阻塞直到队列中的所有项目均被删除和处理为止。一旦为队列中的每一项都调用了一次 task_done ()方法,此方法将会直接返回 |
使用队列一般可以简化多线程的程序。例如,可以使用共享队列将线程连接在一起,而不必依赖于必须由锁定保护的共享状态。在这种模型中,工作者线程一般充当数据的消费者。下面是一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
import threading from queue import Queue class WorkerThread(threading.Thread): def __init__(self,*args,**kwargs): threading.Thread.__init__(self, *args, **kwargs) self.input_queue = Queue() def send(self,item): self.input_queue.put(item) def close(self): self.input_queue.put(None) self.input_queue.join() def run(self): while True: item = self.input_queue.get() if item is None: break print item #处理条目 self.input_queue.task_done() self.input_queue.task_done() return #使用示例 w = WorkerThread() w.start() w.send('Hello') #把条目发送给工作线程处理 w.close() |
上面的例子几乎完全等同于协程。如果要执行的工作不涉及任何阻塞操作,可以将run()方法重新实现为协程,这就省却了使用线程的麻烦。后一种方法的运行速度可能更快,因为节省了线程上下文切换带来的开销
在某些类型的应用程序中,可以使用一个任务调度器和一些生成器或协程实现协作式用户空间多线程,这有时称为微线程。这种技术的一种常见用法是在需要管理大量的已打开文件或套接字的程序中,例如一台需要同时管理 1000个客户端连接的网络服务器。此时的解决方案是联合使用异步I/O或轮询(使用select模块)与处理I/O事件的任务调度器,而不是创建1000个线程,因为过多的线程会导致大量系统资源浪费在上下文切换上。 这种编程技术的基础概念是这样产生的:生成器或协程函数中的yield语句挂起函数的执行,直到稍后使用next () 或者send()操作进行恢复为止。这样就可以使用一个调度器循环在一组生成器函数之间协作多个任务。
基于协程的并发编程的例子:基于协程技术实现的异步Echo服务
讲得很详细,很受用,多谢
谢谢,很久以前的笔记了,质量不高,不好意思~~