【世界快播报】RabbitMq TTL+死信队列 延迟消息问题记录
原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。
延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
(资料图片)
利用RabbitMq
的TTL
和死信队列 来实现延时消费。
如果设置的是队列统一过期时间放到死信队列,没有什么问题。
如果是延时时间设置到每条消息上的。而不是给队列的。
实现方式为消息存活时间为动态用户页面可配置的。
先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。
结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。
原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。
它不会检测每一条消息是否过期。而是顺序检测。
如果first in
的消息过期时间很长,会导致它阻塞后进的消息。
不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。
这个时候可以使用rabbitMq的一个插件:rabbitmq_delayed_message_exchange
一段时间以来,人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送
需要根据自己的rabbitMq选择对应的版本。我rabbitMq的版本是RabbitMQ 3.11.0
,对应的插件版本就是:3.11.1
--1、cd到rabbitmq默认安装位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通过ftp工具将插件上传到此目录下--3、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重启MQ服务systemctl restart rabbitmq-server
--1、通过ftp工具将插件上传到Linux服务器的根目录下--2、拷贝到docker中rabbitmq插件目录下,rabbitmq_delayed_message_exchange-3.9.0.ez(下载包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、进入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(确保2中的操作已经将插件拷贝过来了)cd pluginsls |grep delay--5、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重启MQ服务docker restart 容器ID
web界面新建交换机选择类型出现红框标注即表示成功
@Configurationpublic class DelayRabbitmqConfig { /** * 声明延迟队列 * @return */ @Bean public Queue delayQueue(){ return new Queue(QueueConstant.DelayQueue, true,false,false); } /** * 声明延迟自定义交换机类型 * @return */ @Bean public CustomExchange delayCustomExchange(){ HashMap args = new HashMap<>();// 设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递 args.put("x-delayed-type","direct"); return new CustomExchange(ExchangeConstant.DelayCustomerExchange, "x-delayed-message",true,false,args); } /** * 绑定延迟交换机和队列 * @return */ @Bean public Binding delayQueueAndCustomExchange(){ return BindingBuilder.bind(delayQueue()) .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs(); }}
引入依赖: xmlns:util="http://www.springframework.org/schema/util" http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd
//消息发送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每条消息时间配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延迟消息处理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor { /** * 消息延迟时间,单位:毫秒 */ private final Integer TTL; public MyMessagePostProcessor(final Integer ttl) { this.TTL = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(TTL); return message; }}
关键词: RabbitMQ
原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。
1、°染不尽的余温-╮〃樱花路‖、俄的悲伤支离破碎俄们的结局用句号表示。2、我们
全系售价:13 99万-16 49万元测试车辆:2022款1 5TCVTRS蜂芒版测试车辆价格:16 49万元测试地点:广西-崇左测试道路:城市综合公路、
1、微博城市广场是指将微博集中体现的网络服务点。2、2011年5月25日,南京市整合各方资源,率先建立全国第一家微博
韩国国防部发言人全河奎27日在例行记者会上表示,援助乌克兰武器需从政府层面进行考虑。
1、弑神狂徒作者:残兮 浩瀚世界,星球无数,大陆万千。2、种族不计其数,修炼一途,让无数人痴迷,向往。3、 弑神一族
1、没有钱也可以作音乐---无师自通的音乐天才,简单的器材也能作音乐宋岳庭在短短23年的生命当中连接受一分钟的音乐指导
绘画中的点光与平行光有何不同?点光与平行光的投影差异何在?初学者选择培训班的时候,往往会看重学习氛围。培训班主要是提供学习的环境,起
提示:本文所有内容,仅为投资逻辑分享,不构成投资建议,股市有风险,盈亏自负!【事件驱动】[人造金刚石]驱动因素:24日,
1、是一款类似于厄命游戏的休闲类游戏,是自杀吐血用橙光制作,人们被迫来到一个诡异的空间。2、被迫来到这里的目的就是参加变
同花顺金融研究中心2月26日讯,有投资者向英搏尔提问,你好公司有轮边电机吗或者有轮边电机专利吗 公司回答表示,尊敬的投资者,您好!公司
1、首先,如果你是家长,一定要记得不要责怪孩子。这个时候,你的孩子需要的不是更多的打击,而是一句温暖的安慰。2、告诉孩子
1、赵娜,中国内地配音女演员。2、代表作有《喜羊羊与灰太狼》《钢铁飞龙》《幻变精灵蛋糕甜心》《神龙拯救队之元气星魂》等。
网易智造x3plus耳机的开机和关机方法网易制造的x3plus耳机多功能按键位于线控板上,三个按键中的圆形按键就是耳机的
1、《彩云追月》是一首家喻户晓的民乐。2、1932年,任光在上海百代唱片公司任节目部主任时,同聂耳一起,为百代国乐队写了
1、红旗连锁全称成都红旗连锁有限公司成立于2000年6月22日。2、2010年6月9日,整体变更为成都红旗连锁股份有限公
参考消息网2月26日报道据美联社2月24日报道,今天,在美国的最新一场冬季风暴中,强雨雪袭击了加利福尼亚州和西部其他地区,密歇根州的大量人
1、自责:因个人缺点或错误而感愧疚,是自我反省的一种形式。2、自我反省时的自责,即对自己的过失言行作一番剖析,并通过内部
1、青竹湖街道隶属于湖南省长沙市开福区,总面积约41平方公里,下辖11个社区。2、截至2020年11月1日,青竹湖街道常
有些事你不教给孩子,一辈子都不会有人告诉他