并发编程基础

James 2019年12月28日 193次浏览

内存模型的基本概念

计算机的每一个操作都是由CPU去执行的,而在执行的过程中,难免会有一些数据的读取和写入操作,这些数据是存在于我们的主存(物理内存)中的,然而主存的输入输出速度和CPU执行执行的速度相比要慢得多,
如果CPU的每次执行都要从主存中的加载写入数据,那么整个系统的性能就被降低了,所以,CPU就内置了高速缓存。
CPU在执行指令之前,会将需要用到的数据提前复制一个副本到高速缓存,然后CPU在运算过程中就可以直接读取和写入高速缓存中的副本数据,最后,运算结束,在退出指令之前将副本数据刷新到主存中。
例如:

i = i + 1;

在以上操作中,CPU会首先从主存中读取i的值,并且将其复制到CPU的高速缓存中,如果此时i的值为0,那么高速缓存中就存在一个i=0的变量,再执行i+1,得到结果为1,最后在退出指令之前将i的值1刷新到主存中。
以上操作在单线程中没有任何问题,但多线程就会出现问题,在多核CPU中,每条线程可能会在不同的CPU中运行,各个CPU有独立的高速缓存,此时两个线程同时执行以上操作,线程1在执行指令之前会将i的值0复制到
CPU1的高速缓存中,线程2也同样将i的值0复制到CPU2的高速缓存中,此时两个线程中i的值都为0,然后开始运算,线程1运算完成后得到i的值为1,在其执行完指令退出前,将缓存中的值刷新到主存中;线程2运算完成
后得到i的值也为1,将其刷新到主存中,这个时候主存中i的值为1,并不是我们理想中的2。这就是著名的缓存一致性问题,我们通常称多个线程访问的变量为共享变量。也就是说,一个变量存在于多个CPU的高速缓存中
就会出现缓存一致性问题。
那么为了解决缓存一致性问题,一般有以下两种方案:

  • 总线锁:早期的CPU都是通过在总线上加锁#Lock来解决的,因为CPU和其他部件都是通过总线来进行的,在总线上加锁,也就保证了CPU的多个核心串行执行,当线程1执行完再执行线程2。这虽然解决了缓存一致性问题
    但是该种方式带来的效率低下是无法避免的。
  • 缓存一致性协议:由于总线锁方式的效率低下,所以就出现了缓存一致性协议,最出名的就是Intel的MESI协议,它的核心思想是:当其中一个CPU在写某个变量时,如果发现该变量是共享变量(该变量存在于多个CPU
    的高速缓存中),则通知其他CPU将该变量的缓存状态设置为失效,当其他CPU需要读取该变量时,发现该变量缓存是失效状态的,所以会重新从主存中读取。

并发编程的三个概念

明白了计算机的内存模型,现在来分析一下我们在日常开发过程中,并发编程需要面临的三个问题:可见性问题、原子性问题、有序性问题

可见性

多个线程访问同一个变量,一个线程对该变量的值进行了改变,其他线程能够立即看到该变量的最新数据。
当一个变量存在于多个CPU中的高速缓存中,一个线程对该变量进行了变更,其他线程中任然是该变量变更之前的数据,这个时候就出现了可见性问题。

原子性

一个或者多个操作,要么全部执行,执行过程中不会被其他线程打断,要么全部不执行。只有保证了原则性,才能确保得到的结果是正确的。
例如:一个32位变量的赋值分为两个步骤,为低16位赋值,为高16位赋值;当将低16位的值写入成功过后,突然中断,此时有一个线程对该变量进行了访问,这个时候得到的结果是不正确的,这就出现了原子性问题

有序性

程序的执行顺序按照代码的编写先后顺序执行。
处理器为了提高程序执行效率,可能会对程序代理进行优化,它不保证每句代理的执行的顺序,但可以保证程序最终执行的结果是一致的。这就是指令重排序,那么它是怎么保证最后结果的一致性的呢?举个例子:

int i = 1; //{1}
int j = 2; //{2}
i = i + 1; //{3}
j = j + i; //{4}

以上代码执行顺序可以是:{2} -> {1} -> {3} -> {4},但是绝对不会出现 {2} -> {1} -> {4} -> {3},因为步骤{4}依赖于步骤{3}的执行结果。
从上面的例子可以看出,指令重排序并不会影响单线程的执行结果,但是在多线程就不一定了,例如(伪代码):

//线程1
Object a = new Object();    //{1}
flag = false;               //{2}

//线程2
while(flag) {               //{3}
    Thread.sleep(1000);
}
a.test();                   //{4}

从以上例子可以看出,线程1中,代码{1}和代码{2}并没有依赖性,所以根据执行重排序的规则,{1}{2}并不保证执行的顺序性,这个时候如果首先执行的是{2},那么线程2就会终止循环结束线程,执行{4},然而这个时候a
还没有完成初始化,这个时候程序就会抛出异常。

总结

要想在多线程中程序的正常执行,必须要保证可见性、原子性、有序性。

Java内存模型(JMM)

前面两节了解了计算机的内存模型,下面来了解一下Java的内存模型。它为我们提供了哪些保证以及提供了哪些方法或者机制来解决以上问题。
JMM主要是为了屏蔽各个硬件平台和操作系统对内存访问的差异,实现JVM在各个平台下能一致的访问内存的功能。他主要定义了程序中变量的访问规则,值得注意的是,JMM为了较好的执行性能,并没有限制CPU使用高速
缓存带来的性能优化,也没有限制编译器对指令的重排序,也就是说,在JMM模型中,任然存在缓存一致性和指令重排序问题。
JMM模型中,所有的变量都存在于主存(可以看成是物理内存)中,每个线程都有自己的工作内存(可以看成是CPU的高速缓存)中,线程对变量的每个操作都是在自己的工作内存中,不能直接对主存进行操作,并且各个线程
不能对其他线程的工作内存中的数据进行操作,如果线程之间需要通信,必须经过主存进行数据传递。

JMM中主内存和工作内存的交互

CPU高速缓存和物理内存之间交互有MESI缓存一致性协议,那么JMM中的工作内存和主存之间的交互(主存中的数据如何读取到工作内存中的,工作内存中的数据如何写入到主存中的)也有约定,
是通过JVM定义的八种操作来完成的,这八种操作,每一种都是原子性的。这八种操作分别是:

  • Lock(锁定):作用于主内存中的变量,一个变量同一时间只能有一个线程锁定,表示这条线程独占这个变量
  • UnLock(解锁):作用于主内存,将该条线程锁定的对象解锁,使之其他线程能锁定该变量
  • Read(读取):作用于主内存,表示将一个主内存的变量的值传输到线程的工作内存中
  • Load(载入):作用于线程的工作内存中,表示把主内存中read操作得到的值放到工作内存的变量副本中
  • Use(使用):作用于工作内存,当JVM在执行过程中遇到一个需要使用一个变量的值时会调用该操作
  • Assign(赋值):作用于工作内存,当JVM在执行过程中遇到一个需要给一个变量赋值时会调用该操作
  • Store(存储):作用于工作内存,把工作内存中的值传输到主内存中
  • Write(写入):作用于主内存,将工作内存中Store得到的值放入主内存的变量中
    以上八种操作需要遵循以下八种规则:
  • 不允许Read/Load或者Store/Write单独出现,也就是说不允许出现主内存读取了变量工作内存不接受或者工作内存回写了变量主内存不接受的情况。
  • 不允许线程在自己的工作内存中执行了Assign操作(修改了变量),而不同步(不回写)到主内存的情况
  • 工作内存中没有做任何变更的变量不允许会写到主内存
  • 变量只能在主内存中产生,工作内存中不允许直接使用一个未被初始化的变量
  • 一个变量同一时刻只能被一个线程加锁
  • 对变量执行Lock操作,就会清空工作空间中该变量的值
  • 不允许对没有执行Lock操作的变量执行UnLock操作
  • 对一个操作执行UnLock之前,必须要把工作空间的值回写到主内存中,也就是要执行Store和Write操作

Volatile修饰的变量的特殊规则

假设:T表示一个线程,V/W分别是Volatile关键字修饰的变量,那么在进行Read/Load/Use/Assign/Store/Write操作的时候都将遵循以下原则

  • 线程T对变量V执行Use操作之前,必须先执行Load操作,同时线程T执行了Load操作过后必须执行Use操作;再依照普通变量的规则(Read/Load操作不能单独出现):则表示Read/Load/Use三个操作必须是连续的。
  • 线程T对变量V执行Store操作之前,必须先执行Assign操作,同时线程T执行了Assign操作过后必须执行Store操作;再依照普通变量的规则(Store/Write操作不能单独出现):则表示Assign/Store/Write三个操作必须连续。
  • 假设动作A代表线程T对变量V执行的Use或者Assign操作,动作B代表动作A相关联的Load或者Store操作,动作C代表动作B向关联的Read或者Write操作;类似的,动作D代表线程T对变量W执行的Use或者Assign操作,
    动作E代表动作D相关联的Load或者Store操作,动作F代表动作E向关联的Read或者Write操作;如果A先于D,那么C先于F。也就是说,在同一个线程内部,被Volatile修饰的变量不会被指令重排序。
    总结:前面了两条规则可以归纳为“Volatile修饰的变量可以保证对所有线程的可见性”,第三条可以归纳为“Volatile修饰的变量禁止指令重排序优化

Volatile关键字

从上一节中可以得知,Volatile修饰关键字具备了以下两层语义

  • Volatile修饰的变量可以保证对所有线程的可见性
  • Volatile修饰的变量禁止指令重排序优化

Volatile保证原子性吗?

Volatile关键字保证了变量在多个线程下的可见性,但是否保证原子性呢,我们先通过一个例子来分析

public class VolatileTest {
    private volatile int num = 0;
    public void incr(){
        num++;
    }
    public static void main(String[] args) {
        VolatileTest test = new VolatileTest();
        for (int i=0;i<100;i++) {
            new Thread(()->{
                for (int j=0;j<1000;j++) {
                    test.incr();
                }
            }).start();
        }
        //保证前面的线程都执行完毕
        while (Thread.activeCount()>1){
            Thread.yield();
        }
        System.out.println(test.num);
    }
}

以上代码理想的执行结果为1000*100=100000,但其实结果并不一定是100000,它的结果会小于等于100000,为什么呢?我们来一一分析。
我们知道num++不是一个原则操作,它包含三个操作 读取x的值,进行加1操作,写入新的值,也就是说这三个操作有可能被分割,我们来假设一下

  1. 假设某一时刻num的值为100,线程1对num进行自增操作,首先线程1从主内存中读取了num的值,这个时候读入到线程1工作内存的num的值为100,就在此时线程1被阻塞。
  2. 线程2对num进行自增操作,首先线程2从主内存中读取了num的值,这个时候读入到线程2工作内存的值任然为100,然后自增1,然后将101的值刷新到主内存中。
  3. 线程1重新获得了CPU时间片,继续执行,在工作内存中的num值任然为100,在100的基础上执行自增,得到101,最后将101刷新到主内存中。
    由上面的步骤可以看出,两个线程分别对num的值进行了自增,理论上num的值应该增长了2,但实际上只增长了1。到这里,可能会有疑问,为什么线程2修改了值并且将值刷新到主存中,线程1中工作内存的值没有被失效。
    这里需要注意的是缓存失效是在读取之前的,这里线程1已经将数据读取到工作内存中了,所以线程1中的数据任然是有效的。
    由以上可以得出结论,Volatile关键字并不能保证原子性。解决办法是,可以在自增的方法上加上synchronized关键字,或者在该方法中加入Lock块,或者使用JUC包下的原则操作类。分别如下:
    采用synchronized
public synchronized void incr(){
    num++;
}

采用Lock

public void incr(){
    try {
        lock.lock();
        num++;
    } finally {
        lock.unlock();
    }
}

采用AtomicInteger

private volatile AtomicInteger num = new AtomicInteger(0);
public void incr(){
    num.getAndIncrement();
}

Volatile保证有序性吗?

前面提到了Volatile修饰的变量禁止指令重排序优化,所以Volatile在一定程度上能保证有序性。Volatile禁止指令重排序有两层意思:

  • 当程序执行到Volatile变量的读或者写操作时,在其前面操作的更改肯定已经全部执行,且结果对后面的操作可见,在其后面的操作肯定还没有执行。
  • 在进行指令优化时,不能将在对Volatile变量访问的语句放到其后面执行,也不能把Volatile变量后面的语句放到前面执行。
    说的不太容易理解,举个例子:
int a = 1;              //{1}
int b = 2;              //{2}
volatile int c = 3;     //{3}
int d = 4;              //{4}
int e = 5;              //{5}

以上声明了五个变量,只有变量c是Volatile修饰的,那么在进行指令重排序的时候,不会将{3}放到{1}、{2}的前面,也不会将{3}放到{4}、{5}的后面,但是{1}、{2}和{4}、{5}的执行顺序不作任何保证。并且,当执行到
语句{3}的时候,{1}、{2}必须是执行完毕了的,且{1}、{2}的执行结果对{3}、{4}、{5}都是可见的。

Volatile原理和实现机制

Volatile关键字修饰的变量,在生成的汇编指令中,会多出一个lock前缀指令,lock前缀指令相当于一个内存屏障,该内存屏障有三个功能:

  1. 他保证指令重排序时不会将内存屏障之后的指令放到之前,也不会将之前的指令放到内存屏障之后,即在执行内存屏障这块指令时,其之前的指令已经全部执行完成。
  2. 强制将修改操作立即更新到主内存
  3. 如果是写操作,会导致其他CPU中缓存行失效

Synchronized关键字

Synchronized关键字可以作用到成员方法上,也可以作用到静态方法上,也可以作用到一个代码块中,当作用在代码块中时,可以指定锁的范围,分别是类对象和实例对象,当作用到实例方法上时,锁的范围是当前实例对象,
也就是this,当作用在静态方法上时,锁的范围为类对象;一个线程能否访问被Synchronized关键字修饰的方法,判断依据在于该方法的锁是否被占用,如果被占用,则不能访问,否则能访问。举个例子:一个类中有多个
Synchronized关键字修饰的实例方法,多个线程同时访问这些实例方法,访问顺序是同步进行的,因为使用的锁都是实例对象this。

注意:Synchronized关键字是不能被继承的,也就是说父类的synchronized func()方法在子类中会变成func()

Synchronized底层原理

在Java中,对象被创建于堆内存中,一个对象实例我们可以分为三个部分:对象头、实例数据、对齐填充。对象头主要包含两个部分信息:自身运行时数据(锁状态标志,线程持有锁)也称Mark Word、类型指针(JVM通过该指
针来只想该对象属于哪个类的)。实例数据用于存放类的属性数据,包括父类的属性数据。对齐填充用于字节对齐,由于JVM中要求对象起始地址必须是8字节的整数倍。
Synchronized的对象锁,其指针指向一个Monitor对象(该对象由C实现)的起始地址,每个对象都会有个Monitor对象,Monitor对象由ObjectMonitor实现,该对象的C代码如下:

//openjdk\hotspot\src\share\vm\runtime\objectMonitor.hpp
ObjectMonitor() {
    _header       = NULL;//markOop对象头
    _count        = 0;
    _waiters      = 0,//等待线程数
    _recursions   = 0;//重入次数
    _object       = NULL;
    _owner        = NULL;//指向获得ObjectMonitor对象的线程或基础锁
    _WaitSet      = NULL;//处于wait状态的线程,会被加入到wait set;
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;
    FreeNext      = NULL ;
    _EntryList    = NULL ;//处于等待锁block状态的线程,会被加入到entry set;
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;// _owner is (Thread *) vs SP/BasicLock
    _previous_owner_tid = 0;// 监视器前一个拥有者线程的ID
}

从以上代码可以看出该对象有两个队列,_WaitSet和_EntryList,其中_WaitSet用来保存等待锁的线程对象,_EntryList用来保存处于阻塞状态的线程。还有一个_owner,该变量用来标志当前获得锁的线程,当多个线程同时
访问一段同步块时,会将其存放到_EntryList中,当一个线程获取了Monitor过后,_owner就会被设置为该线程,同时_count+1,当线程调用wait()方法或者线程顺利执行完毕,就会释放当前持有的Monitor,那么Monitor中的
_owner就会被设置为null,同时_count-1,并且将该线程放到_WaitSet中,等待下一次被唤醒。

因为这个锁存在于对象头上,这就是为什么Java中每个对象都可以成功锁的原因

下面我们来看一下,加上Synchronized关键字的方法和没有加上Synchronized关键字的同步代码块所生成的字节码
在实例方法上加上Synchronized关键字的字节码

public synchronized void incr();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=3, locals=1, args_size=1
         0: aload_0
         1: dup
         2: getfield      #2                  // Field num:I
         5: iconst_1
         6: iadd
         7: putfield      #2                  // Field num:I
        10: return
      LineNumberTable:
        line 11: 0
        line 12: 10
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      11     0  this   Lcom/example/demo/VolatileTest;

在方法内存加上Synchronized关键字的同步代码块的字节码

