Skip to content

Commit 2f2a988

Browse files
committed
transaction msg
1 parent 0ce7ed2 commit 2f2a988

File tree

6 files changed

+70
-22
lines changed

6 files changed

+70
-22
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.lazycece.sbac.rocketmq.run;
2+
3+
import com.lazycece.sbac.rocketmq.simple.RocketmqProducer;
4+
import com.lazycece.sbac.rocketmq.transaction.TransactionProducer;
5+
import org.springframework.boot.CommandLineRunner;
6+
import org.springframework.stereotype.Component;
7+
8+
import javax.annotation.Resource;
9+
10+
11+
/**
12+
* @author lazycece
13+
* @date 2019/8/21
14+
*/
15+
@Component
16+
public class Runner implements CommandLineRunner {
17+
18+
@Resource
19+
private RocketmqProducer rocketmqProducer;
20+
@Resource
21+
private TransactionProducer transactionProducer;
22+
23+
@Override
24+
public void run(String... args) throws Exception {
25+
// rocketmqProducer.sync();
26+
transactionProducer.produce();
27+
28+
}
29+
}

springboot-ac-rocketmq/src/main/java/com/lazycece/sbac/rocketmq/consumer/RocketmqConsumer.java renamed to springboot-ac-rocketmq/src/main/java/com/lazycece/sbac/rocketmq/simple/RocketmqConsumer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1-
package com.lazycece.sbac.rocketmq.consumer;
1+
package com.lazycece.sbac.rocketmq.simple;
22

33
import com.lazycece.sbac.rocketmq.message.Message;
44
import lombok.extern.slf4j.Slf4j;
55
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
66
import org.apache.rocketmq.spring.core.RocketMQListener;
7-
import org.springframework.stereotype.Service;
7+
import org.springframework.stereotype.Component;
88

99
/**
1010
* @author lazycece
1111
* @date 2019/6/6
1212
*/
1313
@Slf4j
14-
@Service
14+
@Component
1515
public class RocketmqConsumer {
1616

17-
@Service
17+
@Component
1818
@RocketMQMessageListener(topic = "topic-queue-one", consumerGroup = "consumer_topic-queue-one")
1919
public class ConsumerOne implements RocketMQListener<Message> {
2020
@Override
@@ -23,7 +23,7 @@ public void onMessage(Message message) {
2323
}
2424
}
2525

26-
@Service
26+
@Component
2727
@RocketMQMessageListener(topic = "topic-queue-two", consumerGroup = "consumer_topic-queue-two")
2828
public class ConsumerTwo implements RocketMQListener<String> {
2929
@Override

springboot-ac-rocketmq/src/main/java/com/lazycece/sbac/rocketmq/producer/RocketmqProducer.java renamed to springboot-ac-rocketmq/src/main/java/com/lazycece/sbac/rocketmq/simple/RocketmqProducer.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,43 @@
1-
package com.lazycece.sbac.rocketmq.producer;
1+
package com.lazycece.sbac.rocketmq.simple;
22

33
import com.lazycece.sbac.rocketmq.message.Message;
44
import org.apache.rocketmq.spring.core.RocketMQTemplate;
5-
import org.springframework.beans.factory.annotation.Autowired;
6-
import org.springframework.boot.CommandLineRunner;
75
import org.springframework.stereotype.Component;
86

7+
import javax.annotation.Resource;
98
import java.util.UUID;
109

10+
1111
/**
1212
* @author lazycece
1313
* @date 2019/6/6
1414
*/
1515
@Component
16-
public class RocketmqProducer implements CommandLineRunner {
16+
public class RocketmqProducer {
1717

18+
@Resource
1819
private RocketMQTemplate rocketMQTemplate;
1920

20-
@Autowired
21-
public RocketmqProducer(RocketMQTemplate rocketMQTemplate) {
22-
this.rocketMQTemplate = rocketMQTemplate;
23-
}
24-
25-
@Override
26-
public void run(String... args) throws Exception {
21+
/**
22+
* 同步发送
23+
*/
24+
public void sync() {
2725
Message<String> message = new Message<>();
2826
message.setId(UUID.randomUUID().toString());
2927
message.setContent("Hello, springboot-ac-rocketmq !");
3028
rocketMQTemplate.convertAndSend("topic-queue-one", message);
3129
rocketMQTemplate.convertAndSend("topic-queue-two", "Hello, springboot-ac-rocketmq !");
3230
}
31+
32+
/**
33+
* 异步发送
34+
*/
35+
public void async() {
36+
Message<String> message = new Message<>();
37+
message.setId(UUID.randomUUID().toString());
38+
message.setContent("send async message!");
39+
// rocketMQTemplate.
40+
}
41+
42+
3343
}

springboot-ac-rocketmq/src/main/java/com/lazycece/sbac/rocketmq/transaction/TransactionListenerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
77
import org.springframework.messaging.Message;
88

9-
109
/**
1110
* @author lazycece
1211
* @date 2019/8/20
@@ -18,6 +17,7 @@ public class TransactionListenerImpl implements RocketMQLocalTransactionListener
1817
@Override
1918
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
2019

20+
// 模拟本地事务不通过
2121
log.info("============== executeLocalTransaction");
2222

2323
return RocketMQLocalTransactionState.UNKNOWN;
@@ -26,8 +26,11 @@ public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object
2626
@Override
2727
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
2828

29+
// 模拟回查本地事务
2930
log.info("============== checkLocalTransaction");
3031

32+
// todo 收不到回查消息
33+
3134
return RocketMQLocalTransactionState.COMMIT;
3235
}
3336
}

springboot-ac-rocketmq/src/main/java/com/lazycece/sbac/rocketmq/transaction/TransactionProducer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package com.lazycece.sbac.rocketmq.transaction;
22

33
import com.lazycece.sbac.rocketmq.message.Message;
4+
import lombok.extern.slf4j.Slf4j;
45
import org.apache.rocketmq.spring.core.RocketMQTemplate;
6+
import org.springframework.messaging.support.MessageBuilder;
7+
import org.springframework.stereotype.Component;
58

69
import javax.annotation.Resource;
710
import java.util.UUID;
@@ -10,16 +13,20 @@
1013
* @author lazycece
1114
* @date 2019/8/20
1215
*/
16+
@Component
17+
@Slf4j
1318
public class TransactionProducer {
1419

1520
@Resource
1621
private RocketMQTemplate rocketMQTemplate;
1722

18-
public void produce(){
23+
public void produce() {
1924
Message<String> message = new Message<>();
2025
message.setId(UUID.randomUUID().toString());
2126
message.setContent("transaction message");
22-
// rocketMQTemplate.sendMessageInTransaction("tx-group","topic-tx",message,null);
27+
log.info("========sending message=========");
28+
rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null);
29+
log.info("========finish send =========");
2330
}
2431

2532
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
rocketmq:
2-
# name-server: 127.0.0.1:9876
3-
name-server: 10.40.7.18:9876
2+
name-server: 127.0.0.1:9876
43
producer:
5-
group: rocketmq-group
4+
group: rocketmq-group

0 commit comments

Comments
 (0)