scala并发

    技术2024-04-07  71

    2003年,Herb Sutter在他的“免费午餐结束了”一文中揭露了业界最大的“肮脏小秘密”,清楚地表明,越来越快的处理器时代已经结束,将由一个新的并行化时代取代。单个芯片上的“核心”(虚拟CPU)。 这个消息在编程界引起了震惊,因为正确地保持线程安全的代码一直都存在,从理论上讲,即使不是在实践中,高能力的软件开发人员也对您的公司而言过于昂贵。 似乎很少有特权的人对Java的线程模型和并发API以及“ synchronized”关键字足够了解,可以编写既提供安全性又提供吞吐量的代码……而且大多数人都很难理解。

    据推测,该行业的其余部分只能自力更生,这显然不是可取的结论,至少对于要开发该软件的IT部门而言,这是不可行的。

    关于本系列

    Ted Neward深入探讨了Scala编程语言,并带您一起学习。 在这个新的developerWorks 系列文章中 ,您将了解最新的炒作内容,并了解Scala在实践中的语言能力。 无论有什么比较之处,Scala代码和Java代码都将并排显示,但是(如您所见),Scala中的许多内容与您在Java中发现的任何内容都没有直接关联-这就是Scala的魅力所在! 毕竟,如果Java可以做到,为什么还要花时间学习Scala?

    与.NET空间中Scala的姊妹语言F#一样,Scala也是所谓的“并发问题”解决方案之一。 在本专栏中,我介绍了Scala的多个属性,这些属性使它更适合编写线程安全代码,例如默认情况下为不可变对象,以及用于返回对象副本而不修改其内容的设计偏好。 不过,Scala对并发的支持远不止于此。 现在是时候开始在Scala库中四处看看,看看那里生活着什么。

    并发基础

    在深入了解Scala的并发支持之前,最好先确保您对Java的基本并发模型有充分的了解,因为Scala对并发的支持在某种程度上建立在Java提供的特性和功能之上。 JVM和支持库。 为此,清单1中的代码包含一个基本的并发问题,称为生产者/消费者问题(如Sun Java Tutorial的“ Guarded Blocks”部分所述)。 请注意,Java Tutorial版本在其解决方案中未使用java.util.concurrent类,而是更喜欢使用java.lang.Object的旧wait() / notifyAll()方法:

    清单1.生产者/消费者(Java5之前的版本)
    package com.tedneward.scalaexamples.notj5; class Producer implements Runnable { private Drop drop; private String importantInfo[] = { "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" }; public Producer(Drop drop) { this.drop = drop; } public void run() { for (int i = 0; i < importantInfo.length; i++) { drop.put(importantInfo[i]); } drop.put("DONE"); } } class Consumer implements Runnable { private Drop drop; public Consumer(Drop drop) { this.drop = drop; } public void run() { for (String message = drop.take(); !message.equals("DONE"); message = drop.take()) { System.out.format("MESSAGE RECEIVED: %s%n", message); } } } class Drop { //Message sent from producer to consumer. private String message; //True if consumer should wait for producer to send message, //false if producer should wait for consumer to retrieve message. private boolean empty = true; //Object to use to synchronize against so as to not "leak" the //"this" monitor private Object lock = new Object(); public String take() { synchronized(lock) { //Wait until message is available. while (empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = true; //Notify producer that status has changed. lock.notifyAll(); return message; } } public void put(String message) { synchronized(lock) { //Wait until message has been retrieved. while (!empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = false; //Store message. this.message = message; //Notify consumer that status has changed. lock.notifyAll(); } } } public class ProdConSample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } }
    Java教程的“错误”

    好奇的读者可能会将此代码与Java教程中的代码进行比较,以了解两者之间的区别。 那些确实会发现,而不是简单地“同步” put和take方法,而是使用了存储在Drop内部的锁对象。 原因很简单:对象的监视器永远不会封装在类的内部,因此编写Java Tutorial版本可以使此(显然是疯狂的)代码破坏它: public class ProdConSample { public static void main(String[] args) { Drop drop = new Drop(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); synchronized(drop) { Thread.sleep(60 * 60 * 24 * 365 * 10); // sleep for 10 years?!? } } }

    通过将私有对象用作锁定所基于的监视器,此代码将无效。 从本质上讲,现在线程安全性实现已封装; 依靠客户的善意才能正常工作。

    注意:我在这里提供的代码是从Sun教程解决方案中稍作修改的; 它们提供的代码存在一个小的设计缺陷(请参阅Java教程的“错误” )。

    生产者/消费者问题的核心是一个易于理解的问题:一个(或多个)生产者实体希望为一个(或多个)消费者实体提供数据以供消费和使用(在这种情况下,它包括打印数据)到控制台)。 的Producer和Consumer类是非常简单的Runnable -implementing类:该Producer需要String从一个数组S和put š它们放入一个缓冲器用于Consumer以take任意的。

    问题的难点在于,如果Producer运行得太快,数据将被覆盖而潜在地丢失。 如果Consumer运行得太快,则由于Consumer两次读取相同的数据,因此可能会对数据进行双重处理。 缓冲区(在Java教程代码中称为Drop )必须确保两种情况都不会发生。 更不用说在消息put缓冲区并从缓冲区中take n的情况下,数据损坏的可能性不大(对于String引用,这很困难,但仍然值得关注)。

    最好由Brian Goetz的Java Concurrency in Practice或Doug Lea的较早的Java Concurrent Programming (请参阅参考资料 )来完成对该主题的完整讨论,但是在将Scala应用于该代码之前,有必要对这些代码的工作原理进行快速总结。

    当Java编译器看到synchronized关键字,它产生try / finally就位用的同步块的块monitorenter在块的顶部和一个操作码monitorexit在操作码finally块来确保显示器(用于Java基础无论代码如何退出,都将释放原子性)。 因此, Drop的put代码将被重写,如清单2所示:

    清单2.编译器有用后的Drop.put
    // This is pseudocode public void put(String message) { try { monitorenter(lock) //Wait until message has been retrieved. while (!empty) { try { lock.wait(); } catch (InterruptedException e) {} } //Toggle status. empty = false; //Store message. this.message = message; //Notify consumer that status has changed. lock.notifyAll(); } finally { monitorexit(lock) } }

    wait()方法告诉当前线程进入非活动状态,并等待另一个线程对该对象调用notifyAll() 。 刚通知的线程然后必须尝试再次获取监视器,在此之后它可以自由继续执行。 从本质上讲, wait()和notify() / notifyAll()充当一种简单的信号机制,允许Drop在Producer线程和Consumer线程之间进行协调,每个put take一个。

    本文随附的代码下载使用Java5并发增强功能( Lock and Condition接口和ReentrantLock锁实现)来提供清单2的基于超时的版本,但是基本的代码模式保持不变。 这就是问题所在:像清单2那样编写代码的开发人员必须过于专注于细节,即低级实现代码,使它们都能正常工作所需的线程和锁定。 而且,开发人员必须对代码中的每一行都进行推理,以查看是否需要对其进行保护,因为太多的同步就太少了。

    现在让我们看看Scala的替代方案。

    良好的旧Scala并发性(v1)

    在Scala中开始使用并发性的一种方法是,将Java代码直接直接转换为Scala,在某些地方利用Scala的语法来简化代码,至少要少一点:

    清单3. ProdConSample(Scala)
    object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } class Drop { var message : String = "" var empty : Boolean = true var lock : AnyRef = new Object() def put(x: String) : Unit = lock.synchronized { // Wait until message has been retrieved await (empty == true) // Toggle status empty = false // Store message message = x // Notify consumer that status has changed lock.notifyAll() } def take() : String = lock.synchronized { // Wait until message is available. await (empty == false) // Toggle status empty=true // Notify producer that staus has changed lock.notifyAll() // Return the message message } private def await(cond: => Boolean) = while (!cond) { lock.wait() } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop(); // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } }

    Producer和Consumer类几乎与它们的Java表亲相同,再次扩展(实现) Runnable接口并覆盖run()方法,在Producer的情况下,使用内置的迭代方法来遍历内容importantInfo数组。 (实际上,为了使其更像Scala, importantInfo应该可能是List而不是Array ,但是在第一遍中,我想使内容尽可能接近原始Java代码。)

    Drop类的外观也类似于Java版本,只是在Scala中,“ synchronized”不是关键字,它是在AnyRef类(Scala的“所有引用类型的根”)上定义的方法。 这意味着要在特定对象上进行同步,只需在该对象上调用syncize方法即可; 在这种情况下,将对象放在Drop的lock字段中。

    请注意,在await()方法的定义中,我们还在Drop类中使用了Scala-ism: cond参数是一个代码块,等待被评估,而不是在传递给该方法之前被评估。 在Scala中,这正式称为“按名称呼叫”; 在这里它作为捕捉必须(在一次重复两次的条件等待逻辑的有用方式put在,一旦take )的Java版本。

    最后,在main() ,创建Drop实例,实例化两个线程,使用start()它们踢开,然后从main()的结尾退出,并相信JVM将在您启动之前启动这两个线程用main()完成。 (在生产代码中,这可能不应该被认为是理所当然的,但是对于这样一个简单的示例,在99.99%的时间内,这样就可以了。

    但是,尽管如此,仍然存在相同的基本问题:程序员仍然必须过于担心信令和协调两个线程的问题。 尽管某些Scala主义可能会使语法更易于使用,但到目前为止,这并不是真正令人信服的胜利。

    Scala并发v2

    快速浏览《 Scala库参考》会发现一个有趣的包: scala.concurrency 。 该程序包包含许多不同的并发结构,包括我们将要使用的第一个MailBox类。

    顾名思义, MailBox本质上就是Drop本身,它是一个单槽缓冲区,用于保存一条数据直到被检索到为止。 但是, MailBox的一大优势在于,它在模式匹配和case类的组合之后完全封装了发送和接收的详细信息,从而使其比简单的Drop (或Drop的大型多插槽数据-抱着兄弟java.util.concurrent.BoundedBuffer )。

    清单4. ProdConSample,v2(Scala)
    package com.tedneward.scalaexamples.scala.V2 { import concurrent.{MailBox, ops} object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } } }

    v2和v1之间唯一的区别在于Drop的实现,该实现现在利用MailBox类来处理对传入和从Drop的消息的阻止和发信号。 (我们可能已经重写了Producer和Consumer以直接使用MailBox ,但是为了简单起见,我假设我们要在所有示例中保持Drop API的一致性。)使用MailBox与经典的BoundedBuffer ( Drop )有一点不同。我们一直在使用它,因此让我们详细遍历该代码。

    MailBox有两个基本操作: send和receive 。 receiveWithin方法仅是基于超时的receive版本。 MailBox接收的邮件可以是任何类型。 send()方法本质上将消息放入邮箱,如果消息所关注的类型属于未决接收者,则立即通知该消息,并将其附加到消息的链接列表中以供以后检索。 receive()方法将一直阻塞,直到receive()适合传递给它的功能块的消息为止。

    因此,在这种情况下,我们创建了两个案例类,一个不包含任何表示MailBox的空类( Empty )( Empty ),另一个包含其中包含消息数据的数据( Full )。

    put方法由于将数据放入Drop ,因此在MailBox上调用receive()寻找Empty实例,因此阻塞直到发送Empty 。 此时,它将Full实例发送到包含新数据的MailBox 。 take方法因为正在从Drop删除数据,所以在MailBox上调用receive()寻找Full实例,因此提取了消息(同样,由于模式匹配的能力,它可以从case类内部提取值并将其绑定到local变量),然后将Empty实例发送到MailBox 。

    无需显式锁定,也无需考虑监视器。

    Scala并发v3

    实际上,如果事实证明Producer和Consumer根本不必完全是成熟的类(在这里就是这种情况),我们可以大大缩短代码的长度-两者本质上都是Runnable.run()方法,Scala可以通过使用scala.concurrent.ops对象的spawn方法完全取消它,如清单5所示:

    清单5. ProdConSample,v3(Scala)
    package com.tedneward.scalaexamples.scala.V3 { import concurrent.MailBox import concurrent.ops._ object ProdConSample { class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer spawn { val importantInfo : Array[String] = Array( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "A kid will eat ivy too" ); importantInfo.foreach((msg) => drop.put(msg)) drop.put("DONE") } // Spawn Consumer spawn { var message = drop.take() while (message != "DONE") { System.out.format("MESSAGE RECEIVED: %s%n", message) message = drop.take() } } } } }

    spawn方法(通过包装块顶部的ops对象导入)采用一个代码块(另一个名为参数的示例),并将其包装在匿名构造的线程对象的run()方法内。 实际上,了解ops类内部的spawn的定义并不难:

    清单6. scala.concurrent.ops.spawn()
    def spawn(p: => Unit) = { val t = new Thread() { override def run() = p } t.start() }

    ...再次强调了副名称参数的功能。

    ops.spawn方法的一个缺点是,它是在2003年Java 5并发类生效之前编写的一个基本事实。 特别是,创建java.util.concurrent.Executor及其同类对象的目的是使开发人员更容易生成线程,而不必实际处理直接创建线程对象的细节。 幸运的是, spawn的定义很简单,可以在您自己的自定义库中重新创建,利用Executor (或ExecutorService或ScheduledExecutorService )进行线程的实际启动。

    实际上,Scala对并发的支持远远超出了MailBox和ops类。 Scala还支持类似的概念,称为“ Actors”,该概念使用与MailBox相似的消息传递方法,但程度更大且具有更大的灵活性。 但这是下一次。

    结论

    Scala为并发提供了两个级别的支持,就像对其他与Java有关的主题一样:

    首先,是对基础库的完全访问(例如java.util.concurrent),并且支持“传统” Java并发语义(例如monitors和wait() / notifyAll() )。 第二层是这些基本机制之上的抽象层,本文所讨论的MailBox类和我们将在本系列下一篇文章中讨论的Actors库就是示例。

    两种情况下的目标都是相同的:使开发人员更容易集中精力解决问题的实质,而不必考虑并发编程的底层细节(显然,第二种方法比第一种方法要好得多)至少对于那些对底层原始语言的思考不太投入的人)。

    但是,当前Scala库的一个明显缺陷是明显缺乏对Java 5的支持。 scala.concurrent.ops类应该具有像spawn这样的操作,它们可以利用新的Executor接口。 它也应该支持的版本synchronized ,使使用新的Lock接口。 幸运的是,这些都是库的改进,可以在Scala的生命周期中的任何时候完成,而不会破坏现有的代码。 它们甚至可以由Scala开发人员自己完成,而不必等待Scala的核心开发团队向他们提供(只需一点时间)。


    翻译自: https://www.ibm.com/developerworks/java/library/j-scala02049/index.html

    相关资源:微信小程序源码-合集6.rar
    Processed: 0.009, SQL: 9