RDD 编程指南
概述
- rdd
每个 Spark 应用程序都包含一个驱动程序,它运行用户的 main 函数并在集群上执行各种并行操作。 Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是一个分布在集群节点上的元素集合,可以并行操作。 创建于文件或者外部数据源、内存数据源,并对其进行转换。 用户还可以要求 Spark持久化一个 RDD 到内存中,使其能够在并行操作中有效地重复使用。 最后,RDD 会自动从节点故障中恢复。
- 共享变量
Spark 中的第二个抽象是共享变量,它可以在并行操作中使用。 默认情况下,当 Spark 并行运行一个函数作为一组在不同节点上的任务时,它会将函数中使用的每个变量的副本发送到每个任务。有时,需要在任务之间或任务和驱动程序之间共享一个变量。Spark 支持两种类型的共享变量:广播变量,它可以用来在所有节点的内存中缓存一个值,以及累加器,它是一些只能“添加”的变量,例如计数器和总和。
本指南展示了 Spark 支持的每种语言中的每个功能。如果你启动 Spark 的交互式 shell,最容易理解 - 对于 Scala shell,使用 bin/spark-shell.
与 Spark 链接
要编写 Spark 应用程序,你需要添加一个对 Spark 的 Maven 依赖项
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
</dependency>
如果你想访问 HDFS 集群,你需要添加对你的 HDFS 版本的 hadoop-client 的依赖项
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
初始化 Spark
然后,你需要创建一个 SparkConf 对象,它包含了 Spark 应用程序的配置信息。
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("MyApp").setMaster("local")
val sc = new SparkContext(conf)
SparkConf 类包含了许多配置选项,包括:
- setAppName(appName: String): 设置 Spark 应用程序的名称。
- setMaster(master: String): 设置 Spark 集群的 URL。
- set(key: String, value: String): 设置 SparkConf 中的其他配置选项。
SparkContext 类是 Spark 应用程序的主要入口点,它负责创建 RDD、运行并行操作、管理持久化的变量以及从节点故障中恢复。
使用 Shell
Spark 提供了一个交互式 shell,你可以在其中运行 Spark 代码。 提供sc(SparkContext)变量,运行程序
$ ./bin/spark-shell
# 在四核上运行
$ ./bin/spark-shell --master local[4]
# 连接到远程集群
$ ./bin/spark-shell --master spark://master:7077
# 连接到 YARN 集群
$ ./bin/spark-shell --master yarn
# 连接到 Mesos 集群
$ ./bin/spark-shell --master mesos://master:5050
# 连接到 Kubernetes 集群
$ ./bin/spark-shell --master k8s://https://kubernetes.default.svc
其他的选项
- --jars: 加载额外的 jar 包。
- --files: 上传到集群的本地文件。
- --conf: 设置 SparkConf 中的配置选项。
spark-submit 工具可以用来提交 Spark 应用程序到集群。 参数说明
- --class: 主类名。(例如 org.apache.spark.examples.SparkPi)
- --master: Spark 集群的 URL。
- --deploy-mode: 部署模式,可以是 "client" 或 "cluster"。
- --jars: 加载额外的 jar 包。
- --files: 上传到集群的本地文件。
- --conf: 设置 SparkConf 中的配置选项。(例如
--conf <key>=<value> --conf <key2>=<value2>
) - --driver-class-path: 驱动程序的类路径。
- --application-arguments: 传递给主类的命令行参数。
- --driver-memory: 驱动程序的内存。
- --executor-memory: 每个 executor 的内存。
- --executor-cores: 每个 executor 的 CPU 核心数。
- --num-executors: 集群中 executor 的数量。
- --archives: 上传到集群的归档文件。
- --queue: 队列名。
- --name: 作业名。
- --verbose: 显示详细的日志信息。
- --help: 显示帮助信息。
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://0.0.0.0:7077 \
--executor-memory 4G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://0.0.0.0:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 4G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://0.0.0.0:7077 \
examples/src/main/python/pi.py \
1000
RDD 操作
source 来源
来源
- 本地集合
val data=Array(1,2,3,4,5)
val rdd=sc.parallelize(data)
- 外部数据源
此方法接受文件的 URI(机器上的本地路径或 hdfs://、s3a:// 等 URI)并将其读取为行集合
val rdd=sc.textFile("file:///path/to/file")
// Spark 的所有基于文件的输入方法(包括 textFile)都支持在目录、压缩文件和通配符上运行
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
转换
RDD 支持两种类型的操作:转换,它从现有数据集创建新数据集,以及操作,它在对数据集运行计算后将值返回给驱动程序。
例如,
map 是一种转换,它将每个数据集元素通过一个函数传递,并返回一个表示结果的新 RDD。
另一方面,reduce 是一种操作,它使用某个函数聚合 RDD 中的所有元素,并将最终结果返回给驱动程序(尽管也存在一个并行的 reduceByKey,它返回一个分布式数据集)。
惰性: Spark 中的所有转换都是惰性的,这意味着它们不会立即计算结果。相反,它们只记住应用于某个基本数据集(例如文件)的转换。只有当操作需要将结果返回给驱动程序时,才会计算转换。这种设计使 Spark 能够更有效地运行。例如,我们可以意识到,通过 map 创建的数据集将在 reduce 中使用,并且只将 reduce 的结果返回给驱动程序,而不是更大的映射数据集。
持久化: 默认情况下,每次对转换后的 RDD 运行操作时,它都可能被重新计算。但是,您也可以使用 persist(或 cache)方法将 RDD 持久化到内存中,在这种情况下,Spark 将在集群中保留这些元素,以便下次查询时能够更快地访问它们。还支持将 RDD 持久化到磁盘,或复制到多个节点上。
val lines = sc.textFile("data/input.txt")
val lineLengths = lines.map(s => s.length)
// 如果我们还想在以后再次使用 lineLengths,
// 在 reduce 之前,这将导致 lineLengths 在第一次计算后保存在内存中
lineLengths.persist()
val totalLength = lineLengths.reduce(_+_)
理解闭包
- 闭包
def add(x: Int): Int = {
def addY(y: Int): Int = x + y
addY
}
- 闭包是一个函数,它引用了其定义域之外的变量。
- 闭包可以访问其定义域中的变量,即使这些变量在函数返回后也仍然存在。
- 闭包可以作为参数传递给另一个函数,并在那里被调用。
- 闭包可以作为变量的值存储在数据结构中,并在稍后被访问。
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
// 发送到每个执行器的闭包中的变量现在是副本,而不是共享变量
// 解决使用 Accumulator 累加器
println("Counter value: " + counter)
打印 RDD 的元素
rdd.foreach(println) 或 rdd.map(println)
在一台机器上,这将生成预期的输出并打印所有 RDD 的元素。
在 cluster 模式下,执行器调用的 stdout 现在写入执行器的 stdout 而不是驱动程序上的 stdout,因此驱动程序上的 stdout 不会显示这些
要打印驱动程序上的所有元素,可以使用 collect() 方法首先将 RDD 带到驱动程序节点,如下所示:rdd.collect().foreach(println)。但这会导致驱动程序内存不足,因为 collect() 将整个 RDD 提取到一台机器上;如果您只需要打印 RDD 的几个元素,更安全的方法是使用
take():rdd.take(100).foreach(println)
使用键值对
- 键值对
Spark 中的键值对通常表示为 (K,V) 对,其中 K 是键,V 是值。
- 键可以是任意类型,但通常是一个不可变的类型(例如 Int、String 或 Tuple)。
- 值可以是任意类型。
- 键值对的集合可以表示为 RDD,其中每个元素是一个 (K,V) 对。
- 键值对的集合可以表示为 Map,其中每个键映射到一个值。
val pairs = sc.parallelize(Array(("a", 1), ("b", 2), ("a", 3)))
val map = pairs.collectAsMap()
println(map) // Map(a -> 3, b -> 2)
// 以下代码使用 reduceByKey 操作对键值对进行操作,以计算文本文件中每行出现的次数。
val lines = sc.textFile("data/input.txt")
val counts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.foreach(println)
转换
一些算子
- map: 对每个元素应用一个函数。
- flatMap: 对每个元素应用一个函数,然后将结果展平为一个序列。
- filter: 保留满足给定条件的元素。
- distinct: 保留唯一的元素。
- sample: 随机抽样元素。
- union: 合并两个 RDD。
- intersection: 保留两个 RDD 的交集。
- subtract: 保留第一个 RDD 中不在第二个 RDD 中的元素。
- groupByKey: 将键值对集合转换为键值对的集合,其中每个键对应于一个值集合。
- reduceByKey: 聚合键相同的元素。
- ...
- reduce(func): 对元素应用一个二元函数,并返回一个单一的值。
- count: 返回 RDD 中元素的数量。
- first: 返回 RDD 中第一个元素。
- take(n): 返回 RDD 中前 n 个元素。
- collect: 返回 RDD 中所有元素的集合。
- foreach(func): 对每个元素应用一个函数。
- ...
洗牌操作
Spark 提供了两种类型的洗牌操作:
- 全局洗牌:对整个 RDD 进行洗牌。
- 局部洗牌:对每个分区进行洗牌。
一些算子底层使用shuffle操作,例如 groupByKey、reduceByKey、join、cogroup、repartition等。
定义:
重新分配数据, 根据rdd,重新生成一个新的rdd,每个分区中的数据随机分配到其他分区,使得每个分区中的数据分布随机。 reduceByKey 操作生成一个新的 RDD,其中单个键的所有值都组合成一个元组 - 键和对与该键关联的所有值执行 reduce 函数的结果。
RDD 持久化
Spark 支持两种类型的持久化:
- cache(), 内存持久化:将 RDD 持久化到内存中。
- persist(), 磁盘持久化:将 RDD 持久化到磁盘上。
cache() 方法, 使用默认存储级别的简写,即 StorageLevel.MEMORY_ONLY. persist() 方法, 可以指定存储级别,
存储级别
- MEMORY_ONLY: 只缓存到内存中,将 RDD 存储为 JVM 中的反序列化 Java 对象。如果 RDD 无法容纳在内存中,则某些分区将不会被缓存,并且将在每次需要时动态重新计算。这是默认级别。
- MEMORY_AND_DISK: 先缓存到内存中,再写入磁盘。将 RDD 存储为 JVM 中的反序列化 Java 对象。如果 RDD 无法容纳在内存中,则将无法容纳在磁盘上的分区存储在磁盘上,并在需要时从磁盘读取它们。
- MEMORY_ONLY_SER: 只缓存到内存中,使用 Java 序列化。将 RDD 存储为序列化的 Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,尤其是在使用 快速序列化器 时,但读取时更占用 CPU。
- MEMORY_AND_DISK_SER: 与 MEMORY_ONLY_SER 相似,但将无法容纳在内存中的分区溢出到磁盘,而不是在每次需要时动态重新计算它们。
- DISK_ONLY: 只写入磁盘,不缓存到内存中。
- OFF_HEAP: 缓存到堆外内存,而不是 JVM 堆中。
- MEMORY_ONLY_2: 与 MEMORY_ONLY 相同,但使用堆外内存。
删除数据
- 自动, LRU算法清除
- 手动, RDD.unpersist(), 不会阻塞
广播变量
broadcast 就是将数据从一个节点发送到其他各个节点上去。
广播变量是只读变量,它可以在多个执行器上共享。
- 广播变量可以用来在多个节点上缓存一个值,以便在多个任务之间共享。
- 广播变量可以用来在每个节点上缓存一个大型数据集,以便在多个任务之间共享。
- 为什么只能 broadcast 只读的变量
一致性, 如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步?还会涉及 fault-tolerance 的问题。为了避免维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。
- broadcast 到节点而不是 broadcast 到每个 task?
因为每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application。因此每个节点(executor)上放一份就可以被所有 task 共享。
- 如何使用 broadcast
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
Accumulator
Accumulator 是一种只写变量,它可以用来聚合多个任务的结果。
Accumulator 可以用来实现计数器、求和、平均值、列表、字典等。
Accumulator 不能被修改,只能被增加。
Accumulator 不能被广播,只能在每个节点上单独创建。
Accumulator 不能被 checkpoint,只能在 driver 节点上使用。
Accumulator 不能被持久化,只能在 driver 节点上使用。
Accumulator 不能被 cache,只能在 driver 节点上使用。
Accumulator 不能被使用 collect() 操作,只能在 driver 节点上使用.
累加器(Accumulators)与广播变量(Broadcast Variables)共同作为Spark提供的两大共享变量,主要用于跨集群的数据节点之间的数据共享,突破数据在集群各个executor不能共享问题。
而累加器主要定义在driver节点,在executor节点进行操作,最后在driver节点聚合结果做进一步的处理
常见的累加器:
- LongAccumulator: 参数支持Integer、Long
- DoubleAccumulator: 参数支持Double
- CollectionAccumulator: 参数支持List、Set、Map
- AccumulatorV2: 自定义Accumulator,可以支持任意类型
累加器的使用场景:
常用来某些统计类场景,比如统计最近1小时多少用户或 IP访问数量;监控某些灰黑产利用平台可能存在漏洞大肆薅羊毛,并短信或邮件通知相关人员及时采取对策,等相关场景会用到
driver 定义累加器,在各个executor上执行任务,累加器的值在各个executor上累加,最后在driver上聚合结果。
累加器的实现
// 定义一个累加器, 对1、2、3、4求和
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.JavaConversions._
object TestAccumulator {
def main(args: Array[String]): Unit = {
val ss: SparkSession = SparkSession.builder().appName("test-ccumulator").master("local[2]").getOrCreate()
val sc = ss.sparkContext
val longAccumulator = sc.longAccumulator("My longAccumulator")
sc.makeRDD(Arrays.asList(1,2,3,4)).foreach(v => {
longAccumulator.add(v)
println(Thread.currentThread().getName+" longAccumulator="+longAccumulator.value)
})
println("Driver "+Thread.currentThread().getName+" longAccumulator="+longAccumulator.value)
}
}
启动 Spark 作业
#类名传递给 Spark 的 bin/run-example 脚本运行 Java 和 Scala 示例
./bin/run-example SparkPi
#对于 Python 示例,请使用 spark-submit 代替
./bin/spark-submit examples/src/main/python/pi.py 1000
#对于 R 示例,请使用 spark-submit 代替
./bin/spark-submit examples/src/main/r/dataframe.R