温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

第97课:Spark Streaming 结合Spark SQL 案例

发布时间:2020-08-11 00:10:12 来源:网络 阅读:4376 作者:lqding1980 栏目:大数据

代码如下:

package com.dt.spark.streaming import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.{StreamingContext, Duration} /**  * 使用SparkStreaming结合SparkSQL对日志进行分析。  * 假设电商网站点击日志格式(简化)如下:  * userid,itemId,clickTime  * 需求:处理10分钟内item点击次数排序Top10,并且将商品名称显示出来。商品itemId与商品名称的对应关系存放在MySQL数据库中  * Created by dinglq on 2016/5/4.  */ object LogAnalyzerStreamingSQL {   val WINDOW_LENGTH = new Duration(600 * 1000)   val SLIDE_INTERVAL = new Duration(10 * 1000)   def main(args: Array[String]) {     val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]")     val sc = new SparkContext(sparkConf)     val sqlContext = new SQLContext(sc)     import sqlContext.implicits._     //从数据库中加载itemInfo表     val itemInfoDF = sqlContext.read.format("jdbc").options(Map(       "url"-> "jdbc:mysql://spark-master:3306/spark",       "driver"->"com.mysql.jdbc.Driver",       "dbtable"->"iteminfo",       "user"->"root",       "password"-> "vincent"       )).load()     itemInfoDF.registerTempTable("itemInfo")     val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)     val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")     val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()     val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)     windowDStream.foreachRDD(accessLogs => {       if (accessLogs.isEmpty()) {         println("No logs received in this time interval")       } else {         accessLogs.toDF().registerTempTable("accessLogs")         val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +           " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +           " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "         val topTenClickItemLast10Minus = sqlContext.sql(sqlStr)         // Persist top ten table for this window to HDFS as parquet file         topTenClickItemLast10Minus.show()       }     })     streamingContext.start()     streamingContext.awaitTermination()   } } case class AccessLog(userId: String, itemId: String, clickTime: String) { } object AccessLog {   def parseLogLine(log: String): AccessLog = {     val logInfo = log.split(",")     if (logInfo.length == 3) {       AccessLog(logInfo(0),logInfo(1), logInfo(2))     }     else {       AccessLog("0","0","0")     }   } }


MySQL中表的内容如下:

mysql> select * from spark.iteminfo; +--------+----------+ | itemid | itemname | +--------+----------+ | 001    | phone    | | 002    | computer | | 003    | TV       | +--------+----------+ 3 rows in set (0.00 sec)


在D创建目录logs_incoming


运行Spark Streaming 程序。


新建文件,内容如下:

0001,001,2016-05-04 22:10:20 0002,001,2016-05-04 22:10:21 0003,001,2016-05-04 22:10:22 0004,002,2016-05-04 22:10:23 0005,002,2016-05-04 22:10:24 0006,001,2016-05-04 22:10:25 0007,002,2016-05-04 22:10:26 0008,001,2016-05-04 22:10:27 0009,003,2016-05-04 22:10:28 0010,003,2016-05-04 22:10:29 0011,001,2016-05-04 22:10:30 0012,003,2016-05-04 22:10:31 0013,003,2016-05-04 22:10:32

将文件保存到目录logs_incoming 中,观察Spark程序的输出:

+------+--------+---+ |itemid|itemname|cnt| +------+--------+---+ |   001|   phone|  6| |   003|      TV|  4| |   002|computer|  3| +------+--------+---+



备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI