Java 并发编程:基础篇

一. 线程相关的基本概念

1.1 线程的状态

线程可以有 6 种状态(如下图所示),定义在 java.lang.Thread.State 枚举中,我们调用线程对象的 getState 方法来获取线程的当前状态。当我们 new 一个线程的时候,这个线程只是被 创建,还未开始运行,只有当我们调用了 start 方法之后,线程才进入 可运行 状态,这个时候线程不是马上开始运行,而是需要等待 CPU 的时间片。可运行状态包含等待 CPU 调度和处于运行两种状态,没有专门的运行态一说。

1
2
3
4
5
6
7
8
9
// 线程的状态定义
public enum State {
NEW,
RUNNABLE,
BLOCKED,
WAITING,
TIMED_WAITING,
TERMINATED;
}

当出现如下情况时,一个线程会进入 阻塞等待 状态,此时的线程不运行任何代码,且消耗最少的资源,直到被线程调度器重新激活:

  • 当一个线程获取一个正在被其它线程持有的锁的时候,线程会进入阻塞的状态,直到其它线程释放锁,且调度器允许该线程持有该锁的时候,线程才会进入非阻塞状态。
  • 当线程等待另外一个线程的通知时,该线程进入等待状态,常见的操作有 wait、join 等
  • 有些方法包含超时参数,调用这些方法会进入计时等待状态,该状态一直保持到超时或者等待的条件满足为止。

而一个线程的终止运行(死亡),一般可以概括为两点原因:

  1. run 方法的正常退出而自然死亡。
  2. 因为一个没有捕获的异常而导致线程意外死亡。
image
1.1.1 join 的使用

join 可以理解为“插队”,假设我们有两个线程 A 和 B,如果我们在 A 的中间调用了 B,那么 A 在执行的过程中会等待 B 执行完成之后再继续执行。看下面一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MyThread implements Runnable {

private int count = 0;

@Override
public void run() {
for (int i = 0; i < 100; i++) {
count++;
}
}

public static void main(String[] args) throws Exception {
MyThread mt = new MyThread();
Thread thread = new Thread(mt);
thread.start();
System.out.println(mt.count);
}

}

这段代码最终打印会是多少呢?应该一眼就能看得出结果是不确定的,因为在一个线程中累加,在另外一个线程中打印,输出结果完全取决于这两个线程的执行速度,如果我们想打印出期望值 100 应该怎么做呢,可以借助 join,如下:

1
2
3
4
5
6
7
public static void main(String[] args) throws Exception {
MyThread mt = new MyThread();
Thread thread = new Thread(mt);
thread.start();
thread.join(); // 插队,等我先执行完你再继续
System.out.println(mt.count);
}

这样就可以输出期望的值 100 了,其运行机制是通过调用 join 方法将 thread 线程插入到主线程中,这样就能保证两个线程的执行顺序,主线程会等到 thread 执行完毕了之后再继续执行。

join 还提供了一个超时方法 join(long millis),用于让主线程等待指定时间后继续,以防止子线程的死锁或死循环而导致主线程的永久阻塞。join 的源码如下,可以看到,join 本质上还是调用的 wait 方法来实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

注意:stop()、suspend(),以及 resume() 方法已过时。

在 java 早期版本中,提供了 stop 方法来终止一个线程,但是这往往会导致一些问题,所以现在已经不推荐再使用该方法,如果我们希望让一个线程停止运行,可以调用线程的 interrupt() 方法。当我们调用某个线程的中断方法时,线程的中断标志位将被置位,我们在编写多线程程序时应该经常让线程检测自己的中断标志位,即调用 Thread.currentThread().isInterrupted()。但是当一个线程被阻塞时,将无法检测自己的中断标志位,这个时候我们调用该线程的 interrupt() 方法,那么线程的阻塞态将被 InterruptedException 异常中断。

需要注意下面三个中断相关方法的作用:

  • void interrupt(): 向线程发送中断请求,线程的中断标志位将被置位。
  • static boolean interrupted():测试当前线程是否被中断(即正在执行这一命令的线程),调用该方法会清除线程的中断标志位。
  • boolean isInterruted():检测当前线程的中断标志位,不会改变标志位的设置。

1.2 线程的属性

1.2.1 线程的优先级

线程具备优先级的属性,调度器在选择执行的线程的线程时会选择优先级高的线程,默认情况下一个线程继承它的父线程的优先级,我们可以调用 setPriority 方法来设置线程的优先级,线程的优先级分为十档,从 1 到 10 逐级增强,默认为 5。

不过需要注意的是 线程的优先级高度依赖于操作系统,在 windows 上只提供了 7 个优先级,所以 java 的 10 个优先级会进行映射,而在 linux 上线程的优先级被忽略,所有的线程都具备相同的优先级。优先级只是提供了线程优先被执行的可能性,但不是确定性的,所以 不要将程序逻辑的正确性建立在线程优先级的基础上

我们可以调用 yield 方法来让当前线程让步给其它线程机会,如果其它线程具备 大于或等于 该线程的优先级,则将获得运行的机会。

