首页 > 编程语言 > 详细

java juc

时间:2021-03-30 21:59:25      阅读:35      评论:0      收藏:0      [点我收藏+]

JUC


  • cpu多核
public class Test1 {
    public static void main(String[] args) {

        //获取cpu核数
        //cpu密集型  io密集型
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}
  • wait会释放锁,sleep不会释放
  • wait必须在同步代码块中,sleep任意位置
  • wait不要捕获异常,sleep必须捕获

Lock锁

  • 公平锁:先来后到
  • 非公平锁:可以插队(默认)
public ReentrantLock() {
    sync = new NonfairSync();
}
  • 与synchronized区别

    • Synchronized 内置的java关键字,lock是一个java类
    • Synchronized 无法判断获取锁的状态,lock可以判断是否获取了锁
    • Synchronized 会自动释放锁,lock必须手动释放锁,否则死锁
    • Synchronized 线程1(获得锁,阻塞),线程2(等待,继续等待);lock不一定会等待下去
    • Synchronized 可重入锁,不可以中断,非公平;lock,可重入锁,可以判断锁,非公平(可以自己设置)
    • Synchronized 适合锁少量代码同步问题,lock锁大量同步代码
  • 虚假唤醒

package juc;

public class A {
    public static void main(String[] args) {
        Data data = new Data();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
    }
}

class Data{
    private int num = 0;

    //+1
    public synchronized void increment() throws InterruptedException{
        if(num != 0){
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName() + "->" + num);
        this.notifyAll();
    }

    //-1
    public synchronized void decrement() throws InterruptedException{
        if(num == 0){
            this.wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName() + "->" + num);
        this.notifyAll();
    }
}
  • 当有更多线程时就会出现虚假唤醒,把if改成while避免虚假唤醒。等待应该总是出现在循环中。

  • lock版的

    • 随机执行的
package juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class B {

    public static void main(String[] args) {
        Data2 data2 = new Data2();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "C").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                try {
                    data2.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "D").start();
    }
}

class Data2 {
    private int num = 0;
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    //+1
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while (num != 0) {
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() + "->" + num);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    //-1
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (num == 0) {
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName() + "->" + num);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
    • 精准唤醒
package juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class C {

    public static void main(String[] args) {
        Data3 data3 = new Data3();

        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data3.printA();
            }
        }, "A").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data3.printB();
            }
        }, "B").start();
        new Thread(()->{
            for (int i = 0; i < 10; i++) {
                data3.printC();
            }
        }, "C").start();

    }
}


class Data3{
    private int num = 1;
    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void printA(){
        lock.lock();
        try {
            while (num != 1){
                condition1.await();
            }
            num=2;
            System.out.println(Thread.currentThread().getName() + "AAA");
            condition2.signalAll();//唤醒B
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printB(){
        lock.lock();
        try {
            while (num != 2){
                condition2.await();
            }
            num=3;
            System.out.println(Thread.currentThread().getName() + "BBB");
            condition3.signalAll();//唤醒C
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void printC(){
        lock.lock();
        try {
            while (num != 3){
                condition3.await();
            }
            num=1;
            System.out.println(Thread.currentThread().getName() + "CCC");
            condition1.signalAll();//唤醒A
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
  • 8锁
package juc.lock8;

import java.util.concurrent.TimeUnit;

public class Test1 {

    public static void main(String[] args) {
        Phone phone = new Phone();

        new Thread(()->{
            phone.sendMsg();
        }, "A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            phone.call();
        }, "B").start();

    }
}


class Phone{

    //锁的是方法的调用者
    //用的是同一个锁,谁先拿到谁执行
    public synchronized void sendMsg(){
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public synchronized void call(){
        System.out.println("打电话");
    }
}
package juc.lock8;

import java.util.concurrent.TimeUnit;

public class Test2 {
    public static void main(String[] args) {
        //一个对象只有一把锁
        Phone2 phone = new Phone2();

        new Thread(() -> {
            phone.sendMsg();
        }, "A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            phone.hello();
        }, "B").start();

        //先hello后发短信
    }
}


class Phone2 {

    //锁的是方法的调用者
    //用的是同一个锁,谁先拿到谁执行
    public synchronized void sendMsg() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public synchronized void call() {
        System.out.println("打电话");
    }

    //没有锁,不是同步方法,不受锁的影响
    public void hello() {
        System.out.println("hello");
    }
}
package juc.lock8;

import java.util.concurrent.TimeUnit;

public class Test3 {
    public static void main(String[] args) {
        //一个对象只有一把锁
        //两个对象的Class类模板只有一个,static锁的是class
        Phone3 phone1 = new Phone3();
        Phone3 phone2 = new Phone3();

        new Thread(() -> {
            phone1.sendMsg();
        }, "A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            phone2.call();
        }, "B").start();

        //先短信后打电话
    }
}

//两个静态的锁
//类一加载就有了,锁的是Class
class Phone3 {

    //锁的是方法的调用者
    //用的是同一个锁,谁先拿到谁执行
    public static synchronized void sendMsg() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    public static synchronized void call() {
        System.out.println("打电话");
    }

}
package juc.lock8;

import java.util.concurrent.TimeUnit;

public class Test4 {
    public static void main(String[] args) {
        //一个对象只有一把锁
        //两个对象的Class类模板只有一个,static锁的是class
        Phone4 phone1 = new Phone4();
        Phone4 phone2 = new Phone4();

        new Thread(() -> {
            phone1.sendMsg();
        }, "A").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            phone1.call();
        }, "B").start();

        //先打电话后发短信
    }
}

class Phone4 {

    //锁的是Class类模板
    public static synchronized void sendMsg() {
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("发短信");
    }

    //普通同步方法 锁的是调用者
    public synchronized void call() {
        System.out.println("打电话");
    }

}

集合类不安全

  • ConcurrentModificationException并发修改异常
  • 解决办法
    • 写入时复制
    • 使用工具类Collections
  • List不安全
package juc;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListTest {

    public static void main(String[] args) {

        /**
         * 并发编程下ArrayList不安全
         * 1.List<String> list = new Vector<>();
         * 2.List<String> list = Collections.synchronizedList(new ArrayList<>());
         * 3.List<String> list = new CopyOnWriteArrayList<>();
         */

        //CopyOnWrite 写入时复制
        List<String> list = new CopyOnWriteArrayList<>();

        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
                list.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(list);
            }, String.valueOf(i)).start();
        }
    }
}
  • Set不安全
package juc;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetTest {

    public static void main(String[] args) {

        /**
         * 1.Set<String> set = Collections.synchronizedSet(new HashSet<>());
         * 2.Set<String> set = new CopyOnWriteArraySet<>();
         */
        Set<String> set = new CopyOnWriteArraySet<>();
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                set.add(UUID.randomUUID().toString().substring(0, 5));
                System.out.println(set);
            }, String.valueOf(i)).start();
        }
    }
}
  • HashSet底层是HashMap

    public HashSet() {
        map = new HashMap<>();
    }
    
    public boolean add(E e) {
        return map.put(e, PRESENT)==null;
    }
    
  • HashMap不安全

package juc;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class MapTest {
    public static void main(String[] args) {

        //默认等价与new HashMap<>(16, 0.75); 初始化容量和加载因子
        Map<String, String> map = new ConcurrentHashMap<>();

        for (int i = 1; i <= 100; i++) {
            new Thread(()->{
                map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5));
                System.out.println(map);
            }, String.valueOf(i)).start();
        }
    }
}

Callable

  • 可以有返回值
  • 可以抛出异常
  • 方法不同,run()/call()
package juc;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class MyCallable {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // new Thread(new Runnable()).start();
        // new Thread(new FutureTask<V>()).start();
        // new Thread(new FutureTask<V>( Callable )).start();

        MyThread myThread = new MyThread();
        FutureTask<Integer> integerFutureTask = new FutureTask<>(myThread);// 适配类

        new Thread(integerFutureTask, "A").start();
        new Thread(integerFutureTask, "B").start();// 结果会被缓存,只会打印一个call

        Integer o = (Integer) integerFutureTask.get();// 返回值   此方法可能会产生阻塞,最好放在最后一行
        System.out.println(o);

    }
}

class MyThread implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("call()");
        return 7;
    }
}

CountDownLatch减法计数器

package juc;

import java.util.concurrent.CountDownLatch;

public class MyCountDownLatch {
    public static void main(String[] args) throws InterruptedException {
        // 必须要执行的任务时使用
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName());
                countDownLatch.countDown();// 计数器减一
            }, String.valueOf(i)).start();
        }

        countDownLatch.await();// 等待计数器归零才会往下执行
        System.out.println("over");

    }
}

CyclicBarrier加法计数器

package juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class MyCyclicBarrier {
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
            System.out.println("over");
        });

        for (int i = 0; i < 7; i++) {
            final int temp = i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + " " + temp);
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

  • acquire(),relase()
  • 多个资源互斥使用
  • 并发限流,控制最大线程数
package juc;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class MySemaphore {
    public static void main(String[] args) {
        // 线程数量 限流
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();// 获取
                    System.out.println(Thread.currentThread().getName() + "获得资源");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + "释放资源");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();// 释放
                }
            }).start();
        }
    }
}

ReadWriteLock读写锁

  • 实现类:ReentrantReadWriteLock
  • 读锁(共享锁),写锁(独占锁)
package juc;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MyReadWriteLock {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();

        for (int i = 0; i <6; i++) {
            final int temp = i;
            new Thread(()->{
                myCache.put(temp + "", temp + "");// lambda无法访问到外部的变量
            }, String.valueOf(i)).start();
        }

        for (int i = 0; i <6; i++) {
            final int temp = i;
            new Thread(()->{
                myCache.get(temp + "");
            }, String.valueOf(i)).start();
        }
    }
}

class MyCache{
    private volatile Map<String, Object> map = new HashMap<>();
    // 读写锁
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    // 只有一个可以写
    public void put(String key, Object value){
        lock.writeLock().lock();

        try {
            System.out.println(Thread.currentThread().getName() + "写入" + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "写完毕");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.writeLock().unlock();
        }

    }

    // 可以多个读
    public void get(String key){
        lock.readLock().lock();

        try {
            System.out.println(Thread.currentThread().getName() + "读取" + key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName() + "读完毕");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.readLock().unlock();
        }

    }
}

BlockingQueue

  • Collection->Queue->BlockingQueue
  • 使用场景:多线程并发处理,线程池
  • 四组API
方式 抛出异常 不抛出异常 阻塞等待 超时等待
添加 add() offer() put() offer(,,)
移除 remove() poll() take() poll(,)
判断队首 element() peek() --- ---
package juc;

import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class MyBlockingQueue {
    public static void main(String[] args) throws InterruptedException {

        // 抛出异常
//        test1();

        // 不抛出异常
//        test2();

        // 等待(一直)
//        test3();

        // 超时等待
        test4();
    }

    public static void test1(){
        // 队列大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.add("a"));
        System.out.println(arrayBlockingQueue.add("b"));
        System.out.println(arrayBlockingQueue.add("c"));
//        System.out.println(arrayBlockingQueue.add("d"));// IllegalStateException队列满

        System.out.println(arrayBlockingQueue.element());// 查看队首

        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
//        System.out.println(arrayBlockingQueue.remove());// NoSuchElementException

    }

    public static void test2(){
        // 队列大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueue.offer("a"));
        System.out.println(arrayBlockingQueue.offer("b"));
        System.out.println(arrayBlockingQueue.offer("c"));
        System.out.println(arrayBlockingQueue.offer("d"));// 不跑出异常 返回false

        System.out.println(arrayBlockingQueue.peek());

        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());
        System.out.println(arrayBlockingQueue.poll());// 不抛出异常 返回null

    }

    public static void test3() throws InterruptedException {
        // 队列大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);

        arrayBlockingQueue.put("a");
        arrayBlockingQueue.put("b");
        arrayBlockingQueue.put("c");
//        arrayBlockingQueue.put("d");// 会一直等

        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
        System.out.println(arrayBlockingQueue.take());
//        System.out.println(arrayBlockingQueue.take());// 会一直等
    }

    public static void test4() throws InterruptedException {
        // 队列大小
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.offer("a");
        arrayBlockingQueue.offer("b");
        arrayBlockingQueue.offer("c");
        arrayBlockingQueue.offer("d", 2, TimeUnit.SECONDS);// 两秒后结束等待

        arrayBlockingQueue.poll();
        arrayBlockingQueue.poll();
        arrayBlockingQueue.poll();
        arrayBlockingQueue.poll(2, TimeUnit.SECONDS);// 两秒后结束等待
    }

}

SynchronousQueue同步队列

  • 没有容量
  • 进去一个元素后,必须等取出来才能再往里放一个元素
package juc;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class MySynchronousQueue {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName() + " put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName() + " put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName() + " put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + blockingQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();
    }
}

线程池

  • 3大方法,7大参数,4大拒绝策略
  • 降低资源消耗
  • 提高响应速度
  • 方便管理
  • 线程复用,控制最大并发数,管理线程
  • 最大线程定义
    • CPU密集型:几核cpu就设置为几,通过代码去获取Runtime().getRuntime().availableProcessors()
    • IO密集型:判断程序中十分耗费IO的线程
package juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo1 {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);// 固定大小
        ExecutorService threadPool = Executors.newCachedThreadPool();// 可伸缩的

        try {
            for (int i = 0; i < 10; i++) {
                // 使用线程池创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            threadPool.shutdown();
        }

    }
}
  • 源码分析
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


public ThreadPoolExecutor(int corePoolSize,// 核心线程大小
                          int maximumPoolSize,// 最大大小
                          long keepAliveTime,// 超时了没人调用就会释放
                          TimeUnit unit,// 超时单位
                          BlockingQueue<Runnable> workQueue,// 阻塞队列
                          ThreadFactory threadFactory,// 线程工厂
                          RejectedExecutionHandler handler) {// 拒绝策略
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • 手动创建线程池
package juc;

import java.util.concurrent.*;

public class Demo1 {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);// 固定大小
//        ExecutorService threadPool = Executors.newCachedThreadPool();// 可伸缩的
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,// 核心线程大小
                5,// 最大大小
                3,// 超时时间
                TimeUnit.SECONDS,// 超时时间单位
                new LinkedBlockingQueue<>(3),// 阻塞队列
                Executors.defaultThreadFactory(),// 线程工厂
                new ThreadPoolExecutor.AbortPolicy());
        // 拒绝策略:1.AbortPolicy()队列满了还有线程要进就抛出异常
        // 2.CallerRunsPolicy() 哪来的去哪,由main线程执行
        // 3.DiscardPolicy() 队列满了,丢掉任务,不会抛出异常
        // 4.DiscardOldestPolicy() 队列满了,尝试和最早的线程竞争,不会抛出异常

        try {
            // 最大承载 = 队列 + max
            for (int i = 0; i < 8; i++) {
                // 使用线程池创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            threadPool.shutdown();
        }

    }
}

四大函数式接口

  • 四大原生函数式接口
    • 函数式接口:Function, 有一个输入和一个输出
    • 断定型接口:Predicate,有一个输入参数,返回值只能是布尔值
    • 消费型接口:Consumer,只有输入值,没有返回值
    • 供给型接口:Supplier,只有返回值
  • 只有一个抽象方法的接口,如Runnable,foreach(消费者类型的)
package juc;

import java.util.function.Function;

public class Demo2 {
    public static void main(String[] args) {
        // Function 有一个输入和一个输出
        // 工具类:输出输入的值
        Function function = new Function<String, String>(){
            @Override
            public String apply(String str) {
                return str;
            }
        };

        // 用lambda表达式
        Function function1 = (str)->{
            return str;
        };

        System.out.println(function.apply("haha"));
        System.out.println(function1.apply("xixi"));
    }
}
package juc;

import java.util.function.Predicate;

public class Demo3 {
    public static void main(String[] args) {
        // 判断字符串是否为空
        Predicate<String> predicate = new Predicate<String>() {
            @Override
            public boolean test(String s) {
                return s.isEmpty();
            }
        };
        Predicate<String> predicate1 = (str)->{return str.isEmpty();};

        System.out.println(predicate.test("s"));
        System.out.println(predicate1.test("x"));
    }
}
package juc;

import java.util.function.Consumer;

public class Demo4 {
    public static void main(String[] args) {
        // 消费型接口只有输入没有返回值
        Consumer<String> consumer = new Consumer<>(){
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };
        Consumer<String> consumer1 = (str)->{
            System.out.println(str);
        };

        consumer.accept("haha");
        consumer1.accept("xixi");
    }
}
package juc;

import java.util.function.Supplier;

public class Demo5 {
    public static void main(String[] args) {
        Supplier<String> supplier = new Supplier<>(){
            @Override
            public String get() {
                return "haha";
            }
        };

        Supplier<String> supplier1 = ()->{return "xixi";};

        System.out.println(supplier.get());
        System.out.println(supplier1.get());
    }
}

Stream流式计算

package juc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class Demo6 {
    /**
     * 有五个用户,进行筛选
     * 1.id为偶数
     * 2.年龄大于23
     * 3.用户名转大写
     * 4.用户名字母倒序
     * 5.只输出一个用户
     */
    public static void main(String[] args) {
        User u1 = new User(1,"a",21 );
        User u2 = new User(2,"b",22 );
        User u3 = new User(3,"c",23 );
        User u4 = new User(4,"d",24 );
        User u5 = new User(6,"e",25 );

        // 集合就是存储
        List<User> list = Arrays.asList(u1, u2, u3, u4, u5);

        // 计算交给Stream
        // 链式编程,lambda表达式,函数式接口,Stream流式计算
        list.stream()
                .filter(u ->{return u.getId()%2==0;})
                .filter(u->{return u.getAge()>23;})
                .map(u->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach(System.out::println);
    }
}

class User{
    private int id;
    private String name;
    private int age;

    public User(){

    }

    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name=‘" + name + ‘\‘‘ +
                ", age=" + age +
                ‘}‘;
    }
}

ForkJoin

  • 工作窃取,里面维护的都是双端队列,大数据量的时候使用
package juc;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class MyForkJoin extends RecursiveTask<Long> {

    private Long start;
    private Long end;

    private Long temp = 10000L;// 临界值

    public MyForkJoin(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if((end-start)<temp){
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }else{
            // 分支合并计算
            long middle = (start+end) / 2;// 中间值
            MyForkJoin task1 = new MyForkJoin(start, middle);
            task1.fork();// 拆分任务,把任务压入队列
            MyForkJoin task2 = new MyForkJoin(middle+1, end);
            task2.fork();// 拆分任务,把任务压入队列
            return task1.join() + task2.join();
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

//        test1();// 5483

//        test2();// 3799

        test3();// 124
    }

    // 普通的
    public static void test1(){
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (Long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("总时间:" + (end-start) + " sum=" + sum);
    }

    // 使用ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new MyForkJoin(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum = submit.get();

        long end = System.currentTimeMillis();
        System.out.println("总时间:" + (end-start) + " sum=" + sum);
    }

    // Stream并行流
    public static void test3(){
        long start = System.currentTimeMillis();

        Long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
                .parallel()
                .reduce(0, Long::sum);

        long end = System.currentTimeMillis();
        System.out.println("总时间:" + (end-start) + " sum=" + sum);
    }

}

异步回调

  • 异步执行、成功回调、失败回调
package juc.Future;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        // 没有返回值的异步回调
//        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            System.out.println(Thread.currentThread().getName() + "runAsync->void");
//        });
//
//        System.out.println("77777");
//        completableFuture.get();// 获取执行结果

        // 有返回值的异步回调
        CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync->Integer");
            int i = 10/0;
            return 1024;
        });

        System.out.println(uCompletableFuture.whenComplete((t, u) -> {
            System.out.println("t=" + t);// 正常的返回
            System.out.println("u=" + u);// 错误信息
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 404;// 可以获取到错误的返回结果
        }).get());

    }
}

JMM

  • java内存模型,不存在,只是概念、约定
  • 线程解锁前:必须把共享变量立刻刷回主存
  • 线程加锁前:必须读取主存中最新值到工作内存中
  • 加锁解锁必须是同一把锁
  • 八种操作
    • lock:作用于主内存的变量,它把有个变量表示为一个线程独占的状态
    • unlock:作用于主内存的变量,它把一个锁定状态的变量解锁,解锁之后的变量才可以被其他变量锁定
    • read:作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存当中
    • load:作用于工作内存的变量,它把 load 操作从主内存中得到的变量的值放入工作内存的变量副本当中
    • use:作用于工作内存的变量,它把工作内存中的变量传递给执行引擎,每当虚拟机需要使用一个变量的值时就会只执行这个操作
    • assign:作用于工作内存的变量,它把从执行引擎接收到的值赋给工作内存中的变量,每当虚拟机遇到一个给变量赋值的字节码是执行操作
    • store:作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便 write 使用
    • write:作用于主内存的变量,它把 store 操作从工作内存中得到的变量的值放到主内存的变量中
    • 要求
      • 不允许 read 和 load、store 和 write 操作之一单独出现,即:不允许一个变量从主内存中读取但是工作内存不接受,或者从工作内存发起会写了但是主内存不接受的情况出现
      • 不允许一个线程丢失了它的最近的 assign 操作,即:变量在工作内存中改变了之后必须把该变量同步到主内存当中
      • 不允许一个线程无原因(没有发生过任何 assign 操作)把数据从线程的工作内存同步回主内存
        一个新的变量只能在主内存中诞生(初始化)
      • 不允许在工作内存中直接使用一个未被初始化(load 或 assign)的变量,即:就是对一个变量实施 use、store 操作之前,必须先执行 assign 和 load 操作
      • 一个变量在同一个时刻只允许一个 lock 对其操作,但是 lock 操作可以被一个线程多次执行。执行多次 lock 之后只有执行相同的 unlock 变量才可以被其他线程使用
      • 如果对一个变量执行 lock 操作,那将会清空工作内存中此变量的值,在执行引擎使用这个变量之前,需要重新执行 load 或 assign 操作进行初始化
      • 如果一个变量实现没有被执行 lock 操作,那么无法执行 unlock 操作
      • 对一个变量执行 unlock 操作之前必须先把此变量同步到主内存当中(store、write 操作)

Volatile

  • jvm提供的轻量级的同步机制
  • 保证可见性
  • 不保证原子性
  • 禁止指令重排:源代码,编译器优化的重排,指令并行重排,内存系统重排,执行
package juc;

import java.util.concurrent.TimeUnit;

public class MyVolatile {

    // 加volatile可以保证可见性
    private volatile static int num = 0;

    public static void main(String[] args) {
        new Thread(()->{// 线程对主内存的变化不知道
            while (num == 0) {
            }
        }).start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        num = 1;
        System.out.println(num);
    }
}
package juc;

public class MyV {
    private volatile static int num = 0;

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                for (int j = 0; j < 1000; j++) {
                    add();
                }
            }).start();
        }

        while (Thread.activeCount() > 2){
            Thread.yield();
        }

        System.out.println(Thread.currentThread().getName() + " " + num);// 结果小于20000
    }

    public static void add(){// 加synchronized可以保证原子性
        num++;
    }
}

CAS

CAS

缺点:循环会耗时;一次性只能保持一个共享变量的原子性;ABA问题

package juc.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class CASDemo {

    // CAS  compareAndSet比较并交换 是CPU的并发原语
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2020);
        // 如果是期望的值就更新,否则一直循环
        System.out.println(atomicInteger.compareAndSet(2020, 2021));// true
        System.out.println(atomicInteger.get());
        atomicInteger.getAndIncrement();// 2021

        System.out.println(atomicInteger.compareAndSet(2020, 2021));// false
        System.out.println(atomicInteger.get());// 2022

    }
}
  • ABA问题
package juc.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class CASDemo {

    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2020);
        // A对资源操作过了,又改回去了
        System.out.println(atomicInteger.compareAndSet(2020, 2021));// true
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(2021, 2020));// true
        System.out.println(atomicInteger.get());

        // B不知情
        System.out.println(atomicInteger.compareAndSet(2020, 2022));// true
        System.out.println(atomicInteger.get());

    }
}
  • 原子引用解决ABA问题
package juc.cas;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class CASDemo {

    public static void main(String[] args) {
        // !如果泛型是一个包装类,注意对象的引用问题
        // 带版本号的原子操作
        AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(1, 1);

        new Thread(()->{
            // 获得版本号
            System.out.println("A1->" + atomicInteger.getStamp());

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(atomicInteger.compareAndSet(1, 2,
                    atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
            System.out.println("A2->" + atomicInteger.getStamp());

            System.out.println(atomicInteger.compareAndSet(2, 1,
                    atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
            System.out.println("A3->" + atomicInteger.getStamp());

        },"A").start();

        // 乐观锁原理相同
        new Thread(()->{
            int stamp = atomicInteger.getStamp();// 获得版本号
            System.out.println("B1->" + atomicInteger.getStamp());

            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(atomicInteger.compareAndSet(1, 6, stamp, stamp + 1));
            System.out.println("B2->" + atomicInteger.getStamp());

        },"B").start();
    }
}
  • 注意!:Integer有对象缓存机制

Unsafe类

@IntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));// 获取内存地址中的值,自旋锁
    return v;
}
public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

各种锁的理解

  • 公平锁:不能插队,必须先来后到
  • 非公平锁:可以插队,默认都是非公平
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
  • 可重入锁:拿到外面的锁,就可以拿到里面的锁,自动获得
package juc.lock;

public class Demo1 {
    public static void main(String[] args) {
        Phone phone = new Phone();
        new Thread(()->{
            phone.sms();
        }, "A").start();

        new Thread(()->{
            phone.sms();
        }, "B").start();

        /**
         * Asms
         * Acall
         * Bsms
         * Bcall
         */
    }
}

class Phone{
    public synchronized void sms(){
        System.out.println(Thread.currentThread().getName() + "sms");
        call();
    }

    public synchronized void call(){
        System.out.println(Thread.currentThread().getName() + "call");
    }
}
package juc.lock;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo2 {
    public static void main(String[] args) {
        Phone2 phone = new Phone2();
        new Thread(()->{
            phone.sms();
        }, "A").start();

        new Thread(()->{
            phone.sms();
        }, "B").start();

        /**
         * Asms
         * Acall
         * Bsms
         * Bcall
         */
    }
}

class Phone2{
    Lock lock = new ReentrantLock();

    public void sms(){

        lock.lock();// 和call的锁不同,是两把锁
        try {
            System.out.println(Thread.currentThread().getName() + "sms");
            call();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void call(){

        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "call");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
  • 自旋锁
package juc.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class MySpinlock {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();
    // 加锁
    public void myLock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + " -> mylock");

        // 自旋锁
        while (!atomicReference.compareAndSet(null, thread)){

        }
    }

    // 解锁
    public void myUnLock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + " -> myUnlock");
        atomicReference.compareAndSet(thread, null);
    }

    public static void main(String[] args) throws InterruptedException {
        MySpinlock mySpinlock = new MySpinlock();
        new Thread(()->{
            mySpinlock.myLock();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mySpinlock.myUnLock();
            }
        }, "A").start();

        TimeUnit.SECONDS.sleep(1);

        new Thread(()->{
            mySpinlock.myLock();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mySpinlock.myUnLock();
            }
        }, "B").start();

        /**
         * A -> mylock
         * B -> mylock
         * A -> myUnlock
         * B -> myUnlock
         */
    }
}
  • 死锁
    • 使用“jps -l”定位进程号
    • 使用“jstack 进程号”寻找死锁问题

java juc

原文:https://www.cnblogs.com/sprinining/p/14598327.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!