spark算子


def map[U: ClassTag](f: T => U): RDD[U]
遍历每个元素通过f函数变换
def map[U: ClassTag](f: T => U): RDD[U]
遍历每个元素通过f函数变换

def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]
以分区为单位执行Map

def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]
以分区为单位执行Map,带分区号

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中

def glom(): RDD[Array[T]]
将RDD中每一个分区变成一个数组

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
分组,按照传入函数的返回值进行分组

def filter(f: T => Boolean): RDD[T]
对RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中.

def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T]
从数据中采样

def distinct(): RDD[T]
去重

def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T]
合并分区

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
内部其实执行的是coalesce操作,参数shuffle的默认值为true

 def sortBy[K](
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。

def pipe(command: String, env: Map[String, String]): RDD[String]
 调用脚本

def intersection(other: RDD[T], numPartitions: Int): RDD[T]
 求交集

def union(other: RDD[T]): RDD[T]
  求并集

def subtract(other: RDD[T]): RDD[T]
   求差集

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
将两个RDD中的元素,以键值对的形式进行合并

 def partitionBy(partitioner: Partitioner): RDD[(K, V)]
 按照k重新分区

 def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
 将RDD[K,V]中的元素按照相同的K对V进行聚合

def groupByKey(): RDD[(K, Iterable[V])]
groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。

 def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)]
 按照K处理分区内和分区间逻辑

 def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
 分区内和分区间相同的aggregateByKey()

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)]
  (1)createCombiner(转换数据的结构): combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
2)mergeValue(分区内): 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
3)mergeCombiners(分区间): 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的 mergeCombiners()方法将各个分区的结果进行合并。
针对相同K,将V合并成一个集合。

def sortByKey(
       ascending: Boolean = true, // 默认,升序
       numPartitions: Int = self.partitions.length)  : RDD[(K, V)]
按照K进行排序

def mapValues[U](f: V => U): RDD[(K, U)]
针对于(K,V)形式的类型只对V进行操作

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
在类型为(K,V)(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
在类型为(K,V)(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))

def reduce(f: (T, T) => T): T
f函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

def collect(): Array[T]
以数组Array的形式返回数据集的所有元素

def count(): Long
返回RDD中元素的个数

first()返回RDD中的第一个元素

take()返回由RDD前n个元素组成的数组

takeOrdered()返回该RDD排序后前n个元素组成的数组

aggregate()

fold()

countByKey()

saveAsTextFile(path)保存成Text文件

foreach(f)
spark算子

发表评论

电子邮件地址不会被公开。 必填项已用*标注

滚动到顶部