Scala并发

2017/11/19 Scala

Scala并发

多核问题 (multicore problem)不断得到人们的关注。通过增加 CPU 的核数和服务器数量,并使用水平扩展取代垂直扩展,我们能够继续对性能进行扩展。
水平扩展要求开发人员编写并发软件,这为开发人员带来了挑战。并发并不容易处理,这类应用需要对共享可变状态的访问进行协调,这意味需 要处理使用锁、互斥量及信号量这样的工具的多线程编程。如果未能正确地协调这些访问,便会产生不可预见的行为。其他线程突然对你所使用的一些变量进行了修改,这也意味着代码中存在竞态条件和锁竞争。
当人们意识到利用不可变性 (immutability)和纯函数化能够解决这些问题时,函数式编程开始变得主流起来。我们也能看到 actor 模型这样的古老 并发方法重新变得充满活力。

scala.sys.process 包

某些场景下,我们可以使用小的、同步的进程通过数据库事务、消息队列或进程间的数据转移来完成同步状态。scala.sys.process 包(http://www.scala-lang.org/api/current/scala/sys/process/package.html )提供了一套 DSL 方言,可用于运行或管理操作系统进 程,同时也能对进程 I/O 进行处理。
如:

scala> import scala.sys.process._
scala> import scala.language.postfixOps
scala> import java.net.URL
scala> import java.io.File
// 执行命令,并写入标准输出。
scala> "ls -l src".!
total 0
drwxr-xr-x 4 deanwampler taff 136 Dec 19 2013 main drwxr-xr-x 4 deanwampler taff 136 Dec 19 2013 test res33: Int = 0
// 将命令相关标记传入Seq对象,执行命令后将返回一个记录了命令输出信息的字符串。 scala> Seq("ls", "-l", "src").!!
res34: String =
"total 0
drwxr-xr-x 4 deanwampler staff 136 Dec 19 2013 main drwxr-xr-x 4 deanwampler staff 136 Dec 19 2013 test "
// 创建一个进程,用于访问URL资源,该进程的输出将重新定向到"grep $filter"命令的输入中, 与此同时,grep操作的输出将会以追加的方式(非覆写)输入到文件中。
def findURL(url: String, filter: String) =
new URL(url) #> s"grep $filter" #>> new File(s"$filter.txt")
// 对输出文件执行ls -l命令。假如该文件存在,则计算文件行数。
def countLines(fileName: String) = s"ls -l $fileName" #&& s"wc -l $fileName"
scala> findURL("http://scala-lang.org", "scala") ! res0: Int = 0
scala> countLines("scala.txt") !
-rw-r--r-- 1 deanwampler staff 4111 Jul 31 22:35 scala.txt 43 scala.txt res1: Int = 0
scala> findURL("http://scala-lang.org", "scala") ! res2: Int = 0
scala> countLines("scala.txt") !
-rw-r--r-- 1 deanwampler staff 8222 Jul 31 22:35 scala.txt 86 scala.txt res3: Int = 0

Future类型

这个 API 更强调合理直观地构建代码块。 假设你希望以异步的方式运行几项工作,这些工作就不会阻塞彼此运行。比如说,这些工作也许会执行一些 I/O 操作。那么针对这类情况,使用scala.concurrent.Future (http://www.scala-lang.org/api/current/scala/concurrent/Future.html )类便是最简单的方案。
一旦完成了 Future 对象的构建工作,控制权便会立刻返还给调用者,但结果值却无法保证立刻可用。Future 实例是一个句柄,它指向最终可用的结果值。无论操作成功与否,在 future 操作执行完毕之前,你可以继续执行其他工作。Scala 提供了多种方法用于处理 future 操作的执行。
java.util.concurrent.ForkJoinPool (http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166-4jdk7docs/java/util/concurrent/ForkJoinPool.html )对象对线 程池进行管理,而 ForkJoinPool 对象也会执行那些封装在 Future 对象中的任务。作为 Scala 的用户,我们并不需要关心 Scala 会如何运行我 们的同步代码,除非是那些性能调优的特殊场景。(ForkJoinPool 是 JDK7 类库的一部分,而由于目前 Scala 还支持 JDK6,因此 Scala 将 Doug Lea 所实现的 ForkJoinPool 移植到了类库中,该实现之后也被 JDK7 录入。)
例如:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
val futures = (0 to 9) map { 
    i => Future {
        val s = i.toString 
        print(s)
        s
    }
}

val f = Future.reduce(futures)((s1, s2) => s1 + s2)

val n = Await.result(f, Duration.Inf)

需要强调至关重要的一点,执行 Future 体中的 print 语句时,print 语句的输出是无序的。例如:两次执行脚本时分别输出了 0214679538 和 0123467985 。不过,由于 fold 方法会依照 Future 对象构造的顺序遍历这些对象,因此 fold 方法生成的字符串总是会严 格按数值次序排列,即 0123456789 。
Future.fold 方法以及其他相似的方法(http://www.scala-lang.org/api/current/#scala.concurrent.Future$ )本身也是异步执行的,这些方法将返回一个 新的 Future 对象。在我们的示例中,只有调用 Await.result 方法时,程序才会阻塞。
通常,我们并不希望等待执行结果时阻塞程序。我们只希望当 Future 执行结束后,系统会执行少量的代码。而注册回调方法能帮助我们实现这 一功能。举个例子,简单的网络服务器会创建 Future 对象用于对请求进行处理,同时利用回调将执行的结果返还给调用者。下面的示例对相关 的逻辑进行了解释:

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

case class ThatsOdd(i: Int) extends RuntimeException(s"odd $i received!")

import scala.util.{Try, Success, Failure}

val doComplete: PartialFunction[Try[String],Unit] = {
    case s @ Success(_) => println(s)
    case f @ Failure(_) => println(f)
}

val futures = (0 to 9) map {
    case i if i % 2 == 0 => Future.successful(i.toString)
    case i => Future.failed(ThatsOdd(i))
}
futures map (_ onComplete doComplete)

Async类

使用 Future 类时,我们需要大量使用回调,但这使得代码很快变得复杂起来。因此处理一组有关联的任务时,我们可以把一些 Future 对象组 合起来以减少回调数量。Scala 新设计的 scala.async.Async 模块可以使用户更容易地构建这类计算。 Async 模块中提供了两个基本方法,可用于同步代码块中:

def async[T](body: => T): Future[T] //启动异步计算
def await[T](future: Future[T]): T  //等待Future计算完毕

Akka

Akka 模型实现的 actor 是轻量级的,每个 actor 大概只有 300 字节大小。因此,你可以在一个大型的 JVM 实例中轻松地创建百万个 actor 对象。对 开发人员而言,如何追踪这些具有自主意识的 actor 对象是一个挑战。不过假如大多数的 actor 都是无状态的工作单元,我们就可以管理这些 actor。此外,为了满足非常高的可扩展性和可用性,你还可以使用 Akka 搭建包含上千节点的集群。
这里会给出一个Actor模型的例子,但是由于代码规模的关系,会用新的一篇博文来介绍。
actor 模型是处理大规模、高度可用、事件驱动应用程序的更为通用的一种方法。在进一步说明这一模型之前,我们首先讨论下跨进程分配代码所面临的两个难题。我们也将一同给出这两个难题的解决方案:Pickling 库和 Spores 库。

Scala Pickling 库(https://github.com/scala/pickling )希望能够为用户提供一个序列化的方案,从而最大程度地对源代码进行简化,并提供可插拔的架 构以支持不同的后台序列化格式。 之前,我们在谈论 Akka 的 Props 类型时曾讲过一个相关的问题:假如我们要将某一闭包(即函数字面量)分发到本进程之外的其他进程中,其 他进程会捕获到什么呢? Spores 项目希望能够解决这个问题:Spores 会向用户提供一个 API,开发者可以通过该 API 构造一个“孢子”(即安全的 闭包),其中 API 负责确保“孢子”传递的准确性。如果你希望了解 Spores 项目的更多信息或例子,请参考 Scaladoc(http://docs.scala- lang.org/sips/pending/spores.html )。

反应式编程

很长时间以来,人们都认为必须使用事件驱动来处理大规模应用,这也意味着作为服务方,这些应用必须对请求做出响应;当需要获得其他服务 的“帮助”时,这些应用又需要向服务提供方发送事件(或消息)。因特网就建立在这样的认知之下。由于这类系统本身是响应式的,不需要根据 某些内部逻辑来执行工作,因此这类系统也被称为反应式系统。 反应式编程原理推出之后,涌现了一批模型,这些模型通过不同的方式实现了这一原理。除了 actor 模型之外,还有两个流行的模型。actor 模型 认为只要能将可变状态局限在某一 actor 内,可变状态便是合理的;而这两个类型则不同,它们都比 actor 模型更为纯粹:

新的模型如下:

  1. 函数式反应式编程 (functional reactive programming,FRP)
  2. 反应式扩展 (reactive extensions,Rx) 有组织起草了 Reactive 宣言(http://www.reactivemanifesto.org/ )用于提供明确的“反应式”系统定义。目前已经为反应式系统定义了四个特 征。所有可伸缩、可恢复的反应式程序都应遵循这些特征。
  3. 消息传递或事件传递:反应式系统必须能对消息或事件(这些术语的定义)进行响应,这也是最基本的要求。
  4. 可灵活伸缩:为了能够满足处理要求,反应式系统是可伸缩的系统。这就意味着该系统能够通过水平扩展的方式进行扩容、调整进程数、处理核数、处理 节点数。
  5. 可恢复:构造反应式系统时,必须不 断的进行改造,以便能够在出现错误时优雅地恢复系统。
  6. 响应式:响应式系统需要能够随时对服务请求进行响应,即使系统中出现了错误的组件或是经历了非常高的流量峰值,响应式也需要通过优雅降级(graceful degradation)的方式继续响应用户的请求。
Show Disqus Comments

Search

    Table of Contents