RabbitMQ——死信队列和延迟队列

03-18 1296阅读 0评论

文章目录

  • RabbitMQ——死信队列和延迟队列
    • 1、死信队列
    • 2、基于插件的延迟队列
      • 2.1、安装延迟队列插件
      • 2.2、代码实例

        RabbitMQ——死信队列和延迟队列

        1、死信队列

        死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

        RabbitMQ——死信队列和延迟队列,RabbitMQ——死信队列和延迟队列,词库加载错误:未能找到文件“C:\Users\Administrator\Desktop\火车头9.8破解版\Configuration\Dict_Stopwords.txt”。,使用,安装,下载,第1张
        (图片来源网络,侵删)

        死信的来源

        在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

        • 消息被拒绝(basic.reject 或 basic.nack)且不重新入队(requeue 参数为 false)。
        • 消息过期(TTL,Time-To-Live)。
        • 队列长度超过限制,无法再添加数据到mq中。

          生产者

          package com.weipch.rabbitmq.dlq;
          import com.rabbitmq.client.AMQP;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.GetResponse;
          import com.weipch.util.RabbitMqUtils;
          /**
           * @Author 方唐镜
           * @Create 2024-03-03 14:08
           * @Description
           */
          public class Produce {
          	private static final String NORMAL_EXCHANGE = "normal_exchange";
          	public static void main(String[] args) throws Exception {
          		Channel channel = RabbitMqUtils.getChannel();
                  //模拟消息过期 10s
          		//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
          		for (int i = 0; i  
           
           

          消费者

          正常队列:

          package com.weipch.rabbitmq.dlq;
          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.DeliverCallback;
          import com.weipch.util.RabbitMqUtils;
          import java.nio.charset.StandardCharsets;
          import java.util.HashMap;
          import java.util.Map;
          /**
           * @Author 方唐镜
           * @Create 2024-03-03 13:50
           * @Description
           */
          public class Consumer01 {
          	private static final String NORMAL_EXCHANGE = "normal_exchange";
          	private static final String DEAD_EXCHANGE = "dead_exchange";
          	private static final String NORMAL_QUEUE = "normal_queue";
          	private static final String DEAD_QUEUE = "dead_queue";
          	public static void main(String[] args) throws Exception {
          		Channel channel = RabbitMqUtils.getChannel();
          		//声明死信交换机和队列
          		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
          		channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
          		//绑定
          		channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");
          		//声明普通交换机和队列
          		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
          		//死信配制 指定死信交换机和死信路由键
          		Map map = new HashMap();
          		map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
          		map.put("x-dead-letter-routing-key", "dead-routing-key");
          		//最大长度
          		//map.put("x-max-length", 6);
          		channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);
          		channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");
          		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
          			String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
          			if (message.contains("5")){
          				System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");
          				//拒绝消息并把消息丢入死信队列
          				channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
          			}else {
          				System.out.println("Consumer01接收消息:" + message);
          				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
          			}
          		};
          		channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});
          	}
          }
          

          死信队列:

          RabbitMQ——死信队列和延迟队列,RabbitMQ——死信队列和延迟队列,词库加载错误:未能找到文件“C:\Users\Administrator\Desktop\火车头9.8破解版\Configuration\Dict_Stopwords.txt”。,使用,安装,下载,第2张
          (图片来源网络,侵删)
          package com.weipch.rabbitmq.dlq;
          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.Channel;
          import com.weipch.util.RabbitMqUtils;
          import java.nio.charset.StandardCharsets;
          import java.util.HashMap;
          import java.util.Map;
          /**
           * @Author 方唐镜
           * @Create 2024-03-03 13:50
           * @Description
           */
          public class Consumer02 {
          	
          	private static final String DEAD_QUEUE = "dead_queue";
          	public static void main(String[] args) throws Exception {
          		Channel channel = RabbitMqUtils.getChannel();
          		channel.basicConsume(DEAD_QUEUE, true,
          			(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),
          			(consumerTag, e) -> {});
          	}
          }
          

          生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

          2、基于插件的延迟队列

          延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。

          使用场景

          1. 定时任务执行: 在需要定时执行任务的应用中,可以使用延迟队列来实现。将任务消息发送到延迟队列,设置消息的过期时间为任务执行的时间,当消息过期时,消费者即可执行相应的任务。
          2. 消息重试机制: 当某个操作失败时,可以将操作消息发送到延迟队列,并设置合适的重试时间。在消息重试的过程中,如果操作成功,消息将正常被消费;如果一直失败,可以选择在一定时间后放弃重试,将消息发送到死信队列或进行其他处理。
          3. 订单超时处理: 在电商等场景中,对于长时间未支付的订单,可以将订单消息发送到延迟队列,并设置订单的过期时间。当订单过期时,系统可以取消订单、释放库存等操作。
          4. 限流与流控: 通过使用延迟队列,可以实现消息的有序处理和限流,确保系统在高峰期不会因为瞬时大量请求而过载。
          5. 系统通知与提醒: 在需要发送系统通知或提醒的场景中,可以使用延迟队列来实现消息的定时推送。
          6. 缓解数据库压力: 对于一些需要定期清理的数据,可以使用延迟队列来触发数据清理操作,减轻数据库压力。

          2.1、安装延迟队列插件

          下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

          以docker方式安装

          1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录

          RabbitMQ——死信队列和延迟队列,RabbitMQ——死信队列和延迟队列,词库加载错误:未能找到文件“C:\Users\Administrator\Desktop\火车头9.8破解版\Configuration\Dict_Stopwords.txt”。,使用,安装,下载,第3张
          (图片来源网络,侵删)
          docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins
          

          插件版本和rabbitmq版本一致

          2、进入容器查看插件

          RabbitMQ——死信队列和延迟队列

          3、启动插件

          root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
          

          4、重启容器

          docker restart 7c8726620871
          

          5、安装成功

          RabbitMQ——死信队列和延迟队列

          2.2、代码实例

          配置类

          package springbootrabbitmq.config;
          import org.springframework.amqp.core.Binding;
          import org.springframework.amqp.core.BindingBuilder;
          import org.springframework.amqp.core.CustomExchange;
          import org.springframework.amqp.core.Queue;
          import org.springframework.beans.factory.annotation.Qualifier;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import java.util.HashMap;
          @Configuration
          public class DelayedQueueConfig {
              //    队列
              public static final String DELAYED_QUEUE_NAME = "delayed.queue";
              //    交换机
              public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
              //    routingKey
              public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
              //    声明队列
              @Bean
              public Queue delayedQueue() {
                  return new Queue(DELAYED_QUEUE_NAME);
              }
              //    声明交换机 基于插件的交换机
              @Bean
              public CustomExchange delayedExchange() {
                  HashMap arguments = new HashMap();
                  arguments.put("x-delayed-type", "direct");
                  /*
                   * 1.交换机名称
                   * 2.交换机类型
                   * 3.是否需要持久化
                   * 4.是否需要自动删除
                   * 5.其他参数
                   * */
                  return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
              }
              //    绑定
              @Bean
              public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
                  return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
              }
          }
          

          生产者

          @GetMapping("/sendDelayMsg/{message}/{delayTime}")
          public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
              log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);
              rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
                  //            发送消息的时候 延迟时长
                  msg.getMessageProperties().setDelay(delayTime);
                  return msg;
              });
          }
          

          消费者

          @Slf4j
          @Component
          public class DelayedQueueConsumer {
              //监听消息
              @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
              public void receiveDelayedQueue(Message message) {
                  String msg = new String(message.getBody());
                  log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
              }
          }
          

免责声明
本网站所收集的部分公开资料来源于AI生成和互联网,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

发表评论

快捷回复: 表情:
评论列表 (暂无评论,1296人围观)

还没有评论,来说两句吧...

目录[+]