Scala并发编程
scala支持Java的多线程模型, 也继承了多线程固有的资源竞争和死锁问题.
作为一种函数式编程语言, scala的actor消息模型提供了一种更便捷更安全的并发编程方案.
线程模型
scala的线程模型来自于Java. 首先我们要拓展一个Runable或Callable, 并重写run方法
trait Runnable { def run(): Unit }
Callable与Runable类似,但是有一个返回值:
trait Callable[V] { def call(): V }
Thread需要一个Runable实例作为参数来创建:
scala> val thread = new Thread(new Runnable { | def run() { | println("hello world") | } | }) thread: Thread = Thread[Thread-2,5,main] scala> thread.start() hello world
线程同步
synchronized
是JVM中最简单的使用互斥锁的方式:
class User { var name: String = ""; def setName(nameArg :String) { this.synchronized { this.name = nameArg; } } }
当线程开始执行obj.synchronized
块中的代码前, 它将尝试获得对象obj
的锁, 若获取失败则线程进入阻塞状态.
当某个线程获得了对象的锁后, 其它线程就无法访问或修改该对象. 当obj.synchronized
块中的代码执行完成时, 线程会解除锁, 另一个线程就可以加锁并访问对象了.
Future模型
scala提供了Promise-Future-Callback异步模型:
-
Future 表示一个还没有完成的任务的结果, Future对象可以在任务完成前访问
-
Promise 表示一个还没有执行的任务, 可以通过Promise标记任务的状态
-
Callback 回调用于在任务完成或其它情况下执行的操作
Future
import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global object FutureDemo extends App { val f = Future { println("working on future task") Thread.sleep(100) 1+1 } println("waiting for future task complete") val result = Await.result(f, 1 second) println(result) }
执行异步任务需要上下文, ExecutionContext.Implicits.global
是使用当前的全局上下文作为隐式上下文.
引入.duration._
允许我们使用1 second
, 200 milli
, 2 minute
这样的时间间隔字面值.
上述示例中Await.result
使用阻塞的方式等待Future任务完成, 若Future超时未完成则抛出TimeoutException
异常.
多次运行上述示例就会发现, 两条提示输出顺序是不确定的. 这是因为Future中的代码是在独立线程中执行的.
更好的方式是采用回调的方式来处理Future结果:
import scala.concurrent.{Future} import scala.concurrent.ExecutionContext.Implicits.global import scala.util.{Failure, Success} object FutureDemo2 extends App { val f = Future { 1 + 2 } f.onComplete{ case Success(value) => println(value) case Failure(e) => e.printStackTrace } }
或者定义onSuccess
和onFailure
两个回调.
import scala.concurrent.{Future} import scala.concurrent.ExecutionContext.Implicits.global object FutureDemo2 extends App { val f = Future { 1 + 2 } f.onSuccess { case value => println(value) } f.onFailure { case e => e.printStackTrace } }
Actor模型
Actor是一个基于消息机制的并发模型, 自Scala 2.11之后Akka Actor已成为Scala事实上的Actor标准.
akka不是scala的默认包, 这里我们使用SBT来管理外部包依赖. 关于sbt的使用可以参见作者的另一篇博文Scala构建工具SBT.
在build.sbt
中添加下列代码, 引入akka依赖.
scalaVersion := "2.12.1" resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.17"
更多关于引入akka的内容可以参见akka官网.
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class HelloActor extends Actor { def receive() = { case "hello" => println("Hi, I am an actor."); case _ => println("?"); } } object Main extends App { val system = ActorSystem("HelloSystem"); val helloActor = system.actorOf(Props[HelloActor], name = "helloactor"); helloActor ! "hello"; helloActor ! "bye"; system.shutdown(); }
自定义类继承Actor并重写receive方法处理不同类型的消息. 这里使用String类进行模式匹配, 使用case class进行模式匹配可以传递更多信息.
Actor需要ActorSystem的事件循环提供支持, 初始化一个ActorSystem后事件循环开始运行.最后必须执行system.shutdown();
否则scala程序会一直运行下去.
!
是用于发送消息的操作符, helloActor ! "hello";
将消息"hello"
发送给了helloActor.
receive
方法的返回值类型是PartialFunction[Any, Unit]
. 所有发送给Actor的消息都将被receive返回的偏函数处理.
偏函数的返回值类型为Unit, 也就是说处理消息时必须依赖副作用而不能有返回值; 偏函数的参数类型为Any, 也就是说所有消息在传入的时候都会发生类型丢失.
非类型化的消息便于设计消息转发, 负载均衡和代理Actor等机制, 且因为基于模式匹配的消息处理, 非类型化并不会产生问题.
基于事件循环的非阻塞机制已经被广为使用, 这里简单说明Actor与线程的问题.Actor并非与线程一一对应, 一个线程可以为多个Actor服务. ActorSystem会根据实际情况选择线程数.
原文地址:https://www.cnblogs.com/Finley/p/6422374.html
相关推荐
-
Spring Data JPA 必须掌握的 20+ 个查询关键字 Java基础
2019-3-15
-
Android WebView实现js与java交互 Java基础
2020-7-3
-
Dubbo一致性哈希负载均衡的源码和Bug,了解一下? Java基础
2020-6-13
-
Redis必知必会之持久化 Java基础
2020-6-13
-
Map集合、散列表、红黑树介绍 Java基础
2020-5-30
-
NioEventLoop启动流程源码解析 Java基础
2019-7-22
-
【并发那些事】线程有序化神器CompletionService Java基础
2020-6-13
-
一道老生常谈有意思的面试题思考 Java基础
2019-7-2
-
英特尔硬核芯片有了“嗅觉”,离复制人脑更进一步? Java基础
2020-7-2
-
关于Http协议,你必须要知道的 Java基础
2018-12-9