1.2.2 守护线程

我们可以调用线程的 setDaemon(true) 方法将一个线程设置为守护线程(后台线程)。守护线程的一个特点是 当只剩下守护线程时,虚拟机也就退出了,没有必要再继续执行。因此守护线程应该永远不去访问固有资源(文件、数据库等),因为它可能在任何时候就死掉了,而不能完成一些善后工作,比如守护线程中的 finally 语句块就不会在线程死亡时被调用。

1.2.3 未捕获异常处理器

我们自定义的 run() 方法不能够抛出任何异常,当异常发生时线程就意外死亡,我们可以为线程设置一个异常处理器来捕获线程抛出的异常,有两种方法可以设置:

  • 为所有的线程设置一个默认的异常捕获器
1
2
3
4
5
6
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {

}
});
  • 为指定的线程设置一个异常捕获器
1
2
3
4
5
6
7
8
Thread thread = new Thread(new MyThread());
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {

}
});
thread.start();

需要注意的是上述异常捕获器只对 execute 提交的任务才有效,而对于 submit 而言,无论是抛出的受检异常还是运行异常,都被认为是任务返回状态的一部分。如果一个由 submit 提交的任务因为抛出运行异常而结束,那么异常信息将被 Futrure.get 封装在 ExecutionException 中重新抛出。

1
2
3
4
5
6
7
8
9
10
11
12
13
Thread.setDefaultUncaughtExceptionHandler(
(Thread t, Throwable e) -> System.out.println("default uncaught exception handler"));

ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

// execute 提交的任务抛出的异常可以直接被异常捕获器捕获
// es.execute(new UnCaughtExceptionDemo());

// submit 提交的任务抛出的异常需要调用 Future.get(),再被重新封装之后才能被捕获
Future future = es.submit(new UnCaughtExceptionDemo());
future.get();

es.shutdown();

如果我们不为线程设置异常捕获器,那么异常发生时的捕获器就是线程的 ThreadGroup 对象。

线程组是一个可以统一管理的线程集合,默认情况下,创建的所有线程都属于同一个线程组。现在已经引入了更好的特性用于线程集合的操作,所以不建议在自己的程序中使用线程组。

二. 同步锁机制

Java 提供了两种锁机制用于同步控制:Lock 和 synchronized。前者相对于后者在使用上更加灵活,后者则更加简单一些。

2.1 Lock 机制

2.1.1 简单使用

Lock 的使用模板如下:

1
2
3
4
5
lock.lock();

// ... 这里是临界区

lock.unlock();

在同一个对象中,临界区的代码一次仅可以被一个线程所访问,先来看一个由多线程累加计数的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MyThread implements Runnable {

private int count = 0;
private Lock lock = new ReentrantLock();

@Override
public void run() {
for (int i = 0; i < 100; i++) {
lock.lock();
try {
System.out.println("[Thread-" + Thread.currentThread().getId() + "] is running.");
count++;
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 放在finally中用于保证在出现异常时也可以释放锁
lock.unlock();
}
}
}

public static void main(String[] args) throws Exception {
MyThread myThread = new MyThread();
Thread thread = new Thread(myThread);
Thread thread1 = new Thread(myThread);
thread.start();
thread1.start();
TimeUnit.SECONDS.sleep(3);
System.out.println(myThread.count);
}

}

上述代码中,通过 Lock 的控制可以保证在并发执行时最终都能正确的进行计数。这里我们利用典型的 ReentrantLock 来构造锁,“reentrant” 译为可重入的,所以 ReentrantLock 也就表示可重入的锁。

可重入的锁

所谓可重入的锁,是指一个锁可以允许同一个线程多次进行该锁控制的临界区,虽然对于不同线程之间来说,锁是排他的,但是对于同一个线程来说,可重入锁不具备排斥性,可重入锁通过一个计数器来记录当前线程进入临界区的次数,当线程出临界期时则将计数器减 1,当计数器为 0 的时候则可以释放锁。

ReentrantLock 除了上面默认的构造函数,还有一个 ReentrantLock(boolean fair) 构造函数,该构造函数的目的是希望构造一个公平的锁,公平锁偏爱等待时间最长的线程,但代价就是大大降低了性能,所以一般不推荐使用公平的锁。

对于上面介绍的 lock() 方法,如果一个线程没有拿到相应的锁就会被阻塞,但有些时候我们可能并不希望这样,tryLock() 提供了一种机制,它试图申请锁对象,如果成功获得锁则返回 true,否则 立即 返回 false。如下例子所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void run() {
while (!Thread.currentThread().isInterrupted()) {
if (lock.tryLock()) { // 尝试获取锁,获取不到就立即返回false
System.out.println("[Thread-" + Thread.currentThread().getName() + "] try lock true");
try {
TimeUnit.MILLISECONDS.sleep(1);
Thread.currentThread().interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} else {
System.out.println("[Thread-" + Thread.currentThread().getName() + "] try lock false");
}
}
}

tryLock() 方法还提供了超时策略,用于在尝试获取锁对象时,等待指定的时间。

lock() 方法不能被中断,一个线程在等待获取锁对象时如果被中断,那么线程将处于阻塞状态,继而死锁,但是带有超时策略的 tryLock() 在等待获取锁时如果被中断,将抛出 InterruptedException,从而防止像 lock() 方法那样进入死锁。也可以调用 lockInterruptibly() 方法,它相当于一个超时设为无限的 tryLock() 方法,在无限等待获取锁对象时,不会因为被中断而死锁。

2.1.2 条件对象:await() 和 signal()

一些情况下拿到锁,并进入临界区,并不代表该线程就可以开始干事了,一些业务场景可能还需要当前环境满足一定的条件,线程才可以执行目标操作。典型的应用场景就是抢购,在并发场景下并不是你进入了临界区就能抢购得到,进入临界区之后还得看货源是否充足。

在本小节的例子中,我们假设有两个线程 a 和 b,a 执行累加操作的前提是 count 必须大于等于 50,当该条件不满足时,a 线程只有等待,在 Lock 机制中,我们可以利用 await() 和 signal() 方法来执行线程之间的通知机制,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class MyThread implements Runnable {

private int count = 0;

private Lock lock = new ReentrantLock();
// 需要先创建一个条件对象
private Condition condition = lock.newCondition();

@Override
public void run() {
for (int i = 0; i < 100; i++) {
lock.lock();
try {
System.out.println("[Thread-" + Thread.currentThread().getName() + "] is running : count=" + count);
if ("a".equals(Thread.currentThread().getName()) && count < 50) {
// 条件不满足,先等待吧
condition.await();
}
count++;
if ("b".equals(Thread.currentThread().getName()) && count >= 50) {
// 条件已满足,通知a可以不用再等了
condition.signalAll();
}
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 放在finally中用于保证在出现异常时也可以释放锁
lock.unlock();
}
}
}

public static void main(String[] args) throws Exception {
MyThread myThread = new MyThread();

// thread a
Thread thread = new Thread(myThread);
thread.setName("a");
thread.setPriority(10);

// thread b
Thread thread1 = new Thread(myThread);
thread1.setName("b");
thread1.setPriority(1);

thread.start();
thread1.start();

TimeUnit.SECONDS.sleep(3);
System.out.println(myThread.count);
}

}

上述代码的逻辑很简单,当线程 a 检测到当前 count 小于 50 时就进入等待状态,此时线程 a 释放锁,b 拿到锁对象后开始执行,并在检测到 count 大于等于 50 时发出一个通知,告知天下现在条件已满足了,之前等待的线程可以再尝试一下,这里为了让演示效果更加明显,我们提高了线程 a 的优先级。

  • signalAll():解除所有因为 await() 而阻塞的线程的阻塞态。
  • signal():随机解除一个因为 await() 而组设的线程的阻塞态。

需要注意的一点是,随机选择是存在安全隐患的,如果当前通知的条件不能满足随机选出来的线程,那么该线程会继续进入阻塞状态,如果此后没有线程再调用 signal() 方法,那么系统就死锁了。

再来看一个通过线程的通知机制让两个线程交替打印的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AlternateThread implements Runnable {

private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
condition.signalAll();
System.out.println("Thread-" + Thread.currentThread().getName() + ", num=" + i);
// 执行完了一次就等待,等待被其它线程唤醒
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) {
AlternateThread at = new AlternateThread();
Thread ta = new Thread(at);
ta.setName("a");
ta.start();

Thread tb = new Thread(at);
tb.setName("b");
tb.start();
}

}

下面是三个线程交替打印的例子,因为 Lock 的灵活性,所以在实现三个线程交替打印时要简单许多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/**
* 三个线程交替打印
*
* @author zhenchao.wang 2017-07-05 22:52
* @version 1.0.0
*/
public class ThreeAlternateThread {

private Lock lock = new ReentrantLock();

private Condition c1 = lock.newCondition();
private Condition c2 = lock.newCondition();
private Condition c3 = lock.newCondition();

private class A implements Runnable {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
c2.signalAll();
System.out.println("[Thread-" + Thread.currentThread().getName() + "] print:" + i);
try {
TimeUnit.SECONDS.sleep(1);
c1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}

private class B implements Runnable {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
c3.signalAll();
System.out.println("[Thread-" + Thread.currentThread().getName() + "] print:" + i);
try {
TimeUnit.SECONDS.sleep(1);
c2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}

}
}

private class C implements Runnable {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
c1.signalAll();
System.out.println("[Thread-" + Thread.currentThread().getName() + "] print:" + i);
try {
TimeUnit.SECONDS.sleep(1);
c3.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) {
ThreeAlternateThread tat = new ThreeAlternateThread();
Thread a = new Thread(tat.new A());
a.setName("A");
Thread b = new Thread(tat.new B());
b.setName("B");
Thread c = new Thread(tat.new C());
c.setName("C");
a.start();
b.start();
c.start();
}
}

同样,await() 方法也提供了超时策略,在 await 的过程中,如果线程被中断,则抛出 InterruptedException 异常并退出等待状态,如果希望在这种情况下不抛出异常继续等待,则可以用使用 awaitUninterruptibly() 方法。

2.1.3 读写锁:ReentrantReadWriteLock

对于一些 多读少写 的应用场景,如果使用一般的可重入锁,将读和写一视同仁的效率是不高的,这个时候可以使用读写锁 ReentrantReadWriteLock,读写锁的特点是 读锁可以被多个读操作所共享,而对于写操作是排他的,写锁则对于读操作和写操作都是排他的。这样的特性可以保证线程安全的同时,极大提升读操作的效率,所以特别适用于读多写少的业务场景。下面展示了读写锁的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock readLock = rwl.readLock(); // 获取读锁
private Lock writeLock = rwl.writeLock(); // 获取写锁

@Override
public void run() {

}

private void read() {
readLock.lock();
// 读操作
readLock.unlock();
}

private void wite() {
writeLock.lock();
// 写操作
writeLock.unlock();
}

2.2 synchronized 关键字

2.2.1 基本概念

synchronized 相对于 Lock 在使用上要简单很多,可以对一个实例方法进行 synchronized,也可以对方法中的一段代码进行 synchronized,这个时候 synchronized 的锁对象都是当前类的实例,如果 synchronized 作用于一个类方法,那么此时的锁对象就是类的 class 对象。

那么我们应该如果选择使用 Lock 和 synchronized 呢,主要有下面两条基本原则:

  1. 能不使用都不使用
  2. 如果必须用的话则 synchronized 优先

synchronized 的使用不再详细举例。

2.2.1 条件对象

与 Lock 一样,synchronized 也有自己的条件对象,分别为 wait(),notify(),以及 notifyAll()。使用时需要注意如下细节:

  1. 使用时需要先对调用对象进行加锁
  2. 调用 wait() 方法之后,线程的状态由 RUNNING 变为 WAITING,并将当前线程放置到对象的等待队列中。
  3. notify() 和 notifyAll() 方法调用后,等待线程依旧不会立即从 wait() 返回,需要调用 notify() 或 notifyAll() 的线程释放锁之后,等待线程才有机会从 wait() 返回。
  4. notify() 和 notifyAll() 将等待线程从等待队列移到同步队列,被移动的线程的状态不是由 WAITING 变为 RUNNABLE,而是变为 BLOCKED,因为这个时候还需要等待调用 notify() 方法的线程释放锁。
  5. 从 wait() 方法返回的前提是获取到了对象的锁。

下面来利用这几个函数重新实现 2.1.2 中的例子。

  • 例子一:a 线程在 count 大于等于 50 时开始累加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class MyThread2 implements Runnable {

private int count = 0;

@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (this) {
try {
System.out.println("[Thread-" + Thread.currentThread().getName() + "] is running : count=" + count);
if ("a".equals(Thread.currentThread().getName()) && count < 50) {
// 条件不满足,先等待吧
this.wait(); // 调用锁对象的wait方法,这里是this
}
count++;
if ("b".equals(Thread.currentThread().getName()) && count >= 50) {
// 条件已满足,通知a可以不用再等了
this.notifyAll(); // 调用锁对象的notifyAll方法,这里是this
}
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) throws Exception {
MyThread2 myThread = new MyThread2();

// thread a
Thread thread = new Thread(myThread);
thread.setName("a");
thread.setPriority(10);

// thread b
Thread thread1 = new Thread(myThread);
thread1.setName("b");
thread1.setPriority(1);

thread.start();
thread1.start();

TimeUnit.SECONDS.sleep(3);
System.out.println(myThread.count);
}

}
  • 例子二:两个线程交替打印数字
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AlternateThread2 implements Runnable {

@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
try {
this.notifyAll(); // 调用锁对象的notifyAll方法,这里是this
System.out.println("Thread-" + Thread.currentThread().getName() + ", num=" + i);
// 执行完了一次就等待,等待被其它线程唤醒
this.wait(); // 调用锁对象的wait方法,这里是this
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
AlternateThread2 at = new AlternateThread2();
Thread ta = new Thread(at);
ta.setName("a");
ta.start();

Thread tb = new Thread(at);
tb.setName("b");
tb.start();
}

}

需要注意的一点是,条件对象方法必须在 synchronized 控制的临界区中,否则会抛出异常,并且 调用条件对象方法的对象必须是当前锁对象

2.3 ThreadLocal

ThreadLocal 不是用来解决共享对象的多线程访问问题的,一般情况下,通过 ThreadLocal.set() 到线程中的对象是该 线程私有的对象,其他线程是不需要访问的,也访问不到的。但是需要注意,__如果我们 set 的是一个本来就是共享的对象,那么即使使用 ThreadLocal,我们还是要解决线程共享的问题。

下面来演示一下 ThreadLocal 的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ThreadLocalTest implements Runnable {

private ThreadLocal<Integer> threadLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};

@Override
public void run() {
for (int i = 0; i < 10; i++) {
// System.out.println("[Thread-" + Thread.currentThread().getName() + "], count=" + );
threadLocal.set(threadLocal.get() + 1);
}
System.out.println("[Thread-" + Thread.currentThread().getName() + "], count=" + threadLocal.get());
}
}

示例中我们通过 ThreadLocal 来为每个线程设置了一个 Integer 型的变量,然后在线程中对这个变量进行累加操作,由于是由 ThreadLocal 修饰的,所以这个 Integer 型变量在每个线程中都有一份,并且是不相互影响,所以这里上述示例每个线程累加的结果都是 10。

上述示例我们在构造 ThreadLocal 对象时覆盖了 initialValue() 方法用于设置变量的初始值,该方法在第一次调用 get() 方法时被调用,且仅调用一次,我们还可以使用 withInitial() 方法来简化上面的写法:

1
ThreadLocal<SimpleDateFormat> sdf = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

上面我们利用了SimpleDateFormat,这是一个线程不安全的类,虽然在 jdk1.8 以后我们可以使用新的时间操作类来保证线程安全,但是如果是在老的 jdk 中,我们还是需要通过上述的方式来保证日期格式化的线程安全。

ThreadLocal 之所以能够保证变量是线程私有的,是因为每一个 Thread 都有一个 ThreadLocalMap 类型的 threadLocals 变量,ThreadLocal 以线程对象为 key 存储了每一个线程的 threadLocals 实例,而线程私有的变量都存储在自己的 threadLocals 中,以保证线程之间数据的隔离,从而绕过多线程共享访问的问题,不过如前面所说,如果我们在线程的 threadLocals 中存储的是一个共享的变量,那么还是会存在线程安全的问题。

我们来简单看一下 ThreadLocal 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public class ThreadLocal<T> {

private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

/**
* 初始化方法,覆盖实现以对修饰的变量进行初始化
*/
protected T initialValue() {
return null;
}

/**
* 对 initialValue 的 lambda 语法支持
* @since 1.8
*/
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
return new SuppliedThreadLocal<>(supplier);
}

public ThreadLocal() {
}

/**
* 获取修饰的变量值
*/
public T get() {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取当前线程的 ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
// 从线程私有的 map 中获取目标变量值
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = (T)e.value;
return result;
}
}
// 调用初始化方法
return setInitialValue();
}

/**
* 初始化
*/
private T setInitialValue() {
// 调用初始化方法
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) map.set(this, value);
else createMap(t, value);
return value;
}

