✅P259_商城业务-消息队列-RabbitListener-RabbitHandler接收消息
商城业务-消息队列-RabbitListener&RabbitHandler接收消息
@RabbitListener
@RabbitListener使用前提:必须开启@EnableRabbit(在启动类上加@EnableRabbit注解),并且被标注方法所在的类必须是Spring容器中的组件
@RabbitListener:可以标注在类或方法上,通过queues属性监听一个或多个队列
@RabbitHandler:标注在方法上,用于接收不同类型的消息对象(通过方法重载区分)
示例
cfmall-order/src/main/java/com/gyz/cfmall/order/service/impl/OrderServiceImpl.java
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {

    /**
     * Listens on the {@code new-queus} queue and prints every delivery together with
     * its runtime class.
     *
     * @param message the delivered object; its runtime class is printed alongside it
     */
    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Object message) {
        Class<?> messageType = message.getClass();
        System.out.println("接收到消息...内容:" + message + "===>类型:" + messageType);
    }
}
启动订单服务,打印如下:
接收到消息...内容:(Body:'{"id":1,"name":"hahaha","sort":1,"status":0,"createTime":1694764377331}' MessageProperties [headers={__TypeId__=com.gyz.cfmall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=new-direct-change, receivedRoutingKey=queuesChange, deliveryTag=1, consumerTag=amq.ctag-7wgm24_4RakC5ay-BPvFtA, consumerQueue=new-queus])===>类型:class org.springframework.amqp.core.Message
message组成:①消息体+消息头 ②class org.springframework.amqp.core.Message类型的对象
方法的参数类型
1.class org.springframework.amqp.core.Message类型的对象
    /**
     * Receives the delivery as a raw {@code Message}, from which both the body and the
     * message properties can be read.
     *
     * @param msg the delivery taken from the {@code new-queus} queue
     */
    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message msg) {
        // Message headers / delivery properties
        MessageProperties properties = msg.getMessageProperties();
        // Message body as raw bytes
        byte[] payload = msg.getBody();
        Class<?> msgType = msg.getClass();
        System.out.println("接收到消息...内容:" + msg + "===>类型:" + msgType);
    }
2.T<发送消息的类型> :假如发送消息的类型为 OrderReturnReasonEntity 则接受的消息类型也可以为 OrderReturnReasonEntity
    /**
     * Receives the delivery both as the raw {@code Message} and as the typed payload,
     * then prints the payload.
     *
     * @param message the raw delivery (body plus properties)
     * @param entity  the message payload as an {@code OrderReturnReasonEntity}
     */
    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message message, OrderReturnReasonEntity entity) {
        MessageProperties props = message.getMessageProperties();
        byte[] rawBody = message.getBody();
        String line = "接收到消息...内容:" + entity;
        System.out.println(line);
    }
输出:
接收到消息...内容:OrderReturnReasonEntity(id=1, name=hahaha, sort=1, status=0, createTime=Fri Sep 15 15:45:14 GMT+08:00 2023)
3.Channel channel:当前传输数据的通道
    /**
     * Same listener as the two-argument form, with the {@code Channel} added as a
     * third parameter; the channel is not used in this example.
     *
     * @param message the raw delivery (body plus properties)
     * @param entity  the message payload as an {@code OrderReturnReasonEntity}
     * @param channel the channel currently carrying the data
     */
    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message message, OrderReturnReasonEntity entity, Channel channel) {
        MessageProperties props = message.getMessageProperties();
        byte[] rawBody = message.getBody();
        String line = "接收到消息...内容:" + entity;
        System.out.println(line);
    }
模拟多个客户端接收消息
场景:多个客户端监听同一个Queue。消息一旦被某个客户端收到,队列就会删除该消息,因此同一个消息只能有一个客户端收到
1.订单服务启动多个,同一个消息,只能有一个客户端收到
新创建一个订单服务,操作如下:


新建完成启动订单服务CfmallOrderApplication--8901。
接收消息代码如下:
com.gyz.cfmall.order.service.impl.OrderServiceImpl#receiveMessage
    /**
     * Prints the received payload, then prints a completion line carrying the
     * payload's name.
     *
     * @param message the raw delivery (body plus properties)
     * @param entity  the message payload as an {@code OrderReturnReasonEntity}
     * @param channel the channel currently carrying the data (unused here)
     */
    @RabbitListener(queues = {"new-queus"})
    public void receiveMessage(Message message, OrderReturnReasonEntity entity, Channel channel) {
        MessageProperties props = message.getMessageProperties();
        byte[] rawBody = message.getBody();

        String received = "接收到消息...内容:" + entity;
        System.out.println(received);

        // The name is read only after the first line has been printed.
        String finished = "消息处理完成" + entity.getName();
        System.out.println(finished);
    }
发送消息代码如下:
com.gyz.cfmall.order.controller.RabbitController#sendMessage
package com.gyz.cfmall.order.controller;
import com.gyz.cfmall.order.entity.OrderReturnReasonEntity;
import com.gyz.common.utils.R;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
/**
 * Demo REST controller that publishes test messages through {@code RabbitTemplate}.
 * Created on 2023/9/15.
 *
 * @author gong_yz
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code num} {@code OrderReturnReasonEntity} messages to exchange
     * {@code new-direct-change} with routing key {@code queuesChange}.
     *
     * @param num how many messages to send; defaults to 10 when the request omits it
     * @return the result of {@code R.ok()} once the loop has finished
     */
    @GetMapping("/sendMessage")
    public R sendMessage(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        int index = 0;
        while (index < num) {
            OrderReturnReasonEntity reason = newReturnReason(index);
            rabbitTemplate.convertAndSend("new-direct-change", "queuesChange", reason);
            System.out.println("消息发送成功");
            index++;
        }
        return R.ok();
    }

    /** Builds the sample payload; only the name varies with the loop index. */
    private OrderReturnReasonEntity newReturnReason(int index) {
        OrderReturnReasonEntity reason = new OrderReturnReasonEntity();
        reason.setId(1L);
        reason.setName("哈哈" + index);
        reason.setSort(1);
        reason.setStatus(0);
        reason.setCreateTime(new Date());
        return reason;
    }
}
测试结果:同一个消息,只能有一个客户端收到


2. 只有当一个消息完全处理完,方法运行结束,客户端才可以接收下一个消息
接收消息代码:
com.gyz.cfmall.order.service.impl.OrderServiceImpl#receiveMessage
	/**
	 * Prints the received payload, sleeps for three seconds, then prints a completion
	 * line, so that the handling of each message visibly takes time.
	 *
	 * @param message the raw delivery (body plus properties)
	 * @param entity  the message payload as an {@code OrderReturnReasonEntity}
	 * @throws InterruptedException if the thread is interrupted while sleeping
	 */
	@RabbitListener(queues = {"new-queus"})
	public void receiveMessage(Message message, OrderReturnReasonEntity entity) throws InterruptedException {
		MessageProperties props = message.getMessageProperties();
		byte[] rawBody = message.getBody();

		String received = "接收到消息...内容:" + entity;
		System.out.println(received);

		// Pause before reporting completion.
		Thread.sleep(3_000L);

		String finished = "消息处理完成==>" + entity.getName();
		System.out.println(finished);
	}
测试结果:

@RabbitHandler
@RabbitListener:可以标注在类或方法上,通过queues属性监听一个或多个队列
@RabbitHandler:标注在方法上,用于接收不同类型的消息对象(通过方法重载区分)
模拟向队列发送不同消息对象
发送消息代码:cfmall-order/src/main/java/com/gyz/cfmall/order/controller/RabbitController.java
package com.gyz.cfmall.order.controller;
import com.gyz.cfmall.order.entity.OrderEntity;
import com.gyz.cfmall.order.entity.OrderReturnReasonEntity;
import com.gyz.common.utils.R;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.UUID;
/**
 * Demo REST controller that publishes two different payload types to the same
 * exchange and routing key. Created on 2023/9/15.
 *
 * @author gong_yz
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code num} messages to exchange {@code new-direct-change} with routing key
     * {@code queuesChange}: even loop indexes send an {@code OrderReturnReasonEntity},
     * odd indexes send an {@code OrderEntity}.
     *
     * @param num how many messages to send; defaults to 10 when the request omits it
     * @return the result of {@code R.ok()} once the loop has finished
     */
    @GetMapping("/sendMessage")
    public R sendMessage(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        for (int index = 0; index < num; index++) {
            boolean oddIndex = index % 2 != 0;
            if (oddIndex) {
                OrderEntity order = newOrder();
                rabbitTemplate.convertAndSend("new-direct-change", "queuesChange", order);
                System.out.println("OrderEntity消息发送成功");
            } else {
                OrderReturnReasonEntity reason = newReturnReason(index);
                rabbitTemplate.convertAndSend("new-direct-change", "queuesChange", reason);
                System.out.println("OrderReturnReasonEntity消息发送成功");
            }
        }
        return R.ok();
    }

    /** Builds the sample return-reason payload; only the name varies with the loop index. */
    private OrderReturnReasonEntity newReturnReason(int index) {
        OrderReturnReasonEntity reason = new OrderReturnReasonEntity();
        reason.setId(1L);
        reason.setName("哈哈" + index);
        reason.setSort(1);
        reason.setStatus(0);
        reason.setCreateTime(new Date());
        return reason;
    }

    /** Builds the sample order payload with a random UUID as its order number. */
    private OrderEntity newOrder() {
        OrderEntity order = new OrderEntity();
        order.setOrderSn(UUID.randomUUID().toString());
        return order;
    }
}
接收消息代码:
	/**
	 * {@code @RabbitHandler} is placed on methods; overloads distinguish the different
	 * message types. This overload takes the {@code OrderReturnReasonEntity} payload.
	 *
	 * <p>NOTE(review): the enclosing class is expected to carry {@code @RabbitListener};
	 * it is not visible in this snippet.
	 *
	 * @param message the raw delivery (body plus properties)
	 * @param entity  the message payload as an {@code OrderReturnReasonEntity}
	 */
	@RabbitHandler
	public void receiveMessage(Message message, OrderReturnReasonEntity entity) {
		MessageProperties props = message.getMessageProperties();
		byte[] rawBody = message.getBody();
		String line = "接收到消息...内容:" + entity;
		System.out.println(line);
	}
    /**
     * Overload that takes the {@code OrderEntity} payload and prints its order number.
     *
     * @param orderEntity the received order payload
     */
    @RabbitHandler
    public void receiveMessage(OrderEntity orderEntity) {
        final String line = "接收到消息...内容:" + orderEntity.getOrderSn();
        System.out.println(line);
    }
测试结果如下
