java.util.concurrent常用类(CountDownLatch,Semaphore,CyclicBarrier,F
CyclicBarrier
CyclicBarrier是用来一个关卡来阻挡住所有线程,等所有线程全部执行到关卡处时,再统一执行下一步操作。假设一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家就等待 。
代码示例:
public class UseCyclicBarrier { static class Runner implements Runnable { private CyclicBarrier barrier; private String name; public Runner(CyclicBarrier barrier, String name) { this.barrier = barrier; this.name = name; } @Override public void run() { try { Thread.sleep(1000 * (new Random()).nextInt(5)); System.out.println(name + " 准备OK."); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(name + " Go!!"); } } public static void main(String[] args) throws IOException, InterruptedException { CyclicBarrier barrier = new CyclicBarrier(3); // ExecutorService executor = Executors.newFixedThreadPool(3); executor.submit(new Thread(new Runner(barrier, "zhangsan"))); executor.submit(new Thread(new Runner(barrier, "lisi"))); executor.submit(new Thread(new Runner(barrier, "wangwu"))); executor.shutdown(); } }View Code
结果:只有都准备OK了以后才继续执行await后面的代码
wangwu 准备OK. lisi 准备OK. zhangsan 准备OK. zhangsan Go!! lisi Go!! wangwu Go!!
CountDownLacth
CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行。常用于监听某些初始化操作,等待初始化执行完毕后,通知主线程继续工作。
代码示例:
public class UseCountDownLatch { public static void main(String[] args) { final CountDownLatch countDown = new CountDownLatch(2); Thread t1 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("进入线程t1" + "等待其他线程处理完成..."); countDown.await(); System.out.println("t1线程继续执行..."); } catch (InterruptedException e) { e.printStackTrace(); } } },"t1"); Thread t2 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("t2线程进行初始化操作..."); Thread.sleep(3000); System.out.println("t2线程初始化完毕,通知t1线程继续..."); countDown.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t3 = new Thread(new Runnable() { @Override public void run() { try { System.out.println("t3线程进行初始化操作..."); Thread.sleep(4000); System.out.println("t3线程初始化完毕,通知t1线程继续..."); countDown.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); t3.start(); } }View Code
结果:
t2线程进行初始化操作... t3线程进行初始化操作... 进入线程t1等待其他线程处理完成... t2线程初始化完毕,通知t1线程继续... t3线程初始化完毕,通知t1线程继续... t1线程继续执行...
CyclicBarrier和CountDownLatch的区别
CountDownLacth的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,若计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。
Semaphore
Semaphore与CountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量,类似阀门的 功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。有点类似于锁的lock与 unlock过程。相对来说他也有两个主要的方法:
- 用于获取权限的acquire(),其底层实现与CountDownLatch.countdown()类似;
- 用于释放权限的release(),其底层实现与acquire()是一个互逆的过程。
代码层面的限流策略
Semaphore sema = new Semaphore(5);//这里的5就表示最多接受5个线程。
sema.aquire();//获取授权
代码块;
sema.release();//释放
代码示例:
public class UseSemaphore { public static void main(String[] args) { // 线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 只能5个线程同时访问 final Semaphore semp = new Semaphore(5); // 模拟20个客户端访问 for (int index = 0; index < 20; index++) { final int NO = index; Runnable run = new Runnable() { public void run() { try { // 获取许可 semp.acquire(); System.out.println("Accessing: " + NO); //模拟实际业务逻辑 Thread.sleep((long) (Math.random() * 10000)); // 访问完后,释放 semp.release(); } catch (InterruptedException e) { } } }; exec.execute(run); } try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } //System.out.println(semp.getQueueLength()); // 退出线程池 exec.shutdown(); } }View Code
Future
它的原理在之前介绍过了,下面看下concurrent包下的Future是怎么用的?
代码示例:
public class UseFuture implements Callable<String>{ private String para; public UseFuture(String para){ this.para = para; } /** * 这里是真实的业务逻辑,其执行可能很慢 */ @Override public String call() throws Exception { //模拟执行耗时 Thread.sleep(5000); String result = this.para + "处理完成"; return result; } //主控制函数 public static void main(String[] args) throws Exception { String queryStr = "query"; //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类 FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr)); FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr)); //创建一个固定线程的线程池且线程数为1, ExecutorService executor = Executors.newFixedThreadPool(2); //这里提交任务future,则开启线程执行RealData的call()方法执行 //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值 Future f1 = executor.submit(future); //单独启动一个线程去执行的 Future f2 = executor.submit(future2); System.out.println("请求完毕"); try { //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑 System.out.println("处理实际的业务逻辑..."); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待 System.out.println("数据:" + future.get()); System.out.println("数据:" + future2.get()); executor.shutdown(); } }View Code