Thread :
new Thread(){ public void run() { }; }.start();
Runnable :
new Thread(new Runnable() { @Override public void run() { } }).start();
当多个线程操作同一个共享资源会存在线程安全问题,我们需要使用同步来进行控制,一个线程在操作的时候,其他的线程就不允许再操作了,当这个线程运行完了或是释放了线程锁,其他线程才可以运行。Java中使用 synchronized 关键字进行线程同步互斥:
synchronized(线程锁){ }
synchronize关键字还可以加在方法名上,那么在方法名上使用的时候线程锁使用的是哪个对象呢? 没错,就是this
private boolean tage=true; public synchronized void sub(int i){ while(!tage){ //如果不是本线程运行则等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //下面可以写线程运行的操作代码 tage=false; //调整标志 this.notify(); //通知(唤醒)下面main方法中线程 } public synchronized void main(int i){ while(tage){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //下面可以写线程运行的操作代码 tage=true; this.notify(); }
public class ThreadCommuncation { /** * 子线程循环10次,切换到主线程循环100次,再切换回子线程,如此往复50次 * @param args */ public static void main(String[] args) { Business business=new Business(); //子线程 new Thread(new Runnable() { @Override public void run() { //往复50次 for(int i=1;i<=50;i++){ business.sub(i); } } }).start(); //主线程往复50次 for(int i=1;i<=50;i++){ business.main(i); } } } class Business{ private boolean tage=true; public synchronized void sub(int i){ //子线程循环10次后切换 while(!tage){ //如果不是本线程运行则等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j=1;j<=10;j++){ System.out.println("子线程第"+i+"趟的第"+j+"次循环"); } tage=false; //调整标志 this.notify(); //通知(唤醒)主线程 } public synchronized void main(int i){ //主线程循环100次后切换 while(tage){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j=1;j<=100;j++){ System.out.println("主线程第"+i+"趟的第"+j+"次循环"); } tage=true; this.notify(); } }
new Timer().schedule(new TimerTask() { @Override public void run() { } }, 1000);
import java.util.HashMap; import java.util.Map; import java.util.Random; /** * 同一线程范围内共享变量 * @author LZ * */ public class ThreadScopeSharaData { private static Map<Thread,Integer> threadData=new HashMap<Thread,Integer>(); public static void main(String[] args) { //开启两个线程 for(int i=0;i<2;i++){ new Thread(new Runnable() { @Override public void run() { //每个线程共享的数据 int data=new Random().nextInt(10); System.out.println(Thread.currentThread().getName()+" put data "+data); threadData.put(Thread.currentThread(), data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data=threadData.get(Thread.currentThread()); System.out.println("A "+Thread.currentThread().getName()+" get data "+data); } } static class B{ public void get(){ int data=threadData.get(Thread.currentThread()); System.out.println("B "+Thread.currentThread().getName()+" get data "+data); } } }
import java.util.Random; /** * 同一线程范围内共享变量(使用Threadlocal) * @author LZ * */ public class ThreadLocalTest { //private static Map<Thread,Integer> threadData=new HashMap<Thread,Integer>(); public static void main(String[] args) { //开启两个线程 for(int i=0;i<2;i++){ new Thread(new Runnable() { @Override public void run() { //每个线程共享的数据 int data=new Random().nextInt(10); System.out.println(Thread.currentThread().getName()+" put data "+data); //threadData.put(Thread.currentThread(), data); MyThreadScopDate.getInstance().setName("name"+data); MyThreadScopDate.getInstance().setAge("age"+data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ //int data=threadData.get(Thread.currentThread()); MyThreadScopDate myDate = MyThreadScopDate.getInstance(); System.out.println("A "+Thread.currentThread().getName()+" get Mydate "+myDate.getName()+"\t"+myDate.getAge()); } } static class B{ public void get(){ //int data=threadData.get(Thread.currentThread()); MyThreadScopDate myDate = MyThreadScopDate.getInstance(); System.out.println("B "+Thread.currentThread().getName()+" get Mydate "+myDate.getName()+"\t"+myDate.getAge()); } } } /** * 将Threadlocal放入MyThreadScopDate类作为属性,使用单例模式创建该类的对象 * @author LZ * */ class MyThreadScopDate{ private static ThreadLocal<MyThreadScopDate> map=new ThreadLocal<MyThreadScopDate>(); private MyThreadScopDate(){} public static MyThreadScopDate getInstance(){ MyThreadScopDate myThreadScopDate=map.get(); if(myThreadScopDate==null){ myThreadScopDate=new MyThreadScopDate(); map.set(myThreadScopDate); } return myThreadScopDate; } private String name; private String age; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getAge() { return age; } public void setAge(String age) { this.age = age; } }
在JDK5之后添加了线程的并发库,用于帮助我们操作线程,并发库中的东西大多都是java.util.concurrent 包下的,下来简单记录一下并发库中常见的类及其使用方法
class Inner{ //实例化锁(Lock是一个接口,使用ReentrantLock实现类, //还有ReadLock读锁,对资源读时使用。writeLock写锁,写资源时使用) Lock lock=new ReentrantLock(); public void output(String name){ lock.lock(); //上锁 try { int length=name.length(); for(int i=0;i<length;i++){ System.out.print(name.charAt(i)); } System.out.println(); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); //解锁 } } }
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThreadConditionCommuncation { /** * 使用Condition进行线程通信方式实现 * 线程1循环10次,切换到线程2循环20次,再切换回主线程循环100次,如此往复10次 * @param args */ public static void main(String[] args) { Business business=new Business(); //子线程1 new Thread(new Runnable() { @Override public void run() { //往复50次 for(int i=1;i<=10;i++){ business.sub(i); } } }).start(); //子线程2 new Thread(new Runnable() { @Override public void run() { //往复50次 for(int i=1;i<=10;i++){ business.sub2(i); } } }).start(); //主线程往复50次 for(int i=1;i<=10;i++){ business.main(i); } } static class Business{ private Lock lock=new ReentrantLock(); //使用condition实现线程之间通信(实例化3个condition对应3个线程) Condition condition1=lock.newCondition(); Condition condition2=lock.newCondition(); Condition condition3=lock.newCondition(); private int tage=1; //线程1开始执行 public void sub(int i){ lock.lock(); try { //子线程循环10次后切换 while(tage!=1){ //如果不是线程1等待 try { //this.wait(); condition1.await(); //使用Condition的await替代Object的wait } catch (InterruptedException e) { e.printStackTrace(); } } for(int j=1;j<=10;j++){ System.out.println("子线程1第"+i+"趟的第"+j+"次循环"); } // tage=false; // this.notify(); tage=2; //标志为2 condition2.signal(); //线程2唤醒 } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void sub2(int i){ lock.lock(); try { //子线程2循环20次后切换 while(tage!=2){ try { //this.wait(); condition2.await(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j=1;j<=20;j++){ System.out.println("子线程2第"+i+"趟的第"+j+"次循环"); } // tage=false; // this.notify(); tage=3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void main(int i){ lock.lock(); try { //主线程循环100次后切换 while(tage!=3){ try { //this.wait(); condition3.await(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j=1;j<=100;j++){ System.out.println("主线程第"+i+"趟的第"+j+"次循环"); } // tage=true; // this.notify(); tage=1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * JDK5中的线程池 * @author LZ * */ public class ThreadPoolTest { public static void main(String[] args) { ExecutorService threadPool = Executors.newFixedThreadPool(3); //固定大小的线程池 //ExecutorService threadPool =Executors.newCachedThreadPool(); //带缓存的线程池(池中线程个数不一定,需要则自动创建) //ExecutorService threadPool = Executors.newSingleThreadExecutor(); //单一线程池(保证线程池中只有一个线程,如果这个线程被销毁,则会在创建一个新的线程) for(int i=1;i<=10;i++){ //10个任务 final int tage=i; threadPool.execute(new Runnable() { //给线程池任务 @Override public void run() { //每个任务要求线程循环10次 for(int j=1;j<=10;j++){ try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" is loopingt of "+j+" the task is "+tage); } //threadPool.shutdown(); //任务完成销毁线程池 //threadPool.shutdownNow(); //立刻销毁线程池 } }); } } }
我们使用线程池的 execute 方法来完成任务,这个方法没有返回结果,线程池还有一个可以返回结果的方法——submit方法
import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class CallableAndFuture { public static void main(String[] args) { //线程池 ExecutorService threadPool = Executors.newSingleThreadExecutor(); //返回的结果封装到Future对象中 Future<String> future = threadPool.submit(new Callable<String>() { //使用submit方法提交可以得到结果(不需要得到结果的可以使用execute方法) @Override public String call() throws Exception { Thread.sleep(2000); return "Hello"; } }); System.out.println("等待结果"); try { //使用Future对象的get方法得到返回值 System.out.println("拿到结果:"+future.get()); } catch (Exception e) { e.printStackTrace(); } } }
ExecutorService threadPool2 = Executors.newFixedThreadPool(10); //CompletionService用于提交一组Callable任务 CompletionService<Integer> completionService=new ExecutorCompletionService<Integer>(threadPool2); for(int i=1;i<=10;i++){ final int sta=i; completionService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return sta; } }); } for(int i=0;i<10;i++){ try { //task方法用于返回已完成的第一个Callable任务的结果(就是Future),在使用Future的get方法得到值 System.out.println(completionService.take().get()); } catch (Exception e) { e.printStackTrace(); } }
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueCommuncation2 { /** * 使用阻塞队列(BlockingQueue)实现 * 线程1循环10次,切换回主线程循环100次,如此往复10次 * @param args */ public static void main(String[] args) { Business business=new Business(); //子线程1 new Thread(new Runnable() { @Override public void run() { //往复50次 for(int i=1;i<=10;i++){ business.sub(i); } } }).start(); //主线程往复50次 for(int i=1;i<=10;i++){ business.main(i); } } static class Business{ //使用两个大小为1的阻塞队列 BlockingQueue<Integer> queue1=new ArrayBlockingQueue<>(1); BlockingQueue<Integer> queue2=new ArrayBlockingQueue<>(1); //下面是匿名构造块(优先于构造方法,实例化几个对象就执行几次) { try { queue2.put(1); //队列2放数据 } catch (InterruptedException e) { e.printStackTrace(); } } public void sub(int i){ // lock.lock(); // try { // //子线程循环10次后切换 // while(tage!=1){ //如果不是线程1等待 // try { // //this.wait(); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } try { queue1.put(1); //队列1放数据 } catch (InterruptedException e) { e.printStackTrace(); } for(int j=1;j<=10;j++){ System.out.println("子线程第"+i+"趟的第"+j+"次循环"); } try { queue2.take(); //队列2取数据 } catch (InterruptedException e) { e.printStackTrace(); } // tage=false; // this.notify(); // // } catch (Exception e) { // e.printStackTrace(); // }finally { // lock.unlock(); // } } public void main(int i){ // lock.lock(); // try { // //主线程循环100次后切换 // while(tage!=3){ // try { // //this.wait(); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } try { queue2.put(1); //队列2放数据 } catch (InterruptedException e) { e.printStackTrace(); } for(int j=1;j<=100;j++){ System.out.println("主线程第"+i+"趟的第"+j+"次循环"); } try { queue1.take(); //队列1取数据 } catch (InterruptedException e) { e.printStackTrace(); } // tage=true; // this.notify(); // // } catch (Exception e) { // e.printStackTrace(); // }finally { // lock.unlock(); // } } } }
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 现有的程序代码模拟产生了16个日志对象, * 并且需要运行16秒才能打印完这些日志, * 请在程序中增加4个线程去调用parseLog()方法来分头打印这16个日志对象, * 程序只需要运行4秒即可打印完这些日志对象 * @author LZ * */ public class Test1 { public static void main(String[] args) { //使用可阻塞的队列实现 final BlockingQueue<String> queue=new ArrayBlockingQueue<String>(16); for(int i=0;i<4;i++){ //开4个线程 new Thread(new Runnable() { @Override public void run() { while(true){ //使用while不断从队列中取日志 try { String log = queue.take(); //从队列中取日志 parseLog(log); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } System.out.println("begin: "+(System.currentTimeMillis()/1000)); for(int i=0;i<16;i++){ //这行代码不能改动 final String log=""+(i+1); //这行代码不能改动 //Test1.parseLog(log); try { queue.put(log); //将日志放入队列 } catch (InterruptedException e) { e.printStackTrace(); } } } //parseLog方法内部的代码不能改动 public static void parseLog(String log){ System.out.println(log+" : "+(System.currentTimeMillis()/1000)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 现成程序中的Test类中的代码在不断地产生数据, * 然后交给TestDo.doSome()方法去处理,就好像生产者在不断地产生数据,消费者在不断消费数据。 * 请将程序改造成有10个线程来消费生成者产生的数据,这些消费者都调用TestDo.doSome()方法去进行处理, * 故每个消费者都需要一秒才能处理完,程序应保证这些消费者线程依次有序地消费数据, * 只有上一个消费者消费完后,下一个消费者才能消费数据, * 下一个消费者是谁都可以, * 但要保证这些消费者线程拿到的数据是有顺序的 * @author LZ * */ public class Test2 { public static void main(String[] args) { //使用同步库中同步队列 SynchronousQueue<String> queue=new SynchronousQueue<>(); //使用Lock Lock lock=new ReentrantLock(); for(int i=0;i<10;i++){ //开10个线程 new Thread(new Runnable() { @Override public void run() { try { //将线程阻塞 lock.lock(); String input=queue.take(); String output = TestDo.doSome(input); System.out.println(Thread.currentThread().getName()+ ":" + output); //释放 lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } System.out.println("begin:"+(System.currentTimeMillis()/1000)); for(int i=0;i<10;i++){ //这行不能改动 String input = i+""; //这行不能改动 // String output = TestDo.doSome(input); // System.out.println(Thread.currentThread().getName()+ ":" + output); try { queue.put(input); } catch (InterruptedException e) { e.printStackTrace(); } } } } //不能改动此TestDo类 class TestDo { public static String doSome(String input){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String output = input + ":"+ (System.currentTimeMillis() / 1000); return output; } }
ps :或者不使用线程安全的集合,使用ListIterator就可以对List集合在遍历时操作;又或者将线程安全的集合和Listiterator结合使用