05-java并发编程(进阶)

2022-07-18

05-java并发编程(进阶)

java并发编程(进阶篇)

一. volatile关键字

1.1 作用

强制线程到共享内存中读取数据,而不从线程工作内存中读取,从而使变量在多个线程间可见

一般意义上,一个共享变量会有一个共享内存,然后每个线程都有各自的工作内存,当操作数据时,线程会将共享变量从共享内存中读取到自己的工作内存,因此在操作期间,各个线程之间是不可见的,这样就会引发线程安全问题。
使用volatile关键字,则会忽略从共享内存读取到工作内存,在工作内存中进行独立操作的步骤。而是直接从共享内存中拿数据,操作,然后写入。

不使用volatile: 从共享内存读取到工作内存-> 在工作内存中读取到程序并操作-> 写入工作内存-> 写入共享内存

使用volatile: 从共享内存中读取到程序并操作-> 写入到共享内存

因此,解决了线程可见性的问题
  • 例子
package base;
import java.util.ArrayList;
import java.util.List;

public class DemoThread13{
    private List<String> list = new ArrayList<String>();
    private /*volatile*/ boolean canGet = false;
  
    public void put(){
        for(int i=0;i<10;i++){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            list.add("A");
            System.out.println("线程"+Thread.currentThread().getName()+"添加第"+i+"个元素");
            if(i==5){
                //循环到第次则通知其他线程开始获取数据进行处理
                canGet = true;
                System.out.println("线程"+Thread.currentThread().getName()+"发出通知");
            }
        }
    }

    public void get(){
        while(true){
            if(canGet){
                for(String s:list){
                    System.out.println("线程"+Thread.currentThread().getName()+"获取元素:"+s);
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        final DemoThread13 demo = new DemoThread13();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo.put();
            }
        },"t1").start();;

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo.get();
            }
        },"t2").start();
    }
}
  • 如果不使用volatile关键字,线程1在修改canGet变量时,会拷贝一份到自己的工作内存,修改完毕后,立刻写入共享内存,但是由于线程2是死循环一直占用cpu时间,因此一直读取的是自己工作内存中的canget,因此不会执行if。这时候如果在while里面休眠1s,那么将会从共享内存中读取到更改后的canget,就可以执行if了
  • 使用volatile关键字后,线程1,2都会强制到共享内存中读取canget,因此线程1修改后,线程2可以立马读到共享内存被修改的数据

1.2 缺点

volatile无法保证原子性,属于轻量级的同步,性能比synchrozied强很多,但是只能保证可见性,不能替代synchronized。不能替代其的同步功能。

netty框架中大量使用了volatile

1.3 和static比较

  • static保证唯一性,不保证一致性,多个实例共享一个静态变量
  • volatile保证一致性,不保证唯一性,多个实例有多个volatile变量

二. Atomic类

2.1 作用

java.util.concurrent.atomic.AtomicInteger;

可以很好的保证原子性,即一个同步的效果

但是只能保证共享变量的原子性,对成员方法的原子性不能解决

即变量操作是原子性的,但是操作变量的方法不是原子性的,多个线程还是可以进来,只不过进来操作数据的时候是原子性的,同步的(Atomic内置的操作数据的方法是原子性的,比如get,add方法)

2.2 原理

CAS非锁机制(compareAndSet)Compare and Swap

CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。

更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。

例如:内存中有一个为10的变量x

线程1: 任务是 x++

线程1去读取内存中的x,此时,V=10,A=10,B=11
这时候,线程2抢先执行了任务 x--
此时,对于线程2,V=10,A=10,B=9
线程2执行后x变为了9,即V=9
此时对于线程1来说,V=9,A=10,B=11,线程1在执行任务时会比较A和V的值,发现不相等,则此次任务失败,不执行。然后重新从内存中读取新的值,此时V=9,A=9,B=10,执行成功,最后x=10

