JVM 并发性: Java 和 Scala 并发性基础(1)

转载于:转载于:http://www.itxuexiwang.com/a/javajishu/jvm_jdk_yuanmafenxi/2016/0129/50.html?1454076380

处理器速度数十年来一直持续快速发展,并在世纪交替之际走到了终点。从那时起,处理器制造商更多地是通过增加核心来提高芯片性能,而不再通过增加时钟速率来提高芯片性能。多核系统现在成为了从手机到企业服务器等所有设备的标准,而这种趋势可能继续并有所加速。开发人员越来越需要在他们的应用程序代码中支持多个核心,这样才能满足性能需求。

在本系列文章中,您将了解一些针对Java和Scala语言的并发编程的新方法,包括Java如何将Scala和其他基于JVM的语言中已经探索出来的理念结合在一起。第一期文章将介绍一些背景,通过介绍Java7和Scala的一些最新技术,帮助了解JVM上的并发编程的全景。您将了解如何使用JavaExecutorService和ForkJoinPool类来简化并发编程。还将了解一些将并发编程选项扩展到纯Java中的已有功能之外的基本Scala特性。在此过程中,您会看到不同的方法对并发编程性能有何影响。后续几期文章将会介绍Java8中的并发性改进和一些扩展,包括用于执行可扩展的Java和Scala编程的Akka工具包。

Java并发性支持

在Java平台诞生之初,并发性支持就是它的一个特性,线程和同步的实现为它提供了超越其他竞争语言的优势。Scala基于Java并在JVM上运行,能够直接访问所有Java运行时(包括所有并发性支持)。所以在分析Scala特性之前,我首先会快速回顾一下Java语言已经提供的功能。

Java线程基础

在Java编程过程中创建和使用线程非常容易。它们由java.lang.Thread类表示,线程要执行的代码为java.lang.Runnable实例的形式。如果需要的话,可以在应用程序中创建大量线程,您甚至可以创建数千个线程。在有多个核心时,JVM使用它们来并发执行多个线程;超出核心数量的线程会共享这些核心。

Java5:并发性的转折点

Java从一开始就包含对线程和同步的支持。但在线程间共享数据的最初规范不够完善,这带来了Java5的Java语言更新中的重大变化(JSR-133)。JavaLanguageSpecificationforJava5更正并规范化了synchronized#p#分页标题#e#和volatile操作。该规范还规定不变的对象如何使用多线程。(基本上讲,只要在执行构造函数时不允许引用“转义”,不变的对象始终是线程安全的。)以前,线程间的交互通常需要使用阻塞的synchronized操作。这些更改支持使用volatile在线程间执行非阻塞协调。因此,在Java5中添加了新的并发集合类来支持非阻塞操作—这与早期仅支持阻塞的线程安全方法相比是一项重大改进。

线程操作的协调难以让人理解。只要从程序的角度让所有内容保持一致,Java编译器和JVM就不会对您代码中的操作重新排序,这使得问题变得更加复杂。例如:如果两个相加操作使用了不同的变量,编译器或JVM可以安装与指定的顺序相反的顺序执行这些操作,只要程序不在两个操作都完成之前使用两个变量的总数。这种重新排序操作的灵活性有助于提高Java性能,但一致性只被允许应用在单个线程中。硬件也有可能带来线程问题。现代系统使用了多种缓存内存级别,一般来讲,不是系统中的所有核心都能同样看到这些缓存。当某个核心修改内存中的一个值时,其他核心可能不会立即看到此更改。

由于这些问题,在一个线程使用另一个线程修改的数据时,您必须显式地控制线程交互方式。Java使用了特殊的操作来提供这种控制,在不同线程看到的数据视图中建立顺序。基本操作是,线程使用synchronized关键字来访问一个对象。当某个线程在一个对象上保持同步时,该线程将会获得此对象所独有的一个锁的独占访问。如果另一个线程已持有该锁,等待获取该锁的线程必须等待,或者被阻塞,直到该锁被释放。当该线程在一个synchronized代码块内恢复执行时,Java会保证该线程可以“看到了”以前持有同一个锁的其他线程写入的所有数据,但只是这些线程通过离开自己的synchronized锁来释放该锁之前写入的数据。这种保证既适用于编译器或JVM所执行的操作的重新排序,也适用于硬件内存缓存。一个synchronized块的内部是您代码中的一个稳定性孤岛,其中的线程可依次安全地执行、交互和共享信息。

在变量上对volatile关键字的使用,为线程间的安全交互提供了一种稍微较弱的形式。synchronized关键字可确保在您获取该锁时可以看到其他线程的存储,而且在您之后,获取该锁的其他线程也会看到您的存储。volatile关键字将这一保证分解为两个不同的部分。如果一个线程向volatile变量写入数据,那么首先将会擦除它在这之前写入的数据。如果某个线程读取该变量,那么该线程不仅会看到写入该变量的值,还会看到写入的线程所写入的其他所有值。所以读取一个#p#分页标题#e#volatile变量会提供与输入一个synchronized块相同的内存保证,而且写入一个volatile变量会提供与离开一个synchronized块相同的内存保证。但二者之间有很大的差别:volatile变量的读取或写入绝不会受阻塞。

抽象Java并发性

同步很有用,而且许多多线程应用程序都是在Java中仅使用基本的synchronized块开发出来的。但协调线程可能很麻烦,尤其是在处理许多线程和许多块的时候。确保线程仅在安全的方式下交互并避免潜在的死锁(两个或更多线程等待对方释放锁之后才能继续执行),这很困难。支持并发性而不直接处理线程和锁的抽象,这为开发人员提供了处理常见用例的更好方法。

java.util.concurrent分层结构包含一些集合变形,它们支持并发访问、针对原子操作的包装器类,以及同步原语。这些类中的许多都是为支持非阻塞访问而设计的,这避免了死锁的问题,而且实现了更高效的线程。这些类使得定义和控制线程之间的交互变得更容易,但他们仍然面临着基本线程模型的一些复杂性。

java.util.concurrent包中的一对抽象,支持采用一种更加分离的方法来处理并发性:Future<T>接口、Executor和ExecutorService接口。这些相关的接口进而成为了对Java并发性支持的许多Scala和Akka扩展的基础,所以更详细地了解这些接口和它们的实现是值得的。

Future<T>是一个T类型的值的持有者,但奇怪的是该值一般在创建Future之后才能使用。正确执行一个同步操作后,才会获得该值。收到Future的线程可调用方法来:

查看该值是否可用

等待该值变为可用#p#分页标题#e#

在该值可用时获取它

如果不再需要该值,则取消该操作

Future的具体实现结构支持处理异步操作的不同方式。

Executor是一种围绕某个执行任务的东西的抽象。这个“东西”最终将是一个线程,但该接口隐藏了该线程处理执行的细节。Executor本身的适用性有限,ExecutorService子接口提供了管理终止的扩展方法,并为任务的结果生成了Future。Executor的所有标准实现还会实现ExecutorService,所以实际上,您可以忽略根接口。

线程是相对重量级的资源,而且与分配并丢弃它们相比,重用它们更有意义。ExecutorService简化了线程间的工作共享,还支持自动重用线程,实现了更轻松的编程和更高的性能。ExecutorService的ThreadPoolExecutor实现管理着一个执行任务的线程池。

应用Java并发性

并发性的实际应用常常涉及到需要与您的主要处理逻辑独立的外部交互的任务(与用户、存储或其他系统的交互)。这类应用很难浓缩为一个简单的示例,所以在演示并发性的时候,人们通常会使用简单的计算密集型任务,比如数学计算或排序。我将使用一个类似的示例。

任务是找到离一个未知的输入最近的已知单词,其中的最近是按照Levenshtein距离来定义的:将输入转换为已知的单词所需的最少的字符增加、删除或更改次数。我使用的代码基于Wikipedia上的Levenshtein距离文章中的一个示例,该示例计算了每个已知单词的Levenshtein距离,并返回最佳匹配值(或者如果多个已知的单词拥有相同的距离,那么返回结果是不确定的)。

清单1给出了计算Levenshtein距离的Java代码。该计算生成一个矩阵,将行和列与两个对比的文本的大小进行匹配,在每个维度上加1。为了提高效率,此实现使用了一对大小与目标文本相同的数组来表示矩阵的连续行,将这些数组包装在每个循环中,因为我只需要上一行的值就可以计算下一行。#p#分页标题#e#

清单1.Java中的Levenshtein距离计算

/**

*CalculateeditdistancefromtargetTexttoknownword.

*

*@paramwordknownword

*@paramv0intarrayoflengthtargetText.length()+1#p#分页标题#e#

*@paramv1intarrayoflengthtargetText.length()+1

*@returndistance

*/

privateinteditDistance(Stringword,int[]v0,int[]v1){

#p#分页标题#e#

//initializev0(priorrowofdistances)aseditdistanceforempty'word'

for(inti=0;i<v0.length;i++){

v0[i]=i;

}

#p#分页标题#e#

//calculateupdatedv0(currentrowdistances)fromthepreviousrowv0

for(inti=0;i<word.length();i++){

//firstelementofv1=delete(i+1)charsfromtargettomatchempty'word'

#p#分页标题#e#v1[0]=i+1;

//useformulatofillintherestoftherow

for(intj=0;j<targetText.length();j++){

int#p#分页标题#e#cost=(word.charAt(i)==targetText.charAt(j))?0:1;

v1[j+1]=minimum(v1[j]+1,v0[j+1]+1,v0[j]+cost);

}

//swapv1(currentrow)andv0(previousrow)fornextiteration#p#分页标题#e#

int[]hold=v0;

v0=v1;

v1=hold;

}

//returnfinalvaluerepresentingbesteditdistance#p#分页标题#e#

returnv0[targetText.length()];

}

如果有大量已知词汇要与未知的输入进行比较,而且您在一个多核系统上运行,那么您可以使用并发性来加速处理:将已知单词的集合分解为多个块,将每个块作为一个独立任务来处理。通过更改每个块中的单词数量,您可以轻松地更改任务分解的粒度,从而了解它们对总体性能的影响。清单2给出了分块计算的Java代码,摘自示例代码中的ThreadPoolDistance类。清单2使用一个标准的ExecutorService,将线程数量设置为可用的处理器数量。

清单2.在Java中通过多个线程来执行分块的距离计算

privatefinalExecutorServicethreadPool;#p#分页标题#e#

privatefinalString[]knownWords;

privatefinalintblockSize;

publicThreadPoolDistance(String[]words,intblock){

#p#分页标题#e#threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

knownWords=words;

blockSize=block;

}

publicDistancePairbestMatch(Stringtarget){

#p#分页标题#e#//buildalistoftasksformatchingtorangesofknownwords

List<DistanceTask>tasks=newArrayList<DistanceTask>();

intsize=0;

for(intbase=0#p#分页标题#e#;base<knownWords.length;base+=size){

size=Math.min(blockSize,knownWords.length-base);

tasks.add(newDistanceTask(target,base,size));

}

DistancePairbest;

try{

#p#分页标题#e#

//passthelistoftaskstotheexecutor,gettingbacklistoffutures

List<Future<DistancePair>>results=threadPool.invokeAll(tasks);

//findthebestresult,waitingforeachfuturetocomplete

best=DistancePair.WORST_CASE;

#p#分页标题#e#for(Future<DistancePair>future:results){

DistancePairresult=future.get();

best=DistancePair.best(best,result);

}

}catch(InterruptedExceptione){

#p#分页标题#e#thrownewRuntimeException(e);

}catch(ExecutionExceptione){

thrownewRuntimeException(e);

}

returnbest;

#p#分页标题#e#}

/**

*ShortestdistancetaskimplementationusingCallable.

*/

publicclassDistanceTaskimplementsCallable<DistancePair>

#p#分页标题#e#{

privatefinalStringtargetText;

privatefinalintstartOffset;

privatefinalintcompareCount;

#p#分页标题#e#

publicDistanceTask(Stringtarget,intoffset,intcount){

targetText=target;

startOffset=offset;

compareCount=count;

}

#p#分页标题#e#

privateinteditDistance(Stringword,int[]v0,int[]v1){

...

}

/*(non-Javadoc)#p#分页标题#e#

*@seejava.util.concurrent.Callable#call()

*/

@Override

publicDistancePaircall()throwsException{

#p#分页标题#e#//directlycomparedistancesforcomparisonwordsinrange

int[]v0=newint[targetText.length()+1];

int[]v1=newint[targetText.length()+1];

#p#分页标题#e#intbestIndex=-1;

intbestDistance=Integer.MAX_VALUE;

booleansingle=false;

for(inti=0;i<compareCount;i++){

#p#分页标题#e#

intdistance=editDistance(knownWords[i+startOffset],v0,v1);

if(bestDistance>distance){

bestDistance=distance;

bestIndex=i+startOffset;

single=true;

#p#分页标题#e#}elseif(bestDistance==distance){

single=false;

}

}

returnsingle?newDistancePair(bestDistance,knownWords[bestIndex]):

#p#分页标题#e#newDistancePair(bestDistance);

}

}

清单2中的bestMatch()方法构造一个DistanceTask距离列表,然后将该列表传递给ExecutorService。这种对ExecutorService的调用形式将会接受一个Collection<?extendsCallable<T>>类型的参数,该参数表示要执行的任务。该调用返回一个Future<T>列表,用它来表示执行的结果。ExecutorService使用在每个任务上调用call()方法所返回的值,异步填写这些结果。在本例中,T类型为DistancePair—一个表示距离和匹配的单词的简单的值对象,或者在没有找到惟一匹配值时近表示距离。

bestMatch()方法中执行的原始线程依次等待每个Future完成,累积最佳的结果并在完成时返回它。通过多个线程来处理DistanceTask的执行,原始线程只需等待一小部分结果。剩余结果可与原始线程等待的结果并发地完成。

并发性性能

要充分利用系统上可用的处理器数量,必须为ExecutorService配置至少与处理器一样多的线程。您还必须将至少与处理器一样多的任务传递给#p#分页标题#e#ExecutorService来执行。实际上,您或许希望拥有比处理器多得多的任务,以实现最佳的性能。这样,处理器就会繁忙地处理一个接一个的任务,近在最后才空闲下来。但是因为涉及到开销(在创建任务和future的过程中,在任务之间切换线程的过程中,以及最终返回任务的结果时),您必须保持任务足够大,以便开销是按比例减小的。

图1展示了我在使用Oracle的Java7for64-bitLinux®的四核AMD系统上运行测试代码时测量的不同任务数量的性能。每个输入单词依次与12,564个已知单词相比较,每个任务在一定范围的已知单词中找到最佳的匹配值。全部933个拼写错误的输入单词会重复运行,每轮运行之间会暂停片刻供JVM处理,该图中使用了10轮运行后的最佳时间。从图1中可以看出,每秒的输入单词性能在合理的块大小范围内(基本来讲,从256到大于1,024)看起来是合理的,只有在任务变得非常小或非常大时,性能才会极速下降。对于块大小16,384,最后的值近创建了一个任务,所以显示了单线程性能。

图1.ThreadPoolDistance性能

perform1

Fork-Join

Java7引入了ExecutorService的另一种实现:ForkJoinPool类。ForkJoinPool是为高效处理可反复分解为子任务的任务而设计的,它使用RecursiveAction类(在任务未生成结果时)或RecursiveTask<T>类(在任务具有一个T类型的结果时)来处理任务。RecursiveTask<T>提供了一种合并子任务结果的便捷方式,如清单3所示。

清单3.RecursiveTask<DistancePair>示例

#p#分页标题#e#privateForkJoinPoolthreadPool=newForkJoinPool();

privatefinalString[]knownWords;

privatefinalintblockSize;

#p#分页标题#e#

publicForkJoinDistance(String[]words,intblock){

knownWords=words;

blockSize=block;

}

publicDistancePairbestMatch(Stringtarget){#p#分页标题#e#

returnthreadPool.invoke(newDistanceTask(target,0,knownWords.length,knownWords));

}

/**

*ShortestdistancetaskimplementationusingRecursiveTask.

#p#分页标题#e#*/

publicclassDistanceTaskextendsRecursiveTask<DistancePair>

{

privatefinalStringcompareText;

privatefinal#p#分页标题#e#intstartOffset;

privatefinalintcompareCount;

privatefinalString[]matchWords;

public#p#分页标题#e#DistanceTask(Stringfrom,intoffset,intcount,String[]words){

compareText=from;

startOffset=offset;

compareCount=count;

matchWords=words;

}

#p#分页标题#e#

privateinteditDistance(intindex,int[]v0,int[]v1){

...

}

#p#分页标题#e#/*(non-Javadoc)

*@seejava.util.concurrent.RecursiveTask#compute()

*/

@Override

protectedDistancePaircompute(){

if#p#分页标题#e#(compareCount>blockSize){

//splitrangeinhalfandfindbestresultfrombestsineachhalfofrange

inthalf=compareCount/2;

DistanceTaskt1=newDistanceTask(compareText,startOffset,half,matchWords);

#p#分页标题#e#t1.fork();

DistanceTaskt2=newDistanceTask(compareText,startOffset+half,

compareCount-half,matchWords);

DistancePairp2=t2.compute();

returnDistancePair.best(p2,t1.join());

}

#p#分页标题#e#

//directlycomparedistancesforcomparisonwordsinrange

int[]v0=newint[compareText.length()+1];

int[]v1=newint[compareText.length()+1#p#分页标题#e#];

intbestIndex=-1;

intbestDistance=Integer.MAX_VALUE;

booleansingle=false;

for(int#p#分页标题#e#i=0;i<compareCount;i++){

intdistance=editDistance(i+startOffset,v0,v1);

if(bestDistance>distance){

bestDistance=distance;

bestIndex=i+startOffset;

single=#p#分页标题#e#true;

}elseif(bestDistance==distance){

single=false;

}

}

return#p#分页标题#e#single?newDistancePair(bestDistance,knownWords[bestIndex]):

newDistancePair(bestDistance);

}

}

图2显示了清单3中的ForkJoin代码与清单2中的ThreadPool代码的性能对比。ForkJoin代码在所有块大小中稳定得多,仅在您只有单个块(意味着执行是单线程的)时性能会显著下降。标准的ThreadPool代码仅在块大小为256和1,024时会表现出更好的性能。

图2.ThreadPoolDistance与ForkJoinDistance的性能对比

perform2

这些结果表明,如果可调节应用程序中的任务大小来实现最佳的性能,那么使用标准#p#分页标题#e#ThreadPool比ForkJoin更好。但请注意,ThreadPool的“最佳性能点”取决于具体任务、可用处理器数量以及您系统的其他因素。一般而言,ForkJoin以最小的调优需求带来了优秀的性能,所以最好尽可能地使用它。

Scala并发性基础

Scala通过许多方式扩展了Java编程语言和运行时,其中包括添加更多、更轻松的处理并发性的方式。对于初学者而言,Future<T>的Scala版本比Java版本灵活得多。您可以直接从代码块中创建future,可向future附加回调来处理这些future的完成。清单4显示了Scalafuture的一些使用示例。该代码首先定义了futureInt()方法,以便按需提供Future<Int>,然后通过三种不同的方式来使用future。

清单4.ScalaFuture<T>示例代码

importExecutionContext.Implicits.global

#p#分页标题#e#vallastInteger=newAtomicInteger

deffutureInt()=future{

Threadsleep2000

lastIntegerincrementAndGet

}

//usecallbacksforcompletionoffutures#p#分页标题#e#

vala1=futureInt

vala2=futureInt

