Java并发编程实战笔记(4)-基础构建模块
转载请注明出处 http://www.paraller.com
原文排版地址 http://www.paraller.com/2016/02/03/Java%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B%E5%AE%9E%E6%88%98%E7%AC%94%E8%AE%B0%284%29-%E5%9F%BA%E7%A1%80%E6%9E%84%E5%BB%BA%E6%A8%A1%E5%9D%97/
主要介绍Java类库中并发基础构建模块 & 常用模式
同步容器类
这些容器类的关键在于:将他们的状态封装起来,并对每个公有方法进行同步,使得每次只有一个线程可以访问容器的状态
存在的问题
复合操作需要同步:包括 迭代、跳转、条件运算,虽然每个接口只能一个线程操作,但是存在线程交替操作
的情况,所以不能保证没有其他线程修改状态
解决:正确的客户端加锁
迭代器 与 ConcurrentModificationException
在迭代器场景中,如果有状态被修改将会抛出 ConcurrentModificationException 并发修改异常
原理:将计数器的变化与容器关联起来,如果容器在迭代,计数器被修改,那么hasNext和next操作会抛出异常。缺点:对检查并没有同步 ; 原因:对性能的权衡妥协
方案:客户端加锁,等待时间过长。使用克隆
容器的方式,在克隆的时候要加锁, 克隆会造成性能开销,需要从调用频率和响应时间、容器大小等考虑
隐藏的迭代器:
如果状态与保护她的同步代码之间相隔越远,开发者就越容易忘记在访问状态时使用同步
避免下述这种情况的方式: 使用 synchronizedSet来替代并且对同步代码进行封装
Set<Integer> demoSet = new HashSet(); public void doSome(){ system.out.println("value is "+ demoSet); }
hasCode / equals / containsAll / retainAll / removeAll 等方法,以及把容器作为构造器参数的场景, 都会对容器进行迭代。都有可能抛出并发修改异常。
并发容器类
设计的原因:提高伸缩性并降低风险
- 改进同步容器类的性能: 同步容器类将所有对容器状态的访问都串行化, 以实现线程安全,在大并发的情况下吞吐量将严重降低。
- 针对多个并发访问增加了一些特性: 例如条件运算的复合操作, 不会产生并发修改异常的迭代器
ConcurrentHashMap
同步容器类在执行每一个操作都会持有一个锁,而像迭代这样的操作会耗费很长的时间,导致其他线程挂起。
- 并发容器类在锁的粒度方面做了改进。这种机制称为
分段锁(Lock Striping)
,后面章节会介绍。 - 迭代器操作具有弱一致性,可以容忍并发修改。在迭代器被创建后,出现修改的操作会反映给容器。(但是不保证)
- ConcurrentHashMap 相对于同步Map有更多的优势和更少的劣势,只有当需要对Map进行独占操作时,才放弃对ConcurrentHashMap的使用。
CopyOnWriteArrayList
迭代期间不需要加锁或复制,不会抛出并发修改异常
- 线程安全性在于:只要正确的发布一个事实不可变的CopyOnWriteArrayList对象,那么访问该对象时就不再需要进一步的同步(这个不太懂。。。)
- 每次修改时都会创建并重新发布一个新的容器副本
- CopyOnWriteArrayList容器保留一个指向底层基础数组的引用,这个数组位于迭代器的起始位置,不会被修改(不支持可变 remove等操作),对其进行同步时只需要确保数组内容的可见性
- 开销:
每次修改容器都会复制底层数组
,当容器规模较大的时候需要较大开销,适用于 迭代操作远多于修改操作,比如 servlet过滤器/监听器注册表
阻塞队列 和 消费者-生产者模式
- 将复杂操作分解成两个组建,简化业务逻辑
- 常见场景:
线程池
、工作队列、Executor任务执行框架 - 当队列为空,take操作会阻塞,特别适合 服务器接收请求的场景
- 阻塞队列提供了 offer方法,当数据不能被添加到队列中,那么将返回一个失败状态,能够灵活的自定义处理负载的策略;
- 设置工作队列的时候尽量使用有界队列,防止架构更改
- 特殊的“队列”,并不会维护空间,直接交付给消费者,没有存储和FIFO操作
双端队列 与 工作密取(Work stealing)
双端队列: 在队列头和队列尾高效插入和移除。包含 Deque
和BlockingDeque
两种类型
工作密取模式:
- 为了高效:每个消费者都有自己的双端队列,当自己的队列任务完成之后,会从其他消费者的 双端队列的尾部(减少竞争关系)获取任务消费
- 既是消费者又是生产者的模式: 比如网络爬虫,在解析到网页内容的时候发现了其他超链接,将链接加到队列的尾部;其他场景:搜索图算法。
阻塞方法 和 中断方法
- 线程会因为一些原因阻塞,比如 IO操作、等待获得一个锁、从Thread.sleep中醒来等。
- 阻塞状态包括(BLOCKED / WAITING / TIMED_WAITING) , 运行状态(RUNNABLE)
InterruptedException:
- 受检查异常。表示一个阻塞方法被中断,它将努力提前结束阻塞状态
- Interrupt方法,Thread中特有的: 用于中断线程 或者 查询线程是否中断
- 中断是一种协作机制,一个A线程不能强制另一个B线程马上中断,只能要求B线程在可以中断的地方停止它的操作(前提是B愿意停下来)。
当代码中调用了一个可能 抛出InterruptedException
的方法时,你自己的代码也变成了阻塞方法
,并且必须要处理
对中断的响应。响应的方式有两种:
- 传递InterruptedException,避开异常,直接抛出给方法的调用者处理,或者捕获异常作简单的处理再抛出异常。
- 保持中断状态(恢复中断):有时不能直接抛出异常,比如当代码是 Runnable的一部分时。处理方式: 捕获异常,保持线程的中断状态,在调用栈中的更高层代码将看到该线程引发了中断
class Demo implements Runnalbel{ public void run(){ try{ queue.take(); }catch{ Thread.currentThread().interrupt(); } } }
同步工具类
- 只要对象可以根据自身的状态来协调线程的
控制流
,任何对象都可以是同步工具类 - 工具类包含一些特定的结构化属性:1、封装一些状态。这些状态将决定线程的执行阶段; 2、提供方法对状态进行操作; 3、一些方法用于高效的等待同步工具类进入到预期状态?
- 常见的工具类包括: 队列 / 信号量(semaphore) / 栅栏(Barrier) / 闭锁(Latch)
闭锁(Latch)
可以延迟线程的进度(阻塞进程,等待时机释放),直到闭锁到达终止状态,闭锁到达终止状态将不再更新状态; 可以用来确保某些活动在其他活动完成之后才执行。常见场景:
- 王者荣耀中兵线的初始化线程A,需要所有玩家加载游戏进度完成的线程都结束后,才会开始A
- 服务在其依赖的所有服务启动完成之后再启动
CountDownLatch是一种闭锁实现,包括一个计数器,需要等待的时间数量,countDown
方法减少计数值,await
方法一直阻塞直到计数器为0,或者事件中断或超时
void test(){ final CountDownLatch cd = new CountDownLatch(1); for(int i = 0 ;i < 10; i++){ Thread thread = new Thread(){ public void run(){ cd.await(); ... } } } cd.countDown(); }
闭锁(FutureTask)
- FutureTask实现了Future的语义,表示一种可生成结果的计算(通过Callable类实现);
- FutureTask
相当于一种可生成结果的 Runnable
,可以处于三种状态, 等待运行 、 正在运行 、 运行完成;运行完成包括三种:正常结束、取消而结束、异常结束 - Future.get方法:如果任务完成马上返回结果,否则一直阻塞直到完成或者异常结束。结果从执行线程返回给获取结果的这个线程
- FutureTask在Executor框架中表示异步任务; 此外还可以用来表示时间较长的计算,可以在使用结果之前启动
class Demo{ FutureTask<Product> future = new FutureTask<Product>(new Callable<Product>(){ public Prodcut call() throws LoadException{ return db.load(); } }); Thread thread = new Thread(future); void start(){ thread.start(); } Product getPro() throws LoadException,InterruptedException{ try{ return future.get(); }catch(ExecutionException e){ Throwable th = e.getCause(); if(th instanceOf LoadException){ throw LoadException }else{ throw launderThrowable(cause); } } } }
- A线程任务未完成能够阻塞当前B线程,实现控制流
- Callable表示的任务可以抛出 受检查或者未受检查的异常, 并且任务代码都可能抛出一个Error
- 无论代码中抛出什么异常,都会被封装到
ExecutionException
中,并在调用get方法的时候抛出;ExecutionException作为 Throwable类返回,可能的情况比较复杂需要额外处理 - 如果任务被取消,Get方法将会抛出 CancellationException
对异常的处理,逻辑如下:
- 检查已知的受检查异常,并重新抛出
- Error:重新抛出
- RuntimeException: 重新抛出
- 抛出 IllegalStateException 表示这是一个漏记错误
RuntimeException launderThrowable(Throwable cause){ if(cause instanceOf RuntimeException){ return (RuntimeException) cause; }else if(cause instanceOf Error){ throw (Error) cause; }else{ throw new IllegalStateException("checked",cause); } }
信号量
定义:计数信号量用来控制同时访问:
- 某个特定资源的操作数量
- 某个指定操作的数量
- 可以用来实现 资源池、容器的施加边界
原理:
- Semaphore管理者一组虚拟许可(permit),初始数量在构造器中指定; 在执行操作的时候先获得许可,使用完成之后进行释放
- 获取许可使用 acquire方法(理解为消费),如果没有可用的许可将会阻塞。 release方法返回一个许可(理解为创建)
- Semaphore 不会与线程关联起来,A线程消费的信号可以在B线程创建,
- 信号量不受限于构造过程中的创建的许可数量
适用场景:
- 实现资源池,比如数据库连接,固定初始大小,每次获取需要阻塞等待资源(最好用 BlockingDueue实现)
- 限定容器的大小。在容器外使用装饰器模式,并且在添加和删除的时候使用 Semaphore
class Demo{ private final Semaphore sem = new Semaphore(2); public void test(){ sem.acquire(); ... sem.release(); } }
栅栏(Barrier)
栅栏与闭锁有许多共性,一些区别如下:
闭锁可以控制一组相关操作,进入终止状态不能重置 | 栅栏状态可以重置
闭锁用于等待事件 | 栅栏用于等待其他线程
闭锁用于所有线程等待一个外部事件
的发生 | 栅栏则是所有线程相互等待
,直到所有线程都到达某一点
时才打开栅栏,然后线程可以继续执行
单向栅栏:
常见场景:将问题分解成几个计算子任务,流程如下:
- 当线程到达栅栏位置的时候,将会调用await方法,这个方法将阻塞直到所有的线程都到达栅栏的位置
- 如果所有线程到达,栅栏将会打开,此时所有的线程将被释放,而栅栏将被重置等待下次使用
- 如果对调用await方法超时,或者await阻塞的线程被中断,栅栏的完整性将被打破,所有阻塞的await调用都将终止并抛出
BrokenBarrierException
- 如果成功通过,await方法将会返回一个唯一索引,后续程序可以通过这个索引选举 leader,来执行一些特殊的操作
双向栅栏:
另一种形式的栅栏是 Exchanger: 当两方执行不对称操作的时候,例如一个线程向缓冲区写入数据,另一个线程向缓存区获取数据。这些线程可以用 Exchanger来汇合,将满的缓冲区和空的缓冲区进行交换。