Concurrency utility classes exist to solve concurrency problems in multi-threaded environments; they generally fall into two broad categories, synchronizers and concurrent containers.
However, if you reach for these classes the moment a concurrency problem appears, without understanding how each one is implemented, what guarantees it provides, and when it applies, and blindly assume they can solve every concurrency problem, you will inevitably fall into traps. What follows is a collection of pitfalls commonly encountered when using concurrency utility classes.
@RestController
public class ThreadLocalController {
public static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("thread")
public Map wrong(@RequestParam("userId") Integer userId) {
// Read the user info from the ThreadLocal once, before setting it
String before = Thread.currentThread().getName() + ":" + currentUser.get();
// Store the user info into the ThreadLocal
currentUser.set(userId);
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
}
}
In this code, the ThreadLocal starts with an initial value of null; the handler reads it once, then stores the userId passed in from the request and reads it again. The expected result is that the first read returns null and the second returns the userId sent by the client.
A ThreadLocal holds a per-thread variable that is isolated from other threads. In a web application, however, the code runs inside Tomcat, and the executing threads are Tomcat worker threads drawn from a thread pool. A later request may therefore run on a thread that previously served another request, so the first read from the ThreadLocal can return data left over from that earlier request.
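This thread-reuse effect can be reproduced without a web container. Below is a minimal sketch (class and method names are made up for illustration) that uses a single-thread pool so that both tasks are guaranteed to land on the same worker thread:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLeakDemo {
    static final ThreadLocal<Integer> CURRENT_USER = ThreadLocal.withInitial(() -> null);

    // Mimics the controller: read the leftover value, then set the new one
    static Integer handle(int userId) {
        Integer before = CURRENT_USER.get(); // leftover from a previous task on this thread
        CURRENT_USER.set(userId);
        return before;
    }

    public static void main(String[] args) throws Exception {
        // A single-thread pool guarantees both tasks reuse the same worker thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Integer first = pool.submit(() -> handle(123)).get();  // fresh thread: null
        Integer second = pool.submit(() -> handle(456)).get(); // reused thread: 123
        System.out.println(first + " " + second);              // prints "null 123"
        pool.shutdown();
    }
}
```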
To reproduce it, limit Tomcat to a single worker thread:
server.tomcat.max-threads=1
// Request 1
localhost:8080/thread?userId=123
// Response
{
"before": "http-nio-8080-exec-1:345",
"after": "http-nio-8080-exec-1:123"
}
// Request 2
localhost:8080/thread?userId=456
// Response
{
"before": "http-nio-8080-exec-1:123",
"after": "http-nio-8080-exec-1:456"
}
Looking at the responses, fetching user 2's info also returned user 1's leftover data (and request 1 itself picked up residue from an even earlier request on the same thread). The fix is simple: explicitly remove the user data in a finally block before the request completes, so a reused thread never sees stale values.
@RestController
public class ThreadLocalController {
public static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("thread")
public Map wrong(@RequestParam("userId") Integer userId) {
// Read the user info from the ThreadLocal once, before setting it
String before = Thread.currentThread().getName() + ":" + currentUser.get();
// Store the user info into the ThreadLocal
currentUser.set(userId);
// Read again only after the user info has been set
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
} finally {
currentUser.remove();
}
}
}
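Rather than relying on every handler to remember the try/finally, the cleanup can also be centralized by wrapping each task so that remove() always runs. A minimal sketch of the pattern (names are illustrative, and this is independent of any web framework):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CleanupWrapperDemo {
    static final ThreadLocal<Integer> CURRENT_USER = ThreadLocal.withInitial(() -> null);

    // Wrap a task so the ThreadLocal is always cleared when the task finishes
    static Runnable withCleanup(Runnable task) {
        return () -> {
            try {
                task.run();
            } finally {
                CURRENT_USER.remove();
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(withCleanup(() -> CURRENT_USER.set(123))).get();
        // Same worker thread, but the wrapper already removed the previous value
        Integer leftover = pool.submit(() -> CURRENT_USER.get()).get();
        System.out.println(leftover); // prints "null"
        pool.shutdown();
    }
}
```

In a Spring MVC application the same idea is usually expressed as an interceptor or filter that clears the ThreadLocal after each request.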
@Slf4j
@RestController
public class ConcurrentHashMapController {
private static int THREAD_COUNT = 10;
private static int ITEM_COUNT = 1000;
/**
 * rangeClosed(start, end) returns an ordered LongStream containing every value from start
 * to end with a step of 1; rangeClosed includes the end value, while range does not.
 * boxed() converts the primitive stream into a Stream&lt;Long&gt;.
 * The arguments to Collectors.toConcurrentMap:
 * 1. key mapper: i -> UUID.randomUUID().toString() generates the key
 * 2. value mapper: Function.identity() returns a lambda whose output equals its input,
 *    equivalent to t -> t
 * 3. merge function: (o1, o2) -> o1 means that when two entries collide on the same key,
 *    o1 is kept as that key's value
 * 4. map supplier: ConcurrentHashMap::new creates the map that collects the entries
 * @param count number of entries to generate
 * @return a ConcurrentHashMap of count entries with random UUID keys
 */
private ConcurrentHashMap<String, Long> getData(int count) {
return LongStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(),
Function.identity(),
(o1, o2) -> o1, ConcurrentHashMap::new));
}
@GetMapping("map")
public String wrong() throws InterruptedException {
// Start with 900 elements (ITEM_COUNT - 100)
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size:{}", concurrentHashMap.size());
// Use a ForkJoinPool with 10 threads; note that rangeClosed(0, 10) submits 11 parallel tasks
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(0, 10).parallel().forEach(i -> {
// Compute how many elements are still missing
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
// Top up the missing elements
concurrentHashMap.putAll(getData(gap));
}));
// Wait for all tasks to finish
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
// Check the final element count in the Map
log.info("finish size:{}", concurrentHashMap.size());
return "num:" + concurrentHashMap.size();
}
}
2020-07-08 18:55:17.345 INFO 15960 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController : init size:900
2020-07-08 18:55:17.349 INFO 15960 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController : gap size:100
2020-07-08 18:55:17.351 INFO 15960 --- [Pool-1-worker-2] c.c.s.ConcurrentHashMapController : gap size:99
2020-07-08 18:55:17.351 INFO 15960 --- [Pool-1-worker-8] c.c.s.ConcurrentHashMapController : gap size:90
2020-07-08 18:55:17.351 INFO 15960 --- [ool-1-worker-11] c.c.s.ConcurrentHashMapController : gap size:90
2020-07-08 18:55:17.351 INFO 15960 --- [ool-1-worker-15] c.c.s.ConcurrentHashMapController : gap size:90
2020-07-08 18:55:17.351 INFO 15960 --- [Pool-1-worker-6] c.c.s.ConcurrentHashMapController : gap size:90
2020-07-08 18:55:17.352 INFO 15960 --- [ool-1-worker-10] c.c.s.ConcurrentHashMapController : gap size:71
2020-07-08 18:55:17.352 INFO 15960 --- [ool-1-worker-13] c.c.s.ConcurrentHashMapController : gap size:74
2020-07-08 18:55:17.351 INFO 15960 --- [Pool-1-worker-1] c.c.s.ConcurrentHashMapController : gap size:85
2020-07-08 18:55:17.351 INFO 15960 --- [Pool-1-worker-4] c.c.s.ConcurrentHashMapController : gap size:85
2020-07-08 18:55:17.355 INFO 15960 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController : gap size:-125
2020-07-08 18:55:17.357 INFO 15960 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController : finish size:1774
The results clearly show a thread-safety problem: some threads computed a wrong number of elements to add, and the Map ended up with 1774 elements instead of 1000. Although the code uses the thread-safe ConcurrentHashMap, that class only guarantees the atomicity of its own individual reads and writes. "Read the size, compute the gap, then putAll" is a compound operation, not an atomic one, so other threads can change the map between the read and the write. The simplest fix is to put the gap calculation and the putAll under one lock so that the pair becomes atomic.
@GetMapping("map")
public String right() throws InterruptedException {
// Start with 900 elements (ITEM_COUNT - 100)
ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
log.info("init size:{}", concurrentHashMap.size());
// Use a ForkJoinPool with 10 threads to run the tasks concurrently
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(0, 10).parallel().forEach(i -> {
synchronized (concurrentHashMap) {
int gap = ITEM_COUNT - concurrentHashMap.size();
log.info("gap size:{}", gap);
// Top up the missing elements
concurrentHashMap.putAll(getData(gap));
}
}));
// Wait for all tasks to finish
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
// Check the final element count in the Map
log.info("finish size:{}", concurrentHashMap.size());
return "num:" + concurrentHashMap.size();
}
2020-07-08 19:05:46.058 INFO 12636 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController : init size:900
2020-07-08 19:05:46.064 INFO 12636 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController : gap size:100
2020-07-08 19:05:46.068 INFO 12636 --- [ool-1-worker-10] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.068 INFO 12636 --- [Pool-1-worker-9] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.068 INFO 12636 --- [Pool-1-worker-1] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.068 INFO 12636 --- [Pool-1-worker-8] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.068 INFO 12636 --- [Pool-1-worker-6] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.069 INFO 12636 --- [Pool-1-worker-2] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.069 INFO 12636 --- [Pool-1-worker-4] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.069 INFO 12636 --- [ool-1-worker-11] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.069 INFO 12636 --- [ool-1-worker-13] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.069 INFO 12636 --- [ool-1-worker-15] c.c.s.ConcurrentHashMapController : gap size:0
2020-07-08 19:05:46.071 INFO 12636 --- [nio-8080-exec-1] c.c.s.ConcurrentHashMapController : finish size:1000
private static int LOOP_COUNT = 1000000;
private static int THREAD_COUNT = 10;
private static int ITEM_COUNT = 10;
/**
 * ThreadLocalRandom.current().nextInt(ITEM_COUNT) generates random numbers safely under
 * concurrency: each thread draws from its own generator instead of contending on a
 * shared Random instance.
 * @return per-key frequency counts
 * @throws InterruptedException
 */
@GetMapping("map1")
private Map<String, Long> normaluse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
// First occurrence of a key gets value 1; subsequent occurrences add 1
if (freqs.containsKey(key)) {
freqs.put(key, freqs.get(key) + 1);
} else {
freqs.put(key, 1L);
}
}
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
System.out.println("size:" + freqs.values());
long sum = freqs.values().stream().collect(Collectors.summarizingLong(x -> x.longValue())).getSum();
System.out.println("sum:" + sum);
return freqs;
}
@GetMapping("map2")
private Map<String, Long> goodUse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
// computeIfAbsent() atomically creates the LongAdder on first access; increment() then counts safely
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
Map<String, Long> collect = freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue()
));
long sum = collect.values().stream().collect(Collectors.summarizingLong(x -> x.longValue())).getSum();
System.out.println("sum:" + sum);
return collect;
}
@GetMapping("test")
public String good() throws InterruptedException {
StopWatch stopWatch = new StopWatch();
stopWatch.start("normaluse");
Map<String, Long> normaluse = normaluse();
stopWatch.stop();
Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
Assert.isTrue(normaluse.entrySet().stream().mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT, "normaluse count error");
stopWatch.start("gooduse");
Map<String, Long> gooduse = goodUse();
stopWatch.stop();
Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
Assert.isTrue(gooduse.entrySet().stream().mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT, "gooduse count error");
log.info(stopWatch.prettyPrint());
return "OK";
}
-----------------------------------------
ms % Task name
-----------------------------------------
00472 078% normaluse
00131 022% gooduse
From these results, the computeIfAbsent-based version is several times faster than the synchronized version. computeIfAbsent performs its check-and-create atomically inside ConcurrentHashMap, using CAS and fine-grained locking on individual buckets rather than one coarse lock, and LongAdder spreads contended increments across multiple cells; together they avoid serializing every update behind a single synchronized block. The takeaway: in concurrent code, a concurrency class only delivers its performance when its features are used as designed.
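ConcurrentHashMap also offers merge(), which performs the per-key read-modify-write atomically and can replace the containsKey/put pair as well. A minimal standalone sketch (names are illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class MergeCountDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>();
        // merge() atomically inserts 1L for a new key, or applies Long::sum to the old value
        IntStream.rangeClosed(1, 100_000).parallel()
                .forEach(i -> freqs.merge("item" + (i % 10), 1L, Long::sum));
        long sum = freqs.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(sum); // prints 100000
    }
}
```

Under heavy contention on a few hot keys, the computeIfAbsent + LongAdder combination above usually scales better, because LongAdder stripes the increments across cells while merge serializes updates to each key.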
@GetMapping("write")
public Map testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
int loopCount = 100000;
stopWatch.start("Write:copyOnWriteArrayList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("Write:synchronizedList");
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}
// Helper that fills a List with 1..1,000,000
private void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}
// Test concurrent read performance
@GetMapping("read")
public Map testRead() {
// Create the two lists under test
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
// Fill both lists with data
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
int count = copyOnWriteArrayList.size();
stopWatch.start("Read:copyOnWriteArrayList");
// 1,000,000 concurrent random reads from the CopyOnWriteArrayList
IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("Read:synchronizedList");
// 1,000,000 concurrent random reads from the synchronized ArrayList
IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
log.info(stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}
-----------------------------------------
ms % Task name
-----------------------------------------
09355 100% Write:copyOnWriteArrayList
00030 000% Write:synchronizedList
-----------------------------------------
ms % Task name
-----------------------------------------
00051 014% Read:copyOnWriteArrayList
00309 086% Read:synchronizedList
The test results show that CopyOnWriteArrayList performs very poorly in write-heavy scenarios but very well in read-heavy ones. In concurrent code, then, the right concurrency class depends on the actual workload.
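The asymmetry follows from how CopyOnWriteArrayList works: every mutation copies the entire backing array, while reads and iterators operate lock-free on an immutable snapshot. A minimal sketch of the snapshot semantics (names are illustrative):

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteDemo {
    public static void main(String[] args) {
        List<Integer> list = new CopyOnWriteArrayList<>(List.of(1, 2, 3));
        Iterator<Integer> it = list.iterator(); // snapshot of the array at this moment
        list.add(4);                            // copies the whole backing array
        int seen = 0;
        while (it.hasNext()) {                  // iterator still sees only 1, 2, 3
            it.next();
            seen++;
        }
        System.out.println(seen + " " + list.size()); // prints "3 4"
    }
}
```

The snapshot is why readers never block and never throw ConcurrentModificationException, and also why each write pays the full cost of copying the array.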
Original article: https://www.cnblogs.com/cy1995/p/13268908.html