a1.onSuccess{

casei1=>{

a2.onSuccess{

casei2=>println("Sumofvaluesis"#p#分页标题#e#+(i1+i2))

}

}

}

Threadsleep3000

//useforconstructtoextractvalueswhenfuturescomplete

#p#分页标题#e#valb1=futureInt

valb2=futureInt

for(i1<-b1;i2<-b2)yieldprintln("Sumofvaluesis"+(i1+i2))

Threadsleep3000

//waitdirectlyforcompletionoffutures

#p#分页标题#e#valc1=futureInt

valc2=futureInt

println("Sumofvaluesis"+(Await.result(c1,Duration.Inf)+

Await.result(c2,Duration.Inf)))

清单4中的第一个示例将回调闭包附加到一对future上,以便在两个future都完成时,将两个结果值的和打印到控制台上。回调是按照创建它们的顺序直接嵌套在future上,但是,即使更改顺序,它们也同样有效。如果在您附加回调时future已完成,该回调仍会运行,但无法保证它会立即运行。原始执行线程会在Threadsleep3000行上暂停,以便在进入下一个示例之前完成future。

第二个示例演示了使用Scalaforcomprehension从future中异步提取值,然后直接在表达式中使用它们。forcomprehension是一种Scala结构,可用于简洁地表达复杂的操作组合(map、filter、flatMap和foreach)。它一般与各种形式的集合结合使用,但Scalafuture实现了相同的单值方法来访问集合值。所以可以使用future作为一种特殊的集合,一种包含最多一个值(可能甚至在未来某个时刻之前之后才包含该值)的集合。在这种情况下,for语句要求获取future的结果,并在表达式中使用这些结果值。在幕后,这种技术会生成与第一个示例完全相同的代码,但以线性代码的形式编写它会得到更容易理解的更简单的表达式。和第一个示例一样,原始执行线程会暂停,以便在进入下一个示例之前完成future。#p#分页标题#e#

第三个示例使用阻塞等待来获取future的结果。这与Javafuture的工作原理相同,但在Scala中,一个获取最大等待时间参数的特殊Await.result()方法调用会让阻塞等待变得更为明显。

清单4中的代码没有显式地将future传递给ExecutorService或等效的对象,所以如果没有使用过Scala,那么您可能想知道future内部的代码是如何执行的。答案取决于清单4中最上面一行:importExecutionContext.Implicits.global。ScalaAPI常常为代码块中频繁重用的参数使用implicit值。future{}结构要求ExecutionContext以隐式参数的形式提供。这个ExecutionContext是JavaExecutorService的一个Scala包装器,以相同方式用于使用一个或多个托管线程来执行任务。

除了future的这些基本操作之外,Scala还提供了一种方式将任何集合转换为使用并行编程的集合。将集合转换为并行格式后,您在集合上执行的任何标准的Scala集合操作(比如map、filter或fold)都会自动地尽可能并行完成。(本文稍后会在清单7中提供一个相关示例,该示例使用Scala查找一个单词的最佳匹配值。)

错误处理

Java和Scala中的future都必须解决错误处理的问题。在Java中,截至Java7,future可抛出一个ExecutionException作为返回结果的替代方案。应用程序可针对具体的失败类型而定义自己的ExecutionException子类,或者可连锁异常来传递详细信息,但这限制了灵活性。

Scalafuture提供了更灵活的错误处理。您可以通过两种方式完成Scalafuture:成功时提供一个结果值(假设要求一个结果值),或者在失败时提供一个关联的Throwable。您也可以采用多种方式处理future的完成。在清单4中,onSuccess方法用于附加回调来处理future的成功完成。您还可以使用onComplete来处理任何形式的完成(它将结果或throwable包装在一个#p#分页标题#e#Try中来适应两种情况),或者使用onFailure来专门处理错误结果。Scalafuture的这种灵活性扩展到了您可以使用future执行的所有操作,所以您可以将错误处理直接集成到代码中。

这个ScalaFuture<T>还有一个紧密相关的Promise<T>类。future是一个结果的持有者,该结果在某个时刻可能可用(或不可用—无法内在地确保一个future将完成)。future完成后,结果是固定的,不会发生改变。promise是这个相同契约的另一端:结果的一个一次性、可分配的持有者,具有结果值或throwable的形式。可从promise获取future,在promise上设置了结果后,就可以在该future上设置此结果。

应用Scala并发性

现在您已熟悉一些基本的Scala并发性概念,是时候来了解一下解决Levenshtein距离问题的代码了。清单5显示了Levenshtein距离计算的一个比较符合语言习惯的Scala实现,该代码基本上与清单1中的Java代码类似,但采用了函数风格。

清单5.Scala中的Levenshtein距离计算

vallimit=targetText.length

/**CalculateeditdistancefromtargetTexttoknownword.

#p#分页标题#e#*

*@paramwordknownword

*@paramv0intarrayoflengthtargetText.length+1

*@paramv1intarrayoflengthtargetText.length+1

*@returndistance

*/

#p#分页标题#e#defeditDistance(word:String,v0:Array[Int],v1:Array[Int])={

vallength=word.length

@tailrec

defdistanceByRow(rnum:Int,r0:Array[Int],r1:Array[Int]):Int={

if(rnum>=length)r0(limit)#p#分页标题#e#

else{

//firstelementofr1=delete(i+1)charsfromtargettomatchempty'word'

r1(0)=rnum+1

#p#分页标题#e#//useformulatofillintherestoftherow

for(j<-0untillimit){

valcost=if(word(rnum)==targetText(j))0else1

r1(j+1)=min(r1(j)+1#p#分页标题#e#,r0(j+1)+1,r0(j)+cost);

}

//recursewitharraysswappedfornextrow

distanceByRow(rnum+1,r1,r0)

}

#p#分页标题#e#}

//initializev0(priorrowofdistances)aseditdistanceforempty'word'

for(i<-0tolimit)v0(i)=i

//recursivelyprocessrowsmatchingcharactersinwordbeingcomparedtofindbest#p#分页标题#e#

distanceByRow(0,v0,v1)

}

清单5中的代码对每个行值计算使用了尾部递归distanceByRow()方法。此方法首先检查计算了多少行,如果该数字与检查的单词中的字符数匹配,则返回结果距离。否则会计算新的行值,然后递归地调用自身来计算下一行(将两个行数组包装在该进程中,以便正确地传递新的最新的行值)。Scala将尾部递归方法转换为与Javawhile循环等效的代码,所以保留了与Java代码的相似性。

但是,此代码与Java代码之间有一个重大区别。清单5中的forcomprehension使用了闭包。闭包并不总是得到了当前JVM的高效处理(参阅Whyisusingfor/foreachonaRangeslow?,了解有关的详细信息),所以它们在该计算的最里层循环上增加了大量开销。如上所述,清单5中的代码的运行速度没有Java版本那么快。清单6重写了代码,将forcomprehension替换为添加的尾部递归方法。这个版本要详细得多,但执行效率与Java版本相当。

清单6.为提升性能而重新构造的计算代码

#p#分页标题#e#vallimit=targetText.length

/**CalculateeditdistancefromtargetTexttoknownword.

*

*@paramwordknownword

*@paramv0intarrayoflengthtargetText.length+1

*@paramv1intarrayoflengthtargetText.length+1#p#分页标题#e#

*@returndistance

*/

defeditDistance(word:String,v0:Array[Int],v1:Array[Int])={

vallength=word.length

#p#分页标题#e#@tailrec

defdistanceByRow(row:Int,r0:Array[Int],r1:Array[Int]):Int={

if(row>=length)r0(limit)

else{

//firstelementofv1=delete(i+1)charsfromtargettomatchempty'word'#p#分页标题#e#

r1(0)=row+1

//useformularecursivelytofillintherestoftherow

@tailrec

defdistanceByColumn(col:Int):Unit={

#p#分页标题#e#if(col<limit){

valcost=if(word(row)==targetText(col))0else1

r1(col+1)=min(r1(col)+1,r0(col+1)+1,r0(col)+cost)

#p#分页标题#e#distanceByColumn(col+1)

}

}

distanceByColumn(0)

//recursewitharraysswappedfornextrow

#p#分页标题#e#distanceByRow(row+1,r1,r0)

}

}

//initializev0(priorrowofdistances)aseditdistanceforempty'word'

@tailrec

#p#分页标题#e#definitArray(index:Int):Unit={

if(index<=limit){

v0(index)=index

initArray(index+1)

}

}

initArray(0#p#分页标题#e#)

//recursivelyprocessrowsmatchingcharactersinwordbeingcomparedtofindbest

distanceByRow(0,v0,v1)

}

清单7给出的Scala代码执行了与清单2中的Java代码相同的阻塞的距离计算。bestMatch()方法找到由Matcher类实例处理的特定单词块中与目标文本最匹配的单词,使用尾部递归best()方法来扫描单词。*Distance类创建多个Matcher实例,每个对应一个单词块,然后协调匹配结果的执行和组合。

清单7.Scala中使用多个线程的一次阻塞距离计算

#p#分页标题#e#

classMatcher(words:Array[String]){

defbestMatch(targetText:String)={

vallimit=targetText.length

valv0=new#p#分页标题#e#Array[Int](limit+1)

valv1=newArray[Int](limit+1)

defeditDistance(word:String,v0:Array[Int],v1:Array[Int])={

...

}

#p#分页标题#e#

@tailrec

/**Scanallknownwordsinrangetofindbestmatch.

*

*@paramindexnextwordindex

*@parambestDistminimumdistancefoundsofar

#p#分页标题#e#*@parambestMatchuniquewordatminimumdistance,orNoneifnotunique

*@returnbestmatch

*/

defbest(index:Int,bestDist:Int,bestMatch:Option[String]):DistancePair=

if(index<words.length){

valnewDist=editDistance(words(index),v0,v1)#p#分页标题#e#

valnext=index+1

if(newDist<bestDist)best(next,newDist,Some(words(index)))

elseif(newDist==bestDist)best(next,bestDist,None)

elsebest(next,bestDist,bestMatch)

#p#分页标题#e#}elseDistancePair(bestDist,bestMatch)

best(0,Int.MaxValue,None)

}

}

classParallelCollectionDistance(words:Array[String],size:Int)#p#分页标题#e#extendsTimingTestBase{

valmatchers=words.grouped(size).map(l=>newMatcher(l)).toList

defshutdown={}

defblockSize=size

#p#分页标题#e#

/**Findbestresultacrossallmatchers,usingparallelcollection.*/

defbestMatch(target:String)={

matchers.par.map(m=>m.bestMatch(target)).

foldLeft(DistancePair.worstMatch)((a,m)=>DistancePair.best(a,m))

}

}

#p#分页标题#e#

classDirectBlockingDistance(words:Array[String],size:Int)extendsTimingTestBase{

valmatchers=words.grouped(size).map(l=>newMatcher(l)).toList

defshutdown={}

#p#分页标题#e#

defblockSize=size

/**Findbestresultacrossallmatchers,usingdirectblockingwaits.*/

defbestMatch(target:String)={

importExecutionContext.Implicits.global

valfutures=matchers.map(m=>future{m.bestMatch(target)})#p#分页标题#e#

futures.foldLeft(DistancePair.worstMatch)((a,v)=>

DistancePair.best(a,Await.result(v,Duration.Inf)))

}

}

清单7中的两个*Distance类显示了协调Matcher结果的执行和组合的不同方式。ParallelCollectionDistance使用前面提到的Scala的并行集合feature来隐藏并行计算的细节,只需一个简单的foldLeft就可以组合结果。

DirectBlockingDistance更加明确,它创建了一组future,然后在该列表上为每个结果使用一个foldLeft和嵌套的阻塞等待。

性能再分析

清单7中的两个*Distance实现都是处理Matcher结果的合理方法。(它们不仅合理,而且非常高效。示例代码#p#分页标题#e#包含我在试验中尝试的其他两种实现,但未包含在本文中。)在这种情况下,性能是一个主要问题,所以图3显示了这些实现相对于JavaForkJoin代码的性能。

图3.ForkJoinDistance与Scala替代方案的性能对比

perform3

图3显示,JavaForkJoin代码的性能比每种Scala实现都更好,但DirectBlockingDistance在1,024的块大小下提供了更好的性能。两种Scala实现在大部分块大小下,都提供了比清单1中的ThreadPool代码更好的性能。

这些性能结果仅是演示结果,不具权威性。如果您在自己的系统上运行计时测试,可能会看到不同的性能,尤其在使用不同数量的核心的时候。如果希望为距离任务获得最佳的性能,那么可以实现一些优化:可以按照长度对已知单词进行排序,首先与长度和输入相同的单词进行比较(因为编辑距离总是不低于与单词长度之差)。或者我可以在距离计算超出之前的最佳值时,提前退出计算。但作为一个相对简单的算法,此试验公平地展示了两种并发操作是如何提高性能的,以及不同的工作共享方法的影响。

在性能方面,清单7中的Scale控制代码与清单2和清单3中的Java代码的对比结果很有趣。Scala代码短得多,而且(假设您熟悉Scala!)比Java代码更清晰。Scala和Java可很好的相互操作,您可以在本文的完整示例代码中看到:Scala代码对Scala和Java代码都运行了计时测试,Java代码进而直接处理Scala代码的各部分。得益于这种轻松的互操作性,您可以将Scala引入现有的Java代码库中,无需进行通盘修改。最初使用Scala为Java代码实现高水平控制常常很有用,这样您就可以充分利用Scala强大的表达特性,同时没有闭包或转换的任何重大性能影响。

清单7中的ParallelCollectionDistanceScala代码的简单性非常具有吸引力。使用此方法,您可以从代码中完全抽象出并发性,从而编写类似单线程应用程序的代码,同时仍然获得多个处理器的优势。幸运的是,对于喜欢此方法的简单性但又不愿意或无法执行Scala开发的人而言,Java8带来了一种执行直接的Java编程的类似特性。#p#分页标题#e#

结束语

现在您已经了解了Java和Scala并发性操作的基础知识,本系列下一篇文章将介绍Java8如何改进对Java的并发性支持(以及从长远来讲,可能对Scala的并发性支持)。Java8的许多改动您看起来可能都很熟悉(Scala并发性特性中使用的许多相同的概念都包含在Java8中),所以您很快就能够在普通的Java代码中使用一些Scala技术。请阅读下一期文章,了解应该如何做。

相关推荐