温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

SparkStreaming的实现和使用方法

发布时间:2021-09-07 10:33:31 来源:亿速云 阅读:154 作者:chen 栏目:编程语言

这篇文章主要讲解了“SparkStreaming的实现和使用方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SparkStreaming的实现和使用方法”吧!

一.DStream 整合RDD

1.官网算子

SparkStreaming的实现和使用方法

2.使用案例

生产中使用多的是一个文件中有很多域名,另一个中是黑名单,要进行剔除 数据一:日志信息    DStream     domain,traffic     xinlang.com     xinlang.com     baidu.com 数据二:已有的文件  黑名单  RDD     domain     baidu.com

3.RDD实现上述需求

package sparkstreaming02 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ListBuffer object Demo1 {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]")     val sc = new SparkContext(conf)     val input1 = new ListBuffer[(String,Long)]     input1.append(("www.xinlang.com", 8888))     input1.append(("www.xinalng.com", 9999))     input1.append(("www.baidu.com", 7777))     val data1 = sc.parallelize(input1)     //进行join一定要是key,value形式的     val input2 = new ListBuffer[(String,Boolean)]     input2.append(("www.baidu.com",true))     val data2 = sc.parallelize(input2)     data1.leftOuterJoin(data2)       .filter(x => {         x._2._2.getOrElse(false) != true       }).map(x => (x._1,x._2._1))       .collect().foreach(println)   } }

4.SparkStreaming实现

package sparkstreaming02 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable.ListBuffer object Streaming {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]")     val ssc = new StreamingContext(conf,Seconds(10))     val lines = ssc.socketTextStream("s201",9999)     // 数据二: rdd     val input2 = new ListBuffer[(String,Boolean)]     input2.append(("www.baidu.com",true))     val data2 = ssc.sparkContext.parallelize(input2)     lines.map(x=>(x.split(",")(0), x)).transform(       rdd => {         rdd.leftOuterJoin(data2)           .filter(x => {             x._2._2.getOrElse(false) != true //注意 join之后过滤           }).map(x => (x._1,x._2._1))       }     ).print()     ssc.start()     ssc.awaitTermination()   } }

二.SparkStreaming插入外部数据源

1.插入外部数据源用的,但是使用这个有几个坑

SparkStreaming的实现和使用方法

2.错误一官网例子

SparkStreaming的实现和使用方法

3.原因

connect 在Driver端创建,record在executor,发过去序列化错误

SparkStreaming的实现和使用方法

4.解决

解决:第一种把connect放到executor端 这样弊端是每条记录会生成一个connect太耗费资源         words.foreachRDD { rdd =>           rdd.foreach { record =>             val connection = createConnection()  // executed at the driver             val word = record._1             val count = record._2.toInt             val sql = s"insert into wc (wc,count) values($word,$count)"            connection.createStatement().execute(sql)          }

5.最终解决办法

package sparkstreaming02 import java.sql.DriverManager import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object MysqlStreaming {   def main(args: Array[String]): Unit = {     val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming")     val ssc = new StreamingContext(conf,Seconds(1))     val lines = ssc.socketTextStream("s201",9999)     val words = lines.flatMap(x => x.split(",")).map((_,1)).reduceByKey(_+_) //    words.foreachRDD { rdd => //      val connection = createConnection()  // executed at the driver //      rdd.foreach { record => //        val word = record._1 //        val count = record._2 //        val sql = s"insert into wc (word,count) values($word,$count)" //        connection.createStatement().execute(sql) //      } //    } //        words.foreachRDD { rdd => //          rdd.foreach { record => //            val connection = createConnection()  // executed at the driver //            val word = record._1 //            val count = record._2.toInt //            val sql = s"insert into wc (wc,count) values($word,$count)" //            connection.createStatement().execute(sql) //          } //        }     //最终的写法     words.foreachRDD { rdd =>       rdd.foreachPartition { partitionOfRecords =>         val connection = createConnection()         partitionOfRecords.foreach(           record =>{         val word = record._1         val count = record._2         val sql = s"insert into wc (wc,count) values('$word',$count)"         connection.createStatement().execute(sql)}         )       }     }     ssc.start()     ssc.awaitTermination()   }   def createConnection() = {     Class.forName("com.mysql.cj.jdbc.Driver")     DriverManager.getConnection("jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false","root","123456")   } }

6.出现问题

错误,插入数据库时,你要插入字符串要用'' 例如: val sql = s"insert into wc (wc,count) values($word,$count)" word是字符串,你要不加双引号就报这个错误 正确 val sql = s"insert into wc (wc,count) values('$word',$count)"

SparkStreaming的实现和使用方法

感谢各位的阅读,以上就是“SparkStreaming的实现和使用方法”的内容了,经过本文的学习后,相信大家对SparkStreaming的实现和使用方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI