首页 > 编程语言 > 详细

[Java]JUC并发编程

时间:2020-06-08 17:46:21      阅读:39      评论:0      收藏:0      [点我收藏+]

[Java]JUC并发编程

一、什么是JUC?

JUC是在并发编程中使用的工具类java.util.concurrent

二、虚假唤醒

public class FakeWake {

    public static void main(String[] args) {

        Data data = new Data();

        new Thread(()-> { for (int i = 0; i < 10; i++) data.increment(); }).start();

        new Thread(()->{ for (int i = 0; i < 10; i++) data.decrement(); }).start();

        new Thread(()->{ for (int i = 0; i < 10; i++)  data.increment();  }).start();

        new Thread(()->{ for (int i = 0; i < 10; i++) data.decrement(); }).start();

    }

}


class Data{

    private int num = 0;

    public synchronized void increment(){
        if(num != 0){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        num++;
        System.out.println(Thread.currentThread().getName()+"=>"+num);

        //唤醒其他线程
        this.notifyAll();
    }

    public synchronized void decrement(){
        if(num == 0){
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        num--;
        System.out.println(Thread.currentThread().getName()+"=>"+num);

        //唤醒其他线程
        this.notifyAll();
    }
}

运行结果:

Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-2=>2
Thread-0=>3
Thread-1=>2
Thread-1=>1
Thread-1=>0
Thread-0=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-3=>0
Thread-2=>1
Thread-3=>0
Thread-2=>1
Thread-3=>0

上述程序就存在虚假唤醒的问题。我们可以看到结果出现了2、3数字,这不符合我们的预期。我们现在来解释一下为什么会出现这种情况,就拿数字2举例:

  1. 第一个生产者线程开始执行,进入if判断,发现num不为0,然后执行this.wait()方法,进入等待队列等待唤醒。
  2. 第二个生产者线程开始执行,进入if判断,发现num不为0,同样执行this.wait()方法,进入等待队列等待唤醒。
  3. 第一个消费者线程开始执行,进入if判断,发现num不为0,执行num--操作,接着执行this.notifyAll()方法,唤醒等待队列的全部进程。
  4. 第一个生产者线程和第二个生产者线程被唤醒,第一个生产者开始执行num++操作,然后第二个生产者也执行了num++操作(也可能是第二个生产者先执行)。

解决办法:将生产方法和消费方法里的if判断改为while循环判断

三、生产者与消费者问题(Lock锁)

public class LockPAC {
    public static void main(String[] args) {

        Data2 data2 = new Data2();

        new Thread(()-> { for (int i = 0; i < 10; i++) data2.increment(); }).start();

        new Thread(()->{ for (int i = 0; i < 10; i++) data2.decrement(); }).start();

        new Thread(()->{ for (int i = 0; i < 10; i++)  data2.increment();  }).start();

        new Thread(()->{ for (int i = 0; i < 10; i++) data2.decrement(); }).start();

    }

}


class Data2{

    private int num = 0;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment(){
        lock.lock();
        try{
            while(num != 0) {
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName()+"=>"+num);
            //唤醒其他线程
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void decrement(){
        lock.lock();
        try {
            while(num == 0){
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName()+"=>"+num);
            //唤醒其他线程
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

Condition实现精准通知和唤醒线程:

public class TestCondition {

    public static void main(String[] args) {

        Data3 data3 = new Data3();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data3.printA();
            }
        },"A").start();
        new Thread(()->{for (int i = 0; i < 10; i++) {
            data3.printB();
        }},"B").start();
        new Thread(()->{for (int i = 0; i < 10; i++) {
            data3.printC();
        }},"C").start();

    }

}


class Data3{

    private Lock lock = new ReentrantLock();
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();
    private int flag = 1;

    public void printA(){
        lock.lock();
        try {
            //业务
            while(flag != 1){
                condition1.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>A");

            flag = 2;
            condition2.signal();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }


    }

    public void printB(){
        lock.lock();
        try {
            //业务
            while(flag != 2){
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>B");

            flag = 3;
            condition3.signal();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printC(){
        lock.lock();
        try {
            //业务
            while(flag != 3){
                condition3.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>C");

            flag = 1;
            condition1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
}

四、不安全的集合

ArrayListHashMapHashSet都不安全,在多线程环境下应该使用如下方式:

public class TestCollection {

    public static void main(String[] args) throws InterruptedException {

        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();

        for (int i = 0; i < 10000; i++) {

            new Thread(()->{copyOnWriteArrayList.add("xxx");}).start();

        }

        Map stringIntegerMap = new ConcurrentHashMap();

        for (int i = 0; i < 10000; i++) {

            new Thread(()->{stringIntegerMap.put(UUID.randomUUID(),5);}).start();

        }

        Set set = new CopyOnWriteArraySet<>();

        for (int i = 0; i < 10000; i++) {

            new Thread(()->{set.add(UUID.randomUUID());}).start();

        }

        TimeUnit.SECONDS.sleep(1);

        System.out.println(stringIntegerMap.size());
        System.out.println(copyOnWriteArrayList.size());
        System.out.println(set.size());

    }

}

五、Collable

public class TestCallable {


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        MyThread myThread = new MyThread();

        FutureTask futureTask = new FutureTask(myThread);

        new Thread(futureTask).start();
        new Thread(futureTask).start();//有缓存

        Object o = futureTask.get();//会阻塞
        System.out.println(o);


    }

}

class MyThread implements Callable<String>{

    @Override
    public String call() throws Exception {
        System.out.println("call()");
        return "1024";
    }
}

六、常用的辅助类

CountDownLatch:

countDownLatch.countDown();//计数器减一

countDownLatch.await();//等待计数器归零

CyclicBarrier:

CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
    System.out.println("成功!");
});
cyclicBarrier.await();

Semaphore:

public class TestSemaphore {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);//限流

        for (int i = 0; i < 6; i++) {

            new Thread(()->{

                    try {
                        semaphore.acquire();//假如满了等待
                        System.out.println(Thread.currentThread().getName()+"抢到车位!");
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println(Thread.currentThread().getName()+"离开车位!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        semaphore.release();//当前信号量加一,然后等待唤醒的线程
                    }

            },String.valueOf(i+1)).start();
        }
    }
}

七、读写锁

读写锁能更加细粒度的控制加锁解锁操作。

ReadWriteLock readWriteLock = new ReadWriteLock();

readWriteLock.readlock().lock();

readWriteLock.writelock().lock();

八、阻塞队列

常见的阻塞队列:

技术分享图片

BlockingQueue的方法:

方式 抛出异常 有返回值,不抛出异常 阻塞等待 超时等待
添加 add offer put offer(,,)
移除 remove poll take poll(,)
检测队首元素 element peek - -

九、线程池

线程池在系统启动时即创建大量空闲线程,程序将一个Runnable对象或一个Callable对象传给线程池,线程池就会启动一个线程来执行它们的run()或call()方法,方法执行完后该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一次任务的到来。

使用线程池原因:

  1. 当同时并发多个网络线程时,引入线程池技术会极大地提高APP的性能。
  2. 显著减少了创建线程的数目(系统创建一个新线程的成本是比较高的,因为涉及与操作系统的交互)。
  3. 防止内存过度消耗。控制活动线程的数量,防止并发线程过多。

Executors.newSingleThreadExecutor();//单个线程

Executors.newFixedThreadExecutor();//一个固定的线程池

Executors.newCachedThreadExecutor();//可伸缩的

Executors.newScheduledThreadPool();//这个池子里的线程可以按schedule依次delay执行,或周期执行

自定义线程池:

public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
                          int maximumPoolSize,//最大核心线程池大小
                          long keepAliveTime,//超时了没有人用就会释放
                          TimeUnit unit,//超时单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列
                          ThreadFactory threadFactory,//线程工厂
                          RejectedExecutionHandler handler//拒绝策略) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

四种拒绝策略:

  1. AbortPolicy:人达到最大之后,再进来人就抛出异常。
  2. CallerRunsPolicy:哪来的去哪里。
  3. DiscardPolicy:丢掉任务不抛出异常。
  4. DiscardOldestPolicy:满了,就尝试和最早的竞争,不会抛出异常。
Runtime.getRuntime().availableProcessors();//获取CPU的核数

CPU密集型:线程池中线程个数应尽量少,不应大于CPU核心数。

IO密集型:由于IO操作速度远低于CPU速度,那么在运行这类任务时,CPU绝大多数时间处于空闲状态,那么线程池可以配置尽量多些的线程,以提高CPU利用率。

十、四大函数式接口

函数式接口:只有一个方法的接口。

Function:传入参数T,返回参数I。

Predicate:断定性接口,传入一个参数,返回布尔值。

public static void main(String[] args) {
        Function function = new Function<String, String>() {
            @Override
            public String apply(String s) {
                return s;
            }
        };
        System.out.println(function.apply("ssss"));

//        Predicate<String> predicate = new Predicate<String>() {
//
//            @Override
//            public boolean test(String s) {
//                return false;
//            }
//        };

        Predicate<String> predicate = (str) -> {
            return str.isEmpty();
        };
        System.out.println(predicate.test("s"));
    }

Consumer:消费性接口,只有输入,没有返回值。

Supplier:生产型接口,没有参数输入,只有输出。

public class TestStream {

    public static void main(String[] args) {

        Consumer consumer = (str)->{ System.out.println(str); };

        consumer.accept("s");

        Supplier supplier = ()->{return "s";};
        System.out.println(supplier.get());
        
    }

}

十一、Stream流式计算

public static void main(String[] args) {
    User u1 = new User(12, "A", 13);
    User u2= new User(13, "B", 24);
    User u3 = new User(14, "C", 34);
    User u4 = new User(15, "D", 43);
    User u5 = new User(16, "E", 15);

    List<User> list = Arrays.asList(u1, u2, u3, u4, u5);

    //求得学生id为偶数、年龄大于23的学生并把学生姓名转换为大写、逆字典序排列,最后只输出第一个学生。
    list.stream().filter(u->{return u.getId()%2==0;})
            .filter(u->{return u.getAge()>23;})
            .map(u->{return u.getName().toUpperCase();})
            .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
            .limit(1)
            .forEach(System.out::println);

}

十二、ForkJoin

public class TestSum {


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test1();//844
        test2();//838
        test3();//
    }

	//普通求和
    public static void test1(){

        long start = System.currentTimeMillis();
        long sum = 0;
        for (long i = 0; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println(sum+" "+(end-start));
       

    }

	//ForkJoin求和
    public static void test2() throws ExecutionException, InterruptedException {

        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        TestForkJoin testForkJoin = new TestForkJoin(0,10_0000_0000);
        ForkJoinTask<Long> submit = forkJoinPool.submit(testForkJoin);
        Long sum = submit.get();
        long end = System.currentTimeMillis();
        System.out.println(sum+" "+(end-start));
    }

	//stream并行流
    public static void test3(){

        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0,10_0000_0000).parallel().reduce(0,Long::sum);
        long end = System.currentTimeMillis();
        System.out.println(sum+" "+(end-start));
    }



}
public class TestForkJoin extends RecursiveTask<Long> {
    private long start;
    private long end;

    private long temp = 10000;

    public TestForkJoin(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if(end-start<temp){
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else{
            long middle = (start+end)/2;
            TestForkJoin task1 = new TestForkJoin(start,middle);
            task1.fork();//拆分任务,把任务压入线程队列
            TestForkJoin task2 = new TestForkJoin(middle+1,end);
            task2.fork();
            return task1.join()+task2.join();
        }
    }
}

十三、异步回调

public class TestCompletableFuture {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
        });

        System.out.println("111");
        completableFuture.get();


        //有返回值的
        CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
            return 1024;
        });


        System.out.println(uCompletableFuture.whenComplete((t, u) -> {
            System.out.println("t=>" + t);//正常的返回结果
            System.out.println("u=>" + u);//异常
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 233;
        }).get());
    }

}

十四、JMM

JMM:java内存模型。

JMM有八种内存操作:

  1. lock(锁定):作用于主内存的变量,它把一个变量标识为一条线程独占的状态。
  2. unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
  3. read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用。
  4. load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
  5. use(使用):作用于工作内存的变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用到变量的值的字节码指令时将会执行这个操作。
  6. assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
  7. store(存储):作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存 中,以便随后的write操作使用。
  8. write(写入):作用于主内存的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中。

十五、volatile

volatile是Java语言的一个关键字,是Java虚拟机提供轻量级的同步机制

volatile的特性:

  1. 保证可见性。
  2. 不保证原子性。
  3. 禁止指令重排。

可见性:

public class TestVolatile {

    private static  volatile int num = 0;


    public static void main(String[] args) throws InterruptedException {
        new Thread(()->{
            while(num == 0){
                
            }
        }).start();


        TimeUnit.SECONDS.sleep(2);

        num = 1;

        System.out.println(num);
    }

}

不保证原子性:

public class TestVolatile2 {

   
    
    private static volatile int num = 0;
     //可使用原子类保证原子性
    //private static AtomicInteger num = new AtomicInteger();

    public static void add(){
        num++;
        //num.getAndIncrement();
    }

    public static void main(String[] args) {

        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                for (int i1 = 0; i1 < 1000; i1++) {
                    add();
                }
            }).start();
        }

