新的developerWorks Premium会员计划可通过Safari图书在线获得对强大的开发工具和资源的无障碍访问权,其中包括500个顶级技术标题(数十个专门针对Java开发人员),主要开发人员活动的超低折扣,最近O'Reilly的视频重播会议等。 立即注册 。
在任何并发应用程序中,异步事件处理都是至关重要的。 事件的来源可以是单独的计算任务,I / O操作或与外部系统的交互。 无论源是什么,应用程序代码都必须跟踪事件并协调响应事件而采取的措施。 Java应用程序可以使用两种基本的异步事件处理方法:应用程序可以让协调线程等待事件然后采取行动,或者事件可以执行行动(通常采用执行应用程序提供的代码的形式) )直接在事件完成时显示。 让线程等待事件称为阻塞方法。 让事件在没有线程显式等待的情况下执行操作被称为非阻塞方法。
旧的java.util.concurrent.Future类提供了一种简单的方法来处理事件的预期完成,但是只能通过轮询或等待完成来进行。 Java 8 java.util.concurrent.CompletableFuture添加的java.util.concurrent.CompletableFuture类通过一系列用于组合或处理事件的方法扩展了此功能。 (有关CompletableFuture的介绍,请阅读本系列的上一篇文章“ Java 8并发基础 ”。)特别是, CompletableFuture为您提供了在事件完成时执行应用程序代码的标准技术,包括各种组合任务的方式(由表示)。期货)。 这种组合技术使编写非阻塞代码来处理事件变得容易(或至少比过去容易)。 在本文中,您将看到如何使用CompletableFuture进行阻塞和非阻塞事件处理。 您将获得一些提示,说明为什么非阻塞方法值得付出额外的努力。 (从作者的GitHub存储库中获取完整的示例代码 。)
在计算中,根据上下文,术语“ 阻塞”和“ 非阻塞”通常以略有不同的方式使用。 例如,用于共享数据结构的非阻塞算法不需要线程等待访问数据结构。 在非阻塞I / O中,应用程序线程可以启动I / O操作,然后在异步进行操作时关闭并执行其他操作。 在本文中, 非阻塞是指事件执行某个动作的完成,否则该动作需要一个等待线程。 这些用法之间的共同概念是阻塞操作需要线程等待某些东西,而非阻塞操作则不需要。
等待完成很简单:让线程等待事件,当线程恢复时,您知道事件已完成。 如果您的线程在此期间还有其他事情要做,它可以在等待之前关闭并执行它们。 线程甚至可以使用轮询方法来中断其他活动,以检查事件是否已完成。 但是基本原理是相同的:在需要事件结果的时候,您将线程停放,以便它等待事件完成。
现在,多核系统无处不在,并发编程必须比以往任何时候都得到更广泛的应用。 但是并发可能难以正确实现,因此您需要新的工具来帮助您使用它。 许多基于JVM的语言都在开发这种类型的工具,并且Scala在这一领域特别活跃。 本系列文章使您了解了一些用于Java和Scala语言的并发编程的较新方法。
只要您有一个等待事件完成的主线程,阻塞就很容易做到并且相对简单。 当您使用多个正在等待彼此阻塞的线程时,您会遇到诸如以下的问题:
死锁:两个或多个线程,每个线程控制其他线程需要处理的资源。 饥饿:某些线程可能无法前进,因为其他线程占用了共享资源。 活锁:线程试图相互适应,但最终没有进展。非阻塞方法为创造力留出了更多的空间。 回调是非阻塞事件处理的一种常用技术。 回调是灵活性的缩影,因为事件发生时您可以执行所需的任意代码。 缺点是,当您使用回调处理许多事件时,您的代码可能会变得凌乱。 而且,由于控制流与应用程序中代码的顺序不匹配,因此回调尤其容易引起调试混乱。
Java 8 CompletableFuture支持处理事件的阻塞和非阻塞方法(包括常规回调)。 CompletableFuture还提供了组合和组合事件的方法,以使用干净,简单,易读的代码获得回调的灵活性。 在本节中,您将看到处理由CompletableFuture表示的事件的阻塞和非阻塞方法的示例。
应用程序通常必须在特定操作过程中执行多个处理步骤。 例如,在将响应返回给用户之前,Web应用程序可能需要:
在数据库中查找用户信息 使用查询到的信息进行Web服务调用,或者使用其他数据库查询 根据上一步的结果执行数据库更新。图1说明了这种类型的结构。
图1将处理分为四个单独的任务,这些任务由代表顺序依赖性的箭头连接。 任务1可以直接执行,任务2和任务3都在任务1完成后执行,任务4在任务2和任务3都完成之后执行。 这是本文中用于说明异步事件处理的任务结构。 实际的应用程序,尤其是具有许多活动部件的服务器应用程序,可能要复杂得多,但是这个简单的示例可以说明所涉及的原理。
在实际系统中,异步事件的来源通常是并行计算或某种形式的I / O操作。 但是,使用简单的时间延迟对这种类型的系统进行建模比较容易,这就是我在本文中采用的方法。 清单1显示了用于生成事件的基本定时事件代码,形式为CompletableFuture 。
清单1的 TimerTask被实现为具有单个run()方法的匿名内部类。 您可能会认为这是使用lambda代替内部类的好地方。 但是,您只能将lambda用作接口的实例,并且TimerTask被定义为抽象类。 除非将来对lambda功能的扩展增加对抽象类的支持(可能,但由于设计问题不太可能),或者为诸如TimerTask类的情况定义了并行接口,否则您必须继续对单个方法实现使用Java内部类。
清单1的代码使用java.util.Timer调度java.util.TimerTask以在延迟后执行。 每个TimerTask在运行时都会完成一个关联的TimerTask 。 delayedSuccess()计划任务在运行时成功完成CompletableFuture<T> ,并将未来返回给调用方。 delayedFailure()计划任务在完成时完成CompletableFuture<T> ,并在其运行时将其异常并将将来返回给调用方。
清单2显示了如何使用清单1代码以CompletableFuture<Integer>的形式创建事件,以匹配图1中的四个任务。 (此代码来自示例代码中的EventComposition类。)
每个在四个任务方法清单2种 ,用于当任务将完成用途特定延迟值:1秒为task1 ,2秒task2 ,3秒task3 ,并回落到1秒task4 。 每一个都采用一个输入值,并将该输入加上任务号作为将来的(最终)结果值。 这些方法都使用了未来的成功形式。 您稍后将看到使用故障表单的示例。
这些任务的目的是按照图1所示的顺序运行它们,并向每个任务传递前一个任务返回的结果值(如果是task4 ,则将两个前一个任务结果的总和传递给该任务)。 如果两个中间任务同时执行,则总执行时间应约为5秒(1秒+最大值(2秒,3秒)+1秒)。 如果1是输入到task1 ,则结果为2。如果该结果被传递到task2和task3 ,结果是图4和5,而如果这两个结果(9)的总和作为输入传递给task4 ,所述最终结果是13。
现在已经准备好阶段,现在该采取一些行动了。 协调这四个任务执行的最简单方法是使用阻塞等待:主线程等待每个任务完成。 清单3(同样,来自示例代码中的EventComposition类)显示了这种方法。
清单3使用CompletableFuture的join()方法进行阻塞等待。 join()等待完成,如果完成成功则返回结果值,如果完成失败或被取消则抛出未检查的异常。 该代码为先等待task1的结果,那么就启动task2和task3反过来等待两个返回的期货之前,最后等待task4结果。 runBlocking()返回一个CompletableFuture以与我接下来显示的非阻塞形式一致,但是在这种情况下,将来实际上将在方法返回之前完成。
清单4(同样来自示例代码中的EventComposition类)显示了如何将期货链接在一起,以正确的顺序和正确的依赖关系执行任务,而没有阻塞。
清单4代码实质上构造了一个执行计划,该执行计划指定如何执行不同的任务以及它们之间的关系。 这段代码简洁明了,但是如果您不熟悉CompletableFuture方法,可能很难理解。 清单5显示了通过将task2和task3部分分离为新方法runTask2and3 ,将相同的代码重构为更易于理解的形式。
在清单5中 , runTask2and3()方法表示任务流的中间部分,其中task2和task3同时执行,然后将它们的结果值组合在一起。 通过在thenCombine()上使用thenCombine()方法对该序列进行编码,该方法将另一个future作为其第一个参数,并将一个二进制函数实例(输入类型与将来的结果类型匹配)作为其第二个参数。 thenCombine()返回第三个thenCombine()代表应用于原始两个thenCombine()的结果的函数的值。 在这种情况下,两个期货是task2和task3 ,其功能是对结果值求和。
所述runNonblockingAlt()方法使用一对调用所述的thenCompose()上的未来方法。 thenCompose()的参数是一个函数实例,该函数实例将原始thenCompose()的值类型作为输入,并返回另一个thenCompose()作为输出。 thenCompose()的结果是第三个未来,其结果类型与函数相同。 第三个Future用作将来的占位符,在原始将来完成后,该函数最终将由该函数返回。
到呼叫task1.thenCompose()用于施加的结果返回一个未来runTask2and3()函数的结果task1 ,其保存为comp123 。 对comp123.thenCompose()的调用返回将task4()函数应用于第一个thenCompose()的结果的结果的thenCompose() ,后者是执行所有任务的总体结果。
该示例代码包括main()方法,以依次运行每个版本的事件代码,并显示完成时间(约5秒)和结果(13)是正确的。 清单6显示了从控制台运行此main()方法的结果。
到目前为止,您已经看到了以总是成功完成的期货形式协调事件的代码。 在实际的应用程序中,您不能依赖始终坚持这条快乐的路。 处理任务会发生问题,用Java术语来说,这些问题通常由Throwable表示。
更改清单2的任务定义以使用delayedFailure()代替delayedSuccess()方法很容易,如task4所示:
private static CompletableFuture<Integer> task4(int input) { return TimedEventSupport.delayedFailure(1, new IllegalArgumentException("This won't work!")); }如果运行清单3中的代码,但仅修改task4以完成异常处理,那么您会获得对task4的join()调用抛出的预期IllegalArgumentException 。 如果问题未在runBlocking()方法中捕获,则异常将沿调用链传递,如果未捕获,则最终终止执行线程。 幸运的是,修改代码很容易,这样,如果任何一项任务异常完成,该异常就会传递给调用方,以便在返回的将来进行处理。 清单7显示了此更改。
清单7是不言自明的。 原始代码包装在try / catch ,并且catch在返回的future结束时将异常传递回去。 这种方法增加了一点复杂性,但对于任何Java开发人员来说仍然应该很容易理解。
清单4中的非阻塞代码甚至不需要添加try / catch 。 CompletableFuture和合并操作会自动为您传递异常,因此依赖期货也将带有异常。
您已经看到了处理CompletableFuture表示的事件的阻塞和非阻塞方法的示例。 至少对于本文中建模的基本任务流而言,这两种方法都非常简单。 对于更复杂的任务流,代码也将变得更加复杂。
在阻塞的情况下,如果您只需要等待事件完成,那么增加的复杂性就不成问题了。 如果开始在线程之间进行其他类型的同步,则会遇到线程匮乏甚至死锁的问题。
在非阻塞情况下,由事件完成触发的代码执行很难调试。 当发生多种类型的事件并且事件之间存在许多交互时,将很难跟踪哪个事件触发了哪个执行。 无论您使用的是常规回调还是CompletableFuture组合和组合操作,这种情况基本上都是回调地狱的情况。
总体而言,简单性优势通常在于使用阻塞代码。 那么,为什么有人要使用非阻塞方法呢? 本节为您提供了两个重要的原因。
当一个线程阻塞时,先前执行该线程的处理器内核将移至另一个线程。 必须将先前执行的线程的执行状态保存到内存中,并加载新线程的状态。 将内核从运行一个线程更改为运行另一个线程的这种操作称为上下文切换 。
除了直接的上下文切换性能成本之外,新线程通常使用与先前线程不同的数据。 内存访问比处理器时钟慢得多,因此现代系统在处理器内核和主内存之间使用多层高速缓存。 尽管比主内存快得多,但缓存的容量也要小得多(通常,缓存速度越快,容量就越小),因此任何时候缓存中都只能出现总内存的一小部分。 当发生线程切换并且内核开始执行新线程时,新线程所需的内存数据可能不会出现在缓存中,因此内核必须等待从主内存加载数据。
上下文切换和内存访问延迟的组合直接导致了显着的性能成本。 图2显示了使用Oracle的Java 8(用于64位Linux)在四核AMD系统上进行线程切换的开销。 此测试使用从1到4,096乘2的幂的可变数量的线程,以及每个线程从0到64KB的可变大小的内存块。 使用CompletableFuture来依次执行线程以触发执行。 每次线程执行时,它首先使用每个线程的数据运行一个简单的计算,以显示将数据加载到缓存中的开销,然后增加一个共享的静态变量。 首先,创建一个新的CompletableFuture实例以触发其下一次执行,然后通过完成该线程正在等待的CompletableFuture来依次启动下一个线程。 最后,如果需要再次执行,线程将等待新创建的CompletableFuture完成。
您可以在图2图表中看到线程数和每个线程的内存的影响。 线程数最多影响四个线程,只要每个线程的数据很小,两个线程的工作速度几乎与单个线程一样快。 随着线程数增加到四个以上,对性能的影响相对较小。 每个线程的内存量越大,两层缓存就越快被填充到溢出状态,从而导致交换成本的增加。
图2中所示的时序值是针对我有些过时的主系统的。 您系统上的相应时间将有所不同,并且可能会更短。 但是,曲线的形状应大致相同。
图2显示了线程切换的开销(以微秒为单位),因此即使线程切换的成本在成千上万个处理器时钟中,绝对数也不是很大。 在12.5微秒的切换时间(对应于16KB的数据(图中的黄线),具有适中的线程数)下,系统每秒可执行80,000个线程切换。 这比任何合理编写的单用户应用程序甚至许多服务器应用程序中看到的线程切换要多得多。 但是对于每秒处理数千个事件的高性能服务器应用程序,阻塞的开销可能成为性能的主要因素。 对于此类应用程序,重要的是通过尽可能使用非阻塞代码来最大程度地减少线程切换。
同样重要的是要意识到这些时序图是在最佳情况下使用的。 运行线程切换程序时,足够的CPU活动正在进行,以保持所有内核以全时钟速度运行(至少在我的系统上)。 在实际应用中,处理负载可能会更加突发。 在低活动量时,现代处理器会将某些内核转换为睡眠状态,以减少总体功耗和热量产生。 掉电过程的唯一问题是,当需求增加时,需要一些时间才能将内核从睡眠状态唤醒。 从深度睡眠状态到完全操作所需的时间可能是毫秒级,而不是此线程切换时序示例中看到的微秒级。
对于许多应用程序,不阻塞特定线程的另一个原因是这些线程用于处理需要及时响应的事件。 经典示例是UI线程。 如果您在阻止等待异步事件完成的UI线程中执行代码,则会延迟用户输入事件的处理。 没有人喜欢等待应用程序响应其键入,单击或触摸的内容,因此UI线程中的阻塞往往会很快反映在用户的错误报告中。
UI线程概念是更通用的原则。 大多数类型的应用程序,即使是非GUI应用程序,也必须响应事件,在许多情况下,保持响应时间很短是一个关键问题。 对于这些类型的应用程序,阻塞等待不是可接受的选择。
React式编程是来代表编程,使高度敏感和可扩展的应用的样式的名称。 React式编程的核心原理是应用程序应该能够:
对事件做出React:应用程序应该是事件驱动的,并在每个级别上通过异步通信链接松散耦合的组件。 对负载做出React:该应用程序应具有可伸缩性,以便可以轻松对其进行升级以应对不断增长的需求。 对故障做出React:应用程序应该具有弹性,故障可以集中在影响范围内并且可以快速纠正。 对用户做出React:即使在负载和出现故障的情况下,应用程序也应该对用户做出响应。使用阻塞方法进行事件处理的应用程序无法满足这些原则。 线程是一种有限的资源,因此将其绑定在阻塞等待中限制了可伸缩性,同时还增加了延迟(应用程序响应时间),因为阻塞的线程无法立即响应事件。 非阻塞应用程序可以更快速地响应事件,降低延迟,同时还减少线程切换开销并提高吞吐量。
React式编程比非阻塞代码要多得多。 响应式编程涉及将重点放在应用程序中的数据流上,并将这些数据流实现为异步交互,而不会使接收方过载或备份发送方。 对数据流的关注有助于避免传统并发编程的许多复杂性。
在本文中,您已经了解了如何使用Java 8 CompletableFuture类将事件组合和组合成一种执行计划,该计划可以轻松,简洁地用代码表示。 这种类型的非阻塞可组合性对于编写可很好地响应工作负载并正常处理故障的React性应用程序至关重要。
下一篇文章将介绍到Scala方面,并探讨一种非常不同但有趣的异步计算处理方法。 async宏使您可以编写看起来像执行顺序阻塞操作的代码,但在幕后将代码转换为完全无阻塞的结构。 我将举一些例子说明这种方法是如何有用的,并介绍一下async的实现方式。
翻译自: https://www.ibm.com/developerworks/java/library/j-jvmc3/index.html