RDD Transformation Operators
Operator | Official description | Purpose |
---|---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. | map(f: T => U): RDD[U], where f maps each element of type T to type U; e.g. to pair each element with a count of 1, write map((_, 1)) |
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. | filter(f: T => Boolean), where f decides whether an element of type T is kept; filters the input RDD, keeping the elements for which f returns true, e.g. to find requests with response code 404: rdd.filter(_._3 == 404) |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). | flatMap(f: T => TraversableOnce[U]): RDD[U], applies f to every element of the RDD and flattens each result; flatMap = map + flatten: first map, then flatten, turning a nested collection into a flat one |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T. | Receives one whole partition at a time, e.g. mapPartitions(x: Iterator[Int]) |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. | The function passed to mapPartitionsWithIndex() takes two parameters, the partition ID and the partition's data; useful for inspecting what each partition holds |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. | Randomly samples roughly the given fraction of the data, using the random seed seed |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. | Merges two RDDs into one |
distinct([numPartitions]) | Return a new dataset that contains the distinct elements of the source dataset. | Removes duplicates |
groupByKey([numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs. | Groups by key, returning (K, Iterable[V]); numPartitions sets the number of partitions to raise job parallelism |
reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. | Groups by key and aggregates each key's values with func; numPartitions sets the number of partitions to raise job parallelism |
sortByKey([ascending], [numPartitions]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. | Sorts an RDD of (K, V) pairs by key, ascending or descending per the ascending flag |
join(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. | First cogroups the two RDDs into a new RDD, then takes the Cartesian product of the values under each key; numPartitions sets the number of partitions to raise job parallelism |
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. | Intersection |
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. | Randomly redistributes the data into more or fewer partitions so it is roughly balanced across them; this always shuffles all the data over the network |
Building the RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf()
  .setAppName("demo1")
  .setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\mapdata\\")
val rdd: RDD[String] = sc.textFile("C:\\Users\\Administrator\\Desktop\\mapdata\\access.log")
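Several examples below reference rdd1, which the original notes never define; from its usage it is evidently the access log parsed into tuples. A minimal sketch, assuming a space-separated (ip, url, responseCode) layout (the field order is a guess):

// Hypothetical parse of access.log; the (ip, url, responseCode) field layout is an assumption
val rdd1: RDD[(String, String, Int)] = rdd.map(line => {
  val fields = line.split(" ")
  (fields(0), fields(1), fields(2).toInt)
})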
def map(f: T => U): RDD[U]
1. map
1.1 Declaration: def map(f: T => U): RDD[U]
1.2 Parameter: a one-argument function whose parameter has the source RDD's element type; the return type may differ
1.3 Return value: a new RDD whose element type is f's return type
1.4 Effect: applies f to every element of the source RDD and collects the results into a new RDD
rdd1.map(x => {
  (x._1, 1)  // pair field 1 (the IP in the sketch above) with a count of 1
})
def filter(f: T => Boolean): RDD[T]
2. filter
2.1 Declaration: def filter(f: T => Boolean): RDD[T]
2.2 Parameter: a one-argument function taking an element of the source RDD and returning a Boolean
2.3 Return value: an RDD with the same element type as the source
2.4 Effect: filters elements (those for which f returns true are kept)
rdd1.filter(x => {
  x._3 == 404  // keep only records whose response code is 404
})
rdd1.filter(_._3 == 404)  // the same filter in shorthand
def flatMap(f: T => TraversableOnce[U]): RDD[U]
3. flatMap
3.1 Declaration: def flatMap(f: T => TraversableOnce[U]): RDD[U]
3.2 Parameter: a one-argument function taking an element of the source RDD and returning a collection
3.3 Return value: an RDD whose element type is that of f's returned collection
3.4 Effect: turns a nested collection into a flat one
// sc was already created above; constructing a second SparkContext would throw
val list = List("give you", "give me", "a job")
val rdd = sc.parallelize(list)
val rdd2 = rdd.flatMap(x => {
  x.split(" ")  // each line becomes a word array, which flatMap flattens
})
rdd2.foreach(println)
val rdd3: RDD[String] = rdd1.flatMap(_._1.replace(".", "").split(""))  // strip the dots from field 1 and split it into single characters
mapPartitions
4. mapPartitions
4.1 Declaration: def mapPartitions(
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]
4.2 Parameters
    The first parameter is a one-argument function taking an iterator over the source RDD's element type;
    each call of f receives the entire contents of one partition through that iterator,
    and the return value is also an iterator, of any element type.
4.3 Return value: RDD[U], an RDD whose element type is that of the iterator f returns
4.4 Effect:
    map receives one element at a time,
    whereas mapPartitions receives a whole partition at a time
    and puts the processed results back into a new RDD.
    When the work needs expensive per-record setup (e.g. writing RDD data to a database over JDBC:
    map would open one connection per element, while mapPartitions opens one per partition),
    mapPartitions is far more efficient than map; see the JDBC sketch after the example below.
val data = sc.parallelize(1 to 4)
data.mapPartitions(x => mapPartitionsF(x)).count()
data.map(x => mapF(x)).count()

def mapF(x: Int): Int = {
  println("mapF was called")  // printed once per element
  x
}

def mapPartitionsF(x: Iterator[Int]): Iterator[Int] = {
  println("mapPartitionsF was called")  // printed once per partition
  x
}
mapPartitionsF was called
mapPartitionsF was called
mapF was called
mapF was called
mapF was called
mapF was called
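A sketch of the JDBC point from 4.4: open one connection per partition rather than per element. Here foreachPartition (the action counterpart of mapPartitions) writes the data RDD out; the URL, credentials, and table name are placeholders, not from the original:

import java.sql.DriverManager

data.foreachPartition(iter => {
  // one connection per partition, not per element; placeholder URL/credentials/table
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pwd")
  val stmt = conn.prepareStatement("INSERT INTO nums (n) VALUES (?)")
  iter.foreach { n =>
    stmt.setInt(1, n)
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
})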
mapPartitionsWithIndex
5. mapPartitionsWithIndex
5.1 Declaration: def mapPartitionsWithIndex(
      f: (Int, Iterator[T]) => Iterator[U],  // the Int is the partition index
      preservesPartitioning: Boolean = false): RDD[U]
5.2 Parameters: like mapPartitions, plus an extra Int parameter that receives the partition index
5.3 Return value: RDD[U]
5.4 Effect: like mapPartitions
val data = sc.parallelize(1 to 5, 3)
data.mapPartitionsWithIndex((x, iter) => {
  var result = List[String]()
  while (iter.hasNext) {
    result ::= (x + "-" + iter.next())  // prefix each element with its partition index
  }
  result.toIterator
}).foreach(println)
0-1
1-3
1-2
2-5
2-4
sample
Computer random number generation is deterministic: a different seed is guaranteed to produce a different result, and the same seed reproduces the same result.
6. sample
6.1 Declaration: def sample(
      withReplacement: Boolean,             // sample with replacement?
      fraction: Double,                     // sampling fraction; not exact
      seed: Long = Utils.random.nextLong    // random seed
    ): RDD[T]
6.2 Parameters: see the inline comments above
6.3 Return value: an RDD of the same type as the source
6.4 Effect: draws a sample from a massive dataset
val list1 = 1 to 1000
val listRDD = sc.parallelize(list1)
val sampleRDD = listRDD.sample(false, 0.2)
sampleRDD.foreach(num => print(num + " "))
println("first sample: " + sampleRDD.count())
println("second sample: " + sc.parallelize(list1).sample(false, 0.2).count())
1 3 19 20 22 25 33...
first sample: 181
second sample: 197
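A quick check of the determinism claim above: fixing the seed reproduces the exact same sample, so the two counts below always match (a minimal sketch):

println(listRDD.sample(false, 0.2, seed = 42L).count())  // same seed ...
println(listRDD.sample(false, 0.2, seed = 42L).count())  // ... same elements, same count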
union RDDA ∪ RDDB
7. union  RDDA ∪ RDDB
7.1 Declaration: def union(other: RDD[T]): RDD[T]
7.2 Parameter: another RDD with the same element type as the source
7.3 Return value: an RDD of the same type as the source
7.4 Effect: merges the RDDs
val pairs1 = Seq(("A",1), ("B",1), ("C",1), ("D", 1), ("A", 2), ("C", 3))
val rdd5 = sc.makeRDD(pairs1, 3)
val pairs2 = Seq(("A",4), ("D",1), ("E", 1))
val rdd6 = sc.makeRDD(pairs2, 2)
rdd5.union(rdd6).foreach(println)
(A,1)
(B,1)
(D,1)
(E,1)
(A,4)
intersection RDDA ∩ RDDB
8. intersection  RDDA ∩ RDDB
8.1 Declaration: def intersection(other: RDD[T]): RDD[T]
8.2 Parameter: another RDD with the same element type as the source
8.3 Return value: an RDD of the same type as the source
8.4 Effect: intersection
val pairs1 = Seq(("A",1), ("B",1), ("C",1), ("D", 1), ("A", 2), ("C", 3))
val rdd5 = sc.makeRDD(pairs1, 3)
val pairs2 = Seq(("A",1), ("D",1), ("E", 1))
val rdd6 = sc.makeRDD(pairs2, 2)
rdd5.intersection(rdd6).foreach(println)
(A,1)
(D,1)
distinct
sc.makeRDD(Array(1, 2, 1, 1, 2, 3, 4, 5))
.distinct()
.foreach(println)
1
4
3
2
5
partitionBy
9. partitionBy
9.1 Declaration: def partitionBy(partitioner: Partitioner): RDD[(K, V)]
9.2 Parameter: an instance of a partitioner
9.3 Return value: an RDD of the same type as the source
9.4 Effect: redistributes the RDD's elements according to the given partitioner
// partitionBy is only available on pair RDDs (RDD[(K, V)]); repartition with a HashPartitioner
val pairs = sc.parallelize(Seq(("A", 1), ("B", 2), ("C", 3)))
val rdd2 = pairs.partitionBy(new org.apache.spark.HashPartitioner(2))
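You can also supply your own Partitioner; a minimal sketch that routes the key "A" to its own partition (the routing rule is illustrative only):

import org.apache.spark.Partitioner

// Hypothetical partitioner: key "A" goes to partition 0, all other keys to partition 1
class APartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = if (key == "A") 0 else 1
}

pairs.partitionBy(new APartitioner)
  .mapPartitionsWithIndex((i, iter) => iter.map(e => i + "-" + e))  // tag each element with its partition
  .foreach(println)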
groupByKey
group by key over the dataset
groupByKey([numTasks]): called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs.
Note: the level of parallelism defaults from the parent RDD; pass the optional numTasks argument to set a different number of tasks based on the data volume.
In MapReduce terms: <k1, v1> -> map -> <k2, v2> -> shuffle -> <k2, [v21, v22, v23, ...]> -> <k3, v3>
groupByKey amounts to a shuffle operation.
It resembles reduceByKey, with one key difference: reduceByKey combines locally on the map side, while groupByKey does not. So use groupByKey sparingly; if you must group, add your own local pre-aggregation, as in the sketch after the example below.
val list = List("give you", "give me", "a job")
val rdd=sc.parallelize(list)
val rdd9=rdd.map(x=>{
new Tuple2(x,1)
})
rdd9.foreach(println)
val rdd10=rdd9.groupByKey()
rdd10.foreach(x=>{
println(x._1+"sss"+x._2)
})
(give me,1)
(give you,1)
(a job,1)
give yousssCompactBuffer(1)
a jobsssCompactBuffer(1)
give messsCompactBuffer(1)
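As promised above, a sketch of grouping with local pre-aggregation, using aggregateByKey (my substitute for a hand-rolled groupByKey, not from the original): it yields the same per-key collections but merges within each partition first, cutting shuffle volume.

val rdd11 = rdd9.aggregateByKey(List[Int]())(
  (acc, v) => v :: acc,           // fold one value into the per-partition list (map-side combine)
  (acc1, acc2) => acc1 ::: acc2   // merge the per-partition lists after the shuffle
)
rdd11.foreach(println)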
reduceByKey
val list = List("give you", "give me", "a job")
val rdd=sc.parallelize(list)
val rdds=rdd.flatMap(x=>{
x.split(" ")
})
val rdd9=rdds.map(x=>{
new Tuple2(x,1)
}).reduceByKey((v1,v2)=>{
v1+v2
}).foreach(println)
(job,1)
(you,1)
(a,1)
(give,2)
(me,1)
sortByKey
val list = List("give you", "give me", "a job")
val rdd=sc.parallelize(list)
val rdds=rdd.flatMap(x=>{
x.split(" ")
})
val rdd9=rdds.map(x=>{
new Tuple2(x,1)
})
rdd9.foreach(println)
rdd9.sortByKey().foreach(println)
(give,1)
(give,1)
(me,1)
(you,1)
(a,1)
(job,1)
(job,1)
(me,1)
(you,1)
(a,1)
(give,1)
(give,1)
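Two notes on the output above. First, foreach prints partition by partition, so the console order may not look globally sorted even though sortByKey has range-partitioned the data by key; collect to the driver to see the true order. Second, sortByKey takes an ascending flag, so descending order is:

rdd9.sortByKey(ascending = false).collect().foreach(println)  // descending, printed in key order on the driver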
join
val pairs1 = Seq(("A",1), ("B",1), ("C",1), ("D", 1), ("A", 2), ("C", 3))
val rdd5 = sc.makeRDD(pairs1, 3)
val pairs2 = Seq(("A",1), ("D",1), ("E", 1))
val rdd6 = sc.makeRDD(pairs2, 2)
val joinrdd=rdd5.join(rdd6)
joinrdd.foreach(println)
(A,(1,1))
(A,(2,1))
(D,(1,1))
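The table above describes join as cogroup followed by a per-key Cartesian product; a minimal sketch making that equivalence explicit:

// join(other) == cogroup(other) + per-key Cartesian product of the value iterables
val viaCogroup = rdd5.cogroup(rdd6).flatMapValues { case (vs, ws) =>
  for (v <- vs; w <- ws) yield (v, w)
}
viaCogroup.foreach(println)  // prints the same pairs as joinrdd above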
repartition
val list = List("give you", "give me", "a job")
val rdd=sc.parallelize(list)
val rdds=rdd.flatMap(x=>{
x.split(" ")
})
val rdd9=rdds.map(x=>{
new Tuple2(x,1)
})
rdd9.foreach(x=>{
println(x+"当前分区"+TaskContext.getPartitionId())
})
println("=======")
rdd9.repartition(3).foreach(x=>{
println(x+"当前分区"+TaskContext.getPartitionId())
})
(give,1) current partition 1
(me,1) current partition 1
(a,1) current partition 1
(job,1) current partition 1
(give,1) current partition 0
(you,1) current partition 0
(give,1) current partition 1
(give,1) current partition 1
(job,1) current partition 1
(a,1) current partition 0
(you,1) current partition 2
(me,1) current partition 2