学到更多。 开发更多。 连接更多。
新的developerWorks Premium会员计划可通过Safari图书在线获得对强大的开发工具和资源的无障碍访问权,其中包括500个顶级技术标题(数十个专门针对Java开发人员),主要开发人员活动的超低折扣,最近O'Reilly的视频重播会议等。 立即注册 。
“ JVM并发 :与Akka异步操作 ”向您介绍了actor模型以及Akka框架和运行时。 构建actor应用程序与构建传统的线性应用程序不同。 对于线性应用程序,您需要考虑控制流以及实现目标所涉及的步骤顺序。 为了有效地使用参与者模型,您可以将应用程序分解成独立的状态和行为束(参与者),并编写出这些束之间的交互(消息)。 参与者和消息这两个组件是您的应用程序的构建块。
如果您正确地编写了参与者和消息,那么最终您将获得一个系统,其中大多数事情都是异步发生的。 异步操作比线性方法更难理解,但是它在可伸缩性方面得到了回报。 高度异步的程序可以更好地使用增加的系统资源(例如,内存和处理器)来更快地完成特定任务或并行处理任务的更多实例。 借助Akka,您甚至可以通过使用远程处理功能与分布式actor一起在多个系统上扩展此可伸缩性。
关于本系列
现在,多核系统无处不在,并发编程必须比以往任何时候都得到更广泛的应用。 但是并发可能难以正确实现,因此您需要新的工具来帮助您使用它。 许多基于JVM的语言都在开发这种类型的工具,并且Scala在这一领域特别活跃。 本系列文章使您了解了一些用于Java和Scala语言的并发编程的较新方法。
在本文中,您将从参与者和消息方面了解有关构建系统的更多信息。 两个示例应用程序中的第一个显示了参与者和消息如何在Akka中工作的基础知识。 第二个更详细的示例演示了角色系统的计划和可视化。 这两个示例都使用Scala代码,但Java开发人员很容易理解它们(有关帮助,请参阅本系列的上一篇文章,以获取Scala和使用Akka进行Java编程的并排示例)。
下载本文的示例代码 。
认识Star
上一篇文章中的示例使用:
由启动actor系统的主应用程序直接创建的actor 只有一种类型的演员 演员之间的互动最少
对于第一个示例应用程序,我使用稍微更复杂的结构,我将逐个进行回顾。 清单1显示了整个应用程序。
清单1.代Star小号
import scala.concurrent.duration._
import scala.util.Random
import akka.actor._
import akka.util._
object Stars1 extends App {
import Star._
val starBaseLifetime = 5000 millis
val starVariableLifetime = 2000 millis
val starBaseSpawntime = 2000 millis
val starVariableSpawntime = 1000 millis
object Namer {
case object GetName
case class SetName(name: String)
def props(names: Array[String]): Props = Props(new Namer(names))
}
class Namer(names: Array[String]) extends Actor {
import context.dispatcher
import Namer._
context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime)
def receive = {
case GetName => {
val name = ...
sender ! SetName(name)
}
case ReceiveTimeout => {
println("Namer receive timeout, shutting down system")
system shutdown
}
}
}
object Star {
case class Greet(peer: ActorRef)
case object AskName
case class TellName(name: String)
case object Spawn
case object IntroduceMe
case object Die
def props(greeting: String, gennum: Int, parent: String) = Props(new Star(greeting, gennum, parent))
}
class Star(greeting: String, gennum: Int, parent: String) extends Actor {
import context.dispatcher
var myName: String = ""
var starsKnown = Map[String, ActorRef]()
val random = Random
val namer = context actorSelection namerPath
namer ! Namer.GetName
def scaledDuration(base: FiniteDuration, variable: FiniteDuration) =
base + variable * random.nextInt(1000) / 1000
val killtime = scaledDuration(starBaseLifetime, starVariableLifetime)
val killer = scheduler.scheduleOnce(killtime, self, Die)
val spawntime = scaledDuration(starBaseSpawntime, starVariableSpawntime)
val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn)
if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe)
def receive = {
case Namer.SetName(name) => {
myName = name
println(s"$name is the ${gennum}th generation child of $parent")
context become named
}
}
def named: Receive = {
case Greet(peer) => peer ! AskName
case AskName => sender ! TellName(myName)
case TellName(name) => {
println(s"$myName says: '$greeting, $name'")
starsKnown += name -> sender
}
case Spawn => {
println(s"$myName says: A star is born!")
context.actorOf(props(greeting, gennum + 1, myName))
}
case IntroduceMe => starsKnown.foreach {
case (name, ref) => ref ! Greet(sender)
}
case Die => {
println(s"$myName says: 'I'd like to thank the Academy...'")
context stop self
}
}
}
val namerPath = "/user/namer"
val system = ActorSystem("actor-demo-scala")
val scheduler = system.scheduler
system.actorOf(Namer.props(Array("Bob", "Alice", "Rock", "Paper", "Scissors",
"North", "South", "East", "West", "Up", "Down")), "namer")
val star1 = system.actorOf(props("Howya doing", 1, "Nobody"))
val star2 = system.actorOf(props("Happy to meet you", 1, "Nobody"))
Thread sleep 500
star1 ! Greet(star2)
star2 ! Greet(star1)
}
该应用程序创建具有两种角色类型的角色系统: Namer和Star 。 Namer actor是一个单例,实际上是名称的中央目录。 Star演员从Namer获取其(屏幕)名称,然后将问候消息打印到其他Star ,如上一部分中的示例所示。 但是他们也产生子Star ,然后将他们介绍给他们认识的Star 。 而Star演员最终可能会死。
清单2是运行此应用程序时可能会看到的输出示例。
清单2.应用程序输出
Bob is the 1th generation child of Nobody
Alice is the 1th generation child of Nobody
Bob says: 'Howya doing, Alice'
Alice says: 'Happy to meet you, Bob'
Bob says: A star is born!
Rock is the 2th generation child of Bob
Alice says: A star is born!
Paper is the 2th generation child of Alice
Bob says: A star is born!
Scissors is the 2th generation child of Bob
Alice says: 'Happy to meet you, Rock'
Alice says: A star is born!
North is the 2th generation child of Alice
Bob says: 'Howya doing, Paper'
Rock says: 'Howya doing, Paper'
Bob says: A star is born!
South is the 2th generation child of Bob
Alice says: 'Happy to meet you, Scissors'
Paper says: 'Happy to meet you, Scissors'
Alice says: A star is born!
East is the 2th generation child of Alice
Bob says: 'Howya doing, North'
Rock says: 'Howya doing, North'
Scissors says: 'Howya doing, North'
Paper says: A star is born!
West is the 3th generation child of Paper
Rock says: A star is born!
Up is the 3th generation child of Rock
Bob says: A star is born!
Down is the 2th generation child of Bob
Alice says: 'Happy to meet you, South'
North says: 'Happy to meet you, South'
Paper says: 'Happy to meet you, South'
Scissors says: A star is born!
Bob-Bob is the 3th generation child of Scissors
Alice says: A star is born!
Bob-Alice is the 2th generation child of Alice
Scissors says: 'Howya doing, East'
Rock says: 'Howya doing, East'
Bob says: 'Howya doing, East'
South says: 'Howya doing, East'
North says: A star is born!
Bob-Rock is the 3th generation child of North
Paper says: A star is born!
Bob-Paper is the 3th generation child of Paper
Bob says: 'I'd like to thank the Academy...'
Scissors says: 'Howya doing, West'
South says: 'Howya doing, West'
Alice says: A star is born!
Bob-Scissors is the 2th generation child of Alice
North says: A star is born!
Bob-North is the 3th generation child of North
Paper says: A star is born!
Bob-South is the 3th generation child of Paper
Alice says: 'I'd like to thank the Academy...'
Namer receive timeout, shutting down system
Star世代
与某些现实世界中的演员不同, Star演员不会以戏剧化和公开的方式产生后代。 相反,每次收到Spawn消息时,他们都会悄悄弹出一个孩子。 他们对此事件感到兴奋的唯一标志是简单的出生公告“ A star is born! ”再次,与现实世界中的演员不同,骄傲的新父母Star甚至无法宣布他们的新孩子的名字,而是由他们决定的。由命名机构。 初出茅庐的后Star被命名时, Namer打印孩子的姓名和细节的线条形式的“ Ted is the 2th generation child of Bob 。”
Star的死亡是由Star收到Die消息触发的,对此消息I'd like to thank the Academy...打印一条消息:“ I'd like to thank the Academy... ” 然后, Star执行context stop self声明,告诉控制Akka actor上下文已完成并且应将其关闭。 然后,上下文负责所有清理工作,并将参与者从系统中删除。
改变角色
现实世界中的演员可以扮演许多不同的角色。 通过更改消息处理程序方法,Akka参与者也可以扮演不同的角色。 您可以在Star actor中看到此消息,其中默认的receive方法仅处理SetName消息,而所有其他消息均由named方法处理。 切换发生在SetName消息的处理中,并且context become named语句。 更改角色的目的是,在Star命名之前,它什么也不能做,而在命名之后,就不能重命名。
您始终可以在一个receive方法中处理所有消息处理,但这通常会使基于当前参与者状态的条件语句产生混乱的代码。 对不同的状态使用单独的receive方法可以使您的代码保持整洁和直接。 通常,每当您有适合于其他消息的参与者状态时,您应该倾向于使用新的receive方法来表示该状态。
您确实需要当心,在更改角色角色时不要排除对有效消息的处理。 例如,如果允许在任何时候对Star actor进行重named , 清单1中的named方法将需要处理SetName消息。 参与者当前使用的receive方法未处理的任何消息都会被有效地丢弃(实际上,默认情况下会发送到死信邮箱,但就用户参与者而言,它们会被丢弃)。
作为更改消息处理程序的替代方法,您还可以将当前消息处理程序压入堆栈,并使用两个参数的形式become(named, false)设置一个新的消息处理程序。 然后,您可以最终通过context unbecome调用来还原原始处理程序。 您可以嵌套调用become / unbecome这样深深只要你想,但你必须要小心的是,代码最终执行的unbecome匹配每一个become 。 任何不匹配的become表示内存泄漏。
Namer演员
Namer actor在其构造函数中传递了一个名称字符串数组。 每次收到GetName消息时,它都会在SetName消息中返回数组中的下一个名称,当用完简单名称时将使用带连字符的名称。 Namer actor的重点是为Star actor分配名称(理想情况下,唯一的名称),因此在该系统中没有理由拥有多个Namer实例。 启动actor系统的应用程序代码直接创建此单例实例,因此每个Star都可以使用它。
因为该应用程序创建了Namer单例,所以它可以ActorRef传递给每个Star ,而Star actor可以将其传递给其子级。 但是Akka为您提供了一种处理此类知名演员的更干净的方法。 该val namer = context actorSelection namerPath线的Star演员初始化中查找Namer通过其在演员系统路径的演员-在这种情况下, /user/namer 。 ( /user前缀适用于所有用户创建的actor, namer是使用system.actorOf创建Namer actor时设置的名称。) namer值对于应用程序中包含的所有actor都是可见的,因此可以使用它直接在需要时。
预定讯息
清单1的示例使用了几个已调度的消息来提示各个参与者。 Star演员在初始化期间创建两个或三个计划的消息。 val killer = scheduler.scheduleOnce(killtime, self, Die)语句创建一次性消息调度程序,以通过在舞台上的时间结束时发送Die消息来触发Star的死亡。 val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn)语句创建一个重复的调度程序,该调度程序在初始延迟后以1秒的间隔发送Spawn消息,以填充新一代Star 。
第三种类型的用于调度消息的Star用于仅当Star是另一个的后代Star (由演员系统外的应用程序代码中创建,而不是)。 if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe)语句创建一个计划的消息,如果新的Star是第二代的,则在初始化Star之后一秒钟将其发送到Star的父级或更高版本。 当父Star收到此消息时,它会向彼此引入的Star发送“ Greet消息,要求这些已知的Stars将自己介绍给孩子。
Namer actor还使用计划的消息,该消息以接收超时的形式出现。 context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime)语句将超时设置为星星的最大生成时间。 每次参与者收到消息时,上下文都会重置此超时,因此只有在指定的时间流逝而未接收到任何消息时才触发该超时。 Star不断创建新的子Star ,该子Star将消息发送到Namer ,因此仅当所有Star actor都消失时才会发生超时。 如果超时确实发生, Namer处理所得ReceiveTimeout消息(在定义的akka.actor通过关闭整个系统演员封装)。
敏锐的读者可能会想知道Namer超时是如何发生的。 Star的寿命总是至少要5秒钟,并且每个Star在最长3秒钟之前都会开始生成子级Star -因此, 似乎 Star s的数量会不断增加(有点像真人秀电视)。 那么这是如何工作的呢? 答案在于Akka 演员监督模型和亲子关系。
演员家族
Akka根据亲子关系为演员强制执行监督等级。 当一个演员创建另一个演员时,创建的演员将成为原始演员的从属。 这意味着父级演员对其子级演员负责(我们经常希望将其应用于实际演员)。 这项责任主要与失败处理有关 ,但是它对参与者的工作方式确实有一些影响。
监视层次结构是清单1参与者系统关闭的原因。 由于层次结构要求父级actor可用,因此终止父级actor会自动终止其所有子actor。 在清单1中 ,该应用程序最初仅创建了两个Star actor(它们始终使用名称Bob和Alice )。 所有其他Star都是由这两个初始Star之一或其子孙孙Star的。 因此,当每个根Star终止时,它将带走其所有后代。 他们两个都终止之后,就没有Star 。 如果没有任何Star产生子Star ,则不会向Namer名称请求,因此最终会触发Namer超时,并且系统将关闭。
更复杂的演员系统
您在清单1中看到了一个简单的actor系统如何工作的示例。 但是实际的应用程序系统通常具有更多类型的参与者(通常为数十或数百个),并且参与者之间的交互更为复杂。 设计和组织复杂参与者系统的最佳方法之一是通过指定参与者之间的消息流。
对于更复杂的示例,我扩展了清单1应用程序以实现电影制作的简单模型。 该模型使用四种主要角色类型和两种专门的次要角色类型:
Star :参加电影的演员 Scout :寻找Star的天才侦察兵 Academy :跟踪所有活动Star的单例注册表 Director :电影制片人
CastingAssistant : Director的电影助理 ProductionAssistant : Director助理制作电影
像Star S IN 清单1中, Star在这个应用程序的演员有寿命有限。 Director开始制作电影时,会获得要在电影中投放的当前活动Star的列表。 首先, Director需要将Star提交给电影,然后在所有Star提交之后制作电影。 如果电影完成前电影中的任何Star退出业务(或以演员的身分去世),则电影将失败。
绘制消息图
清单1的应用程序很简单,我可以用散文来解释演员的互动。 这个更加复杂的新应用程序需要一种更好的呈现交互的方式。 消息传递图是显示这些交互的好方法。 图1显示了Scout寻找Star (或用演员的话说是创建Star )以及Star在Academy注册所涉及的交互序列。
图1. Star创建和初始化
这是添加Star涉及的消息序列(和创建步骤):
FindTalent (从Scheduler到Scout ):触发以添加Star 。 GetName (从Scout到Academy ):为Star分配一个名称。 GiveName (来自Academy响应):供应分配的名称。 actorOf() : Scout使用提供的名称创建新的Star actor。 Register (从Star到Academy ):注册Star与Academy 。
此消息序列被设计为可伸缩和灵活的。 每个消息都可以隔离处理,因此参与者无需更改其内部状态即可处理消息交换。 ( Academy单例更改状态,但这是交换的全部目的的一部分。)由于内部状态没有更改,因此您无需强制执行严格的消息顺序。 例如,您可以通过向Academy发送多个GetName消息来使FindTalent消息创建多个Star 。 在完成最后一个Star创建之前,您甚至可以连续处理多个FindTalent消息。 您还可以在系统中添加任意数量的Scout actor,并使它们独立运行,而不会发生冲突。
与创建新的Star ,制作电影是一个复杂得多的过程,涉及更多的状态更改和潜在的故障情况。 图2显示了制作电影所涉及的主要应用程序消息:
图2.制作电影
这是制作电影时所涉及的消息序列,主要看的是一切顺利的小路:
MakeMovie (从Scheduler到Director ):触发以开始MakeMovie电影。 PickStars (从Director到Academy ):选择要在电影中扮演的Star 。 StarsPicked或PickFailure (来自Academy响应):如果有足够的Star可供制作电影,则Academy选择所需的数字,并在StarsPicked消息中发回列表; 否则, Academy将发送PickFailure响应。 actorOf() : Director创建了一个CastingAssistant演员来处理电影的投射。 OfferRole( CastingAssistant每个Star在电影): CastingAssistant提供角色的Star 。 AcceptRole或RejectRole (来自每个Star响应):如果Star已承诺提供另一个角色,则它拒绝提供的角色,否则接受。 AllSigned或CastingFailure (给父CastingFailure CastingAssistant ):当所有Star都接受了它们的角色后, CastingAssistant的工作就完成了,以便通过AllSigned消息将成功传递给父级Director 。 如果无法投射Star (特别是如果一个人死亡),则CastingAssistant会将失败传递给父级。 无论哪种方式, CastingAssistant都可以完成并且可以终止。 actorOf() : Director创建了ProductionAssistant演员来处理电影的拍摄。 ProductionComplete (从Scheduler到ProductionAssistant ):在经过所需时间后触发影片的完成。 ProductionComplete或ProductionFailure (将ProductionAssistant传递给父级):当计时器触发以完成电影时, ProductionAssistant向其父级报告该电影已完成。 RoleComplete ( ProductionAssistant每个Star在电影):该ProductionAssistant还需要通知每个Star ,电影完成后,使它们成为可用于其他电影。
此消息序列使用某些参与者的状态更改作为处理的一部分。 Star需要在可用状态和投入观看电影之间更改状态。 CastingAssistant演员需要跟踪哪个Star接受了要在电影中扮演的角色,因此他们知道仍需要招聘哪些演员。 但是Director演员不需要更改状态,因为他们只响应收到的消息(包括来自其子演员的消息)。 ProductionAssistant演员也不需要更改状态,因为他们只需要在电影终止时通知其他演员即可。
如果将其功能合并到Director演员中,则可以避免使用单独的CastingAssistant和ProductionAssistant演员。 但是,消除其他参与者会使Director变得更加复杂,在这种情况下,将功能分成其他参与者更为有意义。 当您考虑处理故障时尤其如此。
处理失败
该应用程序的一个重要方面被遗漏在图1和图2的消息流中。 Star的寿命有限,因此与Star交往的所有演员都必须知道一个人去世的时间。 特别是,如果为电影选择的Star在电影完成之前死亡,则电影必然会失败。
Akka actor系统中的故障处理使用父母监督,从而将故障条件传递到actor的层次结构中。 故障通常在JVM中表示为异常,因此Akka使用异常的自然处理来检测何时发生故障。 如果一个actor不在其自己的代码中处理异常,则Akka通过终止该actor并将故障传递给父actor来处理未捕获的异常。 然后,父级可以处理故障,或者故障本身可以处理给父级。
Akka的内置故障处理功能可以很好地处理与I / O相关的故障,但对于电影制作系统来说,异常是不必要的麻烦。 在这种情况下,需要监视其他参与者,幸运的是Akka提供了一种简便的方法。 通过使用演员系统的DeathWatch组件,演员可以将自己注册为观看任何其他演员。 注册后,如果观看者死亡,则观看者会收到系统Terminated消息。 (为避免出现任何竞赛情况,如果被监视的演员在开始观看之前已经死亡,则Terminated消息会立即出现在监视演员的邮箱中。)
DeathWatch通过调用激活context.watch()方法,它接受ActorRef演员的要被观看。 当感兴趣的演员去世时,最终的Terminated消息是电影制作示例所需的所有失败处理。
Star创作代码
清单3显示了启动应用程序和创建新Star涉及的代码,与图1所示的消息流匹配。
清单3.创建Star代码
object Stars2 extends App { object Scout {
case object FindTalent
val starBaseLifetime = 7 seconds
val starVariableLifetime = 3 seconds
val findBaseTime = 1 seconds
val findVariableTime = 3 seconds
def props(): Props = Props(new Scout())
}
class Scout extends Actor {
import Scout._
import Academy._
import context.dispatcher
val random = Random
scheduleFind
def scheduleFind = {
val nextTime = scaledDuration(findBaseTime, findVariableTime)
scheduler.scheduleOnce(nextTime, self, FindTalent)
}
def scaledDuration(base: FiniteDuration, variable: FiniteDuration) =
base + variable * random.nextInt(1000) / 1000
def receive = {
case FindTalent => academy ! GetName
case GiveName(name) => {
system.actorOf(Star.props(name, scaledDuration(starBaseLifetime, starVariableLifetime)), name)
println(s"$name has been discovered")
scheduleFind
}
}
}
object Academy {
case object GetName
case class GiveName(name: String)
case class Register(name: String)
...
def props(names: Array[String]): Props = Props(new Academy(names))
}
class Academy(names: Array[String]) extends Actor {
import Academy._
var nextNameIndex = 0
val nameIndexLimit = names.length * (names.length + 1)
val liveStars = Buffer[(ActorRef, String)]()
...
def receive = {
case GetName => {
val name =
if (nextNameIndex < names.length) names(nextNameIndex)
else {
val first = nextNameIndex / names.length - 1
val second = nextNameIndex % names.length
names(first) + "-" + names(second)
}
sender ! GiveName(name)
nextNameIndex = (nextNameIndex + 1) % nameIndexLimit
}
case Register(name) => {
liveStars += ((sender, name))
context.watch(sender)
println(s"Academy now tracking ${liveStars.size} stars")
}
case Terminated(ref) => {
val star = (liveStars.find(_._1 == ref)).get
liveStars -= star
println(s"${star._2} has left the business\nAcademy now tracking ${liveStars.size} Stars")
}
...
}
}
}
object Star {
...
def props(name: String, lifespan: FiniteDuration) = Props(new Star(name, lifespan))
}
class Star(name: String, lifespan: FiniteDuration) extends Actor {
import Star._
import context.dispatcher
academy ! Academy.Register(name)
scheduler.scheduleOnce(lifespan, self, PoisonPill)
}
...
val system = ActorSystem("actor-demo-scala")
val scheduler = system.scheduler
val academy = system.actorOf(Academy.props(Array("Bob", "Alice", "Rock",
"Paper", "Scissors", "North", "South", "East", "West", "Up", "Down")), "Academy")
system.actorOf(Scout.props(), "Sam")
system.actorOf(Scout.props(), "Dean")
system.actorOf(Director.props("Astro"), "Astro")
system.actorOf(Director.props("Cosmo"), "Cosmo")
Thread sleep 15000
system.shutdown
}
清单3代码大部分使用与清单1 Star示例相同的Akka功能,并增加了Academy演员在处理来自新Star的Register消息时进行的DeathWatch激活context.watch()调用。 Academy演员记录了ActorRef和每个Star的名称,并且在处理Terminated消息时,它使用ActorRef查找并删除了死亡的Star 。 这样,Live Star的Buffer (本质上是ArrayList )保持最新状态。
主要应用程序代码首先创建单身Academy演员,然后创建一对Scout ,最后创建一对Director 。 该应用程序允许actor系统运行15秒钟,然后关闭系统并退出。
开始电影
清单4显示了制作电影所涉及的代码的第一部分:铸造Star参与电影。 该代码与图2消息流的顶部匹配,包括Scheduler以及Director和Academy actor之间的交互。
清单4.电影制作代码
object Stars2 extends App {
...
object Director {
case object MakeMovie
val starCountBase = 2
val starCountVariable = 4
val productionTime = 3 seconds
val recoveryTime = 3 seconds
def props(name: String) = Props(new Director(name))
}
class Director(name: String) extends Actor {
import Academy._
import Director._
import ProductionAssistant._
import context.dispatcher
val random = Random
def makeMovie = {
val numstars = random.nextInt(starCountVariable) + starCountBase
academy ! PickStars(numstars)
}
def retryMovie = scheduler.scheduleOnce(recoveryTime, self, MakeMovie)
makeMovie
def receive = {
case MakeMovie => makeMovie
case PickFailure => retryMovie
case StarsPicked(stars) => {
println(s"$name wants to make a movie with ${stars.length} actors")
context.actorOf(CastingAssistant.props(name, stars.map(_._1)), name + ":Casting")
context become casting
}
}
...
}
...
object Academy {
...
case class PickStars(count: Int)
case object PickFailure
case class StarsPicked(ref: List[(ActorRef, String)])
def props(names: Array[String]): Props = Props(new Academy(names))
}
class Academy(names: Array[String]) extends Actor {
...
def pickStars(n: Int): Seq[(ActorRef, String)] = ...
def receive = {
...
case PickStars(n) => {
if (liveStars.size < n) sender ! PickFailure
else sender ! StarsPicked(pickStars(n).toList)
}
}
}
清单4代码的开头给出了Director对象和actor定义的一部分,显示了由Scheduler向Director发送MakeMovie消息触发的电影制作的开始。 该Director开始时这个电影制作过程MakeMovie收到消息,要求Academy分配Star s到电影与PickStars消息。 清单4末尾显示的用于处理PickStars消息的Academy代码发送回PickFailure (如果没有足够的Star )或StarsPicked消息。 如果Director收到一个PickFailure消息,它将安排另一个尝试,以备以后使用。 如果Director收到StarsPicked消息,它将使用Academy为电影中的角色选择的Star列表启动CastingAssistant演员,然后更改状态以处理CastingAssistant的响应。 清单5从这一点继续,从Director演员的强制Receive方法开始。
清单5. CastingAssistant操作
class Director(name: String) extends Actor {
...
def casting: Receive = {
case CastingAssistant.AllSigned(stars) => {
println(s"$name cast ${stars.length} actors for movie, starting production")
context.actorOf(ProductionAssistant.props(productionTime, stars), name + ":Production")
context become making
}
case CastingAssistant.CastingFailure => {
println(s"$name failed casting a movie")
retryMovie
context become receive
}
}
...
}
object CastingAssistant {
case class AllSigned(stars: List[ActorRef])
case object CastingFailure
val retryTime = 1 second
def props(dirname: String, stars: List[ActorRef]) = Props(new CastingAssistant(dirname, stars))
}
class CastingAssistant(dirname: String, stars: List[ActorRef]) extends Actor {
import CastingAssistant._
import Star._
import context.dispatcher
var signed = Set[ActorRef]()
stars.foreach { star =>
{
star ! OfferRole
context.watch(star)
}
}
def receive = {
case AcceptRole => {
signed += sender
println(s"Signed star ${signed.size} of ${stars.size} for director $dirname")
if (signed.size == stars.size) {
context.parent ! AllSigned(stars)
context.stop(self)
}
}
case RejectRole => scheduler.scheduleOnce(retryTime, sender, OfferRole)
case Terminated(ref) => {
context.parent ! CastingFailure
stars.foreach { _ ! Star.CancelOffer }
context.stop(self)
}
}
}
object Star {
case object OfferRole
case object AcceptRole
case object RejectRole
case object CancelOffer
case object RoleComplete
...
}
class Star(name: String, lifespan: FiniteDuration) extends Actor {
...
var acceptedOffer: ActorRef = null
scheduler.scheduleOnce(lifespan, self, PoisonPill)
def receive = {
case OfferRole => {
sender ! AcceptRole
acceptedOffer = sender
context become booked
}
}
def booked: Receive = {
case OfferRole => sender ! RejectRole
case CancelOffer => if (sender == acceptedOffer) context become receive
case RoleComplete => context become receive
}
}
Director使用要在电影中ActorRef的Star的ActorRef列表创建CastingAssistant 。 CastingAssistant首先向每个Star发送一个OfferRole ,然后将自己注册为每个Star的观察者。 然后, CastingAssistant等待每个Star返回一个AcceptRole或RejectRole消息,或从actor系统中报告一个Star消亡的Terminated消息。
如果CastingAssistant从AcceptRole的每个Star接收到AcceptRole ,则它将AllSigned消息发送回其Director父级。 为了方便起见,该消息包括Star actorRef的列表,因为需要将其传递给下一个处理步骤。
如果CastingAssistant从任何Star接收到RejectRole消息,则它将在延迟后安排将OfferRole重新发送给同一OfferRole 。 (明星通常难以接近,因此,如果您希望它们出现在电影中,则需要不断询问,直到他们接受为止。)
如果CastingAssistant收到Terminated消息,则表示为电影选择的Star之一已死亡。 在这种令人遗憾的情况下, CastingAssistant报告回一个CastingFailure其父Director和目的本身。 不过,在结束之前,它会向其列表中的每个Star发送CancelOffer消息,以便所有已在影片中担任角色的Star都可以担任其他角色。
您可能想知道为什么CastingAssistant将CancelOffer消息发送给每个 Star ,甚至还包括尚未处理AcceptRole消息的Star 。 原因是列表上的Star可能已经发送了AcceptRole ,但是在处理Terminated消息时它仍在邮箱中。 在分布式AcceptRole系统的一般情况下,也有可能Star已接受,但是AcceptRole消息仍在传输中或已丢失。 在每种情况下,向每个Star发送CancelOffer消息都可以使故障处理更加清晰,如果Star没有在正在制作的电影中扮演角色,则Star可以轻松忽略CancelOffer消息。
清单6显示了电影制作过程的最后一部分: ProductionAssistant actor的操作(与图2的右下角匹配)。 这部分很简单,因为ProductionAssistant仅需要处理Scheduler ProductionComplete消息或Terminated消息。
清单6. ProductionAssistant操作
class Director(name: String) extends Actor {
...
def making: Receive = {
case m: ProductionAssistant.ProductionEnd => {
m match {
case ProductionComplete => println(s"$name made a movie!")
case ProductionFailed => println(s"$name failed making a movie")
}
makeMovie
context become receive
}
}
}
object ProductionAssistant {
sealed trait ProductionEnd
case object ProductionComplete extends ProductionEnd
case object ProductionFailed extends ProductionEnd
def props(time: FiniteDuration, stars: List[ActorRef]) = Props(new ProductionAssistant(time, stars))
}
class ProductionAssistant(time: FiniteDuration, stars: List[ActorRef]) extends Actor {
import ProductionAssistant._
import context.dispatcher
stars.foreach { star => context.watch(star) }
scheduler.scheduleOnce(time, self, ProductionComplete)
def endProduction(end: ProductionEnd) = {
context.parent ! end
stars.foreach { star => star ! Star.RoleComplete }
context.stop(self)
}
def receive = {
case ProductionComplete => endProduction(ProductionComplete)
case Terminated(ref) => endProduction(ProductionFailed)
}
}
如果ProductionAssistant从Scheduler接收到ProductionComplete消息,则它可以将成功报告给父级Director 。 如果它首先收到Terminated消息,则必须报告失败。 无论哪种方式,它都通过告诉电影中涉及的所有Star都已完成工作来进行清理。
清单7是运行该程序时看到的输出示例,电影制作结果以粗体显示。
清单7.示例输出
Bob has been discovered
Academy now tracking 1 stars
Alice has been discovered
Academy now tracking 2 stars
Rock has been discovered
Academy now tracking 3 stars
Paper has been discovered
Academy now tracking 4 stars
Cosmo wants to make a movie with 4 actors
Astro wants to make a movie with 3 actors
Signed star 1 of 4 for director Cosmo
Signed star 2 of 4 for director Cosmo
Signed star 3 of 4 for director Cosmo
Signed star 4 of 4 for director Cosmo
Cosmo cast 4 actors for movie, starting production
Scissors has been discovered
Academy now tracking 5 stars
Cosmo made a movie!
Cosmo wants to make a movie with 4 actors
Signed star 1 of 4 for director Cosmo
Signed star 2 of 4 for director Cosmo
Signed star 3 of 4 for director Cosmo
Signed star 4 of 4 for director Cosmo
Cosmo cast 4 actors for movie, starting production
North has been discovered
Academy now tracking 6 stars
South has been discovered
Academy now tracking 7 stars
Cosmo failed making a movieAstro failed casting a movie
Bob has left the business
Academy now tracking 6 Stars
Cosmo wants to make a movie with 3 actors
Signed star 1 of 3 for director Cosmo
Signed star 2 of 3 for director Cosmo
Signed star 3 of 3 for director Cosmo
Cosmo cast 3 actors for movie, starting production
East has been discovered
Academy now tracking 7 stars
West has been discovered
Academy now tracking 8 stars
Alice has left the business
Academy now tracking 7 Stars
Rock has left the business
Academy now tracking 6 Stars
Up has been discovered
Academy now tracking 7 stars
Astro wants to make a movie with 2 actors
Signed star 1 of 2 for director Astro
Signed star 2 of 2 for director Astro
Astro cast 2 actors for movie, starting production
Cosmo made a movie!
Cosmo wants to make a movie with 3 actors
Signed star 1 of 3 for director Cosmo
Signed star 2 of 3 for director Cosmo
Signed star 3 of 3 for director Cosmo
Cosmo cast 3 actors for movie, starting production
Down has been discovered
Academy now tracking 8 stars
列表中点附近的双重故障显示了一个有趣的输出序列。 首先是Cosmo failed making a movie线,然后是Astro failed casting a movie ,其次是Bob has left the business 。 这些线显示了一颗Star的终止: Bob 。 在这种情况下, Bob接受了Cosmo制作的电影中的角色,并且制作已经开始,因此Cosmo的ProductionAssistant收到了Terminated消息,但制作失败。 Bob )也曾被选为Astro制作的电影的角色,但尚未接受该角色(因为Bob ( Bob )已经致力于Cosmo )的电影),因此Astro )的CastingAssistant收到了已Terminated消息,但未能电影。 第三则消息是Academy在收到Terminated消息时生成的。
结论
实际的参与者系统应用程序涉及多个(通常是许多 )参与者以及这些参与者之间的消息。 本文介绍了如何构造参与者系统并绘制参与者交互图,以帮助理解系统的操作。 与参与者和消息一起工作是与编写顺序代码不同的编程方法。 在获得一些经验之后,您会发现使用actor的方法可以轻松地通过异步执行创建高度可扩展的程序。
构建actor和进行消息交换只有到使actor系统正常工作为止。 在某个时候,您需要跟踪演员的行为异常。 参与者系统的异步特性使查明有问题的交互变得更加困难。 如何跟踪和调试actor交互是整篇文章本身值得讨论的主题。
翻译自: https://www.ibm.com/developerworks/java/library/j-jvmc6/index.html
相关资源:akka-actor_2.12 jar包