温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

flink 从mysql 读取数据 放入kafka中 用于搜索全量

发布时间:2020-07-26 08:16:04 来源:网络 阅读:1796 作者:大海之中 栏目:大数据

接着上一篇,将mysql的数据导入kafka中

public static void main(String[] arg) throws Exception { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "name", "address" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/tablespace?characterEncoding=utf8") .setUsername("user").setPassword("root") .setQuery("select LOGIC_CODE, SHARE_LOG_CODE from table").setRowTypeInfo(rowTypeInfo).finish(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Row> s = env.createInput(jdbcInputFormat); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerDataSet("t2", s); Table tapiResult = tableEnv.scan("t2"); System.out.println("schema is:"); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select name, address from t2"); DataSet<Result> ds= tableEnv.toDataSet(query, Result.class); DataSet<String> temp=ds.map(new MapFunction<Result, String>() { @Override public String map(Result result) throws Exception { String name = result.name; String value = result.address; return name+":->:"+value; } }); logger.info("read db end"); KafkaOutputFormat kafkaOutput = KafkaOutputFormat.buildKafkaOutputFormat() .setBootstrapServers("ip:9092").setTopic("search_test_whk").setAcks("all").setBatchSize("1000") .setBufferMemory("100000").setLingerMs("1").setRetries("2").finish(); temp.output(kafkaOutput); logger.info("write kafka end"); env.execute("Flink add data source"); }
向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI