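Resilient Distributed Datasets (RDD)

Introduction to RDDs

RDD, short for Resilient Distributed Dataset, is the core concept in Spark and is Spark's abstraction over data.

An RDD is a distributed collection of elements. Every RDD is read-only, and every RDD is split into multiple partitions that are stored on different nodes of the cluster. In addition, an RDD lets the user explicitly specify whether its data is kept in memory or on disk.

Operations on RDDs fall into a few simple categories: creating an RDD, transforming an existing RDD, and evaluating a result from an existing RDD.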
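The three kinds of operation can be sketched in a few lines. This is a minimal illustration rather than part of the original exercises: the object name `RddBasicsSketch` and the helper `squaresOfEvens` are made up for this example, and the choice of `StorageLevel.MEMORY_AND_DISK` is only one possible storage level.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for illustration.
object RddBasicsSketch {
  def squaresOfEvens(sc: SparkContext): Array[Int] = {
    val numbers = sc.parallelize(1 to 10)        // create an RDD from a local collection
    val evens = numbers.filter(_ % 2 == 0)       // transformation: returns a new RDD
    val squares = evens.map(n => n * n)          // transformation: returns a new RDD
    // Explicitly ask Spark to keep this RDD in memory, spilling to disk if needed.
    squares.persist(StorageLevel.MEMORY_AND_DISK)
    squares.collect()                            // evaluation: brings the result to the driver
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RddBasicsSketch")
    val sc = new SparkContext(conf)
    try println(squaresOfEvens(sc).mkString(", ")) finally sc.stop()
  }
}
```

Note that `filter` and `map` never modify `numbers`; each call produces a new RDD, which is what "read-only" means in practice.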

RDD Programming Exercises

flatMap

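flatMap() takes a function as its argument, and that function turns each element into a list. The result of flatMap() is not an RDD whose elements are those lists, but an RDD that contains all the elements of every list.

flatMap() exercise code: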

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object flatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RDDFlatMap")
    // Silence Spark's own logging so that only the program output is shown
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    transformationOps(sc)
    sc.stop()
  }

  def transformationOps(sc: SparkContext): Unit = {
    val list = List("hello you", "hello he", "hello me")
    val listRDD = sc.parallelize(list)
    // Split every line into words and flatten the pieces into a single RDD of words
    val wordsRDD = listRDD.flatMap(line => line.split(" "))
    wordsRDD.foreach(println)
  }
}

The result is shown in the figure below:

[Figure: flatMap result]
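To make the difference from map() concrete, the sketch below applies both operators to the same input. The object name `MapVsFlatMapSketch` and the helper `compare` are hypothetical names chosen for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for illustration.
object MapVsFlatMapSketch {
  def compare(sc: SparkContext): (Array[Array[String]], Array[String]) = {
    val lines = sc.parallelize(List("hello you", "hello he", "hello me"))
    // map keeps one output element per input element: three arrays of words
    val nested = lines.map(line => line.split(" ")).collect()
    // flatMap concatenates the arrays: six individual words
    val flat = lines.flatMap(line => line.split(" ")).collect()
    (nested, flat)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MapVsFlatMapSketch")
    val sc = new SparkContext(conf)
    try {
      val (nested, flat) = compare(sc)
      println(s"map produced ${nested.length} elements, flatMap produced ${flat.length}")
    } finally sc.stop()
  }
}
```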

sample

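The sample() transformation randomly selects a portion of the records in an RDD, at the given fraction and using the given random seed, and creates a new RDD from them.

Syntax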

def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

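Parameters
withReplacement: Boolean. true samples with replacement, false samples without replacement.
fraction: Double. Without replacement, this is the probability that each record is selected and must lie between 0 and 1, so the size of the sample is only approximately fraction × the total number of records. With replacement, it is the expected number of times each record is chosen and must be at least 0.
seed: the random seed. If it is omitted, a random seed is used, so repeated calls return different samples.

sample() exercise code: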

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(sample.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    transformationOps(sc)
    sc.stop()
  }

  def transformationOps(sc: SparkContext): Unit = {
    val list = 1 to 1000
    val listRDD = sc.parallelize(list)
    // Sample roughly 20% of the records without replacement (no fixed seed)
    val sampleRDD = listRDD.sample(false, 0.2)
    sampleRDD.foreach(num => print(s"$num "))
    println()
    // The two counts are close to 200 but usually differ from each other
    println("sampleRDD count: " + sampleRDD.count())
    println("Another sampleRDD count: " + sc.parallelize(list).sample(false, 0.2).count())
  }
}

The result is shown in the figure below:

[Figure: sample result]
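The exercise above leaves the seed at its default, which is why its two counts are not expected to match. A minimal sketch of the opposite case, assuming the same RDD (with the same partitioning) is sampled twice with an explicit seed, is shown here. The object name `SeededSampleSketch`, the helper `sampleTwice`, and the seed value are arbitrary choices for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for illustration.
object SeededSampleSketch {
  def sampleTwice(sc: SparkContext, seed: Long): (Array[Int], Array[Int]) = {
    // Fix the number of partitions so both samples see the same layout
    val rdd = sc.parallelize(1 to 1000, 2)
    val first = rdd.sample(withReplacement = false, fraction = 0.2, seed = seed).collect()
    val second = rdd.sample(withReplacement = false, fraction = 0.2, seed = seed).collect()
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SeededSampleSketch")
    val sc = new SparkContext(conf)
    try {
      val (first, second) = sampleTwice(sc, seed = 42L)
      println(s"first: ${first.length} records, second: ${second.length} records")
      println(s"identical samples: ${first.sameElements(second)}")
    } finally sc.stop()
  }
}
```

Fixing the seed is mainly useful for tests and reproducible experiments; the sample size is still only approximately 20% of the input.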

union

The union() transformation merges two RDDs into a new RDD; duplicate records are not removed.

Syntax

def union(other: RDD[T]): RDD[T]

Parameters
other: the second RDD

union() exercise code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object union {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(union.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    transformationOps(sc)
    sc.stop()
  }

  def transformationOps(sc: SparkContext): Unit = {
    val list1 = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val list2 = List(7, 8, 9, 10, 11, 12)
    val listRDD1 = sc.parallelize(list1)
    val listRDD2 = sc.parallelize(list2)
    // 7, 8, 9 and 10 occur in both inputs and therefore appear twice in the union
    val unionRDD = listRDD1.union(listRDD2)

    unionRDD.foreach(println)
  }
}

The result is shown in the figure below:

[Figure: union result]
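Because union() keeps duplicates, a set-style union needs an extra distinct() step. The sketch below counts the records both ways on the same two inputs as the exercise; the object name `UnionDistinctSketch` and the helper `unionCounts` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for illustration.
object UnionDistinctSketch {
  // Returns (number of records after union, number of records after union + distinct)
  def unionCounts(sc: SparkContext): (Long, Long) = {
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = sc.parallelize(7 to 12)
    val merged = rdd1.union(rdd2)          // 10 + 6 records, duplicates kept
    (merged.count(), merged.distinct().count())
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("UnionDistinctSketch")
    val sc = new SparkContext(conf)
    try {
      val (all, unique) = unionCounts(sc)
      println(s"union: $all records, union + distinct: $unique records")
    } finally sc.stop()
  }
}
```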

groupByKey

groupByKey() groups the values for each key in the RDD into a single sequence.

groupByKey() exercise code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object groupByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(groupByKey.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    transformationOps(sc)
    sc.stop()
  }

  def transformationOps(sc: SparkContext): Unit = {
    val list = List("hello you", "hello he", "hello me")
    val listRDD = sc.parallelize(list)
    val wordsRDD = listRDD.flatMap(line => line.split(" "))
    // Turn every word into a (word, 1) pair
    val pairsRDD = wordsRDD.map(word => (word, 1))
    pairsRDD.foreach(println)
    // Collect all the 1s that belong to the same word into one sequence
    val gbkRDD = pairsRDD.groupByKey()
    println("==============================")
    gbkRDD.foreach(t => println(t._1 + "..." + t._2))
  }
}

The result is shown in the figure below:

[Figure: groupByKey result]

reduceByKey

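reduceByKey() is similar to groupByKey(), but not the same.

Take (a,1), (a,2), (b,1), (b,2) as input. groupByKey() groups the pairs by key, giving ((a,1), (a,2)) and ((b,1), (b,2)), that is, (a, [1, 2]) and (b, [1, 2]). reduceByKey() with addition as the reduce function gives (a,3), (b,3).

The main purpose of reduceByKey() is aggregation; the main purpose of groupByKey() is grouping.

reduceByKey() exercise code: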
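The contrast can be checked on the four pairs used in the explanation. This is a small sketch, not part of the original exercises; the object name `GroupVsReduceSketch` and its helpers are hypothetical, and the values of each group are sorted only to make the printed result stable.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for illustration.
object GroupVsReduceSketch {
  private def pairs(sc: SparkContext): RDD[(String, Int)] =
    sc.parallelize(List(("a", 1), ("a", 2), ("b", 1), ("b", 2)))

  // Grouping: every key maps to the list of its values
  def grouped(sc: SparkContext): Map[String, List[Int]] =
    pairs(sc).groupByKey().mapValues(_.toList.sorted).collect().toMap

  // Aggregation: every key maps to the sum of its values
  def reduced(sc: SparkContext): Map[String, Int] =
    pairs(sc).reduceByKey(_ + _).collect().toMap

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("GroupVsReduceSketch")
    val sc = new SparkContext(conf)
    try {
      println("groupByKey:  " + grouped(sc))
      println("reduceByKey: " + reduced(sc))
    } finally sc.stop()
  }
}
```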

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object reduceByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(reduceByKey.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    transformationOps(sc)
    sc.stop()
  }

  def transformationOps(sc: SparkContext): Unit = {
    val list = List("hello you", "hello he", "hello me")
    val listRDD = sc.parallelize(list)
    val wordsRDD = listRDD.flatMap(line => line.split(" "))
    val pairsRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
    // Add up the values of each key, which yields a word count
    val retRDD: RDD[(String, Int)] = pairsRDD.reduceByKey((v1, v2) => v1 + v2)

    retRDD.foreach(t => println(t._1 + "..." + t._2))
  }
}

The result is shown in the figure below:

[Figure: reduceByKey result]

sortByKey

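sortByKey([ascending], [numTasks])

sortByKey() operates on an RDD of key-value pairs and sorts it by key.

[ascending] selects the sort direction and defaults to true, i.e. ascending order. [numTasks] is optional as well; in the Scala API the corresponding parameter is named numPartitions, as in the code below.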

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object sortByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(sortByKey.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    transformationOps(sc)
    sc.stop()
  }

  /**
   * sortByKey: sort the students by height (descending);
   * for equal heights, sort by age (ascending).
   */
  def transformationOps(sc: SparkContext): Unit = {
    // Record layout: id,name,age,height
    val list = List(
      "1,李 磊,22,175",
      "2,刘银鹏,23,175",
      "3,齐彦鹏,22,180",
      "4,杨 柳,22,168",
      "5,敦 鹏,20,175"
    )
    val listRDD: RDD[String] = sc.parallelize(list)
    // Use sortByKey; this version only does the descending sort by height.
    // The key is a String, so it is compared lexicographically; that matches
    // numeric order here only because every height has three digits.
    val heightRDD: RDD[(String, String)] = listRDD.map(line => {
      val fields = line.split(",")
      (fields(3), line)
    })
    val retRDD: RDD[(String, String)] = heightRDD.sortByKey(ascending = false, numPartitions = 1)
    retRDD.foreach(println)
  }
}

[Figure: sortByKey result]
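The exercise sorts by height only. One possible way to also break ties by age (ascending) is to sort on a composite key; the sketch below is an assumption about how that could be done, not the original author's solution. The object name `SecondarySortSketch` and the helper `sortStudents` are hypothetical, and the trick of negating the height relies on the heights being integers.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for illustration.
object SecondarySortSketch {
  // Returns the records ordered by height (descending), then by age (ascending)
  def sortStudents(sc: SparkContext): Array[String] = {
    val lines = sc.parallelize(List(
      "1,李 磊,22,175",
      "2,刘银鹏,23,175",
      "3,齐彦鹏,22,180",
      "4,杨 柳,22,168",
      "5,敦 鹏,20,175"
    ))
    val keyed = lines.map { line =>
      val fields = line.split(",")
      val age = fields(2).toInt
      val height = fields(3).toInt
      // Negating the height makes an ascending sort put taller students first;
      // the age is the second tuple component and breaks ties in ascending order.
      ((-height, age), line)
    }
    keyed.sortByKey(ascending = true, numPartitions = 1).values.collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SecondarySortSketch")
    val sc = new SparkContext(conf)
    try sortStudents(sc).foreach(println) finally sc.stop()
  }
}
```

Parsing the fields as Int also avoids the lexicographic comparison that a String key would use.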

combineByKey and aggregateByKey

The code below uses aggregateByKey() and combineByKey() in turn to emulate groupByKey() and reduceByKey().

Emulating groupByKey with aggregateByKey

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object aggregateByKey_1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(aggregateByKey_1.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    aggregateByKey2GroupByKey(sc)
    sc.stop()
  }

  /**
   * Emulate groupByKey with aggregateByKey
   */
  def aggregateByKey2GroupByKey(sc: SparkContext): Unit = {
    val list = List("hello bo bo", "zhou xin xin", "hello song bo")
    val lineRDD = sc.parallelize(list)
    val wordsRDD = lineRDD.flatMap(line => line.split(" "))
    val pairsRDD = wordsRDD.map(word => (word, 1))

    // The zero value is an empty buffer that collects the values of one key
    val retRDD: RDD[(String, ArrayBuffer[Int])] = pairsRDD.aggregateByKey(ArrayBuffer[Int]())(
      // Within a partition: append the value to the buffer of its key
      (part, num) => {
        part.append(num)
        part
      },
      // Across partitions: concatenate the buffers of the same key
      (part1, part2) => {
        part1 ++= part2
        part1
      }
    )
    retRDD.foreach(println)
  }
}

Output:

[Figure: emulating groupByKey with aggregateByKey]

Emulating reduceByKey with aggregateByKey

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object aggregateByKey_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(aggregateByKey_2.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    aggregateByKey2ReduceByKey(sc)
    sc.stop()
  }

  /**
   * Emulate reduceByKey with aggregateByKey
   */
  def aggregateByKey2ReduceByKey(sc: SparkContext): Unit = {
    val list = List("hello bo bo", "zhou xin xin", "hello song bo")
    val lineRDD = sc.parallelize(list)
    val wordsRDD = lineRDD.flatMap(line => line.split(" "))
    val pairsRDD = wordsRDD.map(word => (word, 1))

    val retRDD: RDD[(String, Int)] = pairsRDD.aggregateByKey(0)(
      (partNum, num) => partNum + num,             // plays the role of mergeValue
      (partNum1, partNum2) => partNum1 + partNum2  // plays the role of mergeCombiners
    )
    retRDD.foreach(println)
  }
}

Output:

[Figure: emulating reduceByKey with aggregateByKey]

Emulating reduceByKey with combineByKey

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object combineByKey_1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(combineByKey_1.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    combineByKey2ReduceByKey(sc)
    sc.stop()
  }

  /**
   * Emulate reduceByKey with combineByKey
   */
  def combineByKey2ReduceByKey(sc: SparkContext): Unit = {
    val list = List("hello bo bo", "zhou xin xin", "hello song bo")
    val lineRDD = sc.parallelize(list)
    val wordsRDD = lineRDD.flatMap(line => line.split(" "))
    val pairsRDD = wordsRDD.map(word => (word, 1))
    /**
     * The parameters of createCombiner1, mergeValue1 and mergeCombiners1
     * speak for themselves; once the example that emulates groupByKey with
     * combineByKey is understood, this one is straightforward.
     */
    val retRDD: RDD[(String, Int)] = pairsRDD.combineByKey(createCombiner1, mergeValue1, mergeCombiners1)

    retRDD.foreach(println)
  }

  /**
   * For reduceByKey the combiner is simply the value itself, so the data above yields:
   * (hello, 1) (bo, 1) (bo, 1)
   * (zhou, 1) (xin, 1) (xin, 1)
   * (hello, 1) (song, 1) (bo, 1)
   * Note the difference from the groupByKey emulation, which creates a container here.
   */
  def createCombiner1(num: Int): Int = {
    num
  }

  /**
   * Within one partition, the values of records with the same key are added up.
   * Note the difference from the groupByKey emulation, which appends the value to a container.
   */
  def mergeValue1(localNum1: Int, localNum2: Int): Int = {
    localNum1 + localNum2
  }

  /**
   * Adds up the values that belong to the same key in two different partitions.
   * Note the difference from the groupByKey emulation, which merges two containers.
   */
  def mergeCombiners1(thisPartitionNum1: Int, anotherPartitionNum2: Int): Int = {
    thisPartitionNum1 + anotherPartitionNum2
  }
}

Output:

[Figure: emulating reduceByKey with combineByKey]

Emulating groupByKey with combineByKey

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object combineByKey_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(combineByKey_2.getClass.getSimpleName)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc = new SparkContext(conf)
    combineByKey2GroupByKey(sc)
    sc.stop()
  }

  /**
   * Emulate groupByKey with combineByKey
   */
  def combineByKey2GroupByKey(sc: SparkContext): Unit = {
    val list = List("hello bo bo", "zhou xin xin", "hello song bo")
    val lineRDD = sc.parallelize(list)
    val wordsRDD = lineRDD.flatMap(line => line.split(" "))
    val pairsRDD = wordsRDD.map(word => (word, 1))

    // Print the (word, 1) pairs held by each partition
    pairsRDD.foreachPartition(partition => {
      println("<=========partition-start=========>")
      partition.foreach(println)
      println("<=========partition-end=========>")
    })

    val gbkRDD: RDD[(String, ArrayBuffer[Int])] = pairsRDD.combineByKey(createCombiner, mergeValue, mergeCombiners)
    gbkRDD.foreach(println)
  }

  /**
   * Initialization: converts a value into the combiner format.
   * Runs inside each partition, once per distinct key in that partition,
   * because the container for a key only needs to be created once; later
   * records with the same key are added to it by mergeValue.
   */
  def createCombiner(num: Int): ArrayBuffer[Int] = {
    println("----------createCombiner----------")
    ArrayBuffer[Int](num)
  }

  /**
   * Appends a value to the ArrayBuffer that createCombiner created for its key.
   * This is the aggregation within a partition: records of one partition
   * that share a key are merged.
   */
  def mergeValue(ab: ArrayBuffer[Int], num: Int): ArrayBuffer[Int] = {
    println("----------mergeValue----------")
    ab.append(num)
    ab
  }

  /**
   * Merges the value buffers that belong to the same key.
   * This is the merge across partitions.
   */
  def mergeCombiners(ab1: ArrayBuffer[Int], ab2: ArrayBuffer[Int]): ArrayBuffer[Int] = {
    println("----------mergeCombiners----------")
    ab1 ++= ab2
    ab1
  }
}

[Figure: emulating groupByKey with combineByKey]
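As a cross-check of the emulations, the sketch below computes the word count of the same three lines with reduceByKey(), aggregateByKey() and combineByKey() and returns all three results so they can be compared. The object name `ByKeyEquivalenceSketch` and the helper `wordCounts` are hypothetical, and inline lambdas are used instead of the named functions of the exercises only to keep the sketch short.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, used only for illustration.
object ByKeyEquivalenceSketch {
  def wordCounts(sc: SparkContext): (Map[String, Int], Map[String, Int], Map[String, Int]) = {
    val pairs = sc.parallelize(List("hello bo bo", "zhou xin xin", "hello song bo"))
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))

    val viaReduce = pairs.reduceByKey(_ + _).collect().toMap

    // Zero value 0, then the same addition within and across partitions
    val viaAggregate = pairs.aggregateByKey(0)(_ + _, _ + _).collect().toMap

    val viaCombine = pairs.combineByKey(
      (v: Int) => v,                    // createCombiner: the first value of a key
      (c: Int, v: Int) => c + v,        // mergeValue: add a value within a partition
      (c1: Int, c2: Int) => c1 + c2     // mergeCombiners: add partial sums across partitions
    ).collect().toMap

    (viaReduce, viaAggregate, viaCombine)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ByKeyEquivalenceSketch")
    val sc = new SparkContext(conf)
    try {
      val (r, a, c) = wordCounts(sc)
      println(s"reduceByKey:    $r")
      println(s"aggregateByKey: $a")
      println(s"combineByKey:   $c")
    } finally sc.stop()
  }
}
```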


https://morooi.cn/2019/spark/

Author: SJ Zhou

Published: 2019-05-27

Updated: 2021-01-06
