温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spark RDD常用算子是什么类型的

发布时间:2022-02-19 11:34:57 来源:亿速云 阅读:295 作者:小新 栏目:开发技术

小编给大家分享一下Spark RDD常用算子是什么类型的,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

Spark RDD常用算子:Value类型

Spark之所以比Hadoop灵活和强大,其中一个原因是Spark内置了许多有用的算子,也就是方法。通过对这些方法的组合,编程人员就可以写出自己想要的功能。说白了spark编程就是对spark算子的使用,下面为大家详细讲解一下SparkValue类型的常用算子

Spark RDD常用算子是什么类型的

map

函数说明:

map() 接收一个函数,该函数将RDD中的元素逐条进行映射转换,可以是类型的转换,也可以是值的转换,将函数的返回结果作为结果RDD编程。

函数签名:

def map[U: ClassTag](f: T => U): RDD[U]

案例演示

   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")    val sc = new SparkContext(sparkConf)    //算子 -map    val rdd = sc.makeRDD(List(1, 2, 3, 4),2)    val mapRdd1 = rdd.map(      _*2    )    mapRdd1.collect().foreach(println)    sc.stop()

运行结果

2 4 6 8

mapPartitons

函数说明:

将待处理的数据以分区为单位发送到待计算节点上进行处理,mapPartition是对RDD的每一个分区的迭代器进行操作,返回的是迭代器。这里的处理可以进行任意的处理。

函数签名:

def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

案例演示

 def main(args: Array[String]): Unit = {    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")    val sc = new SparkContext(sparkConf)    //算子 -mapPartitons 计算每个分区的最大数    val rdd = sc.makeRDD(List(1, 34, 36,345,2435,2342,62,35, 4),4)    val mapParRdd = rdd.mapPartitions(      iter => {        List(iter.max).iterator      }    )    mapParRdd.foreach(println)    sc.stop()  }

运行结果:

62 2435 34 345

mapPartitonsWithIndex

函数说明:

将待处理的数据以分区为单位发送到计算节点上,这里的处理可以进行任意的处理,哪怕是过滤数据,在处理的同时可以获取当前分区的索引值。

函数签名:

def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

案例演示:

  1. 将数据进行扁平化映射并且打印所在的分区数
def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")    val sc = new SparkContext(conf)    val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2)    val mapRDD = rdd.flatMap(_.split(" "))    val mpwiRdd = mapRDD.mapPartitionsWithIndex(      (index, datas) => {        datas.map(          num => {            (index, num)          }        )      }    )    mpwiRdd.collect().foreach(println)  }

运行结果:

(0,Hello) (0,Spark) (1,Hello) (1,Scala) (1,Word) (1,Count)
  1. 将数据进行扁平化映射只打印所在第一分区的数据
def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")    val sc = new SparkContext(conf)    val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2)    val mapRDD = rdd.flatMap(_.split(" "))    val mpwiRdd = mapRDD.mapPartitionsWithIndex(      (index, datas) => {        if (index==0){          datas.map(            num => {              (index, num)            }          )        }else{        Nil.iterator        }      }    )    mpwiRdd.collect().foreach(println)

运行结果:

(0,Hello) (0,Spark)

flatMap

函数说明:

将数据进行扁平化之后在做映射处理,所以算子也称为扁平化映射

函数签名:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

案例演示:

将每个单词进行扁平化映射

def main(args: Array[String]): Unit = {  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")  val sc = new SparkContext(sparkConf)  //算子 -map  val rdd = sc.makeRDD(List("Hello Scala","Hello Spark"), 2)  val FltRdd = rdd.flatMap(    _.split(" ")  )  FltRdd.foreach(println)  sc.stop() }

运行结果:

Hello Scala Hello Spark

glom

函数说明:

glom的作用就是将一个分区的数据合并到一个array中。

函数签名:

def glom(): RDD[Array[T]]

案例演示:

  1. 将不同分区rdd的元素合并到一个分区
 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")    val sc = new SparkContext(conf)    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9),2)    val glomRdd = rdd.glom()    glomRdd.collect().foreach(data=>println(data.mkString(",")))    sc.stop()  }

运行结果:

1,2,3,4 5,6,7,8,9

groupBy

函数说明:

将数据根据指定的规则进行分组,分区默认不变,单数数据会被打乱,我们成这样的操作为shuffer,

函数签名:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

案例演示:

  1. 按照奇偶数进行groupby分区
 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")    val sc = new SparkContext(conf)    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8,10),2)    val groupByRDD = rdd.groupBy(_ % 2 == 0)    groupByRDD.collect().foreach(println)    sc.stop()  }

运行结果:

(false,CompactBuffer(1, 3, 5, 7)) (true,CompactBuffer(2, 4, 6, 8, 10))
  1. 按照单词的首字母进行分组
 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")    val sc = new SparkContext(conf)    val rdd = sc.makeRDD(List("Hello","Tom","Timi","Scala","Spark"))    val groupByRDD = rdd.groupBy(_.charAt(0))    groupByRDD.collect().foreach(println)    sc.stop()  }

运行结果:

(T,CompactBuffer(Tom, Timi)) (H,CompactBuffer(Hello)) (S,CompactBuffer(Scala, Spark))

filter

函数说明:

filter即过滤器的意思,所以filter算子的作用就是过滤的作用。filter将根据指定的规则进行筛选过滤,符合条件的数据保留,不符合的数据丢弃,当数据进行筛选过滤之后,分区不变,但分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

函数签名:

def filter(f: T => Boolean): RDD[T]

案例演示:

  1. 筛选出能被二整除的数字
 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")    val sc = new SparkContext(conf)    val rdd = sc.makeRDD(List(46,235,246,2346,3276,235,234,6234,6245,246,24,6246,235,26,265))    val filterRDD = rdd.filter(_ % 2 == 0)    filterRDD.collect().foreach(println)    sc.stop()  }

运行结果:

46 246 2346 3276 234 6234 246 24 6246 26

2.筛选单词中包含H的

 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")    val sc = new SparkContext(conf)    val rdd = sc.makeRDD(List("Hello","Horber","Hbeer","ersfgH","Scala","Hadoop","Zookeeper"))    val filterRDD = rdd.filter(_.contains("H"))    filterRDD.collect().foreach(println)    sc.stop()  }

运行结果:

Hello Horber Hbeer ersfgH Hadoop

以上是“Spark RDD常用算子是什么类型的”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI