接着上一篇,将mysql的数据导入kafka中
public static void main(String[] arg) throws Exception { TypeInformation[] fieldTypes = new TypeInformation[] { BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO }; String[] fieldNames = new String[] { "name", "address" }; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://ip:3306/tablespace?characterEncoding=utf8") .setUsername("user").setPassword("root") .setQuery("select LOGIC_CODE, SHARE_LOG_CODE from table").setRowTypeInfo(rowTypeInfo).finish(); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Row> s = env.createInput(jdbcInputFormat); BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); tableEnv.registerDataSet("t2", s); Table tapiResult = tableEnv.scan("t2"); System.out.println("schema is:"); tapiResult.printSchema(); Table query = tableEnv.sqlQuery("select name, address from t2"); DataSet<Result> ds= tableEnv.toDataSet(query, Result.class); DataSet<String> temp=ds.map(new MapFunction<Result, String>() { @Override public String map(Result result) throws Exception { String name = result.name; String value = result.address; return name+":->:"+value; } }); logger.info("read db end"); KafkaOutputFormat kafkaOutput = KafkaOutputFormat.buildKafkaOutputFormat() .setBootstrapServers("ip:9092").setTopic("search_test_whk").setAcks("all").setBatchSize("1000") .setBufferMemory("100000").setLingerMs("1").setRetries("2").finish(); temp.output(kafkaOutput); logger.info("write kafka end"); env.execute("Flink add data source"); }
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。