Java线程与并发编程
线程的本质是机器指令的执行路径,它完全是多任务操作系统之上的虚拟概念。从硬件角度来看,每一个CPU只有一条执行路径,成百上千的线程同时运行,只是操作系统调度形成的假象。
每一个计算机程序,或者说进程,至少有一个线程,称为主线程。Java程序的主线程叫做main thread,它以某个Class的main()方法为执行路径的起始点。任何计算机程序都是以单一线程开始运行的。
术语 | 说明 |
屏障(Barrier) | 屏障代表多个线程的集合点,所有线程必须都到达该屏障后,它们才能继续运行下去,因此先到达的线程会陷入等待 |
条件变量(Condition Variable) |
条件变量是和某一个锁(Lock)关联的变量,通常用于同步环境中,实现等待-唤醒机制。线程可以在拥有锁的前提下,等待该锁的某个条件变量;或者在拥有锁的前提下,唤醒一个或者全部正在等待某个条件变量的线程 条件变量也称为事件变量(Event Variable) |
临界区域(Critical Section) | 代表一个需要同步化的方法或者代码块,所有线程必须串行的经过临界区域。本质上就是一个隐式的锁获取和释放的区域 |
锁(Lock) |
用来表示对进入临界区的特定线程授予的访问权 读写锁(Read/Write Lock)可以允许多个线程同时取得,只要它们都同意只进行“读”操作 |
监视器(Monitor) | 该术语在不同的线程系统中含义有所不同,可能代指Lock,或者代指等待-唤醒机制 |
互斥(Mutex) | 和Lock类似,但是往往是跨进程的、基于操作系统级别的 |
信号量(Semaphore) | 线程可以等待一个或者多个信号量的计数,另外一个线程则可以释放一个或者多个信号量的计数,当前一种线程获得足够的计数后,停止等待 |
Java允许两种创建新线程的风格:
- 继承java.lang.Thread,并覆盖run()方法
- 实现java.lang.Runnable接口,并将其传递给Thread类的构造器
Thread对象本质上并不代表线程本身,而是一组与线程有关的方法和数据封装。
Thread的构造函数包括以下参数:
参数 | 说明 |
String name | 新线程的名称,用于显示信息使用,没有其他特殊意义。默认命名为Thread-N,N为唯一的数字 |
Runnable target | 新线程需要执行的指令列表,位于该接口的run()方法中 |
ThreadGroup group | 新线程加入的线程组,默认与调用新线程构造器的那个线程的线程组一致 |
long stackSize | 新线程执行方法时,存放临时变量的栈的大小 |
生命周期阶段 | 相关方法 | 说明 |
创建 | new Thread() |
在Java中,线程也是对象,因此它的创建是通过调用构造器完成的,上面的表格详细的介绍了线程构造器的参数。 线程被构造后即存在,但是还没有执行任何代码。在此时,其他线程就可以和新线程进行交互:例如设置优先级、名称、是否守护线程 |
启动 | start() |
调用start()方法后,JVM执行一些内部管理工作,然后调用thread的run()方法。 start()方法会立即返回,返回后JVM中多了一个线程,这个新线程将处于活动状态isAlive()==true,直到线程终结之前,alive一直为true |
终结 | stop() |
当run()方法执行完毕后——遇到return语句、执行到最后一行、或者抛出异常——线程就会自然终结 stop()方法不能用来强制终结线程,它存在固有的缺陷,已经被废弃不用 |
暂停 |
Thread.sleep() suspend() |
调用Thread.sleep()静态方法,可以让当前线程暂停一段时间,之后自动恢复执行。线程暂停的时间分辨率并不是很高,一般最高精确到几毫秒。该方法不会导致线程放弃持有的监视器 suspend()方法设计用来暂停其他线程,但是存在与stop()一样的缺陷 线程可以调用任意对象的wait()方法,导致它自己陷入等待中,前提是线程持有该对象的监视器,直到被中断或者该对象的notify()、notifyAll()被调用之前,线程会保持暂停,并且放弃持有的监视器 调用一个线程的join()方法,会导致当前线程暂停,直到目标线程终结。如果目标线程已经终结,那么该调用会立即返回 |
恢复 |
Thread.sleep() resume() |
resume()方法设计用来恢复暂停的线程,同样存在固有缺陷 线程调用任意对象的notify()、notifyAll()方法,会导致某个处于等待中的线程被唤醒,唤醒后的线程会立即尝试获取监视器 |
清理 | 线程终结后,其Java对象仍然可以被访问,可以附带一些有价值的信息。如果Thread对象脱离作用范围,会被GC回收,回收可能连带着系统资源的清理 |
方式 | 相关方法 | 说明 | ||||
设置标记位 |
当前线程以实例变量的形式提供一个标记位,允许其他线程改变这个标记位 当前线程会在一个循环中执行,定期检测标记位,如果标记位被设置,则return |
|||||
中断 | interrupt() |
调用该方法,可以让任何处于阻塞(Blocking)方法调用中的线程获得退出的机会。该方法有两个效应:
阻塞方法包括:join、Thread.sleep()、Object.wait()、I/O的读取方法等,注意I/O读取类阻塞方法不一定会在中断时抛出异常,具体看本单元格最后一段。下面是中断方法的使用样例代码: 基于中断标记判断(针对处于非阻塞状态的线程):
利用异常判断(针对处于阻塞状态的线程):
关于interrupt()方法,还需要注意:
|
竞态条件(Race Condition),是指计算的输出依赖于不受控制的事件的出现顺序或者出现时机,具体来理解此定义中的关键词:
- 计算的输出:比如方法调用的结果
- 不受控制:比如线程的调度是不受控制的,具有随机性
- 顺序或时机:比如哪个线程先执行,执行到哪一行代码并OS切换出去,哪个线程被切换进来获得CPU并执行
下面举个浅显的例子,假设有这样一个计算器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public class Calculator { private final int[] input; private int start; private int end; public Calculator( int start, int end ) { super(); this.input = new int[] { start, end }; reset(); } private void reset() { start = input[0]; end = input[1]; } public int add() { int res = 0; for ( ; start <= end; start++ ) res += start; reset(); return res; } } |
我们期望它每次计算都能输出正确的结果,下面有两个线程连续调用计算器100次:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
private static class CalculateThread extends Thread { private Calculator calculator; public CalculateThread( Calculator calculator ) { super(); this.calculator = calculator; } public void run() { for ( int i = 0; i < 1000; i++ ) System.out.println( calculator.add() ); } } public static void main( String[] args ) { Calculator calculator = new Calculator( 0, 100 ); new CalculateThread( calculator ).start(); //线程1 new CalculateThread( calculator ).start(); //线程2 } |
观察计算结果,可以看到,大部分的调用能够得到正确的和5050,但是会有一些随机的值夹杂其中,例如0、4189、1766…… 而且每次运行的输出结果都不一样。这就是典型的竞态条件场景了,线程1在调用add()方法时,随时可能因为OS的线程调度而暂停运行,这个时候calculator处于一种不一致的中间状态,此时线程2来发起/继续它的add()调用,calculator的两个状态字段的值是不确定的、随机的,因而计算结果必然不可靠。
之所以大部分结果是正确的,是因为CPU分配给线程的时间片相对于add()中的循环来说,是非常漫长的,因此大部分情况下线程能够不受干扰的一次执行完这个方法。
即使是一些非常简单的代码,也可能引入竞态条件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public class IntCounterTest { private static int counter = 0; private static class CounterThread extends Thread { public void run() { for (int i = 0; i < 10000000; i++) synchronized (IntCounterTest.class) { counter++; } } } public static void main(String[] args) throws InterruptedException { CounterThread t1 = new CounterThread(); t1.start(); CounterThread t2 = new CounterThread(); t2.start(); t1.join(); t2.join(); System.out.println(counter); } } |
上面的例子中,我们让t1、t2两个线程分别把计数器的值增加1000万,期望结果应该是2000万,但是几乎每次运行结果都不正确。 什么原因呢?因为++操作符至少包含三条指令:
- 读取变量的当前值到寄存器
- 将此值加一
- 将新值写回到变量
如果线程t1在执行这三条指令期间受到干扰,例如:
- 执行完第二条指令后,OS调度导致t1让出CPU,由t2执行
- 多核CPU场景下,t1执行完第二指令后,t2连续执行三条指令
就必然导致数据状态的不一致。
出现竞态条件的根本原因是一项操作(例如一个方法调用、或者一个操作符)不是原子的(atomic),物理学曾经认为原子是不可再分的,我们借用这一概念,用来描述一个不可中断的操作——要么不做、要么完成。
要让一个Java的方法变为原子的,那么就需要线程执行这个方法所有指令的整个过程不被干扰,即不会有其他线程来修改共享资源的状态。达到这一目标最直接的方法就是使用互斥锁(Mutex Lock)机制,该机制可以保证同一时刻只有一个线程对关键的、修改共享资源状态的代码段具有访问权。Java在语言级别上支持互斥锁机制——synchronized关键字,我们可以这样对上例的代码进行改写:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
private static class CounterThread extends Thread { public void run() { for ( int i = 0; i < 10000000; i++ ) //限制同一时刻只有一个人能访问对象IntCounterTest.class //由于这个类对象是全局唯一的,因此同一时刻只有一个线程能对counter进行递增 synchronized ( IntCounterTest.class ) { counter++; } //在同步块的边界,所有寄存器变量的值被视为无效 } } |
这样再执行,结果就总是2000万了。不过使用synchronized的代价是高昂的,在我机器上测试,修改前后代码运行耗时相差超过100倍。Java5提供的AtomicInteger性能相对较好(仍然有10倍性能差距),可以用来代替上例的int。
等待(阻塞)在synchronized块入口的线程,和sleep的线程、wait的线程,从Linux系统的角度来说,都处于S(可中断睡眠)状态。
在Java中,除了long、double以外的变量(包括引用)的基本加载、存储是原子操作,因此这些变量的读、写操作不会存在中间状态。即便如此,没有任何保护错误的共享变量仍然是不安全的,因为Java的内存模型允许线程持有共享变量的本地内存(Local Memory,往往是寄存器)拷贝,这就意味着, 一个线程修改了某个共享变量后,其它线程可能不会立刻发现。解决这一问题的方法有两种:
- 禁止直接访问变量,使用synchronized保护的getter/setter
- 声明变量为volatile
第二种方法更加优雅,volatile可以理解为对单个变量的读写操作进行了同步(JSR-133),volatile关键字的作用包括:
- 确保线程每次读取变量时,都去主存读取
- 确保线程每次写入变量时,都写出到主存
- Happens-Before原则:对volatile字段的写操作,必然发生在后续读操作之前
volatile关键字可以用在各种基本类型的变量上,甚至是数组或者对象,但是对于后两者,其volatile语义是作用在引用(指针)上。合理使用volatile可以避免不必要的同步操作,从而提高应用程序性能。
Java5的concurrent包引入接口: java.util.concurrent.locks.Lock ,其功能与synchronized关键字类似,在代码开始处调用lock,结束处调用unlock,就可以有效的实现同步化。下面是该接口的标准用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
public class MainClass { private Lock lock; private Object shared; public void operation() { lock.lock(); //获得锁,如果无法得到,线程将进入不可中断的等待状态 try { operate( shared ); //操作共享资源 } finally { lock.unlock(); //释放锁 } } } |
Lock接口提供了比synchronized更加细致的控制方式,它提供以下方法:
方法 | 说明 |
lock() | 获取锁,如果无法得到,线程被禁止调度(通过调用unsafe.park实现),直到获得锁 |
unlock() | 释放锁 |
trylock() | 尝试获取锁,如果无法得到,将返回false而不是阻塞,可以提供一个返回false前等待的延时 |
newCondition() | 创建一个与此锁关联的Condition对象,Condition可以代替Object.wait/notify/notifyAll |
Lock最常用的实现是: ReentrantLock ,即可重入锁:如果当前线程已经持有锁L,那么调用另外一个需要该锁的方法时,当前线程不需要释放锁而重新获取,而是简单的对锁的计数++,最退出被调用方法时,则对计数--,这与synchronized关键字的语义是一致的。不是所有Lock的实现都需要支持可重入语义。ReentrantLock提供以下重要方法:
方法 | 说明 |
getHoldCount() | 返回当前线程对Lock计数的数量,返回0代表当前线程没有持有锁 |
isLocked() | 该锁是不是被某个线程持有 |
isHeldByCurrentThread() | 锁是否被当前线程持有 |
getQueueLength() | 得到获取此锁的线程的数量 |
所谓公平锁是指相对公平的响应锁请求的线程,避免发生线程饥饿现象(一直等待而无法得到锁)。
new ReentrantLock(true) 可以构建公平锁,它基本上以先进先出的顺序来服务锁的请求者。开发者可以实现锁类,来实现其它公平策略。
JVM可以在不影响语义的情况下,对执行指令的顺序进行重新排列。这种重排是不考虑并发效应的:
1 2 3 4 5 6 7 8 9 10 |
private int i; private int j; public void set() { //下面两个语句的顺序JVM可以重排 i = 0; j = 1; //该语句可以重排到该方法的任何地方 if ( i == 0 ) //i = 0的指令必须在这一句前面 i = 1; } |
这种指令重排机制可能导致多线程程序的运行结果和预期不符,因此不要尝试依靠代码顺序来避免同步,典型的反例是双重检查惯例。synchronized机制可以有效的避免指令重排效应。
在JSR-133中, volatile的语义被增强,禁止了volatile变量与普通变量之间重排序。因此Java5之后,volatile与synchronized具有类似的语义——针对单个变量读写的锁。
Java5的concurrent包引入了若干Atomic类,这些类支持以原子的方式进行操作,可以避免同步、提高性能。
Atomic家族在内部实现上使用了一种乐观同步机制:当前线程假设没有其它线程修改共享变量,并计算变量的新值,在尝试更新变量时检查是否有其它线程对变量进行了修改,如果被修改,则使用最新的值重新执行前面的步骤。
不管是Java还是数据库,死锁的发生都有一个前提,那就是消费者在独占(不释放)某些资源的同时,又请求其它资源的独占,消费者在Java中就是线程,资源就是对象监视器或者锁。
一个经典的死锁例子是“哲学家进餐问题”,该问题的描述如下:
- N位哲学家坐在圆桌上,它们只做两件事情,思考和吃饭
- 圆桌中间放着一大盘米饭,每位哲学家左右两边各放一只筷子,共计N只筷子
- 哲学家必须先拿起左边的筷子,然后再拿起右边的筷子,才能把米饭盛到自己的碗里
由于哲学家们从不交流,就可能导致吃不上饭的问题:每位哲学家都拿着左手的筷子,并等着右手的筷子。
在这个场景中,筷子就是共享资源,每位哲学家都会尝试在得到一个资源的同同时,请求另外一个资源。致命的是,这些资源请求形成了一个环路,导致任何一个资源都永远无法释放。
下面是哲学家问题的Java代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.lang3.RandomUtils; public class PhilosopherProblem { private static void printf( String fmt, Object... args ) { System.out.println( Thread.currentThread().getName() + " " + String.format( fmt, args ) ); } private static class Chopstick extends ReentrantLock { private static final long serialVersionUID = 1L; private int id; public Chopstick( int id ) { super(); this.id = id; } @Override public void lock() { super.lock(); printf( "got chopstick %s", id ); } @Override public void unlock() { super.unlock(); printf( "released chopstick %s", id ); } } private static class Philosopher extends Thread { private Chopstick left; private Chopstick right; public Philosopher( String name, Chopstick left, Chopstick right ) { super(); this.left = left; this.right = right; setName( name ); } @Override public void run() { for ( ;; ) { try { meditate(); eat(); } catch ( InterruptedException e ) { e.printStackTrace(); } } } public void meditate() throws InterruptedException { int ms = RandomUtils.nextInt( 1000, 10000 ); TimeUnit.MILLISECONDS.sleep( ms ); printf( "meditated for %d ms", ms ); } public void eat() throws InterruptedException { try { left.lock(); try { right.lock(); int ms = RandomUtils.nextInt( 1000, 10000 ); TimeUnit.MILLISECONDS.sleep( ms ); printf( "ate for %d ms", ms ); } finally { right.unlock(); } } finally { left.lock(); } } } public static void main( String[] args ) { Chopstick cs0 = new Chopstick( 0 ); Chopstick cs1 = new Chopstick( 1 ); Chopstick cs2 = new Chopstick( 2 ); Chopstick cs3 = new Chopstick( 3 ); Chopstick cs4 = new Chopstick( 4 ); new Philosopher( "Socrates", cs0, cs1 ).start(); new Philosopher( "Plato", cs1, cs2 ).start(); new Philosopher( "Aristotle", cs2, cs3 ).start(); new Philosopher( "Thales", cs3, cs4 ).start(); new Philosopher( "Pythagoras", cs4, cs0 ).start(); } } |
程序运行起来后,很快即陷入死锁,不再有任何输出。 很多工具(TPTP、JProfiler、JMC)可以协助识别死锁,帮助改进程序。例如JMC检测上述程序死锁的输出如下:
- 当持有一个锁的时候,线程应当避免再去获得其它锁
- 确保相关的锁总是以一致的顺序被获得
- 总是在 finally 块中调用 unlock()
- 使用时限锁: tryLock() ,在指定时间内没有获得锁则退出,而不是一直等待
每个Java对象都有一个伴随的监视器(Monitor)对象,使用监视器可以实现锁定。同样的每个对象也有一种机制让它称为等待区,这种机制的想法其实很单纯:
- 某个线程等待一个特定条件的出现
- 其它线程可以为它创建这个条件
- 当其它线程创建条件时,会通知正在等待该条件的线程
该等待-通知机制是通过定义在 java.lang.Object 中的方法来实现的:
方法 | 说明 |
wait() |
当前线程调用object.wait()等待条件的发生,当前线程必须持有该对象的监视器,例如处于 synchronized(object){} 块中,调用该方法导致当前线程放弃监视器(锁定) 可以指定一个可选的timeout,在超时后,该调用自动返回 该方法返回时,线程必须获取监视器 |
notify() | 通常正在等待的某个线程,事件已经发生,当前线程必须持有该对象的监视器 |
notifyAll() | 通知所有正在等待的线程 |
表格里面已经提到,等待通知机制中的方法都需要持有目标对象的监视器,这是因为该机制本身存在竞态条件问题,因而必须结合锁一起使用。等待-通知机制的竞态条件场景举例如下:
- Thread1测试条件(通常是一个共享的变量),确认它需要等待
- Thread2设置条件
- Thread2调用notify(),试图唤醒Thread1,但是这不会成功,因为Thread1尚未等待
- Thread1调用wait()方法进入等待
等待通知机制不会在调用wait()与释放锁之间、返回wait()与重新获得锁之间发生竞态条件。
对于wait()的调用一般都位于循环结构中,另外需要注意:
- 线程应该总是在持有锁时、在wait()之前测试条件不满足,然后调用wait()等待
- 线程应该总是从wait()返回后,判断条件是否满足,还是需要继续等待。因为可能有其它线程也被唤醒,并且它改变了条件(即使被唤醒,也不代表条件满足)
下面是一个简单的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
public class WaitAndNotifyMechanism { private static boolean condition; private static Object lock = new Object(); private static class Consumer extends Thread { @Override public void run() { for ( ;; ) { try { //这里是一个全局锁,因此锁定哪个对象不重要,只要对象是全局唯一的 //甚至可以把WaitAndNotifyMechanism.lock作为锁 synchronized ( WaitAndNotifyMechanism.class ) { //检测条件时必须锁定 if ( condition ) { condition = false; //消费 System.out.println( currentThread().getName() ); } WaitAndNotifyMechanism.class.wait(); //但是必须调用作为锁的对象的wait()方法 } } catch ( InterruptedException e ) { } } } } public static void main( String[] args ) throws InterruptedException { //两个消费者 new Consumer().start(); new Consumer().start(); //主线程扮演的生产者 for ( ;; ) { TimeUnit.SECONDS.sleep( RandomUtils.nextInt( 1, 6 ) ); //修改条件时必须锁定 synchronized ( WaitAndNotifyMechanism.class ) { if ( !condition ) { condition = true; //生产 WaitAndNotifyMechanism.class.notify(); //唤醒一个等待的线程 } } } } } |
条件变量是有某个锁关联的变量,它是很多线程系统使用的同步模型,它与Java的等待-唤醒机制很类似。POSIX条件变量的四个基本函数——wait()、timed_wait()、signal()、broadcast()与Java中Object的方法一一对应,它们在逻辑上处理上也是一致的。条件变量的wait()、signal()操作需要当前线程持有互斥锁, 从wait()调用返回时需要重新得到互斥锁。
但是Java的等待-唤醒机制更加易用,它相当于把条件变量与互斥锁整合在了一起,因而在synchronized代码块中调用被同步对象的wait()、notify()方法是很自然的操作方式。
Java5引入的Lock接口,支持创建条件变量,Lock与Condition是绑定的,就像既有的等待-唤醒机制一样。这两个接口提供的灵活性就像其它线程系统的条件变量一样。下面是一个来自JDK的例子,我们可以看一下条件变量的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
/** * 一个有界的缓冲区 * @param <T> 缓冲区元素类型 */ class BoundedBuffer<T> { //互斥锁对象 private final Lock lock = new ReentrantLock(); //从互斥锁创建两个条件变量,条件变量和锁是绑定的 private final Condition notFull = lock.newCondition(); //消费者通知生产者用 private final Condition notEmpty = lock.newCondition(); //生产者通知消费者用 private int putptr, takeptr, count; private T[] items; @SuppressWarnings ( "unchecked" ) public BoundedBuffer( int capacity ) { items = (T[]) new Object[capacity]; } //由消费者线程调用 public void put( T x ) throws InterruptedException { //修改共享数据,因此首先需要互斥锁定 lock.lock(); try { while ( count == items.length ) //如果缓冲区目前是满的,放弃锁定并等待 //与既有的等待-唤醒机制一样,在唤醒后,需要重新获得锁 notFull.await(); items[putptr] = x; if ( ++putptr == items.length ) putptr = 0; ++count; //唤醒正在等待缓冲非空信号的消费者线程 notEmpty.signal(); } finally { lock.unlock(); //总是解除锁定 } } //由生产者线程调用 public Object take() throws InterruptedException { lock.lock(); try { while ( count == 0 ) notEmpty.await(); //如果当前缓冲区为空,等待 T x = items[takeptr]; if ( ++takeptr == items.length ) takeptr = 0; --count; notFull.signal(); //唤醒等待缓冲区非满的生产者线程 return x; } finally { lock.unlock(); } } } |
条件变量比起既有的等待-唤醒机制,其优势在于:
- 使用Lock时,Condition是必要的,因为Lock的wait/notify方法已经被覆盖,用来实现Lock对象的功能
- 每个Lock可以创建多个对应的Condition,通过给Condition变量进行合理命名,可以使代码的语义更加直白
从JDK1.2开始,ThreadLocal类被引入。该类用于存放线程本地(Local,局部)变量。这种变量与普通的共享变量不同,它为每个线程设置不同的副本,不同线程之间互不干扰。
ThreadLocal往往添加 private static 限定符,并通过 static getter() 暴露访问:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
class SharedObject{ //普通变量,会被多个线程共享,存在并发问题 private static Map<String,String> normalVariables; //线程本地变量,每个线程都有自己的副本,不存在并发问题 private static ThreadLocal<Map<String,String>> variables = new ThreadLocal<Map<String,String>>(){ //可以覆盖此方法,为每个线程的副本提供初值 protected Map<String,String> initialValue() { return new HashMap<String,String>(); }; }; public static ThreadLocal<Map<String,String>> getVariables() { return variables; } } |
调用ThreadLocal的set/get方法,可以设置、读取本地变量的值,在JDK1.5之后,ThreadLocal类提供了泛型支持。
线程本地变量常被用来避免方法调用的参数传递,理论上将,可以使用线程本地变量完全取代方法参数,而保持等同的应用逻辑。
除非存在外部引用,否则线程本地变量将在线程终结时自动回收。
在Java中,信号量基本上可以认为是带有计数器的Lock。计数器许可只有1的信号量和Lock是一回事,除了某些Lock的实现支持重入。下面的代码是一个通用的固定大小对象池,演示了信号量的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
package cc.gmem.study.j5ia.concurrent.adv; import java.util.List; import java.util.concurrent.Semaphore; public class CommonPool<T> { private int maxPoolSize; private Semaphore available; protected T[] items; protected boolean[] used; @SuppressWarnings ( "unchecked" ) public CommonPool( List<? extends T> objects ) { //对象池的最大尺寸 this.maxPoolSize = objects.size(); //对象池内存存储结构数组 this.items = (T[]) objects.toArray(); //构建一个许可数目与对象池大小相同的公平信号量 this.available = new Semaphore( maxPoolSize, true ); this.used = new boolean[maxPoolSize]; } /** * 从池中借出一个对象 */ public T borrowItem() throws InterruptedException { //首先,当前线程需要获取一个许可,如果没有需求,则陷入等待 available.acquire(); return getNextAvailableItem(); } /** * 归还一个对象到池中 */ public void returnItem( T item ) { if ( markAsUnused( item ) ) //只有是池里既有的对象才能放回去 //如果返回成功,则释放一个许可 available.release(); } /** * 获取下一个可用对象 */ protected synchronized T getNextAvailableItem() { for ( int i = 0; i < maxPoolSize; ++i ) { if ( !used[i] ) { used[i] = true; return items[i]; } } return null; } /** * 标记某个对象为可用 */ protected synchronized boolean markAsUnused( T item ) { for ( int i = 0; i < maxPoolSize; ++i ) { if ( item == items[i] ) { if ( used[i] ) { used[i] = false; return true; } else return false; } } return false; } } |
对象池的测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
List<Object> objects = new ArrayList<Object>(); objects.add( new Object() ); objects.add( new Object() ); objects.add( new Object() ); final CommonPool<Object> pool = new CommonPool<Object>( objects ); //把三个对象都借走 Object[] borrowed = new Object[] { pool.borrowItem(), pool.borrowItem(), pool.borrowItem() }; new Thread() { public void run() { try { //阻塞,直到3秒后 pool.borrowItem(); System.out.println( "Borrowed an object." ); //阻塞,直到3秒后 pool.borrowItem(); System.out.println( "Borrowed another object." ); } catch ( InterruptedException e ) { } } }.start(); for ( Object item : borrowed ) { TimeUnit.SECONDS.sleep( 3 ); pool.returnItem( item ); } |
屏障类很简单,允许多个线程在一个屏障点集中,汇总结果、并继续下一步工作。下面我们以公司组织去水长城旅游的例子来说明该类的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import java.util.ArrayList; import java.util.Calendar; import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.DateFormatUtils; public class GreatWaterWallTourism { /** * 小院屏障:所有车到小院集合,换乘旅游大巴 */ private static CyclicBarrier farmyardBarrier; /** * 去往水长城的旅游大巴 */ private static TouristBus touristBus; private static class TouristBus extends Thread { private List<String> persons = new ArrayList<String>(); public void getOn( List<String> persons ) { this.persons.addAll( persons ); } @Override public void run() { System.out.println( persons + " are leaving for the great water wall" ); } } /** * 去往小院的交通工具 */ private abstract static class Vehicle extends Thread { protected List<String> persons = new ArrayList<String>(); @Override public void run() { try { doRun(); farmyardBarrier.await(); //等待其它车辆抵达 touristBus.getOn( persons ); //把人装进旅游大巴 } catch ( Exception e ) { } } protected String now() { return DateFormatUtils.format( Calendar.getInstance(), "HH:mm:ss" ); } protected abstract void doRun() throws InterruptedException; } /** * 小汽车 */ private static class Car extends Vehicle { protected void doRun() throws InterruptedException { persons.add( "Fu Chang" ); persons.add( "Bing Lee" ); System.out.println( persons + " are going to the farmyard by car" ); TimeUnit.SECONDS.sleep( 1 ); System.out.println( "Car arrived at " + now() ); } } /** * 十一路 */ private tatic class BusNo11 extends Vehicle { @Override protected void doRun() throws InterruptedException { persons.add( "Zhen Wong" ); System.out.println( persons + " are taking bus number 11 to the farmyard" ); TimeUnit.SECONDS.sleep( 10 ); System.out.println( "Bus number 11 arrived at " + now() ); } } public static void main( String[] args ) throws Exception { touristBus = new TouristBus(); farmyardBarrier = new CyclicBarrier( 3 ); new Car().start(); new BusNo11().start(); farmyardBarrier.await(); System.out.println( "Tourist bus is about to depart after 3s" ); TimeUnit.SECONDS.sleep( 3 ); touristBus.start(); } } |
输出内容如下:
1 2 3 4 5 6 |
[Fu Chang, Bing Lee] are going to the farmyard by car [Zhen Wong] are taking bus number 11 to the farmyard Car arrived at 14:18:52 Bus number 11 arrived at 14:19:01 Tourist bus is about to depart after 3s [Zhen Wong, Fu Chang, Bing Lee] are leaving for the great water wall |
倒数计数器,这个类和屏障类很相似,它允许一个或者多个线程等待计数变为零,任何线程都可以将计数器的值倒数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
//计数为3的倒数门栓 final CountDownLatch latch = new CountDownLatch( 3 ); latch.countDown(); //减少计数 new Thread() { public void run() { try { TimeUnit.SECONDS.sleep( 3 ); } catch ( InterruptedException e ) { } latch.countDown();//减少计数 latch.countDown();//减少计数 }; }.start(); latch.await();//等待计数为0 |
定义一个交换点(同步点),两个线程会在此点交换数据,先到达的线程会等待,直到配对的线程到达。下面举一个早教中心运动课的例子,参与的孩子们相互传球玩:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
public class BallExchangeGame { private static Exchanger<Ball> ballExchanger = new Exchanger<Ball>(); private static class Ball { private String color; public Ball( String color ) { super(); this.color = color; } public String toString() { return color + " ball"; } } private static class Child extends Thread { private Ball ownedBall; public Child( String name, Ball ownedBall ) { super(); setName( name ); this.ownedBall = ownedBall; } @Override public void run() { for ( ;; ) { try { TimeUnit.SECONDS.sleep( RandomUtils.nextInt( 1, 10 ) ); ownedBall = ballExchanger.exchange( ownedBall ); System.out.println( getName() + " got " + ownedBall ); } catch ( InterruptedException e ) { } } } } public static void main( String[] args ) throws InterruptedException { new Child( "Cai Cai", new Ball( "Colorized" ) ).start(); new Child( "Yu Han", new Ball( "Red" ) ).start(); new Child( "Tiao Tiao", new Ball( "Black" ) ).start(); } } |
观察该程序输出时,会发现控制台总是成对的打印41行的内容,这是因为“交换”是相互的,先提出交换的孩子必须等待另外一个孩子也愿意交换。
JDK 1.5包含读写锁接口ReadWriteLock,它简单的包含两个方法,分别用于获得读锁( Lock readLock() )、写锁( Lock writeLock() )。读锁是共享锁,允许多个线程持有;写锁是独占锁,只能被一个线程持有。此外,一旦有线程持有读锁,就不能有其它线程获取写锁。
JDK自带的读写锁实现类是一个可重入的锁: ReentrantReadWriteLock
从概念上讲,每个JVM线程在整个生命周期中,必然处于下列状态之一:
线程状态 | 说明 |
Initial | 初始状态, 从线程对象被构造,一直到start()方法被调用,处于这一阶段 |
Runnable |
可运行状态,线程的start()方法被调用后,就会处于该状态,JDK提供了多种方式可以使线程离开Runnable状态 处于该状态的进程不一定正在占用CPU资源 某些情况下,把Runnable状态进行以下细分:
|
Blocked |
阻塞状态,处于该状态的线程不能运行,因为它在等待某种事件(定时器、I/O等)的发生,下列情况下线程会进入阻塞状态:
某些情况下,把Blocked状态进行以下细分:
|
Exiting | 一旦线程从run()方法返回,或者被调用stop()方法,就进入该状态,某些情况下也称为Dead状态 |
Java线程调度是基于优先级进行的,但是最终还是受制于具体的操作系统,大部分基于抢占式调度的OS能够很好的支持这种调度方式。
Java中定义了11个优先级,其中开发者可以用到其中的十个(从 Thread.MIN_PRIORITY 到 Thread.MAX_PRIORITY )。
需要注意的是,并不是优先级低的线程总是要等待优先级低的线程让出CPU,为了防止线程饥饿,OS会进行较为复杂的公式计算,来决定到底哪个线程被调度执行。
JDK提供的一些方法可以显式的影响线程调度,例如suspend()可以让当前线程保持在Blocked状态,resume()则让其恢复,但是这两个方法已经被废弃了。
Thread.yield() 会请求OS选择其它线程去运行,该方法的真实效果依赖于OS,大部分时间没有任何意义。在协程(Green Thread)模式下,该方法很有用,但是现代JVM早已抛弃协程,而使用OS级别的线程。
实现方式 | 说明 |
Green Thread |
也称协程,这种实现方式是最简单的,OS不知道任何Java线程的存在,在它看来JVM就是单线程的进程。 在这种模式下,每个线程必须保存所有信息在Thread对象中,例如线程的Stack、只是正在执行的代码的Program Counter,而JVM需要负责线程上下文的切换。 Green Thread在其它环境下,还常常被称为用户级别的线程(User-level Thread),因为它只存在于应用程序的用户级,与OS内核无关。 |
Win32固有线程 | 使用Win32固有线程来实现Java线程时,Java线程和OS线程具有一对一的关系,OS对Java线程具有完全的管辖权。Windows定义的7种优先级和Java优先级具有重叠映射关系 |
Linux固有线程 | 直到JDK1.3,Linux上的JVM都在尝试使用Green Thread,这是因为当时的Linux内核对多线程应用的优化不好。新的Linux内核使用NPTL,JVM也使用了类似其它OS上的1:1线程映射 |
Leave a Reply