温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spring Boot + RabbitMQ如何实现分布式事务

发布时间:2021-12-24 09:43:23 来源:亿速云 阅读:792 作者:小新 栏目:安全技术

小编给大家分享一下Spring Boot + RabbitMQ如何实现分布式事务,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

一:分布式事务解决方案

1.两阶段提交(2PC)

第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.

第二阶段:事务协调器要求每个数据库提交数据。

Spring Boot + RabbitMQ如何实现分布式事务

案例可参照http://blog.itpub.net/28624388/viewspace-2137095/

2.补偿事务(TCC)

TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:

Try 阶段主要是对业务系统做检测及资源预留

Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。

Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

3.本地消息表(异步确保)

本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理。

Spring Boot + RabbitMQ如何实现分布式事务

基本思路:

a.消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

b.消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

c.生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

二:Spring Boot + RabbitMQ分布式事务实现

1.pom.xml依赖配置

<dependency>	<groupId>org.springframework.boot</groupId>	<artifactId>spring-boot-starter-amqp</artifactId>	</dependency>

 2.application.yaml  rabbitmq配置

# RabbitMQ           rabbitmq:     host: 112.74.105.178     port: 5672     username: admin     password: admin     virtual-host: /     publisher-confirms: true     publisher-returns: true     listener:       simple:         acknowledge-mode: manual

3.RabbitMQConfig.java

@Configuration public class RabbitMQConfig { // 下单并且派单存队列	public static final String ORDER_DIC_QUEUE = "order_dis_queue";	// 补单队列,判断订单是否已经被创建	public static final String ORDER_CREATE_QUEUE = "order_create_queue";	// 下单并且派单交换机	private static final String ORDER_EXCHANGE_NAME = "order_exchange_name";	@Bean	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {	RabbitTemplate template = new RabbitTemplate(connectionFactory);	template.setMessageConverter(new Jackson2JsonMessageConverter());	return template;	}	@Bean	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {	SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();	factory.setConnectionFactory(connectionFactory);	factory.setMessageConverter(new Jackson2JsonMessageConverter());	return factory;	}	@Bean	public Queue OrderDicQueue() {	return new Queue(ORDER_DIC_QUEUE);	}	@Bean	public Queue OrderCreateQueue() {	return new Queue(ORDER_CREATE_QUEUE);	}	@Bean	DirectExchange directOrderExchange() {	return new DirectExchange(ORDER_EXCHANGE_NAME);	}	@Bean	Binding bindingExchangeOrderDicQueue() {	return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey");	}	@Bean	Binding bindingExchangeOrderCreateQueue() {	return BindingBuilder.bind(OrderCreateQueue()).to(directOrderExchange()).with("orderRoutingKey");	} }

4. 消息生产者

public class MsgPushInfoServiceImpl extends ServiceImpl<MsgPushInfoMapper, MsgPushInfoEntity>	implements MsgPushInfoService, RabbitTemplate.ConfirmCallback {	@Autowired	private RabbitTemplate rabbitTemplate;	public void orderAndDsipatch() {	try {	String orderId = "123456";	JSONObject jsonObect = new JSONObject();	jsonObect.put("orderId", orderId);	String msg = jsonObect.toString();	System.out.println("msg:" + msg);	MessageProperties messageProperties = new MessageProperties();	        messageProperties.setContentType("application/json");	        messageProperties.setMessageId(orderId);	        Message message = new Message(msg.getBytes(),messageProperties);	        	CorrelationData correlationData = new CorrelationData(orderId);	rabbitTemplate.setMandatory(true);	rabbitTemplate.setConfirmCallback(this);	rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData);	} catch (Exception e) {	e.printStackTrace();	}	}	@Override	public void confirm(CorrelationData correlationData, boolean ack, String cause) {	String orderId = correlationData.getId();	System.out.println("消息id:" + orderId);	if (ack) { // 消息发送成功	System.out.println("消息发送确认成功");	} else {	// 重试机制	System.out.println("消息发送确认失败:" + cause);	}	} }

5.消息消费者

@Component public class DispatchReceiver {	@RabbitHandler	@RabbitListener(queues = "order_dis_queue", containerFactory = "rabbitListenerContainerFactory")	public void process(Message message, Channel channel) {	System.out.println("rev : " + message.getMessageProperties().getMessageId());	try {	System.out.println("======basicNack====="+message.getMessageProperties().getDeliveryTag());	//业务处理成功,则删除消息	channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);	//业务处理失败,则发送补偿消息	} catch (Exception e) {	e.printStackTrace();	}	} }

看完了这篇文章,相信你对“Spring Boot + RabbitMQ如何实现分布式事务”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI