当前位置: 首页 > news >正文

现代化专业群建设网站某网站搜索引擎优化

现代化专业群建设网站,某网站搜索引擎优化,电子商务的建站流程,教你做美食的网站Rabbit高级用法 一、Rabbit Springboot集成1.1 引入依赖1.2 添加配置1.3 添加Config1.4 编写Consumer1.5 发送消息 二、Rabbit 高级用法2.1 消息发送前置处理器2.2 消息发送确认机制2.3 消息接收后处理器2.4 事务消息 一、Rabbit Springboot集成 1.1 引入依赖 <dependency…

Rabbit高级用法

  • 一、Rabbit Springboot集成
    • 1.1 引入依赖
    • 1.2 添加配置
    • 1.3 添加Config
    • 1.4 编写Consumer
    • 1.5 发送消息
  • 二、Rabbit 高级用法
    • 2.1 消息发送前置处理器
    • 2.2 消息发送确认机制
    • 2.3 消息接收后处理器
    • 2.4 事务消息

一、Rabbit Springboot集成

1.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2 添加配置

server:port: 8080
spring:application:name: rabbitmq-testrabbitmq:host: 127.0.0.1port: 5672username: testpassword: testvirtual-host: /publisher-returns: true

1.3 添加Config

@Configuration
@EnableConfigurationProperties(MqProperties.class)
public class MqConfig {@Autowiredprivate MqProperties mqProperties;@Beanpublic MessageConverter messageConverter() {// 设置消息转换器return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory());// 设置消息转换器rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;}@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(mqProperties.getHost());connectionFactory.setPort(mqProperties.getPort());connectionFactory.setUsername(mqProperties.getUsername());connectionFactory.setPassword(mqProperties.getPassword());connectionFactory.setVirtualHost(mqProperties.getVirtualHost());connectionFactory.setPublisherReturns(mqProperties.getPublisherReturns());connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Beanpublic Queue queue() {return new Queue("test-queue", true, false, false);}@Beanpublic Exchange exchange() {return new DirectExchange("direct-exchange-test", true, false);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with("direct-key").noargs();}
}

1.4 编写Consumer

@Component
public class DirectConsumer extends MessageListenerAdapter {private static final Logger logger = LoggerFactory.getLogger(DirectConsumer.class);@Autowiredprivate MessageConverter messageConverter;@Override@RabbitListener(queues = {"test-queue"}, ackMode = "MANUAL")public void onMessage(Message message, Channel channel) throws Exception {try {Map<String, String> msg = (Map<String, String>) messageConverter.fromMessage(message);// 获取 correlation idString id = (String) message.getMessageProperties().getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY);// String msg = new String(message.getBody(), StandardCharsets.UTF_8);logger.info("directConsumer>>>>>>message={}", msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {logger.error("directConsumer>>>>>>exception", e);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}

1.5 发送消息

@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{data}")public void test(@PathVariable(value = "data", required = false) String data) {Map<String, String> msg = new HashMap<>();msg.put("time", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));if (StringUtils.isEmpty(data)) {data = String.valueOf(System.currentTimeMillis());}msg.put("data", "this is data:" + data);rabbitTemplate.convertAndSend("direct-exchange-test", "direct-key", msg, new CorrelationData(UUID.randomUUID().toString()));}
}

二、Rabbit 高级用法

2.1 消息发送前置处理器

RabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {....})

@Bean
public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate();// 添加消息前置处理器。rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {logger.info("-----------------rabbitmq before postProcess message={}", JSON.toJSONString(message));return message;}});rabbitTemplate.setConnectionFactory(connectionFactory());// 设置消息转换器rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;
}

通过前置处理器,可以修改消息、保存消息,设置通用header等。

2.2 消息发送确认机制

设置消息发送ConfirmCallback,消息发送成功 / 失败都会调用当前方法。


@Bean
public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {@Overridepublic CorrelationData postProcess(Message message, CorrelationData correlationData) {logger.info("-----------------rabbitmq correlationData postProcess message={}, correlationData={}",JSON.toJSONString(message), JSON.toJSONString(correlationData));return correlationData;}});rabbitTemplate.setConnectionFactory(connectionFactory());// 设置消息转换器rabbitTemplate.setMessageConverter(messageConverter());// 消息确认,需要配置 spring.rabbitmq.publisher-confirm-type = correlatedrabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {logger.info("setConfirmCallback>>>>>>correlationData={} ack={}, cause={}", JSON.toJSONString(correlationData), ack, cause);}});
//开启mandatory模式(开启失败回调)rabbitTemplate.setMandatory(true);//添加失败回调方法rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {logger.info("setReturnCallback>>>>>消息发送队列不可达, message:{}, exchange:{}, routingKey:{}, 原因:{}", message, exchange, routingKey, replyText);});return rabbitTemplate;
}

通过 new RabbitTemplate.ConfirmCallback() 中的 confirm(CorrelationData correlationData, boolean ack, String cause) 判断消息是否发送成功。

  • ack=true:发送成功
  • ack=false:发送失败

2.3 消息接收后处理器

消息接收前执行。
方式1:

/**
* 添加SimpleRabbitListenerContainerFactory 
* 通过 @RabbitListener(queues = {"test-queue"}, containerFactory = "containerFactory", ackMode = "MANUAL") 设置
*/
@Bean("containerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 并发消费者数,默认为 1factory.setConcurrentConsumers(5);// 最大并发消费者数,默认为 1factory.setMaxConcurrentConsumers(10);// 拒绝未确认的消息并重新将它们放回队列,默认为 truefactory.setDefaultRequeueRejected(false);// 容器启动时是否自动启动,默认为 truefactory.setAutoStartup(true);// 消息确认模式,默认为 AUTOfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 每个消费者在一次请求中预获取的消息数,默认为 1factory.setPrefetchCount(5);// 从队列中接收消息的超时时间,默认为 0,表示没有超时限制factory.setReceiveTimeout(0L);// 与容器一起使用的事务管理器。默认情况下,容器不会使用事务// factory.setTransactionManager(platformTransactionManager());// 消息转换器,用于将接收到的消息转换为 Java 对象或将 Java 对象转换为消息factory.setMessageConverter(messageConverter);// 用于异步消息处理的线程池。默认情况下,容器使用一个简单的 SimpleAsyncTaskExecutorfactory.setTaskExecutor(new SimpleAsyncTaskExecutor());// 重试失败的消息之前等待的时间,默认为 5000 毫秒factory.setRecoveryInterval(5000L);// 如果消息处理器尝试监听不存在的队列,是否抛出异常。默认为 truefactory.setMissingQueuesFatal(false);// 监听器容器连接工厂factory.setConnectionFactory(connectionFactory());// 设置后置处理器factory.setAfterReceivePostProcessors(new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));return message;}});return factory;
}

方式2:

@Bean
public SimpleMessageListenerContainer listenerContainer(@Qualifier("directConsumer") DirectConsumer directConsumer) {String queueName = "test-queue";SimpleMessageListenerContainer simpleMessageListenerContainer =new SimpleMessageListenerContainer(connectionFactory());simpleMessageListenerContainer.setQueueNames(queueName);simpleMessageListenerContainer.setMessageListener(directConsumer);return simpleMessageListenerContainer;
}@Autowired(required = false)
private List<AbstractMessageListenerContainer> simpleMessageListenerContainers;@PostConstruct
public void init() {if (CollectionUtils.isEmpty(simpleMessageListenerContainers)) {return;}for (AbstractMessageListenerContainer simpleMessageListenerContainer : simpleMessageListenerContainers) {simpleMessageListenerContainer.setAfterReceivePostProcessors(new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {logger.info("-----------------rabbitmq after postProcess message={}", JSON.toJSONString(message));return message;}});// 设置手动 ACKsimpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);}
}

2.4 事务消息

不建议使用 rabbitmq 事务消息,对性能非常影响。建议通过消息发送确认机制实现事务。

http://www.ritt.cn/news/22044.html

相关文章:

  • 学习建站的网站夫唯老师seo
  • 广州红盾信息门户网站东莞seo建站如何推广
  • 网站制作网站开发ple id充值百度交易平台官网
  • 椒江网站制作网站建设策划书
  • 重庆南岸营销型网站建设公司推荐seo产品优化免费软件
  • 个人软件外包接单平台宁波网站制作优化服务公司
  • 华为云域名注册seo学徒
  • 关于做花茶网站的策划书网站查询域名入口
  • 嘉兴高端网站建设有限公司女装关键词排名
  • 网站建设网站自助建设信息流优化师工作内容
  • 网络安全网站游戏广告联盟平台
  • qq推广开通南宁百度seo推广
  • 58同城青岛网站建设新冠疫情最新消息
  • 国外经典手机网站设计东莞百度推广排名
  • 江西头条新闻今天seo技术培训沈阳
  • 湛江专门做网站今日新闻头条最新消息
  • 网站内部资源推广牛排seo系统
  • qq怎么做自己的网站seo是什么职务
  • 网站建设和seo的工作好不好安阳企业网站优化外包
  • 网站建设价钱最新疫情最新消息
  • 高端html5网站建设织梦模板营业推广的目标通常是
  • 湖南的商城网站建设关键词seo价格
  • 长沙 网站开发报价排名优化哪家专业
  • 网站后台登陆路径登封网络推广公司
  • wordpress禁用谷歌的插件合肥网站seo整站优化
  • 聚名网域名转出搜狗搜索引擎优化指南
  • 潍坊网站建设wf3网片
  • php旅游网站开发背景电商运营培训机构哪家好
  • 个性化网站建设seo搜索引擎入门教程
  • 网站响应式好吗东莞做一个企业网站