RMQ是什么

  • RabbitMQ是一个基于AMQP(高级消息队列协议)实现的消息中间件,在两个应用或者服务中的消息通信(数据交换)起着重要的作用。
    图1
    交换机和队列就相当于组成了一个RMQ
  • RMQ是异步并跨平台的。
  • 消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。队列先进先出

RMQ应用场景

1、订单支付:我们只要关注订单支付成功与否的状态。剩下的库存减一,积分加1,通知商家有新订单,增加物流信息等等这一系列操作都可以交给RMQ中间件来通知其它的接口。
2、订单超时取消订单:用户下单时会锁定库存,也有可能使用了优惠券,积分等提交订单,但是用户在一定时间内没有进行支付操作,订单就会失效。我们可以在订单失效时发消息给RMQ进行取消订单操作,并释放库存,返还优惠券和积分,通知商家,取消显示物流等一系列操作。

RMQ优点

  • 可靠性:消息生产和消息都有消息确认机制。保证消息的安全可靠性。
  • 集群:可搭建多个RMQ服务器来实现集群。
  • 高可用:在同一集群,可将消息数据复制到多个服务器中,当某一服务器宕机后消息还可用。
  • 追踪和性能监控

RMQ一些术语

生产者: 消息产生方
消费者:消息消费方
交换机:把消息推送到指定队列
队列:消息的载体
路由键:交换机根据键值对消息进行投递到指定队列
虚拟主机:可以有多个
消息通道:可以有多个

什么是死信

  • 消息被拒收(basic.reject / basic.nack),并且requeue = false
  • 消息ttl到期
  • 队列容不下的消息

fanout、direct、topic

RMQ的几种交换机模式

  • 简单模式
    • 生产者、消费者、队列各一个
  • 工作模式
    • 一个生产者、一个队列和多个消费者竞争消费消息,消费者之前消费到的消息不平等,有多有少
  • fanout(发布订阅模式)、
    • 一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,广播消息给所有绑定交换机的队列,即所有消费者接收并消费消息
  • direct(路由模式)、
    • 生产者发送消息到交换机,交换机通过路由键转发到不同队列,队列绑定的消费者接收并消费消息
  • topic(通配符模式)
    • 根据路由键的通配符规则匹配对应的交换机和队列

如何保持消息顺序性

  • 消息体通过hash处理后(hash具有不可逆特性)分派到多个队列里,每个队列对应一个消费者,同一组的顺序消息分配到同一个队列里

RMQ集群

  • 单机集群
    • 只在一个服务器中存在,Demo级别应用,生产环境一般不用这种模式,性能不好,数据也不安全。
  • 普通集群
    • 在多个服务器上启动多个 RMQ 实例,实例中同步包含有相关元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例),消费时如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。让集群中多个节点来服务某个 queue 的读写操作来提高吞吐量。
  • 镜像集群
    • 区别于普通集群的是。镜像集群的实例不仅包含元数据(一些配置),还包含消息本身。一旦某个服务器宕机,会从其它有备份消息的服务器中获取消息来消费。

SpringBoot整合RMQ

安装

  • 1、因为RMQ是Erlang语言编写的。所以要先Erlang下载和安装Erlang
  • 2、RMQ下载并安装RMQ
  • 3、在安装目录下的sbin目录下运行以下命令行

rabbitmq-plugins enable rabbitmq_management

  • 4、测试运行:http://localhost:15672
  • 5、登录默认账号和密码guest、guest
  • 6、创建帐号并设置其角色为管理员:mall mall
  • 7、创建/mall虚拟主机
  • 8、配置mall用户拥有/mall主机的权限
    Ps;如有不明白,请看原教程

整合

  • 根据应用场景2,来整合RMQ

1、添加AMQG依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、spring节点下增加配置

rabbitmq:
host: localhost # ip
port: 5672 # 端口
virtual-host: /mall # RMQ的虚拟host
username: mall # 用户名
password: mall # 密码
publisher-confirms: true #确保消息成功发送到交换器

3、订单dto类

public class OrderParam {
//收货地址id
private Long memberReceiveAddressId;
//优惠券id
private Long couponId;
//使用的积分数
private Integer useIntegration;
//支付方式
private Integer payType;
//省略get/set
}

4、定义一个包含交换机名称、队列名称、路由键名称的枚举类

mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。


mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。

@Getter public enum QueueEnum {
/**
* 消息通知队列
*/
QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),
/**
* 消息通知ttl队列
*/
QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;

QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}

5、添加用于配置交换机、队列及队列与交换机的绑定关系配置类RabbitMqConfig

  • rabbitmq延迟配置实现原理:队列中的消息设置定时,到期后消息会被放到信队列中去,用户再去消费死信的同时。设置转发规则orderTtlQueue,通过x-dead-letter-exchange,和x-dead-letter-routing-key这两个withArgument参数来转发到相应的交换机和队列中。
  • 这里是mall.order.cancel.ttl过期后的消息交给mall.order.cancel处理
  • @Bean注解交给Spring容器管理,项目一启动就扫描。
@Configuration public class RabbitMqConfig {

/**
* 订单消息实际消费队列所绑定的交换机
*/
@Bean
DirectExchange orderDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}

/**
* 订单延迟队列所绑定的交换机
*/
@Bean
DirectExchange orderTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
.durable(true)
.build();
}

/**
* 订单实际消费队列
*/
@Bean
public Queue orderQueue() {
return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());
}

/**
* 订单延迟队列(死信队列)
*/
@Bean
public Queue orderTtlQueue() {
return QueueBuilder
.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
.withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机
.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键
.build();
}

/**
* 将订单队列绑定到交换机
*/
@Bean
Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){
return BindingBuilder
.bind(orderQueue)
.to(orderDirect)
.with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
}

/**
* 将订单延迟队列绑定到交换机
*/
@Bean
Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){
return BindingBuilder
.bind(orderTtlQueue)
.to(orderTtlDirect)
.with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
}

}

6、添加向订单延迟消息队列(mall.order.cancel.ttl)发消息类

  • 注入amqp依赖包下的AmqpTemplate,指定一组基本的AMQP操作,提供同步的发送和接收方法。源码中的void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)方法将java对象转换成Amqp的消息,然后使用特定的路由键将消息发送到特定的交换机上。MessagePostProcessor可以更改消息的属性。这里传入一个延迟时间,到期后取消订单。
@Component public class CancelOrderSender {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
@Autowired
private AmqpTemplate amqpTemplate;

public void sendMessage(Long orderId,final long delayTimes){
//给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
}
});
LOGGER.info("send delay message orderId:{}",orderId);
}
}

7、添加取消订单消息的接收者

  • @RabbitListener单独使用时,注解在方法上,表示用于监听mall.order.cancel队列,进而消费消息,上面配置文件中已经有定义好消息到期后会从死信的队列转发到mall.order.cancel这里来,所以我们能监听到,然后再调用取消单个超时订单cancelOrder方法。
  • @RabbitListener搭配@RabbitHandler使用时,前者注解在类上,然后调用后者的方法消费消息。
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {
private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
@Autowired
private OmsPortalOrderService portalOrderService;
@RabbitHandler
public void handle(Long orderId){
LOGGER.info("receive delay message orderId:{}",orderId);
portalOrderService.cancelOrder(orderId);
}
}

8、添加前台订单管理Service层接口

public interface OmsPortalOrderService {

/**
* 根据提交信息生成订单
*/
@Transactional
CommonResult generateOrder(OrderParam orderParam);

/**
* 取消单个超时订单
*/
@Transactional
void cancelOrder(Long orderId);
}

9、Service层实现类

@Service public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
@Autowired
private CancelOrderSender cancelOrderSender;

@Override
public CommonResult generateOrder(OrderParam orderParam) {
//todo 下单操作,生成订单号。。这里省略
LOGGER.info("process generateOrder");
//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
sendDelayMessageCancelOrder(11L); //参数为订单ID,这里测试直接写死
return CommonResult.success(null, "下单成功");
}

@Override
public void cancelOrder(Long orderId) {
//todo 这里开始处理超时订单,可加失效标识
LOGGER.info("process cancelOrder orderId:{}",orderId);
}

private void sendDelayMessageCancelOrder(Long orderId) {
//获取订单超时时间,假设为60分钟(测试用的30秒)
long delayTimes = 30 * 1000;
//发送延迟消息
cancelOrderSender.sendMessage(orderId, delayTimes);
}

}

10、Controller层

@Controller @Api(tags = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {
@Autowired
private OmsPortalOrderService portalOrderService;

@ApiOperation("根据购物车信息生成订单")
@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)
@ResponseBody
public Object generateOrder(@RequestBody OrderParam orderParam) {
return portalOrderService.generateOrder(orderParam);
}
}

11、测试

  • 11.1、执行generateOrder方法后,会模拟下单操作,完成下单后会发送一条延迟消息,如下图,ciFvMq.png
  • 延迟消息发送成功后。到期时间后过期,变成死信,然后又根据配置中设置的转发规则,转发到mall.order.cancel中去执行取消订单,形成闭环。

、如看不懂,

消息丢失的几种情况

1、生产者丢失消息

  • 在发消息前开启事务(开启事务后性能变慢不推荐),利用事务的ACID四个特性来保证消息安全
  • 发送方确认模式机制,当消息被发送到队列后,RMQ 就会发送ACK标识给生产者,如果RMQ处理该消息出现问题,则发送一个 Nack标识给生产者进行重发。
    2、队列丢失消息
  • 解决办法:消息到达队列后,开启消息持久化(也会影响性能,一般不建议),当RMQ宕机重启后可恢复消息
    3、消费者丢失消息
    消费者丢失消息一般是因为采用了消息自动确认机制,改为手动确认消息即可。

消息幂等性

  • 消息的重复消息,比如重复支付,会引起不必要的麻烦,所以为了保持系统的安全,消息要遵循幂等性原则。实现幂等性有如下方法
    • 1.消息生成时,RMQ会生成个 inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免消息重复入队
    • 2.消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付 ID、订单 ID、帖子 ID 等)作为去重的依据,避免同一条消息被重复消费。
      更多方法请参考

RMQ面试题合集