Skip to content

Commit 37deffd

Browse files
Wenfeng.HuangWenfeng.Huang
authored andcommitted
springboot-kafka
1 parent c08c020 commit 37deffd

File tree

11 files changed

+300
-0
lines changed

11 files changed

+300
-0
lines changed

springboot-kafka/.gitattributes

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
*.js linguist-language=java
2+
*.html linguist-language=java
3+
*.java linguist-language=java
4+
*.xml linguist-language=java
5+
*.sh linguist-language=java

springboot-kafka/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
1.下载kafka安装包、并安装:
2+
3+
下载地址:http://kafka.apache.org/
4+
5+
6+
安装:
7+
tar -zxvf kafka_2.11-0.11.0.0.tgz
8+
2.修改kafka配置文件 server.properties 加入如下配置:
9+
10+
listeners=PLAINTEXT://192.168.0.195:9092 #使其他主机访问
11+
auto.create.topics.enable = true #自动创建topic
12+
13+
3.开启9092端口,使外部可以访问。
14+
15+
4.启动kafka:
16+
17+
cd kafka_2.11-0.11.0.0
18+
bin/zookeeper-server-start.sh config/zookeeper.properties & # 先启动zookeeper
19+
bin/kafka-server-start.sh config/server.properties & # 启动kafka服务
20+
21+
5.打开项目选择KafkaApplication并运行。

springboot-kafka/pom.xml

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.chen</groupId>
7+
<artifactId>kafka</artifactId>
8+
<version>0.0.1-SNAPSHOT</version>
9+
<packaging>jar</packaging>
10+
11+
<name>kafka</name>
12+
<description>Demo project for Spring Boot</description>
13+
14+
<parent>
15+
<groupId>org.springframework.boot</groupId>
16+
<artifactId>spring-boot-starter-parent</artifactId>
17+
<version>1.5.6.RELEASE</version>
18+
<relativePath/> <!-- lookup parent from repository -->
19+
</parent>
20+
21+
<properties>
22+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
23+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
24+
<java.version>1.8</java.version>
25+
</properties>
26+
27+
<dependencies>
28+
<dependency>
29+
<groupId>org.springframework.kafka</groupId>
30+
<artifactId>spring-kafka</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>org.springframework.boot</groupId>
34+
<artifactId>spring-boot-starter-thymeleaf</artifactId>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.springframework.boot</groupId>
38+
<artifactId>spring-boot-starter-web</artifactId>
39+
</dependency>
40+
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
41+
<dependency>
42+
<groupId>com.google.code.gson</groupId>
43+
<artifactId>gson</artifactId>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.projectlombok</groupId>
47+
<artifactId>lombok</artifactId>
48+
<optional>true</optional>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.springframework.boot</groupId>
52+
<artifactId>spring-boot-starter-test</artifactId>
53+
<scope>test</scope>
54+
</dependency>
55+
</dependencies>
56+
57+
<build>
58+
<plugins>
59+
<plugin>
60+
<groupId>org.springframework.boot</groupId>
61+
<artifactId>spring-boot-maven-plugin</artifactId>
62+
</plugin>
63+
</plugins>
64+
</build>
65+
66+
67+
</project>
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.chen;
2+
3+
import com.chen.common.util.ToolsUtil;
4+
import com.chen.common.message.PayMessage;
5+
import com.chen.producer.MessageProducer;
6+
import org.springframework.boot.SpringApplication;
7+
import org.springframework.boot.autoconfigure.SpringBootApplication;
8+
import org.springframework.context.ApplicationContext;
9+
10+
@SpringBootApplication
11+
public class KafkaApplication {
12+
13+
public static void main(String[] args) {
14+
ApplicationContext applicationContext = SpringApplication.run(KafkaApplication.class, args);
15+
MessageProducer producer = applicationContext.getBean(MessageProducer.class);
16+
while (true){
17+
PayMessage message = new PayMessage();
18+
message.setFee(ToolsUtil.getFee());
19+
message.setOrderCode(ToolsUtil.getNextCode());
20+
message.setSendTime(System.currentTimeMillis());
21+
producer.send(message);
22+
try {
23+
Thread.sleep(2000);
24+
} catch (InterruptedException e) {
25+
e.printStackTrace();
26+
}
27+
}
28+
}
29+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.chen.common.constant;
2+
3+
/**
4+
* 类的功能描述:
5+
* 定义topic名称
6+
*
7+
* @ClassName: TopicConst
8+
* @Author hcxin
9+
* @Date 2017-09-13 03:49:22
10+
*/
11+
public class TopicConst {
12+
public static final String PAY_TOPIC = "payTopic";
13+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.chen.common.message;
2+
3+
import lombok.Getter;
4+
import lombok.Setter;
5+
import lombok.ToString;
6+
7+
import java.util.Date;
8+
9+
@Getter
10+
@Setter
11+
@ToString
12+
public class PayMessage {
13+
private String orderCode;
14+
private Float fee;
15+
private Long sendTime;
16+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.chen.common.util;
2+
3+
import java.util.Random;
4+
import java.util.UUID;
5+
6+
/**
7+
* 类的功能描述:
8+
* 工具类
9+
*
10+
* @ClassName: ToolsUtil
11+
* @Author hcxin
12+
* @Date 2017-09-13 03:49:22
13+
*/
14+
public class ToolsUtil {
15+
16+
public synchronized static String getNextCode() {
17+
return UUID.randomUUID().toString();
18+
}
19+
20+
21+
public synchronized static Float getFee() {
22+
Random rand = new Random();
23+
float fee = rand.nextFloat();
24+
25+
return fee;
26+
}
27+
28+
29+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.chen.consumer;
2+
3+
import com.chen.common.constant.TopicConst;
4+
import com.chen.common.message.PayMessage;
5+
import com.google.gson.Gson;
6+
import com.google.gson.GsonBuilder;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.kafka.annotation.KafkaListener;
10+
import org.springframework.stereotype.Component;
11+
import org.springframework.stereotype.Service;
12+
13+
/**
14+
* 类的功能描述:
15+
* 消息消费者者用于处理消息
16+
*
17+
* @ClassName: MessageConsumer
18+
* @Author haichen
19+
* @Date 2017-09-13 03:45:55
20+
*/
21+
@Service
22+
public class MessageConsumer {
23+
private static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
24+
private Gson gson = new GsonBuilder().create();
25+
26+
@KafkaListener(topics = TopicConst.PAY_TOPIC)
27+
public void onMessage(String payMessage) {
28+
PayMessage msg = gson.fromJson(payMessage, PayMessage.class);
29+
logger.info("MessageConsumer: onMessage: message is: [" + msg + "]");
30+
}
31+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.chen.producer;
2+
3+
import com.chen.common.constant.TopicConst;
4+
import com.chen.common.message.PayMessage;
5+
import com.google.gson.Gson;
6+
import com.google.gson.GsonBuilder;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.kafka.core.KafkaTemplate;
11+
import org.springframework.stereotype.Component;
12+
13+
/**
14+
* 类的功能描述:
15+
* 消息生产者用于发送消息
16+
*
17+
* @ClassName: MessageProducer
18+
* @Author hcxin
19+
* @Date 2017-09-13 03:45:02
20+
*/
21+
@Component
22+
public class MessageProducer {
23+
private static Logger logger = LoggerFactory.getLogger(MessageProducer.class);
24+
@Autowired
25+
private KafkaTemplate kafkaTemplate;
26+
private Gson gson = new GsonBuilder().create();
27+
28+
public void send(PayMessage payMessage) {
29+
String msg = gson.toJson(payMessage);
30+
kafkaTemplate.send(TopicConst.PAY_TOPIC, msg);
31+
logger.info("MessageProducer: send: message is: [" + msg + "]");
32+
33+
}
34+
35+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
server:
2+
port: 8999
3+
contextPath : /kafka
4+
spring:
5+
application:
6+
name: kafka
7+
kafka:
8+
bootstrapServers: 192.168.0.195:9092
9+
consumer:
10+
groupId: myGroup
11+
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
12+
valueDserializer: org.apache.kafka.common.serialization.StringDeserializer
13+
producer:
14+
groupId: myGroup
15+
keyDeserializer: org.apache.kafka.common.serialization.StringSerializer
16+
valueDserializer: org.apache.kafka.common.serialization.StringSerializer

0 commit comments

Comments
 (0)