        while (Thread.activeCount() > 2){
            Thread.yield();
        }

        System.out.println(Thread.currentThread().getName()+"num=>"+num);

    }

}

禁止指令重排:

int i = 0;              
boolean flag = false;
i = 1;                //语句1  
flag = true;          //语句2

上面代码定义了一个int型变量,定义了一个boolean类型变量,然后分别对两个变量进行赋值操作。从代码顺序上看,语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗?不一定,为什么呢?这里可能会发生指令重排序(Instruction Reorder)。比如上面的代码中,语句1和语句2谁先执行对最终的程序结果并没有影响,那么就有可能在执行过程中,语句2先执行而语句1后执行。因为处理器在进行重排序时是会考虑指令之间的数据依赖性,如果一个指令Instruction 2必须用到Instruction 1的结果,那么处理器会保证Instruction 1会在Instruction 2之前执行。从上面可以看出,指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性

//x、y为非volatile变量
//flag为volatile变量
x = 2;        //语句1
y = 0;        //语句2
flag = true;  //语句3
x = 4;         //语句4
y = -1;       //语句5

由于flag变量为volatile变量,那么在进行指令重排序的过程的时候,不会将语句3放到语句1、语句2前面,也不会讲语句3放到语句4、语句5后面。但是要注意语句1和语句2的顺序、语句4和语句5的顺序是不作任何保证的。并且volatile关键字能保证,执行到语句3时,语句1和语句2必定是执行完毕了的,且语句1和语句2的执行结果对语句3、语句4、语句5是可见的。

