/**
 * Prints the number of processors the JVM reports as available.
 */
public class Test1 {
    public static void main(String[] args) {
        // Core count; the original notes mention it in the context of
        // CPU-bound versus IO-bound workloads.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(cores);
    }
}
// Excerpt (constructor only): the no-argument constructor assigns a
// NonfairSync instance to the sync field.
public ReentrantLock() {
sync = new NonfairSync();
}
与synchronized区别
虚假唤醒
package juc;
/**
 * Producer/consumer demo: thread "A" calls {@code increment()} and thread "B"
 * calls {@code decrement()} on one shared {@code Data}, ten times each.
 */
public class A {
    public static void main(String[] args) {
        final Data shared = new Data();

        Runnable producer = () -> {
            int round = 0;
            while (round < 10) {
                try {
                    shared.increment();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                round++;
            }
        };

        Runnable consumer = () -> {
            int round = 0;
            while (round < 10) {
                try {
                    shared.decrement();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                round++;
            }
        };

        new Thread(producer, "A").start();
        new Thread(consumer, "B").start();
    }
}
/**
 * Shared counter guarded by its own monitor.
 *
 * <p>{@link #increment()} waits until the counter is 0 before adding one;
 * {@link #decrement()} waits until it is non-zero before subtracting one.
 * Each prints the calling thread's name and the new value, then wakes all
 * waiters.
 */
class Data {
    private int num = 0;

    /**
     * Adds one once the counter is 0.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void increment() throws InterruptedException {
        // A loop, not an if: after notifyAll (or a spurious wakeup) the
        // condition must be re-checked, otherwise two producers can both
        // proceed and push the counter past 1.
        while (num != 0) {
            this.wait();
        }
        num++;
        System.out.println(Thread.currentThread().getName() + "->" + num);
        this.notifyAll();
    }

    /**
     * Subtracts one once the counter is non-zero.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void decrement() throws InterruptedException {
        // Same reasoning as increment(): always re-check after waking up.
        while (num == 0) {
            this.wait();
        }
        num--;
        System.out.println(Thread.currentThread().getName() + "->" + num);
        this.notifyAll();
    }
}
当有更多线程时就会出现虚假唤醒,把if改成while避免虚假唤醒。等待应该总是出现在循环中。
Lock 版的生产者/消费者(用 Condition 的 await/signalAll 代替 wait/notifyAll)
package juc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Four-thread producer/consumer demo on one shared {@code Data2}:
 * threads "A" and "C" increment, threads "B" and "D" decrement, ten times each.
 */
public class B {
    public static void main(String[] args) {
        final Data2 shared = new Data2();

        // Each Runnable keeps its loop counter local, so it can safely back two threads.
        Runnable producer = () -> {
            int round = 0;
            while (round < 10) {
                try {
                    shared.increment();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                round++;
            }
        };

        Runnable consumer = () -> {
            int round = 0;
            while (round < 10) {
                try {
                    shared.decrement();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
                round++;
            }
        };

        new Thread(producer, "A").start();
        new Thread(consumer, "B").start();
        new Thread(producer, "C").start();
        new Thread(consumer, "D").start();
    }
}
/**
 * Lock/Condition variant of the 0/1 counter.
 *
 * <p>{@link #increment()} waits while the counter is non-zero, then adds one;
 * {@link #decrement()} waits while it is zero, then subtracts one. Each prints
 * the calling thread's name and the new value and signals all waiters.
 */
class Data2 {
    private int num = 0;
    final Lock lock = new ReentrantLock();
    final Condition condition = lock.newCondition();

    /**
     * Adds one once the counter is 0.
     *
     * @throws InterruptedException if the thread is interrupted while waiting;
     *         the exception is propagated rather than swallowed so callers can react
     */
    public void increment() throws InterruptedException {
        lock.lock();
        try {
            while (num != 0) {
                condition.await();
            }
            num++;
            System.out.println(Thread.currentThread().getName() + "->" + num);
            condition.signalAll();
        } finally {
            // Always release, including when await() throws.
            lock.unlock();
        }
    }

    /**
     * Subtracts one once the counter is non-zero.
     *
     * @throws InterruptedException if the thread is interrupted while waiting;
     *         the exception is propagated rather than swallowed so callers can react
     */
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            while (num == 0) {
                condition.await();
            }
            num--;
            System.out.println(Thread.currentThread().getName() + "->" + num);
            condition.signalAll();
        } finally {
            // Always release, including when await() throws.
            lock.unlock();
        }
    }
}
package juc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Starts three threads ("A", "B", "C") that each call their matching
 * {@code Data3} print method ten times.
 */
public class C {
    public static void main(String[] args) {
        final Data3 printer = new Data3();
        startRepeating("A", printer::printA);
        startRepeating("B", printer::printB);
        startRepeating("C", printer::printC);
    }

    /** Starts a thread with the given name that runs {@code step} ten times. */
    private static void startRepeating(String threadName, Runnable step) {
        Runnable tenTimes = () -> {
            for (int n = 0; n < 10; n++) {
                step.run();
            }
        };
        new Thread(tenTimes, threadName).start();
    }
}
/**
 * Round-robin printer: {@code num} names whose turn it is (1 = A, 2 = B, 3 = C)
 * and each stage has its own {@link Condition} on a shared lock, so a finished
 * stage wakes only the waiters of the next stage.
 */
class Data3 {
    private int num = 1;
    private final Lock lock = new ReentrantLock();
    private final Condition condition1 = lock.newCondition();
    private final Condition condition2 = lock.newCondition();
    private final Condition condition3 = lock.newCondition();

    /** Waits for turn 1, prints "AAA", then hands over to stage B. */
    public void printA() {
        takeTurn(1, 2, condition1, condition2, "AAA");
    }

    /** Waits for turn 2, prints "BBB", then hands over to stage C. */
    public void printB() {
        takeTurn(2, 3, condition2, condition3, "BBB");
    }

    /** Waits for turn 3, prints "CCC", then hands over to stage A. */
    public void printC() {
        takeTurn(3, 1, condition3, condition1, "CCC");
    }

    /**
     * Shared body of the three stages: block until {@code num == myTurn},
     * advance to {@code nextTurn}, print the thread name plus {@code tag},
     * and signal the next stage's condition.
     *
     * <p>If interrupted while waiting, the turn is skipped and the thread's
     * interrupt flag is restored so the caller can observe it.
     */
    private void takeTurn(int myTurn, int nextTurn, Condition mine, Condition next, String tag) {
        lock.lock();
        try {
            while (num != myTurn) {
                mine.await();
            }
            num = nextTurn;
            System.out.println(Thread.currentThread().getName() + tag);
            next.signalAll();
        } catch (InterruptedException e) {
            // await() clears the flag when it throws; put it back.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
package juc.lock8;
import java.util.concurrent.TimeUnit;
/**
 * Lock demo 1: thread "A" calls {@code sendMsg()} and, one second later,
 * thread "B" calls {@code call()} on the same {@code Phone}.
 */
public class Test1 {
    public static void main(String[] args) {
        Phone phone = new Phone();

        Thread sender = new Thread(phone::sendMsg, "A");
        sender.start();

        // Give thread A a head start before B is created.
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }

        Thread caller = new Thread(phone::call, "B");
        caller.start();
    }
}
/**
 * Both methods are synchronized instance methods, so they lock on the object
 * they are invoked on; on one instance, whichever thread takes the monitor
 * first runs first.
 */
class Phone {
    /** Holds the monitor for four seconds, then prints the "send message" text. */
    public synchronized void sendMsg() {
        pause(4);
        System.out.println("发短信");
    }

    /** Prints the "call" text. */
    public synchronized void call() {
        System.out.println("打电话");
    }

    /** Sleeps for the given number of seconds, printing the stack trace if interrupted. */
    private static void pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
package juc.lock8;
import java.util.concurrent.TimeUnit;
/**
 * Lock demo 2: thread "A" calls the synchronized {@code sendMsg()} and, one
 * second later, thread "B" calls the unsynchronized {@code hello()} on the
 * same {@code Phone2}. Per the original note, "hello" is printed first.
 */
public class Test2 {
    public static void main(String[] args) {
        // One object has exactly one monitor.
        Phone2 phone = new Phone2();

        Thread sender = new Thread(phone::sendMsg, "A");
        sender.start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }

        Thread greeter = new Thread(phone::hello, "B");
        greeter.start();
    }
}
/**
 * Two synchronized instance methods (locking on the receiver) plus one plain
 * method that takes no lock at all.
 */
class Phone2 {
    /** Holds the monitor for four seconds, then prints the "send message" text. */
    public synchronized void sendMsg() {
        pause(4);
        System.out.println("发短信");
    }

    /** Prints the "call" text while holding the monitor. */
    public synchronized void call() {
        System.out.println("打电话");
    }

    /** Not synchronized, so it is unaffected by whoever holds the monitor. */
    public void hello() {
        System.out.println("hello");
    }

    /** Sleeps for the given number of seconds, printing the stack trace if interrupted. */
    private static void pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
package juc.lock8;
import java.util.concurrent.TimeUnit;
/**
 * Lock demo 3: two {@code Phone3} objects, but both methods are static
 * synchronized, so they share the single lock of the {@code Phone3} class.
 * Per the original note, the message is printed before the call.
 */
public class Test3 {
    public static void main(String[] args) {
        // Two instances share one Class object; static synchronized locks on that Class.
        Phone3 phone1 = new Phone3();
        Phone3 phone2 = new Phone3();

        Runnable texting = () -> phone1.sendMsg();
        Runnable calling = () -> phone2.call();

        new Thread(texting, "A").start();
        waitOneSecond();
        new Thread(calling, "B").start();
    }

    /** Sleeps one second, printing the stack trace if interrupted. */
    private static void waitOneSecond() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/**
 * Both methods are static synchronized: the lock is the {@code Phone3} class
 * object, which exists as soon as the class is loaded, not any instance.
 */
class Phone3 {
    /** Holds the class lock for four seconds, then prints the "send message" text. */
    public static synchronized void sendMsg() {
        pause(4);
        System.out.println("发短信");
    }

    /** Prints the "call" text while holding the class lock. */
    public static synchronized void call() {
        System.out.println("打电话");
    }

    /** Sleeps for the given number of seconds, printing the stack trace if interrupted. */
    private static void pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
package juc.lock8;
import java.util.concurrent.TimeUnit;
/**
 * Lock demo 4: {@code sendMsg()} is static synchronized (class lock) while
 * {@code call()} is an instance synchronized method (object lock), so the two
 * do not contend. Per the original note, the call is printed before the message.
 */
public class Test4 {
    public static void main(String[] args) {
        Phone4 phone1 = new Phone4();
        Phone4 phone2 = new Phone4();

        Runnable texting = () -> phone1.sendMsg();
        Runnable calling = () -> phone1.call();

        new Thread(texting, "A").start();
        waitOneSecond();
        new Thread(calling, "B").start();
    }

    /** Sleeps one second, printing the stack trace if interrupted. */
    private static void waitOneSecond() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
/**
 * Mixes the two lock targets: a static synchronized method (locks the class
 * object) and an instance synchronized method (locks the receiver).
 */
class Phone4 {
    /** Static synchronized: holds the class lock for four seconds, then prints. */
    public static synchronized void sendMsg() {
        pause(4);
        System.out.println("发短信");
    }

    /** Ordinary synchronized method: locks the instance it is called on. */
    public synchronized void call() {
        System.out.println("打电话");
    }

    /** Sleeps for the given number of seconds, printing the stack trace if interrupted. */
    private static void pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }
    }
}
package juc;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * Ten threads each append a five-character UUID prefix to a shared list and
 * print the list.
 *
 * <p>The original notes list three thread-safe alternatives to ArrayList:
 * {@code new Vector<>()}, {@code Collections.synchronizedList(new ArrayList<>())}
 * and {@code new CopyOnWriteArrayList<>()}; the last one is used here.
 */
public class ListTest {
    public static void main(String[] args) {
        // CopyOnWrite: the list copies on write.
        final List<String> shared = new CopyOnWriteArrayList<>();

        Runnable appendAndDump = () -> {
            String token = UUID.randomUUID().toString().substring(0, 5);
            shared.add(token);
            System.out.println(shared);
        };

        int id = 1;
        while (id <= 10) {
            new Thread(appendAndDump, String.valueOf(id)).start();
            id++;
        }
    }
}
package juc;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * Ten threads each add a five-character UUID prefix to a shared set and print
 * the set.
 *
 * <p>The original notes list two thread-safe options:
 * {@code Collections.synchronizedSet(new HashSet<>())} and
 * {@code new CopyOnWriteArraySet<>()}; the latter is used here.
 */
public class SetTest {
    public static void main(String[] args) {
        final Set<String> shared = new CopyOnWriteArraySet<>();

        Runnable addAndDump = () -> {
            String token = UUID.randomUUID().toString().substring(0, 5);
            shared.add(token);
            System.out.println(shared);
        };

        int id = 0;
        while (id < 10) {
            new Thread(addAndDump, String.valueOf(id)).start();
            id++;
        }
    }
}
HashSet底层是HashMap
// Excerpt: the no-argument HashSet constructor creates a HashMap and stores it in map.
public HashSet() {
map = new HashMap<>();
}
// Excerpt: add() stores the element as a key in the backing map with the
// shared PRESENT value; it returns true when put() returned null.
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
HashMap不安全
package juc;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * One hundred threads each put an entry (thread name mapped to a
 * five-character UUID prefix) into a shared {@link ConcurrentHashMap} and
 * print the map.
 */
public class MapTest {
    public static void main(String[] args) {
        // Original note: a default HashMap is equivalent to
        // new HashMap<>(16, 0.75) (initial capacity and load factor).
        final Map<String, String> shared = new ConcurrentHashMap<>();

        Runnable putAndDump = () -> {
            String key = Thread.currentThread().getName();
            String value = UUID.randomUUID().toString().substring(0, 5);
            shared.put(key, value);
            System.out.println(shared);
        };

        int id = 1;
        while (id <= 100) {
            new Thread(putAndDump, String.valueOf(id)).start();
            id++;
        }
    }
}
package juc;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class MyCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// new Thread(new Runnable()).start();
// new Thread(new FutureTask<V>()).start();
// new Thread(new FutureTask<V>( Callable )).start();
MyThread myThread = new MyThread();
FutureTask<Integer> integerFutureTask = new FutureTask<>(myThread);// 适配类
new Thread(integerFutureTask, "A").start();
new Thread(integerFutureTask, "B").start();// 结果会被缓存,只会打印一个call
Integer o = (Integer) integerFutureTask.get();// 返回值 此方法可能会产生阻塞,最好放在最后一行
System.out.println(o);
}
}
/**
 * Minimal {@link Callable}: prints "call()" and returns 7.
 */
class MyThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        final int answer = 7;
        System.out.println("call()");
        return answer;
    }
}
package juc;
import java.util.concurrent.CountDownLatch;
/**
 * Six worker threads each print their name and count the latch down; the main
 * thread waits for the count to reach zero and then prints "over".
 */
public class MyCountDownLatch {
    public static void main(String[] args) throws InterruptedException {
        final int workers = 6;
        // Useful when a set of tasks must finish before continuing.
        final CountDownLatch remaining = new CountDownLatch(workers);

        Runnable job = () -> {
            System.out.println(Thread.currentThread().getName());
            remaining.countDown(); // decrement the counter
        };

        int id = 0;
        while (id < workers) {
            new Thread(job, String.valueOf(id)).start();
            id++;
        }

        remaining.await(); // blocks until the counter is zero
        System.out.println("over");
    }
}
package juc;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * Seven threads print their name and index, then wait on a
 * {@link CyclicBarrier} whose barrier action prints "over".
 */
public class MyCyclicBarrier {
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        Runnable onTrip = () -> {
            System.out.println("over");
        };
        final CyclicBarrier barrier = new CyclicBarrier(7, onTrip);

        for (int i = 0; i < 7; i++) {
            // Lambdas may only capture effectively final locals, hence the copy.
            final int index = i;
            Thread party = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " " + index);
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException failure) {
                    failure.printStackTrace();
                }
            });
            party.start();
        }
    }
}
package juc;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * Six threads compete for a {@link Semaphore} with three permits; each holder
 * prints a message, sleeps two seconds, prints again and releases its permit.
 */
public class MySemaphore {
    public static void main(String[] args) {
        // Limits how many threads run the guarded section at once.
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                // Acquire outside the try/finally: if acquire() is interrupted no
                // permit was taken, so release() must not run.
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }
                try {
                    System.out.println(Thread.currentThread().getName() + "获得资源");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + "释放资源");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // give the permit back
                }
            }).start();
        }
    }
}
package juc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Starts six writer threads followed by six reader threads against one
 * {@code MyCache}; thread i writes or reads the key "i".
 */
public class MyReadWriteLock {
    public static void main(String[] args) {
        final MyCache cache = new MyCache();

        int w = 0;
        while (w < 6) {
            // Lambdas may only capture effectively final locals, hence the copy.
            final int id = w;
            Runnable writer = () -> cache.put(id + "", id + "");
            new Thread(writer, String.valueOf(w)).start();
            w++;
        }

        int r = 0;
        while (r < 6) {
            final int id = r;
            Runnable reader = () -> cache.get(id + "");
            new Thread(reader, String.valueOf(r)).start();
            r++;
        }
    }
}
/**
 * Map-backed cache guarded by a {@link ReadWriteLock}: writes take the write
 * lock, reads take the read lock. Both operations log before and after
 * touching the map.
 */
class MyCache {
    private volatile Map<String, Object> map = new HashMap<>();
    // Read/write lock pair protecting the map.
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Stores {@code value} under {@code key} while holding the write lock. */
    public void put(String key, Object value) {
        lock.writeLock().lock();
        try {
            announce("写入" + key);
            map.put(key, value);
            announce("写完毕");
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Looks up {@code key} while holding the read lock; the value is not returned. */
    public void get(String key) {
        lock.readLock().lock();
        try {
            announce("读取" + key);
            map.get(key);
            announce("读完毕");
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Prints the current thread's name immediately followed by {@code what}. */
    private static void announce(String what) {
        System.out.println(Thread.currentThread().getName() + what);
    }
}
| 方式 | 抛出异常 | 不抛出异常(返回特殊值) | 阻塞等待 | 超时等待 |
|---|---|---|---|---|
| 添加 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
| 移除 | remove() | poll() | take() | poll(timeout, unit) |
| 查看队首 | element() | peek() | — | — |
package juc;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Walks through the four families of {@link BlockingQueue} operations on an
 * {@link ArrayBlockingQueue} of capacity 3: throwing, special-value,
 * blocking and timed.
 */
public class MyBlockingQueue {
    /** Capacity used by every demo queue. */
    private static final int CAPACITY = 3;

    public static void main(String[] args) throws InterruptedException {
        // Throwing variants
        // test1();
        // Special-value variants (no exception)
        // test2();
        // Blocking variants (wait indefinitely)
        // test3();
        // Timed variants
        test4();
    }

    /** add/element/remove: these throw when the queue is full or empty. */
    public static void test1() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
        System.out.println(queue.add("a"));
        System.out.println(queue.add("b"));
        System.out.println(queue.add("c"));
        // System.out.println(queue.add("d")); // IllegalStateException: queue full
        System.out.println(queue.element()); // inspect the head
        System.out.println(queue.remove());
        System.out.println(queue.remove());
        System.out.println(queue.remove());
        // System.out.println(queue.remove()); // NoSuchElementException
    }

    /** offer/peek/poll: these return false or null instead of throwing. */
    public static void test2() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
        System.out.println(queue.offer("a"));
        System.out.println(queue.offer("b"));
        System.out.println(queue.offer("c"));
        System.out.println(queue.offer("d")); // no exception, returns false
        System.out.println(queue.peek());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll()); // no exception, returns null
    }

    /**
     * put/take: these block until space or an element is available.
     *
     * @throws InterruptedException if interrupted while blocked
     */
    public static void test3() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
        queue.put("a");
        queue.put("b");
        queue.put("c");
        // queue.put("d"); // would wait forever
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        // System.out.println(queue.take()); // would wait forever
    }

    /**
     * Timed offer/poll: these give up after the timeout.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public static void test4() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");
        queue.offer("d", 2, TimeUnit.SECONDS); // stops waiting after two seconds
        queue.poll();
        queue.poll();
        queue.poll();
        queue.poll(2, TimeUnit.SECONDS); // stops waiting after two seconds
    }
}
package juc;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
 * Hand-off demo on a {@link SynchronousQueue}: thread "A" puts "1", "2", "3"
 * one after another while thread "B" sleeps three seconds before each take.
 */
public class MySynchronousQueue {
    public static void main(String[] args) {
        final BlockingQueue<String> handoff = new SynchronousQueue<>();
        final String[] items = {"1", "2", "3"};

        Thread producer = new Thread(() -> {
            try {
                for (String item : items) {
                    System.out.println(Thread.currentThread().getName() + " put " + item);
                    handoff.put(item);
                }
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }, "A");
        producer.start();

        Thread consumer = new Thread(() -> {
            try {
                for (int taken = 0; taken < items.length; taken++) {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + handoff.take());
                }
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
        }, "B");
        consumer.start();
    }
}
package juc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Submits ten tasks that print the executing thread's name to a pool created
 * through {@link Executors}, then shuts the pool down.
 */
public class Demo1 {
    public static void main(String[] args) {
        // Alternatives shown in the original notes:
        // ExecutorService threadPool = Executors.newSingleThreadExecutor(); // single thread
        // ExecutorService threadPool = Executors.newFixedThreadPool(5);     // fixed size
        ExecutorService pool = Executors.newCachedThreadPool();              // grows on demand

        Runnable printWorkerName = () -> {
            System.out.println(Thread.currentThread().getName());
        };

        try {
            int submitted = 0;
            while (submitted < 10) {
                // Let the pool supply the thread.
                pool.execute(printWorkerName);
                submitted++;
            }
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            // Always shut the pool down.
            pool.shutdown();
        }
    }
}
// Excerpt: builds a ThreadPoolExecutor with core size 1 and maximum size 1,
// a 0 ms keep-alive and a LinkedBlockingQueue created without a capacity
// argument, wrapped in a FinalizableDelegatedExecutorService.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// Excerpt: core size and maximum size are both nThreads, with a 0 ms
// keep-alive and a LinkedBlockingQueue created without a capacity argument.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// Excerpt: core size 0, maximum size Integer.MAX_VALUE, a 60-second
// keep-alive and a SynchronousQueue as the work queue.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// Excerpt: the seven-argument constructor. It rejects negative or inconsistent
// sizes with IllegalArgumentException and null collaborators with
// NullPointerException, then stores the arguments (keep-alive converted to nanoseconds).
public ThreadPoolExecutor(int corePoolSize,// core pool size
int maximumPoolSize,// maximum pool size
long keepAliveTime,// per the original note: idle time after which a thread is released
TimeUnit unit,// unit of keepAliveTime
BlockingQueue<Runnable> workQueue,// blocking work queue
ThreadFactory threadFactory,// thread factory
RejectedExecutionHandler handler) {// rejection policy
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
package juc;
import java.util.concurrent.*;
/**
 * Builds a {@link ThreadPoolExecutor} by hand (core 2, max 5, queue capacity 3,
 * 3-second keep-alive, AbortPolicy) and submits eight tasks that print the
 * executing thread's name.
 */
public class Demo1 {
    public static void main(String[] args) {
        // Factory shortcuts shown in the original notes:
        // ExecutorService threadPool = Executors.newSingleThreadExecutor(); // single thread
        // ExecutorService threadPool = Executors.newFixedThreadPool(5);     // fixed size
        // ExecutorService threadPool = Executors.newCachedThreadPool();     // grows on demand
        //
        // Rejection policies:
        // 1. AbortPolicy         - reject by throwing an exception
        // 2. CallerRunsPolicy    - run the task in the submitting thread (here: main)
        // 3. DiscardPolicy       - drop the rejected task silently
        // 4. DiscardOldestPolicy - drop the oldest queued task and retry, no exception
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                      // core pool size
                5,                                      // maximum pool size
                3,                                      // keep-alive
                TimeUnit.SECONDS,                       // keep-alive unit
                new LinkedBlockingQueue<>(3),           // bounded work queue
                Executors.defaultThreadFactory(),       // thread factory
                new ThreadPoolExecutor.AbortPolicy());  // rejection policy

        Runnable printWorkerName = () -> {
            System.out.println(Thread.currentThread().getName());
        };

        try {
            // Maximum load = queue capacity + maximum pool size.
            int submitted = 0;
            while (submitted < 8) {
                pool.execute(printWorkerName);
                submitted++;
            }
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            // Always shut the pool down.
            pool.shutdown();
        }
    }
}
package juc;
import java.util.function.Function;
/**
 * {@link Function} demo: one input, one output. Both functions return their
 * argument unchanged, first as an anonymous class and then as a lambda.
 */
public class Demo2 {
    public static void main(String[] args) {
        // Parameterized rather than raw, so apply() is type-checked.
        Function<String, String> function = new Function<String, String>() {
            @Override
            public String apply(String str) {
                return str;
            }
        };
        // The same thing as a lambda expression.
        Function<String, String> function1 = str -> str;

        System.out.println(function.apply("haha"));
        System.out.println(function1.apply("xixi"));
    }
}
package juc;
import java.util.function.Predicate;
/**
 * {@link Predicate} demo: tests whether a string is empty, first with an
 * anonymous class and then with a method reference.
 */
public class Demo3 {
    public static void main(String[] args) {
        Predicate<String> anonymousForm = new Predicate<String>() {
            @Override
            public boolean test(String candidate) {
                return candidate.isEmpty();
            }
        };
        Predicate<String> compactForm = String::isEmpty;

        boolean first = anonymousForm.test("s");
        boolean second = compactForm.test("x");
        System.out.println(first);
        System.out.println(second);
    }
}
package juc;
import java.util.function.Consumer;
/**
 * {@link Consumer} demo: a consumer takes an input and returns nothing. Both
 * consumers print their argument.
 */
public class Demo4 {
    public static void main(String[] args) {
        Consumer<String> anonymousForm = new Consumer<String>() {
            @Override
            public void accept(String text) {
                System.out.println(text);
            }
        };
        Consumer<String> lambdaForm = text -> System.out.println(text);

        anonymousForm.accept("haha");
        lambdaForm.accept("xixi");
    }
}
package juc;
import java.util.function.Supplier;
/**
 * {@link Supplier} demo: a supplier takes no input and returns a value, shown
 * first as an anonymous class and then as a lambda.
 */
public class Demo5 {
    public static void main(String[] args) {
        Supplier<String> anonymousForm = new Supplier<String>() {
            @Override
            public String get() {
                return "haha";
            }
        };
        Supplier<String> lambdaForm = () -> "xixi";

        String first = anonymousForm.get();
        System.out.println(first);
        String second = lambdaForm.get();
        System.out.println(second);
    }
}
package juc;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
/**
 * Stream pipeline exercise over five users:
 * <ol>
 *   <li>keep users whose id is even,</li>
 *   <li>keep users older than 23,</li>
 *   <li>map each to its upper-cased name,</li>
 *   <li>sort the names in reverse order,</li>
 *   <li>print only the first one.</li>
 * </ol>
 */
public class Demo6 {
    public static void main(String[] args) {
        // The collection only stores the data ...
        List<User> users = Arrays.asList(
                new User(1, "a", 21),
                new User(2, "b", 22),
                new User(3, "c", 23),
                new User(4, "d", 24),
                new User(6, "e", 25));

        // ... and the computation is delegated to the stream.
        users.stream()
                .filter(user -> user.getId() % 2 == 0 && user.getAge() > 23)
                .map(User::getName)
                .map(String::toUpperCase)
                .sorted((left, right) -> right.compareTo(left))
                .limit(1)
                .forEach(System.out::println);
    }
}
/**
 * Plain mutable bean with an id, a name and an age.
 */
class User {
    private int id;
    private String name;
    private int age;

    /** Creates a user with default field values. */
    public User() {
    }

    /** Creates a user with the given id, name and age. */
    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Returns {@code User{id=<id>, name='<name>', age=<age>}}.
     * The quote characters here are plain ASCII apostrophes.
     */
    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
package juc;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
/**
 * Sums the integers in {@code [start, end]} with the fork/join framework,
 * plus three timing demos (plain loop, fork/join, parallel stream).
 */
public class MyForkJoin extends RecursiveTask<Long> {
    /** Ranges shorter than this are summed directly instead of being split. */
    private static final long THRESHOLD = 10000L;

    private final long start;
    private final long end;

    /**
     * @param start first value of the range (inclusive)
     * @param end   last value of the range (inclusive)
     */
    public MyForkJoin(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range directly when it is below the threshold; otherwise splits
     * it in half, forks the left half and computes the right half in the
     * current thread.
     *
     * @return the sum of all values in {@code [start, end]}
     */
    @Override
    protected Long compute() {
        if (end - start < THRESHOLD) {
            // Primitive accumulator: avoids boxing on every iteration.
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
        // start + (end - start) / 2 cannot overflow the way (start + end) / 2 can.
        long middle = start + (end - start) / 2;
        MyForkJoin left = new MyForkJoin(start, middle);
        MyForkJoin right = new MyForkJoin(middle + 1, end);
        left.fork(); // queue the left half for another worker
        long rightSum = right.compute(); // do the right half here instead of idling
        return left.join() + rightSum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Timings recorded in the original notes (milliseconds):
        // test1(); // 5483
        // test2(); // 3799
        test3(); // 124
    }

    /** Plain sequential loop. */
    public static void test1() {
        long sum = 0L;
        long start = System.currentTimeMillis();
        for (long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("总时间:" + (end - start) + " sum=" + sum);
    }

    /**
     * Fork/join version.
     *
     * @throws ExecutionException   if the task fails
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<Long> task = new MyForkJoin(0L, 10_0000_0000L);
            ForkJoinTask<Long> submit = forkJoinPool.submit(task);
            Long sum = submit.get();
            long end = System.currentTimeMillis();
            System.out.println("总时间:" + (end - start) + " sum=" + sum);
        } finally {
            // Release the pool's worker threads once the demo is done.
            forkJoinPool.shutdown();
        }
    }

    /** Parallel stream version. */
    public static void test3() {
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
                .parallel()
                .reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("总时间:" + (end - start) + " sum=" + sum);
    }
}
package juc.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * {@link CompletableFuture} demo: an async supplier that fails with a
 * division by zero, observed with {@code whenComplete} and recovered with
 * {@code exceptionally}.
 */
public class MyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Alternative from the original notes: an async task without a result.
        // CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
        //     try {
        //         TimeUnit.SECONDS.sleep(2);
        //     } catch (InterruptedException e) {
        //         e.printStackTrace();
        //     }
        //     System.out.println(Thread.currentThread().getName() + "runAsync->void");
        // });
        //
        // System.out.println("77777");
        // completableFuture.get(); // wait for the outcome

        // Async task with a result; the division below makes it fail on purpose.
        CompletableFuture<Integer> source = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync->Integer");
            int i = 10/0;
            return 1024;
        });

        CompletableFuture<Integer> observed = source.whenComplete((value, error) -> {
            System.out.println("t=" + value); // the normal result
            System.out.println("u=" + error); // the failure, if any
        });

        CompletableFuture<Integer> recovered = observed.exceptionally(error -> {
            System.out.println(error.getMessage());
            return 404; // fallback value returned on failure
        });

        System.out.println(recovered.get());
    }
}
package juc;
import java.util.concurrent.TimeUnit;
/**
 * Visibility demo: a background thread spins while {@code num == 0}; the main
 * thread sets {@code num} to 1 after one second and prints it.
 */
public class MyVolatile {
    // Declared volatile so the spinning thread is guaranteed to see the write.
    private volatile static int num = 0;

    public static void main(String[] args) {
        // Per the original note, without volatile this thread may not notice the change.
        Runnable spinUntilChanged = () -> {
            while (num == 0) {
            }
        };
        Thread spinner = new Thread(spinUntilChanged);
        spinner.start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException interrupted) {
            interrupted.printStackTrace();
        }

        num = 1;
        System.out.println(num);
    }
}
package juc;
/**
 * Atomicity demo: twenty threads each call {@link #add()} a thousand times on
 * a volatile counter. Because {@code num++} is not atomic, the printed total
 * can be less than 20000.
 */
public class MyV {
    private volatile static int num = 0;

    public static void main(String[] args) {
        Thread[] workers = new Thread[20];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    add();
                }
            });
            workers[i].start();
        }

        // Join every worker instead of polling Thread.activeCount(): the thread
        // count depends on how the program is launched, so a count-based wait
        // can report before all workers have finished.
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        System.out.println(Thread.currentThread().getName() + " " + num); // may be below 20000
    }

    /**
     * Increments the counter. Deliberately unsynchronized: volatile gives
     * visibility, not atomicity. Making this method synchronized would fix the count.
     */
    public static void add() {
        num++;
    }
}
CAS 的缺点:自旋循环会耗时;一次只能保证一个共享变量的原子性;存在 ABA 问题
package juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Compare-and-set basics on an {@link AtomicInteger}.
 */
public class CASDemo {
    // CAS = compare and swap; the original notes describe it as a CPU-level primitive.
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(2020);

        // Updates only if the current value equals the expected one.
        boolean firstSwap = counter.compareAndSet(2020, 2021);
        System.out.println(firstSwap); // true
        System.out.println(counter.get());

        counter.getAndIncrement(); // returns 2021, value becomes 2022

        boolean secondSwap = counter.compareAndSet(2020, 2021);
        System.out.println(secondSwap); // false
        System.out.println(counter.get()); // 2022
    }
}
package juc.cas;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * ABA illustration: the value goes 2020 -> 2021 -> 2020, and a later
 * compare-and-set expecting 2020 still succeeds.
 */
public class CASDemo {
    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(2020);

        // "A" changes the value and then changes it back.
        boolean forward = value.compareAndSet(2020, 2021);
        System.out.println(forward); // true
        System.out.println(value.get());
        boolean backward = value.compareAndSet(2021, 2020);
        System.out.println(backward); // true
        System.out.println(value.get());

        // "B" cannot tell that anything happened in between.
        boolean unaware = value.compareAndSet(2020, 2022);
        System.out.println(unaware); // true
        System.out.println(value.get());
    }
}
package juc.cas;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
 * Versioned CAS with {@link AtomicStampedReference}: thread "A" changes
 * 1 -> 2 -> 1 and bumps the stamp each time, while thread "B" attempts
 * 1 -> 6 using the stamp it read before sleeping.
 */
public class CASDemo {
    public static void main(String[] args) {
        // Caution from the original notes: with a wrapper type as the generic
        // argument, mind which object references are being compared.
        // Atomic reference that carries a version stamp.
        final AtomicStampedReference<Integer> stamped = new AtomicStampedReference<>(1, 1);

        Runnable abaWriter = () -> {
            // Current version.
            System.out.println("A1->" + stamped.getStamp());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
            boolean forward = stamped.compareAndSet(1, 2,
                    stamped.getStamp(), stamped.getStamp() + 1);
            System.out.println(forward);
            System.out.println("A2->" + stamped.getStamp());
            boolean backward = stamped.compareAndSet(2, 1,
                    stamped.getStamp(), stamped.getStamp() + 1);
            System.out.println(backward);
            System.out.println("A3->" + stamped.getStamp());
        };

        // Same idea as optimistic locking.
        Runnable staleWriter = () -> {
            int stamp = stamped.getStamp(); // version read up front
            System.out.println("B1->" + stamped.getStamp());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException interrupted) {
                interrupted.printStackTrace();
            }
            boolean swapped = stamped.compareAndSet(1, 6, stamp, stamp + 1);
            System.out.println(swapped);
            System.out.println("B2->" + stamped.getStamp());
        };

        new Thread(abaWriter, "A").start();
        new Thread(staleWriter, "B").start();
    }
}
// Excerpt: reads the int at (o, offset) and retries weakCompareAndSetInt until
// it succeeds in replacing the value it read with that value plus delta;
// returns the value read before the addition.
@IntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));// re-read and retry until the CAS succeeds (spin loop)
return v;
}
// Excerpt: delegates to U.getAndAddInt on this object's VALUE offset with a delta of 1.
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
// Excerpt: the no-argument constructor assigns a NonfairSync to sync.
public ReentrantLock() {
sync = new NonfairSync();
}
// Excerpt: assigns a FairSync to sync when fair is true, otherwise a NonfairSync.
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
package juc.lock;
/**
 * Reentrancy demo with {@code synchronized}: threads "A" and "B" both call
 * {@code sms()} on the same {@code Phone}.
 *
 * <p>Output recorded in the original notes:
 * <pre>
 * Asms
 * Acall
 * Bsms
 * Bcall
 * </pre>
 */
public class Demo1 {
    public static void main(String[] args) {
        Phone phone = new Phone();

        Thread first = new Thread(phone::sms, "A");
        first.start();

        Thread second = new Thread(phone::sms, "B");
        second.start();
    }
}
/**
 * Both methods are synchronized on the instance; {@code sms()} calls
 * {@code call()} while still holding the monitor, which works because Java
 * monitors are reentrant.
 */
class Phone {
    /** Prints "&lt;thread&gt;sms" and then invokes {@link #call()}. */
    public synchronized void sms() {
        String who = Thread.currentThread().getName();
        System.out.println(who + "sms");
        call();
    }

    /** Prints "&lt;thread&gt;call". */
    public synchronized void call() {
        String who = Thread.currentThread().getName();
        System.out.println(who + "call");
    }
}
package juc.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Reentrancy demo with an explicit lock: threads "A" and "B" both call
 * {@code sms()} on the same {@code Phone2}.
 *
 * <p>Output recorded in the original notes:
 * <pre>
 * Asms
 * Acall
 * Bsms
 * Bcall
 * </pre>
 */
public class Demo2 {
    public static void main(String[] args) {
        Phone2 phone = new Phone2();

        Thread first = new Thread(phone::sms, "A");
        first.start();

        Thread second = new Thread(phone::sms, "B");
        second.start();
    }
}
/**
 * Explicit-lock version of the reentrancy demo. {@code sms()} and
 * {@code call()} use the same {@link ReentrantLock} field, so the nested
 * call re-acquires a lock the thread already holds; every {@code lock()} is
 * paired with an {@code unlock()}.
 */
class Phone2 {
    Lock lock = new ReentrantLock();

    /** Prints "&lt;thread&gt;sms" under the lock and then invokes {@link #call()}. */
    public void sms() {
        lock.lock(); // first acquisition; call() acquires the same lock again
        try {
            String who = Thread.currentThread().getName();
            System.out.println(who + "sms");
            call();
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /** Prints "&lt;thread&gt;call" under the lock. */
    public void call() {
        lock.lock();
        try {
            String who = Thread.currentThread().getName();
            System.out.println(who + "call");
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
package juc.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Minimal spin lock built on an {@link AtomicReference} holding the owning
 * thread ({@code null} means unlocked). Not reentrant: a thread that calls
 * {@link #myLock()} twice without unlocking spins forever.
 */
public class MySpinlock {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    /** Acquires the lock, spinning until the owner slot can be claimed. */
    public void myLock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + " -> mylock");
        // Spin until the CAS from null to this thread succeeds.
        while (!atomicReference.compareAndSet(null, thread)) {
            // Tell the runtime this is a busy-wait so it can back off the core.
            Thread.onSpinWait();
        }
    }

    /** Releases the lock if, and only if, the calling thread owns it. */
    public void myUnLock() {
        Thread thread = Thread.currentThread();
        System.out.println(Thread.currentThread().getName() + " -> myUnlock");
        atomicReference.compareAndSet(thread, null);
    }

    /**
     * Two threads take the lock in turn, each holding it for three seconds.
     *
     * <p>Output recorded in the original notes:
     * <pre>
     * A -> mylock
     * B -> mylock
     * A -> myUnlock
     * B -> myUnlock
     * </pre>
     */
    public static void main(String[] args) throws InterruptedException {
        MySpinlock mySpinlock = new MySpinlock();
        new Thread(() -> {
            mySpinlock.myLock();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mySpinlock.myUnLock();
            }
        }, "A").start();

        // Let A take the lock first.
        TimeUnit.SECONDS.sleep(1);

        new Thread(() -> {
            mySpinlock.myLock();
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mySpinlock.myUnLock();
            }
        }, "B").start();
    }
}
原文:https://www.cnblogs.com/sprinining/p/14598327.html