public void incr();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=3, args_size=1
         0: aload_0
         1: dup
         2: astore_1
         3: monitorenter
         4: aload_0
         5: dup
         6: getfield      #2                  // Field num:I
         9: iconst_1
        10: iadd
        11: putfield      #2                  // Field num:I
        14: aload_1
        15: monitorexit
        16: goto          24
        19: astore_2
        20: aload_1
        21: monitorexit
        22: aload_2
        23: athrow
        24: return
      Exception table:
         from    to  target type
             4    16    19   any
            19    22    19   any
      LineNumberTable:
        line 11: 0
        line 12: 4
        line 13: 14
        line 14: 24
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      25     0  this   Lcom/example/demo/VolatileTest;
      StackMapTable: number_of_entries = 2
        frame_type = 255 /* full_frame */
          offset_delta = 19
          locals = [ class com/example/demo/VolatileTest, class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 /* chop */
          offset_delta = 4

从以上可以看出,两种方式生成的字节码是不一样的,一个是生成一个ACC_SYNCHRONIZED标志,另外一个是生成三条指令:monitorenter/monitorexit,其中会生成两条monitorexit指令。下面来分析一下这两条执行
会干什么事情:

  • monitorenter:在执行该指令时,首先会尝试获取对象锁,也就是上面提到的Monitor对象,如果这个对象没有被锁定或者这个线程已经获得了这个对象锁,那么_owner就会被设置为该线程,同时_count+1。
  • monitorexit:该指令与monitorenter指令对应,也就是对象锁的释放,那么_owner就会被设置为null,同时_count-1,并且将线程放入_WaitSet中。
    以上字节码中出现了两个monitorexit指令,但其实只会被执行一次,第二个monitorexit指令是在出现异常时执行,也就是说,通过Synchronized关键字加锁,当程序出现异常,会自动释放锁。
    再来看ACC_SYNCHRONIZED标志,通过Synchronized关键字标记到方法上时,并没有monitorenter和monitorexit指令,原因是JVM通过ACC_SYNCHRONIZED标志来标记该方法是一个同步方法,进而执行上面_count+1等等这些操作。

JVM对Synchronized的优化

锁的状态有四种:无锁状态、偏向锁、轻量级锁、重量级锁,锁可以从偏向锁升级成轻量级锁,也可以从轻量级锁升级成为重量级锁,锁的升级是单向的。

  • 自旋锁:线程的阻塞和唤醒需要从用户态切换到内核态,这个操作是很消耗资源的,如果切换的时间间隔非常短,那么我们可以采用循环的方式获得锁,不让出CPU时间片。
  • 自适应自旋锁:在JDK1.6以后引入了自适应自旋锁,是在自旋锁上面再一次的优化,表示循环的时间不在固定了,而是由前一次在同一个锁上的自旋时间和锁拥有者的状态来决定。
  • 锁消除:JVM在运行时,对一些代码上要求同步执行,但检查出根本不可能存在共享数据竞争的锁进行清除。避免不必要的资源浪费。
  • 锁粗化:把多个小锁改成一把大锁
  • 偏向锁:锁并不只是有多个线程竞争,还有可能同一个线程多次获得,这个时候就出现了偏向锁,获得锁的线程不需要做任何同步操作,这样就节省了不必要的资源损耗。
  • 轻量级锁:如果偏向锁失败,即获得锁的线程和本次竞争锁的线程不是同一个,偏向锁失败以为着不能避免同步操作,这个时候JVM会将偏向锁升级成为轻量级锁。它的本意是没有多线程竞争的情况下。
  • 重量级锁:Synchronized是通过Monitor来实现的,而Monitor又是依赖底层操作系统的Mutex Lock来实现的,这个时候操作系统会线程的阻塞和唤醒需要从用户态转换为内核态,这个操作非常耗时,这就是为什么Synchronized
    耗时的原因,这种依赖操作系统Mutex Lock来实现的锁也称之为重量级锁。

并发编程

Java并发的分类

并发由以下三个方面组成

  • 并发安全:并发安全也可以分为三个部分,互斥同步,非互斥同步,无同步方案
  • 并发性能:线程池
  • 线程协作:并发工具

线程池

java.util.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,该类继承链:ThreadPoolExecutor -> AbstractExecutorService -> ExecutorService -> Executor.
创建线程池的核心参数有:

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:除核心线程外的其他线程的存活时间,当线程多久时间没有执行任务时会被回收
  • unit:参数keepAliveTime的时间单位
  • workQueue:工作队列,当任务量超过了核心线程数过后的任务将堆积到该队列中。通常情况下,用三种常用的队列,分别是:SynchronousQueue(直接交换队列,相当于队列不存在,直接将任务分派给线程)、
    LinkedBlockingQueue(无界队列,队列长度不限,但当任务量过大时,会导致OOM异常)、ArrayBlockingQueue(有界队列,该队列在创建时必须指定长度)
  • threadFactory:线程创建工厂
  • handler:当工作队列被填满并且线程数达到最大线程数时如果还有新的任务添加进来的拒绝策略。
    拒绝时机会发生在两种情况:
    1.当前线程次调用了shutdown方法过后,但还没有执行完队列中的任务时。
    2.工作队列已满,池中线程已达到最大线程数时。
    该策略有四种,分别是:
    1.ThreadPoolExecutor.AbortPolicy(丢弃任务直接抛出异常)
    2.ThreadPoolExecutor.DiscardPolicy(拒绝接受新的任务,不抛出异常)
    3.ThreadPoolExecutor.DiscardOldestPolicy(丢弃老的任务,使新的任务得到执行,不抛出异常)
    4.ThreadPoolExecutor.CallerRunsPolicy(让调用者执行该任务)

线程池的执行原理:
线程池初始化完成过后默认不创建任何线程(我们可以通过调用方法来使其初始化完成就创建线程),当有新的任务进来,线程池才会创建新的线程,直到创建的线程等于核心线程数(corePoolSize)过后,之后的
任务都会堆积到工作队列(workQueue)中,当工作队列堆积的任务量达到队列的容量过后,线程池就会新增线程,直到线程池中的线程数量达到最大线程数(maximumPoolSize),达到最大线程数之后的所有任务
请求都会被拒绝。

线程池的创建于销毁

线程池的创建可以分为手动创建和自动创建

  • 手动创建:通过new一个ThreadPoolExecutor对象创建一个线程池对象,注意参数。
  • 自动创建:JDK为我们提供了几个默认的线程池,通过工具类Executors的静态方法创建。分别是:
    1. FixedThreadPool:创建固定线程数的线程池,该类线程池的corePoolSizemaximumPoolSize相同且都是用户自定义的,workQueue是无界队列LinkedBlockingQueue
    2. SingleThreadExecutor:创建只有一个线程的线程池,该类线程池和FixedThreadPool一样,唯一的区别在于corePoolSizemaximumPoolSize的值都是1。
    3. CachedThreadPool:创建一个核心线程数为0,最大线程数无限大并且workQueueSynchronousQueue的线程池
    4. ScheduledThreadPool:创建一个定时、周期性执行的线程池,有点类似定时任务。该类线程池的核心线程数是用户自定义的,最大线程数可以无限大,workQueue是延时队列DelayedWorkQueue

线程池的关闭有两种方式,分别是shutdown/shutdownNow

  • shutdown:该方法并不会立即终止线程池,而是告诉线程池我需要关闭,当线程池执行完工作队列中的任务过后优雅停止,值得注意的是,当执行了shutdown方法过后,线程池的状态变更为SHUTDOWN,该状态
    的线程池不接受新的任务,但会执行工作队列中已经存在的任务。
  • shutdownNow:该方法会立即终止线程池,并且正在执行的线程会立刻返回。线程池的状态变更为STOP

线程池的状态

线程池有五种状态,分别是:

  • RUNNING:该状态下的线程池接受并处理任务
  • SHUTDOWN:该状态下的线程池不接受新的任务,但会执行已存在于工作队列中的任务
  • STOP:该状态下的线程池不接受新的任务,也不执行工作队列中的任务,并且中断正在执行的任务
  • TIDYING:当池中线程执行完并且工作队列中的任务都被执行完成,并且调用钩子方法terminate()
  • TERMINATED:钩子方法terminate()执行完成

Future和Callable

Runnable和Callable的区别在于Runnable没有返回值且不能跑出检查的异常,而Callable可以
Future的五个重要方法:

  • boolean cancel(boolean):该方法用于尝试取消任务的执行,如果该任务已结束或者已取消时返回false。参数接收一个boolean变量,当传入true时,表示强制停止的意思,传入false时,只有当任务还未开始时可以被中断。
  • boolean isCancelled():判断当前任务是否被取消
  • boolean isDone():当前任务是否已完成,正常完成,取消执行,异常都会返回true
  • V get() throws InterruptedException, ExecutionException:返回任务的执行结果,如果当前任务还么有执行完毕,该方法将阻塞
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException:指定阻塞的时长,在这段时间内如果返回则继续执行,如果超时,则抛出TimeoutException异常。

ThreadLocal

ThreadLocal使用场景

  • 每个线程独享一个对象,例如每个线程中创建一个SimpleDataFormat对象,该方式需要重写initialValue方法来创建对象,因为SimpleDataFormat对象不受其他因素影响,PS:
public static ThreadLocal<SimpleDateFormat> formatLocal = ThreadLocal.initialValue(() -> new SimpleDateFormat("yyyy-MM-dd"));
  • 每个线程中保存一个全局变量,例如用户信息,每个线程的用户信息不一样,该方式使用set和get方法来存取对象,因为一开始我们并不知道当前用户是哪个。例如:线程A设置了一个值到ThreadLocal中,线程A
    在任何时候都可以通过get方法获取到这个值,然而在其他线程中是无法获取的。

原理

一个Thread对应一个ThreadLocalMap,而一个ThreadLocalMap中包含多个ThreadLocal,也就是说一个线程可以包含多个ThreadLocal对象,ThreadLocal中的对象其实都是保存在线程对象中的threadLocals变量中的,
在ThreadLocal中有三个比较重要的方法,分别是:

  • setInitialValue():该方法首先会获取到当前线程,并且获取当前线程的threadLocals属性,判断该属性是否为null,如果为null则创建,不为null则调用initialValue方法,获取到对象,然后将其设置到当前
    线程的threadLocals属性的map中。
  • set():该方法和setInitialValue方法一样,首先会判断threadLocals属性是否为null,若为null,则创建,若不为null,则将value设置到threadLocals属性map中。
  • get():该方法从当前线程的threadLocals属性中获取值,如果threadLocals属性为null,则调用setInitialValue方法,若不为null,则直接获取值且返回。

注意事项

  • 不能再ThreadLocal中set一个共享变量(静态变量),如果set的是一个共享变量,那么ThreadLocal通过get方法得到的仍然是共享的,毫无意义
  • 避免ThreadLocal内存泄漏,什么是内存泄漏?内存泄漏是指,对象在程序中已经没有意义了,但是不能被GC回收,常驻内存,最终导致OOM异常的发生。在使用完ThreadLocal对象过后,应该手动调用remove()
    方法释放掉ThreadLocal对象的强引用value

Lock接口

该接口有六个方法

class Lock {
    // 获取锁  
    void lock();
    
    // 如果当前线程未被中断,则获取锁,可以响应中断  
    void lockInterruptibly(); 
    
    // 返回绑定到此 Lock 实例的新 Condition 实例  
    Condition newCondition();
    
    // 仅在调用时锁为空闲状态才获取该锁,可以响应中断  
    boolean tryLock();
    
    // 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁  
    boolean tryLock(long time, TimeUnit unit);
    
    // 释放锁  
    void unlock();
}

以上方法重点介绍lockInterruptibly方法,该方法可以响应中断,也就是说,如果多个线程同时获取同一把锁时,有且只有一个线程能成功获取到锁,然而其他线程就只能等待,这个时候,正在等待的线程
调用interrupt方法,就能够中断该线程的等待过程

锁的分类

  • 根据线程是否需要锁住同步资源进行分类:悲观锁、乐观锁
  • 根据线程是否共享一把锁进行分类:共享锁、独占锁
  • 根据线程竞争时是否排队进行分类:公平锁、非公平锁
  • 根据同一个线程是否可以重复获取同一把锁进行分类:可重入锁、不可重入锁
  • 根据线程是否可中断进行分类:可中断锁、不可中断锁
  • 根据线程等锁的方式进行分类:自旋锁、非自旋锁

各种类型锁的解释

  • 悲观锁:总是设想最坏的情况,认为每个线程都是去修改数据。共享资源只能让一个线程使用,其他线程只能等待当前线程处理完毕释放过后才能访问。例如:数据库行锁、表锁
  • 乐观锁:总是设想最好的情况,认为读多写少。多个线程读取同一个共享资源是不需要加锁的,但是在修改数据的时候通过上一次修改的版本来控制当前数据是最新的。
  • 共享锁:该锁可以同时被多个线程所持有,用于多个线程读取共享数据提高效率,例如ReentrantReadWriteLock的读锁
  • 独占锁:独占锁又称排它锁,该锁只能同时被一个线程所持有,例如ReentrantReadWriteLock的写锁。ReentrantReadWriteLock要么多个线程同时读,要么一个线程写
  • 公平锁:线程对于锁的争抢规则按照顺序获得。
  • 非公平锁:线程对于锁的争抢规则在满足一定条件时可插队,条件是:当前获得资源的线程执行完毕,在释放锁过后线程等待队列的第一个线程没有准备就绪,这个时候可以让已经准备就绪的线程先获得锁。
  • 可重入锁:当多个原子操作需要访问当同一临界资源时,并且这些原子操作有相互嵌套调用的情况,例如:方法A需要访问共享资源,并且方法A需要调用方法B,此时,我们通常会对方法A/B加锁,如果线程1
    访问方法A获得了锁,在调用方法B的时候,有需要获得锁,这个时候还能能获得成功。也就是说,同一个线程能多次获得通一把锁。
  • 不可重入锁:同一个线程对通一把锁只能获得一次。此类锁如果遇到可重入锁的场景会出现死锁的问题。
  • 可中断锁:当线程想要获取的锁被其他线程占用时,如果想要等待中的线程中断去做其他事情,那么可以调用interrupt方法使其中断。
  • 不可中断锁:当线程想要获取一个被其他线程所占用的锁时,当前线程只能等该锁被释放,否则一直等待下去,不能被中断。
  • 自旋锁:当一个线程想要获取一个被占用的锁时,当前线程会一直循环等待下去,直到获得锁为止。该方式主要是为了节省线程被挂起切换所带来的CPU资源开销。
  • 非自旋锁:当锁被占用时,其他线程将被放入等待队列等待锁释放。

ReentrantReadWriteLock简介

ReentrantReadWriteLock中有两把锁,一个是读锁、一个是写锁,读锁可以同时被多个线程持有,但是当读锁被占用时,其他线程不能获取写锁,当写锁被占用时,不能获取读锁,总之就是,要么多个线程
持有一把读锁,要么一个线程持有一把写锁。当ReentrantReadWriteLock是公平锁时,线程对ReentrantReadWriteLock的读写锁争抢规则为顺序,不允许插队;当ReentrantReadWriteLock是非公平锁时,
在满足条件的情况下允许插队,条件是:当前被占用的锁为读锁,并且等待队列的头节点也是读请求时允许插队。
锁降级:当一个操作既用到读锁也用到写锁时,我们希望该操作在一起完成,我们又不希望全局使用写锁(因为会浪费资源开销),我们可以将当前持有的锁进行降级,读锁不能升级成为写锁,但是写锁可以降级成为读锁。
也就是,在读锁里面不能获取写锁,但是在写锁里面可以获取读锁。

锁优化

JVM为我们做了锁的优化,例如:

  • 自旋锁和自适应:自动适应当前线程获取锁的过程采用自选还是非自旋,自选时长等等。
  • 锁清除:频繁的对锁进行同步与释放,是会消耗一定的系统资源的,有时候我们写的代码逻辑其实是不需要进行加锁的,这个时候我们可以通过编译器进行优化
  • 锁粗化:将很多个小逻辑加锁放大到一个操作里面,对这个操作加锁。也就是将多把小锁合并成为一把大锁。

原子类

原子旨在一个操作被执行时不允许被打断,要么该操作被执行,要么该操作不被执行,不允许出现该操作被分成多个部分,分别执行。在JUC中提供了几个常用的原子操作类AtomicInteger/AtomicLong/AtomicBoolean
还有将普通类型转换成为引用类型的原子操作类AtomicReference,原子数组类AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray,普通类型转换原子类型的
AtomicIntegerFieldUpdater/AtomicLongUpdater/AtomicReferenceUpdater,用于累加的LongAdder/DoubleAdder,用于累计特定公式的LongAccumulater/DoubleAccumulater
下面来分别介绍这几种原子类:

  • 基本原子操作类:例如AtomicInteger/AtomicLong/AtomicBoolean,前面两种主要方法有递增,递减,按指定数值累加一节一些CAS方法,而AtomicBoolean没有递增递减方法。
  • 引用原子操作类:AtomicReference,该类提供一个原子读写的对象引用变量,使用方式如下:
class Test {
    public static void main(String[] args) {
        AtomicReference<String> atomicReference = new AtomicReference<>();
        atomicReference.set("Hello");
        //只有当atomicReference中的值等于Hello时,才允许修改
        boolean b = atomicReference.compareAndSet("Hello", "World");
        System.out.println(b);
        System.out.println(atomicReference.get());
    }
}
  • 数组原子操作类:例如AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray,对原子数据的操作其实是对数组中的一个元素进行操作,操作室指定数组下标,方法和基本原子操作类相似。
  • 原子类转换器:例如AtomicIntegerFieldUpdater/AtomicLongUpdater/AtomicReferenceUpdater,该类将一个对象里面的基本数据类型字段转换升级成为原子类进行操作,最后将得到的结果
    自动放入该对象的属性中去,注意该属性不能是静态变量。
  • 累加器:例如LongAdder/DoubleAdder,累加器专门用于数组的递增递减,和基本原子操作类中的递增和递减功能一样,但是性能上有很大的区别,累加器的性能要比基本原子操作类的性能高很多,原因
    在于基本原子操作类在多线程情况下需要保证数据的可见性,也就是多个线程在各自的工作内存中进行递增操作,完成过后需要保证该数据对于其他线程是可见的,所以在线程退出时将数据刷新到主存中去,其他线程
    在递增操作之前需要重新从主存中重新获取,这就导致了额外的内存开销;而累加器则不然,累加器中有两个重要的字段base/cell,在并发较少的情况下直接使用base变量进行累加,当并发较大时,这个
    时候cell就起作用了,cell是一个数组,它为每一个线程开辟一个数组元素作为累加的结果存放区域,各自的线程只需要对自己相对应的元素进行累加就可以了,而最后在得到结果的时候讲base和cell中的各个
    元素求和就得到了总的结果。所以,累加器是一种以空间换时间的做法。
  • 用于累计特定公式:例如LongAccumulater/DoubleAccumulater,该类原子类可以指定计算公式,相当于是将上一次的计算结果赋值给x,accumulate方法的值赋予y,进行计算。如下:
class Test {
    public static void main(String[] args) {
        LongAccumulator accumulator = new LongAccumulator((x,y) -> x + y,5);
        accumulator.accumulate(5);
        System.out.println(accumulator.get());
    }
}

CAS(Compare And Swap)

CAS即比较再交换,是一种无锁算法,其底层是调用Unsafe工具。其思想很简单,也就是当我们要修改某一个变量的值时,先判断该变量的值是否和我们预期的值匹配,如果匹配,这将该变量修改为我们将要修改的值,并且返回true,表示修改成功,
如果不匹配,则返回false,表示修改失败。利用CAS思想写一个等价代码:

class Test {
    volatile int value = 0;
    
    //CAS方法需要保证其原子性
    public synchronized boolean compareAndSwap(int preValue,int newValue) {
        if(value==preValue){
            value = newValue;
            return true;
        }
        return false;
    }
}

CAS的缺点:ABA问题,当线程A将value的值从5修改为7过后,线程B又将value的值从7修改为5,线程C这个时候判断value的值是否为5,如果是,则修改,否则则不修改。这个时候修改成功,因为线程C任务该
值没有被改变过,但其实value的值以经变了。

并发容器

早期线程安全的容器,例如:Vector、Hashtable虽然都是线程安全的,但是他是使用synchronized修饰在各个方法上来实现的,性能非常低。
通过Collections.synchronizedList()工具的方法虽然可以将一个非线程安全的ArrayList或者HashMap转换成一个线程安全的容器,但其实现方式任然是使用synchronized关键字使用同步代码块的方式达到
线程安全的目的,虽然不是加在方法上,但其性能也是非常的低。
下面来一一介绍JUC为我们提供的并发容器类

  • CurrentHashMap:线程安全的HashMap,HashMap在JDK1.7中的实现是通过数据加链表实现的,当有数据需要put时,首先根据key进行hash,得到当前数据的下标,然后通过下标拿到该数组的一个元素,也称
    桶,这个桶就是链表,该种实现方式有一个缺点,当put的key出现hash冲突(多个key同时落到同一个桶上)时,该桶中的链表会越来越长,查询效率会很低。但是在JDK1.8中做了很大改动,当链表中节点数大于
    设定的阈值(默认为8)时将链表转为红黑树。回到CurrentHashMap,CurrentHashMap在JDK1.7中采用分段锁的方式来实现线程安全,将CurrentHashMap中的数据分成多个段,然后给每一个段加锁,段与段之间
    数据的操作互不干扰,所以不会出现线程安全问题,CurrentHashMap中定位一个元素需要两次hash操作,第一次得到segment,第二次定位到元素所在的链表。
  • CopyOnWriteArrayList:线程安全的ArrayList,支持读读共享,读写共享,写写互斥,该List的实现方式在于,可以有多个线程同时读,当有线程相对该List进行写入操作时,将原有的数据复制一份,然后
    让写入线程操作这份数据的副本,最后将该List的指针指向这个副本,从而达到读写互不干扰,此种方式有它的有点也有他的缺点,优点是,在读多写少的场景可以提高效率,在写的时候又能保证安全。缺点是,如果
    写比较多时,复制数据会造成内存开销。
  • BlockingQueue:阻塞队列,常用的阻塞队列有ArrayBlockingQueue、LinkedBlockingQueue。ArrayBlockingQueue是一个使用数据实现的有界队列,在初始化时需要制定队列长度,且生产者消费者
    使用同一把锁,数据在插入时是直接插入;LinkedBlockingQueue是一个基于单向链表无界阻塞队列,初始化时不需要指定队列长度,默认是Integer.MAX_VALUE,且生产者消费者使用的分别是putLock和
    takeLock,数据插入需要先包装成Node
  • SynchronousQueue:一个不存储元素的阻塞队列,该队列将生产者消息和消费者直接交换,当消费者处理不过来生产者的消息时,生产者线程在往里面生产消息时会阻塞,同样的,当生产者没有生产消息时,
    消费者消费消息也会被阻塞。
  • DelayQueue:延时队列,该队列可以根据消息的过期时间排序,然后根据过期时间分别出队,也就是说,只有过期的消息才能被消费者消费。该队列使用场景可以用于订单超时自动关闭,当订单生成时,将其
    创建时间及订单号等信息放到该队列中,当订单被用户手动关闭或者订单完成需要将该队列中的消息删除,删除时传入订单实体,如果匹配删除的订单需要重写订单实体的equals方法;当用户没有手动关闭订单并且
    没有完成订单的时候,超时的订单会被消费者消费,消费者线程将订单关闭;订单实体需要实现Delayed接口,该接口有两个方法compareTo/getDelaycompareTo用于排序,getDelay用于获取剩余过期时间。

并发工具

  • CountDownLatch:倒计时计数器,初始化大小,通过countDown()方法,每次递减1,直到0时,调用await()方法被阻塞的线程将被释放。常见示例,一等多/多等一,例如:五个运动与赛跑,同时等待裁判
    发号施令才能同时出发,这个时候,相当于五个线程等待一个人;跑步开始过后,裁判需要等待五个人全部到达终点过后,才能宣布比赛结束,这个时候,相当于一个线程等待五个线程。
  • Semaphore:信号量,也可以认为是限流器,指定数量的许可证(信号量),每个线程需要拿到(通过调用acquire()方法获得许可证)指定数量的许可证后才能执行下面的操作,完成过后将其释放(通过调用release()方法释放)
  • CyclicBarrier:循环栅栏,初始化时指定屏障个数,且指定解除屏障时运行的线程。当线程调用await()方法过后,表示阻塞一个,需要达到阻塞个数过后才能被释放。例如:某公司下班班车数量有限,一次只能
    装50个人,为了节约成本,必须要等齐50个人时才能发车,这个时候,第一个人来了,只能等待,知道50个人全部到齐过后才能发车。当51个人到来时,便安排第二辆班车(或者可以认为是第一辆车,因为
    CyclicBarrier是可以循环使用的)。
  • Exchanger:用于两个线程之间的数据交换,两个线程之间通过exchanger.exchange(obj)方法进行数据交换,当两个线程都执行到交换点(exchange方法)时,两个线程便可以交换数据。
  • Condition:线程间通信工具,和Object中的await和notify类似,但是,Condition的使用必须在lock()和unlock()方法之间,用于等待和唤醒线程。

AQS(AbstractQueuedSynchronizer)

AQS是一个抽象的队列同步器,使用模板方法模式。其核心思想是,一批线程请求共享资源,如果请求的共享资源空闲,则将当前请求的线程设置为有效的工作线程,将共享资源设置为占用(锁定)的状态,如果被请求的资
源状态为占用(锁定),则将当前请求的线程放到FIFO队列中并将其挂起,等待被唤醒。AQS将请求共享资源的线程封装成CLH锁队列的节点Node进行锁的分配。几乎所有的并发工具都是通过AQS实现的。
AQS三要素

  • state:state是一个被volatile关键字修饰的int变量,该变量在不同的并发工具类中有着不同的语义,而AQS的核心就是通过改变该值来控制并发安全的,而该值的原子性由CAS保证。
  • FIFO队列:一个先进先出的队列,AQS将没有获得共享资源的线程放入到该队列中。
  • 需要实现的获取和释放state的方法:该方法需要用户自己实现,意义在于,我们可以通过AQS实现多种多样的同步器,来达到自己的需求。
    AQS定义了两种资源共享方式,独占(只能一个线程同时访问)和共享(允许多个线程同时访问),自定义同步器需要实现的两组方法
  • boolean tryAcquire(int):独占的方式获取资源,成功返回true,失败返回false
  • boolean tryRelease(int):独占的方式释放资源,成功返回true,失败返回false
  • boolean tryAcquireShared(int):共享的方式获取资源,0表示成功,正数表示成功,且有剩余资源,负数表示失败。
  • boolean tryReleaseShared(int):共享的方式释放资源,释放后允许唤醒后续等待的节点返回true,否则返回false。

高性能缓存示例

Computeable顶层接口

/**
 * 计算接口
 * @param <K>
 * @param <V>
 */
@FunctionalInterface
public interface Computeable<K,V> {
    V compute(K k) throws Exception;
}

实现Computeable接口的业务类

/**
 * 没有缓存的计算
 */
public class NoCacheCompute implements Computeable<String,Integer> {
    @Override
    public Integer compute(String k) {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("缓存未命中,走原有逻辑");
        return Integer.valueOf(k);
    }
}

包装NoCacheCompute带有缓存功能的Computeable实现类,这里使用包装器模式,主要作用是解耦,实现低侵入,低耦合,代码可重用性。

/**
 * 该类是NoCacheCompute的包装类
 */
public class CacheCompute<K,V> implements Computeable<K,V> {
    //被包装的计算器实例
    private Computeable<K,V> compute;
    //缓存容器,value使用Future是为了防止重复计算
    private final Map<K, Future<V>> cache = new ConcurrentHashMap<>();
    //该线程池用于删除过期的key
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
    /**
     * 包装不支持缓存的计算器
     * @param compute
     */
    public CacheCompute(Computeable<K,V> compute) {
        this.compute = compute;
    }
    /**
     * 包装器的计算方法
     * @param k
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Override
    public V compute(K k) throws ExecutionException, InterruptedException {
        //首先从缓存中获取执行
        Future<V> future = cache.get(k);
        //若缓存没有命中,则执行以下逻辑
        if(future==null){
            //创建一个任务去执行被包装的业务方法,这是没有被缓存命中时会被调用的方法。
            Callable<V> callable = () -> this.compute.compute(k);
            FutureTask<V> futureTask = new FutureTask<>(callable);
            //从来没有put过该key的元素
            future = cache.putIfAbsent(k, futureTask);
            if(future==null){
                //将刚刚put进去的futureTask赋值给future,用于后续get方法返回结果
                future = futureTask;
                //开始任务执行实际业务逻辑
                futureTask.run();
            }
        }
        //返回结果,如果任务没有执行完成,该方法会被阻塞。
        try {
            return future.get();
        } catch (ExecutionException e) {
            System.out.println("业务逻辑报错");
            cache.remove(k);
            return null;
        } catch (InterruptedException e) {
            System.out.println("任务中断");
            cache.remove(k);
            return null;
        } catch (CancellationException e) {
            System.out.println("取消任务执行");
            cache.remove(k);
            return null;
        }
    }
    /**
     * 设置过期的key
     * @param k
     * @param timeout
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public V compute(K k,long timeout) throws ExecutionException, InterruptedException {
        if(timeout>0){
            //将任务交给定时线程池执行
            executorService.schedule(() -> {
                expire(k);
            },timeout,TimeUnit.MILLISECONDS);
        }
        return compute(k);
    }
    /**
     * 过期Key
     * @param k
     */
    public synchronized void expire(K k){
        //首先冲缓存中获取future,若存在则执行以下逻辑
        Future<V> future = cache.get(k);
        if(future!=null){
            //若任务还未完成,将强制取消执行
            if(!future.isDone()){
                future.cancel(true);
            }
            //不管任务是否执行完毕,都将缓存中的key删除
            cache.remove(k);
            System.out.println(k + "已过期");
        }
    }
}

测试类

public class Test {
    private final static ExecutorService executorService = Executors.newFixedThreadPool(100);
    public static void main(String[] args) {
        Computeable computeable = new NoCacheCompute();
        CacheCompute<String,Integer> compute = new CacheCompute<>(computeable);
        Runnable runnable = () -> {
            try {
                Integer v = compute.compute("666", 5000);
                System.out.println(v);
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        for (int i = 0; i < 1000; i++) {
            executorService.submit(runnable);
        }
    }
}

以上示例参考慕课网悟空老师的实战课《玩转Java并发工具,精通JUC,成为并发多面手》

参考文献

https://www.cnblogs.com/dolphin0520/p/3920373.html
https://baijiahao.baidu.com/s?id=1612142459503895416&wfr=spider&for=pc&isFailFlag=1