/**
* 设置变量值到本地线程私有 map 中
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) map.set(this, value);
else createMap(t, value);
}

public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) m.remove(this);
}

/**
* 以线程对应为 key 获取对应的 ThreadLocalMap 对象
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}

T childValue(T parentValue) {
throw new UnsupportedOperationException();
}

// 省略一些内部类定义
}

总之,ThreadLocal 不是用来解决对象共享访问问题的,而是以线程私有的变量副本来保证数据隔离,从而绕过线程并发访问的问题,归纳两点:

  1. 每个线程中都有一个自己的 ThreadLocalMap 类对象,可以将线程自己的变量保存到其中,不同线程之间不相互影响。
  2. 将一个共用的 ThreadLocal 静态实例作为 key,将不同对象的引用保存到不同线程的 ThreadLocalMap 中,然后在线程执行的各处通过这个静态 ThreadLocal 实例的 get() 方法获取线程私有的对象拷贝,避免了将这个对象作为参数传递的麻烦。

需要注意的是在线程池环境下,只有当线程本地值的生命周期受限于任务的生命周期时使用 ThreadLocal 才有意义,否则会导致因为线程被复用而在多个任务之间共享数据。

参考:http://www.iteye.com/topic/103804

三. 线程安全的集合

3.1 阻塞队列

当试图向队列中添加元素而队列已满,或者从队列中取出元素而队列已空的时候,阻塞队列将导致线程阻塞。阻塞队列的方法列表如下:

方法 操作 特殊情况下的响应
add 添加一个元素 如果队列已满,则抛出 IllegalStateException
put 添加一个元素 如果队列已满,则阻塞
offer 添加一个元素 如果队列已满,返回 false
element 获取队头元素 如果队列为空,则抛出 NoSuchElementException
peek 获取队头元素 如果队列为空,则返回 null
poll 获取并移除队头元素 如果队列为空,则返回 null
remove 获取并移除队头元素 如果队列为空,则抛出 NoSuchElementException
take 获取并移除队头元素 如果队列为空,则阻塞

注意:poll 和 peek 通过返回 null 来表示操作失败,所以不允许往队列中插入 null 值。

jdk 提供的基本阻塞队列:

  • 构造一个带有指定的容量和公平性设置的阻塞队列,该队列用循环数组实现
  • ArrayBlockingQueue(int capacity)
  • ArrayBlockingQueue(int capacity, boolean fair)
  • 构造一个无上限的阻塞队列或双向队列,用链表实现
  • LinkedBlockingQueue()
  • LinkedBlockingDeque()
  • 根据指定容量构建一个有限的阻塞队列或双向队列,用链表实现
  • LinkedBlockingQueue(int capacity)
  • LinkedBlockingDeque(int capacity)
  • 构造一个包含Delayed元素的,无界的,阻塞时间有限的阻塞队列。只有那些延迟已经超过时间的元素可以从队列中移除
  • DelayQueue()
  • int getDelay(TimeUnit unit) //得到该对象的延迟
  • 构造一个无边界阻塞优先级队列,用堆实现
  • PriorityBlockingQueue()
  • PriorityBlockingQueue(int capacity)
  • PriorityBlockingQueue(int capacity, Comparator<? super E>)

3.2 高效安全的集合

jdk 提供了一些列线程安全的集合,这些集合使用复杂的算法,通过允许并发的访问数据结构的不同部分来降低线程之间的竞争。

但是需要注意以下几点:

  1. size() 方法不必在常量时间内操作,所以确定一个集合的大小通常需要遍历。
  2. 集合返回的迭代器是 弱一致性 的,这意味着迭代器不一定能够反应出其被构造之后的所有修改,但是这样的迭代器能够保证同一个值不会被返回两次,也不会抛出 ConcurrentModificationException。
  3. 并发散列表默认情况下允许 16 个线程的并发 写操作,如果超过这个数目将会阻塞剩余的线程,但是该值可以修改。

jdk 提供的基本线程安全的集合:

  • 非阻塞队列
  • ConcurrentLinkedQueue()

构造一个可以被多个线程安全访问的,无边界的,非阻塞队列

  • 有序集
  • ConcurrentSkipListSet()
  • ConcurrentSkipListSet(Comparator<? super E> comp)

构造一个可以被多个线程安全访问的有序集

  • 散列映射表
  • ConcurrentHashMap<K, V>()
  • ConcurrentHashMap<K, V>(int initialCapacity)
  • ConcurrentHashMap<K, V>(int initialCapacity, float loadFactor, int concurrencyLevel)

构造一个可以被多个线程安全访问的散列映射表

  • 有序映射表
  • ConcurrentSkipListMap<K, V>()
  • ConcurrentSkipListMap<K, V>(Comparator<? super K> comp)

构造一个可以被多个线程安全访问的,有序的,映射表

  • Copy On Write 集合
  • CopyOnWriteArrayList
  • CopyOnWriteArraySet

四. 执行器

执行器(Executor)可以维持一个线程池的概念,通过维护一组线程来执行请求,执行器提供了如下工厂方法:

工厂方法 描述
newCachedThreadPool 线程池大小不确定,必要时会创建一个新的线程,空闲的线程默认被保留 60 秒
newFixedThreadPool 创建指定大小的线程池
newSingleThreadExecutor 创建大小为 1 的线程池
newScheduledThreadPool 用于预定执行而构建的固定线程池,替代 java.util.Timer
newSingleThreadScheduledExecutor 用于预定执行而构建的单线程池

下面是一个简单的使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class MyExecutor implements Callable<Boolean> {

private static int count;

@Override
public Boolean call() throws Exception {
for (int i = 0; i < 10; i++) {
synchronized (MyExecutor.class) {
System.out.println("[Thread-" + Thread.currentThread().getName() + "] is running, count=" + count);
count++;
}
}
return true;
}

public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(3);
Future<Boolean> future = es.submit(new MyExecutor());
if (future.get()) { // 阻塞主线程,直到子线程执行完毕返回
System.out.println("count=" + count);
}
es.shutdown();
}

}

示例中通过 newFixedThreadPool(3) 创建了一个大小为 3 的线程池,然后调用 submit() 方法提交我们的任务,get() 方法会阻塞主线程直到当前子线程执行完毕返回,如果我们不再需要当前构造的执行器,我们需要调用 shutdown() 方法来关闭,该方法让线程池不再接收新的任务,当所有提交的任务执行完毕之后,线程池中的线程全部死亡。另外一个方法是 shutdownNow(),该方法会尝试取消所有未开始的任务,并中断正在执行的线程。

只有当任务都是同类型且相互独立时,线程池的性能才能够达到最佳,如果将运行时间差别很多的任务混合在一起复用同一个线程池,除非线程池足够大,否则可能会造成拥塞。如果提交的任务依赖于其它任务,那么除非线程池无限大,否则可能造成死锁。

4.1 Runnable 与 Callable

前面的例子中都采用 Runnable 来构造线程类,上述例子中则采用了 Callable,相对于前者要求在执行方法中返回值。FutureTask 类允许我们在两者之间进行转换:

1
2
3
4
5
6
7
// Callable 转 Runnable
FutureTask<Boolean> task = new FutureTask<>(new MyExecutor());
Thread thread = new Thread(task);
thread.start();
if (task.get()) { // 阻塞主线程,直到子线程执行完毕返回
System.out.println("count=" + count);
}

4.2 invokeAll() 与 invokeAny()

invokeAll() 和 invokeAny() 可以用来集合一批线程,区别在于前者需要等待所有的线程都返回结果,而后者只需要等待其中一个返回结果即可,所以各有各的应用场景。使用示例如下:

1
2
3
4
5
6
7
8
9
10
ExecutorService es = Executors.newFixedThreadPool(3);
List<Callable<Boolean>> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
tasks.add(new MyExecutor());
}
List<Future<Boolean>> futures = es.invokeAll(tasks);
for (final Future<Boolean> future : futures) {
if (future.get()) System.out.println("count=" + count);
}
es.shutdown();

invokeAll() 的缺点在于如果任务组中某个任务非常耗时间,那么其余的线程都只能等待这个线程执行完毕之后再返回结果,这种情况下我们可以通过 ExecutorCompletionService 来将结果按照获取的顺序保存起来,从而让执行完毕的线程立即返回结果。

1
2
3
4
5
6
7
8
9
10
11
ExecutorService es = Executors.newFixedThreadPool(3);
// 利用ExecutorCompletionService进行包装
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(es);
for (int i = 0; i < 10; i++) {
ecs.submit(new MyExecutor());
}
for (int i = 0; i < 10; i++) {
// 可以在一个线程执行完成得到结果之后立即返回
if (ecs.take().get()) System.out.println("count=" + count);
}
es.shutdown();

4.3 Fork-Join 框架

Fork-Join 框架的目的是根据需要创建多个线程来将一个整体的问题分解成为多个小问题分而治之。典型的应用场景就是对于图像的渲染,如果将一个图像分成多个区域,各个区域采用独立的线程进行渲染,则会更加充分的发挥 CPU 的计算能力。

一个类如果希望具备 Fork-Join 的能力,需要先继承 RecursiveTask 类,这里的泛型 V 表示返回的结果类型,如果没有返回值,我们也可以继承 RecursiveAction 类,然后实现 compute() 方法。这里我们简单的以寻找一个数组中的偶数个数为例,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ForkJoinTest extends RecursiveTask<Integer> {

private int left;
private int right;
private int[] array;

public ForkJoinTest(int left, int right, int[] array) {
this.left = left;
this.right = right;
this.array = array;
}

@Override
protected Integer compute() {
if (right - left < 10) {
int count = 0;
for (int i = left; i < right; i++) {
if (array[i] % 2 == 0) count++;
}
return count;
}
System.out.println(left + "\t" + right);
int middle = (right + left) / 2;
ForkJoinTest fj1 = new ForkJoinTest(left, middle, array);
ForkJoinTest fj2 = new ForkJoinTest(middle, right, array);
invokeAll(fj1, fj2);
return fj1.join() + fj2.join();
}

public static void main(String[] args) {
int[] array = new int[100];
for (int i = 0; i < array.length; i++) {
array[i] = RandomUtils.nextInt(0, 100);
}
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTest fj = new ForkJoinTest(0, array.length, array);
pool.invoke(fj);
System.out.println(fj.join());
}

}

示例中以 10 为单位对数组进行拆分,并分别计算各个子数组中偶数的个数,最后将结果进行累加返回。

五. 同步器

5.1 信号量:Semaphore

信号量 Semaphore 就如同一个门卫,通过 acquire() 和 release() 两个方法来控制线程的入场。举一个不太文雅但是易于理解的例子,假设一个厕所有 5 个坑位,现在有 10 个人(线程)想要蹲坑,那么 Semaphore 就像一个厕所管理员,保证当前厕所一次只有 5 个人正在占坑,而其余的人只能等待(阻塞),当一个人拿到 Semaphore 办法的令牌之后(acquire())就可以进入厕所,当一个人完事之后需要调用 release() 方法来释放坑位,下一个人才有机会占坑。

举一个例子来说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SemaphoreTest implements Callable<Boolean>{

private Semaphore semaphore;

public SemaphoreTest() {
}

public SemaphoreTest(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public Boolean call() throws Exception {
semaphore.acquire(); // 获取信号量
System.out.println("[Thread-" + Thread.currentThread().getName() + "] is running");
TimeUnit.SECONDS.sleep(1);
semaphore.release(); // 释放信号量
return Boolean.TRUE;
}

public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Boolean>> tasks = new ArrayList<>();
// 创建一个只允许 3 个线程同时执行的信号量
Semaphore semaphore = new Semaphore(3);
SemaphoreTest st = new SemaphoreTest(semaphore);
for (int i = 0; i < 18; i++) {
tasks.add(st);
}
es.invokeAll(tasks);
es.shutdown();
}

}

上述例子中我们创建了一个大小为 3 的信号量,一次只允许 3 个线程执行临界区的代码。

5.2 倒计时门栓:CountDownLatch

CountDownLatch 的作用类似于一群人约定好了做某一件事情,只有当所有人都到了才能开始做这一件事情,所以每个人到了之后需要签到一下,如果所有人都签到了,那么就可以开始了。我们在构造 CountDownLatch 对象的时候可以指定活动的人数 CountDownLatch(int count),等待线程调用 CountDownLatch 对象的 await() 方法进行等待计数减为 0,而参与活动的人在达到之后需要调用 CountDownLatch 对象的countDown() 方法来将计数减 1 (类似于签到)。

这里我们用一个比较生动的例子说明,假设宿舍里的几个人相约一起吃完饭,而每个人在此之前都在忙自己的事情,来的时间不确定,所以先到的人就需要等待,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class CountDownLatchTest {

private CountDownLatch latch;

public CountDownLatchTest(CountDownLatch latch) {
this.latch = latch;
}

private class chaochao implements Runnable {

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("超超来了");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 签到
latch.countDown();
}
}
}

private class huanhuan implements Runnable {

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("欢欢来了");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 签到
latch.countDown();
}
}
}

private class haoran implements Runnable {

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("浩然来了");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 签到
latch.countDown();
}
}
}

public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(3);
CountDownLatchTest cdl = new CountDownLatchTest(latch);
new Thread(cdl.new chaochao()).start();
new Thread(cdl.new huanhuan()).start();
new Thread(cdl.new haoran()).start();
latch.await();
System.out.println("人都到了,开吃吧~");
}

}

5.3 障栅:CyclicBarrier

CyclicBarrier 在功能上等同于 CountDownLatch,只是在实现机制上不同,CountDownLatch 的对象所在的线程会等待所有参与的线程执行完毕之后继续执行,这里的等待是主线程等待,而 CyclicBarrier 则是让所有参数的线程等待,如果参与的线程有至少 1 个没有执行完成,那么所有已完成的线程将进入等待状态,直到所有参与的线程执行完毕之后,所有等待的线程一次性被唤醒。

我们还是以上面的例子来举例,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class CyclicBarrierTest {

private CyclicBarrier cyclicBarrier;

public CyclicBarrierTest(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

private class chaochao implements Runnable {

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("超超来了");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private class huanhuan implements Runnable {

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("欢欢来了");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}

private class haoran implements Runnable {

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("浩然来了");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws Exception {
CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("人都到了,开吃吧~");
}
});
CyclicBarrierTest cb = new CyclicBarrierTest(barrier);
new Thread(cb.new chaochao()).start();
new Thread(cb.new huanhuan()).start();
new Thread(cb.new haoran()).start();
}
}

我们看到上述例子中,如果一个线程执行完毕,则需要调用 CyclicBarrier 对象的 await() 方法等待其它线程的执行完毕。

5.4 交换器:Exchanger

Exchanger 主要用于两个线程之间的数据交换(数据的交换是双向的),当线程 A 操作完之后会将自己修改后的缓冲区交给线程 B,线程 B 操作完毕之后又会将自己修改后缓冲区交给线程 A,如此反复,使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ExchangerTest {

private class MessageExchanger implements Runnable {

private Exchanger<String> exchanger;

public MessageExchanger(Exchanger<String> exchanger) {
this.exchanger = exchanger;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
String send = RandomStringUtils.randomAlphanumeric(16);
System.out.println("[Thread-" + Thread.currentThread().getName() + "] send message:\t" + send);
String receive = exchanger.exchange(send);
System.out.println("[Thread-" + Thread.currentThread().getName() + "] receive message:\t" + receive);
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println();
}
}
}

public static void main(String[] args) {
ExchangerTest et = new ExchangerTest();
Exchanger<String> exchanger = new Exchanger<>();
new Thread(et.new MessageExchanger(exchanger)).start();
new Thread(et.new MessageExchanger(exchanger)).start();
}
}

注意:这可不是 “生产者-消费者” 模式。

5.5 同步队列:SynchronousQueue

同步队列应用于典型的 “生产者-消费者” 模式,当一个线程往同步队列中 put 数据之后即进入阻塞态,直到另外一个线程 take 数据,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class SynchronousQueueTest {

private class Producer implements Runnable {

private SynchronousQueue<String> queue;

public Producer(SynchronousQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
String message = RandomStringUtils.randomAlphabetic(16);
System.out.println("Producer produce message:\t" + message);
queue.put(message);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

private class Consumer implements Runnable {

private SynchronousQueue<String> queue;

public Consumer(SynchronousQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
String message = queue.take();
System.out.println("Consumer consume message:\t" + message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
SynchronousQueueTest sq = new SynchronousQueueTest();
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(sq.new Producer(queue)).start();
new Thread(sq.new Consumer(queue)).start();
}
}

相对于 Exchanger,同步队列最大的特点是它的 数据流向是单向的,始终从生产者线程流向消费者线程。


转载声明 : 版权所有,商业转载请联系作者,非商业转载请注明出处
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议
Powered by hexo & Theme by hiero   Copyright © 2015-2019 浙ICP备 16010916  号,指 · 间 All Rights Reserved.