“观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令”。lock前缀指令实际上相当于一个内存屏障(也称作内存栅栏),内存屏障会提供3个功能:

  1. 它确保指令重排序时不会把其后面的指令排到内存屏障之前的位置,也不会把前面的指令排到内存屏障的后面;即在执行到内存屏障这句指令时,在它前面的操作已经全部完成。
  2. 它会强制将对缓存的修改操作立即写入主存。
  3. 如果是写操作,它会导致其他CPU中对应的缓存行无效,此时在其他CP中运行的线程必须重新读取主存内容。。

十六、深入理解CAS

  • CAS是英文单词Compare And Swap的缩写,中文意思是:比较并替换。CAS需要有3个操作数:内存地址V旧的预期值A,即将要更新的目标值B
  • CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。

AtomicInteger.getAndIncrement()是原子操作,它调用的是getAndAddInt方法,最后调用的compareAndSwapInt方法,即CAS

getAndAddInt方法源码:

技术分享图片

getAndAddInt方法解析:拿到内存位置的最新值var5,使用CAS尝试修将内存位置的值修改为目标值var5+var4,如果修改失败,则获取该内存位置的新值v,然后继续尝试,直至修改成功。

compareAndSwapInt源码:

技术分享图片

Atomic::cmpxhg源码:(windows_x86)

技术分享图片

Atomic::cmpxchg方法解析:

mp是“os::is_MP()”的返回结果,“os::is_MP()”是一个内联函数,用来判断当前系统是否为多处理器。如果当前系统是多处理器,该函数返回1,否则,返回0。LOCK_IF_MP(mp)会根据mp的值来决定是否为cmpxchg指令添加lock前缀。如果通过mp判断当前系统是多处理器(即mp值为1),则为cmpxchg指令添加lock前缀。否则,不加lock前缀。这是一种优化手段,认为单处理器的环境没有必要添加lock前缀,只有在多核情况下才会添加lock前缀,因为lock会导致性能下降。cmpxchg是汇编指令,作用是比较并交换操作数。

CAS的缺点:

  1. 循环时间长开销很大。
  2. 只能保证一个共享变量的原子操作。
  3. 存在ABA问题。

ABA问题:

如果内存地址V初次读取的值是A,并且在准备赋值的时候检查到它的值仍然为A,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。这个漏洞称为CAS操的“ABA”问题。Java并发包为了解决这个问题,提供了一个带有标记的原子引用类AtomicStampedReference,它可以通过控制变量值的版本来保证CAS的正确性。因此,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。

[Java]JUC并发编程

原文:https://www.cnblogs.com/baihan/p/13066555.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!