RDD Transformation Operators

See the official Spark RDD documentation (RDD Programming Guide).

Operator | Official description | Notes
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. | map(f: T => U): RDD[U]; f maps each element of type T to a value of type U. For counting by key you can write map((_, 1)).
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. | filter(f: T => Boolean); f decides whether an element of type T is kept, so only elements for which f returns true remain. For example, to find requests with response code 404: rdd.filter(_._3 == 404).
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). | flatMap(f: T => TraversableOnce[U]): RDD[U] applies f to every element and flattens each result; flatMap = map + flatten: map first, then flatten the two-level collection into a one-level one.
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T. | The function receives an iterator over one whole partition (e.g. Iterator[Int]) and returns an iterator.
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. | The function takes two arguments, the partition index and an iterator over that partition's data; handy for inspecting what each partition holds.
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. | Randomly samples roughly a fraction fraction of the data, using the given random seed.
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. | Merges several RDDs into one.
distinct([numPartitions]) | Return a new dataset that contains the distinct elements of the source dataset. | Removes duplicates.
groupByKey([numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks. | Groups by key, returning (K, Iterable[V]); numPartitions sets the number of partitions to raise job parallelism.
reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. | Groups by key and aggregates the values with func; numPartitions sets the number of partitions to raise job parallelism.
sortByKey([ascending], [numPartitions]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. | Sorts the (K, V) pairs by key, ascending or descending according to the boolean ascending flag; numPartitions sets the number of partitions to raise job parallelism.
join(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. | First cogroups the two RDDs, then takes the Cartesian product of the values under each key; numPartitions sets the number of partitions to raise job parallelism.
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. | Intersection of the two RDDs.
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. | Randomly reshuffles the data into more or fewer partitions so it is roughly balanced across them; this always moves data over the network.
Building the RDD
val conf = new SparkConf()
      .setAppName("demo1")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\mapdata\\")
    val rdd: RDD[String] = sc.textFile("C:\\Users\\Administrator\\Desktop\\mapdata\\access.log")
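
The examples below also reference an rdd1 of parsed log tuples that is not shown above. A minimal sketch, assuming a space-separated access log; the column positions (0 = client IP, 1 = requested URL, 3 = HTTP status code) are assumptions, adjust them to the real log format:

    // hypothetical parsing of access.log into (ip, url, statusCode) tuples
    val rdd1: RDD[(String, String, Int)] = rdd.map(line => {
      val fields = line.split(" ")
      (fields(0), fields(1), fields(3).toInt)
    })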

def map(f: T => U): RDD[U]

1.map
1.1 Declaration: def map(f: T => U): RDD[U]
1.2 Parameter: a unary function whose argument type is the source RDD's element type; the return type may differ
1.3 Return value: a new RDD whose element type is f's return type
1.4 Purpose: applies f to every element of the source RDD and collects the results into a new RDD

rdd1.map(x => (x._1, 1))

def filter(f: T => Boolean): RDD[T]

2.filter
2.1. Declaration: def filter(f: T => Boolean): RDD[T]
2.2. Parameter: a unary function whose argument is an element of the source RDD and whose return value is Boolean
2.3. Return value: an RDD with the same element type as the source RDD
2.4. Purpose: filters elements (elements for which f returns true are kept)

rdd1.filter(x=>{
  x._3==404
})
rdd1.filter(_._3 == 404)

def flatMap(f: T => TraversableOnce[U]): RDD[U]

3.flatMap
1. Declaration: def flatMap(f: T => TraversableOnce[U]): RDD[U]
2. Parameter: a unary function whose argument type is the source RDD's element type and whose return value is a collection
3. Return value: an RDD whose element type is the element type of the collection returned by f
4. Purpose: flattens a two-level collection into a one-level one (map first, then flatten)

    val conf = new SparkConf().setAppName("0113").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val list = List("give you", "give me", "a job")
    val rdd = sc.parallelize(list)
    // split every line into words and flatten the results into one RDD of words
    val rdd2 = rdd.flatMap(x => x.split(" "))
    rdd2.foreach(println)
    val rdd3: RDD[String] = rdd1.flatMap(_._1.replace(".", "").split(""))

mapPartitions

4.mapPartitions
4.1. Declaration:
def mapPartitions(
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]
4.2. Parameters
First parameter:
a unary function whose argument is an iterator over elements of the source RDD's type;
the iterator hands the function all elements of one partition at a time,
and the return value is also an iterator, whose element type is unrestricted
4.3. Return value: RDD[U], an RDD whose element type is that of the iterator returned by f
4.4. Purpose
map processes one element at a time;
mapPartitions receives a whole partition's elements at once
and puts the processed results into the new RDD

If the per-element processing requires creating expensive extra objects (for example, writing the RDD's data to a database over JDBC: map would create one connection per element, while mapPartitions creates one connection per partition), mapPartitions is far more efficient than map; see the sketch after the call-count comparison below.
    val data = sc.parallelize(1 to 4)

    def mapF(x: Int): Int = {
      println("mapF was called")
      x
    }

    def mapPartitionsF(x: Iterator[Int]): Iterator[Int] = {
      println("mapPartitionsF was called")
      x
    }

    // mapPartitionsF runs once per partition, mapF once per element
    data.mapPartitions(x => mapPartitionsF(x)).count()
    data.map(x => mapF(x)).count()
mapPartitionsF was called
mapPartitionsF was called

mapF was called
mapF was called
mapF was called
mapF was called
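
As mentioned above, the typical win is creating one expensive resource per partition instead of per element. A minimal sketch of the JDBC case; the connection URL, credentials, and table name are hypothetical, and foreachPartition (the action counterpart of mapPartitions) is used because we only write the data out:

    import java.sql.DriverManager

    data.foreachPartition(iter => {
      // one connection per partition instead of one per element
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
      val stmt = conn.prepareStatement("INSERT INTO nums (n) VALUES (?)")
      iter.foreach(n => {
        stmt.setInt(1, n)
        stmt.executeUpdate()
      })
      stmt.close()
      conn.close()
    })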

mapPartitionsWithIndex

5.mapPartitionsWithIndex
5.1. Declaration:
def mapPartitionsWithIndex(
    f: (Int, Iterator[T]) => Iterator[U], // the Int is the partition index
    preservesPartitioning: Boolean = false): RDD[U]
5.2. Parameters: like mapPartitions, plus an extra Int argument that receives the partition index
5.3. Return value: RDD[U]
5.4. Purpose: like mapPartitions, but the function also knows which partition it is processing

val data = sc.parallelize(1 to 5, 3)

    data.mapPartitionsWithIndex((x,iter)=>{
      var result=List[String]()
      while (iter.hasNext){
        result ::=(x+"-"+iter.next())
      }
      result.toIterator
    }).foreach(println)
0-1
1-3
1-2

2-5
2-4

sample

Computer random number generation is deterministic: different input parameters (notably the seed) produce a different sample, while the same parameters reproduce the same sample.

6.sample
1. Declaration:
def sample(
      withReplacement: Boolean, // whether to sample with replacement
      fraction: Double, // expected sampling fraction; not exact
      seed: Long = Utils.random.nextLong // random seed
     ): RDD[T]
2. Parameters: withReplacement chooses sampling with or without replacement, fraction is the expected fraction of elements to keep (not exact), and seed is the random seed
3. Return value: an RDD of the same type as the source RDD
4. Purpose: draw a sample from a massive dataset
    val list1 = 1 to 1000
    val listRDD = sc.parallelize(list1)
    val sampleRDD = listRDD.sample(false, 0.2)

    sampleRDD.foreach(num => print(num + " "))
    println("first sample count: " + sampleRDD.count())
    println("second sample count: " + sc.parallelize(list1).sample(false, 0.2).count())

    1 3 19 20 22 25 33...
    first sample count: 181
    second sample count: 197
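
Because fraction is only an expected ratio, the two counts above differ. Passing the optional third argument fixes the seed and makes a sample reproducible; a minimal sketch (the seed value 42 is arbitrary):

    // with a fixed seed the same elements are drawn every time
    val s1 = listRDD.sample(false, 0.2, 42)
    val s2 = listRDD.sample(false, 0.2, 42)
    println(s1.count() == s2.count()) // true: identical samples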

union RDDA ∪ RDDB

7.union RDDA ∪ RDDB
1. Declaration: def union(other: RDD[T]): RDD[T]
2. Parameter: another RDD with the same element type as the source RDD
3. Return value: an RDD of the same type as the source RDD
4. Purpose: merges the RDDs; duplicates are kept

val pairs1 = Seq(("A",1), ("B",1), ("C",1), ("D", 1), ("A", 2), ("C", 3))

    val rdd5 = sc.makeRDD(pairs1, 3)

    val pairs2 = Seq(("A",4), ("D",1), ("E", 1))

    val rdd6 = sc.makeRDD(pairs2, 2)

    rdd5.union(rdd6).foreach(println)
(A,1)
(B,1)
(D,1)
(E,1)
(A,4)
... (output truncated; the union holds all nine pairs from both RDDs, duplicates included, and foreach prints them in no fixed order)

intersection RDDA ∩ RDDB

8.intersection RDDA ∩ RDDB
1. Declaration: def intersection(other: RDD[T]): RDD[T]
2. Parameter: another RDD with the same element type as the source RDD
3. Return value: an RDD of the same type as the source RDD
4. Purpose: intersection, i.e. the elements present in both RDDs

    val pairs1 = Seq(("A",1), ("B",1), ("C",1), ("D", 1), ("A", 2), ("C", 3))

    val rdd5 = sc.makeRDD(pairs1, 3)

    val pairs2 = Seq(("A",1), ("D",1), ("E", 1))

    val rdd6 = sc.makeRDD(pairs2, 2)

    rdd5.intersection(rdd6).foreach(println)
(A,1)
(D,1)

distinct

    sc.makeRDD(Array(1, 2, 1, 1, 2, 3, 4, 5))
      .distinct()
      .foreach(println)
1
4
3
2
5

partitionBy

9.partitionBy
1. Declaration: def partitionBy(partitioner: Partitioner): RDD[(K, V)]
2. Parameter: an instance of a partitioner
3. Return value: a pair RDD with the same (K, V) types as the source
4. Purpose: redistributes the RDD's elements across partitions according to the given partitioner

// repartition with partitionBy; partitionBy is only defined on pair RDDs of (K, V)
val rdd2 = rdd1.map(x => (x._1, 1)).partitionBy(new org.apache.spark.HashPartitioner(2))
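
A slightly fuller sketch (the small pair RDD here is made up for illustration) that reuses mapPartitionsWithIndex from section 5 to show where each key ends up:

    import org.apache.spark.HashPartitioner

    val kvRDD = sc.parallelize(Seq(("A", 1), ("B", 1), ("C", 1), ("A", 2)))
    val partitioned = kvRDD.partitionBy(new HashPartitioner(2))
    // print which partition each key was hashed into
    partitioned.mapPartitionsWithIndex((idx, iter) =>
      iter.map { case (k, v) => "partition " + idx + " -> (" + k + "," + v + ")" }
    ).foreach(println)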

groupByKey

Performs a group-by-key operation on the (K, V) data.
groupByKey([numTasks]): called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs.
Note: by default the level of parallelism of the output depends on the number of partitions of the parent RDD; you can pass the optional numTasks argument to set a different number of tasks for your data volume.
In MapReduce terms: <k1, v1> -> map -> <k2, v2> -> shuffle -> <k2, [v21, v22, v23, ...]> -> <k3, v3>
groupByKey corresponds to the shuffle step.
It is similar to reduceByKey, with one key difference: reduceByKey combines locally on the map side while groupByKey does not, so in general use groupByKey sparingly; if you really have to group, you can build your own grouping that adds local pre-aggregation, as sketched below.
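
A minimal sketch of that idea using aggregateByKey, which merges partition-local buffers before shuffling; the small input RDD, zero value, and merge functions are illustrative only:

    // note: this only beats groupByKey when the per-partition combine actually shrinks
    // the data (e.g. summing); collecting full lists still shuffles everything
    val pairs = sc.parallelize(Seq(("give", 1), ("give", 1), ("me", 1), ("you", 1)))
    val grouped = pairs.aggregateByKey(List.empty[Int])(
      (buf, v) => v :: buf, // merge one value into the partition-local buffer
      (b1, b2) => b1 ::: b2 // merge buffers coming from different partitions
    )
    grouped.foreach(println) // e.g. (give,List(1, 1)), (me,List(1)), (you,List(1))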

    val list = List("give you", "give me", "a job")
    val rdd = sc.parallelize(list)
    val rdd9 = rdd.map(x => (x, 1))
    rdd9.foreach(println)
    val rdd10 = rdd9.groupByKey()
    rdd10.foreach(x => {
      println(x._1 + "sss" + x._2)
    })
(give me,1)
(give you,1)
(a job,1)

give yousssCompactBuffer(1)
a jobsssCompactBuffer(1)
give messsCompactBuffer(1)

reduceByKey

    val list = List("give you", "give me", "a job")
    val rdd = sc.parallelize(list)
    val rdds = rdd.flatMap(x => x.split(" "))
    rdds.map(x => (x, 1))
      .reduceByKey((v1, v2) => v1 + v2)
      .foreach(println)
(job,1)
(you,1)
(a,1)
(give,2)
(me,1)

sortByKey

    val list = List("give you", "give me", "a job")
    val rdd = sc.parallelize(list)
    val rdds = rdd.flatMap(x => x.split(" "))
    val rdd9 = rdds.map(x => (x, 1))
    rdd9.foreach(println)
    rdd9.sortByKey().foreach(println)
(give,1)
(give,1)
(me,1)
(you,1)
(a,1)
(job,1)

(job,1)
(me,1)
(you,1)
(a,1)
(give,1)
(give,1)
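
The second block above looks out of order only because foreach prints from each partition concurrently; inside the sorted RDD the keys are range-partitioned and sorted. Collecting to the driver first shows the global order (fine here since the data is tiny):

    // collect() gathers the sorted pairs to the driver, so the print order is the key order
    rdd9.sortByKey().collect().foreach(println)
    // (a,1) (give,1) (give,1) (job,1) (me,1) (you,1)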

join

 val pairs1 = Seq(("A",1), ("B",1), ("C",1), ("D", 1), ("A", 2), ("C", 3))

    val rdd5 = sc.makeRDD(pairs1, 3)

    val pairs2 = Seq(("A",1), ("D",1), ("E", 1))

    val rdd6 = sc.makeRDD(pairs2, 2)

    val joinrdd=rdd5.join(rdd6)
    joinrdd.foreach(println)
(A,(1,1))
(A,(2,1))
(D,(1,1))
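
The outer-join variants mentioned in the table work the same way, except that missing keys are wrapped in Option; a small sketch with the same rdd5 and rdd6:

    // keys that appear only in rdd5 get None on the right-hand side
    val leftJoined = rdd5.leftOuterJoin(rdd6)
    leftJoined.foreach(println)
    // e.g. (B,(1,None)), (C,(1,None)), (C,(3,None)), (A,(1,Some(1))), (A,(2,Some(1))), (D,(1,Some(1)))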

repartition

    val list = List("give you", "give me", "a job")
    val rdd = sc.parallelize(list)
    val rdds = rdd.flatMap(x => x.split(" "))
    val rdd9 = rdds.map(x => (x, 1))
    rdd9.foreach(x => {
      println(x + " current partition " + TaskContext.getPartitionId())
    })
    println("=======")
    rdd9.repartition(3).foreach(x => {
      println(x + " current partition " + TaskContext.getPartitionId())
    })
(give,1) current partition 1
(me,1) current partition 1
(a,1) current partition 1
(job,1) current partition 1
(give,1) current partition 0
(you,1) current partition 0

(give,1) current partition 1
(give,1) current partition 1
(job,1) current partition 1
(a,1) current partition 0
(you,1) current partition 2
(me,1) current partition 2