阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Spark官方文档 – 中文翻译

141次阅读
没有评论

共计 24329 个字符,预计需要花费 61 分钟才能阅读完成。

Spark 官方文档 – 中文翻译

  • 1 概述(Overview)
  • 2 引入 Spark(Linking with Spark)
  • 3 初始化 Spark(Initializing Spark)
    • 3.1 使用 Spark Shell(Using the Shell)
  • 4 弹性分布式数据集(RDDs)
    • 4.1 并行集合(Parallelized Collections)
    • 4.2 外部数据库(External Datasets)
    • 4.3 RDD 操作(RDD Operations)
      • 4.3.1 基础(Basics)
      • 4.3.2 把函数传递到 Spark(Passing Functions to Spark)
      • 4.3.3 理解闭包(Understanding closures)
        • 4.3.3.1 示例(Example)
        • 4.3.3.2 本地模式 VS 集群模式(Local vs. cluster modes)
        • 4.3.3.3 打印 RDD 的元素(Printing elements of an RDD)
      • 4.3.4 操作键值对(Working with Key­Value Pairs)
      • 4.3.5 Transformations
      • 4.3.6 Actions
      • 4.3.7 Shuffle 操作(Shuffle operations)
        • 4.3.7.1 背景(Background)
        • 4.3.7.2 性能影响(Performance Impact)
    • 4.4 RDD 持久化(RDD Persistence)
      • 4.4.1 如何选择存储级别(Which Storage Level to Choose?)
      • 4.4.2 移除数据(Removing Data)
  • 5 共享变量(Shared Variables)
    • 5.1 广播变量(broadcast variables)
    • 5.2 累加器(Accumulators)
  • 6 将应用提交到集群(Deploying to a Cluster)
  • 7 Java/Scala 中启动 Spark 作业(Launching Spark jobs from Java / Scala)
  • 8 单元测试(Unit Testing)
  • 9 从 Spark1.0 之前的版本迁移(Migrating from pre­1.0 Versions of Spark)
  • 10 下一步(Where to Go from Here) 

1 概述(Overview)

总体来讲,每一个 Spark 驱动程序应用都由一个驱动程序组成,该驱动程序包含一个由用户编写的 main 方法,该方法会在集群上并行执行一些列并行计算操作。Spark 最重要的一个概念是 弹性分布式数据集,简称 RDD(resilient distributed dataset)。RDD 是一个数据容器,它将分布在集群上各个节点上的数据抽象为一个数据集,并且 RDD 能够进行一系列的并行计算操作。可以将 RDD 理解为一个分布式的 List,该 List 的数据为分布在各个节点上的数据。RDD 通过读取 Hadoop 文件系统中的一个文件进行创建,也可以由一个 RDD 经过转换得到。用户也可以将 RDD 缓存至内存,从而高效的处理 RDD,提高计算效率。另外,RDD 有良好的容错机制。

Spark 另外一个重要的概念是共享变量(shared variables)。在并行计算时,可以方便的使用共享变量。在默认情况下,执行 Spark 任务时会在多个节点上并行执行多个 task,Spark 将每个变量的副本分发给各个 task。在一些场景下,需要一个能够在各个 task 间共享的变量。Spark 支持两种类型的共享变量:

  • 广播变量(broadcast variables):将一个只读变量缓存到集群的每个节点上。例如,将一份数据的只读缓存分发到每个节点。

  • 累加变量(accumulators):只允许 add 操作,用于计数、求和。

 

2 引入 Spark(Linking with Spark)

在 Spark 1.6.0 上编写应用程序,支持使用 Scala 2.10.X、Java 7+、Python 2.6+、R 3.1+。如果使用 Java 8,支持 lambda 表达式(lambda expressions)。
在编写 Spark 应用时,需要在 Maven 依赖中添加 Spark,Spark 的 Maven Central 为:

groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.0

另外,如果 Spark 应用中需要访问 HDFS 集群,则需要在 hadoop-client 中添加对应版本的 HDFS 依赖:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,需要在程序中添加 Spark 类。代码如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在 Spark 1.3.0 之前的版本,使用 Scala 语言编写 Spark 应用程序时,需要添加 import org.apache.spark.SparkContext._ 来启用必要的隐式转换)

 

3 初始化 Spark(Initializing Spark)

使用 Scala 编写 Spark 程序的需要做的第一件事就是创建一个 SparkContext 对象(使用 Java 语言时创建 JavaSparkContext)。SparkContext 对象指定了 Spark 应用访问集群的方式。创建 SparkContext 需要先创建一个 SparkConf 对象,SparkConf 对象包含了 Spark 应用的一些列信息。代码如下:

  • Scala
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
  • java
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName 参数为应用程序在集群的 UI 上显示的名字。master 为 Spark、Mesos、YARN URL 或 local。使用 local 值时,表示在本地模式下运行程序。应用程序的执行模型也可以在使用 spark-submit 命令提交任务时进行指定。

 

3.1 使用 Spark Shell(Using the Shell)

在 Spark Shell 下,一个特殊的 SparkContext 对象已经帮用户创建好,变量为 sc。使用参数 --master 设置 master 参数值,使用参数 --jars 设置依赖包,多个 jar 包使用逗号分隔。可以使用 --packages 参数指定 Maven 坐标来添加依赖包,多个坐标使用逗号分隔。可以使用参数 --repositories 添加外部的 repository。示例如下:

  • 本地模式下,使用 4 个核运行 Spark 程序:
$ ./bin/spark-shell --master local[4]
  • 将 code.jar 包添加到 classpath:
$ ./bin/spark-shell --master local[4] --jars code.jar
  • 使用 Maven 坐标添加一个依赖:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

详细的 Spark Shell 参数描述请执行命令spark-shell --help。更多的 spark-submit 脚本请见 spark-submit script。

 

4 弹性分布式数据集(RDDs)

Spark 最重要的一个概念就是 RDD,RDD 是一个有容错机制的元素容器,它可以进行并行运算操作。得到 RDD 的方式有两个:

  • 通过并行化驱动程序中已有的一个集合而获得
  • 通过外部存储系统(例如共享的文件系统、HDFS、HBase 等)的数据集进行创建

 

4.1 并行集合(Parallelized Collections)

在驱动程序中,在一个已经存在的集合上(例如一个 Scala 的 Seq)调用 SparkContext 的 parallelize 方法可以创建一个并行集合。集合里的元素将被复制到一个可被并行操作的分布式数据集中。下面为并行化一个保存数字 1 到 5 的集合示例:

  • Scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
  • Java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

当分布式数据集创建之后,就可以进行并行操作。例如,可以调用方法 distData.reduce((a,b) => a + b) 求数组内元素的和。Spark 支持的分布式数据集上的操作将在后面章节中详细描述。

并行集合的一个重要的参数是表示将数据划分为几个分区(partition)的分区数。Spark 将在集群上每个数据分区上启动一个 task。通常情况下,你可以在集群上为每个 CPU 设置 2 - 4 个分区。一般情况下,Spark 基于集群自动设置分区数目。也可以手动进行设置,设置该参数需要将参数值作为第二参数传给 parallelize 方法,例如:sc.parallelize(data, 10)。注意:在代码中,部分位置使用术语 slices(而不是 partition),这么做的原因是为了保持版本的向后兼容性。

 

4.2 外部数据库(External Datasets)

Spark 可以通过 Hadoop 支持的外部数据源创建分布式数据集,Hadoop 支持的数据源有本地文件系统、HDFS、Cassandra、HBase、Amazon S3、Spark���持的文本文件、SequenceFiles、Hadoop InputFormat。

SparkContext 的 testFile 方法可以创建文本文件 RDD。使用这个方法需要传递文本文件的 URI,URI 可以为本机文件路径、hdfs://、s3n:// 等。该方法读取文本文件的每一行至容器中。示例如下:

  • Scala
scala> val distFile = sc.textFile("data.txt")
distFile: RDD[String] = MappedRDD@1d4cee08
  • Java
JavaRDD<String> distFile = sc.textFile("data.txt");

创建之后,distFile 就可以进行数据集的通用操作。例如,使用 map 和 reduce 操作计算所有行的长度的总和:distFile.map(s => s.length).reduce((a, b) => a + b)
使用 Spark 读取文件需要注意一下几点:

  • 程序中如果使用到本地文件路径,在其它 worker 节点上该文件必须在同一目录,并有访问权限。在这种情况下,可以将文件复制到所有的 worker 节点,也可以使用网络内的共享文件系统。
  • Spark 所有的基于文件输入的方法(包括textFile),都支持文件夹、压缩文件、通配符。例如:textFile("/my/directory")textFile("/my/directory/*.txt")textFile("/my/directory/*.gz")
  • textFile 方法提供了一个可选的第二参数,用于控制文件的分区数。默认情况下,Spark 为文件的每个块创建一个分区(块使用 HDFS 的默认值 64MB),通过设置这个第二参数可以修改这个默认值。需要注意的是,分区数不能小于块数。

除了文本文件之外,Spark 还支持其它的数据格式:

  • SparkContext.wholeTextFiles能够读取指定目录下的许多小文本文件,返回(filename,content)对。而 textFile 只能读取一个文本文件,返回该文本文件的每一行。
  • 对于 SequenceFiles 可以使用 SparkContext 的 sequenceFile[K,V] 方法,其中 K 是文件中 key 和 value 的类型。它们必须为像 IntWritable 和 Text 那样,是 Hadoop 的 Writable 接口的子类。另外,对于通用的 Writable,Spark 允许用户指定原生类型。例如,sequenceFile[Int,String]将自动读取 IntWritable 和 Text。
  • 对于其他 Hadoop InputFormat,可以使用 SparkContext.hadoopRDD 方法,该方法接收任意类型的 JobConf 和输入格式类、键类型和值类型。可以像设置 Hadoop job 那样设置输入源。对于 InputFormat 还可以使用基于新版本 MapReduce API(org.apache.hadoop.mapreduce)的SparkContext.newAPIHadoopRDD。(老版本接口为:SparkContext.newHadoopRDD
  • RDD.saveAsObjectFileSparkContext.objectFile 能够保存包含简单的序列化 Java 对象的 RDD。但是这个方法不如 Avro 高效,Avro 能够方便的保存任何 RDD。

 

4.3 RDD 操作(RDD Operations)

RDD 支持两种类型的操作:

  • transformation:从一个 RDD 转换为一个新的 RDD。
  • action:基于一个数据集进行运算,并返回 RDD。

例如,map 是一个 transformation 操作,map 将数据集的每一个元素按指定的函数转换为一个 RDD 返回。reduce 是一个 action 操作,reduce 将 RDD 的所有元素按指定的函数进行聚合并返回结果给驱动程序(还有一个并行的 reduceByKey 能够返回一个分布式的数据集)。

Spark 的所有 transformation 操作都是懒执行,它们并不立马执行,而是先记录对数据集的一系列 transformation 操作。在执行一个需要执行一个 action 操作时,会执行该数据集上所有的 transformation 操作,然后返回结果。这种设计让 Spark 的运算更加高效,例如,对一个数据集 map 操作之后使用 reduce 只返回结果,而不返回庞大的 map 运算的结果集。

默认情况下,每个转换的 RDD 在执行 action 操作时都会重新计算。即使两个 action 操作会使用同一个转换的 RDD,该 RDD 也会重新计算。在这种情况下,可以使用 persist 方法或 cache 方法将 RDD 缓存到内存,这样在下次使用这个 RDD 时将会提高计算效率。在这里,也支持将 RDD 持久化到磁盘,或在多个节点上复制。

 

4.3.1 基础(Basics)

参考下面的程序,了解 RDD 的基本轮廓:

  • Scala
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
  • Java
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

第一行通过读取一个文件创建了一个基本的 RDD。这个数据集没有加载到内存,也没有进行其他的操作,变量 lines 仅仅是一个指向文件的指针。第二行为 transformation 操作 map 的结果。此时 lineLengths 也没有进行运算,因为 map 操作为懒执行。最后,执行 action 操作 reduce。此时 Spark 将运算分隔成多个任务分发给多个机器,每个机器执行各自部分的 map 并进行本地 reduce,最后返回运行结果给驱动程序。

如果在后面的运算中仍会用到 lineLengths,可以将其缓存,在 reduce 操作之前添加如下代码,该 persist 操作将在 lineLengths 第一次被计算得到后将其缓存到内存:

  • Scala
lineLengths.persist()
  • Java
lineLengths.persist(StorageLevel.MEMORY_ONLY());

 

4.3.2 把函数传递到 Spark(Passing Functions to Spark)

  • Scala
    Spark 的 API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。这有两种推荐的实现方式:
    • 使用匿名函数的语法,这可以让代码更加简洁。
    • 使用全局单例对象的静态方法。比如,你可以定义函数对象 objectMyFunctions,然后将该对象的 MyFunction.func1 方法传递给 Spark, 如下所示:
object MyFunctions {deffunc1(s: String): String = {...}
}

myRdd.map(MyFunctions.func1)

注意:由于可能传递的是一个类实例方法的引用(而不是一个单例对象),在传递方法的时候,应该同时传递包含该方法的类对象。举个例子:

class MyClass {deffunc1(s: String): String = {...}
  defdoStuff(rdd: RDD[String]): RDD[String] = {rdd.map(func1) }
}

上面示例中,如果我们创建了一个类实例 new MyClass,并且调用了实例的 doStuff 方法,该方法中的 map 操作调用了这个 MyClass 实例的 func1 方法,所以需要将整个对象传递到集群中。类似于写成:rdd.map(x=>this.func1(x))。

类似地,访问外部对象的字段时将引用整个对象:

class MyClass {val field = "Hello"
  defdoStuff(rdd: RDD[String]): RDD[String] = {rdd.map(x => field + x) }
}

等同于写成 rdd.map(x=>this.field+x), 引用了整个 this。为了避免这种问题,最简单的方式是把 field 拷贝到本地变量,而不是去外部访问它:

defdoStuff(rdd: RDD[String]): RDD[String] = {val field_ = this.field
  rdd.map(x => field_ + x)
}
  • Java
    Spark 的 API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行。在 Java 中,函数由那些实现了 org.apache.spark.api.java.function 包中的接口的类表示。有两种创建这样的函数的方式:
    • 在你自己的类中实现 Function 接口,可以是匿名内部类,或者命名类,并且传递类的一个实例到 Spark。
    • 在 Java8 中,使用 lambda 表达式来简明地定义函数的实现。

为了保持简洁性,本指南中大量使用了 lambda 语法,这在长格式中很容易使用所有相同的 APIs。比如,我们可以把上面的代码写成:

JavaRDD<String>  lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new Function Integer>() {public Integer call(String s) {return s.length();}
});
int totalLength = lineLengths.reduce(new Function2 Integer, Integer>() {public Integer call(Integer a, Integer b) {return a + b; }
});

同样的功能,使用内联式的实现显得更为笨重繁琐,代码如下:

class GetLength implements Function Integer> {public Integer call(String s) {return s.length();}
}
class Sum implements Function2 Integer, Integer> {public Integer call(Integer a, Integer b) {return a + b; }
}

JavaRDD lines = sc.textFile("data.txt");
JavaRDD lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

注意,java 中的内部匿名类,只要带有 final 关键字,就可以访问类范围内的变量。Spark 也会把变量复制到每一个 worker 节点。

 

4.3.3 理解闭包(Understanding closures)

使用 Spark 的一个难点为:理解程序在集群中执行时变量和方法的生命周期。RDD 操作可以在变量范围之外修改变量,这是一个经常导致迷惑的地方。比如下面的例子,使用 foreach() 方法增加计数器(counter)的值(类似的情况,在其他的 RDD 操作中经常出现)。

 

4.3.3.1 示例(Example)

参考下面简单的 RDD 元素求和示例,求和运算是否在同一个 JVM 中执行,其复杂度也不同。Spark 可以在 local 模式下(--master = local[n])执行应用,也可以将该 Spark 应用提交到集群上执行(例如通过 spark-submit 提交到 YARN):

  • Scala
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value:" + counter)
  • Java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value:" + counter);

 

4.3.3.2 本地模式 VS 集群模式(Local vs. cluster modes)

在本地模式下仅有一个 JVM,上面的代码将直接计算 RDD 中元素和,并存储到 counter 中。此时 RDD 和变量 counter 都在 driver 节点的同一内存空间中。

然而,在集群模式下,情况会变得复杂,上面的代码并不会按照预期的方式执行。为了执行这个 job,Spark 把处理 RDD 的操作分割成多个任务,每个任务将被一个 executor 处理。在执行之前,Spark 首先计算闭包(closure)。闭包是必须对 executor 可见的变量和方法,在对 RDD 进行运算时将会用到这些变量和方法(在本例子中指 foreach())。这个闭包会被序列化,并发送给每个 executor。在 local 模式下,只有一个 executor,所以所有的变量和方法都使用同一个闭包。在其他模式下情况跟 local 模式不一样,每个 executor 在不同的 worker 节点上运行,每个 executor 都有一个单独的闭包。

在这里,发送给每个 executor 的闭包内的变量是当前变量的副本,因此当 counter 在 foreach 中被引用时,已经不是在 driver 节点上的 counter 了。在 driver 节点的内存中仍然有一个 counter,但这个 counter 对 executors 不可见。executor 只能操作序列化的闭包中的 counter 副本。因此,最终 counter 的值仍然是 0,因为所有对 counter 的操作都是在序列化的闭包内的 counter 上进行的。

在类似这种场景下,为了保证良好的行为确保,应该使用累加器。Spark 中的累加器专门为在集群中多个节点间更新变量提供了一种安全机制。在本手册的累加器部分将对累加器进行详细介绍。

一般情况下,像环或本地定义方法这样的闭包结构,不应该用于更改全局状态。Spark 不定义也不保证来自闭包外引用导致的对象变化行为。有些情况下,在 local 模式下可以正常运行的代码,在分布式模式下也许并不会像预期那样执行。在分布式下运行时,建议使用累加器定义一些全局集合。

 

4.3.3.3 打印 RDD 的元素(Printing elements of an RDD)

打印一个 RDD 的元素也是一个常用的语法,带引 RDD 元素可以使用方法 rdd.foreach(println)rdd.map(println)。在本地模式下,该方法将生成预期的输出并打印 RDD 所有的元素。然而,在集群模式下各个 executor 调用 stdout,将结果打印到 executor 的 stdout 中。因为不是打印到 driver 节点上,所以在 driver 节点的 stdout 上不会看到这些输出。如果想将 RDD 的元素打印到 driver 节点上,可以使用 collect() 方法将 RDD 发送到 driver 节点上,然后再打印该 RDD:rdd.collect().foreach(println)。这个操作可能会导致 driver 节点内存不足,因为 collect() 方法将 RDD 全部的数据都发送到一台节点上。如果仅仅打印 RDD 的部分元素,一个安全的方法是使用 take() 方法:rdd.take(100).foreach(println)

 

4.3.4 操作键值对(Working with Key­Value Pairs)

Spark 大部分的 RDD 操作都是对任意类型的对象的,但是,有部分特殊的操作仅支持对键值对的 RDD 进行操作。最常用的是分布式“shuffle”操作,比如按照 key 将 RDD 的元素进行分组或聚集操作。

  • Scala
    在 Scala 中,包含 Tuple2 对象在内的 RDD 键值对操作,都是可以自动可用的(Tuple2 对象是 Scala 语言内置的元组类型,可以通过简单的编写进行 (a,b) 创建)。键值对操作接口在 PairRDDFunctions 类中,该类中的接口自动使用 RDD 的元组。
    例如,在下面的代码中使用 reduceByKey 操作对键值对进行计数,计算每行的文本出现的次数:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
  • Java
    在 Java 中,键值对使用的是 scala.Tuple2 类。用户可以使用特定的 map 操作将 JavaRDDs 转换为 JavaPairRDDs,例如 mapToPairflatMapToPair。JavaPairRDD 拥有标准 RDD 和特殊键值对的方法。
    例如,在下面的代码中使用 reduceByKey 操作对键值对进行计数,计算每行的文本出现的次数:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

我们还可以使用 counts.sortByKey() 按照字母顺序将键值对排序,使用 counts.collect() 将结果以一个数组的形式发送给 driver 节点。

注意,当在键值对操作中使用自定义对象作为 key 时,你必须保证自定义的 equals() 方法有一个对应的 hashCode() 方法。详细的细节,请阅读 Object.hashCode() documentation。

 

4.3.5 Transformations

