当前位置: 首页 > news >正文

南京网站建设哪里好济宁seo优化公司

南京网站建设哪里好,济宁seo优化公司,河南做外贸网站的公司简介,开发wordpress主题文章目录0. 什么是消息的可靠性投递1. confirm机制2. return机制3. 总结0. 什么是消息的可靠性投递 在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息…

文章目录

  • 0. 什么是消息的可靠性投递
  • 1. confirm机制
  • 2. return机制
  • 3. 总结

0. 什么是消息的可靠性投递

在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息从交换机到队列的过程中出现意外,消息没有正常投放,我们需要消息回到交换机重新投放。

为了解决因为这些种种意外而产生的问题,就需要使用消息的可靠性投递。我们需要在三个过程保证消息传输的正常 :

  1. 消息从生产者到交换机的过程
  2. 消息从交换机到队列的过程
  3. 消息从队列到消费者的过程

消息从生产者到交换机的过程中,我们可以使用confirm机制来保障消息的可靠性。

消息从交换机到队列的过程中,我们可以使用return机制来保障消息的可靠性。

消息从队列到消费者的过程中,我们可以使用ack这个参数来保障消息的可靠性。

其中confirm机制return机制都是使用回调函数,当消息投放失败后在回调函数中将消息放入redis,用定时任务重新投放。需要我们自己编码。

ack参数是RabbitMQ实现的,我们需要手动ack。RabbitMQ将一条消息推送给消费者,如果没有接收到ack就会重发。

1. confirm机制

使用confirm机制需要在配置文件中将rabbitmq.publisher-confirm-type设置为correlated。表示使用confirm机制。

publisher-confirm-type: correlated

publisher-confirm-type有以下三种值:

  • simple :简单的执行ack的判断,在发布消息成功后使用 rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判断下一步的逻辑。但是要注意的是当 waitForConfirmsOrDie 方法如果返回false则会关闭channel。(同步confirm,性能较差)
  • correlated :打开消息确认机制, 执行ack的时候还会携带消息的元数据。
  • none :禁用发布确认模式,默认值。

最后在RabbitTemplate中设置回调函数。这个函数不管是消息发送成功与否都会执行。该函数接收一个接口 :ConfirmCallback,此接口只有一个方法 :

void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
  • correlationData :保存消息id以及相关信息

  • ack :交换机是否收到消息,收到为true

  • cause :消息接收不到原因,成功接收消息则为null

在这个函数里,可以判断ack,如果为false则代表信息发送失败,可以存入redis,让定时任务扫描redis重新发送消息。(解决方案有很多,这里简单提个建议)

@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ...});return rabbitTemplate;}
}

总结 :

confirm机制有两步 :

  1. 在yml配置文件中将publisher-confirm-type设置为correlated,开启confirm机制

    publisher-confirm-type: correlated
    
  2. 设置回调函数,setConfirmCallback

    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ...
    });
    

2. return机制

在了解return 机制前先了解一个参数 :mandatory

mandatory是AMQP协议中basic.publish方法中的标识位。

如果交换机根据路由键找不到对应的队列,那么此时它有两个选择 :将消息返回交换机、将消息丢弃,当mandatory的值为true时,消息将返回给交换机;当mandatory的值为false时,消息将被丢弃。

我们想要实现消息的可靠性投递,当消息投放错时让消息重新投放,那么我们就需要将mandatory的值设置为true。

所以实现return机制有三步 :

想使用return机制,首先要在配置文件中将publisher-returns的值设置为true,代表开启return机制

publisher-returns: true

接下来将mandatory这个值设置为true,代表如果消息丢失或者出现意外,将消息返回,而不是丢弃。

rabbitTemplate.setMandatory(true);

最后实现ReturnCallback这个接口 :

void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
  • message :此条消息
  • replyCode :错误编码
  • replyText :消息接收失败的原因
  • exchange :此条消息的目标交换机
  • routingKey :此条消息的路由键
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);// 做其他处理...
});

执行到returnCallback函数就代表消息从交换机到队列的过程出现问题,例如路由键错误网络问题找不到相应队列…这些情况下可以先打印日志,再使用手动签收机制让交换机重新发送消息。

因为路由键错误这种就是代码写错了,重发消息也没用,只能打印日志等操作来尽可能提醒开发人员,但是网络问题可以重新发送消息。

3. 总结

实现消息的可靠性投递通过confirm机制和return机制。

  1. 在配置文件中开启这两种机制:

    publisher-confirm-type: correlated
    publisher-returns: true
    
  2. 注入RabbitTemplate时将mandatory设置为true,代表消息投递异常时将消息返回,而不是丢弃。

  3. 注入RabbitTemplate时实现ConfirmCallback和ReturnCallback。在里面完成消息投放错误后的保障工作。

    @Slf4j
    @Configuration
    public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入redis数据库中定时重发.// ...});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);});return rabbitTemplate;}
    }
    
    1. 在保证confirm机制和return机制的同时别忘记让消费者手动ack。

    注意 :

    为什么return机制和ack机制可以重新发送,而confirm机制只能借用外部手段呢(例如redis、xxl-job)?

    因为return机制触发时消息已经到了交换机,可以通过判断通过mandatory这个参数来确定是重新发送还是丢弃。(只是在发送给队列时出现错误,人家RabbitMQ自己设计的可以重发。)

    但是confirm机制触发时消息刚从生产者发送给交换机,还没进入RabbitMQ,只能借用外部手段了。

http://www.ritt.cn/news/6721.html

相关文章:

  • 国家卫健委投诉热线南宁百度seo推广
  • 湛江网站seoseo黑帽技术
  • wordpress建站linuxseo排名点击报价
  • 青海建设兵团青岛战友网站广州网站营销推广
  • 动漫网站网页设计代码泰州seo外包
  • 网站建设公司业务员武汉百度搜索优化
  • 做seo网站空间软文广告怎么写
  • 怎么创建网站平台.com搜索引擎排名优化公司
  • 啤酒招商网站大全北京优化核酸检测
  • 专业做合同的网站如何提升百度关键词排名
  • 静态网站有什么用网站域名查询ip地址
  • 彩票网站搭建多钱品牌营销推广代运营
  • 三合一网站建设推广汕头疫情最新消息
  • 移动建站是什么意思网络推广是指什么
  • 小百姓这个网站谁做的网站设计公司怎么样
  • 开源cms框架长沙整站优化
  • 网站seo搜索引擎优化怎么做青岛的seo服务公司
  • 佛山顺德专业做网站seo怎么学在哪里学
  • 安徽省外经建设集团有限公司网站网站关键词seo排名
  • 云端物联网管理平台长沙seo网站优化
  • 一张图片切块做网站背景如何提高搜索引擎优化
  • 东莞长安网站制作百度网站关键词排名查询
  • 鹤山市网站建设公司互联网营销师培训费用是多少
  • 做独立网站需要注意什么手续廊坊网站
  • 兰州网站设计公司重庆seo搜索引擎优化优与略
  • 做网站公司 上海广告营销策略有哪些
  • 没有网站怎么做链接视频播放器电商营销策划方案范文
  • 吉林网站备案seo网站地图
  • 网站广告如何做seo应该怎么做
  • 四川建设厅网站招聘网络营销的现状分析