温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink的函数有哪些

发布时间:2021-12-28 17:43:33 来源:亿速云 阅读:167 作者:小新 栏目:大数据

这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

1. Map: 将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素

具体代码实现

package com.wudl.core; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /**  * @version v1.0  * @ProjectName Flinklearning  * @ClassName WordMap  * @Description TODO map 算子实例  * @Date 2020/10/29 10:15  */ public class WordMap {     /**      * @param args      * Map 函数的用法      * 映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素      *参数: Lambda 表达式或者,new MapFunction实现类      * 返回值:DataStream      */     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setMaxParallelism(1);         env.socketTextStream("10.204.125.140", 8899)                 .map(new MapFunction<String, String>() {                     @Override                     public String map(String s) throws Exception {                         String[] split = s.split(",");                         return split[0] + "---" + split[1];                     }                 }).print();         env.execute();     } }

2. FlatMap:

将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
package com.wudl.core; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Arrays; import java.util.List; /**  * @version v1.0  * @ProjectName Flinklearning  * @ClassName TransformFlatMap  * @Description TODO FlatMap  *  * FlatMap: 是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用, 消费后的元素产生零到多个元素  *  *  *  * @Author wudl  * @Date 2020/10/29 10:46  *  *  * 函数 FlatMap  * 将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素  * 参数: lambda 表达式或者是FlatFunction的实现类  * 返回值:DataStream  *  *  *  */ public class TransformFlatMap {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1); //        DataStreamSource<List<Integer>> listDs = env.fromCollection(Arrays.asList( //                Arrays.asList(1, 2, 3), //                Arrays.asList(3, 4, 5), //                Arrays.asList(8,9,0) //        )); //        listDs.flatMap(new FlatMapFunction<List<Integer>, Integer>() { //            @Override //            public void flatMap(List<Integer> list, Collector<Integer> collector) throws Exception { // //                for (Integer number : list) { //                    collector.collect(number + 100); //                } // //            } //        }).print();         DataStreamSource<String> strDs = env.socketTextStream("10.204.125.140", 8899);         strDs.flatMap(new FlatMapFunction<String, String>() {             @Override             public void flatMap(String s, Collector<String> collector) throws Exception {                 String[] split = s.split(",");                 collector.collect(split[0]+split[1]);             }         }).print();         env.execute();     } }

第三种:Filter  对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃

package com.wudl.core; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /**  * @version v1.0  * @ProjectName Flinklearning  * @ClassName TransformFilter  * @Description TODO 流的过滤  * @Date 2020/11/5 10:26  */ public class TransformFilter {     /**      * 函数中Filter 中过滤      * 过滤:根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false)  将丢弃      * 返回值:DataStream      */     public static void main(String[] args) throws Exception {         //1.获取上下文的环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //2.设置并行度         env.setParallelism(1);         //3.获取数据流         DataStreamSource<String> SourceDs = env.socketTextStream("10.204.125.140", 8899);         //4. 过滤数据流         DataStream<String> filter = SourceDs.filter(new FilterFunction<String>() {             @Override             public boolean filter(String value) throws Exception {                 String[] split = value.split(",");                 return split[1].length() > 3;             }         });         filter.print();         env.execute();     } }

感谢你能够认真阅读完这篇文章,希望小编分享的“Flink的函数有哪些”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI