分段锁实现,线程数越多越有优势
package com.liyuanchen.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
/**
* 粗略对比synchronized、AtomicLong、LongAdder执行效率可以发现在高并发情况下LongAdder的效率会高很多,
* 这是因为LongAdder内部使用了分段锁,再使用UnSafe类原子操作更为高效
*/
public class LongAdderTest {
private static long count1 = 0L;
static AtomicLong count2 = new AtomicLong(0L);
static LongAdder count3 = new LongAdder();
static int length = 1000;
static List<Thread> threads = new ArrayList<>();
public static void main(String[] args) {
testSync();
testAtomic();
testLongAdder();
}
private static void testSync() {
Object lock = new Object();
for (int i = 0; i < length; i++) {
threads.add(new Thread(() -> {
for (int j= 0; j < 100000; j++) {
synchronized (lock) {
count1 ++;
}
}
}));
}
long start = System.currentTimeMillis();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("synchronized:"+ count1 +"消耗时间:" + (System.currentTimeMillis() - start));
threads.clear();
}
private static void testAtomic() {
for (int i = 0; i < length; i++) {
threads.add(new Thread(() -> {
for (int j= 0; j < 100000; j++) {
count2.incrementAndGet();
}
}));
}
long start = System.currentTimeMillis();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("Atomic:"+ count2.get()+"消耗时间:" + (System.currentTimeMillis() - start));
threads.clear();
}
private static void testLongAdder() {
for (int i = 0; i < length; i++) {
threads.add(new Thread(() -> {
for (int j= 0; j < 100000; j++) {
count3.increment();
}
}));
}
long start = System.currentTimeMillis();
threads.forEach(Thread::start);
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("LongAdder:"+ count3.longValue()+"消耗时间:" + (System.currentTimeMillis() - start));
threads.clear();
}
}
package com.liyuanchen.juc;
/**
* wait()使当前线程处于等待状态
* notify()唤醒一个处于等待状态的线程回到就绪状态
* notifyAll()唤醒所有处于等待状态的线程回到就绪状态
* 需求:一个线程打印12345,另一个线程打印abcde,要求两个线程交替打印
*/
public class WaitNotifyTest {
public static void main(String[] args) {
int[] x = {1, 2, 3, 4, 5};
String[] y = {"a", "b", "c", "d", "e"};
new Thread(() -> {
synchronized (WaitNotifyTest.class) {
for (int i = 0; i < x.length; i++) {
System.out.println(x[i]);
try {
//注意要先notify再wait
WaitNotifyTest.class.notify();
WaitNotifyTest.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
WaitNotifyTest.class.notify();
}
}).start();
new Thread(() -> {
synchronized (WaitNotifyTest.class) {
for (int i = 0; i < y.length; i++) {
try {
System.out.println(y[i]);
WaitNotifyTest.class.notify();
WaitNotifyTest.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
WaitNotifyTest.class.notify();
}
}).start();
}
}
package com.liyuanchen.juc;
import java.util.concurrent.locks.LockSupport;
/**
* LockSupport可以让线程在任意位置阻塞
* 需求:一个线程打印12345,另一个线程打印abcde,要求两个线程交替打印
*/
public class LockSupportTest {
static Thread thread1 = null;
static Thread thread2 = null;
public static void main(String[] args) {
int[] x = {1, 2, 3, 4, 5};
String[] y = {"a", "b", "c", "d", "e"};
thread1 = new Thread(() -> {
for (int i = 0; i < x.length; i++) {
System.out.println(x[i]);
//先恢复另一个线程
LockSupport.unpark(thread2);
//阻塞线程
LockSupport.park();
}
});
thread2 = new Thread(() -> {
for (int i = 0; i < y.length; i++) {
//先阻塞线程
LockSupport.park();
System.out.println(y[i]);
//恢复另一个线程
LockSupport.unpark(thread1);
}
});
thread1.start();
thread2.start();
}
}
ReentrantLock底层基于AQS实现,相比于synchronized更为灵活。
tryLock():尝试获取锁,返回布尔类型值,不管锁定与否,方法都将继续执行,也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中。
lockInterruptibly():可以对interrupt()方法做出响应。
newCondition():通过Condition能够更加精确的控制多线程的休眠与唤醒。
package com.liyuanchen.juc;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest {
final private int MAX = 10;
private LinkedList<String> lists = new LinkedList<>();
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(String str) {
try {
lock.lock();
while (lists.size() == MAX) {
producer.await();
}
lists.add(str);
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public String get() {
String str = null;
try {
lock.lock();
while (lists.size() == 0) {
consumer.await();
}
str = lists.removeFirst();
producer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return str;
}
public static void main(String[] args) {
ReentrantLockTest rlt = new ReentrantLockTest();
for (int i = 0; i < 10; i++) {
//启动10个消费者线程
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(rlt.get());
}
}).start();
}
for (int i = 0; i < 2; i++) {
//启动2个生产者线程
new Thread(() -> {
for (int j = 0; j < 25; j++) {
rlt.put(Thread.currentThread().getName() + ":" + j);
}
}).start();
}
}
}
package com.liyuanchen.juc;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* 门栓CountDownLatch是通过一个计数变量实现的,await()使当前线程等待,调用countDown()方法计数值减1,
* 当值为0时线程继续继续运行
* 另一个await(long timeout, TimeUnit unit)方法可设置等待时间,如果经过了指定的等待时间,则线程继续运行
*/
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, "t1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
countDownLatch.countDown();
}, "t2").start();
new Thread(() -> {
try {
countDownLatch.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, "t3").start();
}
}
package com.liyuanchen.juc;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 栅栏CyclicBarrier
* 线程到达await后阻塞,直到阻塞线程数量达到设置值parties继续运行。
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(20, () -> System.out.println("释放"));
for (int i = 0; i < 100; i++) {
new Thread(() -> {
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
package com.liyuanchen.juc;
import java.util.concurrent.Phaser;
/**
* 类Phaser提供了动态增减parties计数,可以实现针对某一个线程取消同步运行的效果,
* 可以在指定屏障处等待,相比于CyclicBarrier更为灵活
*/
public class PhaserTest extends Phaser {
public PhaserTest(int parties) {
super(parties);
}
static PhaserTest phaserTest = new PhaserTest(5);
/**
* 需要重写onAdvance方法
* @param phase
* @param registeredParties
* @return
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一次到齐");
return false;
case 1:
System.out.println("第二次到齐");
return false;
case 2:
System.out.println("第三次到齐");
return true;
default:
return true;
}
}
private static void m() {
System.out.println(Thread.currentThread().getName() + "到达第一个阻碍");
//方法arriveAndAwaitAdvance()的作用是使当前线等待,达到设定parties值后继续运行
phaserTest.arriveAndAwaitAdvance();
//获得已经被使用的parties个数
// System.out.println("已经被使用的parties个数" + phaserTest.getArrivedParties());
//获得未被使用的parties个数
// System.out.println("未被使用的parties个数" + phaserTest.getUnarrivedParties());
System.out.println(Thread.currentThread().getName() + "到达第二个阻碍");
phaserTest.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + "到达第三个阻碍");
if (Thread.currentThread().getName().equals("t7") || Thread.currentThread().getName().equals("t8") ||
Thread.currentThread().getName().equals("t9")) {
phaserTest.arriveAndAwaitAdvance();
} else {
//方法arriveAndDeregister()的作用是使当前线程退出,并且使parties值减1
phaserTest.arriveAndDeregister();
}
}
public static void main(String[] args) {
//批量增加parties数量
phaserTest.bulkRegister(4);
//parties数量+1
phaserTest.register();
for (int i = 0; i < 7; i++) {
new Thread(PhaserTest::m, "t" + i).start();
}
new Thread(PhaserTest::m, "t7").start();
new Thread(PhaserTest::m, "t8").start();
new Thread(PhaserTest::m, "t9").start();
}
}
package com.liyuanchen.juc;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ReadWriteLock在读取数据时是共享锁,写数据则是排他锁,多用于写少读多业务场景
*/
public class ReadWriteLockTest {
static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static int read() {
int res;
try {
readLock.lock();
res = value;
System.out.println("read value:" + res);
} finally {
readLock.unlock();
}
return res;
}
public static void write(int var) {
try {
writeLock.lock();
value = var;
System.out.println("write value:" + value);
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
//读线程
for (int i = 0; i < 25; i++) new Thread(ReadWriteLockTest::read).start();
//写线程
for (int i = 0; i < 5; i++) new Thread(() -> write(new Random().nextInt())).start();
}
}
package com.liyuanchen.juc;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore 是一个计数信号量,必须由获取它的线程释放
* 常用于限制可以访问某些资源的线程数量
*/
public class SemaphoretEST {
public static void main(String[] args) {
//permits表示允许几个线程同时运行,fair参数true代表公平入队,默认非公平
Semaphore semaphore = new Semaphore(1, true);
new Thread(() -> {
try {
//获得一个许可,消耗一个permits
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "运行");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放一个permits
semaphore.release();
}
}, "t1").start();
new Thread(() -> {
try {
//获得一个许可,消耗一个permits
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "运行");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放一个permits
semaphore.release();
}
}, "t2").start();
}
}
package com.liyuanchen.juc;
import java.util.concurrent.Exchanger;
/**
* Exchanger是一个用于线程间协作的工具类,只能作用于两个线程
*/
public class ExchangerTest {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
//线程达到阻塞,等待第二个线程到达才往下执行
String var = exchanger.exchange(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " start " + var);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
//线程达到阻塞,等待另一个线程到达才往下执行
String var = exchanger.exchange(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + " start " + var);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
AbstractQueuedSynchronizer抽象队列同步器简称AQS,本质上AQS就是通过volatile、双向链表与CAS实现的。通过volatile修饰的state值判断线程是否被占用,如果没有占用,则使用compareAndSetState方法修改state值,并且线程获得独占锁,否则线程compareAndSetTail进入队列尾部等待执行。compareAndSetState与compareAndSetTail都是CAS操作。
JAVA9之后AQS使用了VarHandle确保原子性操作:
原子性操作
比反射快,直接操作二进制码
ThreadLocal是线程内的存储类,可以在指定线程内存储数据,数据存储以后,保证指定线程可以得到存储数据。
每个线程中都有一个ThreadLocalMap对象,ThreadLocal对象为key,数据值为value设入了线程中的ThreadLocalMap。
ThreadLocalMap的Key为弱引用
ThreadLocal不用了务必要remove
原文:https://www.cnblogs.com/allen167/p/13870293.html