温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何使用Apache Flink实现自定义Sink

发布时间:2021-09-13 14:29:48 来源:亿速云 阅读:293 作者:柒染 栏目:大数据

如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。

创建数据库和表

create database imooc_flink; create table student( id int(11) NOT NULL AUTO_INCREMENT, name varchar(25), age int(10), primary key(id) )

导入mysql依赖:

<dependency>	<groupId>mysql</groupId>	<artifactId>mysql-connector-java</artifactId>	<version>8.0.15</version>	</dependency>

创建POJO Student

package com.vincent.course05; public class Student {     private int id;     private String name;     private int age;     @Override     public String toString() {         return "Student{" +                 "id=" + id +                 ", name='" + name + '\'' +                 ", age=" + age +                 '}';     }     public int getId() {         return id;     }     public void setId(int id) {         this.id = id;     }     public String getName() {         return name;     }     public void setName(String name) {         this.name = name;     }     public int getAge() {         return age;     }     public void setAge(int age) {         this.age = age;     } }

然后创建连接,SinkToMySQL.java

package com.vincent.course05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class SinkToMySQL extends RichSinkFunction<Student> {     PreparedStatement ps;     private Connection connection;     /**      * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接      *      * @param parameters      * @throws Exception      */     @Override     public void open(Configuration parameters) throws Exception {         super.open(parameters);         connection = getConnection();         String sql = "insert into student(id, name, age) values(?, ?, ?);";         ps = this.connection.prepareStatement(sql);     }     @Override     public void close() throws Exception {         super.close();         //关闭连接和释放资源         if (connection != null) {             connection.close();         }         if (ps != null) {             ps.close();         }     }     /**      * 每条数据的插入都要调用一次 invoke() 方法      *      * @param value      * @param context      * @throws Exception      */     @Override     public void invoke(Student value, Context context) throws Exception {         //组装数据,执行插入操作         ps.setInt(1, value.getId());         ps.setString(2, value.getName());         ps.setInt(3, value.getAge());         ps.executeUpdate();     }     private static Connection getConnection() {         Connection con = null;         try {             Class.forName("com.mysql.cj.jdbc.Driver");             con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");         } catch (Exception e) {             e.printStackTrace();             System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());         }         return con;     } }

main方法:

public static void main(String[] args) throws Exception {         StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();         DataStreamSource<String> source = environment.socketTextStream("192.168.152.45", 9999);         SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {             @Override             public Student map(String value) throws Exception {                 String[] splits = value.split(",");                 Student student = new Student();                 student.setId(Integer.parseInt(splits[0]));                 student.setName(splits[1]);                 student.setAge(Integer.parseInt(splits[2]));                 return student;             }         });         studentStream.addSink(new SinkToMySQL());         environment.execute("JavaCustomSinkToMysql");     }

从socket中获取数据,数据格式使用逗号分割,在控制台中输入:

nc -lk 9999 1,tom,23

检查数据库,数据库中多了一条数据

mysql> select * from student; +----+------+------+ | id | name | age  | +----+------+------+ |  1 | tom  |   23 | +----+------+------+ 1 row in set (0.00 sec)

这样就很方便的使用自定义的sink,写入到MySQL中去。

总结:

第一步:继承RichSinkFunction<T> T就是想要写入的对象类型

第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次

默认情况下open方法的并行度不是1,跟具体的电脑有关系。

看完上述内容,你们掌握如何使用Apache Flink实现自定义Sink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI