Java多线程(一)——线程基础

面试中,经常被问到,你对于高并发有什么理解和实践?很多时候,如果自己以前的项目没有涉及到,很容易被这个问题打懵。

并发?我一个写Java后台的,并发不是全靠Servlet自己分发请求到不同的Controller实现的吗?难道你们公司写个Web后台还要多线程?

其实这个问题不能怪面试官故意刁难你,而是你自己对于你们项目流程的理解和挖掘不够,以至于你没有发现其中可以用多线程来优化的点。我们的业务经常需要去多个系统、服务中拿数据,然后在代码中组合、处理,然后在一次性返回给下游。

但其实很多时候,这些请求我们都是用Java代码串行执行的,这是大部分情况。但是也有一部分的业务,其实多个系统之间的请求不需要事务性的关联操作,我们完全可以用多个线程,分别请求多个系统,在所有线程都返回执行结果后,在进行后续处理。

举个生活中的例子类比一下:炒菜,我们需要先点火,烧热锅,下油,油温不高到时候就下姜蒜等作料,然后随着油温的升高一起炒出香味。然后肉丝下锅,翻炒至金黄,然后下切好的青椒丝,翻炒出锅。对于这样一个事情来说,所有的步骤都有着必然的前后顺序,如果打乱的话,结果就可能出问题。

我们再看另外一个例子:你和女朋友打算晚上做饭,于是你俩一起逛超市。猪肉、姜蒜、青椒,分别在三个不同的摊位,你拉着你女朋友,一个个的摊位买过去,每个摊位花了五分钟挑选,最后十五分钟买完所有需要的食材。你觉得一切OK,没什么问题。但是,如果你们两个人分开,你单独去买猪肉,你女朋友同时去买姜蒜和青椒,那么,你五分钟结束自己的采买任务,你女朋友十分钟结束采买任务,再花一分钟时间汇合,总计十一分钟,完成整个采买过程。分开的两个人(并行)就两个人一起(串行)节省了四分钟的时间。

实际上,如果你认真思考你现有项目中的流程,可能会发现很多类似的优化点,只不过你可能从来没有想过而已。

1.Java如何实现线程

Java实现线程一般有两种,继承java.lang.Thread类和实现java.lang.Runnable接口,Java代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Test {

public static void main(String[] args) {
TestThread testThread = new TestThread();
testThread.start();

TestRunnable testRunnable = new TestRunnable();
new Thread(testRunnable).start();
}
}

class TestThread extends Thread {
@Override
public void run() {
System.out.println("do something");
}
}

class TestRunnable implements Runnable {
@Override
public void run() {
System.out.println("do something");
}
}

比较的简单,我们需要注意的是,线程的调用应该使用start方法,而不是你实现的run方法。另外,Runnable也是封装到Thread中调用的。

两种实现方式其实没有什么优劣,类的继承和接口的功能上没有什么区别,但是,Java是单继承的,如果你继承了Thread类,你就不能再继承其他有用的工具类。

线程创建到结束,有多个状态:

new,创建后未启动。

runnable,可能正在运行,也可能在等待CPU分配资源。

blocking,等待获取一个排它锁,如果它期望获得的锁被其他线程释放了,这个状态会结束。

waitting,等待状态,如调用了object.wait()或者thread.join(),线程会进入等待状态,除非被object.notify()或者object.notifyAll()唤醒,CPU不会分配资源给该线程。

timed waitting,在timeout时间之后,会被系统幻想的等待状态,如调用了Thread.sleep(1000),会在设置的时间结束后被唤醒。object.wait(1000)会被被object.notify()或者object.notifyAll()唤醒,thread.join(1000)会被被object.notify()或者object.notifyAll(或者join的线程已经执行完毕后被唤醒。

terminated,死亡状态,线程运行结束,或者发生异常结束之后的状态。

2.多个线程一起运行可能存在的问题

多线程一起运行会有什么问题呢?假设,小明的爸爸排小明去水果店看看有没有苹果,小明去看了一眼,还剩一个苹果,于是小明回去找爸爸要钱,然后带着钱又回去水果店打算买这个苹果。这个过程中会发生什么事呢?假设,在小明看到苹果店有苹果~他带钱再次回到水果店之间,这个苹果被其他人买走了,那么小明就买不到苹果了。这种多个线程引发的问题,我们称为线程安全问题。

想了解这个问题,解决这个问题,首先,我们要明白,哪些东西会被线程共享。我们之前在面试之JVM中,对JVM里内存各个区域的划分做了简单梳理。

局部变量,局部变量维护在每个方法自己的栈里,而虚拟机栈是线程独享的,所以基础数据类型的局部变量,是线程安全的。

局部对象,在方法内部创建的对象,它的指针也是维护在栈里,理论上是线程安全的。但是对象的实例,也就是你new出来的那部分,是存在堆里的,而堆是线程共享的。如果你把一个方法内new出来的对象,传递给了其他线程,那么它就不再是线程安全的了。

对象成员,对象的成员变量存放在堆中,如果不做特殊处理,多个线程同时操作一个对象,它也是线程不安全的。

3.互斥同步,syncchronized

sync是JVM实现的互斥同步机制,可以进行代码块,方法,静态方法,类四种级别的互斥同步,实例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Test {

public void syncCode() {
synchronized (this) {
System.out.println("do something");
}
}

public synchronized void syncMethod() {
System.out.println("do something");
}

public synchronized static void syncStaticMethod() {
System.out.println("do something");
}

public void syncClass() {
synchronized (Test.class) {
System.out.println("do something");
}

}
}

其中,同步代码块和同步类非常像,可以看到,一个sync操作的对象是this,一个操纵的是class,区别也是显而易见的。sync的this,那么同一个对象,运行这段代码时,大家是互斥的,也就是说,先进入到代码的会运行,后进入到代码块的在后面排队,等待先拿到锁的代码运行完毕后,才会运行。如果sync锁的是class,那么就可以理解为,该类下所有的对象,都加都在一起排队。

如果方法修饰的是普通方法,那么多个线程调用同一个对象的该方法时,大家是排队的,调用不同对象之间的方法则没有影响。如果修饰的static,那么所有类的对象的该方法执行都会排队。

说了这么多,syncchronized是如何实现的呢?我们在面试之JVM中说过,对象的实例是存储在Java Heap中的。而对象实例在堆内存中的存储,可以用下图表示:

实例变量里存储了对象的字段信息,父类的属性,数组的长度等内容,在内存中以4字节对齐。

填充数据的存在是因为JVM要求对象起始地址必须是8字节的整数倍,部分情况下,需要填充内容来实现内存对齐。

对象头中主要包含两个部分:

1.Mark Word,存储的主要是对象的HashCode,锁信息,分代年龄(面试之JVM里说过,对象new出来时在新生代中,经历若干次GC后仍然存活就会移动到老年代)或者GC标志。

2.Class Metadata Address,类型指针指向对象的类元数据,可以理解为JVM使用这个指针确定该对象实例属于哪个类。

在Mark Word的锁信息中,存储了monitor对象的地址,什么是monitor?在这里可以理解为一个监视器,它在对象创建时创建,或者在线程试图获取锁时创建。它有三个重要的概念:Owner,WaitSet,EntryList。多个线程访问同一段加锁了的代码时,他们首先会进入EntryList,如果线程成功获取到对象的monitor,会占有monitor的owner。如果线程又调用了wait()方法,则会释放monitor,把Owner= null,然后该线程进入到WaitSet中等待唤醒。获取到monitor锁的线程,执行完毕后,也会释放锁,并清空Owner变量。其工作流程可以参考下图:

4.ReentrantLock

前面我们说道的sync是JVM层面实现的锁,其实两者性能上没有什么区别,但是sync是不公平锁的实现,意思就是,不能保证先排队的线程就一定先获取到锁资源。而ReentrantLock可以在构造参数中通过传参的时候选择是否使用公平锁。

ReentrantLock,重入锁,是一种递归无阻塞的同步机制,是JDK实现的。ReentrantLock会被最后一个成功获得锁,并且没有释放的锁的线程持有。

当锁空置时,调用lock的线程会成功获得锁并且返回。如果已经持有锁的线程再次调用lock,会立即返回。而这一特点就是可重入锁名字的来源,意思就是,已经持有锁的线程,再次试图获取锁,不会进入等待队列,而是直接判定获取锁并且返回。

我们来看一下ReentrantLock的lock方法,代码如下:

1
2
3
public void lock() {
sync.lock();
}

向上查找一下引用,可以看到ReentrantLock的构造方法中有如下处理:

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

根据API文档我们可以知道,在这里传入false,它实现的是非公平锁,也就是说,最终拿到的是NonfairSync对象。NonfairSync继承自ReentrantLock的内部类Sync,而Sync继承自AbstractQueuedSynchronizer(AQS)。我们接下来看一下NonfairSync对象的lock方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

其中compareAndSetState方法是native方法,也就是说是操作系统层面的操作。compareAndSetState简cas操作,它是原子性的,如果返回true,则表示获取锁成功。setExclusiveOwnerThread则是我们之前说到的,把锁的持有者设为当前线程。如果返回失败,则运行acquire方法,这个方法的代码如下:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

在这里,因为我们对象是NonfairSync实际上它的tryAcquire方法走的是我们上文贴的代码中的nonfairTryAcquire(acquires),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

可以看到,它先是判断锁当前的状态,如果c==0,则再次尝试用cas操作获取锁,获取到之后就返回成功。如果c!=0,就判断当前持有锁的线程getExclusiveOwnerThread是不是自己当前所处于的线程Thread.currentThread(),如果是,就更新状态,如果不是,返回失败。这一段代码就是可重入操作的实现部分了。如果这一过程再次返回fase,就会运行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,我们来看一下它的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

这段代码很好理解,我们看到它实际上把当前线程包装成了一个节点,然后加入到队列末端开始排队。而acquireQueued的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可以看到,实际上它的核心思路是一个死循环, final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) 这段表示,如果当前节点的前一个节点是head节点,即队列的头部了,就再次用cas操作尝试或许锁。如果当前节点的前一个节点还不是head,那表明锁被其他线程持有,应该继续等待。shouldParkAfterFailedAcquire的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

实际上源码上给出的注释已经解释的非常清楚了,首先拿一下当前节点前面一个节点的状态,if (ws == Node.SIGNAL)如果前面的节点已经是SIGNAL状态,那么当前节点就可以安全的休眠。if (ws > 0),如果前面的节点已经取消了, 那么就再向前取一个节点。如果拿到的节点状态正常,就compareAndSetWaitStatus设置它的状态为SIGNAL。

如果处理完上面的过程,则运行parkAndCheckInterrupt(),代码如下:

1
2
3
4
5
6
7
8
9
10
11
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

将当前线程休眠。
接下来我们看看释放锁的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

可以看到,释放锁的代码实际上是走的sync.release方法,然后继续调用到tryRelease方法尝试释放锁。处理代码也非常好理解,先判断当前持有锁的对象是不是自己的线程,如果不是则抛出异常。如果时,把state的计数器减掉,如果减掉后计数器为0,就说明当前持有锁的线程所有的地方都已经释放了,就把当前锁的持有线程重置为null。
tryRelease处理完毕后,则取到排队的线程队列的head节点,尝试唤醒排队线程。
而FairSync的代码比NonfairSync的代码简单很多,大部分实现都是雷同的,就不在这里赘述了。

5.CountdownLatch

带计数器的锁,内部实现了一个内部类Sync,继承自AbstractQueuedSynchronizer(AQS),前面已经看过很多AQS的代码了,在这里我们再看看Sync的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

可以看到,实际上CountDownLatch的计数器Count最终是被设置到了AQS的state中。每次调用CountDownLatch的countDown的时候,就去AQS的state减1,然后尝试释放锁。该类适用于需要计数的场景,换句话说就是,一个线程需要等待多个线程返回之后再运行的场景。

6.CyclicBarrier
和CountDownLatch类似,但是场景相反。字面意思为同步屏障,即多个线程都到达屏障后,屏障才会放开。它也是使用计数器实现的,但是计数器是使用await方法递增的。当递增到设置的值时,所有被阻拦在屏障的线程都会被释放。

未完待续

参考&引用:

CyC2018@Github

zejian@CSDN