温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么通过SparkSql将scala类转换为DataFrame

发布时间:2021-06-16 17:17:26 来源:亿速云 阅读:293 作者:Leah 栏目:开发技术

这篇文章将为大家详细讲解有关怎么通过SparkSql将scala类转换为DataFrame,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

如下所示:

import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext} import org.slf4j.LoggerFactory   /**  * Created by silentwolf on 2016/6/3.  */   case class UserTag(SUUID: String,      MAN: Float,      WOMAN: Float,      AGE10_19: Float,      AGE20_29: Float,      AGE30_39: Float,      AGE40_49: Float,      AGE50_59: Float,      GAME: Float,      MOVIE: Float,      MUSIC: Float,      ART: Float,      POLITICS_NEWS: Float,      FINANCIAL: Float,      EDUCATION_TRAINING: Float,      HEALTH_CARE: Float,      TRAVEL: Float,      AUTOMOBILE: Float,      HOUSE_PROPERTY: Float,      CLOTHING_ACCESSORIES: Float,      BEAUTY: Float,      IT: Float,      BABY_PRODUCT: Float,      FOOD_SERVICE: Float,      HOME_FURNISHING: Float,      SPORTS: Float,      OUTDOOR_ACTIVITIES: Float,      MEDICINE: Float      )   object UserTagTable {    val LOG = LoggerFactory.getLogger(UserOverviewFirst.getClass)    val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"    def main(args: Array[String]) {    var startTime = System.currentTimeMillis()    val conf: com.typesafe.config.Config = ConfigFactory.load()    val sc = new SparkContext()    val sqlContext = new SQLContext(sc)    var df1: DataFrame = null    if (args.length == 0) {   println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11")  }  else {     var appkey = args(0)     var lastdate = args(1)     df1 = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate)     df1.registerTempTable("suuidTable")     sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))   sqlContext.udf.register("intToString", (b: Long) => intToString(b))   import sqlContext.implicits._     //***重点***:将临时表中的suuid和自定函数中Json数据,放入UserTag中。  sqlContext.sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid").map { case Row(suuid: String, taginfo: String) =>   val taginfoObj = JSON.parseObject(taginfo)   UserTag(suuid.toString,    taginfoObj.getFloat("man"),    taginfoObj.getFloat("woman"),    taginfoObj.getFloat("age10_19"),    taginfoObj.getFloat("age20_29"),    taginfoObj.getFloat("age30_39"),    taginfoObj.getFloat("age40_49"),    taginfoObj.getFloat("age50_59"),    taginfoObj.getFloat("game"),    taginfoObj.getFloat("movie"),    taginfoObj.getFloat("music"),    taginfoObj.getFloat("art"),    taginfoObj.getFloat("politics_news"),    taginfoObj.getFloat("financial"),    taginfoObj.getFloat("education_training"),    taginfoObj.getFloat("health_care"),    taginfoObj.getFloat("travel"),    taginfoObj.getFloat("automobile"),    taginfoObj.getFloat("house_property"),    taginfoObj.getFloat("clothing_accessories"),    taginfoObj.getFloat("beauty"),    taginfoObj.getFloat("IT"),    taginfoObj.getFloat("baby_Product"),    taginfoObj.getFloat("food_service"),    taginfoObj.getFloat("home_furnishing"),    taginfoObj.getFloat("sports"),    taginfoObj.getFloat("outdoor_activities"),    taginfoObj.getFloat("medicine")   )}.toDF().registerTempTable("resultTable")     val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," +   "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," +   "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," +   "MEDICINE from resultTable WHERE SUUID IS NOT NULL")   resultDF.write.mode(SaveMode.Overwrite).options(   Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))   ).format("org.apache.phoenix.spark").save()    }  }    def intToString(suuid: Long): String = {  suuid.toString()  }    def userTagInfo(num1: String): String = {    var de = new DecimalFormat("0.00")  var mannum = de.format(math.random).toFloat  var man = mannum  var woman = de.format(1 - mannum).toFloat    var age10_19num = de.format(math.random * 0.2).toFloat  var age20_29num = de.format(math.random * 0.2).toFloat  var age30_39num = de.format(math.random * 0.2).toFloat  var age40_49num = de.format(math.random * 0.2).toFloat    var age10_19 = age10_19num  var age20_29 = age20_29num  var age30_39 = age30_39num  var age40_49 = age40_49num  var age50_59 = de.format(1 - age10_19num - age20_29num - age30_39num - age40_49num).toFloat    var game = de.format(math.random * 1).toFloat  var movie = de.format(math.random * 1).toFloat  var music = de.format(math.random * 1).toFloat  var art = de.format(math.random * 1).toFloat  var politics_news = de.format(math.random * 1).toFloat    var financial = de.format(math.random * 1).toFloat  var education_training = de.format(math.random * 1).toFloat  var health_care = de.format(math.random * 1).toFloat  var travel = de.format(math.random * 1).toFloat  var automobile = de.format(math.random * 1).toFloat    var house_property = de.format(math.random * 1).toFloat  var clothing_accessories = de.format(math.random * 1).toFloat  var beauty = de.format(math.random * 1).toFloat  var IT = de.format(math.random * 1).toFloat  var baby_Product = de.format(math.random * 1).toFloat    var food_service = de.format(math.random * 1).toFloat  var home_furnishing = de.format(math.random * 1).toFloat  var sports = de.format(math.random * 1).toFloat  var outdoor_activities = de.format(math.random * 1).toFloat  var medicine = de.format(math.random * 1).toFloat    "{" + "\"man\"" + ":" + man + "," + "\"woman\"" + ":" + woman + "," + "\"age10_19\"" + ":" + age10_19 + "," + "\"age20_29\"" + ":" + age20_29 + "," +   "\"age30_39\"" + ":" + age30_39 + "," + "\"age40_49\"" + ":" + age40_49 + "," + "\"age50_59\"" + ":" + age50_59 + "," + "\"game\"" + ":" + game + "," +   "\"movie\"" + ":" + movie + "," + "\"music\"" + ":" + music + "," + "\"art\"" + ":" + art + "," + "\"politics_news\"" + ":" + politics_news + "," +   "\"financial\"" + ":" + financial + "," + "\"education_training\"" + ":" + education_training + "," + "\"health_care\"" + ":" + health_care + "," +   "\"travel\"" + ":" + travel + "," + "\"automobile\"" + ":" + automobile + "," + "\"house_property\"" + ":" + house_property + "," + "\"clothing_accessories\"" + ":" + clothing_accessories + "," +   "\"beauty\"" + ":" + beauty + "," + "\"IT\"" + ":" + IT + "," + "\"baby_Product\"" + ":" + baby_Product + "," + "\"food_service\"" + ":" + food_service + "," +   "\"home_furnishing\"" + ":" + home_furnishing + "," + "\"sports\"" + ":" + sports + "," + "\"outdoor_activities\"" + ":" + outdoor_activities + "," + "\"medicine\"" + ":" + medicine +   "}";    }    def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {  val path = s"$REP_HOME/appstatistic"  ctx.read.parquet(path)   .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")  }     }

关于怎么通过SparkSql将scala类转换为DataFrame就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI