JUC是在并发编程中使用的工具类java.util.concurrent
。
public class FakeWake {
public static void main(String[] args) {
Data data = new Data();
new Thread(()-> { for (int i = 0; i < 10; i++) data.increment(); }).start();
new Thread(()->{ for (int i = 0; i < 10; i++) data.decrement(); }).start();
new Thread(()->{ for (int i = 0; i < 10; i++) data.increment(); }).start();
new Thread(()->{ for (int i = 0; i < 10; i++) data.decrement(); }).start();
}
}
class Data{
private int num = 0;
public synchronized void increment(){
if(num != 0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//唤醒其他线程
this.notifyAll();
}
public synchronized void decrement(){
if(num == 0){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num--;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//唤醒其他线程
this.notifyAll();
}
}
运行结果:
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-1=>0
Thread-0=>1
Thread-2=>2
Thread-0=>3
Thread-1=>2
Thread-1=>1
Thread-1=>0
Thread-0=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-2=>2
Thread-3=>1
Thread-3=>0
Thread-2=>1
Thread-3=>0
Thread-2=>1
Thread-3=>0
上述程序就存在虚假唤醒的问题。我们可以看到结果出现了2、3数字,这不符合我们的预期。我们现在来解释一下为什么会出现这种情况,就拿数字2举例:
this.wait()
方法,进入等待队列等待唤醒。this.wait()
方法,进入等待队列等待唤醒。num--
操作,接着执行this.notifyAll()
方法,唤醒等待队列的全部进程。num++
操作,然后第二个生产者也执行了num++
操作(也可能是第二个生产者先执行)。解决办法:将生产方法和消费方法里的if判断改为while循环判断。
public class LockPAC {
public static void main(String[] args) {
Data2 data2 = new Data2();
new Thread(()-> { for (int i = 0; i < 10; i++) data2.increment(); }).start();
new Thread(()->{ for (int i = 0; i < 10; i++) data2.decrement(); }).start();
new Thread(()->{ for (int i = 0; i < 10; i++) data2.increment(); }).start();
new Thread(()->{ for (int i = 0; i < 10; i++) data2.decrement(); }).start();
}
}
class Data2{
private int num = 0;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment(){
lock.lock();
try{
while(num != 0) {
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//唤醒其他线程
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement(){
lock.lock();
try {
while(num == 0){
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName()+"=>"+num);
//唤醒其他线程
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
Condition实现精准通知和唤醒线程:
public class TestCondition {
public static void main(String[] args) {
Data3 data3 = new Data3();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data3.printA();
}
},"A").start();
new Thread(()->{for (int i = 0; i < 10; i++) {
data3.printB();
}},"B").start();
new Thread(()->{for (int i = 0; i < 10; i++) {
data3.printC();
}},"C").start();
}
}
class Data3{
private Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int flag = 1;
public void printA(){
lock.lock();
try {
//业务
while(flag != 1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"=>A");
flag = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
//业务
while(flag != 2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"=>B");
flag = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
//业务
while(flag != 3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"=>C");
flag = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
ArrayList、HashMap、HashSet都不安全,在多线程环境下应该使用如下方式:
public class TestCollection {
public static void main(String[] args) throws InterruptedException {
CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
for (int i = 0; i < 10000; i++) {
new Thread(()->{copyOnWriteArrayList.add("xxx");}).start();
}
Map stringIntegerMap = new ConcurrentHashMap();
for (int i = 0; i < 10000; i++) {
new Thread(()->{stringIntegerMap.put(UUID.randomUUID(),5);}).start();
}
Set set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 10000; i++) {
new Thread(()->{set.add(UUID.randomUUID());}).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(stringIntegerMap.size());
System.out.println(copyOnWriteArrayList.size());
System.out.println(set.size());
}
}
public class TestCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyThread myThread = new MyThread();
FutureTask futureTask = new FutureTask(myThread);
new Thread(futureTask).start();
new Thread(futureTask).start();//有缓存
Object o = futureTask.get();//会阻塞
System.out.println(o);
}
}
class MyThread implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("call()");
return "1024";
}
}
CountDownLatch:
countDownLatch.countDown();//计数器减一
countDownLatch.await();//等待计数器归零
CyclicBarrier:
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("成功!");
});
cyclicBarrier.await();
Semaphore:
public class TestSemaphore {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//限流
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
semaphore.acquire();//假如满了等待
System.out.println(Thread.currentThread().getName()+"抢到车位!");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();//当前信号量加一,然后等待唤醒的线程
}
},String.valueOf(i+1)).start();
}
}
}
读写锁能更加细粒度的控制加锁解锁操作。
ReadWriteLock readWriteLock = new ReadWriteLock();
readWriteLock.readlock().lock();
readWriteLock.writelock().lock();
常见的阻塞队列:
BlockingQueue的方法:
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(,,) |
移除 | remove | poll | take | poll(,) |
检测队首元素 | element | peek | - | - |
线程池在系统启动时即创建大量空闲线程,程序将一个Runnable对象或一个Callable对象传给线程池,线程池就会启动一个线程来执行它们的run()或call()方法,方法执行完后该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一次任务的到来。
使用线程池原因:
Executors.newSingleThreadExecutor();//单个线程
Executors.newFixedThreadExecutor();//一个固定的线程池
Executors.newCachedThreadExecutor();//可伸缩的
Executors.newScheduledThreadPool();//这个池子里的线程可以按schedule依次delay执行,或周期执行
自定义线程池:
public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//超时了没有人用就会释放
TimeUnit unit,//超时单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
四种拒绝策略:
Runtime.getRuntime().availableProcessors();//获取CPU的核数
CPU密集型:线程池中线程个数应尽量少,不应大于CPU核心数。
IO密集型:由于IO操作速度远低于CPU速度,那么在运行这类任务时,CPU绝大多数时间处于空闲状态,那么线程池可以配置尽量多些的线程,以提高CPU利用率。
函数式接口:只有一个方法的接口。
Function:传入参数T,返回参数I。
Predicate:断定性接口,传入一个参数,返回布尔值。
public static void main(String[] args) {
Function function = new Function<String, String>() {
@Override
public String apply(String s) {
return s;
}
};
System.out.println(function.apply("ssss"));
// Predicate<String> predicate = new Predicate<String>() {
//
// @Override
// public boolean test(String s) {
// return false;
// }
// };
Predicate<String> predicate = (str) -> {
return str.isEmpty();
};
System.out.println(predicate.test("s"));
}
Consumer:消费性接口,只有输入,没有返回值。
Supplier:生产型接口,没有参数输入,只有输出。
public class TestStream {
public static void main(String[] args) {
Consumer consumer = (str)->{ System.out.println(str); };
consumer.accept("s");
Supplier supplier = ()->{return "s";};
System.out.println(supplier.get());
}
}
public static void main(String[] args) {
User u1 = new User(12, "A", 13);
User u2= new User(13, "B", 24);
User u3 = new User(14, "C", 34);
User u4 = new User(15, "D", 43);
User u5 = new User(16, "E", 15);
List<User> list = Arrays.asList(u1, u2, u3, u4, u5);
//求得学生id为偶数、年龄大于23的学生并把学生姓名转换为大写、逆字典序排列,最后只输出第一个学生。
list.stream().filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;})
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);
}
public class TestSum {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();//844
test2();//838
test3();//
}
//普通求和
public static void test1(){
long start = System.currentTimeMillis();
long sum = 0;
for (long i = 0; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println(sum+" "+(end-start));
}
//ForkJoin求和
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
TestForkJoin testForkJoin = new TestForkJoin(0,10_0000_0000);
ForkJoinTask<Long> submit = forkJoinPool.submit(testForkJoin);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println(sum+" "+(end-start));
}
//stream并行流
public static void test3(){
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0,10_0000_0000).parallel().reduce(0,Long::sum);
long end = System.currentTimeMillis();
System.out.println(sum+" "+(end-start));
}
}
public class TestForkJoin extends RecursiveTask<Long> {
private long start;
private long end;
private long temp = 10000;
public TestForkJoin(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if(end-start<temp){
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else{
long middle = (start+end)/2;
TestForkJoin task1 = new TestForkJoin(start,middle);
task1.fork();//拆分任务,把任务压入线程队列
TestForkJoin task2 = new TestForkJoin(middle+1,end);
task2.fork();
return task1.join()+task2.join();
}
}
}
public class TestCompletableFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "runAsync=>Void");
});
System.out.println("111");
completableFuture.get();
//有返回值的
CompletableFuture<Integer> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
return 1024;
});
System.out.println(uCompletableFuture.whenComplete((t, u) -> {
System.out.println("t=>" + t);//正常的返回结果
System.out.println("u=>" + u);//异常
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 233;
}).get());
}
}
JMM:java内存模型。
JMM有八种内存操作:
volatile是Java语言的一个关键字,是Java虚拟机提供轻量级的同步机制。
volatile的特性:
可见性:
public class TestVolatile {
private static volatile int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while(num == 0){
}
}).start();
TimeUnit.SECONDS.sleep(2);
num = 1;
System.out.println(num);
}
}
不保证原子性:
public class TestVolatile2 {
private static volatile int num = 0;
//可使用原子类保证原子性
//private static AtomicInteger num = new AtomicInteger();
public static void add(){
num++;
//num.getAndIncrement();
}
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int i1 = 0; i1 < 1000; i1++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"num=>"+num);
}
}
禁止指令重排:
int i = 0;
boolean flag = false;
i = 1; //语句1
flag = true; //语句2
上面代码定义了一个int型变量,定义了一个boolean类型变量,然后分别对两个变量进行赋值操作。从代码顺序上看,语句1是在语句2前面的,那么JVM在真正执行这段代码的时候会保证语句1一定会在语句2前面执行吗?不一定,为什么呢?这里可能会发生指令重排序(Instruction Reorder)。比如上面的代码中,语句1和语句2谁先执行对最终的程序结果并没有影响,那么就有可能在执行过程中,语句2先执行而语句1后执行。因为处理器在进行重排序时是会考虑指令之间的数据依赖性,如果一个指令Instruction 2必须用到Instruction 1的结果,那么处理器会保证Instruction 1会在Instruction 2之前执行。从上面可以看出,指令重排序不会影响单个线程的执行,但是会影响到线程并发执行的正确性。
//x、y为非volatile变量
//flag为volatile变量
x = 2; //语句1
y = 0; //语句2
flag = true; //语句3
x = 4; //语句4
y = -1; //语句5
由于flag变量为volatile变量,那么在进行指令重排序的过程的时候,不会将语句3放到语句1、语句2前面,也不会讲语句3放到语句4、语句5后面。但是要注意语句1和语句2的顺序、语句4和语句5的顺序是不作任何保证的。并且volatile关键字能保证,执行到语句3时,语句1和语句2必定是执行完毕了的,且语句1和语句2的执行结果对语句3、语句4、语句5是可见的。
“观察加入volatile关键字和没有加入volatile关键字时所生成的汇编代码发现,加入volatile关键字时,会多出一个lock前缀指令”。lock前缀指令实际上相当于一个内存屏障(也称作内存栅栏),内存屏障会提供3个功能:
AtomicInteger.getAndIncrement()是原子操作,它调用的是getAndAddInt方法,最后调用的compareAndSwapInt方法,即CAS。
getAndAddInt方法源码:
getAndAddInt方法解析:拿到内存位置的最新值var5,使用CAS尝试修将内存位置的值修改为目标值var5+var4,如果修改失败,则获取该内存位置的新值v,然后继续尝试,直至修改成功。
compareAndSwapInt源码:
Atomic::cmpxhg源码:(windows_x86)
Atomic::cmpxchg方法解析:
mp是“os::is_MP()”的返回结果,“os::is_MP()”是一个内联函数,用来判断当前系统是否为多处理器。如果当前系统是多处理器,该函数返回1,否则,返回0。LOCK_IF_MP(mp)会根据mp的值来决定是否为cmpxchg指令添加lock前缀。如果通过mp判断当前系统是多处理器(即mp值为1),则为cmpxchg指令添加lock前缀。否则,不加lock前缀。这是一种优化手段,认为单处理器的环境没有必要添加lock前缀,只有在多核情况下才会添加lock前缀,因为lock会导致性能下降。cmpxchg是汇编指令,作用是比较并交换操作数。
CAS的缺点:
ABA问题:
如果内存地址V初次读取的值是A,并且在准备赋值的时候检查到它的值仍然为A,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。这个漏洞称为CAS操的“ABA”问题。Java并发包为了解决这个问题,提供了一个带有标记的原子引用类AtomicStampedReference,它可以通过控制变量值的版本来保证CAS的正确性。因此,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。
原文:https://www.cnblogs.com/baihan/p/13066555.html