scala 异步调用

    技术2024-03-24  9

    学到更多。 开发更多。 连接更多。

    新的developerWorks Premium会员计划可通过Safari图书在线获得对强大的开发工具和资源的无障碍访问权,其中包括500个顶级技术标题(数十个专门针对Java开发人员),主要开发人员活动的超低折扣,最近O'Reilly的视频重播会议等。 立即注册 。

    异步事件处理在并发应用程序中至关重要。 无论事件的来源是什么(单独的计算任务,I / O操作或与外部系统的交互),您的代码都必须跟踪事件并协调为响应事件而采取的措施。 应用程序可以采用两种基本方法之一来进行异步事件处理:

    阻塞 :协调线程等待事件。 非阻塞 :事件生成某种形式的通知给应用程序,而没有线程显式等待它。

    在“ JVM并发性 :阻止还是不阻止? ”中,您将了解使用Java™8中基于CompletableFuture类的阻塞和非阻塞技术来处理异步事件的方法。 本教程显示了Scala中用于异步事件处理的一些选项,从一个简单的阻止版本开始,然后介绍一些非阻止选项。 最后,您将看到async / await构造如何将看似简单的阻塞代码转换为非阻塞执行。 (从作者的GitHub存储库中获取完整的示例代码 。)

    关于本系列

    现在,多核系统无处不在,并发编程必须比以往任何时候都得到更广泛的应用。 但是并发可能难以正确实现,因此您需要新的工具来帮助您使用它。 许多基于JVM的语言都在开发这种类型的工具,并且Scala在这一领域特别活跃。 本系列文章使您了解了一些用于Java和Scala语言的并发编程的较新方法。

    撰写活动

    scala.concurrent.Promise和scala.concurrent.Future类为Scala开发人员提供了与Java 8开发人员具有的CompletableFuture类似的选择范围。 特别是, Future提供了阻塞和非阻塞方式来处理事件完成。 尽管在此级别上具有相似性,但是,用于两种类型的期货的技术却有所不同。

    在本节中,您将看到使用Future表示的事件的阻塞和非阻塞方法的示例。 本教程使用与上一个相同的并发任务设置。 在深入研究代码之前,我将快速回顾一下该设置。

    任务和顺序

    应用程序通常必须在特定操作过程中执行多个处理步骤。 例如,在将响应返回给用户之前,Web应用程序可能需要:

    在数据库中查找用户信息。 使用查询到的信息进行Web服务调用,或者使用其他数据库查询。 根据前两个操作的结果执行数据库更新。

    图1说明了这种类型的结构。

    图1.应用程序任务流

    图1将处理分为四个单独的任务,这些任务由代表顺序依赖性的箭头连接。 任务1可以直接执行,任务2和任务3都在任务1完成后执行,任务4在任务2和任务3都完成之后执行。

    建模异步事件

    在实际系统中,异步事件的来源通常是并行计算或一种I / O操作。 但是,使用简单的时间延迟为这种类型的系统建模更容易,而这就是我在这里采用的方法。 清单1以完成的Future形式显示了用于生成事件的基本定时事件代码。

    清单1.定时事件代码
    import java.util.Timer import java.util.TimerTask import scala.concurrent._ object TimedEvent { val timer = new Timer /** Return a Future which completes successfully with the supplied value after secs seconds. */ def delayedSuccess[T](secs: Int, value: T): Future[T] = { val result = Promise[T] timer.schedule(new TimerTask() { def run() = { result.success(value) } }, secs * 1000) result.future } /** Return a Future which completes failing with an IllegalArgumentException after secs * seconds. */ def delayedFailure(secs: Int, msg: String): Future[Int] = { val result = Promise[Int] timer.schedule(new TimerTask() { def run() = { result.failure(new IllegalArgumentException(msg)) } }, secs * 1000) result.future }

    与上一期的Java代码一样, 清单1 Scala代码使用java.util.Timer安排java.util.TimerTask以在延迟后执行。 每个TimerTask在运行时都会完成一个关联的TimerTask 。 delayedSuccess函数调度任务以在其运行时成功完成Scala Future[T] ,并将未来返回给调用方。 delayedSuccess函数返回相同的future类型,但使用的任务将以IllegalArgumentException故障完成future。

    清单2显示了如何使用Listing 1代码以Future[Int]的形式创建事件,以匹配图1中的四个任务。 (此代码来自示例代码中的AsyncHappy类。)

    清单2.示例任务的事件
    // task definitions def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1) def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2) def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3) def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)

    每个在四个任务方法清单2种 ,用于当任务将完成用途特定延迟值:1秒为task1 ,2秒task2 ,3秒task3 ,并回落到1秒task4 。 每一个都采用一个输入值,并将该输入加上任务号作为将来的(最终)结果值。 这些方法都使用了未来的成功形式。 稍后,您将看到使用故障表单的示例。

    这些任务的目的是按照图1所示的顺序运行它们,并向每个任务传递前一个任务返回的结果值(如果是task4 ,则将两个前一个任务结果的总和传递给该任务)。 如果两个中间任务同时执行,则总执行时间应约为5秒(1秒+最大值(2秒,3秒)+1秒)。 如果1是输入到task1 ,则结果为2。如果该结果被传递到task2和task3 ,结果是图4和5,而如果这两个结果(9)的总和作为输入传递给task4 ,所述最终结果是13。

    阻塞等待

    现在已经准备好阶段,现在该看看Scala如何处理事件完成了。 就像上一教程中的Java代码一样,协调四个任务的执行的最简单方法是使用阻塞等待:主线程依次等待每个任务完成。 清单3(同样,来自示例代码中的AsyncHappy类)显示了这种方法。

    清单3.阻塞等待任务
    def runBlocking() = { val v1 = Await.result(task1(1), Duration.Inf) val future2 = task2(v1) val future3 = task3(v1) val v2 = Await.result(future2, Duration.Inf) val v3 = Await.result(future3, Duration.Inf) val v4 = Await.result(task4(v2 + v3), Duration.Inf) val result = Promise[Int] result.success(v4) result.future }

    清单3使用Scala scala.concurrent.Await对象result()方法进行阻塞等待。 对于代码首先等待task1结果,然后创建两个task2和task3反过来等待每个前期货,终于等待task4结果。 最后三行(创建和设置result )使方法可以返回Future[Int] 。 返回将来使该方法与我将在下面显示的非阻塞形式保持一致,但是将来实际上将在方法返回之前完成。

    结合期货

    清单4(同样来自示例代码中的AsyncHappy类)显示了一种将期货链接在一起的方法,以正确的顺序和正确的依赖关系执行任务,而没有阻塞。

    清单4.使用onSuccess()处理onSuccess()
    def runOnSuccess() = { val result = Promise[Int] task1(1).onSuccess(v => v match { case v1 => { val a = task2(v1) val b = task3(v1) a.onSuccess(v => v match { case v2 => b.onSuccess(v => v match { case v3 => task4(v2 + v3).onSuccess(v4 => v4 match { case x => result.success(x) }) }) }) } }) result.future }

    清单4的代码使用onSuccess()方法来设置一个函数(技术上是部分函数,​​因为它只处理成功完成的情况),以便在每个将来完成时执行。 由于onSuccess()调用是嵌套的,因此它们onSuccess()顺序执行(即使期货未按顺序全部完成)。

    清单4的代码相当容易理解,但很冗长。 清单5显示了使用flatMap()方法处理这种情况的更简单方法。

    清单5.使用flatMap()处理flatMap()
    def runFlatMap() = { task1(1) flatMap {v1 => val a = task2(v1) val b = task3(v1) a flatMap { v2 => b flatMap { v3 => task4(v2 + v3) }} } }

    清单5的代码实际上与清单4的代码执行相同的操作,但是清单5使用flatMap()方法从每个flatMap()中提取单个结果值。 使用flatMap()消除了清单4中所需的match / case构造,给出了一个更简洁的形式,但保持了相同的逐步执行路径。

    试试这个例子

    该示例代码使用Scala App依次运行事件代码的每个版本,并确保完成时间(约5秒)和结果(13)正确。 您可以使用Maven从命令行运行此代码,如清单6所示(除去多余的Maven输出):

    清单6.运行事件代码
    dennis@linux-9qea:~/devworks/scala4/code> mvn scala:run -Dlauncher=happypath ... [INFO] launcher 'happypath' selected => com.sosnoski.concur.article4.AsyncHappy Starting runBlocking runBlocking returned 13 in 5029 ms. Starting runOnSuccess runOnSuccess returned 13 in 5011 ms. Starting runFlatMap runFlatMap returned 13 in 5002 ms.

    不幸的道路

    到目前为止,您已经看到了以总是成功完成的期货形式协调事件的代码。 在实际的应用程序中,您不能依赖始终坚持这条快乐的路。 处理任务会发生问题,用JVM语言来讲,这些问题通常由Throwable表示。

    更改清单2的任务定义以使用delayedFailure()代替delayedSuccess()方法很容易,如task4所示:

    def task4(input: Int) = TimedEvent.delayedFailure(1, "This won't work!")

    如果你只是运行清单3代码task4修饰的异常来完成,你会得到预期的IllegalArgumentException被抛出Await.result()的调用task4 。 如果问题未在runBlocking()方法中发现,则异常会一直沿调用链传递,直到最终被发现为止(如果未捕获,则终止线程)。 幸运的是,修改代码很容易,这样,如果任何一项任务异常完成,该异常就会传递给调用方,以便在返回的将来进行处理。 清单7显示了此更改。

    清单7.阻止异常的等待
    def runBlocking() = { val result = Promise[Int] try { val v1 = Await.result(task1(1), Duration.Inf) val future2 = task2(v1) val future3 = task3(v1) val v2 = Await.result(future2, Duration.Inf) val v3 = Await.result(future3, Duration.Inf) val v4 = Await.result(task4(v2 + v3), Duration.Inf) result.success(v4) } catch { case t: Throwable => result.failure(t) } result.future }

    在清单7中 ,原始代码包装在try / catch ,并且catch将异常作为返回的future的完成传递回去。 这种方法增加了一点复杂性,但是对于任何Scala开发人员来说仍然应该很容易理解。

    清单4和5中的事件处理代码的非阻塞变体如何? 顾名思义,清单4中使用的onSuccess()方法仅在成功完成future时起作用。 如果要同时处理成功和失败完成,则必须改用onComplete()方法并检查哪种完成类型适用。 清单8显示了该技术如何用于事件处理代码。

    清单8.成功和失败的onComplete()处理
    def runOnComplete() = { val result = Promise[Int] task1(1).onComplete(v => v match { case Success(v1) => { val a = task2(v1) val b = task3(v1) a.onComplete(v => v match { case Success(v2) => b.onComplete(v => v match { case Success(v3) => task4(v2 + v3).onComplete(v4 => v4 match { case Success(x) => result.success(x) case Failure(t) => result.failure(t) }) case Failure(t) => result.failure(t) }) case Failure(t) => result.failure(t) }) } case Failure(t) => result.failure(t) }) result.future }

    清单8看起来很乱,幸运的是,您有一个更简单的选择:改为使用清单5 flatMap()代码。 flatMap()方法可处理成功完成和失败完成,而无需进行任何更改。

    使用async

    Scala的最新版本包括使用宏在编译期间转换代码的功能。 迄今为止实现的最有用的宏之一是async ,它将在编译过程中将使用期货的看似顺序的代码转换为异步代码。 清单9显示了async如何简化本教程中使用的任务代码。

    清单9.将期货与async {}
    def runAsync(): Future[Int] = { async { val v1 = await(task1(1)) val a = task2(v1) val b = task3(v1) await(task4(await(a) + await(b))) } }

    清单9中包含的async {...}块调用了async宏。 此调用将声明该块为异步块,并且默认情况下将异步执行该块,并返回该块结果的未来。 在该块内, await()方法(实际上是宏的关键字,而不是true方法)显示了需要future结果的位置。 异步宏在编译期间修改Scala程序的抽象语法树(AST),以将块转换为使用回调的代码,大致等效于清单4代码。

    除了async {...}包装器之外, 清单9的代码看起来很像清单3所示的原始阻塞代码。 对于宏来说,这是一个相当大的成就—消除了异步事件的所有复杂性,使其看起来就像您在编写简单的线性代码。 在幕后,相当多的复杂的是参与。

    async发现

    如果查看Scala编译器从源代码生成的类,您将看到几个内部类,它们的名称类似于AsyncHappy$$anonfun$1.class 。 正如您可能从名称中猜到的那样,它们是由编译器为匿名函数(例如,传递给onSuccess()或flatMap()方法的语句)生成的。

    使用Scala 2.11.1编译器和Async 0.9.2实现,您还将看到一个名为AsyncUnhappy$stateMachine$macro$1$1.class 。 这是由async宏生成的实际实现代码,以状态机的形式处理异步任务。 清单10显示了该类的部分反编译视图。

    清单10.反编译的AsyncUnhappy$stateMachine$macro$1$1.class
    public class AsyncUnhappy$stateMachine$macro$1$1 implements Function1<Try<Object>, BoxedUnit>, Function0.mcV.sp { private int state; private final Promise<Object> result; private int await$macro$3$macro$13; private int await$macro$7$macro$14; private int await$macro$5$macro$15; private int await$macro$11$macro$16; ... public void resume() { ... } public void apply(Try<Object> tr) { int i = this.state; switch (i) { default: throw new MatchError(BoxesRunTime.boxToInteger(i)); case 3: if (tr.isFailure()) { result().complete(tr); } else { this.await$macro$11$macro$16 = BoxesRunTime.unboxToInt(tr.get()); this.state = 4; resume(); } break; case 2: if (tr.isFailure()) { result().complete(tr); } else { this.await$macro$7$macro$14 = BoxesRunTime.unboxToInt(tr.get()); this.state = 3; resume(); } break; case 1: if (tr.isFailure()) { result().complete(tr); } else { this.await$macro$5$macro$15 = BoxesRunTime.unboxToInt(tr.get()); this.state = 2; resume(); } break; case 0: if (tr.isFailure()) { result().complete(tr); } else { this.await$macro$3$macro$13 = BoxesRunTime.unboxToInt(tr.get()); this.state = 1; resume(); } break; } } ... }

    清单10 apply()方法处理实际状态更改,评估将来的结果并更改输出状态以匹配。 输入状态告诉代码正在评估哪个未来; 每个状态值对应于异步块内的一个特定的将来。 从清单10的部分代码很难分辨这一点,但是通过查看其他一些字节码,您可以看到状态码与任务匹配,因此状态0表示期望task1的结果,状态1表示预期的结果。预期task2结果,依此类推。

    清单10中未显示resume()方法,因为反编译器无法弄清楚如何将其转换为Java代码。 我也不会进行此练习,但是通过查看字节码,我可以告诉您, resume()方法等效于状态码上的Java switch 。 对于每个非终结状态, resume()执行相应的代码片段以设置下一个预期的未来,最后将AsyncUnhappy$stateMachine$macro$1$1实例设置为future的onComplete()方法的目标。 对于终端状态, resume()设置结果值并完成对最终结果的承诺。

    实际上,您无需深入研究生成的代码即可了解异步(尽管这可能很有趣)。 SIP-22-异步建议中包含有关异步工作原理的完整说明。

    async限制

    由于async宏将代码转换为状态机类的方式,因此使用async宏存在一些限制。 最重要的限制是您不能将await()嵌套在async块内的另一个对象或闭包(包括函数定义)中。 您也不能将await()嵌套在try或catch 。

    除了这些使用限制之外, async的最大问题是,当涉及到调试时,您会回到通常与异步代码相关的回调地狱体验-在这种情况下,试图弄清没有反映您明显的代码结构。 不幸的是,当前的调试器设计无法解决这些问题。 这是在Scala中看到新作品的区域。 同时,您可以禁用异步块的异步执行以async调试(假设您要解决的问题在顺序执行时仍然发生)。

    最后,Scala宏仍在开发中。 目的是在即将发布的版本中, async将成为Scala语言的正式组成部分,但这仅在Scala语言团队对宏的工作方式感到满意时才会发生。 在此之前,无法保证async的形式不会改变。

    结论

    一些处理异步事件的Scala方法与“ JVM并发性 :阻塞还是不阻塞? ”中讨论的Java代码flatMap() 。通过使用flatMap()和async宏,Scala提供了干净且易于理解的技术。 async特别有趣,因为您可以编写看起来像普通顺序代码的代码,但是编译后的代码可以同时执行。 Scala不是唯一提供这种方法的语言,但是基于宏的实现为其他方法提供了更高的灵活性。


    翻译自: https://www.ibm.com/developerworks/java/library/j-jvmc4/index.html

    Processed: 0.025, SQL: 8