ThreadLocal 线程局部变量,只对当前线程范围有效,比如下面例子,在第一个线程设置的值,第二个线程是使用不了的。
public class TLDemo2 { private static ThreadLocal<User> threadLocal = new ThreadLocal<>(); public static void main(String[] args) { new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } threadLocal.set(new User()); System.out.println(threadLocal.get()); }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (Exception e) { e.printStackTrace(); } threadLocal.set(new User()); // threadLocal.remove(); System.out.println(threadLocal.get()); }).start(); } }
有10000张火车票,同时有10个窗口对外售票
请写一个模拟程序
public class SaleOfTickets1 { private static List<Integer> tickets = new ArrayList<>(); static{ for (int i = 0; i < 10000; i++) { tickets.add(i); } } /** * 当我们卖到最后一张票的时候,tickets是大于0的,第一个线程看到他大于0,进入判断, * 第二个线程同样操作区remove的时候票就没有了,在高并发的情况下,还可能会卖重, * 几百万线程去卖票的时候,remove方法不是同步的,第一个线程卖了这张票,第二个线程可能也买了。 * @param args */ public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ while(tickets.size() > 0){ System.out.println(Thread.currentThread().getName()+"销售票编号:" + tickets.remove(0)); } }).start(); } } }
public class SaleOfTickets2 { private static Vector<Integer> tickets = new Vector<>(); static { for (int i = 0; i < 10000; i++) { tickets.add(i); } } /** * vector是一个同步容器 * 这里是判断和操作分离了,虽然说在vector里面remove方法是原子性的,但是他的判断和remove方法是分离的, * 但是可能在判断到remove的过程当中线程可能会被打断。我们可以加一个模拟性的睡眠,因为在你实际开发的 * 时候,可能在这中间有些判断代码逻辑代码。 * 如果剩了最后一个了,很多线程去抢票,虽然size是原子性的,remove是原子性的,但是在他们的中间, * 线程还是有可能被打断 */ public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ while(tickets.size() > 0){ try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("销售票编号:" + tickets.remove(0)); } }).start(); } } }
public class SaleOfTickets3 { private static List<Integer> tickets = new LinkedList<>(); static { for (int i = 0; i < 10000; i++) { tickets.add(i); } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ while(true){ //这里使用synchronized,使两个操作具备了原子性,不会出问题 synchronized(tickets){ if(tickets.size() <= 0){ break; } System.out.println("销售票编号:" + tickets.remove(0)); } } }).start(); } } }
在JDK1.5以后,java里面提供了很多的并发容器,这里我们用的是一个queue,队列。
* 所谓队列其实就是一个容器,就是站成一对,不管票还是人都在里面排成一堆,队列有几种,有先进先出的,
* 还有两端的队列,还有就是栈,先进后出,先加进去的后出来。
* 这里用了一个concurrentlinkedqueue,并发的链表队列。线程里面调用了一个poll方法,
* 意思是往外面拿一个数据,相当于在尾巴里面拿一个,如果没有拿到,他的返回值就是空,那么就中断线程。
* 这里面没有加锁,同样有判断,但是不会出问题。完成卖票功能这种效率是比较高的。queue里面是不能装空值。
* 这里虽然判断和操作是一起的,但是我们没有在判断里面有任何操作,大不了反过头来再拿一边,
* poll底层实现是cas,这里我们就不用加锁了。
public class SaleOfTickets4 { private static Queue<Integer> tickets = new ConcurrentLinkedQueue<>(); static { for (int i = 0; i < 10000; i++) { tickets.add(i); } } public static void main(String[] args) { for (int i = 0; i < 10; i++) { new Thread(()->{ while(true){ Integer poll = tickets.poll(); if(poll == null){ break; } System.out.println("销售票编号:" + poll); } }).start(); } } }
写时复制容器
*
* 在往集合中添加数据的时候,先拷贝存储的数组,然后添加元素到拷贝好的数组中,
* 然后用现在的数组去替换成员变量的数组(就是get等读取操作读取的数组)。
* 这个机制和读写锁是一样的,但是比读写锁有改进的地方,那就是读取的时候可以写入的 ,
* 这样省去了读写之间的竞争,看了这个过程,你也发现了问题,同时写入的时候怎么办呢,当然果断还是加锁。
* 读多写少可以用copyonwritelist
public class Demo { public static void main(String[] args) { // List<String> lists = new ArrayList<>(); // List<String> lists = new Vector<>(); List<String> lists = new CopyOnWriteArrayList<>(); Random r = new Random(); Thread[] threads = new Thread[100]; for (int i = 0; i < threads.length; i++) { Runnable task = new Runnable() { @Override public void run() { for (int j = 0; j < 1000; j++) { lists.add("A" + r.nextInt(10000)); } } }; threads[i] = new Thread(task); } run(threads); System.out.println(lists.size()); } private static void run(Thread[] threads) { long start = System.currentTimeMillis(); Arrays.asList(threads).forEach(t->t.start()); Arrays.asList(threads).forEach(t->{ try { t.join(); } catch (Exception e) { e.printStackTrace(); } }); long end = System.currentTimeMillis(); System.out.println(end - start); } }
collections是java里面一个集合处理类,里面有给容器加锁的方法,通过调用api可以返回一个加了锁的容器。
public static void main(String[] args) { ArrayList<String> arrayList = new ArrayList<>(); List<String> synchronizedList = Collections.synchronizedList(arrayList); }
public class Demo { public static void main(String[] args) { Queue<String> strings = new ConcurrentLinkedQueue<>(); for (int i = 0; i < 10; i++) { //offer,类似于add方法,add会出一些问题,比如容量限制, //超出限制会抛异常,offer有返回值可以判断是否加成功了 strings.offer("a" + i); } System.out.println(strings); System.out.println(strings.size()); System.out.println(strings.poll());//拿了就没了 System.out.println(strings.size()); System.out.println(strings.peek());//用一下不删 System.out.println(strings.size()); } }
并发的hashmap,这个例子测试一下效率
*
* 第一种用hashtable,hashtable所有方法都加了锁了,第二种concurrenthashmap,
* 大致能看出来他的效率要比hashtable要高一些,在多线程的情况下。
* 为什么呢,因为hashtable往里面加任何数据的时候都是要锁定整个对象,
* 而concurrenthashmap,是分成十六个段,每次插数据的时候,只会锁住一小段,1.8之后实现不同。
public class Demo { public static void main(String[] args) { Map<String, String> map = new ConcurrentHashMap<>(); //Map<String, String> map = new ConcurrentSkipListMap<>(); // Map<String, String> map = new Hashtable<>(); // Map<String, String> map = new HashMap<>(); // Map<String, String> map1 = Collections.synchronizedMap(map); Random random = new Random(); Thread[] threads = new Thread[100]; CountDownLatch latch = new CountDownLatch(threads.length); long start_time = System.currentTimeMillis(); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(()->{ for(int j=0; j<10000;j++) { map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000)); // map1.put("a" + random.nextInt(100000), "a" + random.nextInt(100000)); } latch.countDown(); }); } Arrays.asList(threads).forEach(t->t.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end_time = System.currentTimeMillis(); System.out.println(end_time-start_time); } }
换成hashtable看下结果
阻塞式的容器
public class Demo { private static BlockingQueue<String> strings = new LinkedBlockingQueue<>(10); public static void main(String[] args) { new Thread(()->{ for (int i = 0; i < 100; i++) { try { // 在阻塞式容器里面加了一个方法,put,也就是如果满了就会等待,对应的方法叫take,如果空了就会等待。 // 这种容器我们去用的时候自动就实现了阻塞式的生产者消费者。 strings.put("商品" + i); } catch (Exception e) { e.printStackTrace(); } } }, "producer").start(); for (int i = 0; i < 5; i++) { new Thread(()->{ for(;;){ try { // take,拿,如果空了也会阻塞 System.out.println(Thread.currentThread().getName() + " take " + strings.take()); //如果空了,就会等待 } catch (Exception e) { e.printStackTrace(); } } },"consumer" + i).start(); } } }
* 有界队列,意思就是说这个队列能装的元素的个数是固定的,后面讲线程池的时候,里面装的其实是一个个任务。
* 这里只能装10个,如果超过了可能会出问题可能会阻塞,这里看你调用什么方法。
* add会报异常
* offer不会报异常,他只通过布尔类型的返回值来告诉你是加成功了还是没有加成功。
* offer可以设置时间,如果这段时间加不进去就不加了也就是返回false
* put方法是满了会阻塞住。
public class Demo { private static BlockingQueue<String> strings = new ArrayBlockingQueue<>(10); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { strings.put("a" + i); } strings.add("aaaa"); // strings.put("aaaa"); // strings.offer("aaaa"); strings.offer("aaaa",1, TimeUnit.SECONDS); System.out.println(strings); } }
容器里每一个元素都设置了一个时间,时间到了才能从中提取元素
public class Demo { private static BlockingQueue<MyTask> tasks = new DelayQueue<>(); static class MyTask implements Delayed{ long runningTime; public MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) { return -1; }else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; }else { return 0; } } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime; } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now+1000); MyTask t2 = new MyTask(now+2000); MyTask t3 = new MyTask(now+1500); MyTask t4 = new MyTask(now+2500); MyTask t5 = new MyTask(now+500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for (int i = 0; i < 5; i++) { System.out.println(tasks.take()); } } } }
* 和普通的queue的方法差不多,多了一个transfer方法。
* 如果你用这种队列的话,往往是消费者先启动,生产者生产一个东西的时候,他先是去找消费者,
* 如果有消费者就直接丢给消费者。
public class Demo { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strings = new LinkedTransferQueue<>(); new Thread(()->{ try { System.out.println("t1"+strings.take()); } catch (Exception e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { System.out.println("t2"+strings.take()); } catch (Exception e) { e.printStackTrace(); } }).start(); TimeUnit.SECONDS.sleep(2); strings.transfer("aaa"); // strings.put("aaa"); System.out.println(strings.size()); // new Thread(()->{ // try { // System.out.println(strings.take()); // } catch (Exception e) { // e.printStackTrace(); // } // }).start(); } }
同步队列
* 同步队列是容量为0,也就是来的东西必须给消费掉.
* 首先启动一个消费者,调用add方法,他报错了
* 只能调用put,意思就是阻塞等待消费者消费。put里面其实用的是transfer,任何东西必须消费,不能往容器里面扔。
public class Demo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strings = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(strings.take()); } catch (Exception e) { e.printStackTrace(); } }).start(); // strings.add("aaa"); strings.put("aaa"); strings.put("aaa"); strings.put("aaa"); strings.put("aaa"); strings.put("aaa"); System.out.println(strings.size()); } }
原文:https://www.cnblogs.com/lusaisai/p/12741456.html