下面列出了 Spark 常用的 transformation 操作。详细的细节请参考 RDD API 文档 (Scala、Java、Python、R) 和键值对 RDD 方法文档(Scala、Java)。

  • map(func)
    将原来 RDD 的每个数据项,使用 map 中用户自定义的函数 func 进行映射,转变为一个新的元素,并返回一个新的 RDD。

  • filter(func)
    使用函数 func 对原 RDD 中数据项进行过滤,将符合 func 中条件的数据项组成新的 RDD 返回。

  • flatMap(func)
    类似于 map,但是输入数据项可以被映射到 0 个或多个输出数据集合中,所以函数 func 的返回值是一个数据项集合而不是一个单一的数据项。

  • mapPartitions(func)
    类似于 map,但是该操作是在每个分区上分别执行,所以当操作一个类型为 T 的 RDD 时 func 的格式必须是Iterator<T> => Iterator<U>。即 mapPartitions 需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作。

  • mapPartitionsWithIndex(func)
    类似于 mapPartitions,但是需要提供给 func 一个整型值,这个整型值是分区的索引,所以当处理 T 类型的 RDD 时,func 的格式必须为(Int, Iterator<T>) => Iterator<U>

  • sample(withReplacement, fraction, seed)
    对数据采样。用户可以设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。

  • union(otherDataset)
    返回原数据集和参数指定的数据集合并后的数据集。使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同。该操作不进行去重操作,返回的结果会保存所有元素。如果想去重,可以使用 distinct()。

  • intersection(otherDataset)
    返回两个数据集的交集。

  • distinct([numTasks]))
    将 RDD 中的元素进行去重操作。

  • groupByKey([numTasks])
    操作(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。
    注意,如果分组是为了按 key 进行聚合操作(例如,计算 sum、average),此时使用 reduceByKeyaggregateByKey计算效率会更高。
    注意,默认情况下,并行情况取决于父 RDD 的分区数,但可以通过参数 numTasks 来设置任务数。

  • reduceByKey(func, [numTasks])
    使用给定的 func,将(K,V)对格式的数据集中 key 相同的值进行聚集,其中 func 的格式必须为(V,V) => V。可选参数 numTasks 可以指定 reduce 任务的数目。

  • aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])
    对(K,V)格式的数据按 key 进行聚合操作,聚合时使用给定的合并函数和一个初试值,返回一个 (K,U) 对格式数据。需要指定的三个参数:zeroValue 为在每个分区中, 对 key 值第一次读取 V 类型的值时, 使用的 U 类型的初始变量;seqOp 用于在每个分区中,相同的 key 中 V 类型的值合并到 zeroValue 创建的 U 类型的变量中。combOp 是对重新分区后两个分区中传入的 U 类型数据的合并函数。

  • sortByKey([ascending], [numTasks])
    (K,V)格式的数据集,其中 K 已实现了 Ordered,经过 sortByKey 操作返回排序后的数据集。指定布尔值参数 ascending 来指定升序或降序排列。

  • join(otherDataset, [numTasks])
    用于操作两个键值对格式的数据集,操作两个数据集 (K,V) 和(K,W)返回 (K, (V, W)) 格式的数据集。通过 leftOuterJoinrightOuterJoinfullOuterJoin 完成外连接操作。

  • cogroup(otherDataset, [numTasks])
    用于操作两个键值对格式数据集 (K,V) 和(K,W),返回数据集格式为 (K,(Iterable, Iterable))。这个操作也称为groupWith。对在两个 RDD 中的 Key-Value 类型的元素,每个 RDD 相同 Key 的元素分别聚合为一个集合,并且返回两个 RDD 中对应 Key 的元素集合的迭代器。

  • cartesian(otherDataset)
    对类型为 T 和 U 的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集。即对两个 RDD 内的所有元素进行笛卡尔积操作。

  • pipe(command, [envVars])
    以管道 (pipe) 方式将 RDD 的各个分区 (partition) 使用 shell 命令处理(比如一个 Perl 或 bash 脚本)。RDD 的元素会被写入进程的标准输入 (stdin),将进程返回的一个字符串型 RDD(RDD of strings),以一行文本的形式写入进程的标准输出(stdout) 中。

  • coalesce(numPartitions)
    把 RDD 的分区数降低到通过参数 numPartitions 指定的值。在得到的更大一些数据集上执行操作,会更加高效。

  • repartition(numPartitions)
    随机地对 RDD 的数据重新洗牌(Reshuffle),从而创建更多或更少的分区,以平衡数据。总是对网络上的所有数据进行洗牌(shuffles)。

  • repartitionAndSortWithinPartitions(partitioner)
    根据给定的分区器对 RDD 进行重新分区,在每个结果分区中,按照 key 值对记录排序。这在每个分区中比先调用 repartition 再排序效率更高,因为它可以将排序过程在 shuffle 操作的机器上进行。

 

4.3.6 Actions

下面列出了 Spark 支持的常用的 action 操作。详细请参考 RDD API 文档 (Scala、Java、Python、R) 和键值对 RDD 方法文档(Scala、Java)。

  • reduce(func)
    使用函数 func 聚集数据集中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该符合结合律和交换了,这样才能保证数据集中各个元素计算的正确性。

  • collect()
    在驱动程序中,以数组的形式返回数据集的所有元素。通常用于 filter 或其它产生了大量小数据集的情况。

  • count()
    返回数据集中元素的个数。

  • first()
    返回数据集中的第一个元素(类似于take(1))。

  • take(n)
    返回数据集中的前 n 个元素。

  • takeSample(withReplacement,num, [seed])
    对一个数据集随机抽样,返回一个包含 num 个随机抽样元素的数组,参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子。

  • takeOrdered(n, [ordering])
    返回 RDD 按自然顺序或自定义顺序排序后的前 n 个元素。

  • saveAsTextFile(path)
    将数据集中���元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。Spark 将在每个元素上调用 toString 方法,将数据元素转换为文本文件中的一行记录。

  • saveAsSequenceFile(path) (Java and Scala)
    将数据集中的元素以 Hadoop Sequence 文件的形式保存到指定的本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中。该操作只支持对实现了 Hadoop 的 Writable 接口的键值对 RDD 进行操作。在 Scala 中,还支持隐式转换为 Writable 的类型(Spark 包括了基本类型的转换,例如 Int、Double、String 等等)。

  • saveAsObjectFile(path) (Java and Scala)
    将数据集中的元素以简单的 Java 序列化的格式写入指定的路径。这些保存该数据的文件,可以使用 SparkContext.objectFile()进行加载。

  • countByKey()
    仅支持对(K,V)格式的键值对类型的 RDD 进行操作。返回(K,Int)格式的 Hashmap,(K,Int)为每个 key 值对应的记录数目。

  • foreach(func)
    对数据集中每个元素使用函数 func 进行处理。该操作通常用于更新一个累加器(Accumulator)或与外部数据源进行交互。注意:在 foreach()之外修改累加器变量可能引起不确定的后果。详细介绍请阅读 Understanding closures 部分。

 

4.3.7 Shuffle 操作(Shuffle operations)

Spark 内的一个操作将会触发 shuffle 事件。shuffle 是 Spark 将多个分区的数据重新分组重新分布数据的机制。shuffle 是一个复杂且代价较高的操作,它需要完成将数据在 executor 和机器节点之间进行复制的工作。

 

4.3.7.1 背景(Background)

通过 reduceByKey 操作的例子,来理解 shuffle 过程。reduceByKey操作生成了一个新的 RDD,原始数据中相同 key 的所有记录的聚合值合并为一个元组,这个元组中的 key 对应的值为执行 reduce 函数之后的结果。这个操作的挑战是,key 相同的所有记录不在同一各分区种,甚至不在同一台机器上,但是该操作必须将这些记录联合运算。

在 Spark 中,通常一条数据不会垮分区分布,除非为了一个特殊的操作在必要的地方才会跨分区分布。在计算过程中,一个分区由一个 task 进行处理。因此,为了组织所有的数据让一个 reduceByKey 任务执行,Spark 需要进行一个 all-to-all 操作。all-to-all 操作需要读取所有分区上的数据的所有的 key,以及 key 对应的所有的值,然后将多个分区上的数据进行汇总,并将每个 key 对应的多个分区的数据进行计算得出最终的结果,这个过程称为 shuffle。

虽然每个分区中新 shuffle 后的数据元素是确定的,分区间的���序也是确定的,但是所有的元素是无序的。如果想在 shuffle 操作后将数据按指定规则进行排序,可以使用下面的方法:

  • 使用 mapPartitions 操作在每个分区上进行排序,排序可以使用 .sorted 等方法。
  • 使用 repartitionAndSortWithinPartitions 操作在重新分区的同时高效的对分区进行排序。
  • 使用 sortBy 将 RDD 进行排序。

会引起 shuffle 过程的操作有:

  • repartition操作,例如:repartitioncoalesce
  • ByKey操作(除了 counting 相关操作),例如:groupByKeyreduceByKey
  • join操作,例如:cogroupjoin

 

4.3.7.2 性能影响(Performance Impact)

shuffle 是一个代价比较高的操作,它涉及磁盘 IO、数据序列化、网络 IO。为了准备 shuffle 操作的数据,Spark 启动了一系列的 map 任务和 reduce 任务,map 任务完成数据的处理工作,reduce 完成 map 任务处理后的数据的收集工作。这里的 map、reduce 来自 MapReduce,跟 Spark 的 map 操作和 reduce 操作没有关系。

在内部,一个 map 任务的所有结果数据会保存在内存,直到内存不能全部存储为止。然后,这些数据将基于目标分区进行排序并写入一个单独的文件中。在 reduce 时,任务将读取相关的已排序的数据块。

某些 shuffle 操作会大量消耗堆内存空间,因为 shuffle 操作在数据转换前后,需要在使用内存中的数据结构对数据进行组织。需要特别说明的是,reduceByKeyaggregateByKey 在 map 时会创建这些数据结构,ByKey操作在 reduce 时创建这些数据结构。当内存满的时候,Spark 会把溢出的数据存到磁盘上,这将导致额外的磁盘 IO 开销和垃圾回收开销的增加。

shuffle 操作还会在磁盘上生成大量的中间文件。在 Spark 1.3 中,这些文件将会保留至对应的 RDD 不在使用并被垃圾回收为止。这么做的好处是,如果在 Spark 重新计算 RDD 的血统关系(lineage)时,shuffle 操作产生的这些中间文件不需要重新创建。如果 Spark 应用长期保持对 RDD 的引用,或者垃圾回收不频繁,这将导致垃圾回收的周期比较长。这意味着,长期运行 Spark 任务可能会消耗大量的磁盘空间。临时数据存储路径可以通过 SparkContext 中设置参数 spark.local.dir 进行配置。

shuffle 操作的行为可以通过调节多个参数进行设置。详细的说明请看 Configuration Guide 中的“Shuffle Behavior”部分。

 

4.4 RDD 持久化(RDD Persistence)

Spark 中一个很重要的能力是将数据持久化(或称为缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点会将本节点计算的数据块存储到内存,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并在各个节点的内存中缓存。Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

另外,每个持久化的 RDD 可以使用不同的存储级别进行缓存,例如,持久化到磁盘、已序列化的 Java 对象形式持久化到内存(可以节省空间)、跨节点间复制、以 off-heap 的方式存储在 Tachyon。这些存储级别通过传递一个 StorageLevel 对象(Scala、Java、Python)给 persist() 方法进行设置。cache()方法是使用默认存储级别的快捷设置方法,默认的存储级别是StorageLevel.MEMORY_ONLY(将反序列化的对象存储到内存中)。详细的存储级别介绍如下:

  • MEMORY_ONLY:将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。
  • MEMORY_AND_DISK:将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
  • MEMORY_ONLY_SER:将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer 时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。
  • MEMORY_AND_DISK_SER:类似于 MEMORY_ONLY_SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。
  • DISK_ONLY:只在磁盘上缓存 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2, 等等:与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
  • OFF_HEAP (实验中):以序列化的格式 (serialized format) 将 RDD 存储到 Tachyon。相比于 MEMORY_ONLY_SER,OFF_HEAP 降低了垃圾收集 (garbage collection) 的开销,使得 executors 变得更小,而且共享了内存池,在使用大堆 (heaps) 和多应用并行的环境下有更好的表现。此外,由于 RDD 存储在 Tachyon 中,executor 的崩溃不会导致内存中缓存数据的丢失。在这种模式下,Tachyon 中的内存是可丢弃的。因此,Tachyon 不会尝试重建一个在内存中被清除的分块。如果你打算使用 Tachyon 进行 off heap 级别的缓存,Spark 与 Tachyon 当前可用的版本相兼容。详细的版本配对使用建议请参考 Tachyon 的说明。

注意,在 Python 中,缓存的对象总是使用 Pickle 进行序列化,所以在 Python 中不关心你选择的是哪一种序列化级别。

在 shuffle 操作中(例如 reduceByKey),即便是用户没有调用persist 方法,Spark 也会自动缓存部分中间数据。这么做的目的是,在 shuffle 的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法。

 

4.4.1 如何选择存储级别(Which Storage Level to Choose?)

Spark 的存储级别的选择,核心问题是在内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:

  • 如果使用默认的存储级别(MEMORY_ONLY),存储在内存中的 RDD 没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度的提高 CPU 的效率, 可以使在 RDD 上的操作以最快的速度运行。
  • 如果内存不能全部存储 RDD, 那么使用 MEMORY_ONLY_SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。
  • 除了在计算该数据集的代价特别高,或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。
  • 如果想快速还原故障,建议使用多副本存储界别(例如,使用 Spark 作为 web 应用的后台服务,在服务出故障时需要快速恢复的场景下)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。
  • 在高内存消耗或者多任务的环境下,还处于实验性的 OFF_HEAP 模式有下列几个优势:
    • 它支持多个 executor 使用 Tachyon 中的同一个内存池。
    • 它显著减少了内存回收的代价。
    • 如果个别 executor 崩溃掉,缓存的数据不会丢失。

 

4.4.2 移除数据(Removing Data)

Spark 自动监控各个节点上的缓存使用率,并以最近最少使用的方式(LRU)将旧数据块移除内存。如果想手动移除一个 RDD,而不是等待该 RDD 被 Spark 自动移除,可以使用 RDD.unpersist() 方法。

 

5 共享变量(Shared Variables)

通常情况下,一个传递给 Spark 操作(例如 mapreduce)的方法是在远程集群上的节点执行的。方法在多个节点执行过程中使用的变量,是同一份变量的多个副本。这些变量的以副本的方式拷贝到每个机器上,各个远程机器上变量的更新并不会传回 driver 程序。然而,为了满足两种常见的使用场景,Spark 提供了两种特定类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

 

5.1 广播变量(broadcast variables)

广播变量允许编程者将一个只读变量缓存到每台机器上,而不是给每个任务传递一个副本。例如,广播变量可以用一种高效的方式给每个节点传递一份比较大的数据集副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发变量,以降低通信成本。

Spark 的 action 操作是通过一些列的阶段(stage)进行执行的,这些阶段(stage)是通过分布式的 shuffle 操作进行切分的。Spark 自动广播在每个阶段内任务需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务在运行前进行反序列化。这明确说明了,只有在跨越多个阶段的多个任务任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。

广播变量通过在一个变量 v 上调用 SparkContext.broadcast(v) 方法进行创建。广播变量是 v 的一个封装器,可以通过 value 方法访问 v 的值。代码示例如下:

  • Scala
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
  • Java
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

广播变量创建之后,在集群上执行的所有的函数中,应该使用该广播变量代替原来的 v 值。所以,每个节点上的 v 最多分发一次。另外,对象 v 在广播后不应该再被修改,以保证分发到所有的节点上的广播变量有同样的值(例如,在分发广播变量之后,又对广播变量进行了修改,然后又需要将广播变量分发到新的节点)。

 

5.2 累加器(Accumulators)

累加器只允许关联操作进行 ”added” 操作,因此在并行计算中可以支持特定的计算。累加器可以用于实现计数(类似在 MapReduce 中那样)或者求和。原生 Spark 支持数值型的累加器,编程者可以添加新的支持类型。创建累加器并命名之后,在 Spark 的 UI 界面上将会显示该累加器。这样可以帮助理解正在运行的阶段的运行情况(注意,在 Python 中还不支持)。

一个累加器可以通过在原始值 v 上调用 SparkContext.accumulator(v)。然后,集群上正在运行的任务就可以使用add 方法或 += 操作对该累加器进行累加操作。只有 driver 程序可以读取累加器的值,读取累加器的值使用 value 方法。
下面代码将数组中的元素进行求和:

  • Scala
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10
  • Java
Accumulator<Integer> accum = sc.accumulator(0);

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

上面的代码示例使用的是 Spark 内置的 Int 类型的累加器,开发者可以通过集成 AccumulatorParam 类创建新的累加器类型。AccumulatorParam 接口有两个方法:zero方法和 addInPlace 方法。zero方法给数据类型提供了一个 0 值,addInPlace方法能够将两个值进行累加。例如,假设我们有一个表示数学上向量的 Vector 类,我们可以写成:

  • Scala
object VectorAccumulatorParam extends AccumulatorParam[Vector] {defzero(initialValue: Vector): Vector = {Vector.zeros(initialValue.size)
  }
  defaddInPlace(v1: Vector, v2: Vector): Vector = {v1 += v2}
}

// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
  • Java
class VectorAccumulatorParam implements AccumulatorParam<Vector> {public Vector zero(Vector initialValue) {return Vector.zeros(initialValue.size());
  }
  public Vector addInPlace(Vector v1, Vector v2) {v1.addInPlace(v2); return v1;
  }
}

// Then, create an Accumulator of this type:
Accumulator<Vector> vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());

Spark 也支持使用更通用的 Accumulable 接口去累加数据,其结果数据的类型和累加的元素类型不同(例如,通过收集数据元素创建一个 list)。在 Scala 中,SparkContext.accumulableCollection方法可用于累加常用的 Scala 集合类型。

累加器的更新只发生在 action 操作中,Spark 保证每个任务只能更新累加器一次,例如重新启动一个任务,该重启的任务不允许更新累加器的值。在 transformation 用户需要注意的是,如果任务过 job 的阶段重新执行,每个任务的更新操作将会执行多次。

累加器没有改变 Spark 懒执行的模式。如果累加器在 RDD 中的一个操作中进行更新,该累加器的值只在该 RDD 进行 action 操作时进行更新。因此,在一个像 map() 这样的转换操作中,累加器的更新并没有执行。下面的代码片段证明了这个特性:

  • Scala
val accum = sc.accumulator(0)
data.map {x => accum += x; f(x) }
// Here, accum is still 0 because no actions have caused the <code>map</code> to be computed.
  • Java
Accumulator<Integer> accum = sc.accumulator(0);
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

 

6 将应用提交到集群(Deploying to a Cluster)

应用提交手册描述了如何将应用提交到集群。简单的说,当你将你的应用打包成一个 JAR(Java/Scala)或者一组 .py.zip文件(Python)后,就可以通过 bin/spark-submit 脚本将脚本提交到集群支持的管理器中。

 

7 Java/Scala 中启动 Spark 作业(Launching Spark jobs from Java / Scala)

使用 org.apache.spark.launcher 包提供的简单的 Java API,可以将 Spark 作业以该包中提供的类的子类的形式启动。

 

8 单元测试(Unit Testing)

Spark 可以友好的使用流行的单元测试框架进行单元测试。在 test 中简单的创建一个 SparkContext,master 的 URL 设置为local,运行几个操作,然后调用SparkContext.stop() 将该作业停止。因为 Spark 不支持在同一个程序中运行两个 context,所以需要请确保使用 finally 块或者测试框架的 tearDown 方法将 context 停止。

 

9 从 Spark1.0 之前的版本迁移(Migrating from pre­1.0 Versions of Spark)

Spark 1.0 冻结了 1.X 系列的 Spark 核的 API,因此,当前没有标记为 ”experimental” 或者“developer API”的 API 都将在未来的版本中进行支持。

  • Scala 的变化

对于 Scala 的变化是,分组操作(例如 groupByKeycogroupjoin)的返回类型由 (Key,Seq[Value]) 变为(Key,Iterable[Value])

  • Java API 的变化
    • 1.0 中 org.apache.spark.api.java.function 类中的 Function 类变成了接口,这意味着旧的代码中 extends Function 应该改为implement Function
    • 增加了新的 map 型操作,例如 mapToPairmapToDouble,增加的这些操作可用于创建特殊类型的 RDD。
    • 分组操作(例如 groupByKeycogroupjoin)的返回类型由 (Key,Seq[Value]) 变为(Key,Iterable[Value])

这些迁移指导对 Spark Streaming、MLlib 和 GraphX 同样有效。

 

10 下一步(Where to Go from Here)

你可以在 Spark 网站看一些 Spark 编程示例。另外,Spark 在 examples 目录下包含了许多例子(Scala、Java、Python、R)。运行 Java 和 Scala 例子,可以通过将例子的类名传给 Spark 的 bin/run-example 脚本进行启动。例如:

./bin/run-example SparkPi

Python 示例,使用 spark-submit 命令提交:

./bin/spark-submit examples/src/main/python/pi.py

R 示例,使用 spark-submit 命令提交:

./bin/spark-submit examples/src/main/r/dataframe.R

在 configuration 和 tuning 手册中,有许多优化程序的实践。这些优化建议,能够确保你的数据以高效的格式存储在内存中。对于部署的帮助信息,请阅读 cluster mode overview,该文档描述了分布式操作和支持集群管理��的组件。

最后,完整的 API 文档请查阅 Scala、Java、Python、R。

更多 Spark 相关教程见以下内容

CentOS 7.0 下安装并配置 Spark  http://www.linuxidc.com/Linux/2015-08/122284.htm

Spark1.0.0 部署指南 http://www.linuxidc.com/Linux/2014-07/104304.htm

CentOS 6.2(64 位)下安装 Spark0.8.0 详细记录 http://www.linuxidc.com/Linux/2014-06/102583.htm

Spark 简介及其在 Ubuntu 下的安装使用 http://www.linuxidc.com/Linux/2013-08/88606.htm

安装 Spark 集群(在 CentOS 上) http://www.linuxidc.com/Linux/2013-08/88599.htm

Hadoop vs Spark 性能对比 http://www.linuxidc.com/Linux/2013-08/88597.htm

Spark 安装与学习 http://www.linuxidc.com/Linux/2013-08/88596.htm

Spark 并行计算模型 http://www.linuxidc.com/Linux/2012-12/76490.htm

Ubuntu 14.04 LTS 安装 Spark 1.6.0(伪分布式)http://www.linuxidc.com/Linux/2016-03/129068.htm

Spark 的详细介绍:请点这里
Spark 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-04/130621.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计24329字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中