要使用Flink SQL读取Kafka数据,需要按照以下步骤进行操作:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>${flink.version}</version> </dependency> 确保${flink.version}是Flink的版本号。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String createTableSql = "CREATE TABLE kafka_table (\n" + " key STRING,\n" + " value STRING\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'your_topic',\n" + " 'properties.bootstrap.servers' = 'your_bootstrap_servers',\n" + " 'properties.group.id' = 'your_group_id',\n" + " 'format' = 'json',\n" + " 'scan.startup.mode' = 'earliest-offset'\n" + ")"; tEnv.executeSql(createTableSql); 在上述代码中,'topic'和'properties.bootstrap.servers'需要替换为你的Kafka主题和启动服务器的地址。'properties.group.id'是Flink消费者组的唯一标识符。
另外,'format'参数指定了数据格式,可以根据实际情况将其设置为适当的值。
String querySql = "SELECT * FROM kafka_table"; Table result = tEnv.sqlQuery(querySql); DataStream<Row> resultStream = tEnv.toAppendStream(result, Row.class); 现在,你可以对resultStream进行进一步处理,如打印或写入到其他系统中。
最后,记得调用env.execute()启动Flink作业。