Spark For Data Analysis

Spark,一种快速数据分析替代方案

http://www.ibm.com/developerworks/cn/opensource/os-spark/index.html

M. Tim Jones, 顾问工程师, Independent author

简介: 虽然 Hadoop 在分布式数据分析方面备受关注,但是仍有一些替代产品提供了优于典型 Hadoop 平台的令人关注的优势。Spark 是一种可扩展的数据分析平台,它整合了内存计算的基元,因此,相对于 Hadoop 的集群存储方法,它在性能方面更具优势。Spark 是在 Scala 语言中实现的,并且利用了该语言,为数据处理提供了独一无二的环境。了解 Spark 的集群计算方法以及它与 Hadoop 的不同之处。

发布日期: 2012 年 1 月 04 日
级别: 中级
原创语言: 英文

Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。

Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。

尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoo 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。

Spark 集群计算架构

虽然 Spark 与 Hadoop 有相似之处,但它提供了具有有用差异的一个新的集群计算框架。首先,Spark 是为集群计算中的特定类型的工作负载而设计,即那些在并行操作之间重用工作数据集(比如机器学习算法)的工作负载。为了优化这些类型的工作负载,Spark 引进了内存集群计算的概念,可在内存集群计算中将数据集缓存在内存中,以缩短访问延迟。
Spark 还引进了名为 弹性分布式数据集 (RDD) 的抽象。RDD 是分布在一组节点中的只读对象集合。这些集合是弹性的,如果数据集一部分丢失,则可以对它们进行重建。重建部分数据集的过程依赖于容错机制,该机制可以维护 “血统”(即充许基于数据衍生过程重建部分数据集的信息)。RDD 被表示为一个 Scala 对象,并且可以从文件中创建它;一个并行化的切片(遍布于节点之间);另一个 RDD 的转换形式;并且最终会彻底改变现有 RDD 的持久性,比如请求缓存在内存中。

Spark 中的应用程序称为驱动程序,这些驱动程序可实现在单一节点上执行的操作或在一组节点上并行执行的操作。与 Hadoop 类似,Spark 支持单节点集群或多节点集群。对于多节点操作,Spark 依赖于 Mesos 集群管理器。Mesos 为分布式应用程序的资源共享和隔离提供了一个有效平台(参见 图 1)。该设置充许 Spark 与 Hadoop 共存于节点的一个共享池中。

figure1.gif
图 1. Spark 依赖于 Mesos 集群管理器实现资源共享和隔离。

Spark 编程模式

驱动程序可以在数据集上执行两种类型的操作:动作和转换。动作 会在数据集上执行一个计算,并向驱动程序返回一个值;而转换 会从现有数据集中创建一个新的数据集。动作的示例包括执行一个 Reduce 操作(使用函数)以及在数据集上进行迭代(在每个元素上运行一个函数,类似于 Map 操作)。转换示例包括 Map 操作和 Cache 操作(它请求新的数据集存储在内存中)。
我们随后就会看看这两个操作的示例,但是,让我们先来了解一下 Scala 语言。

Scala 简介

Scala 可能是 Internet 上不为人知的秘密之一。您可以在一些最繁忙的 Internet 网站(如 Twitter、LinkedIn 和 Foursquare,Foursquare 使用了名为 Lift 的 Web 应用程序框架)的制作过程中看到 Scala 的身影。还有证据表明,许多金融机构已开始关注 Scala 的性能(比如 EDF Trading 公司将 Scala 用于衍生产品定价)。

Scala 是一种多范式语言,它以一种流畅的、让人感到舒服的方法支持与命令式、函数式和面向对象的语言相关的语言特性。从面向对象的角度来看,Scala 中的每个值都是一个对象。同样,从函数观点来看,每个函数都是一个值。Scala 也是属于静态类型,它有一个既有表现力又很安全的类型系统。

此外,Scala 是一种虚拟机 (VM) 语言,并且可以通过 Scala 编译器生成的字节码,直接运行在使用 Java Runtime Environment V2 的 Java™ Virtual Machine (JVM) 上。该设置充许 Scala 运行在运行 JVM 的任何地方(要求一个额外的 Scala 运行时库)。它还充许 Scala 利用大量现存的 Java 库以及现有的 Java 代码。

最后,Scala 具有可扩展性。该语言(它实际上代表了可扩展语言)被定义为可直接集成到语言中的简单扩展。

Scala 的起源

Scala 语言由 Ecole Polytechnique Federale de Lausanne(瑞士洛桑市的两所瑞士联邦理工学院之一)开发。它是 Martin Odersky 在开发了名为 Funnel 的编程语言之后设计的,Funnel 集成了函数编程和 Petri net 中的创意。在 2011 年,Scala 设计团队从欧洲研究委员会 (European Research Council) 那里获得了 5 年的研究经费,然后他们成立新公司 Typesafe,从商业上支持 Scala,接收筹款开始相应的运作。

举例说明 Scala

让我们来看一些实际的 Scala 语言示例。Scala 提供自身的解释器,充许您以交互方式试用该语言。Scala 的有用处理已超出本文所涉及的范围,但是您可以在 参考资料 中找到更多相关信息的链接。

清单 1 通过 Scala 自身提供的解释器开始了快速了解 Scala 语言之旅。启用 Scala 后,系统会给出提示,通过该提示,您可以以交互方式评估表达式和程序。我们首先创建了两个变量,一个是不可变变量(即 vals,称作单赋值),另一个变量是可变变量 (vars)。注意,当您试图更改 b(您的 var)时,您可以成功地执行此操作,但是,当您试图更改 val 时,则会返回一个错误。

清单 1. Scala 中的简单变量

$ scala
Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val a = 1
a: Int = 1

scala> var b = 2
b: Int = 2

scala> b = b + a
b: Int = 3

scala> a = 2
<console>6: error: reassignment to val
       a = 2
         ^

接下来,创建一个简单的方法来计算和返回 Int 的平方值。在 Scala 中定义一个方法得先从 def 开始,后跟方法名称和参数列表,然后,要将它设置为语句的数量(在本示例中为 1)。无需指定任何返回值,因为可以从方法本身推断出该值。注意,这类似于为变量赋值。在一个名为 3 的对象和一个名为 res0 的结果变量(Scala 解释器会自动为您创建该变量)上,我演示了这个过程。这些都显示在 清单 2 中。

清单 2. Scala 中的一个简单方法

scala> def square(x: Int) = x*x
square: (x: Int)Int

scala> square(3)
res0: Int = 9

scala> square(res0)
res1: Int = 81

接下来,让我们看一下 Scala 中的一个简单类的构建过程(参见 清单 3)。定义一个简单的 Dog 类来接收一个 String 参数(您的名称构造函数)。注意,这里的类直接采用了该参数(无需在类的正文中定义类参数)。还有一个定义该参数的方法,可在调用参数时发送一个字符串。您要创建一个新的类实例,然后调用您的方法。注意,解释器会插入一些竖线:它们不属于代码。

清单 3. Scala 中的一个简单的类

scala> class Dog( name: String ) {
     |   def bark() = println(name + " barked")
     | }
defined class Dog

scala> val stubby = new Dog("Stubby")
stubby: Dog = Dog@1dd5a3d

scala> stubby.bark
Stubby barked

scala>

完成上述操作后,只需输入 :quit 即可退出 Scala 解释器。
回页首
安装 Scala 和 Spark
第一步是下载和配置 Scala。清单 4 中显示的命令阐述了 Scala 安装的下载和准备工作。使用 Scala v2.8,因为这是经过证实的 Spark 所需的版本。

清单 4. 安装 Scala

$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz
$ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/

要使 Scala 可视化,请将下列行添加至您的 .bashrc 中(如果您正使用 Bash 作为 shell):
export SCALA_HOME=/opt/scala-2.8.1.final
export PATH=$SCALA_HOME/bin:$PATH

接着可以对您的安装进行测试,如 清单 5 所示。这组命令会将更改加载至 bashrc 文件中,接着快速测试 Scala 解释器 shell。

清单 5. 配置和运行交互式 Scala

$ scala
Welcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> println("Scala is installed!")
Scala is installed!

scala> :quit
$

如清单中所示,现在应该看到一个 Scala 提示。您可以通过输入 :quit 执行退出。注意,Scala 要在 JVM 的上下文中执行操作,所以您会需要 JVM。我使用的是 Ubuntu,它在默认情况下会提供 OpenJDK。
接下来,请获取最新的 Spark 框架副本。为此,请使用 清单 6 中的脚本。

清单 6. 下载和安装 Spark 框架

wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/
mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
$ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz

接下来,使用下列行将 spark 配置设置在 Scala 的根目录 ./conf/spar-env.sh 中:
export SCALA_HOME=/opt/scala-2.8.1.final

设置的最后一步是使用简单的构建工具 (sbt) 更新您的分布。sbt 是一款针对 Scala 的构建工具,用于 Spark 分布中。您可以在 mesos-spark-c86af80 子目录中执行更新和变异步骤,如下所示:
$ sbt/sbt update compile

注意,在执行此步骤时,需要连接至 Internet。当完成此操作后,请执行 Spark 快速检测,如 清单 7 所示。在该测试中,需要运行 SparkPi 示例,它会计算 pi 的估值(通过单位平方中的任意点采样)。所显示的格式需要样例程序 (spark.examples.SparkPi) 和主机参数,该参数定义了 Mesos 主机(在此例中,是您的本地主机,因为它是一个单节点集群)和要使用的线程数量。注意,在 清单 7 中,执行了两个任务,而且这两个任务被序列化(任务 0 开始和结束之后,任务 1 再开始)。

清单 7. 对 Spark 执行快速检测

$ ./run spark.examples.SparkPi local[1]
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.SparkContext: Starting job...
11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s
Pi is roughly 3.14952
$

通过增加线程数量,您不仅可以增加线程执行的并行化,还可以用更少的时间执行作业(如 清单 8 所示)。

清单 8. 对包含两个线程的 Spark 执行另一个快速检测

$ ./run spark.examples.SparkPi local[2]
11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.SparkContext: Starting job...
11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s
Pi is roughly 3.14052
$

使用 Scala 构建一个简单的 Spark 应用程序
要构建 Spark 应用程序,您需要单一 Java 归档 (JAR) 文件形式的 Spark 及其依赖关系。使用 sbt 在 Spark 的顶级目录中创建该 JAR 文件,如下所示:

$ sbt/sbt assembly

结果产生一个文件 ./core/target/scala_2.8.1/"Spark Core-assembly-0.3.jar"。将该文件添加至您的 CLASSPATH 中,以便可以访问它。在本示例中,不会用到此 JAR 文件,因为您将会使用 Scala 解释器运行它,而不是对其进行编译。

在本示例中,使用了标准的 MapReduce 转换(如 清单 9 所示)。该示例从执行必要的 Spark 类导入开始。接着,需要定义您的类 (SparkTest) 及其主方法,用它解析稍后使用的参数。这些参数定义了执行 Spark 的环境(在本例中,该环境是一个单节点集群)。接下来,要创建 SparkContext 对象,它会告知 Spark 如何对您的集群进行访问。该对象需要两个参数:Mesos 主机名称(已传入)以及您分配给作业的名称 (SparkTest)。解析命令行中的切片数量,它会告知 Spark 用于作业的线程数量。要设置的最后一项是指定用于 MapReduce 操作的文本文件。

最后,您将了解 Spark 示例的实质,它是由一组转换组成。使用您的文件时,可调用 flatMap 方法返回一个 RDD(通过指定的函数将文本行分解为标记)。然后通过 map 方法(该方法创建了键值对)传递此 RDD ,最终通过 ReduceByKey 方法合并键值对。合并操作是通过将键值对传递给 _ + _ 匿名函数来完成的。该函数只采用两个参数(密钥和值),并返回将两者合并所产生的结果(一个 String 和一个 Int)。接着以文本文件的形式发送该值(到输出目录)。

清单 9. Scala/Spark 中的 MapReduce (SparkTest.scala)

import spark.SparkContext
import SparkContext._

object SparkTest {

  def main( args: Array[String]) {

    if (args.length == 0) {
      System.err.println("Usage: SparkTest <host> [<slices>]")
      System.exit(1)
    }

    val spark = new SparkContext(args(0), "SparkTest")
    val slices = if (args.length > 1) args(1).toInt else 2

    val myFile = spark.textFile("test.txt")
    val counts = myFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)

    counts.saveAsTextFile("out.txt")

  }

}

SparkTest.main(args)

要执行您的脚本,只需要执行以下命令:

$ scala SparkTest.scala local[1]

您可以在输出目录中找到 MapReduce 测试文件(如 output/part-00000)。

其他的大数据分析框架

自从开发了 Hadoop 后,市场上推出了许多值得关注的其他大数据分析平台。这些平台范围广阔,从简单的基于脚本的产品到与 Hadoop 类似的生产环境。

名为 bashreduce 的平台是这些平台中最简单的平台之一,顾名思义,它充许您在 Bash 环境中的多个机器上执行 MapReduce 类型的操作。bashreduce 依赖于您计划使用的机器集群的 Secure Shell(无密码),并以脚本的形式存在,通过它,您可以使用 UNIX®-style 工具(sort、awk、netcat 等)请求作业。

GraphLab 是另一个受人关注的 MapReduce 抽象实现,它侧重于机器学习算法的并行实现。在 GraphLab 中,Map 阶段会定义一些可单独(在独立主机上)执行的计算指令,而 Reduce 阶段会对结果进行合并。

最后,大数据场景的一个新成员是来自 Twitter 的 Storm(通过收购 BackType 获得)。Storm 被定义为 “实时处理的 Hadoop”,它主要侧重于流处理和持续计算(流处理可以得出计算的结果)。Storm 是用 Clojure 语言(Lisp 语言的一种方言)编写的,但它支持用任何语言(比如 Ruby 和 Python)编写的应用程序。Twitter 于 2011 年 9 月以开源形式发布 Storm。

请参阅 参考资料 获得有关的更多信息。

结束语

Spark 是不断壮大的大数据分析解决方案家族中备受关注的新增成员。它不仅为分布数据集的处理提供一个有效框架,而且以高效的方式(通过简洁的 Scala 脚本)处理分布数据集。Spark 和 Scala 都处在积极发展阶段。不过,由于关键 Internet 属性中采用了它们,两者似乎都已从受人关注的开源软件过渡成为基础 Web 技术。

参考资料

学习
EDF Trading: Implementing a domain-specific language for derivative pricing with Scala:许多行业都采用了 Scala,其中包括股票交易。通过观看此视频了解有关的一个示例。

应用程序虚拟化的过去与未来(M. Tim Jones,developerWorks,2011 年 5 月)介绍了 VM 语言及其实现。

Ceylon:真正的进步抑或只是另一种语言?(M. Tim Jones,developerWorks,2011 年 7 月)探讨另一种受人关注的(半成品)依赖于 JVM 的 VM 语言。

First Steps to Scala 介绍了 Scala 语言(Scala 设计者 Martin Odersky 编写了其中的一部分)。这是写于 2007 年一篇篇幅较长的简介,介绍了该语言的许多方面。另一个有用的示例是 Code Examples for Programming in Scala,它为各种代码模式提供了 Scala 方案。

使用 Linux 和 Hadoop 进行分布式计算(Ken Mann 和 M. Tim Jones,developerWorks,2008 年 12 月)提供关于 Hadoop 架构的介绍,其中包括用于大块数据分布式处理的 MapReduce 范式的基础知识。

用 Hadoop 进行分布式数据处理(M. Tim Jones,developerWorks,2010 年):查找关于 Hadoop 的实务介绍,其中包括如何设置和使用单节点 Hadoop 集群,如何设置和使用多节点 Hadoop 集群,以及如何在 Hadoop 环境中开发映射和减少应用程序。

Twitter 上的 developerWorks:关注我们最新的新闻。您也可以在 Twitter 上关注该作者 M. Tim Jones。

developerWorks 演示中心:观看免费的演示,并了解 IBM 及开源技术和产品功能。

随时关注 developerWorks 技术活动和网络广播。

访问 developerWorks Open source 专区获得丰富的 how-to 信息、工具和项目更新以及最受欢迎的文章和教程,帮助您用开放源码技术进行开发,并将它们与 IBM 产品结合使用。

获得产品和技术

Spark 介绍了用 Scala 语言编写并受其支持的内存数据分析解决方案。

The simple build tool 是 Scala 语言采用的构建解决方案。它为小型项目提供了一个简单方法,还为复杂构建提供了一些高级功能。

Lift 是 Scala 的 Web 应用程序框架,类似于 Ruby 的 Rails 框架。您可以在 Twitter 和 Foursquare 找到活动中的 Lift。

Mesos Project:Spark 并不对工作负载的分配提供本地支持,而是依赖于集群管理器,为分布式应用程序提供跨网络的资源隔离和共享。

bashreduce(基于 Bash 脚本的实现)、GraphLab(侧重于机器学习)和 Storm(Twitter 收购 BackType 之后获得的一种用 Clojure 编写的实时分布流处理系统):Hadoop 催生了许多大数字分析平台。除了 Spark 之外,您可以使用这些产品实现并行计算架构。

IBM 产品评估试用版软件:下载产品试用版,在线试用产品,在云环境下试用产品,或者在 IBM SOA 人员沙箱 中花一些时间了解如何有效实现面向服务架构。我们提供了多个信息管理产品的试用版。既然您对数据分析感兴趣,那么您可能也想查看 试用版:SPSS Text Analytics for Surveys、IBM 软件试用版:IBM Cognos 8 Business Intelligence 和 IBM 软件下载:IBM Cognos Express。

讨论

developerWorks 中文社区:查看开发人员推动的博客、论坛、小组和 wiki,并与其他 developerWorks 用户交流。帮助构建 developerWorks 社区中的 Real world open source 小组。

加入 developerWorks 中文社区,developerWorks 社区是一个面向全球 IT 专业人员,可以提供博客、书签、wiki、群组、联系、共享和协作等社区功能的专业社交网络社区。

加入 IBM 软件下载与技术交流群组,参与在线交流。