从思想上来说,synchronized属于悲观锁,悲观的认为程序中的并发情况严重,所以严防死守,CAS属于乐观锁,乐观地认为程序中的并发情况不那么严重,所以让线程不断去重试更新

  • CAS机制的缺点

    • CPU开销过大

    在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很到的压力。

    • 不能保证代码块的原子性

    • ABA问题

      内存中有变量A
      线程1的任务:将A变为B
      线程2的任务:将A变为B
      线程3的任务:将A变为A
      假设线程1,2先执行,线程3后执行,但是线程2由于某种原因阻塞了,那么执行流程如下:
      1. 线程1执行完毕,内存中A变成了B,此时线程2仍处于阻塞状态,然后线程3开始执行
      2. 线程3执行完毕后内存中B又变成了A,此时线程2恢复执行
      3. 线程2执行时,发现内存中为A,那么就会将A变成B,最后内存中值为B
      

      结合实际来看,这种情况就有很严重的漏洞

      小明有100元,取款50元
      由于某种原因,取款操作开启了两个线程,其中线程2阻塞
      此时,线程1,2的旧的预期值都为100,要修改的新值为50
      1. 线程1执行完毕,小明余额50元,此时线程2还在阻塞
      2. 这时候小明收款50元,此时余额为100元,线程2启动
      3. 对比内存中100元和预期值100相等,执行任务,余额为50
      这明显是一个不应该发生的事情
      
  • 解决ABA问题

    加入版本号,真正要做到严谨的CAS机制,我们在compare阶段不仅要比较期望值A和地址V中的实际值,还要比较变量的版本号是否一致。

    内存中有变量A
    线程1的任务:将A变为B
    线程2的任务:将A变为B
    线程3的任务:将A变为A
    那么每个线程的状态和版本号为:
    线程1: 旧的预期值为A,版本号为01,期望值为B
    线程2: 旧的预期值为A,版本号为01,期望值为B
    线程3: 旧的预期值为A,版本号为01,期望值为A
    假设线程1,2先执行,线程3后执行,但是线程2由于某种原因阻塞了,那么执行流程如下:
    1. 线程1执行完毕,内存中A变成了B,版本号变为02,此时线程2仍处于阻塞状态,然后线程3开始执行
    2. 线程3执行完毕后内存中B又变成了A,版本号为03,此时线程2恢复执行
    3. 线程2执行时,发现内存中为A,但是线程2的版本号为01,而内存中为03,那么判定任务失败,不执行
    
  • java中的实现

    public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                var5 = this.getIntVolatile(var1, var2);
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
            return var5;
        }
    

    jdk1.8后直接调用的unsafe

    什么是unsafe呢?Java语言不像C,C++那样可以直接访问底层操作系统,但是JVM为我们提供了一个后门,这个后门就是unsafe。unsafe为我们提供了硬件级别的原子操作。

    在Java中,AtomicStampedReference类就实现了用版本号作比较的CAS机制。

三. ThreadLocal

ThreadLocal实现了各个线程不共享同一变量,原理如下:

使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本

说白了,就是将共享变量创建副本,每个线程访问时都操作的是副本内容,彼此独立

四. 同步类容器

Vector,HashTable等古老的并发容器,都是使用Collections.synchronizedXXX等工厂方法创建的,并发状态下只能由一个线程访问容器对象.性能很低

都是基于整个容器加的锁

Collections.synchronizedCollection(c);
Collections.synchronizedList(list); 
Collections.synchronizedMap(m);
Collections.synchronizedSet(s); 
Collections.synchronizedSortedMap(m);
Collections.synchronizedSortedSet(s);

即可以将普通的容器更改为线程安全的

五. 并发类容器

5.1 ConcurrentHashMap

ConcurrentHashMap替代HashMap,HashTable

ConcurrentSkipListMap替代TreeMap

ConcurrentHashMap将Hash表分成16个segment,每个segment单独进行锁控制,从而减少了锁的粒度,提升了性能

5.2 COW

Copy On Write容器,写时复制容器

当我们往容器内添加元素时,不直接往当前容器添加,而是先将当前容器进行copy,复制出新的容器,然后往新的容器进行添加元素,添加好之后,再将原来的容器引用指向新的容器'

  • 优点

    • COW容器可以进行并发的读,不需要添加锁,是一种读写分离的模式
  • 缺点

    • 由于每次更新都会复制新容器,对内存占用较大,建议再高并发读的情况下使用
    • COW只能保证数据的最终一致性,不能保证数据的实时一致性
  • java中的应用

    CopyOnWriteArrayList
    CopyOnWriteArraySet
    
    public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();
            }
        }
    

    添加新的元素的时候,需要加锁,这个锁是独占锁,防止线程重入造成写冲突以及复制多个副本。

     public E get(int index) {
            return get(getArray(), index);
        }
    

    读的时候不需要加锁,如果读的时候有多个线程正在向ArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的ArrayList。

六. 并发-队列

6.1 无阻塞队列

ConcurrentLinkedQueue

无阻赛、无锁、高性能、无界队列(直至内存耗尽)、线程安全,使用CAS算法进行入队和出队操作

无阻塞,即元素为空时可以取,返回null,不会阻塞线程

继承自Queue接口,基本方法都和Queue相同,不能添加null,不然会报空指针异常

6.2 阻塞队列-ArrayBlockingQueue

基于数组实现的阻塞有界队列,创建时可指定长度,内部实现维护了一个定长数组用于缓存数据,内部没有采用读写分离,写入和读取数据不能同时进行,不允许null值

  • offer 如果队列已经满了,则不阻塞,不抛出异常
  • offer 可设置最大阻塞时间,2秒,如果队列还是满的,则不阻塞,不抛出异常
  • add 如果队列满了,则不阻塞,直接抛出异常
  • put 如果队列满了,则永远阻塞, 不抛出异常
  • peek 读取头元素不移除,队列为空,返回null,不阻塞, 不抛异常
  • poll 读取头元素并移除,队列为空,返回null,不阻塞, 不抛异常
  • poll 可指定阻塞时间,2秒,如果队列依然为空,则返回null,不抛异常
  • take 读取头元素并移除,如果队列为空,则永远阻塞,不抛出异常
  • drainTo 取出queue中指定个数(或全部)的元素放入list中,并移除,当队列为空时不抛出异常

6.3 阻塞队列-LinkedBlockingQueue

基于链表的阻塞队列,内部维护了一个链表存储缓存数据,支持写入和读取的并发操作,创建时可以指定长度也可以不指定,不指定时代表无界队列,不允许null值

内部采用读写分离的锁机制,所以支持写入和读取的并发操作

  • offer 如果队列已经满了,则不阻塞,不抛出异常
  • offer 可设置最大阻塞时间,2秒,如果队列还是满的,则不阻塞,不抛出异常
  • add 如果队列满了,则不阻塞,直接抛出异常
  • put 如果队列满了,则永远阻塞, 不抛出异常
  • peek 读取头元素不移除,队列为空,返回null,不阻塞, 不抛异常
  • poll 读取头元素并移除,队列为空,返回null,不阻塞, 不抛异常
  • poll 可指定阻塞时间,2秒,如果队列依然为空,则返回null,不抛异常
  • take 读取头元素并移除,如果队列为空,则永远阻塞,不抛出异常
  • drainTo 取出queue中指定个数(或全部)的元素放入list中,并移除,当队列为空时不抛出异常

put,take是阻塞的操作,poll,offer可以设置阻塞时间

6.4 阻塞队列-SynchronousQueue

没有任何容量,必须现有线程先从队列中take,才能向queue中add数据,否则会抛出队列已满的异常。

不能使用peek方法取数据,此方法底层没有实现,会直接返回null

  • 如果没有读取线程,则add方法会排除Queue Full异常,所以建议使用put方法,进行阻塞。
  • 如果没有写入线程,则poll方法会无法取到数据,所以建议设置poll方法的阻塞时长,或者使用take方法进行永久阻塞

这个队列相当于一个传送门,因为其是没有容量的,因此比如说t2线程先从其中take,take是阻塞操作,那么这个线程会被阻塞,此时,t1线程往里面add,这时候t2线程就取到了t1线程的数据。

可以高效的支持线程之间的信息传送,只是一个媒介,一个桥梁,本身不存储任何数据

6.5 阻塞队列-PriorityBlockingQueue

一个无界阻塞队列,默认初始化长度为11,也可以手动指定,但是队列会自动扩容。资源被耗尽时导致OutOfMemoryError.不允许使用null。不允许插入不可比较的对象(抛出ClassCastException),加入的对象实现Comparable接口

并不是在add之后进行排序优先级,而是在take之后才进行排序的

