mq延时队列使用
一、基本配置
- 导入依赖
<!-- AMQP (Advanced Message Queuing Protocol) support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.yml配置
# rabbitmq (Spring Boot binds these keys under the spring.rabbitmq.* prefix)
spring:
  rabbitmq:
    host: 192.168.56.10
    virtual-host: /
    port: 5672
- 启动类添加注解
@EnableRabbit
- 配置mq的json序列化
@Configuration
public class RabbitmqConfig {@AutowiredRabbitTemplate rabbitTemplate;@Beanpublic MessageConverter RabbitmqConvertJSON(){return new Jackson2JsonMessageConverter();}}
- 配置交换机、队列、绑定规则
@Beanpublic Exchange StockEventExchange(){//TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new TopicExchange("stock-event-exchange",true,false);}@Beanpublic Queue StockReleaseStockQueue(){//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)return new Queue("stock.release.stock.queue",true,false,false);}@Beanpublic Queue StockDelayQueue(){Map<String,Object> map = new HashMap<>();//死信路由exchangemap.put("x-dead-letter-exchange","stock-event-exchange");//死信routing-keymap.put("x-dead-letter-routing-key","stock.release");//time to livemap.put("x-message-ttl",60000);return new Queue("stock.delay.queue",true,false,false,map);}@Beanpublic Binding StockLocked(){//Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}@Beanpublic Binding StockRelease(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}
- 添加监听器并与mq建立连接后,即可在mq中生成交换机、队列、绑定规则(声明是懒加载的,首次建立连接时才会真正创建)
/**
 * Placeholder listener on {@code stock.release.stock.queue}.
 *
 * <p>The body is intentionally empty: nothing is done with the delivered message.
 *
 * @param message the delivered message (unused)
 * @param channel the channel the message arrived on (unused)
 */
@RabbitListener(queues = "stock.release.stock.queue")
public void ListenQueue(Message message, Channel channel) {
    // Intentionally empty.
}
二、订单服务使用mq
1.生成队列、交换机、绑定规则
/*** 如果设置错误需要删掉错误的Queue重启服务即可,重启服务不会覆盖原有的Queue*/
@Configuration
public class MyMQConfig {@Beanpublic Queue OrderDelayQueue(){Map<String,Object> map = new HashMap<>();//死信路由map.put("x-dead-letter-exchange","order-event-exchange");//死信map.put("x-dead-letter-routing-key","order.release.order");//time to livemap.put("x-message-ttl",30000);//持久化,排它//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)return new Queue("order.delay.order",true,false,false,map);}@Beanpublic Queue OrderReleaseOrderQueue(){return new Queue("order.release.order.queue",true,false,false,null);}//选用topic类型交换机是因为需要binding多个队列@Beanpublic Exchange OrderEventExchange(){//TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new TopicExchange("order-event-exchange",true,false,null);}@Beanpublic Binding OrderCreateOrder(){//Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)return new Binding("order.delay.order", Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding OrderReleaseOrder(){return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}
2.生产者发送消息
@ResponseBody@GetMapping("/test/queue")public String testQueue(){OrderEntity orderEntity = new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());//给队列发消息,指定routing key//convertAndSend(String exchange, String routingKey, Object object)rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);return "给mq发消息完成";}
3.订阅队列
@RabbitListener(queues = {"order.release.order.queue"})public void ListenQueue(Channel channel, Message message, OrderEntity orderEntity) throws IOException {//因为配置了手动ack,所有这里需要签收消息//basicAck(long deliveryTag, boolean multiple)channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("手动过期的订单信息,准备关闭订单:"+orderEntity.getOrderSn());}
三、库存服务使用mq
@Configuration
public class RabbitmqConfig {@AutowiredRabbitTemplate rabbitTemplate;@RabbitListener(queues = {"stock.release.stock.queue"})public void ListenQueue(Message message, Channel channel){}/*** json序列化*/@Beanpublic MessageConverter RabbitmqConvertJSON(){return new Jackson2JsonMessageConverter();}@Beanpublic Exchange StockEventExchange(){//TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new TopicExchange("stock-event-exchange",true,false);}@Beanpublic Queue StockReleaseStockQueue(){//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)return new Queue("stock.release.stock.queue",true,false,false);}@Beanpublic Queue StockDelayQueue(){Map<String,Object> map = new HashMap<>();//死信路由exchangemap.put("x-dead-letter-exchange","stock-event-exchange");//死信routing-keymap.put("x-dead-letter-routing-key","stock.release");//time to livemap.put("x-message-ttl",60000);return new Queue("stock.delay.queue",true,false,false,map);}@Beanpublic Binding StockLocked(){//Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments)return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}@Beanpublic Binding StockRelease(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}}
发布评论