前言
技术栈
Windows 10, JDK 1.8, Flink 1.18.1
相关文档
- Flink DataStream API 编程指南: https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/overview/
- File Sink: https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/connectors/datastream/filesystem/#file-sink
示例代码
package qbit.example;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson2.JSON;

/**
 * Streaming word count that continuously reads text files from a directory, keeps a running
 * count per word, and writes every updated count as one JSON object per line (JSON Lines)
 * through a {@link FileSink}.
 *
 * <p>Usage: {@code WordCountStreamJsonl [inputDir] [outputDir] [checkpointDir]}. Every
 * argument is optional; a missing one falls back to the corresponding {@code DEFAULT_*}
 * constant.
 */
public class WordCountStreamJsonl {

    /** Default directory watched for input text files. */
    private static final String DEFAULT_INPUT_PATH = "K:/tmp/";
    /** Default directory the JSONL part files are written to. */
    private static final String DEFAULT_OUTPUT_PATH = "K:/out";
    /** Default checkpoint storage location. */
    private static final String DEFAULT_CHECKPOINT_DIR = "file:///K:/flink_checkpoint";

    /**
     * Encodes a {@code (word, count)} tuple as one UTF-8 JSON object with the keys
     * {@code "word"} and {@code "cnt"}, followed by a {@code '\n'} line terminator.
     */
    public static class Tuple2Json implements Encoder<Tuple2<String, Integer>> {
        @Override
        public void encode(Tuple2<String, Integer> element, OutputStream stream) throws IOException {
            String word = element.f0;
            int cnt = element.f1;
            Map<String, Object> map = new HashMap<>(3);
            map.put("word", word);
            map.put("cnt", cnt);
            String line = JSON.toJSONString(map) + "\n";
            stream.write(line.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Builds and runs the streaming job.
     *
     * @param args optional positional overrides: input directory, output directory,
     *     checkpoint directory
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        Path inputPath = new Path(argOrDefault(args, 0, DEFAULT_INPUT_PATH));
        Path outputPath = new Path(argOrDefault(args, 1, DEFAULT_OUTPUT_PATH));
        String checkPointDir = argOrDefault(args, 2, DEFAULT_CHECKPOINT_DIR);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 1000 ms. In streaming mode FileSink finalizes part files only on
        // successful checkpoints, so checkpointing must be enabled for output to be committed.
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointStorage(checkPointDir);

        // Watch the input directory and pick up newly added files every second.
        final FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), inputPath)
                .monitorContinuously(Duration.ofSeconds(1L))
                .build();
        final DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

        // Running total per word: each incoming word emits the updated count for that word.
        DataStream<Tuple2<String, Integer>> dataStream = stream
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .sum(1);
        dataStream.print();

        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")   // part-file name prefix
                .withPartSuffix(".jsonl")   // part-file name suffix
                .build();

        final FileSink<Tuple2<String, Integer>> sink = FileSink
                .forRowFormat(outputPath, new Tuple2Json())
                // Roll over to a new part file on every checkpoint.
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(config)
                // Merge the small per-checkpoint part files once every 30 checkpoints.
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .enableCompactionOnCheckpoint(30)
                                .build(),
                        new ConcatFileCompactor())
                .build();

        dataStream.sinkTo(sink).uid("custom-sink-uid");

        env.execute("WordCountStreamJsonl");
    }

    /** Returns {@code args[index]} when it was supplied, otherwise {@code defaultValue}. */
    private static String argOrDefault(String[] args, int index, String defaultValue) {
        return args != null && args.length > index ? args[index] : defaultValue;
    }

    /**
     * Splits a line into whitespace-separated words and emits {@code (word, 1)} for each.
     * Leading/trailing whitespace and runs of whitespace are ignored, so blank lines and
     * repeated spaces do not produce empty-string words.
     */
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /** One or more whitespace characters; compiled once and shared by all calls. */
        private static final Pattern WHITESPACE = Pattern.compile("\\s+");

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : WHITESPACE.split(sentence)) {
                // Pattern.split yields a leading "" when the line starts with whitespace,
                // and a single "" for an empty line.
                if (!word.isEmpty()) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }
    }
}
输入与输出
输入文件内容
hello flink hello spark hello spark hello python hello flink hello spark hello flink hello flink
输出文件内容
{"cnt":1,"word":"spark"}
{"cnt":2,"word":"spark"}
{"cnt":3,"word":"spark"}
{"cnt":1,"word":"hello"}
{"cnt":2,"word":"hello"}
{"cnt":3,"word":"hello"}
{"cnt":4,"word":"hello"}
{"cnt":1,"word":"python"}
{"cnt":5,"word":"hello"}
{"cnt":6,"word":"hello"}
{"cnt":7,"word":"hello"}
{"cnt":8,"word":"hello"}
{"cnt":1,"word":"flink"}
{"cnt":2,"word":"flink"}
{"cnt":3,"word":"flink"}
{"cnt":4,"word":"flink"}
项目配置文件 pom.xml
<project>
    <name>test_flink</name>
    <modelVersion>4.0.0</modelVersion>
    <version>0.1</version>
    <groupId>cn.qbit</groupId>
    <artifactId>test_flink</artifactId>
    <packaging>jar</packaging>
    <description>test flink</description>
    <properties>
        <!-- Java level used for compiler source/target below (JDK 1.8 toolchain). -->
        <java.version>1.8</java.version>
        <!-- Single place to set the version of every Flink artifact below. -->
        <flink.version>1.18.1</flink.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <!-- NOTE(review): maven-compiler-plugin appears to read its encoding from
             project.build.sourceEncoding, so this property looks redundant; confirm. -->
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <!-- DataStream API. "provided" scope: expected to be supplied at runtime and not
             packaged into this project's jar. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <!-- NOTE(review): the reason for excluding chill-java is not stated; confirm it
                 is intentional. -->
            <exclusions>
                <exclusion>
                    <artifactId>chill-java</artifactId>
                    <groupId>com.twitter</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <!-- Since Flink 1.11, flink-streaming-java no longer depends on flink-clients
             directly, so it is declared explicitly here (default compile scope; presumably
             so the job can be run locally from the IDE). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <!-- FileSource / FileSink used by the job; "provided" scope like flink-streaming-java. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
        <!-- JSON serialization for the JSONL encoder. NOTE(review): the build section below
             configures no shade/assembly plugin, so the built jar will not contain this
             library; bundle it before submitting the job to a cluster. -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.51</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compiles with the Java level defined by the java.version property. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
本文出自 qbit snap
**粗体** _斜体_ [链接](http://example.com) `代码` - 列表 > 引用。你还可以使用@来通知其他用户。