* 
 * 一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。
 * 此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。
 * 此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。
 * iterator() 方法中提供的迭代器并不 保证以特定的顺序遍历 PriorityBlockingQueue 的元素。
 * 如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。
 * 此外,可以使用方法 drainTo 按优先级顺序移除 全部或部分元素,并将它们放在另一个 collection 中。
 * 在此类上进行的操作不保证具有同等优先级的元素的顺序。如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。
 * 例如,以下是应用先进先出 (first-in-first-out) 规则断开可比较元素之间联系的一个类。
 * 要使用该类,则需要插入一个新的 FIFOEntry(anEntry) 来替换普通的条目对象。
 * 
 * 注意:加入到PriorityBlockingQueue中的元素不是立即排序的,是在调用take等读取方法之后
 * ----------------------------------------------
 * put、add方法实际调用了offer方法
 * ----------------------------------------------
 * peek 读取头元素不移除,队列为空,返回null,不阻塞, 不抛异常
 * poll 读取头元素并移除,队列为空,返回null,不阻塞, 不抛异常 
 * poll 可指定阻塞时间,2秒,如果队列依然为空,则返回null,不抛异常 
 * take 读取头元素并移除,如果队列为空,则永远阻塞,不抛出异常
 * drainTo 取出queue中指定个数(或全部)的元素放入list中,并移除,当队列为空时不抛出异常
 * 

6.6 阻塞队列-DelayQueue

 * Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。
 * 如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。
 * 当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。
 * 即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
 * 例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
 * 内部元素需实现Delayed接口

模拟java代码

package base;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * ----------------------------------------------
 * Demo说明:模拟一种游戏的自动退出机制,例如甲乙丙三个人分别可以免费玩游戏3、2、1秒,到时自动踢出系统
 */
class User implements Delayed{

    private int id;
    private String name;
    private long endTime; //退出时间

    public User(int id, String name, long endTime) {
        this.id = id;
        this.name = name;
        this.endTime = endTime;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public long getEndTime() {
        return endTime;
    }
    public void setEndTime(long endTime) {
        this.endTime = endTime;
    }
    @Override
    public String toString() {
        return this.name;
    }

    //由于需要根据延时时间的长短,计算从数据中移除的顺序,所以需要实现compareTo方法计算优先级,类似优先级队列
    @Override
    public int compareTo(Delayed o) {
        User user = (User)o;
        if(this.endTime>user.getEndTime()){
            return 1;
        }else if(this.endTime<user.getEndTime()){
            return -1;
        }else{
            return 0;
        }
    }

    //计算剩余延迟时间;零或负值指示延迟时间已经用尽
    @Override
    public long getDelay(TimeUnit unit) {
        return this.endTime-System.currentTimeMillis();
    }

}

public class DemoThread34 {

    DelayQueue<User> delayQueue = new DelayQueue<User>();

    //登录游戏,加入队列
    public void login(User user){
        delayQueue.add(user);
        System.out.println("用户("+user.getId()+")"+user.getName()+"已登录,预计下机时间为"+user.getEndTime());
    }

    //退出游戏,移除队列
    public void logout(){
        try {
            System.out.println(delayQueue);
            User user = delayQueue.take();
//            User user = delayQueue.poll(); //不能使用poll方法,因为没有阻塞功能
            System.out.println("用户("+user.getId()+")"+user.getName()+"到时自动退出,时间为"+System.currentTimeMillis()+"...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //获取当前在线人数
    public int onlineSize(){
        return delayQueue.size();
    }

    public static void main(String[] args) {
        DemoThread34 demo = new DemoThread34();

        //用户登录,并设置退出时间
        demo.login(new User(1,"甲",30000+System.currentTimeMillis()));
        demo.login(new User(2,"乙",20000+System.currentTimeMillis()));
        demo.login(new User(3,"丙",10000+System.currentTimeMillis()));

        while(true){
            //监控到时用户
            demo.logout();
            //如果在线用户则退出
            if(demo.onlineSize()==0){
                break;
            }
        }

    }
}

  • 应用场景

    缓存到期删除,任务超时处理,空闲链接关闭等等


标题:05-java并发编程(进阶)
作者:mahaonan
地址:https://mahaonan.fun/articles/2022/07/18/1658147073654.html