温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

flink连接消费kafka实例

发布时间:2021-08-31 15:51:03 来源:亿速云 阅读:330 作者:chen 栏目:游戏开发

这篇文章主要讲解了“flink连接消费kafka实例”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink连接消费kafka实例”吧!

package flink.streaming import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time object StreamingTest {   def main(args: Array[String]): Unit = {     val kafkaProps = new Properties()     //kafka的一些属性     kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")     //所在的消费组     kafkaProps.setProperty("group.id", "group1")     //获取当前的执行环境     val evn = StreamExecutionEnvironment.getExecutionEnvironment     //kafka的consumer,test1是要消费的topic     val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema,kafkaProps)     //设置从最新的offset开始消费     kafkaSource.setStartFromLatest()     //自动提交offset     kafkaSource.setCommitOffsetsOnCheckpoints(true)          //flink的checkpoint的时间间隔     evn.enableCheckpointing(5000)     //添加consumer     val stream = evn.addSource(kafkaSource)     stream.setParallelism(3)     val text = stream.flatMap{ _.toLowerCase().split("\\W+")filter { _.nonEmpty} }           .map{(_,1)}           .keyBy(0)           .timeWindow(Time.seconds(5))           .sum(1)          text.print()      //启动执行         evn.execute("kafkawd")                       } }

//

pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">   <modelVersion>4.0.0</modelVersion>   <groupId>hgs</groupId>   <artifactId>flink_lesson</artifactId>   <version>1.0.0</version>   <packaging>jar</packaging>   <name>flink_lesson</name>   <url>http://maven.apache.org</url>   <properties>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>   </properties>   <dependencies>     <dependency>       <groupId>junit</groupId>       <artifactId>junit</artifactId>       <version>4.1</version>       <scope>test</scope>     </dependency>          <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->	<dependency>	    <groupId>org.apache.flink</groupId>	    <artifactId>flink-core</artifactId>	    <version>1.7.1</version>	</dependency>	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->	<dependency>	    <groupId>org.apache.flink</groupId>	    <artifactId>flink-clients_2.12</artifactId>	    <version>1.7.1</version>	</dependency>	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->	<dependency>	    <groupId>org.apache.flink</groupId>	    <artifactId>flink-streaming-scala_2.12</artifactId>	    <version>1.7.1</version>	</dependency>	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency>   <groupId>org.apache.flink</groupId>   <artifactId>flink-connector-kafka_2.12</artifactId>   <version>1.7.1</version> </dependency>	<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->	<dependency>	    <groupId>io.netty</groupId>	    <artifactId>netty-all</artifactId>	    <version>4.1.32.Final</version>	</dependency>   </dependencies>          <build>         <plugins>             <plugin>                 <artifactId>maven-assembly-plugin</artifactId>                 <version>2.6</version>                 <configuration>                               <archive>                         <manifest>                             <!-- 我运行这个jar所运行的主类 -->                             <mainClass>hgs.flink_lesson.WordCount</mainClass>                         </manifest>                     </archive>                                           <descriptorRefs>                         <descriptorRef>                             <!-- 必须是这样写 -->                             jar-with-dependencies                         </descriptorRef>                     </descriptorRefs>                 </configuration>                                  <executions>                     <execution>                         <id>make-assembly</id>                         <phase>package</phase>                         <goals>                             <goal>single</goal>                         </goals>                     </execution>                 </executions>             </plugin>                            <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-compiler-plugin</artifactId>                 <configuration>                     <source>1.8</source>                     <target>1.8</target>                 </configuration>             </plugin>               <plugin>	<groupId>net.alchim31.maven</groupId>	<artifactId>scala-maven-plugin</artifactId>	<version>3.2.0</version>	<executions>	<execution>	<goals>	<goal>compile</goal>	<goal>testCompile</goal>	    </goals>	<configuration>	<args>	<!-- <arg>-make:transitive</arg> -->                 	<arg>-dependencyfile</arg>                 	<arg>${project.build.directory}/.scala_dependencies</arg>               	</args>	</configuration>	</execution>	</executions>	</plugin>	<plugin>	<groupId>org.apache.maven.plugins</groupId>	<artifactId>maven-surefire-plugin</artifactId>	<version>2.18.1</version>	<configuration>	<useFile>false</useFile>	<disableXmlReport>true</disableXmlReport>	<!-- If you have classpath issue like NoDefClassError,... -->	<!-- useManifestOnlyJar>false</useManifestOnlyJar -->	<includes>	<include>**/*Test.*</include>	<include>**/*Suite.*</include>	</includes>	</configuration>	</plugin>                  </plugins>     </build> </project>

感谢各位的阅读,以上就是“flink连接消费kafka实例”的内容了,经过本文的学习后,相信大家对flink连接消费kafka实例这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI