RMQ是什么
- RabbitMQ是一个基于AMQP(高级消息队列协议)实现的消息中间件,在两个应用或者服务中的消息通信(数据交换)起着重要的作用。

交换机和队列就相当于组成了一个RMQ - RMQ是异步并跨平台的。
- 消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。队列先进先出
RMQ应用场景
1、订单支付:我们只要关注订单支付成功与否的状态。剩下的库存减一,积分加1,通知商家有新订单,增加物流信息等等这一系列操作都可以交给RMQ中间件来通知其它的接口。
2、订单超时取消订单:用户下单时会锁定库存,也有可能使用了优惠券,积分等提交订单,但是用户在一定时间内没有进行支付操作,订单就会失效。我们可以在订单失效时发消息给RMQ进行取消订单操作,并释放库存,返还优惠券和积分,通知商家,取消显示物流等一系列操作。
RMQ优点
- 可靠性:消息生产和消息都有消息确认机制。保证消息的安全可靠性。
- 集群:可搭建多个RMQ服务器来实现集群。
- 高可用:在同一集群,可将消息数据复制到多个服务器中,当某一服务器宕机后消息还可用。
- 追踪和性能监控
RMQ一些术语
生产者: 消息产生方
消费者:消息消费方
交换机:把消息推送到指定队列
队列:消息的载体
路由键:交换机根据键值对消息进行投递到指定队列
虚拟主机:可以有多个
消息通道:可以有多个
什么是死信
- 消息被拒收(basic.reject / basic.nack),并且requeue = false
- 消息ttl到期
- 队列容不下的消息
fanout、direct、topic
RMQ的几种交换机模式
- 简单模式
- 工作模式
- 一个生产者、一个队列和多个消费者竞争消费消息,消费者之前消费到的消息不平等,有多有少
- fanout(发布订阅模式)、
- 一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,广播消息给
所有绑定
交换机的队列,即所有消费者接收并消费消息
- direct(路由模式)、
- 生产者发送消息到交换机,交换机通过
路由键
转发到不同队列,队列绑定的消费者接收并消费消息
- topic(通配符模式)
如何保持消息顺序性
- 消息体通过hash处理后(
hash具有不可逆特性
)分派到多个队列里,每个队列对应一个消费者,同一组的顺序消息分配到同一个队列里
RMQ集群
- 单机集群
- 只在一个服务器中存在,Demo级别应用,生产环境一般不用这种模式,性能不好,数据也不安全。
- 普通集群
- 在多个服务器上启动多个 RMQ 实例,实例中同步包含有相关元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例),消费时如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。让集群中多个节点来服务某个 queue 的读写操作来提高吞吐量。
- 镜像集群
- 区别于普通集群的是。镜像集群的实例不仅包含元数据(一些配置),还包含消息本身。一旦某个服务器宕机,会从其它有备份消息的服务器中获取消息来消费。
SpringBoot整合RMQ
安装
- 1、因为RMQ是Erlang语言编写的。所以要先Erlang下载和安装Erlang
- 2、RMQ下载并安装RMQ
- 3、在安装目录下的sbin目录下运行以下命令行
rabbitmq-plugins enable rabbitmq_management
- 4、测试运行:http://localhost:15672
- 5、登录默认账号和密码guest、guest
- 6、创建帐号并设置其角色为管理员:mall mall
- 7、创建/mall虚拟主机
- 8、配置mall用户拥有/mall主机的权限
Ps;如有不明白,请看原教程
整合
1、添加AMQG依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
2、spring节点下增加配置
rabbitmq: host: localhost port: 5672 virtual-host: /mall username: mall password: mall publisher-confirms: true
|
3、订单dto类
public class OrderParam { private Long memberReceiveAddressId; private Long couponId; private Integer useIntegration; private Integer payType; }
|
4、定义一个包含交换机名称、队列名称、路由键名称的枚举类
mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。
mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。
@Getter public enum QueueEnum {
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");
private String exchange;
private String name;
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
|
5、添加用于配置交换机、队列及队列与交换机的绑定关系配置类RabbitMqConfig
- rabbitmq延迟配置实现原理:队列中的消息设置定时,到期后消息会被放到信队列中去,用户再去消费死信的同时。设置转发规则
orderTtlQueue
,通过x-dead-letter-exchange,和x-dead-letter-routing-key
这两个withArgument参数来转发到相应的交换机和队列中。 - 这里是mall.order.cancel.ttl过期后的消息交给mall.order.cancel处理
@Bean
注解交给Spring容器管理,项目一启动就扫描。
@Configuration public class RabbitMqConfig {
@Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); }
@Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); }
@Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); }
@Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()) .build(); }
@Bean Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); }
@Bean Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); }
}
|
6、添加向订单延迟消息队列(mall.order.cancel.ttl)发消息类
- 注入amqp依赖包下的AmqpTemplate,指定一组基本的AMQP操作,提供同步的发送和接收方法。源码中的
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
方法将java对象转换成Amqp的消息,然后使用特定的路由键将消息发送到特定的交换机上。MessagePostProcessor
可以更改消息的属性。这里传入一个延迟时间,到期后取消订单。
@Component public class CancelOrderSender { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate;
public void sendMessage(Long orderId,final long delayTimes){ amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); LOGGER.info("send delay message orderId:{}",orderId); } }
|
7、添加取消订单消息的接收者
@RabbitListener
单独使用时,注解在方法上,表示用于监听mall.order.cancel队列,进而消费消息,上面配置文件中已经有定义好消息到期后会从死信的队列转发到mall.order.cancel这里来,所以我们能监听到,然后再调用取消单个超时订单cancelOrder
方法。@RabbitListener
搭配@RabbitHandler
使用时,前者注解在类上,然后调用后者的方法消费消息。
@Component @RabbitListener(queues = "mall.order.cancel") public class CancelOrderReceiver { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); @Autowired private OmsPortalOrderService portalOrderService; @RabbitHandler public void handle(Long orderId){ LOGGER.info("receive delay message orderId:{}",orderId); portalOrderService.cancelOrder(orderId); } }
|
8、添加前台订单管理Service层接口
public interface OmsPortalOrderService {
@Transactional CommonResult generateOrder(OrderParam orderParam);
@Transactional void cancelOrder(Long orderId); }
|
9、Service层实现类
@Service public class OmsPortalOrderServiceImpl implements OmsPortalOrderService { private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class); @Autowired private CancelOrderSender cancelOrderSender;
@Override public CommonResult generateOrder(OrderParam orderParam) { LOGGER.info("process generateOrder"); sendDelayMessageCancelOrder(11L); return CommonResult.success(null, "下单成功"); }
@Override public void cancelOrder(Long orderId) { LOGGER.info("process cancelOrder orderId:{}",orderId); }
private void sendDelayMessageCancelOrder(Long orderId) { long delayTimes = 30 * 1000; cancelOrderSender.sendMessage(orderId, delayTimes); }
}
|
10、Controller层
@Controller @Api(tags = "OmsPortalOrderController", description = "订单管理") @RequestMapping("/order") public class OmsPortalOrderController { @Autowired private OmsPortalOrderService portalOrderService;
@ApiOperation("根据购物车信息生成订单") @RequestMapping(value = "/generateOrder", method = RequestMethod.POST) @ResponseBody public Object generateOrder(@RequestBody OrderParam orderParam) { return portalOrderService.generateOrder(orderParam); } }
|
11、测试
- 11.1、执行
generateOrder
方法后,会模拟下单操作,完成下单后会发送一条延迟消息,如下图,
- 延迟消息发送成功后。到期时间后过期,变成死信,然后又根据配置中设置的转发规则,转发到mall.order.cancel中去执行取消订单,形成闭环。
、如看不懂,
消息丢失的几种情况
1、生产者丢失消息
- 在发消息前开启事务(开启事务后性能变慢不推荐),利用事务的ACID四个特性来保证消息安全
- 发送方确认模式机制,当消息被发送到队列后,RMQ 就会发送ACK标识给生产者,如果RMQ处理该消息出现问题,则发送一个 Nack标识给生产者进行重发。
2、队列丢失消息 - 解决办法:消息到达队列后,开启消息持久化(也会影响性能,一般不建议),当RMQ宕机重启后可恢复消息
3、消费者丢失消息
消费者丢失消息一般是因为采用了消息自动确认机制,改为手动确认消息即可。
消息幂等性
- 消息的重复消息,比如重复支付,会引起不必要的麻烦,所以为了保持系统的安全,消息要遵循幂等性原则。实现幂等性有如下方法
- 1.消息生成时,RMQ会生成个 inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免消息重复入队
- 2.消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重的依据,避免同一条消息被重复消费。
更多方法请参考
RMQ面试题合集