温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark怎么编写udaf函数求中位数

发布时间:2021-09-04 15:40:01 来源:亿速云 阅读:184 作者:chen 栏目:云计算

本篇内容主要讲解“spark怎么编写udaf函数求中位数”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“spark怎么编写udaf函数求中位数”吧!

package com.frank.sparktest.java; import org.apache.spark.sql.Row; import org.apache.spark.sql.expressions.MutableAggregationBuffer; import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; public class MedianUdaf extends UserDefinedAggregateFunction {     private StructType inputSchema;     private StructType bufferSchema;     public MedianUdaf(){         List<StructField> inputFields = new ArrayList<>();         inputFields.add(DataTypes.createStructField("nums",DataTypes.IntegerType,true));         inputSchema=DataTypes.createStructType(inputFields);         List<StructField> bufferFields = new ArrayList<>();         bufferFields.add(DataTypes.createStructField("datas",DataTypes.StringType,true));         bufferSchema=DataTypes.createStructType(bufferFields);     }     @Override     public StructType inputSchema() {         return inputSchema;     }     @Override     public StructType bufferSchema() {         return bufferSchema;     }     @Override     public DataType dataType() {         return DataTypes.DoubleType;     }     @Override     public boolean deterministic() {         return true;     }     @Override     public void initialize(MutableAggregationBuffer buffer) {         buffer.update(0,0);         buffer.update(1,0);     }     @Override     public void update(MutableAggregationBuffer buffer, Row input) {         if (!input.isNullAt(0)){             buffer.update(0,buffer.getString(0)+","+input.getInt(0));         }     }     @Override     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {         buffer1.update(0,buffer1.getString(0)+","+buffer2.getInt(0));     }     @Override     public Object evaluate(Row buffer) {         List<Integer> list = new ArrayList<Integer>();         List<String> stringList = Arrays.asList(buffer.getString(0).split(","));         for (String s : stringList){             list.add(Integer.valueOf(s));         }         Collections.sort(list);         int size = list.size();         int num=0;         if(size % 2 == 1) {             num = list.get((size / 2)+1);         }         if(size %2  == 0) {             num = (list.get(size / 2)+list.get((size / 2)+1))/2;         }         return num;     } }

上面是代码段,可以直接拿来使用

下面是测试程序

package com.frank.sparktest.java; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import java.io.IOException; import java.util.stream.IntStream; public class DemoUDAF {     public static void main(String[] args) throws IOException {         SQLContext sqlContext = SparkSession.builder().master("local").getOrCreate().sqlContext();         sqlContext.udf().register("generate", (Integer start, Integer end)-> IntStream.range(start, end+1).boxed().toArray(), DataTypes.createArrayType(DataTypes.IntegerType));         sqlContext.udf().register("media",new MedianUdaf());         sqlContext.sql("select generate(1,10)").show();     } }

到此,相信大家对“spark怎么编写udaf函数求中位数”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI